move header validation into class methods

add _parse_vban_service_header() helper function
This commit is contained in:
onyx-and-iris 2026-03-01 16:17:03 +00:00
parent 3c3e415d7e
commit 1123fe6432
2 changed files with 112 additions and 32 deletions

View File

@ -9,6 +9,9 @@ VBAN_PROTOCOL_SERVICE = 0x60
VBAN_SERVICE_RTPACKETREGISTER = 32
VBAN_SERVICE_RTPACKET = 33
VBAN_SERVICE_MASK = 0xE0
VBAN_PROTOCOL_MASK = 0xE0
VBAN_SERVICE_REQUESTREPLY = 0x02
VBAN_SERVICE_FNCT_REPLY = 0x02
MAX_PACKET_SIZE = 1436
HEADER_SIZE = 4 + 1 + 1 + 1 + 1 + 16
@ -92,6 +95,38 @@ class VbanSubscribeHeader:
return bytes(data)
def _parse_vban_service_header(data: bytes) -> dict:
"""Common parsing and validation for VBAN service protocol headers."""
if len(data) < HEADER_SIZE:
raise ValueError('Data is too short to be a valid VBAN header')
if data[:4] != b'VBAN':
raise ValueError('Invalid VBAN magic bytes')
format_sr = data[4]
format_nbs = data[5]
format_nbc = data[6]
format_bit = data[7]
# Verify this is a service protocol packet
protocol = format_sr & VBAN_PROTOCOL_MASK
if protocol != VBAN_PROTOCOL_SERVICE:
raise ValueError(f'Not a service protocol packet: {protocol:02x}')
# Extract stream name and frame counter
name = data[8:24].rstrip(b'\x00').decode('utf-8', errors='ignore')
framecounter = int.from_bytes(data[24:28], 'little')
return {
'format_sr': format_sr,
'format_nbs': format_nbs,
'format_nbc': format_nbc,
'format_bit': format_bit,
'name': name,
'framecounter': framecounter,
}
@dataclass
class VbanResponseHeader:
"""Represents the header of a response packet"""
@ -101,6 +136,7 @@ class VbanResponseHeader:
format_nbs: int = 0
format_nbc: int = VBAN_SERVICE_RTPACKET
format_bit: int = 0
framecounter: int = 0
@property
def vban(self) -> bytes:
@ -112,17 +148,63 @@ class VbanResponseHeader:
@classmethod
def from_bytes(cls, data: bytes):
if len(data) < HEADER_SIZE:
raise ValueError('Data is too short to be a valid VbanResponseHeader')
"""Parse a VbanResponseHeader from bytes."""
parsed = _parse_vban_service_header(data)
name = data[8:24].rstrip(b'\x00').decode('utf-8')
return cls(
name=name,
format_sr=data[4] & VBAN_SERVICE_MASK,
format_nbs=data[5],
format_nbc=data[6],
format_bit=data[7],
)
# Validate this is an RTPacket response
if parsed['format_nbc'] != VBAN_SERVICE_RTPACKET:
raise ValueError(
f'Not a RTPacket response packet: {parsed["format_nbc"]:02x}'
)
return cls(**parsed)
@dataclass
class VbanMatrixResponseHeader:
"""Represents the header of a matrix response packet"""
name: str = 'Request Reply'
format_sr: int = VBAN_PROTOCOL_SERVICE
format_nbs: int = VBAN_SERVICE_FNCT_REPLY
format_nbc: int = VBAN_SERVICE_REQUESTREPLY
format_bit: int = 0
framecounter: int = 0
@property
def vban(self) -> bytes:
return b'VBAN'
@property
def streamname(self) -> bytes:
return self.name.encode('ascii')[:16].ljust(16, b'\x00')
@classmethod
def from_bytes(cls, data: bytes):
"""Parse a matrix response packet from bytes."""
parsed = _parse_vban_service_header(data)
# Validate this is a service reply packet
if parsed['format_nbs'] != VBAN_SERVICE_FNCT_REPLY:
raise ValueError(f'Not a service reply packet: {parsed["format_nbs"]:02x}')
return cls(**parsed)
@classmethod
def extract_payload(cls, data: bytes) -> str:
"""Extract the text payload from a matrix response packet."""
if len(data) <= HEADER_SIZE:
return ''
payload_bytes = data[HEADER_SIZE:]
return payload_bytes.rstrip(b'\x00').decode('utf-8', errors='ignore')
@classmethod
def parse_response(cls, data: bytes) -> tuple['VbanMatrixResponseHeader', str]:
"""Parse a complete matrix response packet returning header and payload."""
header = cls.from_bytes(data)
payload = cls.extract_payload(data)
return header, payload
@dataclass

View File

@ -7,8 +7,6 @@ from .enums import NBS
from .error import VBANCMDConnectionError
from .packet.headers import (
HEADER_SIZE,
VBAN_PROTOCOL_SERVICE,
VBAN_SERVICE_RTPACKET,
VbanPacket,
VbanResponseHeader,
VbanSubscribeHeader,
@ -89,31 +87,31 @@ class Producer(threading.Thread):
data, _ = self._remote.sock.recvfrom(2048)
if len(data) < HEADER_SIZE:
return
response_header = VbanResponseHeader.from_bytes(data[:HEADER_SIZE])
if (
response_header.format_sr != VBAN_PROTOCOL_SERVICE
or response_header.format_nbc != VBAN_SERVICE_RTPACKET
):
return
match response_header.format_nbs:
case NBS.zero:
return VbanPacketNBS0.from_bytes(
nbs=NBS.zero, kind=self._remote.kind, data=data
)
case NBS.one:
return VbanPacketNBS1.from_bytes(
nbs=NBS.one, kind=self._remote.kind, data=data
)
return None
except TimeoutError as e:
self.logger.exception(f'{type(e).__name__}: {e}')
raise VBANCMDConnectionError(
f'timeout waiting for RtPacket from {self._remote.ip}'
f'timeout waiting for response from {self._remote.ip}:{self._remote.port}'
) from e
try:
header = VbanResponseHeader.from_bytes(data[:HEADER_SIZE])
except ValueError as e:
self.logger.warning(f'Error parsing response packet: {e}')
return None
match header.format_nbs:
case NBS.zero:
return VbanPacketNBS0.from_bytes(
nbs=NBS.zero, kind=self._remote.kind, data=data
)
case NBS.one:
return VbanPacketNBS1.from_bytes(
nbs=NBS.one, kind=self._remote.kind, data=data
)
return None
def stopped(self):
return self.stop_event.is_set()