From 1123fe6432ae52cf3ebd02b3876f4ad6b2872e76 Mon Sep 17 00:00:00 2001 From: onyx-and-iris Date: Sun, 1 Mar 2026 16:17:03 +0000 Subject: [PATCH] move header validation into class methods add _parse_vban_service_header() helper function --- vban_cmd/packet/headers.py | 102 +++++++++++++++++++++++++++++++++---- vban_cmd/worker.py | 42 ++++++++------- 2 files changed, 112 insertions(+), 32 deletions(-) diff --git a/vban_cmd/packet/headers.py b/vban_cmd/packet/headers.py index fc985b8..bcf4685 100644 --- a/vban_cmd/packet/headers.py +++ b/vban_cmd/packet/headers.py @@ -9,6 +9,9 @@ VBAN_PROTOCOL_SERVICE = 0x60 VBAN_SERVICE_RTPACKETREGISTER = 32 VBAN_SERVICE_RTPACKET = 33 VBAN_SERVICE_MASK = 0xE0 +VBAN_PROTOCOL_MASK = 0xE0 +VBAN_SERVICE_REQUESTREPLY = 0x02 +VBAN_SERVICE_FNCT_REPLY = 0x02 MAX_PACKET_SIZE = 1436 HEADER_SIZE = 4 + 1 + 1 + 1 + 1 + 16 @@ -92,6 +95,38 @@ class VbanSubscribeHeader: return bytes(data) +def _parse_vban_service_header(data: bytes) -> dict: + """Common parsing and validation for VBAN service protocol headers.""" + if len(data) < HEADER_SIZE: + raise ValueError('Data is too short to be a valid VBAN header') + + if data[:4] != b'VBAN': + raise ValueError('Invalid VBAN magic bytes') + + format_sr = data[4] + format_nbs = data[5] + format_nbc = data[6] + format_bit = data[7] + + # Verify this is a service protocol packet + protocol = format_sr & VBAN_PROTOCOL_MASK + if protocol != VBAN_PROTOCOL_SERVICE: + raise ValueError(f'Not a service protocol packet: {protocol:02x}') + + # Extract stream name and frame counter + name = data[8:24].rstrip(b'\x00').decode('utf-8', errors='ignore') + framecounter = int.from_bytes(data[24:28], 'little') + + return { + 'format_sr': format_sr, + 'format_nbs': format_nbs, + 'format_nbc': format_nbc, + 'format_bit': format_bit, + 'name': name, + 'framecounter': framecounter, + } + + @dataclass class VbanResponseHeader: """Represents the header of a response packet""" @@ -101,6 +136,7 @@ class VbanResponseHeader: format_nbs: int = 0 format_nbc: int = VBAN_SERVICE_RTPACKET format_bit: int = 0 + framecounter: int = 0 @property def vban(self) -> bytes: @@ -112,17 +148,63 @@ class VbanResponseHeader: @classmethod def from_bytes(cls, data: bytes): - if len(data) < HEADER_SIZE: - raise ValueError('Data is too short to be a valid VbanResponseHeader') + """Parse a VbanResponseHeader from bytes.""" + parsed = _parse_vban_service_header(data) - name = data[8:24].rstrip(b'\x00').decode('utf-8') - return cls( - name=name, - format_sr=data[4] & VBAN_SERVICE_MASK, - format_nbs=data[5], - format_nbc=data[6], - format_bit=data[7], - ) + # Validate this is an RTPacket response + if parsed['format_nbc'] != VBAN_SERVICE_RTPACKET: + raise ValueError( + f'Not a RTPacket response packet: {parsed["format_nbc"]:02x}' + ) + + return cls(**parsed) + + +@dataclass +class VbanMatrixResponseHeader: + """Represents the header of a matrix response packet""" + + name: str = 'Request Reply' + format_sr: int = VBAN_PROTOCOL_SERVICE + format_nbs: int = VBAN_SERVICE_FNCT_REPLY + format_nbc: int = VBAN_SERVICE_REQUESTREPLY + format_bit: int = 0 + framecounter: int = 0 + + @property + def vban(self) -> bytes: + return b'VBAN' + + @property + def streamname(self) -> bytes: + return self.name.encode('ascii')[:16].ljust(16, b'\x00') + + @classmethod + def from_bytes(cls, data: bytes): + """Parse a matrix response packet from bytes.""" + parsed = _parse_vban_service_header(data) + + # Validate this is a service reply packet + if parsed['format_nbs'] != VBAN_SERVICE_FNCT_REPLY: + raise ValueError(f'Not a service reply packet: {parsed["format_nbs"]:02x}') + + return cls(**parsed) + + @classmethod + def extract_payload(cls, data: bytes) -> str: + """Extract the text payload from a matrix response packet.""" + if len(data) <= HEADER_SIZE: + return '' + + payload_bytes = data[HEADER_SIZE:] + return payload_bytes.rstrip(b'\x00').decode('utf-8', errors='ignore') + + @classmethod + def parse_response(cls, data: bytes) -> tuple['VbanMatrixResponseHeader', str]: + """Parse a complete matrix response packet returning header and payload.""" + header = cls.from_bytes(data) + payload = cls.extract_payload(data) + return header, payload @dataclass diff --git a/vban_cmd/worker.py b/vban_cmd/worker.py index 9bb6713..1f8a6a9 100644 --- a/vban_cmd/worker.py +++ b/vban_cmd/worker.py @@ -7,8 +7,6 @@ from .enums import NBS from .error import VBANCMDConnectionError from .packet.headers import ( HEADER_SIZE, - VBAN_PROTOCOL_SERVICE, - VBAN_SERVICE_RTPACKET, VbanPacket, VbanResponseHeader, VbanSubscribeHeader, @@ -89,31 +87,31 @@ class Producer(threading.Thread): data, _ = self._remote.sock.recvfrom(2048) if len(data) < HEADER_SIZE: return - - response_header = VbanResponseHeader.from_bytes(data[:HEADER_SIZE]) - if ( - response_header.format_sr != VBAN_PROTOCOL_SERVICE - or response_header.format_nbc != VBAN_SERVICE_RTPACKET - ): - return - - match response_header.format_nbs: - case NBS.zero: - return VbanPacketNBS0.from_bytes( - nbs=NBS.zero, kind=self._remote.kind, data=data - ) - - case NBS.one: - return VbanPacketNBS1.from_bytes( - nbs=NBS.one, kind=self._remote.kind, data=data - ) - return None except TimeoutError as e: self.logger.exception(f'{type(e).__name__}: {e}') raise VBANCMDConnectionError( - f'timeout waiting for RtPacket from {self._remote.ip}' + f'timeout waiting for response from {self._remote.ip}:{self._remote.port}' ) from e + try: + header = VbanResponseHeader.from_bytes(data[:HEADER_SIZE]) + except ValueError as e: + self.logger.warning(f'Error parsing response packet: {e}') + return None + + match header.format_nbs: + case NBS.zero: + return VbanPacketNBS0.from_bytes( + nbs=NBS.zero, kind=self._remote.kind, data=data + ) + + case NBS.one: + return VbanPacketNBS1.from_bytes( + nbs=NBS.one, kind=self._remote.kind, data=data + ) + + return None + def stopped(self): return self.stop_event.is_set()