# -*- coding: utf-8 -*-
"""ATA over Ethernet Protocol."""
from __future__ import absolute_import
import struct
from . import dpkt
from .decorators import deprecated
from .compat import iteritems
[docs]class AOE(dpkt.Packet):
"""ATA over Ethernet Protocol.
See more about the AOE on \
https://en.wikipedia.org/wiki/ATA_over_Ethernet
Attributes:
__hdr__: Header fields of AOE.
data: Message data.
"""
__hdr__ = (
('ver_fl', 'B', 0x10),
('err', 'B', 0),
('maj', 'H', 0),
('min', 'B', 0),
('cmd', 'B', 0),
('tag', 'I', 0),
)
_cmdsw = {}
@property
def ver(self): return self.ver_fl >> 4
@ver.setter
def ver(self, ver): self.ver_fl = (ver << 4) | (self.ver_fl & 0xf)
@property
def fl(self): return self.ver_fl & 0xf
@fl.setter
def fl(self, fl): self.ver_fl = (self.ver_fl & 0xf0) | fl
[docs] @classmethod
def set_cmd(cls, cmd, pktclass):
cls._cmdsw[cmd] = pktclass
[docs] @classmethod
def get_cmd(cls, cmd):
return cls._cmdsw[cmd]
[docs] def unpack(self, buf):
dpkt.Packet.unpack(self, buf)
try:
self.data = self._cmdsw[self.cmd](self.data)
setattr(self, self.data.__class__.__name__.lower(), self.data)
except (KeyError, struct.error, dpkt.UnpackError):
pass
[docs] def pack_hdr(self):
try:
return dpkt.Packet.pack_hdr(self)
except struct.error as e:
raise dpkt.PackError(str(e))
AOE_CMD_ATA = 0
AOE_CMD_CFG = 1
AOE_FLAG_RSP = 1 << 3
def __load_cmds():
prefix = 'AOE_CMD_'
g = globals()
for k, v in iteritems(g):
if k.startswith(prefix):
name = 'aoe' + k[len(prefix):].lower()
try:
mod = __import__(name, g, level=1)
AOE.set_cmd(v, getattr(mod, name.upper()))
except (ImportError, AttributeError):
continue
def _mod_init():
"""Post-initialization called when all dpkt modules are fully loaded"""
if not AOE._cmdsw:
__load_cmds()