# $Id: dpkt.py 43 2007-08-02 22:42:59Z jon.oberheide $
# -*- coding: utf-8 -*-
"""Simple packet creation and parsing."""
from __future__ import absolute_import
import copy
import itertools
import socket
import struct
import array
from .compat import compat_ord, compat_izip, iteritems
[docs]class Error(Exception):
pass
[docs]class UnpackError(Error):
pass
[docs]class NeedData(UnpackError):
pass
[docs]class PackError(Error):
pass
class _MetaPacket(type):
def __new__(cls, clsname, clsbases, clsdict):
t = type.__new__(cls, clsname, clsbases, clsdict)
st = getattr(t, '__hdr__', None)
if st is not None:
# XXX - __slots__ only created in __new__()
clsdict['__slots__'] = [x[0] for x in st] + ['data']
t = type.__new__(cls, clsname, clsbases, clsdict)
t.__hdr_fields__ = [x[0] for x in st]
t.__hdr_fmt__ = getattr(t, '__byte_order__', '>') + ''.join([x[1] for x in st])
t.__hdr_len__ = struct.calcsize(t.__hdr_fmt__)
t.__hdr_defaults__ = dict(compat_izip(
t.__hdr_fields__, [x[2] for x in st]))
return t
[docs]class Packet(_MetaPacket("Temp", (object,), {})):
"""Base packet class, with metaclass magic to generate members from self.__hdr__.
Attributes:
__hdr__: Packet header should be defined as a list of
(name, structfmt, default) tuples.
__byte_order__: Byte order, can be set to override the default ('>')
Example:
>>> class Foo(Packet):
... __hdr__ = (('foo', 'I', 1), ('bar', 'H', 2), ('baz', '4s', 'quux'))
...
>>> foo = Foo(bar=3)
>>> foo
Foo(bar=3)
>>> str(foo)
'\x00\x00\x00\x01\x00\x03quux'
>>> foo.bar
3
>>> foo.baz
'quux'
>>> foo.foo = 7
>>> foo.baz = 'whee'
>>> foo
Foo(baz='whee', foo=7, bar=3)
>>> Foo('hello, world!')
Foo(baz=' wor', foo=1751477356L, bar=28460, data='ld!')
"""
def __init__(self, *args, **kwargs):
"""Packet constructor with ([buf], [field=val,...]) prototype.
Arguments:
buf -- optional packet buffer to unpack
Optional keyword arguments correspond to members to set
(matching fields in self.__hdr__, or 'data').
"""
self.data = b''
if args:
try:
self.unpack(args[0])
except struct.error:
if len(args[0]) < self.__hdr_len__:
raise NeedData
raise UnpackError('invalid %s: %r' %
(self.__class__.__name__, args[0]))
else:
for k in self.__hdr_fields__:
setattr(self, k, copy.copy(self.__hdr_defaults__[k]))
for k, v in iteritems(kwargs):
setattr(self, k, v)
def __len__(self):
return self.__hdr_len__ + len(self.data)
def __getitem__(self, k):
try:
return getattr(self, k)
except AttributeError:
raise KeyError
def __repr__(self):
# Collect and display protocol fields in order:
# 1. public fields defined in __hdr__, unless their value is default
# 2. properties derived from _private fields defined in __hdr__
# 3. dynamically added fields from self.__dict__, unless they are _private
# 4. self.data when it's present
l = []
# maintain order of fields as defined in __hdr__
for field_name, _, _ in getattr(self, '__hdr__', []):
field_value = getattr(self, field_name)
if field_value != self.__hdr_defaults__[field_name]:
if field_name[0] != '_':
l.append('%s=%r' % (field_name, field_value)) # (1)
else:
# interpret _private fields as name of properties joined by underscores
for prop_name in field_name.split('_'): # (2)
if isinstance(getattr(self.__class__, prop_name, None), property):
l.append('%s=%r' % (prop_name, getattr(self, prop_name)))
# (3)
l.extend(
['%s=%r' % (attr_name, attr_value)
for attr_name, attr_value in iteritems(self.__dict__)
if attr_name[0] != '_' # exclude _private attributes
and attr_name != self.data.__class__.__name__.lower()]) # exclude fields like ip.udp
# (4)
if self.data:
l.append('data=%r' % self.data)
return '%s(%s)' % (self.__class__.__name__, ', '.join(l))
def __str__(self):
return str(self.__bytes__())
def __bytes__(self):
return self.pack_hdr() + bytes(self.data)
[docs] def pack_hdr(self):
"""Return packed header string."""
try:
return struct.pack(self.__hdr_fmt__,
*[getattr(self, k) for k in self.__hdr_fields__])
except struct.error:
vals = []
for k in self.__hdr_fields__:
v = getattr(self, k)
if isinstance(v, tuple):
vals.extend(v)
else:
vals.append(v)
try:
return struct.pack(self.__hdr_fmt__, *vals)
except struct.error as e:
raise PackError(str(e))
[docs] def pack(self):
"""Return packed header + self.data string."""
return bytes(self)
[docs] def unpack(self, buf):
"""Unpack packet header fields from buf, and set self.data."""
for k, v in compat_izip(self.__hdr_fields__,
struct.unpack(self.__hdr_fmt__, buf[:self.__hdr_len__])):
setattr(self, k, v)
self.data = buf[self.__hdr_len__:]
# XXX - ''.join([(len(`chr(x)`)==3) and chr(x) or '.' for x in range(256)])
__vis_filter = b'................................ !"#$%&\'()*+,-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[.]^_`abcdefghijklmnopqrstuvwxyz{|}~.................................................................................................................................'
[docs]def hexdump(buf, length=16):
"""Return a hexdump output string of the given buffer."""
n = 0
res = []
while buf:
line, buf = buf[:length], buf[length:]
hexa = ' '.join(['%02x' % compat_ord(x) for x in line])
line = line.translate(__vis_filter).decode('utf-8')
res.append(' %04d: %-*s %s' % (n, length * 3, hexa, line))
n += length
return '\n'.join(res)
[docs]def in_cksum_add(s, buf):
n = len(buf)
cnt = (n // 2) * 2
a = array.array('H', buf[:cnt])
if cnt != n:
a.append(compat_ord(buf[-1]))
return s + sum(a)
[docs]def in_cksum_done(s):
s = (s >> 16) + (s & 0xffff)
s += (s >> 16)
return socket.ntohs(~s & 0xffff)
[docs]def in_cksum(buf):
"""Return computed Internet checksum."""
return in_cksum_done(in_cksum_add(0, buf))
[docs]def test_utils():
__buf = b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\t\n\x0b\x0c\r\x0e'
__hd = ' 0000: 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e ...............'
h = hexdump(__buf)
assert (h == __hd)
c = in_cksum(__buf)
assert (c == 51150)