Module pandare.extras.fileFaker

Framework for halucinating files inside the guest through modifications around syscalls involving filenames and file descriptors.

High-level idea: When we see an open of a file we want to fake, change it to another filename that really exists and capture the file descriptor assigned to it. Then whenever there are uses of that file descriptor, ignore/drop the request and fake return values.

Expand source code
#!/usr/bin/env python3

"""
Framework for halucinating files inside the guest through
modifications around syscalls involving filenames and file
descriptors.

High-level idea:
    When we see an open of a file we want to fake, change it to another filename
    that really exists and capture the file descriptor assigned to it.
    Then whenever there are uses of that file descriptor, ignore/drop the request
    and fake return values.
"""


if __name__ == '__main__': # Script run directly
    from pandare.extras import FileHook
else: # Load from module
    from .fileHook import FileHook

from math import ceil
import logging


class FakeFile:
    '''
    A fake file behind a hyperFD - this class will generate data when the
    corresponding file descriptor(s) are accessed.
    Users can inherit and modify this to customize how data is generated

    Note: a single FileFaker might be opened and in use by multiple FDs in the guest

    '''
    def __init__(self, fake_contents="", filename=None):
        self.logger = logging.getLogger('panda.filehook.fakefile')

        if isinstance(fake_contents, str):
            fake_contents = fake_contents.encode("utf8")
        self.contents = fake_contents
        self.initial_contents = fake_contents
        self.refcount = 0 # Reference count
        self.filename = filename # Just for debug printing

    def read(self, size, offset):
        '''
        Generate data for a given read of size.  Returns data.
        '''

        if offset >= len(self.contents):  # No bytes left to read
            return b""
        # Otherwise there are bytes left to read
        read_data = self.contents[offset:offset+size]

        return read_data

    def write(self, offset, write_data):
        '''
        Update contents from offset. It's a bytearray so we can't just mutate
        Return how much HyperFD offset should be incremented by
        XXX what about writes past end of the file?
        '''
        new_data  = self.contents[:offset]
        new_data += write_data
        new_data += self.contents[offset+len(new_data):]
        
        self.logger.info(f"FakeFD({self.filename}) writing {new_data} at offset {offset}")

        self.contents = new_data
        return len(write_data)

    def close(self):
        self.refcount -= 1
        if self.refcount == 0: # All FDs are now closed
            if self.initial_contents == self.contents:
                self.logger.debug(f"All handles to Faker({self.filename}) closed. Unmodified contents")
            else: # it was mutated!
                self.logger.info(f"All handles to Faker({self.filename}) closed. Modified contents: {repr(self.contents)}")

    def get_mode(self):
        return 0o664 # Regular file (octal)

    def get_size(self, bytesize):
        return ceil(len(self.contents)/bytesize)

    def __str__(self):
        return f"Faker({self.filename} -> {repr(self.contents[:10])}..."


    def _delete(self):
        self.close()

    def __del__(self):
        # XXX: This destructor isn't called automatically
        self._delete()

class HyperFD:
    '''
    A HyperFD is what we use to track the state of a faked FD in the guest.
    It is backed by a FakeFile.
    Stores the filename originally associated with it at time of open
    '''
    def __init__(self,  filename, fakefile, offset=0):
        self.name = filename
        self.file = fakefile
        self.file.refcount+=1 # Count of open FDs pointing to the file
        self.offset = offset

    def read(self, size):
        '''
        Read from the file descriptor. Determine current offset
        and then pass request through to FakeFile
        Returns (data read, count)
        '''
        assert(self.file)
        data = self.file.read(size, self.offset)
        self.offset+=len(data)
        return (data, len(data))

    def write(self, data):
        assert(self.file)
        bytes_written =  self.file.write(self.offset, data)
        self.offset +- bytes_written
        return bytes_written

    def get_mode(self):
        assert(self.file)
        return self.file.get_mode()

    def get_size(self):
        assert(self.file)
        return self.file.get_mode()

    def close(self):
        '''
        Decrement the reference counter
        '''
        assert(self.file)
        self.file.close()
        #del self # ???

    def seek(self, offset, whence):
        # From include/uapi/linux/fs.h
        SEEK_SET = 0
        SEEK_CUR = 1
        SEEK_END = 2

        if whence == SEEK_SET:
            self.offset = offset
        elif whence == SEEK_CUR:
            self.offset = self.offset + offset
        elif whence == SEEK_END:
            self.offset = self.offset + offset
        else:
            raise ValueError(f"Unsupported whence {whence} in seek")

    def __str__(self):
        return f"HyperFD with name {self.name} offset {self.offset} backed by {self.file}"


class FileFaker(FileHook):
    '''
    Class to halucinate fake files within the guest. When the guest attempts to access a faked file,
    we transparenly redirect the access to another file on disk and grab the FD generated using FileHook.

    When the guest attempts to use a FD related to a faked file, we mutate the request. Reads are created
    from fake conents and writes are logged.
    '''

    def __init__(self, panda):
        '''
        Initialize FileHook and vars. Setup callbacks for all fd-based syscalls
        '''
        super().__init__(panda)
        self.ff_logger = logging.getLogger('panda.filehook.fakefile')

        self.faked_files = {} # filename: Fake
        self.hooked_fds = {} # (fd, cr3): HyperFD->faker
        self.pending_hfd = None

        to_hook = {} # index of fd argument: list of names
        if panda.arch_name == "i386":
            # grep 'int fd' syscall_switch_enter_linux_x86.cpp  | grep "\['int fd\|\['unsigned int fd" | grep -o sys_[a-zA-Z0-9_]* | sed -n -e 's/sys_\(.*\)/"\1" /p' | paste -sd "," -
            # Note the grep commands missed dup2 and dup3 which take oldfd as 1st arg
            to_hook[0] = ["read", "write", "close", "lseek", "fstat", "ioctl", "fcntl", "ftruncate", "fchmod",
                          "fchown16", "fstatfs", "newfstat", "fsync", "fchdir", "llseek", "getdents", "flock",
                          "fdatasync", "pread64", "pwrite64", "ftruncate64", "fchown", "getdents64", "fcntl64",
                          "readahead", "fsetxattr", "fgetxattr", "flistxattr", "fremovexattr", "fadvise64",
                          "fstatfs64", "fadvise64_64", "inotify_add_watch", "inotify_rm_watch", "splice",
                          "sync_file_range", "tee", "vmsplice", "fallocate", "recvmmsg", "syncfs", "sendmmsg",
                          "setns", "finit_module", "getsockopt", "setsockopt", "sendmsg", "recvmsg", "dup2",
                          "dup3" ]

            # grep 'int fd' syscall_switch_enter_linux_x86.cpp  | grep -v "\['int fd\|\['unsigned int fd" # + manual
            to_hook[2] = ["epoll_ctl"]
            to_hook[3] = ["fanotify_mark"]

        elif panda.arch_name == "x86_64":
            to_hook[0] = ["read", "write", "close", "newfstat", "lseek", "ioctl", "pread64", "pwrite64", "sendmsg",
                          "recvmsg", "setsockopt", "getsockopt", "fcntl", "flock", "fsync", "fdatasync", "ftruncate",
                          "getdents", "fchdir", "fchmod", "fchown", "fstatfs", "readahead", "fsetxattr", "fgetxattr",
                          "flistxattr", "fremovexattr", "getdents64", "fadvise64", "inotify_add_watch",
                          "inotify_rm_watch", "splice", "tee", "sync_file_range", "vmsplice", "fallocate", "recvmmsg",
                          "syncfs", "sendmmsg", "setns", "finit_module", "copy_file_range", "dup2", "dup3"]
            to_hook[2] = ["epoll_ctl"]
            to_hook[3] = ["fanotify_mark"]

        elif panda.arch_name == "arm":
            to_hook[0] = ["read", "write", "close", "lseek", "ioctl", "fcntl", "ftruncate", "fchmod", "fchown16",
                          "fstatfs", "newfstat", "fsync", "fchdir", "llseek", "getdents", "flock", "fdatasync",
                          "pread64", "pwrite64", "ftruncate64", "fchown", "getdents64", "fcntl64", "readahead",
                          "fsetxattr", "fgetxattr", "flistxattr", "fremovexattr", "fstatfs64", "arm_fadvise64_64",
                          "setsockopt", "getsockopt", "sendmsg", "recvmsg", "inotify_add_watch", "inotify_rm_watch",
                          "splice", "sync_file_range2", "tee", "vmsplice", "fallocate", "recvmmsg", "syncfs",
                          "sendmmsg", "setns", "finit_module", "dup2", "dup3"]
            to_hook[2] = ["epoll_ctl"]
            to_hook[3] = ["fanotify_mark"]
        else:
            raise ValueError(f"Unsupported PANDA arch: {panda.arch_name}")

        for arg_offset, names in to_hook.items():
            for name in names:
                self._gen_fd_cb(name, arg_offset)

    def replace_file(self, filename, faker, disk_file="/etc/passwd"):
        '''
        Replace all accesses to filename with accesses to the fake file instead
        which optionally may be specified by disk_file.
        '''
        self.faked_files[filename] = faker

        # XXX: We rename the files to real files to the guest kernel can manage FDs for us.
        #      this may need to use different real files depending on permissions requested
        self.rename_file(filename, disk_file)

    def _gen_fd_cb(self, name, fd_offset):
        '''
        Register syscalls2 PPP callback on enter and return for the given name
        which has an argument of fd at fd_offset in the argument list
        '''
        self._panda.ppp("syscalls2", f"on_sys_{name}_return", name=f"file_faker_return_{name}")( \
                    lambda *args: self._return_fd_cb(name, fd_offset, args=args))

    def _return_fd_cb(self, syscall_name, fd_pos, args=None):
        '''
        When we're returnuing from a syscall, mutate memory
        to put the results we want there
        '''

        (cpu, pc) = args[0:2]
        fd = args[2+fd_pos]
        asid = self._panda.current_asid(cpu)

        if (fd, asid) not in self.hooked_fds:
            return

        assert(args)
        hfd = self.hooked_fds[(fd, asid)]

        if syscall_name == "read":
            # Place up to `count` bytes of data into memory at `buf_ptr`
            buf_ptr = args[3]
            count   = args[4]

            (data, data_len) = hfd.read(count)
            if data:
                try:
                    self._panda.virtual_memory_write(cpu, buf_ptr, data)
                except ValueError:
                    self.ff_logger.error(f"Unable to store fake data after read to {hfd}")
                    return

            cpu.env_ptr.regs[0] = data_len

            self.ff_logger.info(f"Read - returning {data_len} bytes")

        elif syscall_name == "close":
            # We want the guest to close the real FD. Delete it from our map of hooked fds
            hfd.close()
            if (fd, asid) in self.hooked_fds:
                del self.hooked_fds[(fd, asid)]

        elif syscall_name == "write":
            # read count bytes from buf, add to our hyper-fd
            buf_ptr = args[3]
            count   = args[4]
            try:
                data = self._panda.virtual_memory_read(cpu, buf_ptr, count)
            except ValueError:
                self.ff_logger.error(f"Unable to read buffer that was being written")
                return

            bytes_written = hfd.write(data)
            cpu.env_ptr.regs[0] = bytes_written

        elif syscall_name == "lseek": # LLSEEK?
            offset = args[2]
            whence = args[3]
            hfd.seek(offset, whence)


        elif syscall_name in ["dup2", "dup3"]:
            # add newfd
            oldfd = args[2]
            newfd = args[3]
            self.ff_logger.debug(f"Duplicating faked fd {oldfd} to {newfd}")

            # Duplicate the old hfd - but not the file behind it
            new_hfd = HyperFD(hfd.name, hfd.file, hfd.offset)
            self.hooked_fds[(newfd, asid)] = new_hfd

        else:
            self.ff_logger.error(f"Unsupported syscall on FakeFD{fd}: {syscall_name}. Not intercepting (Running on real guest FD)")


    def _before_modified_enter(self, cpu, pc, syscall_name, fname):
        '''
        Overload FileHook function. Determine if a syscall we're about to
        enter is using a filename we want to fake

        After the modified syscall returns, we grab the real FD and map it to the HFD
        '''
        if fname in self.faked_files:
            self.pending_hfd =  HyperFD(fname, self.faked_files[fname]) # Create HFD
            asid = self._panda.current_asid(cpu)

    def _after_modified_return(self, cpu, pc, syscall_name, fd):
        '''
        Overload FileHook function. Determine if a syscall we're about to
        return from was using a filename we want to fake. If so, grab the FD
        '''
        if self.pending_hfd:
            asid = self._panda.current_asid(cpu)
            self.hooked_fds[(fd, asid)] =  self.pending_hfd
            self.logger.info(f"A file we want to hook was created {self.pending_hfd}")
            self.pending_hfd = None

    def close(self):
        # Close all open hfds
        if len(self.hooked_fds):
            self.ff_logger.debug("Cleaning up open hyper file descriptors")
            for (fd, asid) in list(self.hooked_fds.keys()):
                self.hooked_fds[(fd, asid)].close()
                del self.hooked_fds[(fd, asid)]


    def __del__(self):
        # XXX: This isn't being called for some reason on destruction
        self.close()


if __name__ == '__main__':
    from pandare import Panda

    panda = Panda(generic="x86_64")

    # Replace all syscalls that reference /foo with a custom string
    fake_str = "Hello world. This is data generated from python!"
    faker = FileFaker(panda)
    faker.replace_file("/foo", FakeFile(fake_str))

    @panda.queue_blocking
    def driver():
        new_str = "This is some new data"

        panda.revert_sync('root')
        data = panda.run_serial_cmd("cat /foo") # note run_serial_cmd must end with a blank line and our fake file doesn't
        assert(fake_str in data), f"Failed to read fake file /foo: {data}"

        panda.run_serial_cmd(f'echo {new_str} > /foo')
        data = panda.run_serial_cmd("cat /foo")
        assert(new_str in data), f"Failed to update fake file /foo: {data}. Expected: {new_str}"
        panda.end_analysis()

    panda.run()
    print("Success")

Classes

class FakeFile (fake_contents='', filename=None)

A fake file behind a hyperFD - this class will generate data when the corresponding file descriptor(s) are accessed. Users can inherit and modify this to customize how data is generated

Note: a single FileFaker might be opened and in use by multiple FDs in the guest

Expand source code
class FakeFile:
    '''
    A fake file behind a hyperFD - this class will generate data when the
    corresponding file descriptor(s) are accessed.
    Users can inherit and modify this to customize how data is generated

    Note: a single FileFaker might be opened and in use by multiple FDs in the guest

    '''
    def __init__(self, fake_contents="", filename=None):
        self.logger = logging.getLogger('panda.filehook.fakefile')

        if isinstance(fake_contents, str):
            fake_contents = fake_contents.encode("utf8")
        self.contents = fake_contents
        self.initial_contents = fake_contents
        self.refcount = 0 # Reference count
        self.filename = filename # Just for debug printing

    def read(self, size, offset):
        '''
        Generate data for a given read of size.  Returns data.
        '''

        if offset >= len(self.contents):  # No bytes left to read
            return b""
        # Otherwise there are bytes left to read
        read_data = self.contents[offset:offset+size]

        return read_data

    def write(self, offset, write_data):
        '''
        Update contents from offset. It's a bytearray so we can't just mutate
        Return how much HyperFD offset should be incremented by
        XXX what about writes past end of the file?
        '''
        new_data  = self.contents[:offset]
        new_data += write_data
        new_data += self.contents[offset+len(new_data):]
        
        self.logger.info(f"FakeFD({self.filename}) writing {new_data} at offset {offset}")

        self.contents = new_data
        return len(write_data)

    def close(self):
        self.refcount -= 1
        if self.refcount == 0: # All FDs are now closed
            if self.initial_contents == self.contents:
                self.logger.debug(f"All handles to Faker({self.filename}) closed. Unmodified contents")
            else: # it was mutated!
                self.logger.info(f"All handles to Faker({self.filename}) closed. Modified contents: {repr(self.contents)}")

    def get_mode(self):
        return 0o664 # Regular file (octal)

    def get_size(self, bytesize):
        return ceil(len(self.contents)/bytesize)

    def __str__(self):
        return f"Faker({self.filename} -> {repr(self.contents[:10])}..."


    def _delete(self):
        self.close()

    def __del__(self):
        # XXX: This destructor isn't called automatically
        self._delete()

Methods

def close(self)
Expand source code
def close(self):
    self.refcount -= 1
    if self.refcount == 0: # All FDs are now closed
        if self.initial_contents == self.contents:
            self.logger.debug(f"All handles to Faker({self.filename}) closed. Unmodified contents")
        else: # it was mutated!
            self.logger.info(f"All handles to Faker({self.filename}) closed. Modified contents: {repr(self.contents)}")
def get_mode(self)
Expand source code
def get_mode(self):
    return 0o664 # Regular file (octal)
def get_size(self, bytesize)
Expand source code
def get_size(self, bytesize):
    return ceil(len(self.contents)/bytesize)
def read(self, size, offset)

Generate data for a given read of size. Returns data.

Expand source code
def read(self, size, offset):
    '''
    Generate data for a given read of size.  Returns data.
    '''

    if offset >= len(self.contents):  # No bytes left to read
        return b""
    # Otherwise there are bytes left to read
    read_data = self.contents[offset:offset+size]

    return read_data
def write(self, offset, write_data)

Update contents from offset. It's a bytearray so we can't just mutate Return how much HyperFD offset should be incremented by XXX what about writes past end of the file?

Expand source code
def write(self, offset, write_data):
    '''
    Update contents from offset. It's a bytearray so we can't just mutate
    Return how much HyperFD offset should be incremented by
    XXX what about writes past end of the file?
    '''
    new_data  = self.contents[:offset]
    new_data += write_data
    new_data += self.contents[offset+len(new_data):]
    
    self.logger.info(f"FakeFD({self.filename}) writing {new_data} at offset {offset}")

    self.contents = new_data
    return len(write_data)
class FileFaker (panda)

Class to halucinate fake files within the guest. When the guest attempts to access a faked file, we transparenly redirect the access to another file on disk and grab the FD generated using FileHook.

When the guest attempts to use a FD related to a faked file, we mutate the request. Reads are created from fake conents and writes are logged.

Initialize FileHook and vars. Setup callbacks for all fd-based syscalls

Expand source code
class FileFaker(FileHook):
    '''
    Class to halucinate fake files within the guest. When the guest attempts to access a faked file,
    we transparenly redirect the access to another file on disk and grab the FD generated using FileHook.

    When the guest attempts to use a FD related to a faked file, we mutate the request. Reads are created
    from fake conents and writes are logged.
    '''

    def __init__(self, panda):
        '''
        Initialize FileHook and vars. Setup callbacks for all fd-based syscalls
        '''
        super().__init__(panda)
        self.ff_logger = logging.getLogger('panda.filehook.fakefile')

        self.faked_files = {} # filename: Fake
        self.hooked_fds = {} # (fd, cr3): HyperFD->faker
        self.pending_hfd = None

        to_hook = {} # index of fd argument: list of names
        if panda.arch_name == "i386":
            # grep 'int fd' syscall_switch_enter_linux_x86.cpp  | grep "\['int fd\|\['unsigned int fd" | grep -o sys_[a-zA-Z0-9_]* | sed -n -e 's/sys_\(.*\)/"\1" /p' | paste -sd "," -
            # Note the grep commands missed dup2 and dup3 which take oldfd as 1st arg
            to_hook[0] = ["read", "write", "close", "lseek", "fstat", "ioctl", "fcntl", "ftruncate", "fchmod",
                          "fchown16", "fstatfs", "newfstat", "fsync", "fchdir", "llseek", "getdents", "flock",
                          "fdatasync", "pread64", "pwrite64", "ftruncate64", "fchown", "getdents64", "fcntl64",
                          "readahead", "fsetxattr", "fgetxattr", "flistxattr", "fremovexattr", "fadvise64",
                          "fstatfs64", "fadvise64_64", "inotify_add_watch", "inotify_rm_watch", "splice",
                          "sync_file_range", "tee", "vmsplice", "fallocate", "recvmmsg", "syncfs", "sendmmsg",
                          "setns", "finit_module", "getsockopt", "setsockopt", "sendmsg", "recvmsg", "dup2",
                          "dup3" ]

            # grep 'int fd' syscall_switch_enter_linux_x86.cpp  | grep -v "\['int fd\|\['unsigned int fd" # + manual
            to_hook[2] = ["epoll_ctl"]
            to_hook[3] = ["fanotify_mark"]

        elif panda.arch_name == "x86_64":
            to_hook[0] = ["read", "write", "close", "newfstat", "lseek", "ioctl", "pread64", "pwrite64", "sendmsg",
                          "recvmsg", "setsockopt", "getsockopt", "fcntl", "flock", "fsync", "fdatasync", "ftruncate",
                          "getdents", "fchdir", "fchmod", "fchown", "fstatfs", "readahead", "fsetxattr", "fgetxattr",
                          "flistxattr", "fremovexattr", "getdents64", "fadvise64", "inotify_add_watch",
                          "inotify_rm_watch", "splice", "tee", "sync_file_range", "vmsplice", "fallocate", "recvmmsg",
                          "syncfs", "sendmmsg", "setns", "finit_module", "copy_file_range", "dup2", "dup3"]
            to_hook[2] = ["epoll_ctl"]
            to_hook[3] = ["fanotify_mark"]

        elif panda.arch_name == "arm":
            to_hook[0] = ["read", "write", "close", "lseek", "ioctl", "fcntl", "ftruncate", "fchmod", "fchown16",
                          "fstatfs", "newfstat", "fsync", "fchdir", "llseek", "getdents", "flock", "fdatasync",
                          "pread64", "pwrite64", "ftruncate64", "fchown", "getdents64", "fcntl64", "readahead",
                          "fsetxattr", "fgetxattr", "flistxattr", "fremovexattr", "fstatfs64", "arm_fadvise64_64",
                          "setsockopt", "getsockopt", "sendmsg", "recvmsg", "inotify_add_watch", "inotify_rm_watch",
                          "splice", "sync_file_range2", "tee", "vmsplice", "fallocate", "recvmmsg", "syncfs",
                          "sendmmsg", "setns", "finit_module", "dup2", "dup3"]
            to_hook[2] = ["epoll_ctl"]
            to_hook[3] = ["fanotify_mark"]
        else:
            raise ValueError(f"Unsupported PANDA arch: {panda.arch_name}")

        for arg_offset, names in to_hook.items():
            for name in names:
                self._gen_fd_cb(name, arg_offset)

    def replace_file(self, filename, faker, disk_file="/etc/passwd"):
        '''
        Replace all accesses to filename with accesses to the fake file instead
        which optionally may be specified by disk_file.
        '''
        self.faked_files[filename] = faker

        # XXX: We rename the files to real files to the guest kernel can manage FDs for us.
        #      this may need to use different real files depending on permissions requested
        self.rename_file(filename, disk_file)

    def _gen_fd_cb(self, name, fd_offset):
        '''
        Register syscalls2 PPP callback on enter and return for the given name
        which has an argument of fd at fd_offset in the argument list
        '''
        self._panda.ppp("syscalls2", f"on_sys_{name}_return", name=f"file_faker_return_{name}")( \
                    lambda *args: self._return_fd_cb(name, fd_offset, args=args))

    def _return_fd_cb(self, syscall_name, fd_pos, args=None):
        '''
        When we're returnuing from a syscall, mutate memory
        to put the results we want there
        '''

        (cpu, pc) = args[0:2]
        fd = args[2+fd_pos]
        asid = self._panda.current_asid(cpu)

        if (fd, asid) not in self.hooked_fds:
            return

        assert(args)
        hfd = self.hooked_fds[(fd, asid)]

        if syscall_name == "read":
            # Place up to `count` bytes of data into memory at `buf_ptr`
            buf_ptr = args[3]
            count   = args[4]

            (data, data_len) = hfd.read(count)
            if data:
                try:
                    self._panda.virtual_memory_write(cpu, buf_ptr, data)
                except ValueError:
                    self.ff_logger.error(f"Unable to store fake data after read to {hfd}")
                    return

            cpu.env_ptr.regs[0] = data_len

            self.ff_logger.info(f"Read - returning {data_len} bytes")

        elif syscall_name == "close":
            # We want the guest to close the real FD. Delete it from our map of hooked fds
            hfd.close()
            if (fd, asid) in self.hooked_fds:
                del self.hooked_fds[(fd, asid)]

        elif syscall_name == "write":
            # read count bytes from buf, add to our hyper-fd
            buf_ptr = args[3]
            count   = args[4]
            try:
                data = self._panda.virtual_memory_read(cpu, buf_ptr, count)
            except ValueError:
                self.ff_logger.error(f"Unable to read buffer that was being written")
                return

            bytes_written = hfd.write(data)
            cpu.env_ptr.regs[0] = bytes_written

        elif syscall_name == "lseek": # LLSEEK?
            offset = args[2]
            whence = args[3]
            hfd.seek(offset, whence)


        elif syscall_name in ["dup2", "dup3"]:
            # add newfd
            oldfd = args[2]
            newfd = args[3]
            self.ff_logger.debug(f"Duplicating faked fd {oldfd} to {newfd}")

            # Duplicate the old hfd - but not the file behind it
            new_hfd = HyperFD(hfd.name, hfd.file, hfd.offset)
            self.hooked_fds[(newfd, asid)] = new_hfd

        else:
            self.ff_logger.error(f"Unsupported syscall on FakeFD{fd}: {syscall_name}. Not intercepting (Running on real guest FD)")


    def _before_modified_enter(self, cpu, pc, syscall_name, fname):
        '''
        Overload FileHook function. Determine if a syscall we're about to
        enter is using a filename we want to fake

        After the modified syscall returns, we grab the real FD and map it to the HFD
        '''
        if fname in self.faked_files:
            self.pending_hfd =  HyperFD(fname, self.faked_files[fname]) # Create HFD
            asid = self._panda.current_asid(cpu)

    def _after_modified_return(self, cpu, pc, syscall_name, fd):
        '''
        Overload FileHook function. Determine if a syscall we're about to
        return from was using a filename we want to fake. If so, grab the FD
        '''
        if self.pending_hfd:
            asid = self._panda.current_asid(cpu)
            self.hooked_fds[(fd, asid)] =  self.pending_hfd
            self.logger.info(f"A file we want to hook was created {self.pending_hfd}")
            self.pending_hfd = None

    def close(self):
        # Close all open hfds
        if len(self.hooked_fds):
            self.ff_logger.debug("Cleaning up open hyper file descriptors")
            for (fd, asid) in list(self.hooked_fds.keys()):
                self.hooked_fds[(fd, asid)].close()
                del self.hooked_fds[(fd, asid)]


    def __del__(self):
        # XXX: This isn't being called for some reason on destruction
        self.close()

Ancestors

Methods

def close(self)
Expand source code
def close(self):
    # Close all open hfds
    if len(self.hooked_fds):
        self.ff_logger.debug("Cleaning up open hyper file descriptors")
        for (fd, asid) in list(self.hooked_fds.keys()):
            self.hooked_fds[(fd, asid)].close()
            del self.hooked_fds[(fd, asid)]
def replace_file(self, filename, faker, disk_file='/etc/passwd')

Replace all accesses to filename with accesses to the fake file instead which optionally may be specified by disk_file.

Expand source code
def replace_file(self, filename, faker, disk_file="/etc/passwd"):
    '''
    Replace all accesses to filename with accesses to the fake file instead
    which optionally may be specified by disk_file.
    '''
    self.faked_files[filename] = faker

    # XXX: We rename the files to real files to the guest kernel can manage FDs for us.
    #      this may need to use different real files depending on permissions requested
    self.rename_file(filename, disk_file)

Inherited members

class HyperFD (filename, fakefile, offset=0)

A HyperFD is what we use to track the state of a faked FD in the guest. It is backed by a FakeFile. Stores the filename originally associated with it at time of open

Expand source code
class HyperFD:
    '''
    A HyperFD is what we use to track the state of a faked FD in the guest.
    It is backed by a FakeFile.
    Stores the filename originally associated with it at time of open
    '''
    def __init__(self,  filename, fakefile, offset=0):
        self.name = filename
        self.file = fakefile
        self.file.refcount+=1 # Count of open FDs pointing to the file
        self.offset = offset

    def read(self, size):
        '''
        Read from the file descriptor. Determine current offset
        and then pass request through to FakeFile
        Returns (data read, count)
        '''
        assert(self.file)
        data = self.file.read(size, self.offset)
        self.offset+=len(data)
        return (data, len(data))

    def write(self, data):
        assert(self.file)
        bytes_written =  self.file.write(self.offset, data)
        self.offset +- bytes_written
        return bytes_written

    def get_mode(self):
        assert(self.file)
        return self.file.get_mode()

    def get_size(self):
        assert(self.file)
        return self.file.get_mode()

    def close(self):
        '''
        Decrement the reference counter
        '''
        assert(self.file)
        self.file.close()
        #del self # ???

    def seek(self, offset, whence):
        # From include/uapi/linux/fs.h
        SEEK_SET = 0
        SEEK_CUR = 1
        SEEK_END = 2

        if whence == SEEK_SET:
            self.offset = offset
        elif whence == SEEK_CUR:
            self.offset = self.offset + offset
        elif whence == SEEK_END:
            self.offset = self.offset + offset
        else:
            raise ValueError(f"Unsupported whence {whence} in seek")

    def __str__(self):
        return f"HyperFD with name {self.name} offset {self.offset} backed by {self.file}"

Methods

def close(self)

Decrement the reference counter

Expand source code
def close(self):
    '''
    Decrement the reference counter
    '''
    assert(self.file)
    self.file.close()
    #del self # ???
def get_mode(self)
Expand source code
def get_mode(self):
    assert(self.file)
    return self.file.get_mode()
def get_size(self)
Expand source code
def get_size(self):
    assert(self.file)
    return self.file.get_mode()
def read(self, size)

Read from the file descriptor. Determine current offset and then pass request through to FakeFile Returns (data read, count)

Expand source code
def read(self, size):
    '''
    Read from the file descriptor. Determine current offset
    and then pass request through to FakeFile
    Returns (data read, count)
    '''
    assert(self.file)
    data = self.file.read(size, self.offset)
    self.offset+=len(data)
    return (data, len(data))
def seek(self, offset, whence)
Expand source code
def seek(self, offset, whence):
    # From include/uapi/linux/fs.h
    SEEK_SET = 0
    SEEK_CUR = 1
    SEEK_END = 2

    if whence == SEEK_SET:
        self.offset = offset
    elif whence == SEEK_CUR:
        self.offset = self.offset + offset
    elif whence == SEEK_END:
        self.offset = self.offset + offset
    else:
        raise ValueError(f"Unsupported whence {whence} in seek")
def write(self, data)
Expand source code
def write(self, data):
    assert(self.file)
    bytes_written =  self.file.write(self.offset, data)
    self.offset +- bytes_written
    return bytes_written