mirror of
https://github.com/onyx-and-iris/vban-cmd-python.git
synced 2026-03-02 16:29:11 +00:00
move packet classes into internal packet module
This commit is contained in:
parent
ff5ac193c8
commit
10b38b3fcc
@ -1,808 +0,0 @@
|
||||
import struct
|
||||
from dataclasses import dataclass
|
||||
from typing import NamedTuple
|
||||
|
||||
from .enums import NBS
|
||||
from .kinds import KindMapClass
|
||||
from .util import comp
|
||||
|
||||
VBAN_PROTOCOL_TXT = 0x40
|
||||
VBAN_PROTOCOL_SERVICE = 0x60
|
||||
|
||||
VBAN_SERVICE_RTPACKETREGISTER = 32
|
||||
VBAN_SERVICE_RTPACKET = 33
|
||||
VBAN_SERVICE_MASK = 0xE0
|
||||
|
||||
MAX_PACKET_SIZE = 1436
|
||||
HEADER_SIZE = 4 + 1 + 1 + 1 + 1 + 16
|
||||
VMPARAMSTRIP_SIZE = 174
|
||||
|
||||
|
||||
@dataclass
|
||||
class VbanPacket:
|
||||
"""Represents the header of a VBAN data packet"""
|
||||
|
||||
nbs: NBS
|
||||
_kind: KindMapClass
|
||||
_voicemeeterType: bytes
|
||||
_reserved: bytes
|
||||
_buffersize: bytes
|
||||
_voicemeeterVersion: bytes
|
||||
_optionBits: bytes
|
||||
_samplerate: bytes
|
||||
|
||||
@property
|
||||
def voicemeetertype(self) -> str:
|
||||
"""returns voicemeeter type as a string"""
|
||||
return ['', 'basic', 'banana', 'potato'][
|
||||
int.from_bytes(self._voicemeeterType, 'little')
|
||||
]
|
||||
|
||||
@property
|
||||
def voicemeeterversion(self) -> tuple:
|
||||
"""returns voicemeeter version as a tuple"""
|
||||
return tuple(self._voicemeeterVersion[i] for i in range(3, -1, -1))
|
||||
|
||||
@property
|
||||
def samplerate(self) -> int:
|
||||
"""returns samplerate as an int"""
|
||||
return int.from_bytes(self._samplerate, 'little')
|
||||
|
||||
|
||||
class Levels(NamedTuple):
|
||||
strip: tuple[float, ...]
|
||||
bus: tuple[float, ...]
|
||||
|
||||
|
||||
class ChannelState:
|
||||
"""Represents the processed state of a single strip or bus channel"""
|
||||
|
||||
def __init__(self, state_bytes: bytes):
|
||||
# Convert 4-byte state to integer once for efficient lookups
|
||||
self._state = int.from_bytes(state_bytes, 'little')
|
||||
|
||||
def get_mode(self, mode_value: int) -> bool:
|
||||
"""Get boolean state for a specific mode"""
|
||||
return (self._state & mode_value) != 0
|
||||
|
||||
def get_mode_int(self, mode_value: int) -> int:
|
||||
"""Get integer state for a specific mode"""
|
||||
return self._state & mode_value
|
||||
|
||||
# Common boolean modes
|
||||
@property
|
||||
def mute(self) -> bool:
|
||||
return (self._state & 0x00000001) != 0
|
||||
|
||||
@property
|
||||
def solo(self) -> bool:
|
||||
return (self._state & 0x00000002) != 0
|
||||
|
||||
@property
|
||||
def mono(self) -> bool:
|
||||
return (self._state & 0x00000004) != 0
|
||||
|
||||
@property
|
||||
def mc(self) -> bool:
|
||||
return (self._state & 0x00000008) != 0
|
||||
|
||||
# EQ modes
|
||||
@property
|
||||
def eq_on(self) -> bool:
|
||||
return (self._state & 0x00000100) != 0
|
||||
|
||||
@property
|
||||
def eq_ab(self) -> bool:
|
||||
return (self._state & 0x00000800) != 0
|
||||
|
||||
# Bus assignments (strip to bus routing)
|
||||
@property
|
||||
def busa1(self) -> bool:
|
||||
return (self._state & 0x00001000) != 0
|
||||
|
||||
@property
|
||||
def busa2(self) -> bool:
|
||||
return (self._state & 0x00002000) != 0
|
||||
|
||||
@property
|
||||
def busa3(self) -> bool:
|
||||
return (self._state & 0x00004000) != 0
|
||||
|
||||
@property
|
||||
def busa4(self) -> bool:
|
||||
return (self._state & 0x00008000) != 0
|
||||
|
||||
@property
|
||||
def busb1(self) -> bool:
|
||||
return (self._state & 0x00010000) != 0
|
||||
|
||||
@property
|
||||
def busb2(self) -> bool:
|
||||
return (self._state & 0x00020000) != 0
|
||||
|
||||
@property
|
||||
def busb3(self) -> bool:
|
||||
return (self._state & 0x00040000) != 0
|
||||
|
||||
|
||||
class States(NamedTuple):
|
||||
strip: tuple[ChannelState, ...]
|
||||
bus: tuple[ChannelState, ...]
|
||||
|
||||
|
||||
class Labels(NamedTuple):
|
||||
strip: tuple[str, ...]
|
||||
bus: tuple[str, ...]
|
||||
|
||||
|
||||
@dataclass
|
||||
class VbanPacketNBS0(VbanPacket):
|
||||
"""Represents the body of a VBAN data packet with ident:0"""
|
||||
|
||||
_inputLeveldB100: bytes
|
||||
_outputLeveldB100: bytes
|
||||
_TransportBit: bytes
|
||||
_stripState: bytes
|
||||
_busState: bytes
|
||||
_stripGaindB100Layer1: bytes
|
||||
_stripGaindB100Layer2: bytes
|
||||
_stripGaindB100Layer3: bytes
|
||||
_stripGaindB100Layer4: bytes
|
||||
_stripGaindB100Layer5: bytes
|
||||
_stripGaindB100Layer6: bytes
|
||||
_stripGaindB100Layer7: bytes
|
||||
_stripGaindB100Layer8: bytes
|
||||
_busGaindB100: bytes
|
||||
_stripLabelUTF8c60: bytes
|
||||
_busLabelUTF8c60: bytes
|
||||
|
||||
@classmethod
|
||||
def from_bytes(cls, nbs: NBS, kind: KindMapClass, data: bytes):
|
||||
return cls(
|
||||
nbs=nbs,
|
||||
_kind=kind,
|
||||
_voicemeeterType=data[28:29],
|
||||
_reserved=data[29:30],
|
||||
_buffersize=data[30:32],
|
||||
_voicemeeterVersion=data[32:36],
|
||||
_optionBits=data[36:40],
|
||||
_samplerate=data[40:44],
|
||||
_inputLeveldB100=data[44:112],
|
||||
_outputLeveldB100=data[112:240],
|
||||
_TransportBit=data[240:244],
|
||||
_stripState=data[244:276],
|
||||
_busState=data[276:308],
|
||||
_stripGaindB100Layer1=data[308:324],
|
||||
_stripGaindB100Layer2=data[324:340],
|
||||
_stripGaindB100Layer3=data[340:356],
|
||||
_stripGaindB100Layer4=data[356:372],
|
||||
_stripGaindB100Layer5=data[372:388],
|
||||
_stripGaindB100Layer6=data[388:404],
|
||||
_stripGaindB100Layer7=data[404:420],
|
||||
_stripGaindB100Layer8=data[420:436],
|
||||
_busGaindB100=data[436:452],
|
||||
_stripLabelUTF8c60=data[452:932],
|
||||
_busLabelUTF8c60=data[932:1412],
|
||||
)
|
||||
|
||||
def pdirty(self, other) -> bool:
|
||||
"""True iff any defined parameter has changed"""
|
||||
|
||||
self_gains = (
|
||||
self._stripGaindB100Layer1
|
||||
+ self._stripGaindB100Layer2
|
||||
+ self._stripGaindB100Layer3
|
||||
+ self._stripGaindB100Layer4
|
||||
+ self._stripGaindB100Layer5
|
||||
+ self._stripGaindB100Layer6
|
||||
+ self._stripGaindB100Layer7
|
||||
+ self._stripGaindB100Layer8
|
||||
)
|
||||
other_gains = (
|
||||
other._stripGaindB100Layer1
|
||||
+ other._stripGaindB100Layer2
|
||||
+ other._stripGaindB100Layer3
|
||||
+ other._stripGaindB100Layer4
|
||||
+ other._stripGaindB100Layer5
|
||||
+ other._stripGaindB100Layer6
|
||||
+ other._stripGaindB100Layer7
|
||||
+ other._stripGaindB100Layer8
|
||||
)
|
||||
|
||||
return (
|
||||
self._stripState != other._stripState
|
||||
or self._busState != other._busState
|
||||
or self_gains != other_gains
|
||||
or self._busGaindB100 != other._busGaindB100
|
||||
or self._stripLabelUTF8c60 != other._stripLabelUTF8c60
|
||||
or self._busLabelUTF8c60 != other._busLabelUTF8c60
|
||||
)
|
||||
|
||||
def ldirty(self, strip_cache, bus_cache) -> bool:
|
||||
"""True iff any level has changed, ignoring changes when levels are very quiet"""
|
||||
self._strip_comp, self._bus_comp = (
|
||||
tuple(not val for val in comp(strip_cache, self.strip_levels)),
|
||||
tuple(not val for val in comp(bus_cache, self.bus_levels)),
|
||||
)
|
||||
return any(self._strip_comp) or any(self._bus_comp)
|
||||
|
||||
@property
|
||||
def strip_levels(self) -> tuple[float, ...]:
|
||||
"""Returns strip levels in dB"""
|
||||
return tuple(
|
||||
round(
|
||||
int.from_bytes(self._inputLeveldB100[i : i + 2], 'little', signed=True)
|
||||
* 0.01,
|
||||
1,
|
||||
)
|
||||
for i in range(0, len(self._inputLeveldB100), 2)
|
||||
)[: self._kind.num_strip_levels]
|
||||
|
||||
@property
|
||||
def bus_levels(self) -> tuple[float, ...]:
|
||||
"""Returns bus levels in dB"""
|
||||
return tuple(
|
||||
round(
|
||||
int.from_bytes(self._outputLeveldB100[i : i + 2], 'little', signed=True)
|
||||
* 0.01,
|
||||
1,
|
||||
)
|
||||
for i in range(0, len(self._outputLeveldB100), 2)
|
||||
)[: self._kind.num_bus_levels]
|
||||
|
||||
@property
|
||||
def levels(self) -> Levels:
|
||||
"""Returns strip and bus levels as a namedtuple"""
|
||||
return Levels(strip=self.strip_levels, bus=self.bus_levels)
|
||||
|
||||
@property
|
||||
def states(self) -> States:
|
||||
"""returns States object with processed strip and bus channel states"""
|
||||
return States(
|
||||
strip=tuple(
|
||||
ChannelState(self._stripState[i : i + 4]) for i in range(0, 32, 4)
|
||||
),
|
||||
bus=tuple(ChannelState(self._busState[i : i + 4]) for i in range(0, 32, 4)),
|
||||
)
|
||||
|
||||
@property
|
||||
def gainlayers(self) -> tuple:
|
||||
"""returns tuple of all strip gain layers as tuples"""
|
||||
return tuple(
|
||||
tuple(
|
||||
round(
|
||||
int.from_bytes(
|
||||
getattr(self, f'_stripGaindB100Layer{layer}')[i : i + 2],
|
||||
'little',
|
||||
signed=True,
|
||||
)
|
||||
* 0.01,
|
||||
2,
|
||||
)
|
||||
for i in range(0, 16, 2)
|
||||
)
|
||||
for layer in range(1, 9)
|
||||
)
|
||||
|
||||
@property
|
||||
def busgain(self) -> tuple:
|
||||
"""returns tuple of bus gains"""
|
||||
return tuple(
|
||||
round(
|
||||
int.from_bytes(self._busGaindB100[i : i + 2], 'little', signed=True)
|
||||
* 0.01,
|
||||
2,
|
||||
)
|
||||
for i in range(0, 16, 2)
|
||||
)
|
||||
|
||||
@property
|
||||
def labels(self) -> Labels:
|
||||
"""returns Labels namedtuple of strip and bus labels"""
|
||||
|
||||
def _extract_labels_from_bytes(label_bytes: bytes) -> tuple[str, ...]:
|
||||
"""Extract null-terminated UTF-8 labels from 60-byte chunks"""
|
||||
labels = []
|
||||
for i in range(0, len(label_bytes), 60):
|
||||
chunk = label_bytes[i : i + 60]
|
||||
null_pos = chunk.find(b'\x00')
|
||||
if null_pos == -1:
|
||||
try:
|
||||
label = chunk.decode('utf-8', errors='replace').rstrip('\x00')
|
||||
except UnicodeDecodeError:
|
||||
label = ''
|
||||
else:
|
||||
try:
|
||||
label = (
|
||||
chunk[:null_pos].decode('utf-8', errors='replace')
|
||||
if null_pos > 0
|
||||
else ''
|
||||
)
|
||||
except UnicodeDecodeError:
|
||||
label = ''
|
||||
labels.append(label)
|
||||
return tuple(labels)
|
||||
|
||||
return Labels(
|
||||
strip=_extract_labels_from_bytes(self._stripLabelUTF8c60),
|
||||
bus=_extract_labels_from_bytes(self._busLabelUTF8c60),
|
||||
)
|
||||
|
||||
|
||||
class Audibility(NamedTuple):
|
||||
knob: float
|
||||
comp: float
|
||||
gate: float
|
||||
denoiser: float
|
||||
|
||||
|
||||
class Positions(NamedTuple):
|
||||
pan_x: float
|
||||
pan_y: float
|
||||
color_x: float
|
||||
color_y: float
|
||||
fx1: float
|
||||
fx2: float
|
||||
|
||||
|
||||
class EqGains(NamedTuple):
|
||||
bass: float
|
||||
mid: float
|
||||
treble: float
|
||||
|
||||
|
||||
class ParametricEQSettings(NamedTuple):
|
||||
on: bool
|
||||
type: int
|
||||
gain: float
|
||||
freq: float
|
||||
q: float
|
||||
|
||||
|
||||
class Sends(NamedTuple):
|
||||
reverb: float
|
||||
delay: float
|
||||
fx1: float
|
||||
fx2: float
|
||||
|
||||
|
||||
class CompressorSettings(NamedTuple):
|
||||
gain_in: float
|
||||
attack_ms: float
|
||||
release_ms: float
|
||||
n_knee: float
|
||||
ratio: float
|
||||
threshold: float
|
||||
c_enabled: bool
|
||||
makeup: bool
|
||||
gain_out: float
|
||||
|
||||
|
||||
class GateSettings(NamedTuple):
|
||||
threshold_in: float
|
||||
damping_max: float
|
||||
bp_sidechain: bool
|
||||
attack_ms: float
|
||||
hold_ms: float
|
||||
release_ms: float
|
||||
|
||||
|
||||
class DenoiserSettings(NamedTuple):
|
||||
threshold: float
|
||||
|
||||
|
||||
class PitchSettings(NamedTuple):
|
||||
enabled: bool
|
||||
dry_wet: float
|
||||
value: float
|
||||
formant_lo: float
|
||||
formant_med: float
|
||||
formant_high: float
|
||||
|
||||
|
||||
@dataclass
|
||||
class VbanVMParamStrip:
|
||||
"""Represents the VBAN_VMPARAMSTRIP_PACKET structure"""
|
||||
|
||||
_mode: bytes
|
||||
_dblevel: bytes
|
||||
_audibility: bytes
|
||||
_pos3D_x: bytes
|
||||
_pos3D_y: bytes
|
||||
_posColor_x: bytes
|
||||
_posColor_y: bytes
|
||||
_EQgain1: bytes
|
||||
_EQgain2: bytes
|
||||
_EQgain3: bytes
|
||||
|
||||
# First channel parametric EQ
|
||||
_PEQ_eqOn: bytes
|
||||
_PEQ_eqtype: bytes
|
||||
_PEQ_eqgain: bytes
|
||||
_PEQ_eqfreq: bytes
|
||||
_PEQ_eqq: bytes
|
||||
|
||||
_audibility_c: bytes
|
||||
_audibility_g: bytes
|
||||
_audibility_d: bytes
|
||||
_posMod_x: bytes
|
||||
_posMod_y: bytes
|
||||
_send_reverb: bytes
|
||||
_send_delay: bytes
|
||||
_send_fx1: bytes
|
||||
_send_fx2: bytes
|
||||
_dblimit: bytes
|
||||
_nKaraoke: bytes
|
||||
|
||||
_COMP_gain_in: bytes
|
||||
_COMP_attack_ms: bytes
|
||||
_COMP_release_ms: bytes
|
||||
_COMP_n_knee: bytes
|
||||
_COMP_comprate: bytes
|
||||
_COMP_threshold: bytes
|
||||
_COMP_c_enabled: bytes
|
||||
_COMP_c_auto: bytes
|
||||
_COMP_gain_out: bytes
|
||||
|
||||
_GATE_dBThreshold_in: bytes
|
||||
_GATE_dBDamping_max: bytes
|
||||
_GATE_BP_Sidechain: bytes
|
||||
_GATE_attack_ms: bytes
|
||||
_GATE_hold_ms: bytes
|
||||
_GATE_release_ms: bytes
|
||||
|
||||
_DenoiserThreshold: bytes
|
||||
_PitchEnabled: bytes
|
||||
_Pitch_DryWet: bytes
|
||||
_Pitch_Value: bytes
|
||||
_Pitch_formant_lo: bytes
|
||||
_Pitch_formant_med: bytes
|
||||
_Pitch_formant_high: bytes
|
||||
|
||||
@classmethod
|
||||
def from_bytes(cls, data: bytes):
|
||||
return cls(
|
||||
_mode=data[0:4],
|
||||
_dblevel=data[4:8],
|
||||
_audibility=data[8:10],
|
||||
_pos3D_x=data[10:12],
|
||||
_pos3D_y=data[12:14],
|
||||
_posColor_x=data[14:16],
|
||||
_posColor_y=data[16:18],
|
||||
_EQgain1=data[18:20],
|
||||
_EQgain2=data[20:22],
|
||||
_EQgain3=data[22:24],
|
||||
_PEQ_eqOn=data[24:30],
|
||||
_PEQ_eqtype=data[30:36],
|
||||
_PEQ_eqgain=data[36:60],
|
||||
_PEQ_eqfreq=data[60:84],
|
||||
_PEQ_eqq=data[84:108],
|
||||
_audibility_c=data[108:110],
|
||||
_audibility_g=data[110:112],
|
||||
_audibility_d=data[112:114],
|
||||
_posMod_x=data[114:116],
|
||||
_posMod_y=data[116:118],
|
||||
_send_reverb=data[118:120],
|
||||
_send_delay=data[120:122],
|
||||
_send_fx1=data[122:124],
|
||||
_send_fx2=data[124:126],
|
||||
_dblimit=data[126:128],
|
||||
_nKaraoke=data[128:130],
|
||||
_COMP_gain_in=data[130:132],
|
||||
_COMP_attack_ms=data[132:134],
|
||||
_COMP_release_ms=data[134:136],
|
||||
_COMP_n_knee=data[136:138],
|
||||
_COMP_comprate=data[138:140],
|
||||
_COMP_threshold=data[140:142],
|
||||
_COMP_c_enabled=data[142:144],
|
||||
_COMP_c_auto=data[144:146],
|
||||
_COMP_gain_out=data[146:148],
|
||||
_GATE_dBThreshold_in=data[148:150],
|
||||
_GATE_dBDamping_max=data[150:152],
|
||||
_GATE_BP_Sidechain=data[152:154],
|
||||
_GATE_attack_ms=data[154:156],
|
||||
_GATE_hold_ms=data[156:158],
|
||||
_GATE_release_ms=data[158:160],
|
||||
_DenoiserThreshold=data[160:162],
|
||||
_PitchEnabled=data[162:164],
|
||||
_Pitch_DryWet=data[164:166],
|
||||
_Pitch_Value=data[166:168],
|
||||
_Pitch_formant_lo=data[168:170],
|
||||
_Pitch_formant_med=data[170:172],
|
||||
_Pitch_formant_high=data[172:174],
|
||||
)
|
||||
|
||||
@property
|
||||
def mode(self) -> int:
|
||||
return int.from_bytes(self._mode, 'little')
|
||||
|
||||
@property
|
||||
def audibility(self) -> Audibility:
|
||||
return Audibility(
|
||||
round(int.from_bytes(self._audibility, 'little', signed=True) * 0.01, 2),
|
||||
round(int.from_bytes(self._audibility_c, 'little', signed=True) * 0.01, 2),
|
||||
round(int.from_bytes(self._audibility_g, 'little', signed=True) * 0.01, 2),
|
||||
round(int.from_bytes(self._audibility_d, 'little', signed=True) * 0.01, 2),
|
||||
)
|
||||
|
||||
@property
|
||||
def positions(self) -> Positions:
|
||||
return Positions(
|
||||
round(int.from_bytes(self._pos3D_x, 'little', signed=True) * 0.01, 2),
|
||||
round(int.from_bytes(self._pos3D_y, 'little', signed=True) * 0.01, 2),
|
||||
round(int.from_bytes(self._posColor_x, 'little', signed=True) * 0.01, 2),
|
||||
round(int.from_bytes(self._posColor_y, 'little', signed=True) * 0.01, 2),
|
||||
round(int.from_bytes(self._posMod_x, 'little', signed=True) * 0.01, 2),
|
||||
round(int.from_bytes(self._posMod_y, 'little', signed=True) * 0.01, 2),
|
||||
)
|
||||
|
||||
@property
|
||||
def eqgains(self) -> EqGains:
|
||||
return EqGains(
|
||||
*[
|
||||
round(
|
||||
int.from_bytes(getattr(self, f'_EQgain{i}'), 'little', signed=True)
|
||||
* 0.01,
|
||||
2,
|
||||
)
|
||||
for i in range(1, 4)
|
||||
]
|
||||
)
|
||||
|
||||
@property
|
||||
def parametric_eq(self) -> tuple[ParametricEQSettings, ...]:
|
||||
return tuple(
|
||||
ParametricEQSettings(
|
||||
on=bool(int.from_bytes(self._PEQ_eqOn[i : i + 1], 'little')),
|
||||
type=int.from_bytes(self._PEQ_eqtype[i : i + 1], 'little'),
|
||||
freq=struct.unpack('<f', self._PEQ_eqfreq[i * 4 : (i + 1) * 4])[0],
|
||||
gain=struct.unpack('<f', self._PEQ_eqgain[i * 4 : (i + 1) * 4])[0],
|
||||
q=struct.unpack('<f', self._PEQ_eqq[i * 4 : (i + 1) * 4])[0],
|
||||
)
|
||||
for i in range(6)
|
||||
)
|
||||
|
||||
@property
|
||||
def sends(self) -> Sends:
|
||||
return Sends(
|
||||
round(int.from_bytes(self._send_reverb, 'little', signed=True) * 0.01, 2),
|
||||
round(int.from_bytes(self._send_delay, 'little', signed=True) * 0.01, 2),
|
||||
round(int.from_bytes(self._send_fx1, 'little', signed=True) * 0.01, 2),
|
||||
round(int.from_bytes(self._send_fx2, 'little', signed=True) * 0.01, 2),
|
||||
)
|
||||
|
||||
@property
|
||||
def karaoke(self) -> int:
|
||||
return int.from_bytes(self._nKaraoke, 'little')
|
||||
|
||||
@property
|
||||
def compressor(self) -> CompressorSettings:
|
||||
return CompressorSettings(
|
||||
gain_in=round(
|
||||
int.from_bytes(self._COMP_gain_in, 'little', signed=True) * 0.01, 2
|
||||
),
|
||||
attack_ms=round(int.from_bytes(self._COMP_attack_ms, 'little') * 0.1, 2),
|
||||
release_ms=round(int.from_bytes(self._COMP_release_ms, 'little') * 0.1, 2),
|
||||
n_knee=round(int.from_bytes(self._COMP_n_knee, 'little') * 0.01, 2),
|
||||
ratio=round(int.from_bytes(self._COMP_comprate, 'little') * 0.01, 2),
|
||||
threshold=round(
|
||||
int.from_bytes(self._COMP_threshold, 'little', signed=True) * 0.01, 2
|
||||
),
|
||||
c_enabled=bool(int.from_bytes(self._COMP_c_enabled, 'little')),
|
||||
makeup=bool(int.from_bytes(self._COMP_c_auto, 'little')),
|
||||
gain_out=round(
|
||||
int.from_bytes(self._COMP_gain_out, 'little', signed=True) * 0.01, 2
|
||||
),
|
||||
)
|
||||
|
||||
@property
|
||||
def gate(self) -> GateSettings:
|
||||
return GateSettings(
|
||||
threshold_in=round(
|
||||
int.from_bytes(self._GATE_dBThreshold_in, 'little', signed=True) * 0.01,
|
||||
2,
|
||||
),
|
||||
damping_max=round(
|
||||
int.from_bytes(self._GATE_dBDamping_max, 'little', signed=True) * 0.01,
|
||||
2,
|
||||
),
|
||||
bp_sidechain=round(
|
||||
int.from_bytes(self._GATE_BP_Sidechain, 'little') * 0.1, 2
|
||||
),
|
||||
attack_ms=round(int.from_bytes(self._GATE_attack_ms, 'little') * 0.1, 2),
|
||||
hold_ms=round(int.from_bytes(self._GATE_hold_ms, 'little') * 0.1, 2),
|
||||
release_ms=round(int.from_bytes(self._GATE_release_ms, 'little') * 0.1, 2),
|
||||
)
|
||||
|
||||
@property
|
||||
def denoiser(self) -> DenoiserSettings:
|
||||
return DenoiserSettings(
|
||||
threshold=round(
|
||||
int.from_bytes(self._DenoiserThreshold, 'little', signed=True) * 0.01, 2
|
||||
)
|
||||
)
|
||||
|
||||
@property
|
||||
def pitch(self) -> PitchSettings:
|
||||
return PitchSettings(
|
||||
enabled=bool(int.from_bytes(self._PitchEnabled, 'little')),
|
||||
dry_wet=round(
|
||||
int.from_bytes(self._Pitch_DryWet, 'little', signed=True) * 0.01, 2
|
||||
),
|
||||
value=round(
|
||||
int.from_bytes(self._Pitch_Value, 'little', signed=True) * 0.01, 2
|
||||
),
|
||||
formant_lo=round(
|
||||
int.from_bytes(self._Pitch_formant_lo, 'little', signed=True) * 0.01, 2
|
||||
),
|
||||
formant_med=round(
|
||||
int.from_bytes(self._Pitch_formant_med, 'little', signed=True) * 0.01, 2
|
||||
),
|
||||
formant_high=round(
|
||||
int.from_bytes(self._Pitch_formant_high, 'little', signed=True) * 0.01,
|
||||
2,
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class VbanPacketNBS1(VbanPacket):
|
||||
"""Represents the body of a VBAN data packet with ident:1"""
|
||||
|
||||
strips: tuple[VbanVMParamStrip, ...]
|
||||
|
||||
@classmethod
|
||||
def from_bytes(
|
||||
cls,
|
||||
nbs: NBS,
|
||||
kind: KindMapClass,
|
||||
data: bytes,
|
||||
):
|
||||
return cls(
|
||||
nbs=nbs,
|
||||
_kind=kind,
|
||||
_voicemeeterType=data[28:29],
|
||||
_reserved=data[29:30],
|
||||
_buffersize=data[30:32],
|
||||
_voicemeeterVersion=data[32:36],
|
||||
_optionBits=data[36:40],
|
||||
_samplerate=data[40:44],
|
||||
strips=tuple(
|
||||
VbanVMParamStrip.from_bytes(
|
||||
data[44 + i * VMPARAMSTRIP_SIZE : 44 + (i + 1) * VMPARAMSTRIP_SIZE]
|
||||
)
|
||||
for i in range(16)
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class SubscribeHeader:
|
||||
"""Represents the header of an RT subscription packet"""
|
||||
|
||||
nbs: NBS = NBS.zero
|
||||
name: str = 'Register-RTP'
|
||||
timeout: int = 15
|
||||
|
||||
@property
|
||||
def vban(self) -> bytes:
|
||||
return b'VBAN'
|
||||
|
||||
@property
|
||||
def format_sr(self) -> bytes:
|
||||
return VBAN_PROTOCOL_SERVICE.to_bytes(1, 'little')
|
||||
|
||||
@property
|
||||
def format_nbs(self) -> bytes:
|
||||
return (self.nbs.value & 0xFF).to_bytes(1, 'little')
|
||||
|
||||
@property
|
||||
def format_nbc(self) -> bytes:
|
||||
return VBAN_SERVICE_RTPACKETREGISTER.to_bytes(1, 'little')
|
||||
|
||||
@property
|
||||
def format_bit(self) -> bytes:
|
||||
return (self.timeout & 0xFF).to_bytes(1, 'little')
|
||||
|
||||
@property
|
||||
def streamname(self) -> bytes:
|
||||
return self.name.encode('ascii') + bytes(16 - len(self.name))
|
||||
|
||||
@classmethod
|
||||
def to_bytes(cls, nbs: NBS, framecounter: int) -> bytes:
|
||||
header = cls(nbs=nbs)
|
||||
|
||||
data = bytearray()
|
||||
data.extend(header.vban)
|
||||
data.extend(header.format_sr)
|
||||
data.extend(header.format_nbs)
|
||||
data.extend(header.format_nbc)
|
||||
data.extend(header.format_bit)
|
||||
data.extend(header.streamname)
|
||||
data.extend(framecounter.to_bytes(4, 'little'))
|
||||
return bytes(data)
|
||||
|
||||
|
||||
@dataclass
|
||||
class VbanRtPacketHeader:
|
||||
"""Represents the header of an RT response packet"""
|
||||
|
||||
name: str = 'Voicemeeter-RTP'
|
||||
format_sr: int = VBAN_PROTOCOL_SERVICE
|
||||
format_nbs: int = 0
|
||||
format_nbc: int = VBAN_SERVICE_RTPACKET
|
||||
format_bit: int = 0
|
||||
|
||||
@property
|
||||
def vban(self) -> bytes:
|
||||
return b'VBAN'
|
||||
|
||||
@property
|
||||
def streamname(self) -> bytes:
|
||||
return self.name.encode('ascii') + bytes(16 - len(self.name))
|
||||
|
||||
@classmethod
|
||||
def from_bytes(cls, data: bytes):
|
||||
if len(data) < HEADER_SIZE:
|
||||
raise ValueError('Data is too short to be a valid VbanRTPPacketHeader')
|
||||
|
||||
name = data[8:24].rstrip(b'\x00').decode('utf-8')
|
||||
return cls(
|
||||
name=name,
|
||||
format_sr=data[4] & VBAN_SERVICE_MASK,
|
||||
format_nbs=data[5],
|
||||
format_nbc=data[6],
|
||||
format_bit=data[7],
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class RequestHeader:
|
||||
"""Represents the header of an RT request packet"""
|
||||
|
||||
name: str
|
||||
bps_index: int
|
||||
channel: int
|
||||
framecounter: int = 0
|
||||
|
||||
@property
|
||||
def vban(self) -> bytes:
|
||||
return b'VBAN'
|
||||
|
||||
@property
|
||||
def sr(self) -> bytes:
|
||||
return (VBAN_PROTOCOL_TXT + self.bps_index).to_bytes(1, 'little')
|
||||
|
||||
@property
|
||||
def nbs(self) -> bytes:
|
||||
return (0).to_bytes(1, 'little')
|
||||
|
||||
@property
|
||||
def nbc(self) -> bytes:
|
||||
return (self.channel).to_bytes(1, 'little')
|
||||
|
||||
@property
|
||||
def bit(self) -> bytes:
|
||||
return (0x10).to_bytes(1, 'little')
|
||||
|
||||
@property
|
||||
def streamname(self) -> bytes:
|
||||
return self.name.encode() + bytes(16 - len(self.name))
|
||||
|
||||
@classmethod
|
||||
def to_bytes(
|
||||
cls, name: str, bps_index: int, channel: int, framecounter: int
|
||||
) -> bytes:
|
||||
header = cls(
|
||||
name=name, bps_index=bps_index, channel=channel, framecounter=framecounter
|
||||
)
|
||||
|
||||
data = bytearray()
|
||||
data.extend(header.vban)
|
||||
data.extend(header.sr)
|
||||
data.extend(header.nbs)
|
||||
data.extend(header.nbc)
|
||||
data.extend(header.bit)
|
||||
data.extend(header.streamname)
|
||||
data.extend(header.framecounter.to_bytes(4, 'little'))
|
||||
return bytes(data)
|
||||
184
vban_cmd/packet/headers.py
Normal file
184
vban_cmd/packet/headers.py
Normal file
@ -0,0 +1,184 @@
|
||||
from dataclasses import dataclass
|
||||
|
||||
from vban_cmd.enums import NBS
|
||||
from vban_cmd.kinds import KindMapClass
|
||||
|
||||
VBAN_PROTOCOL_TXT = 0x40
|
||||
VBAN_PROTOCOL_SERVICE = 0x60
|
||||
|
||||
VBAN_SERVICE_RTPACKETREGISTER = 32
|
||||
VBAN_SERVICE_RTPACKET = 33
|
||||
VBAN_SERVICE_MASK = 0xE0
|
||||
|
||||
MAX_PACKET_SIZE = 1436
|
||||
HEADER_SIZE = 4 + 1 + 1 + 1 + 1 + 16
|
||||
|
||||
|
||||
@dataclass
|
||||
class VbanPacket:
|
||||
"""Represents the header of an incoming VBAN data packet"""
|
||||
|
||||
nbs: NBS
|
||||
_kind: KindMapClass
|
||||
_voicemeeterType: bytes
|
||||
_reserved: bytes
|
||||
_buffersize: bytes
|
||||
_voicemeeterVersion: bytes
|
||||
_optionBits: bytes
|
||||
_samplerate: bytes
|
||||
|
||||
@property
|
||||
def voicemeetertype(self) -> str:
|
||||
"""returns voicemeeter type as a string"""
|
||||
return ['', 'basic', 'banana', 'potato'][
|
||||
int.from_bytes(self._voicemeeterType, 'little')
|
||||
]
|
||||
|
||||
@property
|
||||
def voicemeeterversion(self) -> tuple:
|
||||
"""returns voicemeeter version as a tuple"""
|
||||
return tuple(self._voicemeeterVersion[i] for i in range(3, -1, -1))
|
||||
|
||||
@property
|
||||
def samplerate(self) -> int:
|
||||
"""returns samplerate as an int"""
|
||||
return int.from_bytes(self._samplerate, 'little')
|
||||
|
||||
|
||||
@dataclass
|
||||
class VbanSubscribeHeader:
|
||||
"""Represents the header of a subscription packet"""
|
||||
|
||||
nbs: NBS = NBS.zero
|
||||
name: str = 'Register-RTP'
|
||||
timeout: int = 15
|
||||
|
||||
@property
|
||||
def vban(self) -> bytes:
|
||||
return b'VBAN'
|
||||
|
||||
@property
|
||||
def format_sr(self) -> bytes:
|
||||
return VBAN_PROTOCOL_SERVICE.to_bytes(1, 'little')
|
||||
|
||||
@property
|
||||
def format_nbs(self) -> bytes:
|
||||
return (self.nbs.value & 0xFF).to_bytes(1, 'little')
|
||||
|
||||
@property
|
||||
def format_nbc(self) -> bytes:
|
||||
return VBAN_SERVICE_RTPACKETREGISTER.to_bytes(1, 'little')
|
||||
|
||||
@property
|
||||
def format_bit(self) -> bytes:
|
||||
return (self.timeout & 0xFF).to_bytes(1, 'little')
|
||||
|
||||
@property
|
||||
def streamname(self) -> bytes:
|
||||
return self.name.encode('ascii') + bytes(16 - len(self.name))
|
||||
|
||||
@classmethod
|
||||
def to_bytes(cls, nbs: NBS, framecounter: int) -> bytes:
|
||||
header = cls(nbs=nbs)
|
||||
|
||||
data = bytearray()
|
||||
data.extend(header.vban)
|
||||
data.extend(header.format_sr)
|
||||
data.extend(header.format_nbs)
|
||||
data.extend(header.format_nbc)
|
||||
data.extend(header.format_bit)
|
||||
data.extend(header.streamname)
|
||||
data.extend(framecounter.to_bytes(4, 'little'))
|
||||
return bytes(data)
|
||||
|
||||
|
||||
@dataclass
|
||||
class VbanResponseHeader:
|
||||
"""Represents the header of a response packet"""
|
||||
|
||||
name: str = 'Voicemeeter-RTP'
|
||||
format_sr: int = VBAN_PROTOCOL_SERVICE
|
||||
format_nbs: int = 0
|
||||
format_nbc: int = VBAN_SERVICE_RTPACKET
|
||||
format_bit: int = 0
|
||||
|
||||
@property
|
||||
def vban(self) -> bytes:
|
||||
return b'VBAN'
|
||||
|
||||
@property
|
||||
def streamname(self) -> bytes:
|
||||
return self.name.encode('ascii') + bytes(16 - len(self.name))
|
||||
|
||||
@classmethod
|
||||
def from_bytes(cls, data: bytes):
|
||||
if len(data) < HEADER_SIZE:
|
||||
raise ValueError('Data is too short to be a valid VbanResponseHeader')
|
||||
|
||||
name = data[8:24].rstrip(b'\x00').decode('utf-8')
|
||||
return cls(
|
||||
name=name,
|
||||
format_sr=data[4] & VBAN_SERVICE_MASK,
|
||||
format_nbs=data[5],
|
||||
format_nbc=data[6],
|
||||
format_bit=data[7],
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class VbanRequestHeader:
|
||||
"""Represents the header of a request packet"""
|
||||
|
||||
name: str
|
||||
bps_index: int
|
||||
channel: int
|
||||
framecounter: int = 0
|
||||
|
||||
@property
|
||||
def vban(self) -> bytes:
|
||||
return b'VBAN'
|
||||
|
||||
@property
|
||||
def sr(self) -> bytes:
|
||||
return (VBAN_PROTOCOL_TXT + self.bps_index).to_bytes(1, 'little')
|
||||
|
||||
@property
|
||||
def nbs(self) -> bytes:
|
||||
return (0).to_bytes(1, 'little')
|
||||
|
||||
@property
|
||||
def nbc(self) -> bytes:
|
||||
return (self.channel).to_bytes(1, 'little')
|
||||
|
||||
@property
|
||||
def bit(self) -> bytes:
|
||||
return (0x10).to_bytes(1, 'little')
|
||||
|
||||
@property
|
||||
def streamname(self) -> bytes:
|
||||
return self.name.encode() + bytes(16 - len(self.name))
|
||||
|
||||
@classmethod
|
||||
def to_bytes(
|
||||
cls, name: str, bps_index: int, channel: int, framecounter: int
|
||||
) -> bytes:
|
||||
header = cls(
|
||||
name=name, bps_index=bps_index, channel=channel, framecounter=framecounter
|
||||
)
|
||||
|
||||
data = bytearray()
|
||||
data.extend(header.vban)
|
||||
data.extend(header.sr)
|
||||
data.extend(header.nbs)
|
||||
data.extend(header.nbc)
|
||||
data.extend(header.bit)
|
||||
data.extend(header.streamname)
|
||||
data.extend(header.framecounter.to_bytes(4, 'little'))
|
||||
return bytes(data)
|
||||
|
||||
@classmethod
|
||||
def encode_with_payload(
|
||||
cls, name: str, bps_index: int, channel: int, framecounter: int, payload: str
|
||||
) -> bytes:
|
||||
"""Creates the complete packet with header and payload."""
|
||||
return cls.to_bytes(name, bps_index, channel, framecounter) + payload.encode()
|
||||
288
vban_cmd/packet/nbs0.py
Normal file
288
vban_cmd/packet/nbs0.py
Normal file
@ -0,0 +1,288 @@
|
||||
from dataclasses import dataclass
|
||||
from typing import NamedTuple
|
||||
|
||||
from vban_cmd.enums import NBS
|
||||
from vban_cmd.kinds import KindMapClass
|
||||
from vban_cmd.util import comp
|
||||
|
||||
from .headers import VbanPacket
|
||||
|
||||
|
||||
class Levels(NamedTuple):
|
||||
strip: tuple[float, ...]
|
||||
bus: tuple[float, ...]
|
||||
|
||||
|
||||
class ChannelState:
|
||||
"""Represents the processed state of a single strip or bus channel"""
|
||||
|
||||
def __init__(self, state_bytes: bytes):
|
||||
# Convert 4-byte state to integer once for efficient lookups
|
||||
self._state = int.from_bytes(state_bytes, 'little')
|
||||
|
||||
def get_mode(self, mode_value: int) -> bool:
|
||||
"""Get boolean state for a specific mode"""
|
||||
return (self._state & mode_value) != 0
|
||||
|
||||
def get_mode_int(self, mode_value: int) -> int:
|
||||
"""Get integer state for a specific mode"""
|
||||
return self._state & mode_value
|
||||
|
||||
# Common boolean modes
|
||||
@property
|
||||
def mute(self) -> bool:
|
||||
return (self._state & 0x00000001) != 0
|
||||
|
||||
@property
|
||||
def solo(self) -> bool:
|
||||
return (self._state & 0x00000002) != 0
|
||||
|
||||
@property
|
||||
def mono(self) -> bool:
|
||||
return (self._state & 0x00000004) != 0
|
||||
|
||||
@property
|
||||
def mc(self) -> bool:
|
||||
return (self._state & 0x00000008) != 0
|
||||
|
||||
# EQ modes
|
||||
@property
|
||||
def eq_on(self) -> bool:
|
||||
return (self._state & 0x00000100) != 0
|
||||
|
||||
@property
|
||||
def eq_ab(self) -> bool:
|
||||
return (self._state & 0x00000800) != 0
|
||||
|
||||
# Bus assignments (strip to bus routing)
|
||||
@property
|
||||
def busa1(self) -> bool:
|
||||
return (self._state & 0x00001000) != 0
|
||||
|
||||
@property
|
||||
def busa2(self) -> bool:
|
||||
return (self._state & 0x00002000) != 0
|
||||
|
||||
@property
|
||||
def busa3(self) -> bool:
|
||||
return (self._state & 0x00004000) != 0
|
||||
|
||||
@property
|
||||
def busa4(self) -> bool:
|
||||
return (self._state & 0x00008000) != 0
|
||||
|
||||
@property
|
||||
def busb1(self) -> bool:
|
||||
return (self._state & 0x00010000) != 0
|
||||
|
||||
@property
|
||||
def busb2(self) -> bool:
|
||||
return (self._state & 0x00020000) != 0
|
||||
|
||||
@property
|
||||
def busb3(self) -> bool:
|
||||
return (self._state & 0x00040000) != 0
|
||||
|
||||
|
||||
class States(NamedTuple):
|
||||
strip: tuple[ChannelState, ...]
|
||||
bus: tuple[ChannelState, ...]
|
||||
|
||||
|
||||
class Labels(NamedTuple):
|
||||
strip: tuple[str, ...]
|
||||
bus: tuple[str, ...]
|
||||
|
||||
|
||||
@dataclass
|
||||
class VbanPacketNBS0(VbanPacket):
|
||||
"""Represents the body of a VBAN data packet with ident:0"""
|
||||
|
||||
_inputLeveldB100: bytes
|
||||
_outputLeveldB100: bytes
|
||||
_TransportBit: bytes
|
||||
_stripState: bytes
|
||||
_busState: bytes
|
||||
_stripGaindB100Layer1: bytes
|
||||
_stripGaindB100Layer2: bytes
|
||||
_stripGaindB100Layer3: bytes
|
||||
_stripGaindB100Layer4: bytes
|
||||
_stripGaindB100Layer5: bytes
|
||||
_stripGaindB100Layer6: bytes
|
||||
_stripGaindB100Layer7: bytes
|
||||
_stripGaindB100Layer8: bytes
|
||||
_busGaindB100: bytes
|
||||
_stripLabelUTF8c60: bytes
|
||||
_busLabelUTF8c60: bytes
|
||||
|
||||
@classmethod
|
||||
def from_bytes(cls, nbs: NBS, kind: KindMapClass, data: bytes):
|
||||
return cls(
|
||||
nbs=nbs,
|
||||
_kind=kind,
|
||||
_voicemeeterType=data[28:29],
|
||||
_reserved=data[29:30],
|
||||
_buffersize=data[30:32],
|
||||
_voicemeeterVersion=data[32:36],
|
||||
_optionBits=data[36:40],
|
||||
_samplerate=data[40:44],
|
||||
_inputLeveldB100=data[44:112],
|
||||
_outputLeveldB100=data[112:240],
|
||||
_TransportBit=data[240:244],
|
||||
_stripState=data[244:276],
|
||||
_busState=data[276:308],
|
||||
_stripGaindB100Layer1=data[308:324],
|
||||
_stripGaindB100Layer2=data[324:340],
|
||||
_stripGaindB100Layer3=data[340:356],
|
||||
_stripGaindB100Layer4=data[356:372],
|
||||
_stripGaindB100Layer5=data[372:388],
|
||||
_stripGaindB100Layer6=data[388:404],
|
||||
_stripGaindB100Layer7=data[404:420],
|
||||
_stripGaindB100Layer8=data[420:436],
|
||||
_busGaindB100=data[436:452],
|
||||
_stripLabelUTF8c60=data[452:932],
|
||||
_busLabelUTF8c60=data[932:1412],
|
||||
)
|
||||
|
||||
def pdirty(self, other) -> bool:
|
||||
"""True iff any defined parameter has changed"""
|
||||
|
||||
self_gains = (
|
||||
self._stripGaindB100Layer1
|
||||
+ self._stripGaindB100Layer2
|
||||
+ self._stripGaindB100Layer3
|
||||
+ self._stripGaindB100Layer4
|
||||
+ self._stripGaindB100Layer5
|
||||
+ self._stripGaindB100Layer6
|
||||
+ self._stripGaindB100Layer7
|
||||
+ self._stripGaindB100Layer8
|
||||
)
|
||||
other_gains = (
|
||||
other._stripGaindB100Layer1
|
||||
+ other._stripGaindB100Layer2
|
||||
+ other._stripGaindB100Layer3
|
||||
+ other._stripGaindB100Layer4
|
||||
+ other._stripGaindB100Layer5
|
||||
+ other._stripGaindB100Layer6
|
||||
+ other._stripGaindB100Layer7
|
||||
+ other._stripGaindB100Layer8
|
||||
)
|
||||
|
||||
return (
|
||||
self._stripState != other._stripState
|
||||
or self._busState != other._busState
|
||||
or self_gains != other_gains
|
||||
or self._busGaindB100 != other._busGaindB100
|
||||
or self._stripLabelUTF8c60 != other._stripLabelUTF8c60
|
||||
or self._busLabelUTF8c60 != other._busLabelUTF8c60
|
||||
)
|
||||
|
||||
def ldirty(self, strip_cache, bus_cache) -> bool:
|
||||
"""True iff any level has changed, ignoring changes when levels are very quiet"""
|
||||
self._strip_comp, self._bus_comp = (
|
||||
tuple(not val for val in comp(strip_cache, self.strip_levels)),
|
||||
tuple(not val for val in comp(bus_cache, self.bus_levels)),
|
||||
)
|
||||
return any(self._strip_comp) or any(self._bus_comp)
|
||||
|
||||
@property
|
||||
def strip_levels(self) -> tuple[float, ...]:
|
||||
"""Returns strip levels in dB"""
|
||||
return tuple(
|
||||
round(
|
||||
int.from_bytes(self._inputLeveldB100[i : i + 2], 'little', signed=True)
|
||||
* 0.01,
|
||||
1,
|
||||
)
|
||||
for i in range(0, len(self._inputLeveldB100), 2)
|
||||
)[: self._kind.num_strip_levels]
|
||||
|
||||
@property
|
||||
def bus_levels(self) -> tuple[float, ...]:
|
||||
"""Returns bus levels in dB"""
|
||||
return tuple(
|
||||
round(
|
||||
int.from_bytes(self._outputLeveldB100[i : i + 2], 'little', signed=True)
|
||||
* 0.01,
|
||||
1,
|
||||
)
|
||||
for i in range(0, len(self._outputLeveldB100), 2)
|
||||
)[: self._kind.num_bus_levels]
|
||||
|
||||
@property
|
||||
def levels(self) -> Levels:
|
||||
"""Returns strip and bus levels as a namedtuple"""
|
||||
return Levels(strip=self.strip_levels, bus=self.bus_levels)
|
||||
|
||||
@property
|
||||
def states(self) -> States:
|
||||
"""returns States object with processed strip and bus channel states"""
|
||||
return States(
|
||||
strip=tuple(
|
||||
ChannelState(self._stripState[i : i + 4]) for i in range(0, 32, 4)
|
||||
),
|
||||
bus=tuple(ChannelState(self._busState[i : i + 4]) for i in range(0, 32, 4)),
|
||||
)
|
||||
|
||||
@property
|
||||
def gainlayers(self) -> tuple:
|
||||
"""returns tuple of all strip gain layers as tuples"""
|
||||
return tuple(
|
||||
tuple(
|
||||
round(
|
||||
int.from_bytes(
|
||||
getattr(self, f'_stripGaindB100Layer{layer}')[i : i + 2],
|
||||
'little',
|
||||
signed=True,
|
||||
)
|
||||
* 0.01,
|
||||
2,
|
||||
)
|
||||
for i in range(0, 16, 2)
|
||||
)
|
||||
for layer in range(1, 9)
|
||||
)
|
||||
|
||||
@property
|
||||
def busgain(self) -> tuple:
|
||||
"""returns tuple of bus gains"""
|
||||
return tuple(
|
||||
round(
|
||||
int.from_bytes(self._busGaindB100[i : i + 2], 'little', signed=True)
|
||||
* 0.01,
|
||||
2,
|
||||
)
|
||||
for i in range(0, 16, 2)
|
||||
)
|
||||
|
||||
@property
|
||||
def labels(self) -> Labels:
|
||||
"""returns Labels namedtuple of strip and bus labels"""
|
||||
|
||||
def _extract_labels_from_bytes(label_bytes: bytes) -> tuple[str, ...]:
|
||||
"""Extract null-terminated UTF-8 labels from 60-byte chunks"""
|
||||
labels = []
|
||||
for i in range(0, len(label_bytes), 60):
|
||||
chunk = label_bytes[i : i + 60]
|
||||
null_pos = chunk.find(b'\x00')
|
||||
if null_pos == -1:
|
||||
try:
|
||||
label = chunk.decode('utf-8', errors='replace').rstrip('\x00')
|
||||
except UnicodeDecodeError:
|
||||
label = ''
|
||||
else:
|
||||
try:
|
||||
label = (
|
||||
chunk[:null_pos].decode('utf-8', errors='replace')
|
||||
if null_pos > 0
|
||||
else ''
|
||||
)
|
||||
except UnicodeDecodeError:
|
||||
label = ''
|
||||
labels.append(label)
|
||||
return tuple(labels)
|
||||
|
||||
return Labels(
|
||||
strip=_extract_labels_from_bytes(self._stripLabelUTF8c60),
|
||||
bus=_extract_labels_from_bytes(self._busLabelUTF8c60),
|
||||
)
|
||||
357
vban_cmd/packet/nbs1.py
Normal file
357
vban_cmd/packet/nbs1.py
Normal file
@ -0,0 +1,357 @@
|
||||
import struct
|
||||
from dataclasses import dataclass
|
||||
from typing import NamedTuple
|
||||
|
||||
from vban_cmd.enums import NBS
|
||||
from vban_cmd.kinds import KindMapClass
|
||||
|
||||
from .headers import VbanPacket
|
||||
|
||||
VMPARAMSTRIP_SIZE = 174
|
||||
|
||||
|
||||
class Audibility(NamedTuple):
|
||||
knob: float
|
||||
comp: float
|
||||
gate: float
|
||||
denoiser: float
|
||||
|
||||
|
||||
class Positions(NamedTuple):
|
||||
pan_x: float
|
||||
pan_y: float
|
||||
color_x: float
|
||||
color_y: float
|
||||
fx1: float
|
||||
fx2: float
|
||||
|
||||
|
||||
class EqGains(NamedTuple):
|
||||
bass: float
|
||||
mid: float
|
||||
treble: float
|
||||
|
||||
|
||||
class ParametricEQSettings(NamedTuple):
|
||||
on: bool
|
||||
type: int
|
||||
gain: float
|
||||
freq: float
|
||||
q: float
|
||||
|
||||
|
||||
class Sends(NamedTuple):
|
||||
reverb: float
|
||||
delay: float
|
||||
fx1: float
|
||||
fx2: float
|
||||
|
||||
|
||||
class CompressorSettings(NamedTuple):
|
||||
gain_in: float
|
||||
attack_ms: float
|
||||
release_ms: float
|
||||
n_knee: float
|
||||
ratio: float
|
||||
threshold: float
|
||||
c_enabled: bool
|
||||
makeup: bool
|
||||
gain_out: float
|
||||
|
||||
|
||||
class GateSettings(NamedTuple):
|
||||
threshold_in: float
|
||||
damping_max: float
|
||||
bp_sidechain: bool
|
||||
attack_ms: float
|
||||
hold_ms: float
|
||||
release_ms: float
|
||||
|
||||
|
||||
class DenoiserSettings(NamedTuple):
|
||||
threshold: float
|
||||
|
||||
|
||||
class PitchSettings(NamedTuple):
|
||||
enabled: bool
|
||||
dry_wet: float
|
||||
value: float
|
||||
formant_lo: float
|
||||
formant_med: float
|
||||
formant_high: float
|
||||
|
||||
|
||||
@dataclass
|
||||
class VbanVMParamStrip:
|
||||
"""Represents the VBAN_VMPARAMSTRIP_PACKET structure"""
|
||||
|
||||
_mode: bytes
|
||||
_dblevel: bytes
|
||||
_audibility: bytes
|
||||
_pos3D_x: bytes
|
||||
_pos3D_y: bytes
|
||||
_posColor_x: bytes
|
||||
_posColor_y: bytes
|
||||
_EQgain1: bytes
|
||||
_EQgain2: bytes
|
||||
_EQgain3: bytes
|
||||
|
||||
# First channel parametric EQ
|
||||
_PEQ_eqOn: bytes
|
||||
_PEQ_eqtype: bytes
|
||||
_PEQ_eqgain: bytes
|
||||
_PEQ_eqfreq: bytes
|
||||
_PEQ_eqq: bytes
|
||||
|
||||
_audibility_c: bytes
|
||||
_audibility_g: bytes
|
||||
_audibility_d: bytes
|
||||
_posMod_x: bytes
|
||||
_posMod_y: bytes
|
||||
_send_reverb: bytes
|
||||
_send_delay: bytes
|
||||
_send_fx1: bytes
|
||||
_send_fx2: bytes
|
||||
_dblimit: bytes
|
||||
_nKaraoke: bytes
|
||||
|
||||
_COMP_gain_in: bytes
|
||||
_COMP_attack_ms: bytes
|
||||
_COMP_release_ms: bytes
|
||||
_COMP_n_knee: bytes
|
||||
_COMP_comprate: bytes
|
||||
_COMP_threshold: bytes
|
||||
_COMP_c_enabled: bytes
|
||||
_COMP_c_auto: bytes
|
||||
_COMP_gain_out: bytes
|
||||
|
||||
_GATE_dBThreshold_in: bytes
|
||||
_GATE_dBDamping_max: bytes
|
||||
_GATE_BP_Sidechain: bytes
|
||||
_GATE_attack_ms: bytes
|
||||
_GATE_hold_ms: bytes
|
||||
_GATE_release_ms: bytes
|
||||
|
||||
_DenoiserThreshold: bytes
|
||||
_PitchEnabled: bytes
|
||||
_Pitch_DryWet: bytes
|
||||
_Pitch_Value: bytes
|
||||
_Pitch_formant_lo: bytes
|
||||
_Pitch_formant_med: bytes
|
||||
_Pitch_formant_high: bytes
|
||||
|
||||
@classmethod
|
||||
def from_bytes(cls, data: bytes):
|
||||
return cls(
|
||||
_mode=data[0:4],
|
||||
_dblevel=data[4:8],
|
||||
_audibility=data[8:10],
|
||||
_pos3D_x=data[10:12],
|
||||
_pos3D_y=data[12:14],
|
||||
_posColor_x=data[14:16],
|
||||
_posColor_y=data[16:18],
|
||||
_EQgain1=data[18:20],
|
||||
_EQgain2=data[20:22],
|
||||
_EQgain3=data[22:24],
|
||||
_PEQ_eqOn=data[24:30],
|
||||
_PEQ_eqtype=data[30:36],
|
||||
_PEQ_eqgain=data[36:60],
|
||||
_PEQ_eqfreq=data[60:84],
|
||||
_PEQ_eqq=data[84:108],
|
||||
_audibility_c=data[108:110],
|
||||
_audibility_g=data[110:112],
|
||||
_audibility_d=data[112:114],
|
||||
_posMod_x=data[114:116],
|
||||
_posMod_y=data[116:118],
|
||||
_send_reverb=data[118:120],
|
||||
_send_delay=data[120:122],
|
||||
_send_fx1=data[122:124],
|
||||
_send_fx2=data[124:126],
|
||||
_dblimit=data[126:128],
|
||||
_nKaraoke=data[128:130],
|
||||
_COMP_gain_in=data[130:132],
|
||||
_COMP_attack_ms=data[132:134],
|
||||
_COMP_release_ms=data[134:136],
|
||||
_COMP_n_knee=data[136:138],
|
||||
_COMP_comprate=data[138:140],
|
||||
_COMP_threshold=data[140:142],
|
||||
_COMP_c_enabled=data[142:144],
|
||||
_COMP_c_auto=data[144:146],
|
||||
_COMP_gain_out=data[146:148],
|
||||
_GATE_dBThreshold_in=data[148:150],
|
||||
_GATE_dBDamping_max=data[150:152],
|
||||
_GATE_BP_Sidechain=data[152:154],
|
||||
_GATE_attack_ms=data[154:156],
|
||||
_GATE_hold_ms=data[156:158],
|
||||
_GATE_release_ms=data[158:160],
|
||||
_DenoiserThreshold=data[160:162],
|
||||
_PitchEnabled=data[162:164],
|
||||
_Pitch_DryWet=data[164:166],
|
||||
_Pitch_Value=data[166:168],
|
||||
_Pitch_formant_lo=data[168:170],
|
||||
_Pitch_formant_med=data[170:172],
|
||||
_Pitch_formant_high=data[172:174],
|
||||
)
|
||||
|
||||
@property
|
||||
def mode(self) -> int:
|
||||
return int.from_bytes(self._mode, 'little')
|
||||
|
||||
@property
|
||||
def audibility(self) -> Audibility:
|
||||
return Audibility(
|
||||
round(int.from_bytes(self._audibility, 'little', signed=True) * 0.01, 2),
|
||||
round(int.from_bytes(self._audibility_c, 'little', signed=True) * 0.01, 2),
|
||||
round(int.from_bytes(self._audibility_g, 'little', signed=True) * 0.01, 2),
|
||||
round(int.from_bytes(self._audibility_d, 'little', signed=True) * 0.01, 2),
|
||||
)
|
||||
|
||||
@property
|
||||
def positions(self) -> Positions:
|
||||
return Positions(
|
||||
round(int.from_bytes(self._pos3D_x, 'little', signed=True) * 0.01, 2),
|
||||
round(int.from_bytes(self._pos3D_y, 'little', signed=True) * 0.01, 2),
|
||||
round(int.from_bytes(self._posColor_x, 'little', signed=True) * 0.01, 2),
|
||||
round(int.from_bytes(self._posColor_y, 'little', signed=True) * 0.01, 2),
|
||||
round(int.from_bytes(self._posMod_x, 'little', signed=True) * 0.01, 2),
|
||||
round(int.from_bytes(self._posMod_y, 'little', signed=True) * 0.01, 2),
|
||||
)
|
||||
|
||||
@property
|
||||
def eqgains(self) -> EqGains:
|
||||
return EqGains(
|
||||
*[
|
||||
round(
|
||||
int.from_bytes(getattr(self, f'_EQgain{i}'), 'little', signed=True)
|
||||
* 0.01,
|
||||
2,
|
||||
)
|
||||
for i in range(1, 4)
|
||||
]
|
||||
)
|
||||
|
||||
@property
|
||||
def parametric_eq(self) -> tuple[ParametricEQSettings, ...]:
|
||||
return tuple(
|
||||
ParametricEQSettings(
|
||||
on=bool(int.from_bytes(self._PEQ_eqOn[i : i + 1], 'little')),
|
||||
type=int.from_bytes(self._PEQ_eqtype[i : i + 1], 'little'),
|
||||
freq=struct.unpack('<f', self._PEQ_eqfreq[i * 4 : (i + 1) * 4])[0],
|
||||
gain=struct.unpack('<f', self._PEQ_eqgain[i * 4 : (i + 1) * 4])[0],
|
||||
q=struct.unpack('<f', self._PEQ_eqq[i * 4 : (i + 1) * 4])[0],
|
||||
)
|
||||
for i in range(6)
|
||||
)
|
||||
|
||||
@property
|
||||
def sends(self) -> Sends:
|
||||
return Sends(
|
||||
round(int.from_bytes(self._send_reverb, 'little', signed=True) * 0.01, 2),
|
||||
round(int.from_bytes(self._send_delay, 'little', signed=True) * 0.01, 2),
|
||||
round(int.from_bytes(self._send_fx1, 'little', signed=True) * 0.01, 2),
|
||||
round(int.from_bytes(self._send_fx2, 'little', signed=True) * 0.01, 2),
|
||||
)
|
||||
|
||||
@property
|
||||
def karaoke(self) -> int:
|
||||
return int.from_bytes(self._nKaraoke, 'little')
|
||||
|
||||
@property
|
||||
def compressor(self) -> CompressorSettings:
|
||||
return CompressorSettings(
|
||||
gain_in=round(
|
||||
int.from_bytes(self._COMP_gain_in, 'little', signed=True) * 0.01, 2
|
||||
),
|
||||
attack_ms=round(int.from_bytes(self._COMP_attack_ms, 'little') * 0.1, 2),
|
||||
release_ms=round(int.from_bytes(self._COMP_release_ms, 'little') * 0.1, 2),
|
||||
n_knee=round(int.from_bytes(self._COMP_n_knee, 'little') * 0.01, 2),
|
||||
ratio=round(int.from_bytes(self._COMP_comprate, 'little') * 0.01, 2),
|
||||
threshold=round(
|
||||
int.from_bytes(self._COMP_threshold, 'little', signed=True) * 0.01, 2
|
||||
),
|
||||
c_enabled=bool(int.from_bytes(self._COMP_c_enabled, 'little')),
|
||||
makeup=bool(int.from_bytes(self._COMP_c_auto, 'little')),
|
||||
gain_out=round(
|
||||
int.from_bytes(self._COMP_gain_out, 'little', signed=True) * 0.01, 2
|
||||
),
|
||||
)
|
||||
|
||||
@property
|
||||
def gate(self) -> GateSettings:
|
||||
return GateSettings(
|
||||
threshold_in=round(
|
||||
int.from_bytes(self._GATE_dBThreshold_in, 'little', signed=True) * 0.01,
|
||||
2,
|
||||
),
|
||||
damping_max=round(
|
||||
int.from_bytes(self._GATE_dBDamping_max, 'little', signed=True) * 0.01,
|
||||
2,
|
||||
),
|
||||
bp_sidechain=round(
|
||||
int.from_bytes(self._GATE_BP_Sidechain, 'little') * 0.1, 2
|
||||
),
|
||||
attack_ms=round(int.from_bytes(self._GATE_attack_ms, 'little') * 0.1, 2),
|
||||
hold_ms=round(int.from_bytes(self._GATE_hold_ms, 'little') * 0.1, 2),
|
||||
release_ms=round(int.from_bytes(self._GATE_release_ms, 'little') * 0.1, 2),
|
||||
)
|
||||
|
||||
@property
|
||||
def denoiser(self) -> DenoiserSettings:
|
||||
return DenoiserSettings(
|
||||
threshold=round(
|
||||
int.from_bytes(self._DenoiserThreshold, 'little', signed=True) * 0.01, 2
|
||||
)
|
||||
)
|
||||
|
||||
@property
|
||||
def pitch(self) -> PitchSettings:
|
||||
return PitchSettings(
|
||||
enabled=bool(int.from_bytes(self._PitchEnabled, 'little')),
|
||||
dry_wet=round(
|
||||
int.from_bytes(self._Pitch_DryWet, 'little', signed=True) * 0.01, 2
|
||||
),
|
||||
value=round(
|
||||
int.from_bytes(self._Pitch_Value, 'little', signed=True) * 0.01, 2
|
||||
),
|
||||
formant_lo=round(
|
||||
int.from_bytes(self._Pitch_formant_lo, 'little', signed=True) * 0.01, 2
|
||||
),
|
||||
formant_med=round(
|
||||
int.from_bytes(self._Pitch_formant_med, 'little', signed=True) * 0.01, 2
|
||||
),
|
||||
formant_high=round(
|
||||
int.from_bytes(self._Pitch_formant_high, 'little', signed=True) * 0.01,
|
||||
2,
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class VbanPacketNBS1(VbanPacket):
|
||||
"""Represents the body of a VBAN data packet with ident:1"""
|
||||
|
||||
strips: tuple[VbanVMParamStrip, ...]
|
||||
|
||||
@classmethod
|
||||
def from_bytes(
|
||||
cls,
|
||||
nbs: NBS,
|
||||
kind: KindMapClass,
|
||||
data: bytes,
|
||||
):
|
||||
return cls(
|
||||
nbs=nbs,
|
||||
_kind=kind,
|
||||
_voicemeeterType=data[28:29],
|
||||
_reserved=data[29:30],
|
||||
_buffersize=data[30:32],
|
||||
_voicemeeterVersion=data[32:36],
|
||||
_optionBits=data[36:40],
|
||||
_samplerate=data[40:44],
|
||||
strips=tuple(
|
||||
VbanVMParamStrip.from_bytes(
|
||||
data[44 + i * VMPARAMSTRIP_SIZE : 44 + (i + 1) * VMPARAMSTRIP_SIZE]
|
||||
)
|
||||
for i in range(16)
|
||||
),
|
||||
)
|
||||
Loading…
x
Reference in New Issue
Block a user