Module pandare.extras

Extras are PyPANDA plugins which you can import into other python analyses. Typically this is done by passing a handle from your script's PANDA object to the plugin.

Expand source code
"""
Extras are PyPANDA plugins which you can import into other python analyses. Typically
this is done by passing a handle from your script's PANDA object to the plugin.
"""

# Note file names should not contain underscores, let's keep these in lower
# camelCase going forward (e.g., modeFilter) so they match the class names.
from .fileFaker import FakeFile, FileFaker
from .fileHook import FileHook
from .ioctlFaker import IoctlFaker
from .modeFilter import ModeFilter
from .procWriteCapture import ProcWriteCapture
from .procTrace import ProcGraph


# Public API of pandare.extras. Every name listed here must be imported above:
# a name in __all__ that the package does not define makes
# `from pandare.extras import *` fail with AttributeError. 'Snake' used to be
# listed but is not imported anywhere in this module, so it has been removed.
__all__ = ['FakeFile', 'FileFaker', 'FileHook', 'IoctlFaker', 'ModeFilter', 'ProcWriteCapture',
           'ProcGraph']

Sub-modules

pandare.extras.fileFaker

Framework for hallucinating files inside the guest through modifications around syscalls involving filenames and file descriptors …

pandare.extras.fileHook
pandare.extras.ioctlFaker
pandare.extras.modeFilter

Simple helper and example to selectively execute callbacks based on a mode string

pandare.extras.procTrace

Create a graph of which processes run/ran over time …

pandare.extras.procWriteCapture

Classes

class FakeFile (fake_contents='', filename=None)

A fake file behind a hyperFD - this class will generate data when the corresponding file descriptor(s) are accessed. Users can inherit and modify this to customize how data is generated

Note: a single FakeFile might be opened and in use by multiple FDs in the guest

Expand source code
class FakeFile:
    '''
    A fake file behind a hyperFD - this class will generate data when the
    corresponding file descriptor(s) are accessed.
    Users can inherit and modify this to customize how data is generated

    Note: a single FakeFile might be opened and in use by multiple FDs in the guest
    '''
    def __init__(self, fake_contents="", filename=None):
        '''
        Store the initial contents of the fake file.

        fake_contents: initial data; a str is encoded as UTF-8, anything else
                       is stored as given (expected to be bytes).
        filename:      only used in log messages and __str__.
        '''
        self.logger = logging.getLogger('panda.filehook.fakefile')

        if isinstance(fake_contents, str):
            fake_contents = fake_contents.encode("utf8")
        self.contents = fake_contents
        self.initial_contents = fake_contents  # Kept so close() can tell if the file was modified
        # Reference count. Only decremented in this class (see close());
        # presumably incremented by whoever opens a handle - TODO confirm.
        self.refcount = 0
        self.filename = filename # Just for debug printing

    def read(self, size, offset):
        '''
        Generate data for a given read of size.  Returns data.

        Returns up to `size` bytes of contents starting at `offset`, or b""
        if `offset` is at or past the end of the contents.
        '''

        if offset >= len(self.contents):  # No bytes left to read
            return b""
        # Otherwise there are bytes left to read
        read_data = self.contents[offset:offset+size]

        return read_data

    def write(self, offset, write_data):
        '''
        Overwrite contents with write_data starting at offset.

        The contents are rebuilt as a new object instead of being mutated in
        place: bytes before `offset` are kept, `write_data` is inserted, and
        any old bytes after the written region are kept as well. If `offset`
        is past the current end, the gap is filled with zero bytes.

        Returns how much the HyperFD offset should be incremented by
        (the length of write_data).
        '''
        prefix = self.contents[:offset]
        if len(prefix) < offset:
            # Write starts past the end of the file: zero-fill the hole
            prefix += b"\x00" * (offset - len(prefix))

        # The preserved tail starts right after the written region. (This used to
        # be computed from the length of prefix+data, which skipped too far
        # and dropped the tail for any write at offset > 0.)
        tail = self.contents[offset+len(write_data):]
        new_data = prefix + write_data + tail

        self.logger.info(f"FakeFD({self.filename}) writing {write_data} at offset {offset}")

        self.contents = new_data
        return len(write_data)

    def close(self):
        '''
        Drop one reference. When the count reaches exactly zero, log whether
        the contents differ from the initial contents.
        '''
        self.refcount -= 1
        if self.refcount == 0: # All FDs are now closed
            if self.initial_contents == self.contents:
                self.logger.debug(f"All handles to Faker({self.filename}) closed. Unmodified contents")
            else: # it was mutated!
                self.logger.info(f"All handles to Faker({self.filename}) closed. Modified contents: {repr(self.contents)}")

    def get_mode(self):
        '''
        Return the mode value reported for this fake file.
        '''
        return 0o664 # Regular file (octal)

    def get_size(self, bytesize):
        '''
        Return the number of `bytesize`-sized units needed to hold the
        contents, rounded up.
        '''
        return ceil(len(self.contents)/bytesize)

    def __str__(self):
        return f"Faker({self.filename} -> {repr(self.contents[:10])}..."


    def _delete(self):
        self.close()

    def __del__(self):
        # XXX: This destructor isn't called automatically
        self._delete()

Methods

def close(self)
Expand source code
def close(self):
    '''
    Release one reference to this fake file. Once the count hits exactly
    zero, log whether the contents still match the initial contents.
    '''
    self.refcount -= 1
    if self.refcount != 0:
        # Other handles remain (or the count is unbalanced): nothing to report
        return

    # All FDs are now closed
    if self.initial_contents != self.contents:
        # it was mutated!
        self.logger.info(f"All handles to Faker({self.filename}) closed. Modified contents: {repr(self.contents)}")
    else:
        self.logger.debug(f"All handles to Faker({self.filename}) closed. Unmodified contents")
def get_mode(self)
Expand source code
def get_mode(self):
    '''
    Return the fixed mode value reported for this fake file.
    '''
    mode = 0o664  # Regular file (octal)
    return mode
def get_size(self, bytesize)
Expand source code
def get_size(self, bytesize):
    '''
    Return how many units of `bytesize` bytes are needed to hold the
    current contents, rounded up to a whole unit.
    '''
    total_bytes = len(self.contents)
    units = total_bytes / bytesize
    return ceil(units)
def read(self, size, offset)

Generate data for a given read of size. Returns data.

Expand source code
def read(self, size, offset):
    '''
    Produce the data for a read of up to `size` bytes starting at `offset`.
    Returns b"" when `offset` is at or beyond the end of the contents.
    '''
    data = self.contents
    if offset < len(data):
        # There are bytes left to read; slicing clamps at the end of the data
        return data[offset:offset + size]

    # No bytes left to read
    return b""
def write(self, offset, write_data)

Update contents starting at offset. The contents are an immutable bytes object, so they are rebuilt rather than mutated in place. Returns how much the HyperFD offset should be incremented by. XXX: what about writes past the end of the file?

Expand source code
def write(self, offset, write_data):
    '''
    Overwrite contents with write_data starting at offset.

    The contents are rebuilt as a new object instead of being mutated in
    place: bytes before `offset` are kept, `write_data` is inserted, and
    any old bytes after the written region are kept as well. If `offset`
    is past the current end, the gap is filled with zero bytes.

    Returns how much the HyperFD offset should be incremented by
    (the length of write_data).
    '''
    prefix = self.contents[:offset]
    if len(prefix) < offset:
        # Write starts past the end of the file: zero-fill the hole
        prefix += b"\x00" * (offset - len(prefix))

    # The preserved tail starts right after the written region. (This used to
    # be computed from the length of prefix+data, which skipped too far
    # and dropped the tail for any write at offset > 0.)
    tail = self.contents[offset+len(write_data):]
    new_data = prefix + write_data + tail

    self.logger.info(f"FakeFD({self.filename}) writing {write_data} at offset {offset}")

    self.contents = new_data
    return len(write_data)
class FileFaker (panda)

Class to hallucinate fake files within the guest. When the guest attempts to access a faked file, we transparently redirect the access to another file on disk and grab the FD generated using FileHook.

When the guest attempts to use a FD related to a faked file, we mutate the request. Reads are created from fake contents and writes are logged.

Initialize FileHook and vars. Setup callbacks for all fd-based syscalls

Expand source code
class FileFaker(FileHook):
    '''
    Class to hallucinate fake files within the guest. When the guest attempts to access a faked file,
    we transparently redirect the access to another file on disk and grab the FD generated using FileHook.

    When the guest attempts to use a FD related to a faked file, we mutate the request. Reads are created
    from fake contents and writes are logged.
    '''

    def __init__(self, panda):
        '''
        Initialize FileHook and vars. Setup callbacks for all fd-based syscalls

        Raises ValueError if panda.arch_name is not i386, x86_64 or arm.
        '''
        super().__init__(panda)
        self.ff_logger = logging.getLogger('panda.filehook.fakefile')

        self.faked_files = {} # filename: Fake
        self.hooked_fds = {} # (fd, cr3): HyperFD->faker
        self.pending_hfd = None # HyperFD created on syscall enter, bound to a guest FD on return

        to_hook = {} # index of fd argument: list of names
        if panda.arch_name == "i386":
            # grep 'int fd' syscall_switch_enter_linux_x86.cpp  | grep "\['int fd\|\['unsigned int fd" | grep -o sys_[a-zA-Z0-9_]* | sed -n -e 's/sys_\(.*\)/"\1" /p' | paste -sd "," -
            # Note the grep commands missed dup2 and dup3 which take oldfd as 1st arg
            to_hook[0] = ["read", "write", "close", "lseek", "fstat", "ioctl", "fcntl", "ftruncate", "fchmod",
                          "fchown16", "fstatfs", "newfstat", "fsync", "fchdir", "llseek", "getdents", "flock",
                          "fdatasync", "pread64", "pwrite64", "ftruncate64", "fchown", "getdents64", "fcntl64",
                          "readahead", "fsetxattr", "fgetxattr", "flistxattr", "fremovexattr", "fadvise64",
                          "fstatfs64", "fadvise64_64", "inotify_add_watch", "inotify_rm_watch", "splice",
                          "sync_file_range", "tee", "vmsplice", "fallocate", "recvmmsg", "syncfs", "sendmmsg",
                          "setns", "finit_module", "getsockopt", "setsockopt", "sendmsg", "recvmsg", "dup2",
                          "dup3" ]

            # grep 'int fd' syscall_switch_enter_linux_x86.cpp  | grep -v "\['int fd\|\['unsigned int fd" # + manual
            to_hook[2] = ["epoll_ctl"]
            to_hook[3] = ["fanotify_mark"]

        elif panda.arch_name == "x86_64":
            to_hook[0] = ["read", "write", "close", "newfstat", "lseek", "ioctl", "pread64", "pwrite64", "sendmsg",
                          "recvmsg", "setsockopt", "getsockopt", "fcntl", "flock", "fsync", "fdatasync", "ftruncate",
                          "getdents", "fchdir", "fchmod", "fchown", "fstatfs", "readahead", "fsetxattr", "fgetxattr",
                          "flistxattr", "fremovexattr", "getdents64", "fadvise64", "inotify_add_watch",
                          "inotify_rm_watch", "splice", "tee", "sync_file_range", "vmsplice", "fallocate", "recvmmsg",
                          "syncfs", "sendmmsg", "setns", "finit_module", "copy_file_range", "dup2", "dup3"]
            to_hook[2] = ["epoll_ctl"]
            to_hook[3] = ["fanotify_mark"]

        elif panda.arch_name == "arm":
            to_hook[0] = ["read", "write", "close", "lseek", "ioctl", "fcntl", "ftruncate", "fchmod", "fchown16",
                          "fstatfs", "newfstat", "fsync", "fchdir", "llseek", "getdents", "flock", "fdatasync",
                          "pread64", "pwrite64", "ftruncate64", "fchown", "getdents64", "fcntl64", "readahead",
                          "fsetxattr", "fgetxattr", "flistxattr", "fremovexattr", "fstatfs64", "arm_fadvise64_64",
                          "setsockopt", "getsockopt", "sendmsg", "recvmsg", "inotify_add_watch", "inotify_rm_watch",
                          "splice", "sync_file_range2", "tee", "vmsplice", "fallocate", "recvmmsg", "syncfs",
                          "sendmmsg", "setns", "finit_module", "dup2", "dup3"]
            to_hook[2] = ["epoll_ctl"]
            to_hook[3] = ["fanotify_mark"]
        else:
            raise ValueError(f"Unsupported PANDA arch: {panda.arch_name}")

        for arg_offset, names in to_hook.items():
            for name in names:
                self._gen_fd_cb(name, arg_offset)

    def replace_file(self, filename, faker, disk_file="/etc/passwd"):
        '''
        Replace all accesses to filename with accesses to the fake file instead
        which optionally may be specified by disk_file.
        '''
        self.faked_files[filename] = faker

        # XXX: We rename the files to real files to the guest kernel can manage FDs for us.
        #      this may need to use different real files depending on permissions requested
        self.rename_file(filename, disk_file)

    def _gen_fd_cb(self, name, fd_offset):
        '''
        Register a syscalls2 PPP callback on return from the syscall `name`,
        which has an argument of fd at fd_offset in the argument list
        '''
        register = self._panda.ppp("syscalls2", f"on_sys_{name}_return", name=f"file_faker_return_{name}")
        # name and fd_offset are parameters of this call, so each lambda
        # captures its own values
        register(lambda *args: self._return_fd_cb(name, fd_offset, args=args))

    def _return_fd_cb(self, syscall_name, fd_pos, args=None):
        '''
        When we're returning from a syscall, mutate memory
        to put the results we want there

        args is the PPP callback argument tuple: (cpu, pc, <syscall args...>),
        so syscall argument i lives at args[2+i]. Does nothing unless the fd
        at position fd_pos is one we have hooked in the current address space.
        '''

        (cpu, pc) = args[0:2]
        fd = args[2+fd_pos]
        asid = self._panda.current_asid(cpu)

        if (fd, asid) not in self.hooked_fds:
            return

        hfd = self.hooked_fds[(fd, asid)]

        if syscall_name == "read":
            # Place up to `count` bytes of data into memory at `buf_ptr`
            buf_ptr = args[3]
            count   = args[4]

            (data, data_len) = hfd.read(count)
            if data:
                try:
                    self._panda.virtual_memory_write(cpu, buf_ptr, data)
                except ValueError:
                    self.ff_logger.error(f"Unable to store fake data after read to {hfd}")
                    return

            # Overwrite the syscall's return value with our byte count
            cpu.env_ptr.regs[0] = data_len

            self.ff_logger.info(f"Read - returning {data_len} bytes")

        elif syscall_name == "close":
            # We want the guest to close the real FD. Delete it from our map of hooked fds
            hfd.close()
            if (fd, asid) in self.hooked_fds:
                del self.hooked_fds[(fd, asid)]

        elif syscall_name == "write":
            # read count bytes from buf, add to our hyper-fd
            buf_ptr = args[3]
            count   = args[4]
            try:
                data = self._panda.virtual_memory_read(cpu, buf_ptr, count)
            except ValueError:
                self.ff_logger.error("Unable to read buffer that was being written")
                return

            bytes_written = hfd.write(data)
            cpu.env_ptr.regs[0] = bytes_written

        elif syscall_name == "lseek": # LLSEEK?
            # lseek(fd, offset, whence): args[2] is the fd itself, so the
            # offset and whence are the two arguments after it
            offset = args[3]
            whence = args[4]
            hfd.seek(offset, whence)


        elif syscall_name in ["dup2", "dup3"]:
            # add newfd
            oldfd = args[2]
            newfd = args[3]
            self.ff_logger.debug(f"Duplicating faked fd {oldfd} to {newfd}")

            # Duplicate the old hfd - but not the file behind it
            new_hfd = HyperFD(hfd.name, hfd.file, hfd.offset)
            self.hooked_fds[(newfd, asid)] = new_hfd

        else:
            self.ff_logger.error(f"Unsupported syscall on FakeFD{fd}: {syscall_name}. Not intercepting (Running on real guest FD)")


    def _before_modified_enter(self, cpu, pc, syscall_name, fname):
        '''
        Overload FileHook function. Determine if a syscall we're about to
        enter is using a filename we want to fake

        After the modified syscall returns, we grab the real FD and map it to the HFD
        '''
        if fname in self.faked_files:
            self.pending_hfd = HyperFD(fname, self.faked_files[fname]) # Create HFD

    def _after_modified_return(self, cpu, pc, syscall_name, fd):
        '''
        Overload FileHook function. Determine if a syscall we're about to
        return from was using a filename we want to fake. If so, grab the FD
        '''
        if self.pending_hfd:
            # NOTE(review): fd is the raw syscall return value; it is recorded
            # even if the syscall failed or does not return a descriptor - confirm
            asid = self._panda.current_asid(cpu)
            self.hooked_fds[(fd, asid)] = self.pending_hfd
            self.logger.info(f"A file we want to hook was created {self.pending_hfd}")
            self.pending_hfd = None

    def close(self):
        '''
        Close and forget every hyper file descriptor that is still hooked.
        '''
        # Close all open hfds
        if self.hooked_fds:
            self.ff_logger.debug("Cleaning up open hyper file descriptors")
            # Iterate over a snapshot of the keys since entries are deleted in the loop
            for (fd, asid) in list(self.hooked_fds.keys()):
                self.hooked_fds[(fd, asid)].close()
                del self.hooked_fds[(fd, asid)]


    def __del__(self):
        # XXX: This isn't being called for some reason on destruction
        self.close()

Ancestors

Methods

def close(self)
Expand source code
def close(self):
    '''
    Close every hyper file descriptor that is still hooked and remove it
    from the map. Does nothing (and logs nothing) when none are open.
    '''
    if not self.hooked_fds:
        return

    self.ff_logger.debug("Cleaning up open hyper file descriptors")
    # Walk a snapshot of the keys because the map shrinks as we go
    for guest_fd, guest_asid in tuple(self.hooked_fds):
        key = (guest_fd, guest_asid)
        open_hfd = self.hooked_fds[key]
        open_hfd.close()
        del self.hooked_fds[key]
def replace_file(self, filename, faker, disk_file='/etc/passwd')

Replace all accesses to filename with accesses to the fake file instead which optionally may be specified by disk_file.

Expand source code
def replace_file(self, filename, faker, disk_file="/etc/passwd"):
    '''
    Serve `filename` from `faker` instead of its real contents.

    The faker is recorded under `filename`, then a rename rule is installed
    that redirects `filename` to `disk_file` (optional).
    '''
    fake_registry = self.faked_files
    fake_registry[filename] = faker

    # XXX: The access is redirected to a real file so the guest kernel manages
    #      the FD for us. Different real files may be needed depending on the
    #      permissions requested.
    redirect = self.rename_file
    redirect(filename, disk_file)

Inherited members

class FileHook (panda, use_osi=True)

Class to modify guest memory just before syscalls with filename arguments. As the system call is about to be executed, change the data pointed to by the filename pointer. When the syscall returns, restore the mutated data to its original values.

This provides a simple, cross-platform interface to redirect file accesses just using the OSI plugin.

usage: panda = Panda(…) hook = FileHook(panda) hook.rename_file("/rename_this", "/to_this")

Store a reference to the panda object, and register the appropriate syscalls2 callbacks for entering and exiting from all syscalls that have a char* filename argument.

Expand source code
class FileHook:
    '''
    Class to modify guest memory just before syscalls with filename arguments.
    As the system call is about to be executed, change the data pointed to by the
    filename pointer. When the syscall returns, restore the mutated data to its
    original values.

    This provides a simple, cross-platform interface to redirect file accesses
    just using the OSI plugin.

    usage:
        panda = Panda(...)
        hook = FileHook(panda)
        hook.rename_file("/rename_this", "/to_this")
    '''

    def __init__(self, panda, use_osi=True):
        '''
        Store a reference to the panda object, and register
        the appropriate syscalls2 callbacks for entering and exiting
        from all syscalls that have a char* filename argument.

        Raises ValueError if panda.arch_name is not i386, x86_64 or arm.
        '''

        self._panda = panda
        self._renamed_files = {} # old_fname (str): new_fname (bytes)
        self._changed_strs = {} # (syscall_name, asid): original_data
        self.use_osi = use_osi

        self.logger = logging.getLogger('panda.filehook')
        try:
            import coloredlogs
            coloredlogs.install(level='WARN')
        except ImportError:
            pass
        self.pending_virt_read = None # Guest address of a filename we could not read yet
        self.pending_syscall = None # Name of the syscall that pending_virt_read belongs to

        panda.load_plugin("syscalls2")

        # For each architecture, we have a different set of syscalls. They all
        # either call our functions with (cpu, pc, filename_ptr, ...)
        # or (cpu, pc, something_else, filename_ptr, ...). Here we
        # Programmatically generate callbacks for all of them

        # These lists were made with commands like the following in syscalls2/generated:
        # grep filename syscall_switch_enter_linux_x86.cpp | grep "\['const char " | grep -o sys_[a-zA-Z0-9]* | grep -o [a-z0-9]*$
        # grep filename syscall_switch_enter_linux_x86.cpp | grep -v "\['const char " | grep -o sys_[a-zA-Z0-9]* | grep -o [a-z0-9]*$
        to_hook = {}
        if panda.arch_name == "i386":
            to_hook[0] = ["open", "execve", "chdir", "mknod", "chmod", "lchown16", "stat", "access", "chroot",
                         "lstat", "newstat", "newlstat", "chown16", "stat64", "lstat64", "lchown", "chown" ]
            to_hook[1] = ["utime", "utimes", "openat", "mknodat", "fchownat", "futimesat", "fstatat64",
                          "fchmodat", "faccessat", "utimensat", "execveat"]

        elif panda.arch_name == "x86_64":
            to_hook[0] = ["open", "newstat", "newlstat", "access", "chdir", "chmod", "chown", "lchown", "mknod", "chroot"]
            to_hook[1] = ["utime", "utimes", "openat", "mknodat", "fchownat", "futimesat", "newfstatat", "fchmodat", "faccessat", "utimensat"]

        elif panda.arch_name == "arm":
            to_hook[0] = ["open", "execve", "chdir", "mknod", "chmod", "lchown16", "access", "chroot", "newstat", "newlstat", "chown16", "stat64", "lstat64", "lchown", "chown"]
            to_hook[1] = ["utime", "utimes", "openat", "mknodat", "fchownat", "futimesat", "fstatat64", "fchmodat", "faccessat", "utimensat", "execveat"]
        else:
            raise ValueError(f"Unsupported PANDA arch: {panda.arch_name}")

        # Register the callbacks
        for arg_offset, names in to_hook.items():
            for name in names:
                self._gen_cb(name, arg_offset)


        # Fallback callback used when syscall with file name isn't mapped into memory
        @self._panda.cb_virt_mem_before_read(enabled=False)
        def before_virt_read(cpu, pc, addr, size):
            '''
            This callback is necessary for the case when we enter a syscall but the filename pointer is paged out.
            When that happens, we enable this (slow) callback which checks every mem-read while we're in that syscall
            to see if the memory has since been paged-in. It should always eventually be paged in. Once it is,
            we mutate the memory and then disable this callback.

            If this hasn't run by the time the callback returns, we give up and disable it
            '''
            if not self.pending_virt_read:
                return

            # Is our pending read a subset of the current read? If so try to read it
            if addr <= self.pending_virt_read and addr+size > self.pending_virt_read:
                try:
                    fname = self._panda.read_str(cpu, self.pending_virt_read)
                except ValueError:
                    return # Still not available. Keep waiting
                self.logger.debug(f"recovered missed filename: {fname}")

                # It is available! Disable this slow callback and rerun _enter_cb with the data
                fname_ptr = self.pending_virt_read
                self.pending_virt_read = None
                self._panda.disable_callback('before_virt_read')
                self._enter_cb(self.pending_syscall, args=(cpu, pc), fname_ptr=fname_ptr)


    def rename_file(self, old_name, new_name):
        '''
        Mutate a given filename into a new name at the syscall interface

        new_name may be str or bytes; it is stored as NUL-terminated bytes.
        Only one rule per old_name is allowed.
        '''
        assert(old_name not in self._renamed_files), f"Already have a rename rule for {old_name}"

        if not isinstance(new_name, bytes):
            new_name = new_name.encode("utf8")

        if not new_name.endswith(b"\x00"):
            new_name += b"\x00"

        self._renamed_files[old_name] = new_name

    def _get_fname(self, cpu, fd):
        '''
        Use OSI to get the filename behind a file descriptor.
        If not self.use_osi, return None
        '''
        if not self.use_osi:
            return None
        fname_s = None
        proc = self._panda.plugins['osi'].get_current_process(cpu)
        if proc != self._panda.ffi.NULL:
            fname = self._panda.plugins['osi_linux'].osi_linux_fd_to_filename(cpu, proc, self._panda.ffi.cast("int", fd))
            if fname != self._panda.ffi.NULL:
                fname_s = self._panda.ffi.string(fname).decode('utf8', 'ignore')
        return fname_s

    def _gen_cb(self, name, fname_ptr_pos):
        '''
        Register syscalls2 PPP callback on enter and return for the given name
        which has an argument of char* filename at fname_ptr_pos in the arguments list
        '''
        # name and fname_ptr_pos are parameters of this call, so each lambda
        # captures its own values
        self._panda.ppp("syscalls2", f"on_sys_{name}_enter", name = f"file_hook_enter_{name}")( \
                    lambda *args: self._enter_cb(name, fname_ptr_pos, args=args))
        self._panda.ppp("syscalls2", f"on_sys_{name}_return", name = f"file_hook_return_{name}")( \
                    lambda *args: self._return_cb(name, fname_ptr_pos, args=args))

    def _enter_cb(self, syscall_name, fname_ptr_pos=0, args=None, fname_ptr=None):
        '''
        When we enter a syscall, read its filename argument. If there is a
        rename rule for that filename, save the bytes we are about to clobber
        and overwrite the guest buffer with the new name.

        If the filename cannot be read (and OSI cannot provide it), remember
        the pointer and enable the before_virt_read callback so we can retry
        once the memory becomes readable.

        if fname_ptr is set, just skip the logic to extract it
        '''

        assert(args)
        (cpu, pc) = args[0:2]

        if not fname_ptr:
            fname_ptr = args[2+fname_ptr_pos] # offset to after (cpu, pc) in callback args

        try:
            fname = self._panda.read_str(cpu, fname_ptr)
        except ValueError:
            # Same exception type the before_virt_read callback treats as "not readable yet"
            # NOTE(review): this passes the filename-pointer argument to a helper
            # that expects a file descriptor - confirm this fallback is meaningful
            fname = self._get_fname(cpu, args[2+fname_ptr_pos])

            if fname:
                self.logger.info(f"OSI found fname after simple logic missed it in call to {syscall_name}")
            else:
                self.logger.debug(f"missed filename at 0x{fname_ptr:x} in call to {syscall_name} - trying to find")
                # Wait for the address of the filename itself. (This used to store
                # register 0, which is not the filename pointer in general.)
                self.pending_virt_read = fname_ptr
                self.pending_syscall = syscall_name
                self._panda.enable_callback('before_virt_read')
                #self._panda_enable_memcb()
                return

        fname = path.normpath(fname) # Normalize it
        #self.logger.info(f"Entering {syscall_name} with file={fname}")

        if fname in self._renamed_files:
            # It matches, now let's take our action! Either rename or callback

            self.logger.debug(f"modifying filename {fname} in {syscall_name} to {self._renamed_files[fname]}")

            # Pending restores are keyed by (syscall_name, asid), so the check
            # has to use the same key to ever fire
            asid = self._panda.current_asid(cpu)
            assert((syscall_name, asid) not in self._changed_strs), "Entering syscall that already has a pending restore"

            # First read a buffer of the same size as our new value. XXX the string we already read might be shorter
            # than what we're inserting so we read again so we can later restore the old data
            try:
                clobbered_data = self._panda.virtual_memory_read(cpu, fname_ptr, len(self._renamed_files[fname]))
            except ValueError:
                self.logger.error(f"Failed to read target buffer at call into {syscall_name}")
                return

            # Now replace those bytes with our new name
            try:
                self._panda.virtual_memory_write(cpu, fname_ptr, self._renamed_files[fname])
            except ValueError:
                self.logger.warning(f"Failed to mutate filename buffer at call into {syscall_name}")
                return

            # If it all worked, save the clobbered data
            self._changed_strs[(syscall_name, asid)] = clobbered_data

            self._before_modified_enter(cpu, pc, syscall_name, fname)


    def _return_cb(self, syscall_name, fname_ptr_pos, args=None):
        '''
        When we return, check if we mutated the fname buffer. If so,
        we need to restore whatever data was there (we may have written
        past the end of the string)
        '''
        assert(args)
        (cpu, pc) = args[0:2]
        if self.pending_virt_read:
            fname_ptr = args[2+fname_ptr_pos] # offset to after (cpu, pc) in callback args

            self.logger.warning(f"missed filename in call to {syscall_name} with fname at 0x{fname_ptr:x}. Ignoring it")

            self._panda.disable_callback('before_virt_read') # No point in continuing this
            self.pending_virt_read = None # Virtual address that we're waiting to read as soon as possible
            return

        asid = self._panda.current_asid(cpu)
        if (syscall_name, asid) in self._changed_strs:
            fname_ptr = args[2+fname_ptr_pos] # offset to after (cpu, pc) in callback args
            try:
                self._panda.virtual_memory_write(cpu, fname_ptr, self._changed_strs[(syscall_name, asid)])
            except ValueError:
                self.logger.warning(f"Failed to fix filename buffer at return of {syscall_name}")
            # Drop the pending restore even if the write failed
            del self._changed_strs[(syscall_name, asid)]

            fd = self._panda.arch.get_retval(cpu, convention='syscall')
            self.logger.info(f"Returning from {syscall_name} after modifying argument - modified FD is {fd}")
            self._after_modified_return(cpu, pc, syscall_name, fd=fd)

    def _before_modified_enter(self, cpu, pc, syscall_name, fname):
        '''
        Internal callback run before we enter a syscall where we mutated
        the filename. Exists to be overloaded by subclasses
        '''
        pass

    def _after_modified_return(self, cpu, pc, syscall_name, fd):
        '''
        Internal callback run before we return from a syscall where we mutated
        the filename. Exists to be overloaded by subclasses
        '''
        pass

Subclasses

Methods

def rename_file(self, old_name, new_name)

Mutate a given filename into a new name at the syscall interface

Expand source code
def rename_file(self, old_name, new_name):
    '''
    Install a rule that swaps `old_name` for `new_name` at the syscall
    interface. The replacement is stored as NUL-terminated bytes; a str is
    encoded as UTF-8 first. Only one rule per `old_name` is allowed.
    '''
    rules = self._renamed_files
    assert old_name not in rules, f"Already have a rename rule for {old_name}"

    replacement = new_name if isinstance(new_name, bytes) else new_name.encode("utf8")
    if not replacement.endswith(b"\x00"):
        replacement = replacement + b"\x00"

    rules[old_name] = replacement
class IoctlFaker (panda, use_osi_linux=False, log=False, ignore=[], intercept_ret_vals=[-25], intercept_all_non_zero=False)

Interpose ioctl() syscall returns, forcing successes for specific error codes to simulate missing drivers/peripherals. Bin all returns into failures (needed forcing) and successes, store for later retrieval/analysis.

Log enables/disables logging. ignore contains a list of tuples (filename, cmd#) to be ignored. intercept_ret_vals is a list of ioctl return values that should be intercepted. By default we intercept just -25, which indicates that a driver is not present to handle the ioctl. intercept_all_non_zero is an aggressive setting that takes precedence if set - any non-zero return code is changed to zero.

Expand source code
class IoctlFaker():

    '''
    Interpose ioctl() syscall returns, forcing successes for specific error codes to simulate missing drivers/peripherals.
    Bin all returns into failures (needed forcing) and successes, store for later retrieval/analysis.
    '''

    def __init__(
            self,
            panda,
            use_osi_linux = False,
            log = False,
            ignore = None,
            intercept_ret_vals = None,
            intercept_all_non_zero = False
        ):

        '''
        Log enables/disables logging.
        ignore contains a list of tuples (filename, cmd#) to be ignored. Defaults to an empty list.
        intercept_ret_vals is a list of ioctl return values that should be intercepted. By default
          we intercept just -25 which indicates that a driver is not present to handle the ioctl.
        intercept_all_non_zero is an aggressive setting that takes precedence if set - any non-zero return code is changed to zero.
        '''

        self.osi = use_osi_linux
        self._panda = panda
        self._panda.load_plugin("syscalls2")
        self._log = log
        # Build the defaults per instance: a list literal in the signature would
        # be one object shared by every IoctlFaker created without the argument.
        self.ignore = [] if ignore is None else ignore
        self.intercept_ret_vals = [-25] if intercept_ret_vals is None else intercept_ret_vals
        self.intercept_all_non_zero = intercept_all_non_zero

        if self.osi:
            self._panda.load_plugin("osi")
            self._panda.load_plugin("osi_linux")

        # The logger only exists when logging is enabled; every use below is guarded by self._log
        if self._log:
            self._logger = logging.getLogger('panda.ioctls')
            self._logger.setLevel(logging.DEBUG)

        # Track ioctls in two sets: modified (forced_returns) and unmodified
        self._forced_returns = set()
        self._unmodified_returns = set()

        # Force success returns for missing drivers/peripherals
        @self._panda.ppp("syscalls2", "on_sys_ioctl_return")
        def ioctl_faker_on_sys_ioctl_return(cpu, pc, fd, cmd, arg):

            ioctl = Ioctl(self._panda, cpu, fd, cmd, arg, self.osi)
            ioctl.get_ret_code(self._panda, cpu)

            # Modify
            # Parenthesised to match how Python already groups and/or: the
            # ignore list is only consulted on the intercept_ret_vals path,
            # not when intercept_all_non_zero matches.
            if (self.intercept_all_non_zero and ioctl.original_ret_code != 0) or \
                (ioctl.original_ret_code in self.intercept_ret_vals and \
                        (ioctl.file_name, ioctl.cmd.bits.cmd_num) not in self.ignore): # Allow ignoring specific commands on specific files

                # Zero the register this code uses for the return value on each arch
                if panda.arch_name == "mipsel" or panda.arch_name == "mips":
                    cpu.env_ptr.active_tc.gpr[2] = 0
                elif panda.arch_name == "aarch64":
                    cpu.env_ptr.xregs[0] = 0
                elif panda.arch_name == "ppc":
                    raise RuntimeError("PPC currently unsupported!")
                else: # x86/x64/ARM
                    cpu.env_ptr.regs[0] = 0

                self._forced_returns.add(ioctl)

                if ioctl.has_buf and self._log:
                    self._logger.warning("Forcing success return for data-containing {}".format(ioctl))
                elif self._log:
                    self._logger.info("Forcing success return for data-less {}".format(ioctl))

            # Don't modify
            else:
                self._unmodified_returns.add(ioctl)

    def _get_returns(self, source, with_buf_only):

        '''
        Return `source` itself, or, if with_buf_only is set, a new list of
        only those entries whose has_buf is True.
        '''

        if with_buf_only:
            return list(filter(lambda i: (i.has_buf == True), source))
        else:
            return source

    def get_forced_returns(self, with_buf_only = False):

        '''
        Retrieve ioctls whose error codes were overwritten
        '''

        return self._get_returns(self._forced_returns, with_buf_only)

    def get_unmodified_returns(self, with_buf_only = False):

        '''
        Retrieve ioctls that completed normally
        '''

        return self._get_returns(self._unmodified_returns, with_buf_only)

Methods

def get_forced_returns(self, with_buf_only=False)

Retrieve ioctls whose error codes were overwritten

Expand source code
def get_forced_returns(self, with_buf_only = False):

    '''
    Retrieve the ioctls whose error codes were overwritten.

    with_buf_only is forwarded to _get_returns together with the recorded
    forced returns.
    '''

    forced = self._forced_returns
    return self._get_returns(forced, with_buf_only)
def get_unmodified_returns(self, with_buf_only=False)

Retrieve ioctls that completed normally

Expand source code
def get_unmodified_returns(self, with_buf_only = False):

    '''
    Retrieve the ioctls whose return value was left untouched.

    with_buf_only is forwarded to _get_returns together with the recorded
    unmodified returns.
    '''

    untouched = self._unmodified_returns
    return self._get_returns(untouched, with_buf_only)
class ModeFilter

Simple, inheritable class to provide a decorator to enable/disable callbacks depending on self.mode value.

It is ill-advised to use on callbacks with high-performance impacts such as before_block_exec as this is a pure-Python plugin.

Example

    from pandare import Panda
    from pandare.extras import ModeFilter

    class MyClass(ModeFilter):
        def __init__(self, panda):
            self.panda = panda
            self.set_mode("mode1")

            @self.mode_filter("mode1")
            @self.panda.ppp("syscalls2", "on_sys_open_enter")
            def on_open(cpu, pc, fname_ptr, flags, mode):
                # assert(self.mode == "mode1") # Note decorator ensures this
                self.set_mode("mode2") # Change mode - so this callback won't run again
        ...
        def run(self):
            self.panda.run()

    p = Panda(...)
    mc = MyClass(p)
    mc.run()

Expand source code
class ModeFilter:
    '''
    Simple, inheritable class to provide a decorator to enable/disable callbacks
    depending on self.mode value.

    It is ill-advised to use on callbacks with high-performance impacts such as
    before_block_exec as this is a pure-Python plugin.

    Example:
        from pandare import Panda
        from pandare.extras import ModeFilter

        class MyClass(ModeFilter):
            def __init__(self, panda):
                self.panda = panda
                self.set_mode("mode1")

                @self.mode_filter("mode1")
                @self.panda.ppp("syscalls2", "on_sys_open_enter")
                def on_open(cpu, pc, fname_ptr, flags, mode):
                    # assert(self.mode == "mode1") # Note decorator ensures this
                    self.set_mode("mode2") # Change mode - so this callback won't run again
            ...
            def run(self):
                self.panda.run()

        p = Panda(...)
        mc = MyClass(p)
        mc.run()
    '''
    # Class-level default; set_mode shadows it with a per-instance value
    mode = "start"

    def mode_filter(self, mode_filter):
        '''
        Decorator to only run a function if self.mode matches the provided string

        The mode is checked at call time, not at decoration time. When it matches,
        the wrapped function's return value is passed through; otherwise the
        function is skipped and None is returned.
        '''
        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                if self.mode == mode_filter:
                    # Mode matches - run it and hand its result back to the caller
                    return func(*args, **kwargs)
                return None
            return wrapper
        return decorator

    def set_mode(self, new):
        '''
        Helper to change mode

        Prints a message when the mode actually changes.
        '''
        if new != self.mode:
            print(f"Switching modes from {self.mode} to {new}")
        self.mode = new

Subclasses

Class variables

var mode

Methods

def mode_filter(self, mode_filter)

Decorator to only run a function if self.mode matches the provided string

Expand source code
def mode_filter(self, mode_filter):
    '''
    Decorator to only run a function if self.mode matches the provided string

    The mode is checked at call time, not at decoration time. When it matches,
    the wrapped function's return value is passed through; otherwise the
    function is skipped and None is returned.
    '''
    def __mode_filter(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            if self.mode == mode_filter:
                # Mode matches - run it and hand its result back to the caller
                return func(*args, **kwargs)
            return None
        return wrapper
    return __mode_filter
def set_mode(self, new)

Helper to change mode

Expand source code
def set_mode(self, new):
    '''
    Switch self.mode to `new`, printing a notice when the value actually changes.
    '''
    mode_changed = new != self.mode
    if mode_changed:
        print(f"Switching modes from {self.mode} to {new}")
    self.mode = new
class ProcGraph (panda)

Base class which PyPANDA plugins should inherit. Subclasses may register callbacks using the provided panda object and use the PyPlugin APIs:

  • self.get_args or self.get_arg_bool to check argument values
  • self.ppp to interact with other PyPlugins via PPP interfaces
  • self.ppp_cb_boilerplate('cb_name') to register a ppp-style callback
  • self.ppp_run_cb('cb_name') to run a previously-registered ppp-style callback
  • @PyPlugin.ppp_export to mark a class method as ppp-exported

For more information, check out the pyplugin documentation.

Expand source code
class ProcGraph(PyPlugin):
    '''
    Record which (pid, tid) pairs run over time and hand the collected data to
    render_graph when the plugin is uninitialized.

    Arguments read through self.get_arg:
        hide_ranges: when truthy, show_ranges is False
        hide_graph: when truthy, show_graph is False
        cols: number of columns passed to render_graph (120 when unset/falsy)
    '''
    def __init__(self, panda):
        # Collected state
        self.procinfo = {} # (PID, TID): info dict
        self.time_data = [] # [((PID, TID), #insns since previous task change)]
        self.total_insns = 0
        self.n_insns = 0
        self.last_pid = None
        self.show_ranges = not self.get_arg("hide_ranges")
        self.show_graph = not self.get_arg("hide_graph")
        self.panda = panda

        # config option: number of columns
        self.n_cols = self.get_arg("cols") or 120

        @panda.cb_start_block_exec
        def sbe(cpu, tb):
            # Both the per-slice and the global instruction counters advance together
            icount = tb.icount
            self.n_insns += icount
            self.total_insns += icount

        @panda.ppp("osi", "on_task_change")
        def task_change(cpu):
            osi = panda.plugins['osi']
            proc = osi.get_current_process(cpu)
            thread = osi.get_current_thread(cpu)

            if proc == panda.ffi.NULL:
                print(f"Warning: Unable to identify process at {self.n_insns}")
                return
            if thread == panda.ffi.NULL:
                print(f"Warning: Unable to identify thread at {self.n_insns}")
                return

            proc_key = (proc.pid, thread.tid)
            if proc_key not in self.procinfo:
                # First sighting of this (pid, tid): remember where it started
                self.procinfo[proc_key] = {
                    "names": set(),
                    "first": self.total_insns,
                    "last": None,
                    "count": 0,
                }

            if proc.name != panda.ffi.NULL:
                name = panda.ffi.string(proc.name)
            else:
                name = "(error)"
            self.procinfo[proc_key]["names"].add(name)

            # Update insn count for the previous task and mark it as (maybe) ending at total_insns-1
            if self.last_pid:
                previous = self.procinfo[self.last_pid]
                end = self.total_insns - 1
                # Measure from the previously recorded end, or from the first sighting if there is none
                if previous["last"] is not None:
                    delta = end - previous["last"]
                else:
                    delta = end - previous["first"]
                previous["count"] += delta
                previous["last"] = end

            self.last_pid = proc_key

            self.time_data.append((proc_key, self.n_insns))
            self.n_insns = 0

    def uninit(self):
        '''
        Render the collected data, then disable the task_change callback and reset all state.
        '''
        render_graph(
            self.procinfo,
            self.time_data,
            self.total_insns,
            n_cols=self.n_cols,
            show_ranges=self.show_ranges,
            show_graph=self.show_graph,
        )

        # Fully reset state
        self.panda.disable_ppp("task_change")
        self.procinfo, self.time_data = {}, []
        self.total_insns = self.n_insns = 0
        self.last_pid = None

Ancestors

Methods

def uninit(self)
Expand source code
def uninit(self):
    '''
    Render the collected data, then disable the task_change callback and reset all state.
    '''
    render_graph(
        self.procinfo,
        self.time_data,
        self.total_insns,
        n_cols=self.n_cols,
        show_ranges=self.show_ranges,
        show_graph=self.show_graph,
    )

    # Fully reset state
    self.panda.disable_ppp("task_change")
    self.procinfo, self.time_data = {}, []
    self.total_insns = self.n_insns = 0
    self.last_pid = None

Inherited members

class ProcWriteCapture (panda, console_capture=False, proc_name=None, log_dir=None, rm_existing_logs=False)

Set console_capture = True to capture all console output to file, including boot messages. Set proc_name = "name_of_proc" to, for a named process, capture stdout/stderr and any file writes from the hypervisor, mirror results to log directory. Can be stacked with console capture.

Expand source code
class ProcWriteCapture():

    '''
    Mirror guest write() syscalls to files on the host.

    Set console_capture = True to capture console output (writes to fd 1, 2 or 3, or to a file whose path contains "tty") to <log_dir>/console.out.
    Kernel boot logs are not captured: that would require hooking the kernel's printk rather than the write syscall.
    Set proc_name = "name_of_proc" to, for a named process, capture stdout/stderr and any file writes from the hypervisor, mirror results to log directory.
    Can be stacked with console capture.

    log_dir defaults to the current working directory; per-process logs go to <log_dir>/<proc_name>.
    Set rm_existing_logs = True to remove previously written logs before capture starts. The log
    directory is wiped and recreated, unless it is the current working directory or one of its
    parents, in which case only console.out and the per-process directory are removed.
    '''

    def __init__(self, panda, console_capture = False, proc_name = None, log_dir = None, rm_existing_logs = False):

        self._panda = panda
        self._files_written = set()
        self._rm = rm_existing_logs
        self._console_capture = console_capture
        self._proc_name = proc_name
        self._proc_printed_err = False
        self._console_printed_err = False

        self._console_log_dir = Path.cwd() if log_dir is None else Path(log_dir)
        if proc_name:
            self._proc_log_dir = self._console_log_dir / self._proc_name

        # Remove stale logs BEFORE creating the directories: removing afterwards would
        # delete the directories the write callback needs to open its log files in.
        if self._rm:
            self._remove_existing_logs()

        # Setup logging dir
        self._console_log_dir.mkdir(parents=True, exist_ok=True)
        if proc_name:
            self._proc_log_dir.mkdir(parents=True, exist_ok=True)

        # Mirror writes
        @self._panda.ppp("syscalls2", "on_sys_write_enter")
        def proc_write_capture_on_sys_write_enter(cpu, pc, fd, buf, cnt):

            try_read = False

            # Capture console output
            if self._console_capture:

                # Fun trick: lazy eval of OSI
                # Based on the idea that a non-POSIX FD will only be used after boot is finished an OSI is functional
                # Note: doesn't capture boot logs (would require hooking kernel's printk, not write syscall)
                if fd in (1, 2, 3):
                    try_read = True
                else:
                    curr_proc = panda.plugins['osi'].get_current_process(cpu)
                    file_name_ptr = panda.plugins['osi_linux'].osi_linux_fd_to_filename(cpu, curr_proc, fd)
                    file_path = panda.ffi.string(file_name_ptr).decode()
                    if "tty" in file_path:
                        try_read = True

                if try_read:

                    try:
                        data = panda.virtual_memory_read(cpu, buf, cnt)
                    except ValueError as err:
                        raise RuntimeError(f"Failed to read buffer: addr 0x{buf:016x}") from err

                    if fd == 2:
                        self._console_printed_err = True

                    log_file = self._console_log_dir.joinpath("console.out")
                    with open(log_file, "ab") as f:
                        f.write(data)

                    self._files_written.add(str(log_file))

            # Use OSI to capture logs for a named process
            if self._proc_name:

                curr_proc = panda.plugins['osi'].get_current_process(cpu)
                curr_proc_name = panda.ffi.string(curr_proc.name).decode()

                if self._proc_name == curr_proc_name:

                    if not try_read: # If we didn't already read this data in once for console capture
                        try:
                            data = panda.virtual_memory_read(cpu, buf, cnt)
                        except ValueError as err:
                            raise RuntimeError(f"Failed to read buffer: proc \'{curr_proc_name}\', addr 0x{buf:016x}") from err

                    file_name_ptr = panda.plugins['osi_linux'].osi_linux_fd_to_filename(cpu, curr_proc, fd)
                    file_path = panda.ffi.string(file_name_ptr).decode()

                    # For informational purposes only, collection not reliant on this exact mapping
                    if fd == 1: # POSIX stdout
                        file_path += ".stdout"
                    elif fd == 2: # POSIX stderr
                        file_path += ".stderr"
                        self._proc_printed_err = True

                    # Flatten the guest path into a single host file name
                    log_file = self._proc_log_dir.joinpath(file_path.replace("//", "_").replace("/", "_"))
                    with open(log_file, "ab") as f:
                        f.write(data)

                    self._files_written.add(str(log_file))

    def _remove_existing_logs(self):
        '''
        Delete logs left over from earlier runs without ever deleting the working directory.
        '''
        if self._proc_name and self._proc_log_dir.exists():
            shutil.rmtree(self._proc_log_dir)

        if not self._console_log_dir.exists():
            return

        cwd = Path.cwd().resolve()
        target = self._console_log_dir.resolve()
        if target == cwd or target in cwd.parents:
            # Wiping this tree would remove the process's own working directory,
            # so only drop the console log this class writes there.
            console_log = self._console_log_dir / "console.out"
            if console_log.exists():
                console_log.unlink()
        else:
            shutil.rmtree(self._console_log_dir)

    def proc_printed_err(self):
        '''
        Return True once the named process has written to fd 2 (stderr).
        '''
        return self._proc_printed_err

    def console_printed_post_boot_err(self):
        '''
        Return True once a captured console write used fd 2 (stderr).
        '''
        return self._console_printed_err

    def get_files_written(self):
        '''
        Return the set of host log file paths (as strings) written so far.
        '''
        return self._files_written

Methods

def console_printed_post_boot_err(self)
Expand source code
def console_printed_post_boot_err(self):
    '''
    Return the flag recording whether a captured console write used fd 2 (stderr).
    '''
    saw_console_err = self._console_printed_err
    return saw_console_err
def get_files_written(self)
Expand source code
def get_files_written(self):
    '''
    Return the set of log file paths recorded in self._files_written.
    '''
    written = self._files_written
    return written
def proc_printed_err(self)
Expand source code
def proc_printed_err(self):
    '''
    Return the flag recording whether the named process wrote to fd 2 (stderr).
    '''
    saw_proc_err = self._proc_printed_err
    return saw_proc_err
class Snake (panda, console_capture=False, proc_name=None, log_dir=None, rm_existing_logs=False)

Set console_capture = True to capture all console output to file, including boot messages. Set proc_name = "name_of_proc" to, for a named process, capture stdout/stderr and any file writes from the hypervisor, mirror results to log directory. Can be stacked with console capture.

Expand source code
class ProcWriteCapture():

    '''
    Mirror guest write() syscalls to files on the host.

    Set console_capture = True to capture console output (writes to fd 1, 2 or 3, or to a file whose path contains "tty") to <log_dir>/console.out.
    Kernel boot logs are not captured: that would require hooking the kernel's printk rather than the write syscall.
    Set proc_name = "name_of_proc" to, for a named process, capture stdout/stderr and any file writes from the hypervisor, mirror results to log directory.
    Can be stacked with console capture.

    log_dir defaults to the current working directory; per-process logs go to <log_dir>/<proc_name>.
    Set rm_existing_logs = True to remove previously written logs before capture starts. The log
    directory is wiped and recreated, unless it is the current working directory or one of its
    parents, in which case only console.out and the per-process directory are removed.
    '''

    def __init__(self, panda, console_capture = False, proc_name = None, log_dir = None, rm_existing_logs = False):

        self._panda = panda
        self._files_written = set()
        self._rm = rm_existing_logs
        self._console_capture = console_capture
        self._proc_name = proc_name
        self._proc_printed_err = False
        self._console_printed_err = False

        self._console_log_dir = Path.cwd() if log_dir is None else Path(log_dir)
        if proc_name:
            self._proc_log_dir = self._console_log_dir / self._proc_name

        # Remove stale logs BEFORE creating the directories: removing afterwards would
        # delete the directories the write callback needs to open its log files in.
        if self._rm:
            self._remove_existing_logs()

        # Setup logging dir
        self._console_log_dir.mkdir(parents=True, exist_ok=True)
        if proc_name:
            self._proc_log_dir.mkdir(parents=True, exist_ok=True)

        # Mirror writes
        @self._panda.ppp("syscalls2", "on_sys_write_enter")
        def proc_write_capture_on_sys_write_enter(cpu, pc, fd, buf, cnt):

            try_read = False

            # Capture console output
            if self._console_capture:

                # Fun trick: lazy eval of OSI
                # Based on the idea that a non-POSIX FD will only be used after boot is finished an OSI is functional
                # Note: doesn't capture boot logs (would require hooking kernel's printk, not write syscall)
                if fd in (1, 2, 3):
                    try_read = True
                else:
                    curr_proc = panda.plugins['osi'].get_current_process(cpu)
                    file_name_ptr = panda.plugins['osi_linux'].osi_linux_fd_to_filename(cpu, curr_proc, fd)
                    file_path = panda.ffi.string(file_name_ptr).decode()
                    if "tty" in file_path:
                        try_read = True

                if try_read:

                    try:
                        data = panda.virtual_memory_read(cpu, buf, cnt)
                    except ValueError as err:
                        raise RuntimeError(f"Failed to read buffer: addr 0x{buf:016x}") from err

                    if fd == 2:
                        self._console_printed_err = True

                    log_file = self._console_log_dir.joinpath("console.out")
                    with open(log_file, "ab") as f:
                        f.write(data)

                    self._files_written.add(str(log_file))

            # Use OSI to capture logs for a named process
            if self._proc_name:

                curr_proc = panda.plugins['osi'].get_current_process(cpu)
                curr_proc_name = panda.ffi.string(curr_proc.name).decode()

                if self._proc_name == curr_proc_name:

                    if not try_read: # If we didn't already read this data in once for console capture
                        try:
                            data = panda.virtual_memory_read(cpu, buf, cnt)
                        except ValueError as err:
                            raise RuntimeError(f"Failed to read buffer: proc \'{curr_proc_name}\', addr 0x{buf:016x}") from err

                    file_name_ptr = panda.plugins['osi_linux'].osi_linux_fd_to_filename(cpu, curr_proc, fd)
                    file_path = panda.ffi.string(file_name_ptr).decode()

                    # For informational purposes only, collection not reliant on this exact mapping
                    if fd == 1: # POSIX stdout
                        file_path += ".stdout"
                    elif fd == 2: # POSIX stderr
                        file_path += ".stderr"
                        self._proc_printed_err = True

                    # Flatten the guest path into a single host file name
                    log_file = self._proc_log_dir.joinpath(file_path.replace("//", "_").replace("/", "_"))
                    with open(log_file, "ab") as f:
                        f.write(data)

                    self._files_written.add(str(log_file))

    def _remove_existing_logs(self):
        '''
        Delete logs left over from earlier runs without ever deleting the working directory.
        '''
        if self._proc_name and self._proc_log_dir.exists():
            shutil.rmtree(self._proc_log_dir)

        if not self._console_log_dir.exists():
            return

        cwd = Path.cwd().resolve()
        target = self._console_log_dir.resolve()
        if target == cwd or target in cwd.parents:
            # Wiping this tree would remove the process's own working directory,
            # so only drop the console log this class writes there.
            console_log = self._console_log_dir / "console.out"
            if console_log.exists():
                console_log.unlink()
        else:
            shutil.rmtree(self._console_log_dir)

    def proc_printed_err(self):
        '''
        Return True once the named process has written to fd 2 (stderr).
        '''
        return self._proc_printed_err

    def console_printed_post_boot_err(self):
        '''
        Return True once a captured console write used fd 2 (stderr).
        '''
        return self._console_printed_err

    def get_files_written(self):
        '''
        Return the set of host log file paths (as strings) written so far.
        '''
        return self._files_written

Methods

def console_printed_post_boot_err(self)
Expand source code
def console_printed_post_boot_err(self):
    '''
    Return the flag recording whether a captured console write used fd 2 (stderr).
    '''
    saw_console_err = self._console_printed_err
    return saw_console_err
def get_files_written(self)
Expand source code
def get_files_written(self):
    '''
    Return the set of log file paths recorded in self._files_written.
    '''
    written = self._files_written
    return written
def proc_printed_err(self)
Expand source code
def proc_printed_err(self):
    '''
    Return the flag recording whether the named process wrote to fd 2 (stderr).
    '''
    saw_proc_err = self._proc_printed_err
    return saw_proc_err