Module pandare.plog_reader

Module for reading and writing PANDAlog (plog) files from Python.

Expand source code
#!/usr/bin/env python3
'''
Module for reading and writing PANDAlog (plog) files from Python.
'''

import zlib
import struct
try:
    import pandare.plog_pb2
except (TypeError, FileNotFoundError) as err:
    # Translate these failures into an ImportError that names the missing
    # autogenerated module; any other exception propagates unchanged.
    raise ImportError("Unable to import autogenerated pandalog class (pandare/plog_pb2.py)") from err

#from google.protobuf.message import Message

class PLogReader:
    '''
    A class for reading PANDAlog (plog) files. Run directly with `python -m  pandare.plog_reader [input.plog]` to translate input.plog file to json.

    Or the class can be imported and used in a Python script, where it can be iterated over to get 
    [google.protobuf.message.Message](https://googleapis.dev/python/protobuf/latest/google/protobuf/message.html#google.protobuf.message.Message) objects.

        with PLogReader('input.plog') as plr:
          for msg in plr:
                if msg.HasField("SomeField"):
                  print(msg.SomeField)
                if msg.HasField("OtherField"):
                  print(msg.OtherField)

    The reader is a single-pass iterator: chunks are decompressed one at a
    time, on demand, and chunks that decompress to no data are skipped.
    Use it as a context manager, or call `close()` when done, to release
    the underlying file.
    '''
    def __init__(self, fn):
        '''
        Open the plog file `fn` and read its header and chunk directory.

        Raises whatever `open()` or `struct.unpack()` raise (e.g. `OSError`,
        `struct.error` on a truncated file); in that case the file is closed
        before the exception propagates.
        '''
        self.f = open(fn, 'rb')
        try:
            # 24-byte little-endian header; the 2nd and 4th fields are not used here
            self.version, _, self.dir_pos, _, self.chunk_gsize = struct.unpack('<IIQII', self.f.read(24))

            self.f.seek(self.dir_pos)
            self.nchunks, = struct.unpack('<I', self.f.read(4)) # number of chunks
            self.chunks = self.f.read(24*self.nchunks)          # chunks buffer
        except Exception:
            # don't leak the file handle if the header/directory is malformed
            self.f.close()
            self.f = None
            raise
        self.chunk_idx = 0                                  # index of current chunk
        self.chunk_size = 0                                 # size of current chunk
        self.chunk_data = None                              # data of current chunk
        self.chunk_data_idx = 0                             # read offset in current chunk

    def __iter__(self):
        return self

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def close(self):
        '''
        Close the underlying file and drop the current chunk. Safe to call
        more than once.
        '''
        if self.f is not None:
            self.f.close()
        self.f = self.chunk_data = None

    def _load_chunk(self):
        '''
        Read and decompress the chunk at `self.chunk_idx` into `self.chunk_data`.
        '''
        # each directory entry holds three uint64 values (ins, pos, nentries);
        # only the file position is needed here
        _, pos, _ = struct.unpack_from('<QQQ', self.chunks, 24*self.chunk_idx)
        if self.chunk_idx + 1 < self.nchunks:
            # compressed size = distance to the start of the next chunk
            _, next_pos, _ = struct.unpack_from('<QQQ', self.chunks, 24*(self.chunk_idx+1))
            zchunk_size = next_pos - pos
        else:
            # setting the compressed chunk size to -1 will
            # result in reading the remaining of the file
            zchunk_size = -1

        self.f.seek(pos)
        # chunk_gsize is only the initial output buffer size for zlib
        self.chunk_data = zlib.decompress(self.f.read(zchunk_size), 15, self.chunk_gsize)
        self.chunk_size = len(self.chunk_data)
        self.chunk_data_idx = 0

    def _advance_chunk(self):
        '''
        Move on to the next chunk and reset the per-chunk state.
        '''
        self.chunk_idx += 1
        self.chunk_size = 0
        self.chunk_data = None
        self.chunk_data_idx = 0

    def __next__(self):
        '''
        Return the next log entry as a fresh `pandare.plog_pb2.LogEntry`.

        Raises `StopIteration` once all chunks have been consumed.
        '''
        # load chunks until one actually holds data; an empty chunk
        # contains no messages and must not be parsed
        while self.chunk_data is None:
            # ran out of chunks
            if not self.chunk_idx < self.nchunks:
                raise StopIteration
            self._load_chunk()
            if self.chunk_size == 0:
                self._advance_chunk()

        # parse message - we're using a fresh message
        # using MergeFromString() is slightly faster than using ParseFromString()
        # each message is prefixed with its size as a little-endian uint32
        msg_size, = struct.unpack_from('<I', self.chunk_data, self.chunk_data_idx)
        msg = pandare.plog_pb2.LogEntry()
        msg_start = self.chunk_data_idx + 4
        msg_end = msg_start + msg_size
        msg.MergeFromString(self.chunk_data[msg_start:msg_end])

        # update state
        self.chunk_data_idx = msg_end

        # chunk exhausted: the next call will load the following one
        if not self.chunk_data_idx < self.chunk_size:
            self._advance_chunk()

        return msg

if __name__ == "__main__":
    # Command-line entry point: print every entry of the given plog file
    # as one JSON array on stdout.
    import sys
    from google.protobuf.json_format import MessageToJson

    # fail with a usage message instead of an IndexError traceback
    if len(sys.argv) < 2:
        sys.exit("usage: python -m pandare.plog_reader input.plog")

    print('[')
    with PLogReader(sys.argv[1]) as plr:
        for i, m in enumerate(plr):
            # the separator goes before every entry but the first, so the
            # array never ends with a trailing comma
            if i > 0:
                print(',')
            print(MessageToJson(m), end='')
    print('\n]')

Classes

class PLogReader (fn)

A class for reading PANDAlog (plog) files. Run it directly with python -m pandare.plog_reader [input.plog] to translate the input.plog file to JSON.

Or the class can be imported and used in a Python script, where it can be iterated over to get google.protobuf.message.Message objects.

with PLogReader('input.plog') as plr:
  for msg in plr:
        if msg.HasField("SomeField"):
          print(msg.SomeField)
        if msg.HasField("OtherField"):
          print(msg.OtherField)
Expand source code
class PLogReader:
    '''
    A class for reading PANDAlog (plog) files. Run directly with `python -m  pandare.plog_reader [input.plog]` to translate input.plog file to json.

    Or the class can be imported and used in a Python script, where it can be iterated over to get 
    [google.protobuf.message.Message](https://googleapis.dev/python/protobuf/latest/google/protobuf/message.html#google.protobuf.message.Message) objects.

        with PLogReader('input.plog') as plr:
          for msg in plr:
                if msg.HasField("SomeField"):
                  print(msg.SomeField)
                if msg.HasField("OtherField"):
                  print(msg.OtherField)

    The reader is a single-pass iterator: chunks are decompressed one at a
    time, on demand, and chunks that decompress to no data are skipped.
    Use it as a context manager, or call `close()` when done, to release
    the underlying file.
    '''
    def __init__(self, fn):
        '''
        Open the plog file `fn` and read its header and chunk directory.

        Raises whatever `open()` or `struct.unpack()` raise (e.g. `OSError`,
        `struct.error` on a truncated file); in that case the file is closed
        before the exception propagates.
        '''
        self.f = open(fn, 'rb')
        try:
            # 24-byte little-endian header; the 2nd and 4th fields are not used here
            self.version, _, self.dir_pos, _, self.chunk_gsize = struct.unpack('<IIQII', self.f.read(24))

            self.f.seek(self.dir_pos)
            self.nchunks, = struct.unpack('<I', self.f.read(4)) # number of chunks
            self.chunks = self.f.read(24*self.nchunks)          # chunks buffer
        except Exception:
            # don't leak the file handle if the header/directory is malformed
            self.f.close()
            self.f = None
            raise
        self.chunk_idx = 0                                  # index of current chunk
        self.chunk_size = 0                                 # size of current chunk
        self.chunk_data = None                              # data of current chunk
        self.chunk_data_idx = 0                             # read offset in current chunk

    def __iter__(self):
        return self

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def close(self):
        '''
        Close the underlying file and drop the current chunk. Safe to call
        more than once.
        '''
        if self.f is not None:
            self.f.close()
        self.f = self.chunk_data = None

    def _load_chunk(self):
        '''
        Read and decompress the chunk at `self.chunk_idx` into `self.chunk_data`.
        '''
        # each directory entry holds three uint64 values (ins, pos, nentries);
        # only the file position is needed here
        _, pos, _ = struct.unpack_from('<QQQ', self.chunks, 24*self.chunk_idx)
        if self.chunk_idx + 1 < self.nchunks:
            # compressed size = distance to the start of the next chunk
            _, next_pos, _ = struct.unpack_from('<QQQ', self.chunks, 24*(self.chunk_idx+1))
            zchunk_size = next_pos - pos
        else:
            # setting the compressed chunk size to -1 will
            # result in reading the remaining of the file
            zchunk_size = -1

        self.f.seek(pos)
        # chunk_gsize is only the initial output buffer size for zlib
        self.chunk_data = zlib.decompress(self.f.read(zchunk_size), 15, self.chunk_gsize)
        self.chunk_size = len(self.chunk_data)
        self.chunk_data_idx = 0

    def _advance_chunk(self):
        '''
        Move on to the next chunk and reset the per-chunk state.
        '''
        self.chunk_idx += 1
        self.chunk_size = 0
        self.chunk_data = None
        self.chunk_data_idx = 0

    def __next__(self):
        '''
        Return the next log entry as a fresh `pandare.plog_pb2.LogEntry`.

        Raises `StopIteration` once all chunks have been consumed.
        '''
        # load chunks until one actually holds data; an empty chunk
        # contains no messages and must not be parsed
        while self.chunk_data is None:
            # ran out of chunks
            if not self.chunk_idx < self.nchunks:
                raise StopIteration
            self._load_chunk()
            if self.chunk_size == 0:
                self._advance_chunk()

        # parse message - we're using a fresh message
        # using MergeFromString() is slightly faster than using ParseFromString()
        # each message is prefixed with its size as a little-endian uint32
        msg_size, = struct.unpack_from('<I', self.chunk_data, self.chunk_data_idx)
        msg = pandare.plog_pb2.LogEntry()
        msg_start = self.chunk_data_idx + 4
        msg_end = msg_start + msg_size
        msg.MergeFromString(self.chunk_data[msg_start:msg_end])

        # update state
        self.chunk_data_idx = msg_end

        # chunk exhausted: the next call will load the following one
        if not self.chunk_data_idx < self.chunk_size:
            self._advance_chunk()

        return msg