Source code for dpkt.snoop

# $Id$
# -*- coding: utf-8 -*-
"""Snoop file format."""
from __future__ import absolute_import

import time

from . import dpkt

# RFC 1761

SNOOP_MAGIC = 0x736E6F6F70000000

SNOOP_VERSION = 2

SDL_8023 = 0
SDL_8024 = 1
SDL_8025 = 2
SDL_8026 = 3
SDL_ETHER = 4
SDL_HDLC = 5
SDL_CHSYNC = 6
SDL_IBMCC = 7
SDL_FDDI = 8
SDL_OTHER = 9

dltoff = {SDL_ETHER: 14}


[docs]class PktHdr(dpkt.Packet): """snoop packet header. TODO: Longer class information.... Attributes: __hdr__: Header fields of snoop packet header. TODO. """ __byte_order__ = '!' __hdr__ = ( ('orig_len', 'I', 0), ('incl_len', 'I', 0), ('rec_len', 'I', 0), ('cum_drops', 'I', 0), ('ts_sec', 'I', 0), ('ts_usec', 'I', 0), )
[docs]class FileHdr(dpkt.Packet): """snoop file header. TODO: Longer class information.... Attributes: __hdr__: Header fields of snoop file header. TODO. """ __byte_order__ = '!' __hdr__ = ( ('magic', 'Q', SNOOP_MAGIC), ('v', 'I', SNOOP_VERSION), ('linktype', 'I', SDL_ETHER), )
[docs]class Writer(object): """Simple snoop dumpfile writer. TODO: Longer class information.... Attributes: TODO. """ def __init__(self, fileobj, linktype=SDL_ETHER): self.__f = fileobj fh = FileHdr(linktype=linktype) self.__f.write(str(fh))
[docs] def writepkt(self, pkt, ts=None): if ts is None: ts = time.time() s = str(pkt) n = len(s) pad_len = 4 - n % 4 if n % 4 else 0 ph = PktHdr(orig_len=n, incl_len=n, rec_len=PktHdr.__hdr_len__ + n + pad_len, ts_sec=int(ts), ts_usec=int((int(ts) - float(ts)) * 1000000.0)) self.__f.write(str(ph)) self.__f.write(s + '\0' * pad_len)
[docs] def close(self): self.__f.close()
[docs]class Reader(object): """Simple pypcap-compatible snoop file reader. TODO: Longer class information.... Attributes: TODO. """ def __init__(self, fileobj): self.name = fileobj.name self.fd = fileobj.fileno() self.__f = fileobj buf = self.__f.read(FileHdr.__hdr_len__) self.__fh = FileHdr(buf) self.__ph = PktHdr if self.__fh.magic != SNOOP_MAGIC: raise ValueError('invalid snoop header') self.dloff = dltoff[self.__fh.linktype] self.filter = ''
[docs] def fileno(self): return self.fd
[docs] def setfilter(self, value, optimize=1): return NotImplementedError
[docs] def readpkts(self): return list(self)
[docs] def dispatch(self, cnt, callback, *args): if cnt > 0: for i in range(cnt): ts, pkt = next(self) callback(ts, pkt, *args) else: for ts, pkt in self: callback(ts, pkt, *args)
[docs] def loop(self, callback, *args): self.dispatch(0, callback, *args)
def __iter__(self): self.__f.seek(FileHdr.__hdr_len__) while 1: buf = self.__f.read(PktHdr.__hdr_len__) if not buf: break hdr = self.__ph(buf) buf = self.__f.read(hdr.rec_len - PktHdr.__hdr_len__) yield (hdr.ts_sec + (hdr.ts_usec / 1000000.0), buf[:hdr.incl_len])