Compare commits

..

3 Commits

6 changed files with 850 additions and 837 deletions

View File

@ -1,808 +0,0 @@
import struct
from dataclasses import dataclass
from typing import NamedTuple
from .enums import NBS
from .kinds import KindMapClass
from .util import comp
VBAN_PROTOCOL_TXT = 0x40
VBAN_PROTOCOL_SERVICE = 0x60
VBAN_SERVICE_RTPACKETREGISTER = 32
VBAN_SERVICE_RTPACKET = 33
VBAN_SERVICE_MASK = 0xE0
MAX_PACKET_SIZE = 1436
HEADER_SIZE = 4 + 1 + 1 + 1 + 1 + 16
VMPARAMSTRIP_SIZE = 174
@dataclass
class VbanPacket:
"""Represents the header of a VBAN data packet"""
nbs: NBS
_kind: KindMapClass
_voicemeeterType: bytes
_reserved: bytes
_buffersize: bytes
_voicemeeterVersion: bytes
_optionBits: bytes
_samplerate: bytes
@property
def voicemeetertype(self) -> str:
"""returns voicemeeter type as a string"""
return ['', 'basic', 'banana', 'potato'][
int.from_bytes(self._voicemeeterType, 'little')
]
@property
def voicemeeterversion(self) -> tuple:
"""returns voicemeeter version as a tuple"""
return tuple(self._voicemeeterVersion[i] for i in range(3, -1, -1))
@property
def samplerate(self) -> int:
"""returns samplerate as an int"""
return int.from_bytes(self._samplerate, 'little')
class Levels(NamedTuple):
strip: tuple[float, ...]
bus: tuple[float, ...]
class ChannelState:
"""Represents the processed state of a single strip or bus channel"""
def __init__(self, state_bytes: bytes):
# Convert 4-byte state to integer once for efficient lookups
self._state = int.from_bytes(state_bytes, 'little')
def get_mode(self, mode_value: int) -> bool:
"""Get boolean state for a specific mode"""
return (self._state & mode_value) != 0
def get_mode_int(self, mode_value: int) -> int:
"""Get integer state for a specific mode"""
return self._state & mode_value
# Common boolean modes
@property
def mute(self) -> bool:
return (self._state & 0x00000001) != 0
@property
def solo(self) -> bool:
return (self._state & 0x00000002) != 0
@property
def mono(self) -> bool:
return (self._state & 0x00000004) != 0
@property
def mc(self) -> bool:
return (self._state & 0x00000008) != 0
# EQ modes
@property
def eq_on(self) -> bool:
return (self._state & 0x00000100) != 0
@property
def eq_ab(self) -> bool:
return (self._state & 0x00000800) != 0
# Bus assignments (strip to bus routing)
@property
def busa1(self) -> bool:
return (self._state & 0x00001000) != 0
@property
def busa2(self) -> bool:
return (self._state & 0x00002000) != 0
@property
def busa3(self) -> bool:
return (self._state & 0x00004000) != 0
@property
def busa4(self) -> bool:
return (self._state & 0x00008000) != 0
@property
def busb1(self) -> bool:
return (self._state & 0x00010000) != 0
@property
def busb2(self) -> bool:
return (self._state & 0x00020000) != 0
@property
def busb3(self) -> bool:
return (self._state & 0x00040000) != 0
class States(NamedTuple):
strip: tuple[ChannelState, ...]
bus: tuple[ChannelState, ...]
class Labels(NamedTuple):
strip: tuple[str, ...]
bus: tuple[str, ...]
@dataclass
class VbanPacketNBS0(VbanPacket):
"""Represents the body of a VBAN data packet with ident:0"""
_inputLeveldB100: bytes
_outputLeveldB100: bytes
_TransportBit: bytes
_stripState: bytes
_busState: bytes
_stripGaindB100Layer1: bytes
_stripGaindB100Layer2: bytes
_stripGaindB100Layer3: bytes
_stripGaindB100Layer4: bytes
_stripGaindB100Layer5: bytes
_stripGaindB100Layer6: bytes
_stripGaindB100Layer7: bytes
_stripGaindB100Layer8: bytes
_busGaindB100: bytes
_stripLabelUTF8c60: bytes
_busLabelUTF8c60: bytes
@classmethod
def from_bytes(cls, nbs: NBS, kind: KindMapClass, data: bytes):
return cls(
nbs=nbs,
_kind=kind,
_voicemeeterType=data[28:29],
_reserved=data[29:30],
_buffersize=data[30:32],
_voicemeeterVersion=data[32:36],
_optionBits=data[36:40],
_samplerate=data[40:44],
_inputLeveldB100=data[44:112],
_outputLeveldB100=data[112:240],
_TransportBit=data[240:244],
_stripState=data[244:276],
_busState=data[276:308],
_stripGaindB100Layer1=data[308:324],
_stripGaindB100Layer2=data[324:340],
_stripGaindB100Layer3=data[340:356],
_stripGaindB100Layer4=data[356:372],
_stripGaindB100Layer5=data[372:388],
_stripGaindB100Layer6=data[388:404],
_stripGaindB100Layer7=data[404:420],
_stripGaindB100Layer8=data[420:436],
_busGaindB100=data[436:452],
_stripLabelUTF8c60=data[452:932],
_busLabelUTF8c60=data[932:1412],
)
def pdirty(self, other) -> bool:
"""True iff any defined parameter has changed"""
self_gains = (
self._stripGaindB100Layer1
+ self._stripGaindB100Layer2
+ self._stripGaindB100Layer3
+ self._stripGaindB100Layer4
+ self._stripGaindB100Layer5
+ self._stripGaindB100Layer6
+ self._stripGaindB100Layer7
+ self._stripGaindB100Layer8
)
other_gains = (
other._stripGaindB100Layer1
+ other._stripGaindB100Layer2
+ other._stripGaindB100Layer3
+ other._stripGaindB100Layer4
+ other._stripGaindB100Layer5
+ other._stripGaindB100Layer6
+ other._stripGaindB100Layer7
+ other._stripGaindB100Layer8
)
return (
self._stripState != other._stripState
or self._busState != other._busState
or self_gains != other_gains
or self._busGaindB100 != other._busGaindB100
or self._stripLabelUTF8c60 != other._stripLabelUTF8c60
or self._busLabelUTF8c60 != other._busLabelUTF8c60
)
def ldirty(self, strip_cache, bus_cache) -> bool:
"""True iff any level has changed, ignoring changes when levels are very quiet"""
self._strip_comp, self._bus_comp = (
tuple(not val for val in comp(strip_cache, self.strip_levels)),
tuple(not val for val in comp(bus_cache, self.bus_levels)),
)
return any(self._strip_comp) or any(self._bus_comp)
@property
def strip_levels(self) -> tuple[float, ...]:
"""Returns strip levels in dB"""
return tuple(
round(
int.from_bytes(self._inputLeveldB100[i : i + 2], 'little', signed=True)
* 0.01,
1,
)
for i in range(0, len(self._inputLeveldB100), 2)
)[: self._kind.num_strip_levels]
@property
def bus_levels(self) -> tuple[float, ...]:
"""Returns bus levels in dB"""
return tuple(
round(
int.from_bytes(self._outputLeveldB100[i : i + 2], 'little', signed=True)
* 0.01,
1,
)
for i in range(0, len(self._outputLeveldB100), 2)
)[: self._kind.num_bus_levels]
@property
def levels(self) -> Levels:
"""Returns strip and bus levels as a namedtuple"""
return Levels(strip=self.strip_levels, bus=self.bus_levels)
@property
def states(self) -> States:
"""returns States object with processed strip and bus channel states"""
return States(
strip=tuple(
ChannelState(self._stripState[i : i + 4]) for i in range(0, 32, 4)
),
bus=tuple(ChannelState(self._busState[i : i + 4]) for i in range(0, 32, 4)),
)
@property
def gainlayers(self) -> tuple:
"""returns tuple of all strip gain layers as tuples"""
return tuple(
tuple(
round(
int.from_bytes(
getattr(self, f'_stripGaindB100Layer{layer}')[i : i + 2],
'little',
signed=True,
)
* 0.01,
2,
)
for i in range(0, 16, 2)
)
for layer in range(1, 9)
)
@property
def busgain(self) -> tuple:
"""returns tuple of bus gains"""
return tuple(
round(
int.from_bytes(self._busGaindB100[i : i + 2], 'little', signed=True)
* 0.01,
2,
)
for i in range(0, 16, 2)
)
@property
def labels(self) -> Labels:
"""returns Labels namedtuple of strip and bus labels"""
def _extract_labels_from_bytes(label_bytes: bytes) -> tuple[str, ...]:
"""Extract null-terminated UTF-8 labels from 60-byte chunks"""
labels = []
for i in range(0, len(label_bytes), 60):
chunk = label_bytes[i : i + 60]
null_pos = chunk.find(b'\x00')
if null_pos == -1:
try:
label = chunk.decode('utf-8', errors='replace').rstrip('\x00')
except UnicodeDecodeError:
label = ''
else:
try:
label = (
chunk[:null_pos].decode('utf-8', errors='replace')
if null_pos > 0
else ''
)
except UnicodeDecodeError:
label = ''
labels.append(label)
return tuple(labels)
return Labels(
strip=_extract_labels_from_bytes(self._stripLabelUTF8c60),
bus=_extract_labels_from_bytes(self._busLabelUTF8c60),
)
class Audibility(NamedTuple):
knob: float
comp: float
gate: float
denoiser: float
class Positions(NamedTuple):
pan_x: float
pan_y: float
color_x: float
color_y: float
fx1: float
fx2: float
class EqGains(NamedTuple):
bass: float
mid: float
treble: float
class ParametricEQSettings(NamedTuple):
on: bool
type: int
gain: float
freq: float
q: float
class Sends(NamedTuple):
reverb: float
delay: float
fx1: float
fx2: float
class CompressorSettings(NamedTuple):
gain_in: float
attack_ms: float
release_ms: float
n_knee: float
ratio: float
threshold: float
c_enabled: bool
makeup: bool
gain_out: float
class GateSettings(NamedTuple):
threshold_in: float
damping_max: float
bp_sidechain: bool
attack_ms: float
hold_ms: float
release_ms: float
class DenoiserSettings(NamedTuple):
threshold: float
class PitchSettings(NamedTuple):
enabled: bool
dry_wet: float
value: float
formant_lo: float
formant_med: float
formant_high: float
@dataclass
class VbanVMParamStrip:
"""Represents the VBAN_VMPARAMSTRIP_PACKET structure"""
_mode: bytes
_dblevel: bytes
_audibility: bytes
_pos3D_x: bytes
_pos3D_y: bytes
_posColor_x: bytes
_posColor_y: bytes
_EQgain1: bytes
_EQgain2: bytes
_EQgain3: bytes
# First channel parametric EQ
_PEQ_eqOn: bytes
_PEQ_eqtype: bytes
_PEQ_eqgain: bytes
_PEQ_eqfreq: bytes
_PEQ_eqq: bytes
_audibility_c: bytes
_audibility_g: bytes
_audibility_d: bytes
_posMod_x: bytes
_posMod_y: bytes
_send_reverb: bytes
_send_delay: bytes
_send_fx1: bytes
_send_fx2: bytes
_dblimit: bytes
_nKaraoke: bytes
_COMP_gain_in: bytes
_COMP_attack_ms: bytes
_COMP_release_ms: bytes
_COMP_n_knee: bytes
_COMP_comprate: bytes
_COMP_threshold: bytes
_COMP_c_enabled: bytes
_COMP_c_auto: bytes
_COMP_gain_out: bytes
_GATE_dBThreshold_in: bytes
_GATE_dBDamping_max: bytes
_GATE_BP_Sidechain: bytes
_GATE_attack_ms: bytes
_GATE_hold_ms: bytes
_GATE_release_ms: bytes
_DenoiserThreshold: bytes
_PitchEnabled: bytes
_Pitch_DryWet: bytes
_Pitch_Value: bytes
_Pitch_formant_lo: bytes
_Pitch_formant_med: bytes
_Pitch_formant_high: bytes
@classmethod
def from_bytes(cls, data: bytes):
return cls(
_mode=data[0:4],
_dblevel=data[4:8],
_audibility=data[8:10],
_pos3D_x=data[10:12],
_pos3D_y=data[12:14],
_posColor_x=data[14:16],
_posColor_y=data[16:18],
_EQgain1=data[18:20],
_EQgain2=data[20:22],
_EQgain3=data[22:24],
_PEQ_eqOn=data[24:30],
_PEQ_eqtype=data[30:36],
_PEQ_eqgain=data[36:60],
_PEQ_eqfreq=data[60:84],
_PEQ_eqq=data[84:108],
_audibility_c=data[108:110],
_audibility_g=data[110:112],
_audibility_d=data[112:114],
_posMod_x=data[114:116],
_posMod_y=data[116:118],
_send_reverb=data[118:120],
_send_delay=data[120:122],
_send_fx1=data[122:124],
_send_fx2=data[124:126],
_dblimit=data[126:128],
_nKaraoke=data[128:130],
_COMP_gain_in=data[130:132],
_COMP_attack_ms=data[132:134],
_COMP_release_ms=data[134:136],
_COMP_n_knee=data[136:138],
_COMP_comprate=data[138:140],
_COMP_threshold=data[140:142],
_COMP_c_enabled=data[142:144],
_COMP_c_auto=data[144:146],
_COMP_gain_out=data[146:148],
_GATE_dBThreshold_in=data[148:150],
_GATE_dBDamping_max=data[150:152],
_GATE_BP_Sidechain=data[152:154],
_GATE_attack_ms=data[154:156],
_GATE_hold_ms=data[156:158],
_GATE_release_ms=data[158:160],
_DenoiserThreshold=data[160:162],
_PitchEnabled=data[162:164],
_Pitch_DryWet=data[164:166],
_Pitch_Value=data[166:168],
_Pitch_formant_lo=data[168:170],
_Pitch_formant_med=data[170:172],
_Pitch_formant_high=data[172:174],
)
@property
def mode(self) -> int:
return int.from_bytes(self._mode, 'little')
@property
def audibility(self) -> Audibility:
return Audibility(
round(int.from_bytes(self._audibility, 'little', signed=True) * 0.01, 2),
round(int.from_bytes(self._audibility_c, 'little', signed=True) * 0.01, 2),
round(int.from_bytes(self._audibility_g, 'little', signed=True) * 0.01, 2),
round(int.from_bytes(self._audibility_d, 'little', signed=True) * 0.01, 2),
)
@property
def positions(self) -> Positions:
return Positions(
round(int.from_bytes(self._pos3D_x, 'little', signed=True) * 0.01, 2),
round(int.from_bytes(self._pos3D_y, 'little', signed=True) * 0.01, 2),
round(int.from_bytes(self._posColor_x, 'little', signed=True) * 0.01, 2),
round(int.from_bytes(self._posColor_y, 'little', signed=True) * 0.01, 2),
round(int.from_bytes(self._posMod_x, 'little', signed=True) * 0.01, 2),
round(int.from_bytes(self._posMod_y, 'little', signed=True) * 0.01, 2),
)
@property
def eqgains(self) -> EqGains:
return EqGains(
*[
round(
int.from_bytes(getattr(self, f'_EQgain{i}'), 'little', signed=True)
* 0.01,
2,
)
for i in range(1, 4)
]
)
@property
def parametric_eq(self) -> tuple[ParametricEQSettings, ...]:
return tuple(
ParametricEQSettings(
on=bool(int.from_bytes(self._PEQ_eqOn[i : i + 1], 'little')),
type=int.from_bytes(self._PEQ_eqtype[i : i + 1], 'little'),
freq=struct.unpack('<f', self._PEQ_eqfreq[i * 4 : (i + 1) * 4])[0],
gain=struct.unpack('<f', self._PEQ_eqgain[i * 4 : (i + 1) * 4])[0],
q=struct.unpack('<f', self._PEQ_eqq[i * 4 : (i + 1) * 4])[0],
)
for i in range(6)
)
@property
def sends(self) -> Sends:
return Sends(
round(int.from_bytes(self._send_reverb, 'little', signed=True) * 0.01, 2),
round(int.from_bytes(self._send_delay, 'little', signed=True) * 0.01, 2),
round(int.from_bytes(self._send_fx1, 'little', signed=True) * 0.01, 2),
round(int.from_bytes(self._send_fx2, 'little', signed=True) * 0.01, 2),
)
@property
def karaoke(self) -> int:
return int.from_bytes(self._nKaraoke, 'little')
@property
def compressor(self) -> CompressorSettings:
return CompressorSettings(
gain_in=round(
int.from_bytes(self._COMP_gain_in, 'little', signed=True) * 0.01, 2
),
attack_ms=round(int.from_bytes(self._COMP_attack_ms, 'little') * 0.1, 2),
release_ms=round(int.from_bytes(self._COMP_release_ms, 'little') * 0.1, 2),
n_knee=round(int.from_bytes(self._COMP_n_knee, 'little') * 0.01, 2),
ratio=round(int.from_bytes(self._COMP_comprate, 'little') * 0.01, 2),
threshold=round(
int.from_bytes(self._COMP_threshold, 'little', signed=True) * 0.01, 2
),
c_enabled=bool(int.from_bytes(self._COMP_c_enabled, 'little')),
makeup=bool(int.from_bytes(self._COMP_c_auto, 'little')),
gain_out=round(
int.from_bytes(self._COMP_gain_out, 'little', signed=True) * 0.01, 2
),
)
@property
def gate(self) -> GateSettings:
return GateSettings(
threshold_in=round(
int.from_bytes(self._GATE_dBThreshold_in, 'little', signed=True) * 0.01,
2,
),
damping_max=round(
int.from_bytes(self._GATE_dBDamping_max, 'little', signed=True) * 0.01,
2,
),
bp_sidechain=round(
int.from_bytes(self._GATE_BP_Sidechain, 'little') * 0.1, 2
),
attack_ms=round(int.from_bytes(self._GATE_attack_ms, 'little') * 0.1, 2),
hold_ms=round(int.from_bytes(self._GATE_hold_ms, 'little') * 0.1, 2),
release_ms=round(int.from_bytes(self._GATE_release_ms, 'little') * 0.1, 2),
)
@property
def denoiser(self) -> DenoiserSettings:
return DenoiserSettings(
threshold=round(
int.from_bytes(self._DenoiserThreshold, 'little', signed=True) * 0.01, 2
)
)
@property
def pitch(self) -> PitchSettings:
return PitchSettings(
enabled=bool(int.from_bytes(self._PitchEnabled, 'little')),
dry_wet=round(
int.from_bytes(self._Pitch_DryWet, 'little', signed=True) * 0.01, 2
),
value=round(
int.from_bytes(self._Pitch_Value, 'little', signed=True) * 0.01, 2
),
formant_lo=round(
int.from_bytes(self._Pitch_formant_lo, 'little', signed=True) * 0.01, 2
),
formant_med=round(
int.from_bytes(self._Pitch_formant_med, 'little', signed=True) * 0.01, 2
),
formant_high=round(
int.from_bytes(self._Pitch_formant_high, 'little', signed=True) * 0.01,
2,
),
)
@dataclass
class VbanPacketNBS1(VbanPacket):
"""Represents the body of a VBAN data packet with ident:1"""
strips: tuple[VbanVMParamStrip, ...]
@classmethod
def from_bytes(
cls,
nbs: NBS,
kind: KindMapClass,
data: bytes,
):
return cls(
nbs=nbs,
_kind=kind,
_voicemeeterType=data[28:29],
_reserved=data[29:30],
_buffersize=data[30:32],
_voicemeeterVersion=data[32:36],
_optionBits=data[36:40],
_samplerate=data[40:44],
strips=tuple(
VbanVMParamStrip.from_bytes(
data[44 + i * VMPARAMSTRIP_SIZE : 44 + (i + 1) * VMPARAMSTRIP_SIZE]
)
for i in range(16)
),
)
@dataclass
class SubscribeHeader:
"""Represents the header of an RT subscription packet"""
nbs: NBS = NBS.zero
name: str = 'Register-RTP'
timeout: int = 15
@property
def vban(self) -> bytes:
return b'VBAN'
@property
def format_sr(self) -> bytes:
return VBAN_PROTOCOL_SERVICE.to_bytes(1, 'little')
@property
def format_nbs(self) -> bytes:
return (self.nbs.value & 0xFF).to_bytes(1, 'little')
@property
def format_nbc(self) -> bytes:
return VBAN_SERVICE_RTPACKETREGISTER.to_bytes(1, 'little')
@property
def format_bit(self) -> bytes:
return (self.timeout & 0xFF).to_bytes(1, 'little')
@property
def streamname(self) -> bytes:
return self.name.encode('ascii') + bytes(16 - len(self.name))
@classmethod
def to_bytes(cls, nbs: NBS, framecounter: int) -> bytes:
header = cls(nbs=nbs)
data = bytearray()
data.extend(header.vban)
data.extend(header.format_sr)
data.extend(header.format_nbs)
data.extend(header.format_nbc)
data.extend(header.format_bit)
data.extend(header.streamname)
data.extend(framecounter.to_bytes(4, 'little'))
return bytes(data)
@dataclass
class VbanRtPacketHeader:
"""Represents the header of an RT response packet"""
name: str = 'Voicemeeter-RTP'
format_sr: int = VBAN_PROTOCOL_SERVICE
format_nbs: int = 0
format_nbc: int = VBAN_SERVICE_RTPACKET
format_bit: int = 0
@property
def vban(self) -> bytes:
return b'VBAN'
@property
def streamname(self) -> bytes:
return self.name.encode('ascii') + bytes(16 - len(self.name))
@classmethod
def from_bytes(cls, data: bytes):
if len(data) < HEADER_SIZE:
raise ValueError('Data is too short to be a valid VbanRTPPacketHeader')
name = data[8:24].rstrip(b'\x00').decode('utf-8')
return cls(
name=name,
format_sr=data[4] & VBAN_SERVICE_MASK,
format_nbs=data[5],
format_nbc=data[6],
format_bit=data[7],
)
@dataclass
class RequestHeader:
"""Represents the header of an RT request packet"""
name: str
bps_index: int
channel: int
framecounter: int = 0
@property
def vban(self) -> bytes:
return b'VBAN'
@property
def sr(self) -> bytes:
return (VBAN_PROTOCOL_TXT + self.bps_index).to_bytes(1, 'little')
@property
def nbs(self) -> bytes:
return (0).to_bytes(1, 'little')
@property
def nbc(self) -> bytes:
return (self.channel).to_bytes(1, 'little')
@property
def bit(self) -> bytes:
return (0x10).to_bytes(1, 'little')
@property
def streamname(self) -> bytes:
return self.name.encode() + bytes(16 - len(self.name))
@classmethod
def to_bytes(
cls, name: str, bps_index: int, channel: int, framecounter: int
) -> bytes:
header = cls(
name=name, bps_index=bps_index, channel=channel, framecounter=framecounter
)
data = bytearray()
data.extend(header.vban)
data.extend(header.sr)
data.extend(header.nbs)
data.extend(header.nbc)
data.extend(header.bit)
data.extend(header.streamname)
data.extend(header.framecounter.to_bytes(4, 'little'))
return bytes(data)

184
vban_cmd/packet/headers.py Normal file
View File

@ -0,0 +1,184 @@
from dataclasses import dataclass
from vban_cmd.enums import NBS
from vban_cmd.kinds import KindMapClass
VBAN_PROTOCOL_TXT = 0x40
VBAN_PROTOCOL_SERVICE = 0x60
VBAN_SERVICE_RTPACKETREGISTER = 32
VBAN_SERVICE_RTPACKET = 33
VBAN_SERVICE_MASK = 0xE0
MAX_PACKET_SIZE = 1436
HEADER_SIZE = 4 + 1 + 1 + 1 + 1 + 16
@dataclass
class VbanPacket:
"""Represents the header of an incoming VBAN data packet"""
nbs: NBS
_kind: KindMapClass
_voicemeeterType: bytes
_reserved: bytes
_buffersize: bytes
_voicemeeterVersion: bytes
_optionBits: bytes
_samplerate: bytes
@property
def voicemeetertype(self) -> str:
"""returns voicemeeter type as a string"""
return ['', 'basic', 'banana', 'potato'][
int.from_bytes(self._voicemeeterType, 'little')
]
@property
def voicemeeterversion(self) -> tuple:
"""returns voicemeeter version as a tuple"""
return tuple(self._voicemeeterVersion[i] for i in range(3, -1, -1))
@property
def samplerate(self) -> int:
"""returns samplerate as an int"""
return int.from_bytes(self._samplerate, 'little')
@dataclass
class VbanSubscribeHeader:
"""Represents the header of a subscription packet"""
nbs: NBS = NBS.zero
name: str = 'Register-RTP'
timeout: int = 15
@property
def vban(self) -> bytes:
return b'VBAN'
@property
def format_sr(self) -> bytes:
return VBAN_PROTOCOL_SERVICE.to_bytes(1, 'little')
@property
def format_nbs(self) -> bytes:
return (self.nbs.value & 0xFF).to_bytes(1, 'little')
@property
def format_nbc(self) -> bytes:
return VBAN_SERVICE_RTPACKETREGISTER.to_bytes(1, 'little')
@property
def format_bit(self) -> bytes:
return (self.timeout & 0xFF).to_bytes(1, 'little')
@property
def streamname(self) -> bytes:
return self.name.encode('ascii') + bytes(16 - len(self.name))
@classmethod
def to_bytes(cls, nbs: NBS, framecounter: int) -> bytes:
header = cls(nbs=nbs)
data = bytearray()
data.extend(header.vban)
data.extend(header.format_sr)
data.extend(header.format_nbs)
data.extend(header.format_nbc)
data.extend(header.format_bit)
data.extend(header.streamname)
data.extend(framecounter.to_bytes(4, 'little'))
return bytes(data)
@dataclass
class VbanResponseHeader:
"""Represents the header of a response packet"""
name: str = 'Voicemeeter-RTP'
format_sr: int = VBAN_PROTOCOL_SERVICE
format_nbs: int = 0
format_nbc: int = VBAN_SERVICE_RTPACKET
format_bit: int = 0
@property
def vban(self) -> bytes:
return b'VBAN'
@property
def streamname(self) -> bytes:
return self.name.encode('ascii') + bytes(16 - len(self.name))
@classmethod
def from_bytes(cls, data: bytes):
if len(data) < HEADER_SIZE:
raise ValueError('Data is too short to be a valid VbanResponseHeader')
name = data[8:24].rstrip(b'\x00').decode('utf-8')
return cls(
name=name,
format_sr=data[4] & VBAN_SERVICE_MASK,
format_nbs=data[5],
format_nbc=data[6],
format_bit=data[7],
)
@dataclass
class VbanRequestHeader:
"""Represents the header of a request packet"""
name: str
bps_index: int
channel: int
framecounter: int = 0
@property
def vban(self) -> bytes:
return b'VBAN'
@property
def sr(self) -> bytes:
return (VBAN_PROTOCOL_TXT + self.bps_index).to_bytes(1, 'little')
@property
def nbs(self) -> bytes:
return (0).to_bytes(1, 'little')
@property
def nbc(self) -> bytes:
return (self.channel).to_bytes(1, 'little')
@property
def bit(self) -> bytes:
return (0x10).to_bytes(1, 'little')
@property
def streamname(self) -> bytes:
return self.name.encode() + bytes(16 - len(self.name))
@classmethod
def to_bytes(
cls, name: str, bps_index: int, channel: int, framecounter: int
) -> bytes:
header = cls(
name=name, bps_index=bps_index, channel=channel, framecounter=framecounter
)
data = bytearray()
data.extend(header.vban)
data.extend(header.sr)
data.extend(header.nbs)
data.extend(header.nbc)
data.extend(header.bit)
data.extend(header.streamname)
data.extend(header.framecounter.to_bytes(4, 'little'))
return bytes(data)
@classmethod
def encode_with_payload(
cls, name: str, bps_index: int, channel: int, framecounter: int, payload: str
) -> bytes:
"""Creates the complete packet with header and payload."""
return cls.to_bytes(name, bps_index, channel, framecounter) + payload.encode()

288
vban_cmd/packet/nbs0.py Normal file
View File

@ -0,0 +1,288 @@
from dataclasses import dataclass
from typing import NamedTuple
from vban_cmd.enums import NBS
from vban_cmd.kinds import KindMapClass
from vban_cmd.util import comp
from .headers import VbanPacket
class Levels(NamedTuple):
strip: tuple[float, ...]
bus: tuple[float, ...]
class ChannelState:
"""Represents the processed state of a single strip or bus channel"""
def __init__(self, state_bytes: bytes):
# Convert 4-byte state to integer once for efficient lookups
self._state = int.from_bytes(state_bytes, 'little')
def get_mode(self, mode_value: int) -> bool:
"""Get boolean state for a specific mode"""
return (self._state & mode_value) != 0
def get_mode_int(self, mode_value: int) -> int:
"""Get integer state for a specific mode"""
return self._state & mode_value
# Common boolean modes
@property
def mute(self) -> bool:
return (self._state & 0x00000001) != 0
@property
def solo(self) -> bool:
return (self._state & 0x00000002) != 0
@property
def mono(self) -> bool:
return (self._state & 0x00000004) != 0
@property
def mc(self) -> bool:
return (self._state & 0x00000008) != 0
# EQ modes
@property
def eq_on(self) -> bool:
return (self._state & 0x00000100) != 0
@property
def eq_ab(self) -> bool:
return (self._state & 0x00000800) != 0
# Bus assignments (strip to bus routing)
@property
def busa1(self) -> bool:
return (self._state & 0x00001000) != 0
@property
def busa2(self) -> bool:
return (self._state & 0x00002000) != 0
@property
def busa3(self) -> bool:
return (self._state & 0x00004000) != 0
@property
def busa4(self) -> bool:
return (self._state & 0x00008000) != 0
@property
def busb1(self) -> bool:
return (self._state & 0x00010000) != 0
@property
def busb2(self) -> bool:
return (self._state & 0x00020000) != 0
@property
def busb3(self) -> bool:
return (self._state & 0x00040000) != 0
class States(NamedTuple):
strip: tuple[ChannelState, ...]
bus: tuple[ChannelState, ...]
class Labels(NamedTuple):
strip: tuple[str, ...]
bus: tuple[str, ...]
@dataclass
class VbanPacketNBS0(VbanPacket):
"""Represents the body of a VBAN data packet with ident:0"""
_inputLeveldB100: bytes
_outputLeveldB100: bytes
_TransportBit: bytes
_stripState: bytes
_busState: bytes
_stripGaindB100Layer1: bytes
_stripGaindB100Layer2: bytes
_stripGaindB100Layer3: bytes
_stripGaindB100Layer4: bytes
_stripGaindB100Layer5: bytes
_stripGaindB100Layer6: bytes
_stripGaindB100Layer7: bytes
_stripGaindB100Layer8: bytes
_busGaindB100: bytes
_stripLabelUTF8c60: bytes
_busLabelUTF8c60: bytes
@classmethod
def from_bytes(cls, nbs: NBS, kind: KindMapClass, data: bytes):
return cls(
nbs=nbs,
_kind=kind,
_voicemeeterType=data[28:29],
_reserved=data[29:30],
_buffersize=data[30:32],
_voicemeeterVersion=data[32:36],
_optionBits=data[36:40],
_samplerate=data[40:44],
_inputLeveldB100=data[44:112],
_outputLeveldB100=data[112:240],
_TransportBit=data[240:244],
_stripState=data[244:276],
_busState=data[276:308],
_stripGaindB100Layer1=data[308:324],
_stripGaindB100Layer2=data[324:340],
_stripGaindB100Layer3=data[340:356],
_stripGaindB100Layer4=data[356:372],
_stripGaindB100Layer5=data[372:388],
_stripGaindB100Layer6=data[388:404],
_stripGaindB100Layer7=data[404:420],
_stripGaindB100Layer8=data[420:436],
_busGaindB100=data[436:452],
_stripLabelUTF8c60=data[452:932],
_busLabelUTF8c60=data[932:1412],
)
def pdirty(self, other) -> bool:
"""True iff any defined parameter has changed"""
self_gains = (
self._stripGaindB100Layer1
+ self._stripGaindB100Layer2
+ self._stripGaindB100Layer3
+ self._stripGaindB100Layer4
+ self._stripGaindB100Layer5
+ self._stripGaindB100Layer6
+ self._stripGaindB100Layer7
+ self._stripGaindB100Layer8
)
other_gains = (
other._stripGaindB100Layer1
+ other._stripGaindB100Layer2
+ other._stripGaindB100Layer3
+ other._stripGaindB100Layer4
+ other._stripGaindB100Layer5
+ other._stripGaindB100Layer6
+ other._stripGaindB100Layer7
+ other._stripGaindB100Layer8
)
return (
self._stripState != other._stripState
or self._busState != other._busState
or self_gains != other_gains
or self._busGaindB100 != other._busGaindB100
or self._stripLabelUTF8c60 != other._stripLabelUTF8c60
or self._busLabelUTF8c60 != other._busLabelUTF8c60
)
def ldirty(self, strip_cache, bus_cache) -> bool:
"""True iff any level has changed, ignoring changes when levels are very quiet"""
self._strip_comp, self._bus_comp = (
tuple(not val for val in comp(strip_cache, self.strip_levels)),
tuple(not val for val in comp(bus_cache, self.bus_levels)),
)
return any(self._strip_comp) or any(self._bus_comp)
@property
def strip_levels(self) -> tuple[float, ...]:
"""Returns strip levels in dB"""
return tuple(
round(
int.from_bytes(self._inputLeveldB100[i : i + 2], 'little', signed=True)
* 0.01,
1,
)
for i in range(0, len(self._inputLeveldB100), 2)
)[: self._kind.num_strip_levels]
@property
def bus_levels(self) -> tuple[float, ...]:
"""Returns bus levels in dB"""
return tuple(
round(
int.from_bytes(self._outputLeveldB100[i : i + 2], 'little', signed=True)
* 0.01,
1,
)
for i in range(0, len(self._outputLeveldB100), 2)
)[: self._kind.num_bus_levels]
@property
def levels(self) -> Levels:
"""Returns strip and bus levels as a namedtuple"""
return Levels(strip=self.strip_levels, bus=self.bus_levels)
@property
def states(self) -> States:
"""returns States object with processed strip and bus channel states"""
return States(
strip=tuple(
ChannelState(self._stripState[i : i + 4]) for i in range(0, 32, 4)
),
bus=tuple(ChannelState(self._busState[i : i + 4]) for i in range(0, 32, 4)),
)
@property
def gainlayers(self) -> tuple:
"""returns tuple of all strip gain layers as tuples"""
return tuple(
tuple(
round(
int.from_bytes(
getattr(self, f'_stripGaindB100Layer{layer}')[i : i + 2],
'little',
signed=True,
)
* 0.01,
2,
)
for i in range(0, 16, 2)
)
for layer in range(1, 9)
)
@property
def busgain(self) -> tuple:
"""returns tuple of bus gains"""
return tuple(
round(
int.from_bytes(self._busGaindB100[i : i + 2], 'little', signed=True)
* 0.01,
2,
)
for i in range(0, 16, 2)
)
@property
def labels(self) -> Labels:
"""returns Labels namedtuple of strip and bus labels"""
def _extract_labels_from_bytes(label_bytes: bytes) -> tuple[str, ...]:
"""Extract null-terminated UTF-8 labels from 60-byte chunks"""
labels = []
for i in range(0, len(label_bytes), 60):
chunk = label_bytes[i : i + 60]
null_pos = chunk.find(b'\x00')
if null_pos == -1:
try:
label = chunk.decode('utf-8', errors='replace').rstrip('\x00')
except UnicodeDecodeError:
label = ''
else:
try:
label = (
chunk[:null_pos].decode('utf-8', errors='replace')
if null_pos > 0
else ''
)
except UnicodeDecodeError:
label = ''
labels.append(label)
return tuple(labels)
return Labels(
strip=_extract_labels_from_bytes(self._stripLabelUTF8c60),
bus=_extract_labels_from_bytes(self._busLabelUTF8c60),
)

357
vban_cmd/packet/nbs1.py Normal file
View File

@ -0,0 +1,357 @@
import struct
from dataclasses import dataclass
from typing import NamedTuple
from vban_cmd.enums import NBS
from vban_cmd.kinds import KindMapClass
from .headers import VbanPacket
VMPARAMSTRIP_SIZE = 174
class Audibility(NamedTuple):
knob: float
comp: float
gate: float
denoiser: float
class Positions(NamedTuple):
pan_x: float
pan_y: float
color_x: float
color_y: float
fx1: float
fx2: float
class EqGains(NamedTuple):
bass: float
mid: float
treble: float
class ParametricEQSettings(NamedTuple):
on: bool
type: int
gain: float
freq: float
q: float
class Sends(NamedTuple):
reverb: float
delay: float
fx1: float
fx2: float
class CompressorSettings(NamedTuple):
gain_in: float
attack_ms: float
release_ms: float
n_knee: float
ratio: float
threshold: float
c_enabled: bool
makeup: bool
gain_out: float
class GateSettings(NamedTuple):
threshold_in: float
damping_max: float
bp_sidechain: bool
attack_ms: float
hold_ms: float
release_ms: float
class DenoiserSettings(NamedTuple):
threshold: float
class PitchSettings(NamedTuple):
enabled: bool
dry_wet: float
value: float
formant_lo: float
formant_med: float
formant_high: float
@dataclass
class VbanVMParamStrip:
"""Represents the VBAN_VMPARAMSTRIP_PACKET structure"""
_mode: bytes
_dblevel: bytes
_audibility: bytes
_pos3D_x: bytes
_pos3D_y: bytes
_posColor_x: bytes
_posColor_y: bytes
_EQgain1: bytes
_EQgain2: bytes
_EQgain3: bytes
# First channel parametric EQ
_PEQ_eqOn: bytes
_PEQ_eqtype: bytes
_PEQ_eqgain: bytes
_PEQ_eqfreq: bytes
_PEQ_eqq: bytes
_audibility_c: bytes
_audibility_g: bytes
_audibility_d: bytes
_posMod_x: bytes
_posMod_y: bytes
_send_reverb: bytes
_send_delay: bytes
_send_fx1: bytes
_send_fx2: bytes
_dblimit: bytes
_nKaraoke: bytes
_COMP_gain_in: bytes
_COMP_attack_ms: bytes
_COMP_release_ms: bytes
_COMP_n_knee: bytes
_COMP_comprate: bytes
_COMP_threshold: bytes
_COMP_c_enabled: bytes
_COMP_c_auto: bytes
_COMP_gain_out: bytes
_GATE_dBThreshold_in: bytes
_GATE_dBDamping_max: bytes
_GATE_BP_Sidechain: bytes
_GATE_attack_ms: bytes
_GATE_hold_ms: bytes
_GATE_release_ms: bytes
_DenoiserThreshold: bytes
_PitchEnabled: bytes
_Pitch_DryWet: bytes
_Pitch_Value: bytes
_Pitch_formant_lo: bytes
_Pitch_formant_med: bytes
_Pitch_formant_high: bytes
@classmethod
def from_bytes(cls, data: bytes):
return cls(
_mode=data[0:4],
_dblevel=data[4:8],
_audibility=data[8:10],
_pos3D_x=data[10:12],
_pos3D_y=data[12:14],
_posColor_x=data[14:16],
_posColor_y=data[16:18],
_EQgain1=data[18:20],
_EQgain2=data[20:22],
_EQgain3=data[22:24],
_PEQ_eqOn=data[24:30],
_PEQ_eqtype=data[30:36],
_PEQ_eqgain=data[36:60],
_PEQ_eqfreq=data[60:84],
_PEQ_eqq=data[84:108],
_audibility_c=data[108:110],
_audibility_g=data[110:112],
_audibility_d=data[112:114],
_posMod_x=data[114:116],
_posMod_y=data[116:118],
_send_reverb=data[118:120],
_send_delay=data[120:122],
_send_fx1=data[122:124],
_send_fx2=data[124:126],
_dblimit=data[126:128],
_nKaraoke=data[128:130],
_COMP_gain_in=data[130:132],
_COMP_attack_ms=data[132:134],
_COMP_release_ms=data[134:136],
_COMP_n_knee=data[136:138],
_COMP_comprate=data[138:140],
_COMP_threshold=data[140:142],
_COMP_c_enabled=data[142:144],
_COMP_c_auto=data[144:146],
_COMP_gain_out=data[146:148],
_GATE_dBThreshold_in=data[148:150],
_GATE_dBDamping_max=data[150:152],
_GATE_BP_Sidechain=data[152:154],
_GATE_attack_ms=data[154:156],
_GATE_hold_ms=data[156:158],
_GATE_release_ms=data[158:160],
_DenoiserThreshold=data[160:162],
_PitchEnabled=data[162:164],
_Pitch_DryWet=data[164:166],
_Pitch_Value=data[166:168],
_Pitch_formant_lo=data[168:170],
_Pitch_formant_med=data[170:172],
_Pitch_formant_high=data[172:174],
)
@property
def mode(self) -> int:
return int.from_bytes(self._mode, 'little')
@property
def audibility(self) -> Audibility:
return Audibility(
round(int.from_bytes(self._audibility, 'little', signed=True) * 0.01, 2),
round(int.from_bytes(self._audibility_c, 'little', signed=True) * 0.01, 2),
round(int.from_bytes(self._audibility_g, 'little', signed=True) * 0.01, 2),
round(int.from_bytes(self._audibility_d, 'little', signed=True) * 0.01, 2),
)
@property
def positions(self) -> Positions:
return Positions(
round(int.from_bytes(self._pos3D_x, 'little', signed=True) * 0.01, 2),
round(int.from_bytes(self._pos3D_y, 'little', signed=True) * 0.01, 2),
round(int.from_bytes(self._posColor_x, 'little', signed=True) * 0.01, 2),
round(int.from_bytes(self._posColor_y, 'little', signed=True) * 0.01, 2),
round(int.from_bytes(self._posMod_x, 'little', signed=True) * 0.01, 2),
round(int.from_bytes(self._posMod_y, 'little', signed=True) * 0.01, 2),
)
@property
def eqgains(self) -> EqGains:
return EqGains(
*[
round(
int.from_bytes(getattr(self, f'_EQgain{i}'), 'little', signed=True)
* 0.01,
2,
)
for i in range(1, 4)
]
)
@property
def parametric_eq(self) -> tuple[ParametricEQSettings, ...]:
return tuple(
ParametricEQSettings(
on=bool(int.from_bytes(self._PEQ_eqOn[i : i + 1], 'little')),
type=int.from_bytes(self._PEQ_eqtype[i : i + 1], 'little'),
freq=struct.unpack('<f', self._PEQ_eqfreq[i * 4 : (i + 1) * 4])[0],
gain=struct.unpack('<f', self._PEQ_eqgain[i * 4 : (i + 1) * 4])[0],
q=struct.unpack('<f', self._PEQ_eqq[i * 4 : (i + 1) * 4])[0],
)
for i in range(6)
)
@property
def sends(self) -> Sends:
return Sends(
round(int.from_bytes(self._send_reverb, 'little', signed=True) * 0.01, 2),
round(int.from_bytes(self._send_delay, 'little', signed=True) * 0.01, 2),
round(int.from_bytes(self._send_fx1, 'little', signed=True) * 0.01, 2),
round(int.from_bytes(self._send_fx2, 'little', signed=True) * 0.01, 2),
)
@property
def karaoke(self) -> int:
return int.from_bytes(self._nKaraoke, 'little')
@property
def compressor(self) -> CompressorSettings:
return CompressorSettings(
gain_in=round(
int.from_bytes(self._COMP_gain_in, 'little', signed=True) * 0.01, 2
),
attack_ms=round(int.from_bytes(self._COMP_attack_ms, 'little') * 0.1, 2),
release_ms=round(int.from_bytes(self._COMP_release_ms, 'little') * 0.1, 2),
n_knee=round(int.from_bytes(self._COMP_n_knee, 'little') * 0.01, 2),
ratio=round(int.from_bytes(self._COMP_comprate, 'little') * 0.01, 2),
threshold=round(
int.from_bytes(self._COMP_threshold, 'little', signed=True) * 0.01, 2
),
c_enabled=bool(int.from_bytes(self._COMP_c_enabled, 'little')),
makeup=bool(int.from_bytes(self._COMP_c_auto, 'little')),
gain_out=round(
int.from_bytes(self._COMP_gain_out, 'little', signed=True) * 0.01, 2
),
)
@property
def gate(self) -> GateSettings:
return GateSettings(
threshold_in=round(
int.from_bytes(self._GATE_dBThreshold_in, 'little', signed=True) * 0.01,
2,
),
damping_max=round(
int.from_bytes(self._GATE_dBDamping_max, 'little', signed=True) * 0.01,
2,
),
bp_sidechain=round(
int.from_bytes(self._GATE_BP_Sidechain, 'little') * 0.1, 2
),
attack_ms=round(int.from_bytes(self._GATE_attack_ms, 'little') * 0.1, 2),
hold_ms=round(int.from_bytes(self._GATE_hold_ms, 'little') * 0.1, 2),
release_ms=round(int.from_bytes(self._GATE_release_ms, 'little') * 0.1, 2),
)
@property
def denoiser(self) -> DenoiserSettings:
return DenoiserSettings(
threshold=round(
int.from_bytes(self._DenoiserThreshold, 'little', signed=True) * 0.01, 2
)
)
@property
def pitch(self) -> PitchSettings:
return PitchSettings(
enabled=bool(int.from_bytes(self._PitchEnabled, 'little')),
dry_wet=round(
int.from_bytes(self._Pitch_DryWet, 'little', signed=True) * 0.01, 2
),
value=round(
int.from_bytes(self._Pitch_Value, 'little', signed=True) * 0.01, 2
),
formant_lo=round(
int.from_bytes(self._Pitch_formant_lo, 'little', signed=True) * 0.01, 2
),
formant_med=round(
int.from_bytes(self._Pitch_formant_med, 'little', signed=True) * 0.01, 2
),
formant_high=round(
int.from_bytes(self._Pitch_formant_high, 'little', signed=True) * 0.01,
2,
),
)
@dataclass
class VbanPacketNBS1(VbanPacket):
"""Represents the body of a VBAN data packet with ident:1"""
strips: tuple[VbanVMParamStrip, ...]
@classmethod
def from_bytes(
cls,
nbs: NBS,
kind: KindMapClass,
data: bytes,
):
return cls(
nbs=nbs,
_kind=kind,
_voicemeeterType=data[28:29],
_reserved=data[29:30],
_buffersize=data[30:32],
_voicemeeterVersion=data[32:36],
_optionBits=data[36:40],
_samplerate=data[40:44],
strips=tuple(
VbanVMParamStrip.from_bytes(
data[44 + i * VMPARAMSTRIP_SIZE : 44 + (i + 1) * VMPARAMSTRIP_SIZE]
)
for i in range(16)
),
)

View File

@ -10,7 +10,7 @@ from typing import Union
from .enums import NBS from .enums import NBS
from .error import VBANCMDError from .error import VBANCMDError
from .event import Event from .event import Event
from .packet import RequestHeader from .packet.headers import VbanRequestHeader
from .subject import Subject from .subject import Subject
from .util import bump_framecounter, deep_merge, script from .util import bump_framecounter, deep_merge, script
from .worker import Producer, Subscriber, Updater from .worker import Producer, Subscriber, Updater
@ -120,37 +120,29 @@ class VbanCmd(abc.ABC):
def stopped(self): def stopped(self):
return self.stop_event is None or self.stop_event.is_set() return self.stop_event is None or self.stop_event.is_set()
def _set_rt(self, cmd: str, val: Union[str, float]): def _send_request(self, payload: str) -> None:
"""Sends a string request command over a network.""" """Sends a request packet over the network and bumps the framecounter."""
req_packet = RequestHeader.to_bytes(
name=self.streamname,
bps_index=self.BPS_OPTS.index(self.bps),
channel=self.channel,
framecounter=self._framecounter,
)
self.sock.sendto( self.sock.sendto(
req_packet + f'{cmd}={val};'.encode(), VbanRequestHeader.encode_with_payload(
name=self.streamname,
bps_index=self.BPS_OPTS.index(self.bps),
channel=self.channel,
framecounter=self._framecounter,
payload=payload,
),
(socket.gethostbyname(self.ip), self.port), (socket.gethostbyname(self.ip), self.port),
) )
self._framecounter = bump_framecounter(self._framecounter) self._framecounter = bump_framecounter(self._framecounter)
def _set_rt(self, cmd: str, val: Union[str, float]):
"""Sends a string request command over a network."""
self._send_request(f'{cmd}={val};')
self.cache[cmd] = val self.cache[cmd] = val
@script @script
def sendtext(self, script): def sendtext(self, script):
"""Sends a multiple parameter string over a network.""" """Sends a multiple parameter string over a network."""
req_packet = RequestHeader.to_bytes( self._send_request(script)
name=self.streamname,
bps_index=self.BPS_OPTS.index(self.bps),
channel=self.channel,
framecounter=self._framecounter,
)
self.sock.sendto(
req_packet + script.encode(),
(socket.gethostbyname(self.ip), self.port),
)
self._framecounter = bump_framecounter(self._framecounter)
self.logger.debug(f'sendtext: {script}') self.logger.debug(f'sendtext: {script}')
time.sleep(self.DELAY) time.sleep(self.DELAY)

View File

@ -5,16 +5,16 @@ import time
from .enums import NBS from .enums import NBS
from .error import VBANCMDConnectionError from .error import VBANCMDConnectionError
from .packet import ( from .packet.headers import (
HEADER_SIZE, HEADER_SIZE,
VBAN_PROTOCOL_SERVICE, VBAN_PROTOCOL_SERVICE,
VBAN_SERVICE_RTPACKET, VBAN_SERVICE_RTPACKET,
SubscribeHeader,
VbanPacket, VbanPacket,
VbanPacketNBS0, VbanResponseHeader,
VbanPacketNBS1, VbanSubscribeHeader,
VbanRtPacketHeader,
) )
from .packet.nbs0 import VbanPacketNBS0
from .packet.nbs1 import VbanPacketNBS1
from .util import bump_framecounter from .util import bump_framecounter
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -34,7 +34,7 @@ class Subscriber(threading.Thread):
while not self.stopped(): while not self.stopped():
try: try:
for nbs in NBS: for nbs in NBS:
sub_packet = SubscribeHeader().to_bytes(nbs, self._framecounter) sub_packet = VbanSubscribeHeader().to_bytes(nbs, self._framecounter)
self._remote.sock.sendto( self._remote.sock.sendto(
sub_packet, (self._remote.ip, self._remote.port) sub_packet, (self._remote.ip, self._remote.port)
) )
@ -90,7 +90,7 @@ class Producer(threading.Thread):
if len(data) < HEADER_SIZE: if len(data) < HEADER_SIZE:
return return
response_header = VbanRtPacketHeader.from_bytes(data[:HEADER_SIZE]) response_header = VbanResponseHeader.from_bytes(data[:HEADER_SIZE])
if ( if (
response_header.format_sr != VBAN_PROTOCOL_SERVICE response_header.format_sr != VBAN_PROTOCOL_SERVICE
or response_header.format_nbc != VBAN_SERVICE_RTPACKET or response_header.format_nbc != VBAN_SERVICE_RTPACKET