Module pandare.extras
Extras are PyPANDA plugins which you can import into other python analyses. Typically this is done by passing a handle from your script's PANDA object to the plugin.
Expand source code
"""
Extras are PyPANDA plugins which you can import into other python analyses. Typically
this is done by passing a handle from your script's PANDA object to the plugin.
"""
# Note file names should not contain underscores, let's keep these in lower
# camelCase going forward (e.g., modeFilter) so they match the class names.
from .fileFaker import FakeFile, FileFaker
from .fileHook import FileHook
from .ioctlFaker import IoctlFaker
from .modeFilter import ModeFilter
from .procWriteCapture import ProcWriteCapture
from .procTrace import ProcGraph
__all__ = ['FakeFile', 'FileFaker', 'FileHook', 'IoctlFaker', 'ModeFilter', 'ProcWriteCapture',
'Snake', 'ProcGraph']
Sub-modules
pandare.extras.dwarfdump
pandare.extras.fileFaker
-
Framework for halucinating files inside the guest through modifications around syscalls involving filenames and file descriptors …
pandare.extras.fileHook
pandare.extras.ioctlFaker
pandare.extras.modeFilter
-
Simple helper and example to selectively execute callbacks based on a mode string
pandare.extras.procTrace
-
Create a graph of which processes run/ran over time …
pandare.extras.procWriteCapture
Classes
class FakeFile (fake_contents='', filename=None)
-
A fake file behind a hyperFD - this class will generate data when the corresponding file descriptor(s) are accessed. Users can inherit and modify this to customize how data is generated
Note: a single FileFaker might be opened and in use by multiple FDs in the guest
Expand source code
class FakeFile: ''' A fake file behind a hyperFD - this class will generate data when the corresponding file descriptor(s) are accessed. Users can inherit and modify this to customize how data is generated Note: a single FileFaker might be opened and in use by multiple FDs in the guest ''' def __init__(self, fake_contents="", filename=None): self.logger = logging.getLogger('panda.filehook.fakefile') if isinstance(fake_contents, str): fake_contents = fake_contents.encode("utf8") self.contents = fake_contents self.initial_contents = fake_contents self.refcount = 0 # Reference count self.filename = filename # Just for debug printing def read(self, size, offset): ''' Generate data for a given read of size. Returns data. ''' if offset >= len(self.contents): # No bytes left to read return b"" # Otherwise there are bytes left to read read_data = self.contents[offset:offset+size] return read_data def write(self, offset, write_data): ''' Update contents from offset. It's a bytearray so we can't just mutate Return how much HyperFD offset should be incremented by XXX what about writes past end of the file? ''' new_data = self.contents[:offset] new_data += write_data new_data += self.contents[offset+len(new_data):] self.logger.info(f"FakeFD({self.filename}) writing {new_data} at offset {offset}") self.contents = new_data return len(write_data) def close(self): self.refcount -= 1 if self.refcount == 0: # All FDs are now closed if self.initial_contents == self.contents: self.logger.debug(f"All handles to Faker({self.filename}) closed. Unmodified contents") else: # it was mutated! self.logger.info(f"All handles to Faker({self.filename}) closed. Modified contents: {repr(self.contents)}") def get_mode(self): return 0o664 # Regular file (octal) def get_size(self, bytesize): return ceil(len(self.contents)/bytesize) def __str__(self): return f"Faker({self.filename} -> {repr(self.contents[:10])}..." def _delete(self): self.close() def __del__(self): # XXX: This destructor isn't called automatically self._delete()
Methods
def close(self)
-
Expand source code
def close(self): self.refcount -= 1 if self.refcount == 0: # All FDs are now closed if self.initial_contents == self.contents: self.logger.debug(f"All handles to Faker({self.filename}) closed. Unmodified contents") else: # it was mutated! self.logger.info(f"All handles to Faker({self.filename}) closed. Modified contents: {repr(self.contents)}")
def get_mode(self)
-
Expand source code
def get_mode(self): return 0o664 # Regular file (octal)
def get_size(self, bytesize)
-
Expand source code
def get_size(self, bytesize): return ceil(len(self.contents)/bytesize)
def read(self, size, offset)
-
Generate data for a given read of size. Returns data.
Expand source code
def read(self, size, offset): ''' Generate data for a given read of size. Returns data. ''' if offset >= len(self.contents): # No bytes left to read return b"" # Otherwise there are bytes left to read read_data = self.contents[offset:offset+size] return read_data
def write(self, offset, write_data)
-
Update contents from offset. It's a bytearray so we can't just mutate Return how much HyperFD offset should be incremented by XXX what about writes past end of the file?
Expand source code
def write(self, offset, write_data): ''' Update contents from offset. It's a bytearray so we can't just mutate Return how much HyperFD offset should be incremented by XXX what about writes past end of the file? ''' new_data = self.contents[:offset] new_data += write_data new_data += self.contents[offset+len(new_data):] self.logger.info(f"FakeFD({self.filename}) writing {new_data} at offset {offset}") self.contents = new_data return len(write_data)
class FileFaker (panda)
-
Class to halucinate fake files within the guest. When the guest attempts to access a faked file, we transparenly redirect the access to another file on disk and grab the FD generated using FileHook.
When the guest attempts to use a FD related to a faked file, we mutate the request. Reads are created from fake conents and writes are logged.
Initialize FileHook and vars. Setup callbacks for all fd-based syscalls
Expand source code
class FileFaker(FileHook): ''' Class to halucinate fake files within the guest. When the guest attempts to access a faked file, we transparenly redirect the access to another file on disk and grab the FD generated using FileHook. When the guest attempts to use a FD related to a faked file, we mutate the request. Reads are created from fake conents and writes are logged. ''' def __init__(self, panda): ''' Initialize FileHook and vars. Setup callbacks for all fd-based syscalls ''' super().__init__(panda) self.ff_logger = logging.getLogger('panda.filehook.fakefile') self.faked_files = {} # filename: Fake self.hooked_fds = {} # (fd, cr3): HyperFD->faker self.pending_hfd = None to_hook = {} # index of fd argument: list of names if panda.arch_name == "i386": # grep 'int fd' syscall_switch_enter_linux_x86.cpp | grep "\['int fd\|\['unsigned int fd" | grep -o sys_[a-zA-Z0-9_]* | sed -n -e 's/sys_\(.*\)/"\1" /p' | paste -sd "," - # Note the grep commands missed dup2 and dup3 which take oldfd as 1st arg to_hook[0] = ["read", "write", "close", "lseek", "fstat", "ioctl", "fcntl", "ftruncate", "fchmod", "fchown16", "fstatfs", "newfstat", "fsync", "fchdir", "llseek", "getdents", "flock", "fdatasync", "pread64", "pwrite64", "ftruncate64", "fchown", "getdents64", "fcntl64", "readahead", "fsetxattr", "fgetxattr", "flistxattr", "fremovexattr", "fadvise64", "fstatfs64", "fadvise64_64", "inotify_add_watch", "inotify_rm_watch", "splice", "sync_file_range", "tee", "vmsplice", "fallocate", "recvmmsg", "syncfs", "sendmmsg", "setns", "finit_module", "getsockopt", "setsockopt", "sendmsg", "recvmsg", "dup2", "dup3" ] # grep 'int fd' syscall_switch_enter_linux_x86.cpp | grep -v "\['int fd\|\['unsigned int fd" # + manual to_hook[2] = ["epoll_ctl"] to_hook[3] = ["fanotify_mark"] elif panda.arch_name == "x86_64": to_hook[0] = ["read", "write", "close", "newfstat", "lseek", "ioctl", "pread64", "pwrite64", "sendmsg", "recvmsg", "setsockopt", "getsockopt", "fcntl", "flock", "fsync", "fdatasync", "ftruncate", "getdents", "fchdir", "fchmod", "fchown", "fstatfs", "readahead", "fsetxattr", "fgetxattr", "flistxattr", "fremovexattr", "getdents64", "fadvise64", "inotify_add_watch", "inotify_rm_watch", "splice", "tee", "sync_file_range", "vmsplice", "fallocate", "recvmmsg", "syncfs", "sendmmsg", "setns", "finit_module", "copy_file_range", "dup2", "dup3"] to_hook[2] = ["epoll_ctl"] to_hook[3] = ["fanotify_mark"] elif panda.arch_name == "arm": to_hook[0] = ["read", "write", "close", "lseek", "ioctl", "fcntl", "ftruncate", "fchmod", "fchown16", "fstatfs", "newfstat", "fsync", "fchdir", "llseek", "getdents", "flock", "fdatasync", "pread64", "pwrite64", "ftruncate64", "fchown", "getdents64", "fcntl64", "readahead", "fsetxattr", "fgetxattr", "flistxattr", "fremovexattr", "fstatfs64", "arm_fadvise64_64", "setsockopt", "getsockopt", "sendmsg", "recvmsg", "inotify_add_watch", "inotify_rm_watch", "splice", "sync_file_range2", "tee", "vmsplice", "fallocate", "recvmmsg", "syncfs", "sendmmsg", "setns", "finit_module", "dup2", "dup3"] to_hook[2] = ["epoll_ctl"] to_hook[3] = ["fanotify_mark"] else: raise ValueError(f"Unsupported PANDA arch: {panda.arch_name}") for arg_offset, names in to_hook.items(): for name in names: self._gen_fd_cb(name, arg_offset) def replace_file(self, filename, faker, disk_file="/etc/passwd"): ''' Replace all accesses to filename with accesses to the fake file instead which optionally may be specified by disk_file. ''' self.faked_files[filename] = faker # XXX: We rename the files to real files to the guest kernel can manage FDs for us. # this may need to use different real files depending on permissions requested self.rename_file(filename, disk_file) def _gen_fd_cb(self, name, fd_offset): ''' Register syscalls2 PPP callback on enter and return for the given name which has an argument of fd at fd_offset in the argument list ''' self._panda.ppp("syscalls2", f"on_sys_{name}_return", name=f"file_faker_return_{name}")( \ lambda *args: self._return_fd_cb(name, fd_offset, args=args)) def _return_fd_cb(self, syscall_name, fd_pos, args=None): ''' When we're returnuing from a syscall, mutate memory to put the results we want there ''' (cpu, pc) = args[0:2] fd = args[2+fd_pos] asid = self._panda.current_asid(cpu) if (fd, asid) not in self.hooked_fds: return assert(args) hfd = self.hooked_fds[(fd, asid)] if syscall_name == "read": # Place up to `count` bytes of data into memory at `buf_ptr` buf_ptr = args[3] count = args[4] (data, data_len) = hfd.read(count) if data: try: self._panda.virtual_memory_write(cpu, buf_ptr, data) except ValueError: self.ff_logger.error(f"Unable to store fake data after read to {hfd}") return cpu.env_ptr.regs[0] = data_len self.ff_logger.info(f"Read - returning {data_len} bytes") elif syscall_name == "close": # We want the guest to close the real FD. Delete it from our map of hooked fds hfd.close() if (fd, asid) in self.hooked_fds: del self.hooked_fds[(fd, asid)] elif syscall_name == "write": # read count bytes from buf, add to our hyper-fd buf_ptr = args[3] count = args[4] try: data = self._panda.virtual_memory_read(cpu, buf_ptr, count) except ValueError: self.ff_logger.error(f"Unable to read buffer that was being written") return bytes_written = hfd.write(data) cpu.env_ptr.regs[0] = bytes_written elif syscall_name == "lseek": # LLSEEK? offset = args[2] whence = args[3] hfd.seek(offset, whence) elif syscall_name in ["dup2", "dup3"]: # add newfd oldfd = args[2] newfd = args[3] self.ff_logger.debug(f"Duplicating faked fd {oldfd} to {newfd}") # Duplicate the old hfd - but not the file behind it new_hfd = HyperFD(hfd.name, hfd.file, hfd.offset) self.hooked_fds[(newfd, asid)] = new_hfd else: self.ff_logger.error(f"Unsupported syscall on FakeFD{fd}: {syscall_name}. Not intercepting (Running on real guest FD)") def _before_modified_enter(self, cpu, pc, syscall_name, fname): ''' Overload FileHook function. Determine if a syscall we're about to enter is using a filename we want to fake After the modified syscall returns, we grab the real FD and map it to the HFD ''' if fname in self.faked_files: self.pending_hfd = HyperFD(fname, self.faked_files[fname]) # Create HFD asid = self._panda.current_asid(cpu) def _after_modified_return(self, cpu, pc, syscall_name, fd): ''' Overload FileHook function. Determine if a syscall we're about to return from was using a filename we want to fake. If so, grab the FD ''' if self.pending_hfd: asid = self._panda.current_asid(cpu) self.hooked_fds[(fd, asid)] = self.pending_hfd self.logger.info(f"A file we want to hook was created {self.pending_hfd}") self.pending_hfd = None def close(self): # Close all open hfds if len(self.hooked_fds): self.ff_logger.debug("Cleaning up open hyper file descriptors") for (fd, asid) in list(self.hooked_fds.keys()): self.hooked_fds[(fd, asid)].close() del self.hooked_fds[(fd, asid)] def __del__(self): # XXX: This isn't being called for some reason on destruction self.close()
Ancestors
Methods
def close(self)
-
Expand source code
def close(self): # Close all open hfds if len(self.hooked_fds): self.ff_logger.debug("Cleaning up open hyper file descriptors") for (fd, asid) in list(self.hooked_fds.keys()): self.hooked_fds[(fd, asid)].close() del self.hooked_fds[(fd, asid)]
def replace_file(self, filename, faker, disk_file='/etc/passwd')
-
Replace all accesses to filename with accesses to the fake file instead which optionally may be specified by disk_file.
Expand source code
def replace_file(self, filename, faker, disk_file="/etc/passwd"): ''' Replace all accesses to filename with accesses to the fake file instead which optionally may be specified by disk_file. ''' self.faked_files[filename] = faker # XXX: We rename the files to real files to the guest kernel can manage FDs for us. # this may need to use different real files depending on permissions requested self.rename_file(filename, disk_file)
Inherited members
class FileHook (panda, use_osi=True)
-
Class to modify guest memory just before syscalls with filename arguments. As the system call is about to be executed, change the data pointed to by the filename pointer. When the syscall returns, restore the mutated data to its original values.
This provides a simple, cross-platform interface to redirect file accesses just using the OSI plugin.
usage: panda = Panda(…) hook = FileHook(panda) hook.rename_file("/rename_this", "/to_this")
Store a reference to the panda object, and register the appropriate syscalls2 callbacks for entering and exiting from all syscalls that have a char* filename argument.
Expand source code
class FileHook: ''' Class to modify guest memory just before syscalls with filename arguments. As the system call is about to be executed, change the data pointed to by the filename pointer. When the syscall returns, restore the mutated data to its original values. This provides a simple, cross-platform interface to redirect file accesses just using the OSI plugin. usage: panda = Panda(...) hook = FileHook(panda) hook.rename_file("/rename_this", "/to_this") ''' def __init__(self, panda, use_osi=True): ''' Store a reference to the panda object, and register the appropriate syscalls2 callbacks for entering and exiting from all syscalls that have a char* filename argument. ''' self._panda = panda self._renamed_files = {} # old_fname (str): new_fname (bytes) self._changed_strs = {} # callback_name: original_data self.use_osi = use_osi self.logger = logging.getLogger('panda.filehook') try: import coloredlogs coloredlogs.install(level='WARN') except ImportError: pass self.pending_virt_read = None panda.load_plugin("syscalls2") # For each architecture, we have a different set of syscalls. They all # either call our functions with (cpu, pc, filename_ptr, ...) # or (cpu, pc, something_else, filename_ptr, ...). Here we # Programmatically generate callbacks for all of them # These lists were made with commands like the following in syscalls2/generated: # grep filename syscall_switch_enter_linux_x86.cpp | grep "\['const char " | grep -o sys_[a-zA-Z0-9]* | grep -o [a-z0-9]*$ # grep filename syscall_switch_enter_linux_x86.cpp | grep -v "\['const char " | grep -o sys_[a-zA-Z0-9]* | grep -o [a-z0-9]*$ to_hook = {} if panda.arch_name == "i386": to_hook[0] = ["open", "execve", "chdir", "mknod", "chmod", "lchown16", "stat", "access", "chroot", "lstat", "newstat", "newlstat", "chown16", "stat64", "lstat64", "lchown", "chown" ] to_hook[1] = ["utime", "utimes", "openat", "mknodat", "fchownat", "futimesat", "fstatat64", "fchmodat", "faccessat", "utimensat", "execveat"] elif panda.arch_name == "x86_64": to_hook[0] = ["open", "newstat", "newlstat", "access", "chdir", "chmod", "chown", "lchown", "mknod", "chroot"] to_hook[1] = ["utime", "utimes", "openat", "mknodat", "fchownat", "futimesat", "newfstatat", "fchmodat", "faccessat", "utimensat"] elif panda.arch_name == "arm": to_hook[0] = ["open", "execve", "chdir", "mknod", "chmod", "lchown16", "access", "chroot", "newstat", "newlstat", "chown16", "stat64", "lstat64", "lchown", "chown"] to_hook[1] = ["utime", "utimes", "openat", "mknodat", "fchownat", "futimesat", "fstatat64", "fchmodat", "faccessat", "utimensat", "execveat"] else: raise ValueError(f"Unsupported PANDA arch: {panda.arch_name}") # Register the callbacks for arg_offset, names in to_hook.items(): for name in names: self._gen_cb(name, arg_offset) # Fallback callback used when syscall with file name isn't mapped into memory @self._panda.cb_virt_mem_before_read(enabled=False) def before_virt_read(cpu, pc, addr, size): ''' This callback is necessary for the case when we enter a syscall but the filename pointer is paged out. When that happens, we enable this (slow) callback which checks every mem-read while we're in that syscall to see if the memory has since been paged-in. It should always eventually be paged in. Once it is, we mutate the memory and then disable this callback. If this hasn't run by the time the callback returns, we give up and disable it ''' if not self.pending_virt_read: return # Is our pending read a subset of the current read? If so try to read it if addr <= self.pending_virt_read and addr+size > self.pending_virt_read: try: fname = self._panda.read_str(cpu, self.pending_virt_read) except ValueError: return # Still not available. Keep waiting self.logger.debug(f"recovered missed filename: {fname}") # It is available! Disable this slow callback and rerurn _enter_cb with the data fname_ptr = self.pending_virt_read self.pending_virt_read = None self._panda.disable_callback('before_virt_read') self._enter_cb(self.pending_syscall, args=(cpu, pc), fname_ptr=fname_ptr) def rename_file(self, old_name, new_name): ''' Mutate a given filename into a new name at the syscall interface ''' assert(old_name not in self._renamed_files), f"Already have a rename rule for {old_name}" if not isinstance(new_name, bytes): new_name = new_name.encode("utf8") if not new_name.endswith(b"\x00"): new_name += b"\x00" self._renamed_files[old_name] = new_name def _get_fname(self, cpu, fd): ''' Use OSI to get the filename behind a file descriptor. If not self.use_osi, return None ''' if not self.use_osi: return None fname_s = None proc = self._panda.plugins['osi'].get_current_process(cpu) if proc != self._panda.ffi.NULL: fname = self._panda.plugins['osi_linux'].osi_linux_fd_to_filename(cpu, proc, self._panda.ffi.cast("int", fd)) if fname != self._panda.ffi.NULL: fname_s = self._panda.ffi.string(fname).decode('utf8', 'ignore') return fname_s def _gen_cb(self, name, fname_ptr_pos): ''' Register syscalls2 PPP callback on enter and return for the given name which has an argument of char* filename at fname_ptr_pos in the arguments list ''' self._panda.ppp("syscalls2", f"on_sys_{name}_enter", name = f"file_hook_enter_{name}")( \ lambda *args: self._enter_cb(name, fname_ptr_pos, args=args)) self._panda.ppp("syscalls2", f"on_sys_{name}_return", name = f"file_hook_return_{name}")( \ lambda *args: self._return_cb(name, fname_ptr_pos, args=args)) def _enter_cb(self, syscall_name, fname_ptr_pos=0, args=None, fname_ptr=None): ''' When we return, check if we mutated the fname buffer. If so, we need to restore whatever data was there (we may have written past the end of the string). if fname_ptr is set, just skip the logic to extract it ''' assert(args) (cpu, pc) = args[0:2] if not fname_ptr: fname_ptr = args[2+fname_ptr_pos] # offset to after (cpu, pc) in callback args try: fname = self._panda.read_str(cpu, fname_ptr) except: fname = self._get_fname(cpu, args[2+fname_ptr_pos]) if fname: self.logger.info(f"OSI found fname after simple logic missed it in call to {syscall_name}") else: self.logger.debug(f"missed filename at 0x{fname_ptr:x} in call to {syscall_name} - trying to find") self.pending_virt_read = cpu.env_ptr.regs[0] self.pending_syscall = syscall_name self._panda.enable_callback('before_virt_read') #self._panda_enable_memcb() return fname = path.normpath(fname) # Normalize it #self.logger.info(f"Entering {syscall_name} with file={fname}") if fname in self._renamed_files: # It matches, now let's take our action! Either rename or callback self.logger.debug(f"modifying filename {fname} in {syscall_name} to {self._renamed_files[fname]}") assert(syscall_name not in self._changed_strs), "Entering syscall that already has a pending restore" # First read a buffer of the same size as our new value. XXX the string we already read might be shorter # than what we're inserting so we read again so we can later restore the old data try: clobbered_data = self._panda.virtual_memory_read(cpu, fname_ptr, len(self._renamed_files[fname])) except ValueError: self.logger.error(f"Failed to read target buffer at call into {syscall_name}") return # Now replace those bytes with our new name try: self._panda.virtual_memory_write(cpu, fname_ptr, self._renamed_files[fname]) except ValueError: self.logger.warn(f"Failed to mutate filename buffer at call into {syscall_name}") return # If it all worked, save the clobbered data asid = self._panda.current_asid(cpu) self._changed_strs[(syscall_name, asid)] = clobbered_data self._before_modified_enter(cpu, pc, syscall_name, fname) def _return_cb(self, syscall_name, fname_ptr_pos, args=None): ''' When we return, check if we mutated the fname buffer. If so, we need to restore whatever data was there (we may have written past the end of the string) ''' (cpu, pc) = args[0:2] if self.pending_virt_read: fname_ptr = args[2+fname_ptr_pos] # offset to after (cpu, pc) in callback args self.logger.warning(f"missed filename in call to {syscall_name} with fname at 0x{fname_ptr:x}. Ignoring it") self._panda.disable_callback('before_virt_read') # No point in continuing this self.pending_virt_read = None # Virtual address that we're waiting to read as soon as possible return asid = self._panda.current_asid(cpu) if (syscall_name, asid) in self._changed_strs: assert(args) fname_ptr = args[2+fname_ptr_pos] # offset to after (cpu, pc) in callback args try: self._panda.virtual_memory_write(cpu, fname_ptr, self._changed_strs[(syscall_name, asid)]) except ValueError: self.logger.warn(f"Failed to fix filename buffer at return of {syscall_name}") del self._changed_strs[(syscall_name, asid)] fd = self._panda.arch.get_retval(cpu, convention='syscall') self.logger.info(f"Returning from {syscall_name} after modifying argument - modified FD is {fd}") self._after_modified_return(cpu, pc, syscall_name, fd=fd) def _before_modified_enter(self, cpu, pc, syscall_name, fname): ''' Internal callback run before we enter a syscall where we mutated the filename. Exists to be overloaded by subclasses ''' pass def _after_modified_return(self, cpu, pc, syscall_name, fd): ''' Internal callback run before we return from a syscall where we mutated the filename. Exists to be overloaded by subclasses ''' pass
Subclasses
Methods
def rename_file(self, old_name, new_name)
-
Mutate a given filename into a new name at the syscall interface
Expand source code
def rename_file(self, old_name, new_name): ''' Mutate a given filename into a new name at the syscall interface ''' assert(old_name not in self._renamed_files), f"Already have a rename rule for {old_name}" if not isinstance(new_name, bytes): new_name = new_name.encode("utf8") if not new_name.endswith(b"\x00"): new_name += b"\x00" self._renamed_files[old_name] = new_name
class IoctlFaker (panda, use_osi_linux=False, log=False, ignore=[], intercept_ret_vals=[-25], intercept_all_non_zero=False)
-
Interpose ioctl() syscall returns, forcing successes for specific error codes to simulate missing drivers/peripherals. Bin all returns into failures (needed forcing) and successes, store for later retrival/analysis.
Log enables/disables logging. ignore contains a list of tuples (filename, cmd#) to be ignored. intercept_ret_vals is a list of ioctl return values that should be intercepted. By default we just intercept just -25 which indicates that a driver is not present to handle the ioctl. intercept_all_non_zero is aggressive setting that takes precedence if set - any non-zero return code id changed to zero.
Expand source code
class IoctlFaker(): ''' Interpose ioctl() syscall returns, forcing successes for specific error codes to simulate missing drivers/peripherals. Bin all returns into failures (needed forcing) and successes, store for later retrival/analysis. ''' def __init__( self, panda, use_osi_linux = False, log = False, ignore = [], intercept_ret_vals = [-25], intercept_all_non_zero = False ): ''' Log enables/disables logging. ignore contains a list of tuples (filename, cmd#) to be ignored. intercept_ret_vals is a list of ioctl return values that should be intercepted. By default we just intercept just -25 which indicates that a driver is not present to handle the ioctl. intercept_all_non_zero is aggressive setting that takes precedence if set - any non-zero return code id changed to zero. ''' self.osi = use_osi_linux self._panda = panda self._panda.load_plugin("syscalls2") self._log = log self.ignore = ignore self.intercept_ret_vals = intercept_ret_vals self.intercept_all_non_zero = intercept_all_non_zero if self.osi: self._panda.load_plugin("osi") self._panda.load_plugin("osi_linux") if self._log: self._logger = logging.getLogger('panda.ioctls') self._logger.setLevel(logging.DEBUG) # Track ioctls in two sets: modified (forced_returns) and unmodified self._forced_returns = set() self._unmodified_returns = set() # Force success returns for missing drivers/peripherals @self._panda.ppp("syscalls2", "on_sys_ioctl_return") def ioctl_faker_on_sys_ioctl_return(cpu, pc, fd, cmd, arg): ioctl = Ioctl(self._panda, cpu, fd, cmd, arg, self.osi) ioctl.get_ret_code(self._panda, cpu) # Modify if (self.intercept_all_non_zero and ioctl.original_ret_code != 0) or \ ioctl.original_ret_code in self.intercept_ret_vals and \ (ioctl.file_name, ioctl.cmd.bits.cmd_num) not in self.ignore: # Allow ignoring specific commands on specific files if panda.arch_name == "mipsel" or panda.arch_name == "mips": cpu.env_ptr.active_tc.gpr[2] = 0 elif panda.arch_name == "aarch64": cpu.env_ptr.xregs[0] = 0 elif panda.arch_name == "ppc": raise RuntimeError("PPC currently unsupported!") else: # x86/x64/ARM cpu.env_ptr.regs[0] = 0 self._forced_returns.add(ioctl) if ioctl.has_buf and self._log: self._logger.warning("Forcing success return for data-containing {}".format(ioctl)) elif self._log: self._logger.info("Forcing success return for data-less {}".format(ioctl)) # Don't modify else: self._unmodified_returns.add(ioctl) def _get_returns(self, source, with_buf_only): if with_buf_only: return list(filter(lambda i: (i.has_buf == True), source)) else: return source def get_forced_returns(self, with_buf_only = False): ''' Retrieve ioctls whose error codes where overwritten ''' return self._get_returns(self._forced_returns, with_buf_only) def get_unmodified_returns(self, with_buf_only = False): ''' Retrieve ioctl that completed normally ''' return self._get_returns(self._unmodified_returns, with_buf_only)
Methods
def get_forced_returns(self, with_buf_only=False)
-
Retrieve ioctls whose error codes where overwritten
Expand source code
def get_forced_returns(self, with_buf_only = False): ''' Retrieve ioctls whose error codes where overwritten ''' return self._get_returns(self._forced_returns, with_buf_only)
def get_unmodified_returns(self, with_buf_only=False)
-
Retrieve ioctl that completed normally
Expand source code
def get_unmodified_returns(self, with_buf_only = False): ''' Retrieve ioctl that completed normally ''' return self._get_returns(self._unmodified_returns, with_buf_only)
class ModeFilter
-
Simple, inheritable class to provide a decorator to enable/disable callbacks depending on self.mode value.
It is ill-advised to use on callbacks with high-performance impacts such as before_block_exec as this is a pure-Python plugin.
Example
from pandare import Panda from pandare.extras import ModeFilter
class MyClass(ModeFilter): def init(self, panda) self.panda = panda self.set_mode("mode1")
@self.mode_filter("mode1") @self.panda.ppp("syscalls2", "on_sys_open_enter") def on_open(cpu, pc, fname_ptr, flags, mode): # assert(self.mode == "mode1") # Note decorator ensures this self.set_mode("mode2") # Change mode - so this callback won't run again ... def run(self): self.panda.run()
p = panda(…) mc = MyClass(panda) mc.run()
Expand source code
class ModeFilter: ''' Simple, inheritable class to provide a decorator to enable/disable callbacks depending on self.mode value. It is ill-advised to use on callbacks with high-performance impacts such as before_block_exec as this is a pure-Python plugin. Example: from pandare import Panda from pandare.extras import ModeFilter class MyClass(ModeFilter): def __init__(self, panda) self.panda = panda self.set_mode("mode1") @self.mode_filter("mode1") @self.panda.ppp("syscalls2", "on_sys_open_enter") def on_open(cpu, pc, fname_ptr, flags, mode): # assert(self.mode == "mode1") # Note decorator ensures this self.set_mode("mode2") # Change mode - so this callback won't run again ... def run(self): self.panda.run() p = panda(...) mc = MyClass(panda) mc.run() ''' mode = "start" def mode_filter(self, mode_filter): ''' Decorator to only run a function if self.mode matches the provided string ''' def __mode_filter(func): @wraps(func) def wrapper(*args, **kwargs): if self.mode == mode_filter: # Mode matches - run it! func(*args, **kwargs) return wrapper return __mode_filter def set_mode(self, new): ''' Helper to change mode ''' if new != self.mode: print(f"Switching modes from {self.mode} to {new}") self.mode = new
Subclasses
Class variables
var mode
Methods
def mode_filter(self, mode_filter)
-
Decorator to only run a function if self.mode matches the provided string
Expand source code
def mode_filter(self, mode_filter): ''' Decorator to only run a function if self.mode matches the provided string ''' def __mode_filter(func): @wraps(func) def wrapper(*args, **kwargs): if self.mode == mode_filter: # Mode matches - run it! func(*args, **kwargs) return wrapper return __mode_filter
def set_mode(self, new)
-
Helper to change mode
Expand source code
def set_mode(self, new): ''' Helper to change mode ''' if new != self.mode: print(f"Switching modes from {self.mode} to {new}") self.mode = new
class ProcGraph (panda)
-
Base class which PyPANDA plugins should inherit. Subclasses may register callbacks using the provided panda object and use the PyPlugin APIs:
- self.get_args or self.get_arg_bool to check argument values
- self.ppp to interact with other PyPlugins via PPP interfaces
- self.ppp_cb_boilerplate('cb_name') to register a ppp-style callback
- self.ppp_run_cb('cb_name') to run a previously-registered ppp-style callback
- @PyPlugin.ppp_export to mark a class method as ppp-exported
For more information, check out the pyplugin documentation.
Expand source code
class ProcGraph(PyPlugin): def __init__(self, panda): # Data collection self.procinfo = {} # PID: info self.time_data = [] # [(PID, #blocks)] self.total_insns = 0 self.n_insns = 0 self.last_pid = None self.show_ranges = not self.get_arg("hide_ranges") self.show_graph = not self.get_arg("hide_graph") self.panda = panda # config option: number of columns self.n_cols = self.get_arg("cols") or 120 @panda.cb_start_block_exec def sbe(cpu, tb): self.n_insns += tb.icount self.total_insns += tb.icount @panda.ppp("osi", "on_task_change") def task_change(cpu): proc = panda.plugins['osi'].get_current_process(cpu) thread = panda.plugins['osi'].get_current_thread(cpu) if proc == panda.ffi.NULL: print(f"Warning: Unable to identify process at {self.n_insns}") return if thread == panda.ffi.NULL: print(f"Warning: Unable to identify thread at {self.n_insns}") return proc_key = (proc.pid, thread.tid) if proc_key not in self.procinfo: self.procinfo[proc_key] = {"names": set(), #"tids": set(), "first": self.total_insns, "last": None, "count": 0} name = panda.ffi.string(proc.name) if proc.name != panda.ffi.NULL else "(error)" self.procinfo[proc_key]["names"].add(name) # Update insn count for last process and indicate it (maybe) ends at total_insns-1 if self.last_pid: # count since we last ran is it's old end value, minus where it just ended self.procinfo[self.last_pid]["count"] += (self.total_insns-1) - self.procinfo[self.last_pid]["last"] \ if self.procinfo[self.last_pid]["last"] is not None \ else (self.total_insns-1) - self.procinfo[self.last_pid]["first"] self.procinfo[self.last_pid]["last"] = self.total_insns-1 self.last_pid = proc_key self.time_data.append((proc_key, self.n_insns)) self.n_insns = 0 def uninit(self): render_graph(self.procinfo, self.time_data, self.total_insns, n_cols=self.n_cols, show_ranges=self.show_ranges, show_graph=self.show_graph) # Fully reset state self.panda.disable_ppp("task_change") self.procinfo = {} # PID: info self.time_data = [] # [(PID, #blocks)] self.total_insns = 0 self.n_insns = 0 self.last_pid = None
Ancestors
Methods
def uninit(self)
-
Expand source code
def uninit(self): render_graph(self.procinfo, self.time_data, self.total_insns, n_cols=self.n_cols, show_ranges=self.show_ranges, show_graph=self.show_graph) # Fully reset state self.panda.disable_ppp("task_change") self.procinfo = {} # PID: info self.time_data = [] # [(PID, #blocks)] self.total_insns = 0 self.n_insns = 0 self.last_pid = None
Inherited members
class ProcWriteCapture (panda, console_capture=False, proc_name=None, log_dir=None, rm_existing_logs=False)
-
Set console_capture = True to capture all console output to file, including boot messages. Set proc_name = "name_of_proc" to, for a named process, capture stdout/stderr and any file writes from the hypervisor, mirror results to log directory. Can be stacked with console capture.
Expand source code
class ProcWriteCapture(): ''' Set console_capture = True to capture all console output to file, including boot messages. Set proc_name = "name_of_proc" to, for a named process, capture stdout/stderr and any file writes from the hypervisor, mirror results to log directory. Can be stacked with console capture. ''' def __init__(self, panda, console_capture = False, proc_name = None, log_dir = None, rm_existing_logs = False): self._panda = panda self._files_written = set() self._rm = rm_existing_logs self._console_capture = console_capture self._proc_name = proc_name self._proc_printed_err = False self._console_printed_err = False if log_dir == None: self._console_log_dir = Path.cwd() if proc_name: self._proc_log_dir = Path.cwd() / self._proc_name else: self._console_log_dir = Path(log_dir) if proc_name: self._proc_log_dir = Path(log_dir).joinpath(self._proc_name) # Setup logging dir self._console_log_dir.mkdir(parents=True, exist_ok=True) if proc_name: self._proc_log_dir.mkdir(parents=True, exist_ok=True) if self._rm: if proc_name: shutil.rmtree(self._proc_log_dir) shutil.rmtree(self._console_log_dir) # Mirror writes @self._panda.ppp("syscalls2", "on_sys_write_enter") def proc_write_capture_on_sys_write_enter(cpu, pc, fd, buf, cnt): try_read = False # Capture console output if self._console_capture: # Fun trick: lazy eval of OSI # Based on the idea that a non-POSIX FD will only be used after boot is finished an OSI is functional # Note: doesn't capture boot logs (would require hooking kernel's printk, not write syscall) if (fd == 1) or (fd == 2) or (fd == 3): try_read = True else: curr_proc = panda.plugins['osi'].get_current_process(cpu) file_name_ptr = panda.plugins['osi_linux'].osi_linux_fd_to_filename(cpu, curr_proc, fd) file_path = panda.ffi.string(file_name_ptr).decode() if ("tty" in file_path): try_read = True if try_read: try: data = panda.virtual_memory_read(cpu, buf, cnt) except ValueError: raise RuntimeError(f"Failed to read buffer: addr 0x{buf:016x}") if fd == 2: self._console_printed_err = True log_file = self._console_log_dir.joinpath("console.out") with open(log_file, "ab") as f: f.write(data) self._files_written.add(str(log_file)) # Use OSI to capture logs for a named process if self._proc_name: curr_proc = panda.plugins['osi'].get_current_process(cpu) curr_proc_name = panda.ffi.string(curr_proc.name).decode() if self._proc_name == curr_proc_name: if not try_read: # If we didn't already read this data in once for console capture try: data = panda.virtual_memory_read(cpu, buf, cnt) except ValueError: raise RuntimeError(f"Failed to read buffer: proc \'{curr_proc_name}\', addr 0x{buf:016x}") file_name_ptr = panda.plugins['osi_linux'].osi_linux_fd_to_filename(cpu, curr_proc, fd) file_path = panda.ffi.string(file_name_ptr).decode() # For informational purposes only, collection not reliant on this exact mapping if fd == 1: # POSIX stdout file_path += ".stdout" elif fd == 2: # POSIX stderr file_path += ".stderr" self._proc_printed_err = True log_file = self._proc_log_dir.joinpath(file_path.replace("//", "_").replace("/", "_")) with open(log_file, "ab") as f: f.write(data) self._files_written.add(str(log_file)) def proc_printed_err(self): return self._proc_printed_err def console_printed_post_boot_err(self): return self._console_printed_err def get_files_written(self): return self._files_written
Methods
def console_printed_post_boot_err(self)
-
Expand source code
def console_printed_post_boot_err(self): return self._console_printed_err
def get_files_written(self)
-
Expand source code
def get_files_written(self): return self._files_written
def proc_printed_err(self)
-
Expand source code
def proc_printed_err(self): return self._proc_printed_err
class Snake (panda, console_capture=False, proc_name=None, log_dir=None, rm_existing_logs=False)
-
Set console_capture = True to capture all console output to file, including boot messages. Set proc_name = "name_of_proc" to, for a named process, capture stdout/stderr and any file writes from the hypervisor, mirror results to log directory. Can be stacked with console capture.
Expand source code
class ProcWriteCapture(): ''' Set console_capture = True to capture all console output to file, including boot messages. Set proc_name = "name_of_proc" to, for a named process, capture stdout/stderr and any file writes from the hypervisor, mirror results to log directory. Can be stacked with console capture. ''' def __init__(self, panda, console_capture = False, proc_name = None, log_dir = None, rm_existing_logs = False): self._panda = panda self._files_written = set() self._rm = rm_existing_logs self._console_capture = console_capture self._proc_name = proc_name self._proc_printed_err = False self._console_printed_err = False if log_dir == None: self._console_log_dir = Path.cwd() if proc_name: self._proc_log_dir = Path.cwd() / self._proc_name else: self._console_log_dir = Path(log_dir) if proc_name: self._proc_log_dir = Path(log_dir).joinpath(self._proc_name) # Setup logging dir self._console_log_dir.mkdir(parents=True, exist_ok=True) if proc_name: self._proc_log_dir.mkdir(parents=True, exist_ok=True) if self._rm: if proc_name: shutil.rmtree(self._proc_log_dir) shutil.rmtree(self._console_log_dir) # Mirror writes @self._panda.ppp("syscalls2", "on_sys_write_enter") def proc_write_capture_on_sys_write_enter(cpu, pc, fd, buf, cnt): try_read = False # Capture console output if self._console_capture: # Fun trick: lazy eval of OSI # Based on the idea that a non-POSIX FD will only be used after boot is finished an OSI is functional # Note: doesn't capture boot logs (would require hooking kernel's printk, not write syscall) if (fd == 1) or (fd == 2) or (fd == 3): try_read = True else: curr_proc = panda.plugins['osi'].get_current_process(cpu) file_name_ptr = panda.plugins['osi_linux'].osi_linux_fd_to_filename(cpu, curr_proc, fd) file_path = panda.ffi.string(file_name_ptr).decode() if ("tty" in file_path): try_read = True if try_read: try: data = panda.virtual_memory_read(cpu, buf, cnt) except ValueError: raise RuntimeError(f"Failed to read buffer: addr 0x{buf:016x}") if fd == 2: self._console_printed_err = True log_file = self._console_log_dir.joinpath("console.out") with open(log_file, "ab") as f: f.write(data) self._files_written.add(str(log_file)) # Use OSI to capture logs for a named process if self._proc_name: curr_proc = panda.plugins['osi'].get_current_process(cpu) curr_proc_name = panda.ffi.string(curr_proc.name).decode() if self._proc_name == curr_proc_name: if not try_read: # If we didn't already read this data in once for console capture try: data = panda.virtual_memory_read(cpu, buf, cnt) except ValueError: raise RuntimeError(f"Failed to read buffer: proc \'{curr_proc_name}\', addr 0x{buf:016x}") file_name_ptr = panda.plugins['osi_linux'].osi_linux_fd_to_filename(cpu, curr_proc, fd) file_path = panda.ffi.string(file_name_ptr).decode() # For informational purposes only, collection not reliant on this exact mapping if fd == 1: # POSIX stdout file_path += ".stdout" elif fd == 2: # POSIX stderr file_path += ".stderr" self._proc_printed_err = True log_file = self._proc_log_dir.joinpath(file_path.replace("//", "_").replace("/", "_")) with open(log_file, "ab") as f: f.write(data) self._files_written.add(str(log_file)) def proc_printed_err(self): return self._proc_printed_err def console_printed_post_boot_err(self): return self._console_printed_err def get_files_written(self): return self._files_written
Methods
def console_printed_post_boot_err(self)
-
Expand source code
def console_printed_post_boot_err(self): return self._console_printed_err
def get_files_written(self)
-
Expand source code
def get_files_written(self): return self._files_written
def proc_printed_err(self)
-
Expand source code
def proc_printed_err(self): return self._proc_printed_err