Module pandare.extras.procWriteCapture
Expand source code
#!/usr/bin/env python3
import shutil
from pathlib import Path
class ProcWriteCapture():
'''
Set console_capture = True to capture all console output to file, including boot messages.
Set proc_name = "name_of_proc" to, for a named process, capture stdout/stderr and any file writes from the hypervisor, mirror results to log directory.
Can be stacked with console capture.
'''
def __init__(self, panda, console_capture = False, proc_name = None, log_dir = None, rm_existing_logs = False):
self._panda = panda
self._files_written = set()
self._rm = rm_existing_logs
self._console_capture = console_capture
self._proc_name = proc_name
self._proc_printed_err = False
self._console_printed_err = False
if log_dir == None:
self._console_log_dir = Path.cwd()
if proc_name:
self._proc_log_dir = Path.cwd() / self._proc_name
else:
self._console_log_dir = Path(log_dir)
if proc_name:
self._proc_log_dir = Path(log_dir).joinpath(self._proc_name)
# Setup logging dir
self._console_log_dir.mkdir(parents=True, exist_ok=True)
if proc_name:
self._proc_log_dir.mkdir(parents=True, exist_ok=True)
if self._rm:
if proc_name:
shutil.rmtree(self._proc_log_dir)
shutil.rmtree(self._console_log_dir)
# Mirror writes
@self._panda.ppp("syscalls2", "on_sys_write_enter")
def proc_write_capture_on_sys_write_enter(cpu, pc, fd, buf, cnt):
try_read = False
# Capture console output
if self._console_capture:
# Fun trick: lazy eval of OSI
# Based on the idea that a non-POSIX FD will only be used after boot is finished an OSI is functional
# Note: doesn't capture boot logs (would require hooking kernel's printk, not write syscall)
if (fd == 1) or (fd == 2) or (fd == 3):
try_read = True
else:
curr_proc = panda.plugins['osi'].get_current_process(cpu)
file_name_ptr = panda.plugins['osi_linux'].osi_linux_fd_to_filename(cpu, curr_proc, fd)
file_path = panda.ffi.string(file_name_ptr).decode()
if ("tty" in file_path):
try_read = True
if try_read:
try:
data = panda.virtual_memory_read(cpu, buf, cnt)
except ValueError:
raise RuntimeError(f"Failed to read buffer: addr 0x{buf:016x}")
if fd == 2:
self._console_printed_err = True
log_file = self._console_log_dir.joinpath("console.out")
with open(log_file, "ab") as f:
f.write(data)
self._files_written.add(str(log_file))
# Use OSI to capture logs for a named process
if self._proc_name:
curr_proc = panda.plugins['osi'].get_current_process(cpu)
curr_proc_name = panda.ffi.string(curr_proc.name).decode()
if self._proc_name == curr_proc_name:
if not try_read: # If we didn't already read this data in once for console capture
try:
data = panda.virtual_memory_read(cpu, buf, cnt)
except ValueError:
raise RuntimeError(f"Failed to read buffer: proc \'{curr_proc_name}\', addr 0x{buf:016x}")
file_name_ptr = panda.plugins['osi_linux'].osi_linux_fd_to_filename(cpu, curr_proc, fd)
file_path = panda.ffi.string(file_name_ptr).decode()
# For informational purposes only, collection not reliant on this exact mapping
if fd == 1: # POSIX stdout
file_path += ".stdout"
elif fd == 2: # POSIX stderr
file_path += ".stderr"
self._proc_printed_err = True
log_file = self._proc_log_dir.joinpath(file_path.replace("//", "_").replace("/", "_"))
with open(log_file, "ab") as f:
f.write(data)
self._files_written.add(str(log_file))
def proc_printed_err(self):
return self._proc_printed_err
def console_printed_post_boot_err(self):
return self._console_printed_err
def get_files_written(self):
return self._files_written
if __name__ == "__main__":
import os
from pandare import Panda
panda = Panda(generic="x86_64")
test = ProcWriteCapture(panda, console_capture = True)
@panda.queue_blocking
def driver():
panda.revert_sync('root')
data = panda.run_serial_cmd("whoami")
panda.end_analysis()
panda.run()
outfile = "console.out"
assert(os.path.isfile(outfile)), "Missing file"
with open(outfile) as f:
data = f.readlines()
os.remove(outfile)
assert 'whoami\n' in data, "Incorrect output"
print("Success")
Classes
class ProcWriteCapture (panda, console_capture=False, proc_name=None, log_dir=None, rm_existing_logs=False)
-
Set console_capture = True to capture all console output to file, including boot messages. Set proc_name = "name_of_proc" to, for a named process, capture stdout/stderr and any file writes from the hypervisor, mirror results to log directory. Can be stacked with console capture.
Expand source code
class ProcWriteCapture(): ''' Set console_capture = True to capture all console output to file, including boot messages. Set proc_name = "name_of_proc" to, for a named process, capture stdout/stderr and any file writes from the hypervisor, mirror results to log directory. Can be stacked with console capture. ''' def __init__(self, panda, console_capture = False, proc_name = None, log_dir = None, rm_existing_logs = False): self._panda = panda self._files_written = set() self._rm = rm_existing_logs self._console_capture = console_capture self._proc_name = proc_name self._proc_printed_err = False self._console_printed_err = False if log_dir == None: self._console_log_dir = Path.cwd() if proc_name: self._proc_log_dir = Path.cwd() / self._proc_name else: self._console_log_dir = Path(log_dir) if proc_name: self._proc_log_dir = Path(log_dir).joinpath(self._proc_name) # Setup logging dir self._console_log_dir.mkdir(parents=True, exist_ok=True) if proc_name: self._proc_log_dir.mkdir(parents=True, exist_ok=True) if self._rm: if proc_name: shutil.rmtree(self._proc_log_dir) shutil.rmtree(self._console_log_dir) # Mirror writes @self._panda.ppp("syscalls2", "on_sys_write_enter") def proc_write_capture_on_sys_write_enter(cpu, pc, fd, buf, cnt): try_read = False # Capture console output if self._console_capture: # Fun trick: lazy eval of OSI # Based on the idea that a non-POSIX FD will only be used after boot is finished an OSI is functional # Note: doesn't capture boot logs (would require hooking kernel's printk, not write syscall) if (fd == 1) or (fd == 2) or (fd == 3): try_read = True else: curr_proc = panda.plugins['osi'].get_current_process(cpu) file_name_ptr = panda.plugins['osi_linux'].osi_linux_fd_to_filename(cpu, curr_proc, fd) file_path = panda.ffi.string(file_name_ptr).decode() if ("tty" in file_path): try_read = True if try_read: try: data = panda.virtual_memory_read(cpu, buf, cnt) except ValueError: raise RuntimeError(f"Failed to read buffer: addr 0x{buf:016x}") if fd == 2: self._console_printed_err = True log_file = self._console_log_dir.joinpath("console.out") with open(log_file, "ab") as f: f.write(data) self._files_written.add(str(log_file)) # Use OSI to capture logs for a named process if self._proc_name: curr_proc = panda.plugins['osi'].get_current_process(cpu) curr_proc_name = panda.ffi.string(curr_proc.name).decode() if self._proc_name == curr_proc_name: if not try_read: # If we didn't already read this data in once for console capture try: data = panda.virtual_memory_read(cpu, buf, cnt) except ValueError: raise RuntimeError(f"Failed to read buffer: proc \'{curr_proc_name}\', addr 0x{buf:016x}") file_name_ptr = panda.plugins['osi_linux'].osi_linux_fd_to_filename(cpu, curr_proc, fd) file_path = panda.ffi.string(file_name_ptr).decode() # For informational purposes only, collection not reliant on this exact mapping if fd == 1: # POSIX stdout file_path += ".stdout" elif fd == 2: # POSIX stderr file_path += ".stderr" self._proc_printed_err = True log_file = self._proc_log_dir.joinpath(file_path.replace("//", "_").replace("/", "_")) with open(log_file, "ab") as f: f.write(data) self._files_written.add(str(log_file)) def proc_printed_err(self): return self._proc_printed_err def console_printed_post_boot_err(self): return self._console_printed_err def get_files_written(self): return self._files_written
Methods
def console_printed_post_boot_err(self)
-
Expand source code
def console_printed_post_boot_err(self): return self._console_printed_err
def get_files_written(self)
-
Expand source code
def get_files_written(self): return self._files_written
def proc_printed_err(self)
-
Expand source code
def proc_printed_err(self): return self._proc_printed_err