Module pandare.extras.ioctlFaker
Expand source code
#!/usr/bin/env python3
import sys
import logging
from cffi import FFI
ffi = FFI()
# TODO: only for logger, should probably move it to a separate file
if __name__ == '__main__': # Script run directly
from pandare.extras import FileHook
else:
from .fileHook import FileHook
# TODO: Ability to fake buffers for specific commands
ioctl_initialized = False
def do_ioctl_init(panda):
'''
One-time init for arch-specific bit-packed ioctl cmd struct.
'''
# Default config (x86, x86-64, ARM, AArch 64) with options for PPC
global ioctl_initialized
if ioctl_initialized:
return
ioctl_initialized = True
TYPE_BITS = 8
CMD_BITS = 8
SIZE_BITS = 14 if panda.arch_name != "ppc" else 13
DIR_BITS = 2 if panda.arch_name != "ppc" else 3
ffi.cdef("""
struct IoctlCmdBits {
uint8_t type_num:%d;
uint8_t cmd_num:%d;
uint16_t arg_size:%d;
uint8_t direction:%d;
};
union IoctlCmdUnion {
struct IoctlCmdBits bits;
uint32_t asUnsigned32;
};
enum ioctl_direction {
IO = 0,
IOW = 1,
IOR = 2,
IOWR = 3
};
""" % (TYPE_BITS, CMD_BITS, SIZE_BITS, DIR_BITS), packed=True)
class Ioctl():
'''
Unpacked ioctl command with optional buffer.
'''
def __init__(self, panda, cpu, fd, cmd, guest_ptr, use_osi_linux = False):
'''
Do unpacking, optionally using OSI for process and file name info.
'''
do_ioctl_init(panda)
self.cmd = ffi.new("union IoctlCmdUnion*")
self.cmd.asUnsigned32 = cmd
self.original_ret_code = None
self.osi = use_osi_linux
# Optional syscall argument: pointer to buffer
if (self.cmd.bits.arg_size > 0):
try:
self.has_buf = True
self.guest_ptr = guest_ptr
self.guest_buf = panda.virtual_memory_read(cpu, self.guest_ptr, self.cmd.bits.arg_size)
except ValueError:
self.guest_buf = None
#raise RuntimeError("Failed to read guest buffer: ioctl({})".format(str(self.cmd)))
else:
self.has_buf = False
self.guest_ptr = None
self.guest_buf = None
# Optional OSI usage: process and file name
if self.osi:
proc = panda.plugins['osi'].get_current_process(cpu)
proc_name_ptr = proc.name
file_name_ptr = panda.plugins['osi_linux'].osi_linux_fd_to_filename(cpu, proc, panda.ffi.cast("int", fd))
self.proc_name = panda.ffi.string(proc_name_ptr).decode(errors="ignore") if proc_name_ptr != panda.ffi.NULL else "unknown"
self.file_name = panda.ffi.string(file_name_ptr).decode(errors="ignore") if file_name_ptr != panda.ffi.NULL else "unknown"
else:
self.proc_name = None
self.file_name = None
def get_ret_code(self, panda, cpu):
''''
Helper retrive original return code, handles arch-specifc ABI
'''
if panda.arch_name == "mipsel" or panda.arch_name == "mips":
# Note: return values are in $v0, $v1 (regs 2 and 3 respectively), but here we only use first
self.original_ret_code = panda.from_unsigned_guest(cpu.env_ptr.active_tc.gpr[2])
elif panda.arch_name == "aarch64":
self.original_ret_code = panda.from_unsigned_guest(cpu.env_ptr.xregs[0])
elif panda.arch_name == "ppc":
raise RuntimeError("PPC currently unsupported!")
else: # x86/x64/ARM
self.original_ret_code = panda.from_unsigned_guest(cpu.env_ptr.regs[0])
def __str__(self):
if self.osi:
self_str = "\'{}\' using \'{}\' - ".format(self.proc_name, self.file_name)
else:
self_str = ""
bits = self.cmd.bits
direction = ffi.string(ffi.cast("enum ioctl_direction", bits.direction))
ioctl_desc = f"dir={direction},arg_size={bits.arg_size:x},cmd=0x{bits.cmd_num:x},type=0x{bits.type_num:x}"
if (self.guest_ptr == None):
self_str += f"ioctl({ioctl_desc}) -> {self.original_ret_code}"
else:
self_str += f"ioctl({ioctl_desc},ptr={self.guest_ptr:08x},buf={self.guest_buf}) -> {self.original_ret_code}"
return self_str
def __eq__(self, other):
return (
self.__class__ == other.__class__ and
self.cmd.asUnsigned32 == other.cmd.asUnsigned32 and
self.has_buf == other.has_buf and
self.guest_ptr == other.guest_ptr and
self.guest_buf == other.guest_buf and
self.proc_name == self.proc_name and
self.file_name == self.file_name
)
def __hash__(self):
return hash((self.cmd.asUnsigned32, self.has_buf, self.guest_ptr, self.guest_buf, self.proc_name, self.file_name))
class IoctlFaker():
'''
Interpose ioctl() syscall returns, forcing successes for specific error codes to simulate missing drivers/peripherals.
Bin all returns into failures (needed forcing) and successes, store for later retrival/analysis.
'''
def __init__(
self,
panda,
use_osi_linux = False,
log = False,
ignore = [],
intercept_ret_vals = [-25],
intercept_all_non_zero = False
):
'''
Log enables/disables logging.
ignore contains a list of tuples (filename, cmd#) to be ignored.
intercept_ret_vals is a list of ioctl return values that should be intercepted. By default
we just intercept just -25 which indicates that a driver is not present to handle the ioctl.
intercept_all_non_zero is aggressive setting that takes precedence if set - any non-zero return code id changed to zero.
'''
self.osi = use_osi_linux
self._panda = panda
self._panda.load_plugin("syscalls2")
self._log = log
self.ignore = ignore
self.intercept_ret_vals = intercept_ret_vals
self.intercept_all_non_zero = intercept_all_non_zero
if self.osi:
self._panda.load_plugin("osi")
self._panda.load_plugin("osi_linux")
if self._log:
self._logger = logging.getLogger('panda.ioctls')
self._logger.setLevel(logging.DEBUG)
# Track ioctls in two sets: modified (forced_returns) and unmodified
self._forced_returns = set()
self._unmodified_returns = set()
# Force success returns for missing drivers/peripherals
@self._panda.ppp("syscalls2", "on_sys_ioctl_return")
def ioctl_faker_on_sys_ioctl_return(cpu, pc, fd, cmd, arg):
ioctl = Ioctl(self._panda, cpu, fd, cmd, arg, self.osi)
ioctl.get_ret_code(self._panda, cpu)
# Modify
if (self.intercept_all_non_zero and ioctl.original_ret_code != 0) or \
ioctl.original_ret_code in self.intercept_ret_vals and \
(ioctl.file_name, ioctl.cmd.bits.cmd_num) not in self.ignore: # Allow ignoring specific commands on specific files
if panda.arch_name == "mipsel" or panda.arch_name == "mips":
cpu.env_ptr.active_tc.gpr[2] = 0
elif panda.arch_name == "aarch64":
cpu.env_ptr.xregs[0] = 0
elif panda.arch_name == "ppc":
raise RuntimeError("PPC currently unsupported!")
else: # x86/x64/ARM
cpu.env_ptr.regs[0] = 0
self._forced_returns.add(ioctl)
if ioctl.has_buf and self._log:
self._logger.warning("Forcing success return for data-containing {}".format(ioctl))
elif self._log:
self._logger.info("Forcing success return for data-less {}".format(ioctl))
# Don't modify
else:
self._unmodified_returns.add(ioctl)
def _get_returns(self, source, with_buf_only):
if with_buf_only:
return list(filter(lambda i: (i.has_buf == True), source))
else:
return source
def get_forced_returns(self, with_buf_only = False):
'''
Retrieve ioctls whose error codes where overwritten
'''
return self._get_returns(self._forced_returns, with_buf_only)
def get_unmodified_returns(self, with_buf_only = False):
'''
Retrieve ioctl that completed normally
'''
return self._get_returns(self._unmodified_returns, with_buf_only)
if __name__ == "__main__":
'''
Bash will issue ioctls on /dev/ttys0 - this is just a simple test to make sure they're being captured
'''
from pandare import Panda
# No arguments, x86_64. Otherwise argument should be guest arch
generic_type = sys.argv[1] if len(sys.argv) > 1 else "x86_64"
panda = Panda(generic=generic_type)
@panda.queue_blocking
def run_cmd():
# Setup faker
ioctl_faker = IoctlFaker(panda, use_osi_linux=True)
# First revert to root snapshot, then issue an IOCTL directly through perl - which is junk
# so the faker should fake it
panda.revert_sync("root")
panda.run_serial_cmd("""perl -e 'require "sys/ioctl.ph"; ioctl(1, 0, 1);'""")
# Check faker's results
faked_rets = ioctl_faker.get_forced_returns()
normal_rets = ioctl_faker.get_unmodified_returns()
assert(len(faked_rets)), "No returns faked"
assert(len(normal_rets)), "No normal returns"
panda.end_analysis()
panda.run()
print("Success")
Functions
def do_ioctl_init(panda)
-
One-time init for arch-specific bit-packed ioctl cmd struct.
Expand source code
def do_ioctl_init(panda): ''' One-time init for arch-specific bit-packed ioctl cmd struct. ''' # Default config (x86, x86-64, ARM, AArch 64) with options for PPC global ioctl_initialized if ioctl_initialized: return ioctl_initialized = True TYPE_BITS = 8 CMD_BITS = 8 SIZE_BITS = 14 if panda.arch_name != "ppc" else 13 DIR_BITS = 2 if panda.arch_name != "ppc" else 3 ffi.cdef(""" struct IoctlCmdBits { uint8_t type_num:%d; uint8_t cmd_num:%d; uint16_t arg_size:%d; uint8_t direction:%d; }; union IoctlCmdUnion { struct IoctlCmdBits bits; uint32_t asUnsigned32; }; enum ioctl_direction { IO = 0, IOW = 1, IOR = 2, IOWR = 3 }; """ % (TYPE_BITS, CMD_BITS, SIZE_BITS, DIR_BITS), packed=True)
Classes
class Ioctl (panda, cpu, fd, cmd, guest_ptr, use_osi_linux=False)
-
Unpacked ioctl command with optional buffer.
Do unpacking, optionally using OSI for process and file name info.
Expand source code
class Ioctl(): ''' Unpacked ioctl command with optional buffer. ''' def __init__(self, panda, cpu, fd, cmd, guest_ptr, use_osi_linux = False): ''' Do unpacking, optionally using OSI for process and file name info. ''' do_ioctl_init(panda) self.cmd = ffi.new("union IoctlCmdUnion*") self.cmd.asUnsigned32 = cmd self.original_ret_code = None self.osi = use_osi_linux # Optional syscall argument: pointer to buffer if (self.cmd.bits.arg_size > 0): try: self.has_buf = True self.guest_ptr = guest_ptr self.guest_buf = panda.virtual_memory_read(cpu, self.guest_ptr, self.cmd.bits.arg_size) except ValueError: self.guest_buf = None #raise RuntimeError("Failed to read guest buffer: ioctl({})".format(str(self.cmd))) else: self.has_buf = False self.guest_ptr = None self.guest_buf = None # Optional OSI usage: process and file name if self.osi: proc = panda.plugins['osi'].get_current_process(cpu) proc_name_ptr = proc.name file_name_ptr = panda.plugins['osi_linux'].osi_linux_fd_to_filename(cpu, proc, panda.ffi.cast("int", fd)) self.proc_name = panda.ffi.string(proc_name_ptr).decode(errors="ignore") if proc_name_ptr != panda.ffi.NULL else "unknown" self.file_name = panda.ffi.string(file_name_ptr).decode(errors="ignore") if file_name_ptr != panda.ffi.NULL else "unknown" else: self.proc_name = None self.file_name = None def get_ret_code(self, panda, cpu): '''' Helper retrive original return code, handles arch-specifc ABI ''' if panda.arch_name == "mipsel" or panda.arch_name == "mips": # Note: return values are in $v0, $v1 (regs 2 and 3 respectively), but here we only use first self.original_ret_code = panda.from_unsigned_guest(cpu.env_ptr.active_tc.gpr[2]) elif panda.arch_name == "aarch64": self.original_ret_code = panda.from_unsigned_guest(cpu.env_ptr.xregs[0]) elif panda.arch_name == "ppc": raise RuntimeError("PPC currently unsupported!") else: # x86/x64/ARM self.original_ret_code = panda.from_unsigned_guest(cpu.env_ptr.regs[0]) def __str__(self): if self.osi: self_str = "\'{}\' using \'{}\' - ".format(self.proc_name, self.file_name) else: self_str = "" bits = self.cmd.bits direction = ffi.string(ffi.cast("enum ioctl_direction", bits.direction)) ioctl_desc = f"dir={direction},arg_size={bits.arg_size:x},cmd=0x{bits.cmd_num:x},type=0x{bits.type_num:x}" if (self.guest_ptr == None): self_str += f"ioctl({ioctl_desc}) -> {self.original_ret_code}" else: self_str += f"ioctl({ioctl_desc},ptr={self.guest_ptr:08x},buf={self.guest_buf}) -> {self.original_ret_code}" return self_str def __eq__(self, other): return ( self.__class__ == other.__class__ and self.cmd.asUnsigned32 == other.cmd.asUnsigned32 and self.has_buf == other.has_buf and self.guest_ptr == other.guest_ptr and self.guest_buf == other.guest_buf and self.proc_name == self.proc_name and self.file_name == self.file_name ) def __hash__(self): return hash((self.cmd.asUnsigned32, self.has_buf, self.guest_ptr, self.guest_buf, self.proc_name, self.file_name))
Methods
def get_ret_code(self, panda, cpu)
-
' Helper retrive original return code, handles arch-specifc ABI
Expand source code
def get_ret_code(self, panda, cpu): '''' Helper retrive original return code, handles arch-specifc ABI ''' if panda.arch_name == "mipsel" or panda.arch_name == "mips": # Note: return values are in $v0, $v1 (regs 2 and 3 respectively), but here we only use first self.original_ret_code = panda.from_unsigned_guest(cpu.env_ptr.active_tc.gpr[2]) elif panda.arch_name == "aarch64": self.original_ret_code = panda.from_unsigned_guest(cpu.env_ptr.xregs[0]) elif panda.arch_name == "ppc": raise RuntimeError("PPC currently unsupported!") else: # x86/x64/ARM self.original_ret_code = panda.from_unsigned_guest(cpu.env_ptr.regs[0])
class IoctlFaker (panda, use_osi_linux=False, log=False, ignore=[], intercept_ret_vals=[-25], intercept_all_non_zero=False)
-
Interpose ioctl() syscall returns, forcing successes for specific error codes to simulate missing drivers/peripherals. Bin all returns into failures (needed forcing) and successes, store for later retrival/analysis.
Log enables/disables logging. ignore contains a list of tuples (filename, cmd#) to be ignored. intercept_ret_vals is a list of ioctl return values that should be intercepted. By default we just intercept just -25 which indicates that a driver is not present to handle the ioctl. intercept_all_non_zero is aggressive setting that takes precedence if set - any non-zero return code id changed to zero.
Expand source code
class IoctlFaker(): ''' Interpose ioctl() syscall returns, forcing successes for specific error codes to simulate missing drivers/peripherals. Bin all returns into failures (needed forcing) and successes, store for later retrival/analysis. ''' def __init__( self, panda, use_osi_linux = False, log = False, ignore = [], intercept_ret_vals = [-25], intercept_all_non_zero = False ): ''' Log enables/disables logging. ignore contains a list of tuples (filename, cmd#) to be ignored. intercept_ret_vals is a list of ioctl return values that should be intercepted. By default we just intercept just -25 which indicates that a driver is not present to handle the ioctl. intercept_all_non_zero is aggressive setting that takes precedence if set - any non-zero return code id changed to zero. ''' self.osi = use_osi_linux self._panda = panda self._panda.load_plugin("syscalls2") self._log = log self.ignore = ignore self.intercept_ret_vals = intercept_ret_vals self.intercept_all_non_zero = intercept_all_non_zero if self.osi: self._panda.load_plugin("osi") self._panda.load_plugin("osi_linux") if self._log: self._logger = logging.getLogger('panda.ioctls') self._logger.setLevel(logging.DEBUG) # Track ioctls in two sets: modified (forced_returns) and unmodified self._forced_returns = set() self._unmodified_returns = set() # Force success returns for missing drivers/peripherals @self._panda.ppp("syscalls2", "on_sys_ioctl_return") def ioctl_faker_on_sys_ioctl_return(cpu, pc, fd, cmd, arg): ioctl = Ioctl(self._panda, cpu, fd, cmd, arg, self.osi) ioctl.get_ret_code(self._panda, cpu) # Modify if (self.intercept_all_non_zero and ioctl.original_ret_code != 0) or \ ioctl.original_ret_code in self.intercept_ret_vals and \ (ioctl.file_name, ioctl.cmd.bits.cmd_num) not in self.ignore: # Allow ignoring specific commands on specific files if panda.arch_name == "mipsel" or panda.arch_name == "mips": cpu.env_ptr.active_tc.gpr[2] = 0 elif panda.arch_name == "aarch64": cpu.env_ptr.xregs[0] = 0 elif panda.arch_name == "ppc": raise RuntimeError("PPC currently unsupported!") else: # x86/x64/ARM cpu.env_ptr.regs[0] = 0 self._forced_returns.add(ioctl) if ioctl.has_buf and self._log: self._logger.warning("Forcing success return for data-containing {}".format(ioctl)) elif self._log: self._logger.info("Forcing success return for data-less {}".format(ioctl)) # Don't modify else: self._unmodified_returns.add(ioctl) def _get_returns(self, source, with_buf_only): if with_buf_only: return list(filter(lambda i: (i.has_buf == True), source)) else: return source def get_forced_returns(self, with_buf_only = False): ''' Retrieve ioctls whose error codes where overwritten ''' return self._get_returns(self._forced_returns, with_buf_only) def get_unmodified_returns(self, with_buf_only = False): ''' Retrieve ioctl that completed normally ''' return self._get_returns(self._unmodified_returns, with_buf_only)
Methods
def get_forced_returns(self, with_buf_only=False)
-
Retrieve ioctls whose error codes where overwritten
Expand source code
def get_forced_returns(self, with_buf_only = False): ''' Retrieve ioctls whose error codes where overwritten ''' return self._get_returns(self._forced_returns, with_buf_only)
def get_unmodified_returns(self, with_buf_only=False)
-
Retrieve ioctl that completed normally
Expand source code
def get_unmodified_returns(self, with_buf_only = False): ''' Retrieve ioctl that completed normally ''' return self._get_returns(self._unmodified_returns, with_buf_only)