Module pandare.extras.fileHook

Expand source code
#!/usr/bin/env python3

import logging
import sys
from os import path


class FileHook:
    '''
    Redirect guest file accesses by rewriting the filename arguments of syscalls.

    As a hooked system call is about to be executed, the data pointed to by its
    filename pointer is overwritten with the replacement name registered through
    rename_file(). When the syscall returns, the overwritten bytes are restored
    to their original values.

    Syscall entry and return are observed through the syscalls2 plugin. When
    use_osi is True, the OSI plugins are consulted as a fallback if the filename
    cannot be read from guest memory. Supported architectures are i386, x86_64
    and arm; any other arch_name makes the constructor raise ValueError.

    usage:
        panda = Panda(...)
        hook = FileHook(panda)
        hook.rename_file("/rename_this", "/to_this")
    '''

    def __init__(self, panda, use_osi=True):
        '''
        Store a reference to the panda object, and register
        the appropriate syscalls2 callbacks for entering and exiting
        from all syscalls that have a char* filename argument.
        '''

        self._panda = panda
        self._renamed_files = {} # old_fname (str): new_fname (bytes)
        self._changed_strs = {} # callback_name: original_data
        self.use_osi = use_osi

        self.logger = logging.getLogger('panda.filehook')
        try:
            import coloredlogs
            coloredlogs.install(level='WARN')
        except ImportError:
            pass
        self.pending_virt_read = None

        panda.load_plugin("syscalls2")

        # For each architecture, we have a different set of syscalls. They all
        # either call our functions with (cpu, pc, filename_ptr, ...)
        # or (cpu, pc, something_else, filename_ptr, ...). Here we
        # Programmatically generate callbacks for all of them

        # These lists were made with commands like the following in syscalls2/generated:
        # grep filename syscall_switch_enter_linux_x86.cpp | grep "\['const char " | grep -o sys_[a-zA-Z0-9]* | grep -o [a-z0-9]*$
        # grep filename syscall_switch_enter_linux_x86.cpp | grep -v "\['const char " | grep -o sys_[a-zA-Z0-9]* | grep -o [a-z0-9]*$
        to_hook = {}
        if panda.arch_name == "i386":
            to_hook[0] = ["open", "execve", "chdir", "mknod", "chmod", "lchown16", "stat", "access", "chroot",
                         "lstat", "newstat", "newlstat", "chown16", "stat64", "lstat64", "lchown", "chown" ]
            to_hook[1] = ["utime", "utimes", "openat", "mknodat", "fchownat", "futimesat", "fstatat64",
                          "fchmodat", "faccessat", "utimensat", "execveat"]

        elif panda.arch_name == "x86_64":
            to_hook[0] = ["open", "newstat", "newlstat", "access", "chdir", "chmod", "chown", "lchown", "mknod", "chroot"]
            to_hook[1] = ["utime", "utimes", "openat", "mknodat", "fchownat", "futimesat", "newfstatat", "fchmodat", "faccessat", "utimensat"]

        elif panda.arch_name == "arm":
            to_hook[0] = ["open", "execve", "chdir", "mknod", "chmod", "lchown16", "access", "chroot", "newstat", "newlstat", "chown16", "stat64", "lstat64", "lchown", "chown"]
            to_hook[1] = ["utime", "utimes", "openat", "mknodat", "fchownat", "futimesat", "fstatat64", "fchmodat", "faccessat", "utimensat", "execveat"]
        else:
            raise ValueError(f"Unsupported PANDA arch: {panda.arch_name}")

        # Register the callbacks
        for arg_offset, names in to_hook.items():
            for name in names:
                self._gen_cb(name, arg_offset)


        # Fallback callback used when syscall with file name isn't mapped into memory
        @self._panda.cb_virt_mem_before_read(enabled=False)
        def before_virt_read(cpu, pc, addr, size):
            '''
            This callback is necessary for the case when we enter a syscall but the filename pointer is paged out.
            When that happens, we enable this (slow) callback which checks every mem-read while we're in that syscall
            to see if the memory has since been paged-in. It should always eventually be paged in. Once it is,
            we mutate the memory and then disable this callback.

            If this hasn't run by the time the callback returns, we give up and disable it
            '''
            if not self.pending_virt_read:
                return

            # Is our pending read a subset of the current read? If so try to read it
            if addr <= self.pending_virt_read and addr+size > self.pending_virt_read:
                try:
                    fname = self._panda.read_str(cpu, self.pending_virt_read)
                except ValueError:
                    return # Still not available. Keep waiting
                self.logger.debug(f"recovered missed filename: {fname}")

                # It is available! Disable this slow callback and rerurn _enter_cb with the data
                fname_ptr = self.pending_virt_read
                self.pending_virt_read = None
                self._panda.disable_callback('before_virt_read')
                self._enter_cb(self.pending_syscall, args=(cpu, pc), fname_ptr=fname_ptr)


    def rename_file(self, old_name, new_name):
        '''
        Mutate a given filename into a new name at the syscall interface
        '''
        assert(old_name not in self._renamed_files), f"Already have a rename rule for {old_name}"

        if not isinstance(new_name, bytes):
            new_name = new_name.encode("utf8")

        if not new_name.endswith(b"\x00"):
            new_name += b"\x00"

        self._renamed_files[old_name] = new_name

    def _get_fname(self, cpu, fd):
        '''
        Use OSI to get the filename behind a file descriptor.
        If not self.use_osi, return None
        '''
        if not self.use_osi:
            return None
        fname_s = None
        proc = self._panda.plugins['osi'].get_current_process(cpu)
        if proc != self._panda.ffi.NULL:
            fname = self._panda.plugins['osi_linux'].osi_linux_fd_to_filename(cpu, proc, self._panda.ffi.cast("int", fd))
            if fname != self._panda.ffi.NULL:
                fname_s = self._panda.ffi.string(fname).decode('utf8', 'ignore')
        return fname_s

    def _gen_cb(self, name, fname_ptr_pos):
        '''
        Register syscalls2 PPP callback on enter and return for the given name
        which has an argument of char* filename at fname_ptr_pos in the arguments list
        '''
        self._panda.ppp("syscalls2", f"on_sys_{name}_enter", name = f"file_hook_enter_{name}")( \
                    lambda *args: self._enter_cb(name, fname_ptr_pos, args=args))
        self._panda.ppp("syscalls2", f"on_sys_{name}_return", name = f"file_hook_return_{name}")( \
                    lambda *args: self._return_cb(name, fname_ptr_pos, args=args))

    def _enter_cb(self, syscall_name, fname_ptr_pos=0, args=None, fname_ptr=None):
        '''
        When we return, check if we mutated the fname buffer. If so,
        we need to restore whatever data was there (we may have written
        past the end of the string).

        if fname_ptr is set, just skip the logic to extract it
        '''

        assert(args)
        (cpu, pc) = args[0:2]

        if not fname_ptr:
            fname_ptr = args[2+fname_ptr_pos] # offset to after (cpu, pc) in callback args

        try:
            fname = self._panda.read_str(cpu, fname_ptr)
        except:
            fname = self._get_fname(cpu, args[2+fname_ptr_pos])

            if fname:
                self.logger.info(f"OSI found fname after simple logic missed it in call to {syscall_name}")
            else:
                self.logger.debug(f"missed filename at 0x{fname_ptr:x} in call to {syscall_name} - trying to find")
                self.pending_virt_read = cpu.env_ptr.regs[0]
                self.pending_syscall = syscall_name
                self._panda.enable_callback('before_virt_read')
                #self._panda_enable_memcb()
                return

        fname = path.normpath(fname) # Normalize it
        #self.logger.info(f"Entering {syscall_name} with file={fname}")

        if fname in self._renamed_files:
            # It matches, now let's take our action! Either rename or callback

            self.logger.debug(f"modifying filename {fname} in {syscall_name} to {self._renamed_files[fname]}")
            assert(syscall_name not in self._changed_strs), "Entering syscall that already has a pending restore"

            # First read a buffer of the same size as our new value. XXX the string we already read might be shorter
            # than what we're inserting so we read again so we can later restore the old data
            try:
                clobbered_data = self._panda.virtual_memory_read(cpu, fname_ptr, len(self._renamed_files[fname]))
            except ValueError:
                self.logger.error(f"Failed to read target buffer at call into {syscall_name}")
                return

            # Now replace those bytes with our new name
            try:
                self._panda.virtual_memory_write(cpu, fname_ptr, self._renamed_files[fname])
            except ValueError:
                self.logger.warn(f"Failed to mutate filename buffer at call into {syscall_name}")
                return

            # If it all worked, save the clobbered data
            asid = self._panda.current_asid(cpu)
            self._changed_strs[(syscall_name, asid)] = clobbered_data

            self._before_modified_enter(cpu, pc, syscall_name, fname)


    def _return_cb(self, syscall_name, fname_ptr_pos, args=None):
        '''
        When we return, check if we mutated the fname buffer. If so,
        we need to restore whatever data was there (we may have written
        past the end of the string)
        '''
        (cpu, pc) = args[0:2]
        if self.pending_virt_read:
            fname_ptr = args[2+fname_ptr_pos] # offset to after (cpu, pc) in callback args

            self.logger.warning(f"missed filename in call to {syscall_name} with fname at 0x{fname_ptr:x}. Ignoring it")

            self._panda.disable_callback('before_virt_read') # No point in continuing this
            self.pending_virt_read = None # Virtual address that we're waiting to read as soon as possible
            return

        asid = self._panda.current_asid(cpu)
        if (syscall_name, asid) in self._changed_strs:
            assert(args)
            fname_ptr = args[2+fname_ptr_pos] # offset to after (cpu, pc) in callback args
            try:
                self._panda.virtual_memory_write(cpu, fname_ptr, self._changed_strs[(syscall_name, asid)])
            except ValueError:
                self.logger.warn(f"Failed to fix filename buffer at return of {syscall_name}")
            del self._changed_strs[(syscall_name, asid)]

            fd = self._panda.arch.get_retval(cpu, convention='syscall')
            self.logger.info(f"Returning from {syscall_name} after modifying argument - modified FD is {fd}")
            self._after_modified_return(cpu, pc, syscall_name, fd=fd)

    def _before_modified_enter(self, cpu, pc, syscall_name, fname):
        '''
        Internal callback run before we enter a syscall where we mutated
        the filename. Exists to be overloaded by subclasses
        '''
        pass

    def _after_modified_return(self, cpu, pc, syscall_name, fd):
        '''
        Internal callback run before we return from a syscall where we mutated
        the filename. Exists to be overloaded by subclasses
        '''
        pass

if __name__ == '__main__':
    # Self-test: boot a generic x86_64 guest and check that reading a path
    # that does not exist is transparently served from /etc/issue instead.
    from pandare import Panda

    panda = Panda(generic="x86_64")

    # Reads to /does_not_exist should be redirected to /etc/issue
    redirector = FileHook(panda)
    redirector.rename_file("/does_not_exist", "/etc/issue")

    @panda.queue_blocking
    def read_it():
        # Runs in the guest-driving thread once the emulator is up
        panda.revert_sync('root')
        output = panda.run_serial_cmd("cat /does_not_exist")
        assert "Ubuntu" in output, "Hook failed"
        panda.end_analysis()

    panda.run()
    print("Success")

Classes

class FileHook (panda, use_osi=True)

Class to modify guest memory just before syscalls with filename arguments. As the system call is about to be executed, change the data pointed to by the filename pointer. When the syscall returns, restore the mutated data to its original values.

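This provides a simple, cross-platform interface for redirecting file accesses. Syscalls are intercepted with the syscalls2 plugin; when `use_osi` is enabled, the OSI plugins are used as a fallback for resolving filenames.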

Usage:

    panda = Panda(...)
    hook = FileHook(panda)
    hook.rename_file("/rename_this", "/to_this")

Store a reference to the panda object, and register the appropriate syscalls2 callbacks for entering and exiting from all syscalls that have a char* filename argument.

Expand source code
class FileHook:
    '''
    Redirect guest file accesses by rewriting the filename arguments of syscalls.

    As a hooked system call is about to be executed, the data pointed to by its
    filename pointer is overwritten with the replacement name registered through
    rename_file(). When the syscall returns, the overwritten bytes are restored
    to their original values.

    Syscall entry and return are observed through the syscalls2 plugin. When
    use_osi is True, the OSI plugins are consulted as a fallback if the filename
    cannot be read from guest memory. Supported architectures are i386, x86_64
    and arm; any other arch_name makes the constructor raise ValueError.

    usage:
        panda = Panda(...)
        hook = FileHook(panda)
        hook.rename_file("/rename_this", "/to_this")
    '''

    def __init__(self, panda, use_osi=True):
        '''
        Store a reference to the panda object, and register
        the appropriate syscalls2 callbacks for entering and exiting
        from all syscalls that have a char* filename argument.
        '''

        self._panda = panda
        self._renamed_files = {} # old_fname (str): new_fname (bytes)
        self._changed_strs = {} # callback_name: original_data
        self.use_osi = use_osi

        self.logger = logging.getLogger('panda.filehook')
        try:
            import coloredlogs
            coloredlogs.install(level='WARN')
        except ImportError:
            pass
        self.pending_virt_read = None

        panda.load_plugin("syscalls2")

        # For each architecture, we have a different set of syscalls. They all
        # either call our functions with (cpu, pc, filename_ptr, ...)
        # or (cpu, pc, something_else, filename_ptr, ...). Here we
        # Programmatically generate callbacks for all of them

        # These lists were made with commands like the following in syscalls2/generated:
        # grep filename syscall_switch_enter_linux_x86.cpp | grep "\['const char " | grep -o sys_[a-zA-Z0-9]* | grep -o [a-z0-9]*$
        # grep filename syscall_switch_enter_linux_x86.cpp | grep -v "\['const char " | grep -o sys_[a-zA-Z0-9]* | grep -o [a-z0-9]*$
        to_hook = {}
        if panda.arch_name == "i386":
            to_hook[0] = ["open", "execve", "chdir", "mknod", "chmod", "lchown16", "stat", "access", "chroot",
                         "lstat", "newstat", "newlstat", "chown16", "stat64", "lstat64", "lchown", "chown" ]
            to_hook[1] = ["utime", "utimes", "openat", "mknodat", "fchownat", "futimesat", "fstatat64",
                          "fchmodat", "faccessat", "utimensat", "execveat"]

        elif panda.arch_name == "x86_64":
            to_hook[0] = ["open", "newstat", "newlstat", "access", "chdir", "chmod", "chown", "lchown", "mknod", "chroot"]
            to_hook[1] = ["utime", "utimes", "openat", "mknodat", "fchownat", "futimesat", "newfstatat", "fchmodat", "faccessat", "utimensat"]

        elif panda.arch_name == "arm":
            to_hook[0] = ["open", "execve", "chdir", "mknod", "chmod", "lchown16", "access", "chroot", "newstat", "newlstat", "chown16", "stat64", "lstat64", "lchown", "chown"]
            to_hook[1] = ["utime", "utimes", "openat", "mknodat", "fchownat", "futimesat", "fstatat64", "fchmodat", "faccessat", "utimensat", "execveat"]
        else:
            raise ValueError(f"Unsupported PANDA arch: {panda.arch_name}")

        # Register the callbacks
        for arg_offset, names in to_hook.items():
            for name in names:
                self._gen_cb(name, arg_offset)


        # Fallback callback used when syscall with file name isn't mapped into memory
        @self._panda.cb_virt_mem_before_read(enabled=False)
        def before_virt_read(cpu, pc, addr, size):
            '''
            This callback is necessary for the case when we enter a syscall but the filename pointer is paged out.
            When that happens, we enable this (slow) callback which checks every mem-read while we're in that syscall
            to see if the memory has since been paged-in. It should always eventually be paged in. Once it is,
            we mutate the memory and then disable this callback.

            If this hasn't run by the time the callback returns, we give up and disable it
            '''
            if not self.pending_virt_read:
                return

            # Is our pending read a subset of the current read? If so try to read it
            if addr <= self.pending_virt_read and addr+size > self.pending_virt_read:
                try:
                    fname = self._panda.read_str(cpu, self.pending_virt_read)
                except ValueError:
                    return # Still not available. Keep waiting
                self.logger.debug(f"recovered missed filename: {fname}")

                # It is available! Disable this slow callback and rerurn _enter_cb with the data
                fname_ptr = self.pending_virt_read
                self.pending_virt_read = None
                self._panda.disable_callback('before_virt_read')
                self._enter_cb(self.pending_syscall, args=(cpu, pc), fname_ptr=fname_ptr)


    def rename_file(self, old_name, new_name):
        '''
        Mutate a given filename into a new name at the syscall interface
        '''
        assert(old_name not in self._renamed_files), f"Already have a rename rule for {old_name}"

        if not isinstance(new_name, bytes):
            new_name = new_name.encode("utf8")

        if not new_name.endswith(b"\x00"):
            new_name += b"\x00"

        self._renamed_files[old_name] = new_name

    def _get_fname(self, cpu, fd):
        '''
        Use OSI to get the filename behind a file descriptor.
        If not self.use_osi, return None
        '''
        if not self.use_osi:
            return None
        fname_s = None
        proc = self._panda.plugins['osi'].get_current_process(cpu)
        if proc != self._panda.ffi.NULL:
            fname = self._panda.plugins['osi_linux'].osi_linux_fd_to_filename(cpu, proc, self._panda.ffi.cast("int", fd))
            if fname != self._panda.ffi.NULL:
                fname_s = self._panda.ffi.string(fname).decode('utf8', 'ignore')
        return fname_s

    def _gen_cb(self, name, fname_ptr_pos):
        '''
        Register syscalls2 PPP callback on enter and return for the given name
        which has an argument of char* filename at fname_ptr_pos in the arguments list
        '''
        self._panda.ppp("syscalls2", f"on_sys_{name}_enter", name = f"file_hook_enter_{name}")( \
                    lambda *args: self._enter_cb(name, fname_ptr_pos, args=args))
        self._panda.ppp("syscalls2", f"on_sys_{name}_return", name = f"file_hook_return_{name}")( \
                    lambda *args: self._return_cb(name, fname_ptr_pos, args=args))

    def _enter_cb(self, syscall_name, fname_ptr_pos=0, args=None, fname_ptr=None):
        '''
        When we return, check if we mutated the fname buffer. If so,
        we need to restore whatever data was there (we may have written
        past the end of the string).

        if fname_ptr is set, just skip the logic to extract it
        '''

        assert(args)
        (cpu, pc) = args[0:2]

        if not fname_ptr:
            fname_ptr = args[2+fname_ptr_pos] # offset to after (cpu, pc) in callback args

        try:
            fname = self._panda.read_str(cpu, fname_ptr)
        except:
            fname = self._get_fname(cpu, args[2+fname_ptr_pos])

            if fname:
                self.logger.info(f"OSI found fname after simple logic missed it in call to {syscall_name}")
            else:
                self.logger.debug(f"missed filename at 0x{fname_ptr:x} in call to {syscall_name} - trying to find")
                self.pending_virt_read = cpu.env_ptr.regs[0]
                self.pending_syscall = syscall_name
                self._panda.enable_callback('before_virt_read')
                #self._panda_enable_memcb()
                return

        fname = path.normpath(fname) # Normalize it
        #self.logger.info(f"Entering {syscall_name} with file={fname}")

        if fname in self._renamed_files:
            # It matches, now let's take our action! Either rename or callback

            self.logger.debug(f"modifying filename {fname} in {syscall_name} to {self._renamed_files[fname]}")
            assert(syscall_name not in self._changed_strs), "Entering syscall that already has a pending restore"

            # First read a buffer of the same size as our new value. XXX the string we already read might be shorter
            # than what we're inserting so we read again so we can later restore the old data
            try:
                clobbered_data = self._panda.virtual_memory_read(cpu, fname_ptr, len(self._renamed_files[fname]))
            except ValueError:
                self.logger.error(f"Failed to read target buffer at call into {syscall_name}")
                return

            # Now replace those bytes with our new name
            try:
                self._panda.virtual_memory_write(cpu, fname_ptr, self._renamed_files[fname])
            except ValueError:
                self.logger.warn(f"Failed to mutate filename buffer at call into {syscall_name}")
                return

            # If it all worked, save the clobbered data
            asid = self._panda.current_asid(cpu)
            self._changed_strs[(syscall_name, asid)] = clobbered_data

            self._before_modified_enter(cpu, pc, syscall_name, fname)


    def _return_cb(self, syscall_name, fname_ptr_pos, args=None):
        '''
        When we return, check if we mutated the fname buffer. If so,
        we need to restore whatever data was there (we may have written
        past the end of the string)
        '''
        (cpu, pc) = args[0:2]
        if self.pending_virt_read:
            fname_ptr = args[2+fname_ptr_pos] # offset to after (cpu, pc) in callback args

            self.logger.warning(f"missed filename in call to {syscall_name} with fname at 0x{fname_ptr:x}. Ignoring it")

            self._panda.disable_callback('before_virt_read') # No point in continuing this
            self.pending_virt_read = None # Virtual address that we're waiting to read as soon as possible
            return

        asid = self._panda.current_asid(cpu)
        if (syscall_name, asid) in self._changed_strs:
            assert(args)
            fname_ptr = args[2+fname_ptr_pos] # offset to after (cpu, pc) in callback args
            try:
                self._panda.virtual_memory_write(cpu, fname_ptr, self._changed_strs[(syscall_name, asid)])
            except ValueError:
                self.logger.warn(f"Failed to fix filename buffer at return of {syscall_name}")
            del self._changed_strs[(syscall_name, asid)]

            fd = self._panda.arch.get_retval(cpu, convention='syscall')
            self.logger.info(f"Returning from {syscall_name} after modifying argument - modified FD is {fd}")
            self._after_modified_return(cpu, pc, syscall_name, fd=fd)

    def _before_modified_enter(self, cpu, pc, syscall_name, fname):
        '''
        Internal callback run before we enter a syscall where we mutated
        the filename. Exists to be overloaded by subclasses
        '''
        pass

    def _after_modified_return(self, cpu, pc, syscall_name, fd):
        '''
        Internal callback run before we return from a syscall where we mutated
        the filename. Exists to be overloaded by subclasses
        '''
        pass

Subclasses

Methods

def rename_file(self, old_name, new_name)

Mutate a given filename into a new name at the syscall interface

Expand source code
def rename_file(self, old_name, new_name):
    '''
    Register a rule so that guest accesses to old_name are redirected to
    new_name at the syscall interface.

    old_name: guest path to match. It is normalized with path.normpath,
              because _enter_cb normalizes the guest's filename before the
              lookup; an un-normalized key (e.g. "/a//b/") could never match.
    new_name: replacement path (str or bytes). A str is encoded as UTF-8 and
              a terminating NUL byte is appended if it is missing.

    Fails an assertion if a rule for old_name already exists.
    '''
    old_name = path.normpath(old_name)
    assert old_name not in self._renamed_files, f"Already have a rename rule for {old_name}"

    if not isinstance(new_name, bytes):
        new_name = new_name.encode("utf8")

    # The value is written verbatim into guest memory, so it must be a C string
    if not new_name.endswith(b"\x00"):
        new_name += b"\x00"

    self._renamed_files[old_name] = new_name