diff --git a/vban_cmd/packet.py b/vban_cmd/packet.py deleted file mode 100644 index da58ab1..0000000 --- a/vban_cmd/packet.py +++ /dev/null @@ -1,808 +0,0 @@ -import struct -from dataclasses import dataclass -from typing import NamedTuple - -from .enums import NBS -from .kinds import KindMapClass -from .util import comp - -VBAN_PROTOCOL_TXT = 0x40 -VBAN_PROTOCOL_SERVICE = 0x60 - -VBAN_SERVICE_RTPACKETREGISTER = 32 -VBAN_SERVICE_RTPACKET = 33 -VBAN_SERVICE_MASK = 0xE0 - -MAX_PACKET_SIZE = 1436 -HEADER_SIZE = 4 + 1 + 1 + 1 + 1 + 16 -VMPARAMSTRIP_SIZE = 174 - - -@dataclass -class VbanPacket: - """Represents the header of a VBAN data packet""" - - nbs: NBS - _kind: KindMapClass - _voicemeeterType: bytes - _reserved: bytes - _buffersize: bytes - _voicemeeterVersion: bytes - _optionBits: bytes - _samplerate: bytes - - @property - def voicemeetertype(self) -> str: - """returns voicemeeter type as a string""" - return ['', 'basic', 'banana', 'potato'][ - int.from_bytes(self._voicemeeterType, 'little') - ] - - @property - def voicemeeterversion(self) -> tuple: - """returns voicemeeter version as a tuple""" - return tuple(self._voicemeeterVersion[i] for i in range(3, -1, -1)) - - @property - def samplerate(self) -> int: - """returns samplerate as an int""" - return int.from_bytes(self._samplerate, 'little') - - -class Levels(NamedTuple): - strip: tuple[float, ...] - bus: tuple[float, ...] - - -class ChannelState: - """Represents the processed state of a single strip or bus channel""" - - def __init__(self, state_bytes: bytes): - # Convert 4-byte state to integer once for efficient lookups - self._state = int.from_bytes(state_bytes, 'little') - - def get_mode(self, mode_value: int) -> bool: - """Get boolean state for a specific mode""" - return (self._state & mode_value) != 0 - - def get_mode_int(self, mode_value: int) -> int: - """Get integer state for a specific mode""" - return self._state & mode_value - - # Common boolean modes - @property - def mute(self) -> bool: - return (self._state & 0x00000001) != 0 - - @property - def solo(self) -> bool: - return (self._state & 0x00000002) != 0 - - @property - def mono(self) -> bool: - return (self._state & 0x00000004) != 0 - - @property - def mc(self) -> bool: - return (self._state & 0x00000008) != 0 - - # EQ modes - @property - def eq_on(self) -> bool: - return (self._state & 0x00000100) != 0 - - @property - def eq_ab(self) -> bool: - return (self._state & 0x00000800) != 0 - - # Bus assignments (strip to bus routing) - @property - def busa1(self) -> bool: - return (self._state & 0x00001000) != 0 - - @property - def busa2(self) -> bool: - return (self._state & 0x00002000) != 0 - - @property - def busa3(self) -> bool: - return (self._state & 0x00004000) != 0 - - @property - def busa4(self) -> bool: - return (self._state & 0x00008000) != 0 - - @property - def busb1(self) -> bool: - return (self._state & 0x00010000) != 0 - - @property - def busb2(self) -> bool: - return (self._state & 0x00020000) != 0 - - @property - def busb3(self) -> bool: - return (self._state & 0x00040000) != 0 - - -class States(NamedTuple): - strip: tuple[ChannelState, ...] - bus: tuple[ChannelState, ...] - - -class Labels(NamedTuple): - strip: tuple[str, ...] - bus: tuple[str, ...] - - -@dataclass -class VbanPacketNBS0(VbanPacket): - """Represents the body of a VBAN data packet with ident:0""" - - _inputLeveldB100: bytes - _outputLeveldB100: bytes - _TransportBit: bytes - _stripState: bytes - _busState: bytes - _stripGaindB100Layer1: bytes - _stripGaindB100Layer2: bytes - _stripGaindB100Layer3: bytes - _stripGaindB100Layer4: bytes - _stripGaindB100Layer5: bytes - _stripGaindB100Layer6: bytes - _stripGaindB100Layer7: bytes - _stripGaindB100Layer8: bytes - _busGaindB100: bytes - _stripLabelUTF8c60: bytes - _busLabelUTF8c60: bytes - - @classmethod - def from_bytes(cls, nbs: NBS, kind: KindMapClass, data: bytes): - return cls( - nbs=nbs, - _kind=kind, - _voicemeeterType=data[28:29], - _reserved=data[29:30], - _buffersize=data[30:32], - _voicemeeterVersion=data[32:36], - _optionBits=data[36:40], - _samplerate=data[40:44], - _inputLeveldB100=data[44:112], - _outputLeveldB100=data[112:240], - _TransportBit=data[240:244], - _stripState=data[244:276], - _busState=data[276:308], - _stripGaindB100Layer1=data[308:324], - _stripGaindB100Layer2=data[324:340], - _stripGaindB100Layer3=data[340:356], - _stripGaindB100Layer4=data[356:372], - _stripGaindB100Layer5=data[372:388], - _stripGaindB100Layer6=data[388:404], - _stripGaindB100Layer7=data[404:420], - _stripGaindB100Layer8=data[420:436], - _busGaindB100=data[436:452], - _stripLabelUTF8c60=data[452:932], - _busLabelUTF8c60=data[932:1412], - ) - - def pdirty(self, other) -> bool: - """True iff any defined parameter has changed""" - - self_gains = ( - self._stripGaindB100Layer1 - + self._stripGaindB100Layer2 - + self._stripGaindB100Layer3 - + self._stripGaindB100Layer4 - + self._stripGaindB100Layer5 - + self._stripGaindB100Layer6 - + self._stripGaindB100Layer7 - + self._stripGaindB100Layer8 - ) - other_gains = ( - other._stripGaindB100Layer1 - + other._stripGaindB100Layer2 - + other._stripGaindB100Layer3 - + other._stripGaindB100Layer4 - + other._stripGaindB100Layer5 - + other._stripGaindB100Layer6 - + other._stripGaindB100Layer7 - + other._stripGaindB100Layer8 - ) - - return ( - self._stripState != other._stripState - or self._busState != other._busState - or self_gains != other_gains - or self._busGaindB100 != other._busGaindB100 - or self._stripLabelUTF8c60 != other._stripLabelUTF8c60 - or self._busLabelUTF8c60 != other._busLabelUTF8c60 - ) - - def ldirty(self, strip_cache, bus_cache) -> bool: - """True iff any level has changed, ignoring changes when levels are very quiet""" - self._strip_comp, self._bus_comp = ( - tuple(not val for val in comp(strip_cache, self.strip_levels)), - tuple(not val for val in comp(bus_cache, self.bus_levels)), - ) - return any(self._strip_comp) or any(self._bus_comp) - - @property - def strip_levels(self) -> tuple[float, ...]: - """Returns strip levels in dB""" - return tuple( - round( - int.from_bytes(self._inputLeveldB100[i : i + 2], 'little', signed=True) - * 0.01, - 1, - ) - for i in range(0, len(self._inputLeveldB100), 2) - )[: self._kind.num_strip_levels] - - @property - def bus_levels(self) -> tuple[float, ...]: - """Returns bus levels in dB""" - return tuple( - round( - int.from_bytes(self._outputLeveldB100[i : i + 2], 'little', signed=True) - * 0.01, - 1, - ) - for i in range(0, len(self._outputLeveldB100), 2) - )[: self._kind.num_bus_levels] - - @property - def levels(self) -> Levels: - """Returns strip and bus levels as a namedtuple""" - return Levels(strip=self.strip_levels, bus=self.bus_levels) - - @property - def states(self) -> States: - """returns States object with processed strip and bus channel states""" - return States( - strip=tuple( - ChannelState(self._stripState[i : i + 4]) for i in range(0, 32, 4) - ), - bus=tuple(ChannelState(self._busState[i : i + 4]) for i in range(0, 32, 4)), - ) - - @property - def gainlayers(self) -> tuple: - """returns tuple of all strip gain layers as tuples""" - return tuple( - tuple( - round( - int.from_bytes( - getattr(self, f'_stripGaindB100Layer{layer}')[i : i + 2], - 'little', - signed=True, - ) - * 0.01, - 2, - ) - for i in range(0, 16, 2) - ) - for layer in range(1, 9) - ) - - @property - def busgain(self) -> tuple: - """returns tuple of bus gains""" - return tuple( - round( - int.from_bytes(self._busGaindB100[i : i + 2], 'little', signed=True) - * 0.01, - 2, - ) - for i in range(0, 16, 2) - ) - - @property - def labels(self) -> Labels: - """returns Labels namedtuple of strip and bus labels""" - - def _extract_labels_from_bytes(label_bytes: bytes) -> tuple[str, ...]: - """Extract null-terminated UTF-8 labels from 60-byte chunks""" - labels = [] - for i in range(0, len(label_bytes), 60): - chunk = label_bytes[i : i + 60] - null_pos = chunk.find(b'\x00') - if null_pos == -1: - try: - label = chunk.decode('utf-8', errors='replace').rstrip('\x00') - except UnicodeDecodeError: - label = '' - else: - try: - label = ( - chunk[:null_pos].decode('utf-8', errors='replace') - if null_pos > 0 - else '' - ) - except UnicodeDecodeError: - label = '' - labels.append(label) - return tuple(labels) - - return Labels( - strip=_extract_labels_from_bytes(self._stripLabelUTF8c60), - bus=_extract_labels_from_bytes(self._busLabelUTF8c60), - ) - - -class Audibility(NamedTuple): - knob: float - comp: float - gate: float - denoiser: float - - -class Positions(NamedTuple): - pan_x: float - pan_y: float - color_x: float - color_y: float - fx1: float - fx2: float - - -class EqGains(NamedTuple): - bass: float - mid: float - treble: float - - -class ParametricEQSettings(NamedTuple): - on: bool - type: int - gain: float - freq: float - q: float - - -class Sends(NamedTuple): - reverb: float - delay: float - fx1: float - fx2: float - - -class CompressorSettings(NamedTuple): - gain_in: float - attack_ms: float - release_ms: float - n_knee: float - ratio: float - threshold: float - c_enabled: bool - makeup: bool - gain_out: float - - -class GateSettings(NamedTuple): - threshold_in: float - damping_max: float - bp_sidechain: bool - attack_ms: float - hold_ms: float - release_ms: float - - -class DenoiserSettings(NamedTuple): - threshold: float - - -class PitchSettings(NamedTuple): - enabled: bool - dry_wet: float - value: float - formant_lo: float - formant_med: float - formant_high: float - - -@dataclass -class VbanVMParamStrip: - """Represents the VBAN_VMPARAMSTRIP_PACKET structure""" - - _mode: bytes - _dblevel: bytes - _audibility: bytes - _pos3D_x: bytes - _pos3D_y: bytes - _posColor_x: bytes - _posColor_y: bytes - _EQgain1: bytes - _EQgain2: bytes - _EQgain3: bytes - - # First channel parametric EQ - _PEQ_eqOn: bytes - _PEQ_eqtype: bytes - _PEQ_eqgain: bytes - _PEQ_eqfreq: bytes - _PEQ_eqq: bytes - - _audibility_c: bytes - _audibility_g: bytes - _audibility_d: bytes - _posMod_x: bytes - _posMod_y: bytes - _send_reverb: bytes - _send_delay: bytes - _send_fx1: bytes - _send_fx2: bytes - _dblimit: bytes - _nKaraoke: bytes - - _COMP_gain_in: bytes - _COMP_attack_ms: bytes - _COMP_release_ms: bytes - _COMP_n_knee: bytes - _COMP_comprate: bytes - _COMP_threshold: bytes - _COMP_c_enabled: bytes - _COMP_c_auto: bytes - _COMP_gain_out: bytes - - _GATE_dBThreshold_in: bytes - _GATE_dBDamping_max: bytes - _GATE_BP_Sidechain: bytes - _GATE_attack_ms: bytes - _GATE_hold_ms: bytes - _GATE_release_ms: bytes - - _DenoiserThreshold: bytes - _PitchEnabled: bytes - _Pitch_DryWet: bytes - _Pitch_Value: bytes - _Pitch_formant_lo: bytes - _Pitch_formant_med: bytes - _Pitch_formant_high: bytes - - @classmethod - def from_bytes(cls, data: bytes): - return cls( - _mode=data[0:4], - _dblevel=data[4:8], - _audibility=data[8:10], - _pos3D_x=data[10:12], - _pos3D_y=data[12:14], - _posColor_x=data[14:16], - _posColor_y=data[16:18], - _EQgain1=data[18:20], - _EQgain2=data[20:22], - _EQgain3=data[22:24], - _PEQ_eqOn=data[24:30], - _PEQ_eqtype=data[30:36], - _PEQ_eqgain=data[36:60], - _PEQ_eqfreq=data[60:84], - _PEQ_eqq=data[84:108], - _audibility_c=data[108:110], - _audibility_g=data[110:112], - _audibility_d=data[112:114], - _posMod_x=data[114:116], - _posMod_y=data[116:118], - _send_reverb=data[118:120], - _send_delay=data[120:122], - _send_fx1=data[122:124], - _send_fx2=data[124:126], - _dblimit=data[126:128], - _nKaraoke=data[128:130], - _COMP_gain_in=data[130:132], - _COMP_attack_ms=data[132:134], - _COMP_release_ms=data[134:136], - _COMP_n_knee=data[136:138], - _COMP_comprate=data[138:140], - _COMP_threshold=data[140:142], - _COMP_c_enabled=data[142:144], - _COMP_c_auto=data[144:146], - _COMP_gain_out=data[146:148], - _GATE_dBThreshold_in=data[148:150], - _GATE_dBDamping_max=data[150:152], - _GATE_BP_Sidechain=data[152:154], - _GATE_attack_ms=data[154:156], - _GATE_hold_ms=data[156:158], - _GATE_release_ms=data[158:160], - _DenoiserThreshold=data[160:162], - _PitchEnabled=data[162:164], - _Pitch_DryWet=data[164:166], - _Pitch_Value=data[166:168], - _Pitch_formant_lo=data[168:170], - _Pitch_formant_med=data[170:172], - _Pitch_formant_high=data[172:174], - ) - - @property - def mode(self) -> int: - return int.from_bytes(self._mode, 'little') - - @property - def audibility(self) -> Audibility: - return Audibility( - round(int.from_bytes(self._audibility, 'little', signed=True) * 0.01, 2), - round(int.from_bytes(self._audibility_c, 'little', signed=True) * 0.01, 2), - round(int.from_bytes(self._audibility_g, 'little', signed=True) * 0.01, 2), - round(int.from_bytes(self._audibility_d, 'little', signed=True) * 0.01, 2), - ) - - @property - def positions(self) -> Positions: - return Positions( - round(int.from_bytes(self._pos3D_x, 'little', signed=True) * 0.01, 2), - round(int.from_bytes(self._pos3D_y, 'little', signed=True) * 0.01, 2), - round(int.from_bytes(self._posColor_x, 'little', signed=True) * 0.01, 2), - round(int.from_bytes(self._posColor_y, 'little', signed=True) * 0.01, 2), - round(int.from_bytes(self._posMod_x, 'little', signed=True) * 0.01, 2), - round(int.from_bytes(self._posMod_y, 'little', signed=True) * 0.01, 2), - ) - - @property - def eqgains(self) -> EqGains: - return EqGains( - *[ - round( - int.from_bytes(getattr(self, f'_EQgain{i}'), 'little', signed=True) - * 0.01, - 2, - ) - for i in range(1, 4) - ] - ) - - @property - def parametric_eq(self) -> tuple[ParametricEQSettings, ...]: - return tuple( - ParametricEQSettings( - on=bool(int.from_bytes(self._PEQ_eqOn[i : i + 1], 'little')), - type=int.from_bytes(self._PEQ_eqtype[i : i + 1], 'little'), - freq=struct.unpack(' Sends: - return Sends( - round(int.from_bytes(self._send_reverb, 'little', signed=True) * 0.01, 2), - round(int.from_bytes(self._send_delay, 'little', signed=True) * 0.01, 2), - round(int.from_bytes(self._send_fx1, 'little', signed=True) * 0.01, 2), - round(int.from_bytes(self._send_fx2, 'little', signed=True) * 0.01, 2), - ) - - @property - def karaoke(self) -> int: - return int.from_bytes(self._nKaraoke, 'little') - - @property - def compressor(self) -> CompressorSettings: - return CompressorSettings( - gain_in=round( - int.from_bytes(self._COMP_gain_in, 'little', signed=True) * 0.01, 2 - ), - attack_ms=round(int.from_bytes(self._COMP_attack_ms, 'little') * 0.1, 2), - release_ms=round(int.from_bytes(self._COMP_release_ms, 'little') * 0.1, 2), - n_knee=round(int.from_bytes(self._COMP_n_knee, 'little') * 0.01, 2), - ratio=round(int.from_bytes(self._COMP_comprate, 'little') * 0.01, 2), - threshold=round( - int.from_bytes(self._COMP_threshold, 'little', signed=True) * 0.01, 2 - ), - c_enabled=bool(int.from_bytes(self._COMP_c_enabled, 'little')), - makeup=bool(int.from_bytes(self._COMP_c_auto, 'little')), - gain_out=round( - int.from_bytes(self._COMP_gain_out, 'little', signed=True) * 0.01, 2 - ), - ) - - @property - def gate(self) -> GateSettings: - return GateSettings( - threshold_in=round( - int.from_bytes(self._GATE_dBThreshold_in, 'little', signed=True) * 0.01, - 2, - ), - damping_max=round( - int.from_bytes(self._GATE_dBDamping_max, 'little', signed=True) * 0.01, - 2, - ), - bp_sidechain=round( - int.from_bytes(self._GATE_BP_Sidechain, 'little') * 0.1, 2 - ), - attack_ms=round(int.from_bytes(self._GATE_attack_ms, 'little') * 0.1, 2), - hold_ms=round(int.from_bytes(self._GATE_hold_ms, 'little') * 0.1, 2), - release_ms=round(int.from_bytes(self._GATE_release_ms, 'little') * 0.1, 2), - ) - - @property - def denoiser(self) -> DenoiserSettings: - return DenoiserSettings( - threshold=round( - int.from_bytes(self._DenoiserThreshold, 'little', signed=True) * 0.01, 2 - ) - ) - - @property - def pitch(self) -> PitchSettings: - return PitchSettings( - enabled=bool(int.from_bytes(self._PitchEnabled, 'little')), - dry_wet=round( - int.from_bytes(self._Pitch_DryWet, 'little', signed=True) * 0.01, 2 - ), - value=round( - int.from_bytes(self._Pitch_Value, 'little', signed=True) * 0.01, 2 - ), - formant_lo=round( - int.from_bytes(self._Pitch_formant_lo, 'little', signed=True) * 0.01, 2 - ), - formant_med=round( - int.from_bytes(self._Pitch_formant_med, 'little', signed=True) * 0.01, 2 - ), - formant_high=round( - int.from_bytes(self._Pitch_formant_high, 'little', signed=True) * 0.01, - 2, - ), - ) - - -@dataclass -class VbanPacketNBS1(VbanPacket): - """Represents the body of a VBAN data packet with ident:1""" - - strips: tuple[VbanVMParamStrip, ...] - - @classmethod - def from_bytes( - cls, - nbs: NBS, - kind: KindMapClass, - data: bytes, - ): - return cls( - nbs=nbs, - _kind=kind, - _voicemeeterType=data[28:29], - _reserved=data[29:30], - _buffersize=data[30:32], - _voicemeeterVersion=data[32:36], - _optionBits=data[36:40], - _samplerate=data[40:44], - strips=tuple( - VbanVMParamStrip.from_bytes( - data[44 + i * VMPARAMSTRIP_SIZE : 44 + (i + 1) * VMPARAMSTRIP_SIZE] - ) - for i in range(16) - ), - ) - - -@dataclass -class SubscribeHeader: - """Represents the header of an RT subscription packet""" - - nbs: NBS = NBS.zero - name: str = 'Register-RTP' - timeout: int = 15 - - @property - def vban(self) -> bytes: - return b'VBAN' - - @property - def format_sr(self) -> bytes: - return VBAN_PROTOCOL_SERVICE.to_bytes(1, 'little') - - @property - def format_nbs(self) -> bytes: - return (self.nbs.value & 0xFF).to_bytes(1, 'little') - - @property - def format_nbc(self) -> bytes: - return VBAN_SERVICE_RTPACKETREGISTER.to_bytes(1, 'little') - - @property - def format_bit(self) -> bytes: - return (self.timeout & 0xFF).to_bytes(1, 'little') - - @property - def streamname(self) -> bytes: - return self.name.encode('ascii') + bytes(16 - len(self.name)) - - @classmethod - def to_bytes(cls, nbs: NBS, framecounter: int) -> bytes: - header = cls(nbs=nbs) - - data = bytearray() - data.extend(header.vban) - data.extend(header.format_sr) - data.extend(header.format_nbs) - data.extend(header.format_nbc) - data.extend(header.format_bit) - data.extend(header.streamname) - data.extend(framecounter.to_bytes(4, 'little')) - return bytes(data) - - -@dataclass -class VbanRtPacketHeader: - """Represents the header of an RT response packet""" - - name: str = 'Voicemeeter-RTP' - format_sr: int = VBAN_PROTOCOL_SERVICE - format_nbs: int = 0 - format_nbc: int = VBAN_SERVICE_RTPACKET - format_bit: int = 0 - - @property - def vban(self) -> bytes: - return b'VBAN' - - @property - def streamname(self) -> bytes: - return self.name.encode('ascii') + bytes(16 - len(self.name)) - - @classmethod - def from_bytes(cls, data: bytes): - if len(data) < HEADER_SIZE: - raise ValueError('Data is too short to be a valid VbanRTPPacketHeader') - - name = data[8:24].rstrip(b'\x00').decode('utf-8') - return cls( - name=name, - format_sr=data[4] & VBAN_SERVICE_MASK, - format_nbs=data[5], - format_nbc=data[6], - format_bit=data[7], - ) - - -@dataclass -class RequestHeader: - """Represents the header of an RT request packet""" - - name: str - bps_index: int - channel: int - framecounter: int = 0 - - @property - def vban(self) -> bytes: - return b'VBAN' - - @property - def sr(self) -> bytes: - return (VBAN_PROTOCOL_TXT + self.bps_index).to_bytes(1, 'little') - - @property - def nbs(self) -> bytes: - return (0).to_bytes(1, 'little') - - @property - def nbc(self) -> bytes: - return (self.channel).to_bytes(1, 'little') - - @property - def bit(self) -> bytes: - return (0x10).to_bytes(1, 'little') - - @property - def streamname(self) -> bytes: - return self.name.encode() + bytes(16 - len(self.name)) - - @classmethod - def to_bytes( - cls, name: str, bps_index: int, channel: int, framecounter: int - ) -> bytes: - header = cls( - name=name, bps_index=bps_index, channel=channel, framecounter=framecounter - ) - - data = bytearray() - data.extend(header.vban) - data.extend(header.sr) - data.extend(header.nbs) - data.extend(header.nbc) - data.extend(header.bit) - data.extend(header.streamname) - data.extend(header.framecounter.to_bytes(4, 'little')) - return bytes(data) diff --git a/vban_cmd/packet/headers.py b/vban_cmd/packet/headers.py new file mode 100644 index 0000000..fc985b8 --- /dev/null +++ b/vban_cmd/packet/headers.py @@ -0,0 +1,184 @@ +from dataclasses import dataclass + +from vban_cmd.enums import NBS +from vban_cmd.kinds import KindMapClass + +VBAN_PROTOCOL_TXT = 0x40 +VBAN_PROTOCOL_SERVICE = 0x60 + +VBAN_SERVICE_RTPACKETREGISTER = 32 +VBAN_SERVICE_RTPACKET = 33 +VBAN_SERVICE_MASK = 0xE0 + +MAX_PACKET_SIZE = 1436 +HEADER_SIZE = 4 + 1 + 1 + 1 + 1 + 16 + + +@dataclass +class VbanPacket: + """Represents the header of an incoming VBAN data packet""" + + nbs: NBS + _kind: KindMapClass + _voicemeeterType: bytes + _reserved: bytes + _buffersize: bytes + _voicemeeterVersion: bytes + _optionBits: bytes + _samplerate: bytes + + @property + def voicemeetertype(self) -> str: + """returns voicemeeter type as a string""" + return ['', 'basic', 'banana', 'potato'][ + int.from_bytes(self._voicemeeterType, 'little') + ] + + @property + def voicemeeterversion(self) -> tuple: + """returns voicemeeter version as a tuple""" + return tuple(self._voicemeeterVersion[i] for i in range(3, -1, -1)) + + @property + def samplerate(self) -> int: + """returns samplerate as an int""" + return int.from_bytes(self._samplerate, 'little') + + +@dataclass +class VbanSubscribeHeader: + """Represents the header of a subscription packet""" + + nbs: NBS = NBS.zero + name: str = 'Register-RTP' + timeout: int = 15 + + @property + def vban(self) -> bytes: + return b'VBAN' + + @property + def format_sr(self) -> bytes: + return VBAN_PROTOCOL_SERVICE.to_bytes(1, 'little') + + @property + def format_nbs(self) -> bytes: + return (self.nbs.value & 0xFF).to_bytes(1, 'little') + + @property + def format_nbc(self) -> bytes: + return VBAN_SERVICE_RTPACKETREGISTER.to_bytes(1, 'little') + + @property + def format_bit(self) -> bytes: + return (self.timeout & 0xFF).to_bytes(1, 'little') + + @property + def streamname(self) -> bytes: + return self.name.encode('ascii') + bytes(16 - len(self.name)) + + @classmethod + def to_bytes(cls, nbs: NBS, framecounter: int) -> bytes: + header = cls(nbs=nbs) + + data = bytearray() + data.extend(header.vban) + data.extend(header.format_sr) + data.extend(header.format_nbs) + data.extend(header.format_nbc) + data.extend(header.format_bit) + data.extend(header.streamname) + data.extend(framecounter.to_bytes(4, 'little')) + return bytes(data) + + +@dataclass +class VbanResponseHeader: + """Represents the header of a response packet""" + + name: str = 'Voicemeeter-RTP' + format_sr: int = VBAN_PROTOCOL_SERVICE + format_nbs: int = 0 + format_nbc: int = VBAN_SERVICE_RTPACKET + format_bit: int = 0 + + @property + def vban(self) -> bytes: + return b'VBAN' + + @property + def streamname(self) -> bytes: + return self.name.encode('ascii') + bytes(16 - len(self.name)) + + @classmethod + def from_bytes(cls, data: bytes): + if len(data) < HEADER_SIZE: + raise ValueError('Data is too short to be a valid VbanResponseHeader') + + name = data[8:24].rstrip(b'\x00').decode('utf-8') + return cls( + name=name, + format_sr=data[4] & VBAN_SERVICE_MASK, + format_nbs=data[5], + format_nbc=data[6], + format_bit=data[7], + ) + + +@dataclass +class VbanRequestHeader: + """Represents the header of a request packet""" + + name: str + bps_index: int + channel: int + framecounter: int = 0 + + @property + def vban(self) -> bytes: + return b'VBAN' + + @property + def sr(self) -> bytes: + return (VBAN_PROTOCOL_TXT + self.bps_index).to_bytes(1, 'little') + + @property + def nbs(self) -> bytes: + return (0).to_bytes(1, 'little') + + @property + def nbc(self) -> bytes: + return (self.channel).to_bytes(1, 'little') + + @property + def bit(self) -> bytes: + return (0x10).to_bytes(1, 'little') + + @property + def streamname(self) -> bytes: + return self.name.encode() + bytes(16 - len(self.name)) + + @classmethod + def to_bytes( + cls, name: str, bps_index: int, channel: int, framecounter: int + ) -> bytes: + header = cls( + name=name, bps_index=bps_index, channel=channel, framecounter=framecounter + ) + + data = bytearray() + data.extend(header.vban) + data.extend(header.sr) + data.extend(header.nbs) + data.extend(header.nbc) + data.extend(header.bit) + data.extend(header.streamname) + data.extend(header.framecounter.to_bytes(4, 'little')) + return bytes(data) + + @classmethod + def encode_with_payload( + cls, name: str, bps_index: int, channel: int, framecounter: int, payload: str + ) -> bytes: + """Creates the complete packet with header and payload.""" + return cls.to_bytes(name, bps_index, channel, framecounter) + payload.encode() diff --git a/vban_cmd/packet/nbs0.py b/vban_cmd/packet/nbs0.py new file mode 100644 index 0000000..191fd40 --- /dev/null +++ b/vban_cmd/packet/nbs0.py @@ -0,0 +1,288 @@ +from dataclasses import dataclass +from typing import NamedTuple + +from vban_cmd.enums import NBS +from vban_cmd.kinds import KindMapClass +from vban_cmd.util import comp + +from .headers import VbanPacket + + +class Levels(NamedTuple): + strip: tuple[float, ...] + bus: tuple[float, ...] + + +class ChannelState: + """Represents the processed state of a single strip or bus channel""" + + def __init__(self, state_bytes: bytes): + # Convert 4-byte state to integer once for efficient lookups + self._state = int.from_bytes(state_bytes, 'little') + + def get_mode(self, mode_value: int) -> bool: + """Get boolean state for a specific mode""" + return (self._state & mode_value) != 0 + + def get_mode_int(self, mode_value: int) -> int: + """Get integer state for a specific mode""" + return self._state & mode_value + + # Common boolean modes + @property + def mute(self) -> bool: + return (self._state & 0x00000001) != 0 + + @property + def solo(self) -> bool: + return (self._state & 0x00000002) != 0 + + @property + def mono(self) -> bool: + return (self._state & 0x00000004) != 0 + + @property + def mc(self) -> bool: + return (self._state & 0x00000008) != 0 + + # EQ modes + @property + def eq_on(self) -> bool: + return (self._state & 0x00000100) != 0 + + @property + def eq_ab(self) -> bool: + return (self._state & 0x00000800) != 0 + + # Bus assignments (strip to bus routing) + @property + def busa1(self) -> bool: + return (self._state & 0x00001000) != 0 + + @property + def busa2(self) -> bool: + return (self._state & 0x00002000) != 0 + + @property + def busa3(self) -> bool: + return (self._state & 0x00004000) != 0 + + @property + def busa4(self) -> bool: + return (self._state & 0x00008000) != 0 + + @property + def busb1(self) -> bool: + return (self._state & 0x00010000) != 0 + + @property + def busb2(self) -> bool: + return (self._state & 0x00020000) != 0 + + @property + def busb3(self) -> bool: + return (self._state & 0x00040000) != 0 + + +class States(NamedTuple): + strip: tuple[ChannelState, ...] + bus: tuple[ChannelState, ...] + + +class Labels(NamedTuple): + strip: tuple[str, ...] + bus: tuple[str, ...] + + +@dataclass +class VbanPacketNBS0(VbanPacket): + """Represents the body of a VBAN data packet with ident:0""" + + _inputLeveldB100: bytes + _outputLeveldB100: bytes + _TransportBit: bytes + _stripState: bytes + _busState: bytes + _stripGaindB100Layer1: bytes + _stripGaindB100Layer2: bytes + _stripGaindB100Layer3: bytes + _stripGaindB100Layer4: bytes + _stripGaindB100Layer5: bytes + _stripGaindB100Layer6: bytes + _stripGaindB100Layer7: bytes + _stripGaindB100Layer8: bytes + _busGaindB100: bytes + _stripLabelUTF8c60: bytes + _busLabelUTF8c60: bytes + + @classmethod + def from_bytes(cls, nbs: NBS, kind: KindMapClass, data: bytes): + return cls( + nbs=nbs, + _kind=kind, + _voicemeeterType=data[28:29], + _reserved=data[29:30], + _buffersize=data[30:32], + _voicemeeterVersion=data[32:36], + _optionBits=data[36:40], + _samplerate=data[40:44], + _inputLeveldB100=data[44:112], + _outputLeveldB100=data[112:240], + _TransportBit=data[240:244], + _stripState=data[244:276], + _busState=data[276:308], + _stripGaindB100Layer1=data[308:324], + _stripGaindB100Layer2=data[324:340], + _stripGaindB100Layer3=data[340:356], + _stripGaindB100Layer4=data[356:372], + _stripGaindB100Layer5=data[372:388], + _stripGaindB100Layer6=data[388:404], + _stripGaindB100Layer7=data[404:420], + _stripGaindB100Layer8=data[420:436], + _busGaindB100=data[436:452], + _stripLabelUTF8c60=data[452:932], + _busLabelUTF8c60=data[932:1412], + ) + + def pdirty(self, other) -> bool: + """True iff any defined parameter has changed""" + + self_gains = ( + self._stripGaindB100Layer1 + + self._stripGaindB100Layer2 + + self._stripGaindB100Layer3 + + self._stripGaindB100Layer4 + + self._stripGaindB100Layer5 + + self._stripGaindB100Layer6 + + self._stripGaindB100Layer7 + + self._stripGaindB100Layer8 + ) + other_gains = ( + other._stripGaindB100Layer1 + + other._stripGaindB100Layer2 + + other._stripGaindB100Layer3 + + other._stripGaindB100Layer4 + + other._stripGaindB100Layer5 + + other._stripGaindB100Layer6 + + other._stripGaindB100Layer7 + + other._stripGaindB100Layer8 + ) + + return ( + self._stripState != other._stripState + or self._busState != other._busState + or self_gains != other_gains + or self._busGaindB100 != other._busGaindB100 + or self._stripLabelUTF8c60 != other._stripLabelUTF8c60 + or self._busLabelUTF8c60 != other._busLabelUTF8c60 + ) + + def ldirty(self, strip_cache, bus_cache) -> bool: + """True iff any level has changed, ignoring changes when levels are very quiet""" + self._strip_comp, self._bus_comp = ( + tuple(not val for val in comp(strip_cache, self.strip_levels)), + tuple(not val for val in comp(bus_cache, self.bus_levels)), + ) + return any(self._strip_comp) or any(self._bus_comp) + + @property + def strip_levels(self) -> tuple[float, ...]: + """Returns strip levels in dB""" + return tuple( + round( + int.from_bytes(self._inputLeveldB100[i : i + 2], 'little', signed=True) + * 0.01, + 1, + ) + for i in range(0, len(self._inputLeveldB100), 2) + )[: self._kind.num_strip_levels] + + @property + def bus_levels(self) -> tuple[float, ...]: + """Returns bus levels in dB""" + return tuple( + round( + int.from_bytes(self._outputLeveldB100[i : i + 2], 'little', signed=True) + * 0.01, + 1, + ) + for i in range(0, len(self._outputLeveldB100), 2) + )[: self._kind.num_bus_levels] + + @property + def levels(self) -> Levels: + """Returns strip and bus levels as a namedtuple""" + return Levels(strip=self.strip_levels, bus=self.bus_levels) + + @property + def states(self) -> States: + """returns States object with processed strip and bus channel states""" + return States( + strip=tuple( + ChannelState(self._stripState[i : i + 4]) for i in range(0, 32, 4) + ), + bus=tuple(ChannelState(self._busState[i : i + 4]) for i in range(0, 32, 4)), + ) + + @property + def gainlayers(self) -> tuple: + """returns tuple of all strip gain layers as tuples""" + return tuple( + tuple( + round( + int.from_bytes( + getattr(self, f'_stripGaindB100Layer{layer}')[i : i + 2], + 'little', + signed=True, + ) + * 0.01, + 2, + ) + for i in range(0, 16, 2) + ) + for layer in range(1, 9) + ) + + @property + def busgain(self) -> tuple: + """returns tuple of bus gains""" + return tuple( + round( + int.from_bytes(self._busGaindB100[i : i + 2], 'little', signed=True) + * 0.01, + 2, + ) + for i in range(0, 16, 2) + ) + + @property + def labels(self) -> Labels: + """returns Labels namedtuple of strip and bus labels""" + + def _extract_labels_from_bytes(label_bytes: bytes) -> tuple[str, ...]: + """Extract null-terminated UTF-8 labels from 60-byte chunks""" + labels = [] + for i in range(0, len(label_bytes), 60): + chunk = label_bytes[i : i + 60] + null_pos = chunk.find(b'\x00') + if null_pos == -1: + try: + label = chunk.decode('utf-8', errors='replace').rstrip('\x00') + except UnicodeDecodeError: + label = '' + else: + try: + label = ( + chunk[:null_pos].decode('utf-8', errors='replace') + if null_pos > 0 + else '' + ) + except UnicodeDecodeError: + label = '' + labels.append(label) + return tuple(labels) + + return Labels( + strip=_extract_labels_from_bytes(self._stripLabelUTF8c60), + bus=_extract_labels_from_bytes(self._busLabelUTF8c60), + ) diff --git a/vban_cmd/packet/nbs1.py b/vban_cmd/packet/nbs1.py new file mode 100644 index 0000000..901dcda --- /dev/null +++ b/vban_cmd/packet/nbs1.py @@ -0,0 +1,357 @@ +import struct +from dataclasses import dataclass +from typing import NamedTuple + +from vban_cmd.enums import NBS +from vban_cmd.kinds import KindMapClass + +from .headers import VbanPacket + +VMPARAMSTRIP_SIZE = 174 + + +class Audibility(NamedTuple): + knob: float + comp: float + gate: float + denoiser: float + + +class Positions(NamedTuple): + pan_x: float + pan_y: float + color_x: float + color_y: float + fx1: float + fx2: float + + +class EqGains(NamedTuple): + bass: float + mid: float + treble: float + + +class ParametricEQSettings(NamedTuple): + on: bool + type: int + gain: float + freq: float + q: float + + +class Sends(NamedTuple): + reverb: float + delay: float + fx1: float + fx2: float + + +class CompressorSettings(NamedTuple): + gain_in: float + attack_ms: float + release_ms: float + n_knee: float + ratio: float + threshold: float + c_enabled: bool + makeup: bool + gain_out: float + + +class GateSettings(NamedTuple): + threshold_in: float + damping_max: float + bp_sidechain: bool + attack_ms: float + hold_ms: float + release_ms: float + + +class DenoiserSettings(NamedTuple): + threshold: float + + +class PitchSettings(NamedTuple): + enabled: bool + dry_wet: float + value: float + formant_lo: float + formant_med: float + formant_high: float + + +@dataclass +class VbanVMParamStrip: + """Represents the VBAN_VMPARAMSTRIP_PACKET structure""" + + _mode: bytes + _dblevel: bytes + _audibility: bytes + _pos3D_x: bytes + _pos3D_y: bytes + _posColor_x: bytes + _posColor_y: bytes + _EQgain1: bytes + _EQgain2: bytes + _EQgain3: bytes + + # First channel parametric EQ + _PEQ_eqOn: bytes + _PEQ_eqtype: bytes + _PEQ_eqgain: bytes + _PEQ_eqfreq: bytes + _PEQ_eqq: bytes + + _audibility_c: bytes + _audibility_g: bytes + _audibility_d: bytes + _posMod_x: bytes + _posMod_y: bytes + _send_reverb: bytes + _send_delay: bytes + _send_fx1: bytes + _send_fx2: bytes + _dblimit: bytes + _nKaraoke: bytes + + _COMP_gain_in: bytes + _COMP_attack_ms: bytes + _COMP_release_ms: bytes + _COMP_n_knee: bytes + _COMP_comprate: bytes + _COMP_threshold: bytes + _COMP_c_enabled: bytes + _COMP_c_auto: bytes + _COMP_gain_out: bytes + + _GATE_dBThreshold_in: bytes + _GATE_dBDamping_max: bytes + _GATE_BP_Sidechain: bytes + _GATE_attack_ms: bytes + _GATE_hold_ms: bytes + _GATE_release_ms: bytes + + _DenoiserThreshold: bytes + _PitchEnabled: bytes + _Pitch_DryWet: bytes + _Pitch_Value: bytes + _Pitch_formant_lo: bytes + _Pitch_formant_med: bytes + _Pitch_formant_high: bytes + + @classmethod + def from_bytes(cls, data: bytes): + return cls( + _mode=data[0:4], + _dblevel=data[4:8], + _audibility=data[8:10], + _pos3D_x=data[10:12], + _pos3D_y=data[12:14], + _posColor_x=data[14:16], + _posColor_y=data[16:18], + _EQgain1=data[18:20], + _EQgain2=data[20:22], + _EQgain3=data[22:24], + _PEQ_eqOn=data[24:30], + _PEQ_eqtype=data[30:36], + _PEQ_eqgain=data[36:60], + _PEQ_eqfreq=data[60:84], + _PEQ_eqq=data[84:108], + _audibility_c=data[108:110], + _audibility_g=data[110:112], + _audibility_d=data[112:114], + _posMod_x=data[114:116], + _posMod_y=data[116:118], + _send_reverb=data[118:120], + _send_delay=data[120:122], + _send_fx1=data[122:124], + _send_fx2=data[124:126], + _dblimit=data[126:128], + _nKaraoke=data[128:130], + _COMP_gain_in=data[130:132], + _COMP_attack_ms=data[132:134], + _COMP_release_ms=data[134:136], + _COMP_n_knee=data[136:138], + _COMP_comprate=data[138:140], + _COMP_threshold=data[140:142], + _COMP_c_enabled=data[142:144], + _COMP_c_auto=data[144:146], + _COMP_gain_out=data[146:148], + _GATE_dBThreshold_in=data[148:150], + _GATE_dBDamping_max=data[150:152], + _GATE_BP_Sidechain=data[152:154], + _GATE_attack_ms=data[154:156], + _GATE_hold_ms=data[156:158], + _GATE_release_ms=data[158:160], + _DenoiserThreshold=data[160:162], + _PitchEnabled=data[162:164], + _Pitch_DryWet=data[164:166], + _Pitch_Value=data[166:168], + _Pitch_formant_lo=data[168:170], + _Pitch_formant_med=data[170:172], + _Pitch_formant_high=data[172:174], + ) + + @property + def mode(self) -> int: + return int.from_bytes(self._mode, 'little') + + @property + def audibility(self) -> Audibility: + return Audibility( + round(int.from_bytes(self._audibility, 'little', signed=True) * 0.01, 2), + round(int.from_bytes(self._audibility_c, 'little', signed=True) * 0.01, 2), + round(int.from_bytes(self._audibility_g, 'little', signed=True) * 0.01, 2), + round(int.from_bytes(self._audibility_d, 'little', signed=True) * 0.01, 2), + ) + + @property + def positions(self) -> Positions: + return Positions( + round(int.from_bytes(self._pos3D_x, 'little', signed=True) * 0.01, 2), + round(int.from_bytes(self._pos3D_y, 'little', signed=True) * 0.01, 2), + round(int.from_bytes(self._posColor_x, 'little', signed=True) * 0.01, 2), + round(int.from_bytes(self._posColor_y, 'little', signed=True) * 0.01, 2), + round(int.from_bytes(self._posMod_x, 'little', signed=True) * 0.01, 2), + round(int.from_bytes(self._posMod_y, 'little', signed=True) * 0.01, 2), + ) + + @property + def eqgains(self) -> EqGains: + return EqGains( + *[ + round( + int.from_bytes(getattr(self, f'_EQgain{i}'), 'little', signed=True) + * 0.01, + 2, + ) + for i in range(1, 4) + ] + ) + + @property + def parametric_eq(self) -> tuple[ParametricEQSettings, ...]: + return tuple( + ParametricEQSettings( + on=bool(int.from_bytes(self._PEQ_eqOn[i : i + 1], 'little')), + type=int.from_bytes(self._PEQ_eqtype[i : i + 1], 'little'), + freq=struct.unpack(' Sends: + return Sends( + round(int.from_bytes(self._send_reverb, 'little', signed=True) * 0.01, 2), + round(int.from_bytes(self._send_delay, 'little', signed=True) * 0.01, 2), + round(int.from_bytes(self._send_fx1, 'little', signed=True) * 0.01, 2), + round(int.from_bytes(self._send_fx2, 'little', signed=True) * 0.01, 2), + ) + + @property + def karaoke(self) -> int: + return int.from_bytes(self._nKaraoke, 'little') + + @property + def compressor(self) -> CompressorSettings: + return CompressorSettings( + gain_in=round( + int.from_bytes(self._COMP_gain_in, 'little', signed=True) * 0.01, 2 + ), + attack_ms=round(int.from_bytes(self._COMP_attack_ms, 'little') * 0.1, 2), + release_ms=round(int.from_bytes(self._COMP_release_ms, 'little') * 0.1, 2), + n_knee=round(int.from_bytes(self._COMP_n_knee, 'little') * 0.01, 2), + ratio=round(int.from_bytes(self._COMP_comprate, 'little') * 0.01, 2), + threshold=round( + int.from_bytes(self._COMP_threshold, 'little', signed=True) * 0.01, 2 + ), + c_enabled=bool(int.from_bytes(self._COMP_c_enabled, 'little')), + makeup=bool(int.from_bytes(self._COMP_c_auto, 'little')), + gain_out=round( + int.from_bytes(self._COMP_gain_out, 'little', signed=True) * 0.01, 2 + ), + ) + + @property + def gate(self) -> GateSettings: + return GateSettings( + threshold_in=round( + int.from_bytes(self._GATE_dBThreshold_in, 'little', signed=True) * 0.01, + 2, + ), + damping_max=round( + int.from_bytes(self._GATE_dBDamping_max, 'little', signed=True) * 0.01, + 2, + ), + bp_sidechain=round( + int.from_bytes(self._GATE_BP_Sidechain, 'little') * 0.1, 2 + ), + attack_ms=round(int.from_bytes(self._GATE_attack_ms, 'little') * 0.1, 2), + hold_ms=round(int.from_bytes(self._GATE_hold_ms, 'little') * 0.1, 2), + release_ms=round(int.from_bytes(self._GATE_release_ms, 'little') * 0.1, 2), + ) + + @property + def denoiser(self) -> DenoiserSettings: + return DenoiserSettings( + threshold=round( + int.from_bytes(self._DenoiserThreshold, 'little', signed=True) * 0.01, 2 + ) + ) + + @property + def pitch(self) -> PitchSettings: + return PitchSettings( + enabled=bool(int.from_bytes(self._PitchEnabled, 'little')), + dry_wet=round( + int.from_bytes(self._Pitch_DryWet, 'little', signed=True) * 0.01, 2 + ), + value=round( + int.from_bytes(self._Pitch_Value, 'little', signed=True) * 0.01, 2 + ), + formant_lo=round( + int.from_bytes(self._Pitch_formant_lo, 'little', signed=True) * 0.01, 2 + ), + formant_med=round( + int.from_bytes(self._Pitch_formant_med, 'little', signed=True) * 0.01, 2 + ), + formant_high=round( + int.from_bytes(self._Pitch_formant_high, 'little', signed=True) * 0.01, + 2, + ), + ) + + +@dataclass +class VbanPacketNBS1(VbanPacket): + """Represents the body of a VBAN data packet with ident:1""" + + strips: tuple[VbanVMParamStrip, ...] + + @classmethod + def from_bytes( + cls, + nbs: NBS, + kind: KindMapClass, + data: bytes, + ): + return cls( + nbs=nbs, + _kind=kind, + _voicemeeterType=data[28:29], + _reserved=data[29:30], + _buffersize=data[30:32], + _voicemeeterVersion=data[32:36], + _optionBits=data[36:40], + _samplerate=data[40:44], + strips=tuple( + VbanVMParamStrip.from_bytes( + data[44 + i * VMPARAMSTRIP_SIZE : 44 + (i + 1) * VMPARAMSTRIP_SIZE] + ) + for i in range(16) + ), + )