mirror of
https://github.com/onyx-and-iris/vban-cmd-python.git
synced 2026-04-18 04:53:31 +00:00
implement ping/pong dataclasses
This commit is contained in:
@@ -8,11 +8,15 @@ VBAN_PROTOCOL_SERVICE = 0x60
|
||||
|
||||
VBAN_SERVICE_RTPACKETREGISTER = 32
|
||||
VBAN_SERVICE_RTPACKET = 33
|
||||
VBAN_SERVICE_PING = 0
|
||||
VBAN_SERVICE_PONG = 0 # PONG uses same service type as PING
|
||||
VBAN_SERVICE_MASK = 0xE0
|
||||
VBAN_PROTOCOL_MASK = 0xE0
|
||||
VBAN_SERVICE_REQUESTREPLY = 0x02
|
||||
VBAN_SERVICE_FNCT_REPLY = 0x02
|
||||
|
||||
PINGPONG_PACKET_SIZE = 704 # Size of the PING/PONG header + payload in bytes
|
||||
|
||||
MAX_PACKET_SIZE = 1436
|
||||
HEADER_SIZE = 4 + 1 + 1 + 1 + 1 + 16
|
||||
|
||||
@@ -207,6 +211,92 @@ class VbanMatrixResponseHeader:
|
||||
return header, payload
|
||||
|
||||
|
||||
@dataclass
|
||||
class VbanPingHeader:
|
||||
"""Represents the header of a PING packet"""
|
||||
|
||||
name: str = 'PING0'
|
||||
format_sr: int = VBAN_PROTOCOL_SERVICE
|
||||
format_nbs: int = 0
|
||||
format_nbc: int = VBAN_SERVICE_PING
|
||||
format_bit: int = 0
|
||||
framecounter: int = 0
|
||||
|
||||
@property
|
||||
def vban(self) -> bytes:
|
||||
return b'VBAN'
|
||||
|
||||
@property
|
||||
def streamname(self) -> bytes:
|
||||
return self.name.encode('ascii')[:16].ljust(16, b'\x00')
|
||||
|
||||
@classmethod
|
||||
def to_bytes(cls, framecounter: int = 0) -> bytes:
|
||||
"""Creates the PING header bytes only."""
|
||||
header = cls(framecounter=framecounter)
|
||||
|
||||
data = bytearray()
|
||||
data.extend(header.vban)
|
||||
data.extend(header.format_sr.to_bytes(1, 'little'))
|
||||
data.extend(header.format_nbs.to_bytes(1, 'little'))
|
||||
data.extend(header.format_nbc.to_bytes(1, 'little'))
|
||||
data.extend(header.format_bit.to_bytes(1, 'little'))
|
||||
data.extend(header.streamname)
|
||||
data.extend(header.framecounter.to_bytes(4, 'little'))
|
||||
return bytes(data)
|
||||
|
||||
|
||||
@dataclass
|
||||
class VbanPongHeader:
|
||||
"""Represents the header of a PONG response packet"""
|
||||
|
||||
name: str = 'PING0'
|
||||
format_sr: int = VBAN_PROTOCOL_SERVICE
|
||||
format_nbs: int = 0
|
||||
format_nbc: int = VBAN_SERVICE_PONG
|
||||
format_bit: int = 0
|
||||
framecounter: int = 0
|
||||
|
||||
@property
|
||||
def vban(self) -> bytes:
|
||||
return b'VBAN'
|
||||
|
||||
@property
|
||||
def streamname(self) -> bytes:
|
||||
return self.name.encode('ascii')[:16].ljust(16, b'\x00')
|
||||
|
||||
@classmethod
|
||||
def from_bytes(cls, data: bytes):
|
||||
"""Parse a PONG response packet from bytes."""
|
||||
parsed = _parse_vban_service_header(data)
|
||||
|
||||
# PONG responses use the same service type as PING (0x00)
|
||||
# and are identified by having payload data
|
||||
if parsed['format_nbc'] != VBAN_SERVICE_PONG:
|
||||
raise ValueError(f'Not a PONG response packet: {parsed["format_nbc"]:02x}')
|
||||
|
||||
return cls(**parsed)
|
||||
|
||||
@classmethod
|
||||
def is_pong_response(cls, data: bytes) -> bool:
|
||||
"""Check if packet is a PONG response by analyzing the actual response format."""
|
||||
try:
|
||||
parsed = _parse_vban_service_header(data)
|
||||
|
||||
# Validate this is a service protocol packet with PING/PONG service type
|
||||
if parsed['format_nbc'] != VBAN_SERVICE_PONG:
|
||||
return False
|
||||
|
||||
if parsed['name'] not in ['PING0', 'VBAN Service']:
|
||||
return False
|
||||
|
||||
# PONG should have payload data (same size as PING)
|
||||
return len(data) >= PINGPONG_PACKET_SIZE
|
||||
|
||||
except (ValueError, Exception):
|
||||
return False
|
||||
|
||||
|
||||
@dataclass
|
||||
class VbanRequestHeader:
|
||||
"""Represents the header of a request packet"""
|
||||
@@ -238,7 +328,7 @@ class VbanRequestHeader:
|
||||
|
||||
@property
|
||||
def streamname(self) -> bytes:
|
||||
return self.name.encode() + bytes(16 - len(self.name))
|
||||
return self.name.encode()[:16].ljust(16, b'\x00')
|
||||
|
||||
@classmethod
|
||||
def to_bytes(
|
||||
|
||||
Reference in New Issue
Block a user