6 Commits

Author SHA1 Message Date
3c3e415d7e add _send_request() helper method. 2026-03-01 11:09:45 +00:00
8cfeb45fcb update imports 2026-03-01 11:09:28 +00:00
10b38b3fcc move packet classes into internal packet module 2026-03-01 11:09:22 +00:00
ff5ac193c8 add ChannelState interface, use it in the meta functions.
reword busmodes bitwise logic.

comment out ratelimit, this will probably get permanently removed.
2026-03-01 03:37:57 +00:00
2f3cd0e07f use db levels throughout the package. This is cleaner than converting to db but comparing raw integer values. 2026-03-01 01:08:02 +00:00
d689b3a301 move voicemeetertype(), voicemeeterversion() and samplerate() properties into VbanPacket
add NamedTuples for Levels, Labels and States.

refactor the levels properties

update the math in util.comp()

StripLevel/BusLevel getters updated according to changes in VbanPacketNBS0

remove {VbanCmd}._get_levels(), it's no longer necessary.
2026-03-01 00:25:22 +00:00
9 changed files with 632 additions and 493 deletions

View File

@@ -4,7 +4,7 @@ from typing import Union
from .enums import NBS, BusModes
from .iremote import IRemote
from .meta import bus_mode_prop, channel_bool_prop, channel_label_prop
from .meta import bus_mode_prop, channel_bool_prop, channel_int_prop, channel_label_prop
class Bus(IRemote):
@@ -90,20 +90,9 @@ class BusLevel(IRemote):
def getter(self):
"""Returns a tuple of level values for the channel."""
def fget(i):
return round((((1 << 16) - 1) - i) * -0.01, 1)
if not self._remote.stopped() and self._remote.event.ldirty:
return tuple(
fget(i)
for i in self._remote.cache['bus_level'][self.range[0] : self.range[-1]]
)
return tuple(
fget(i)
for i in self._remote._get_levels(self.public_packets[NBS.zero])[1][
self.range[0] : self.range[-1]
]
)
return self._remote.cache['bus_level'][self.range[0] : self.range[-1]]
return self.public_packets[NBS.zero].levels.bus[self.range[0] : self.range[-1]]
@property
def identifier(self) -> str:
@@ -128,45 +117,49 @@ class BusLevel(IRemote):
def _make_bus_mode_mixin():
"""Creates a mixin of Bus Modes."""
modestates = {
'normal': [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
'amix': [1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1],
'repeat': [0, 2, 2, 0, 0, 2, 2, 0, 0, 2, 2],
'bmix': [1, 2, 3, 0, 1, 2, 3, 0, 1, 2, 3],
'composite': [0, 0, 0, 4, 4, 4, 4, 0, 0, 0, 0],
'tvmix': [1, 0, 1, 4, 5, 4, 5, 0, 1, 0, 1],
'upmix21': [0, 2, 2, 4, 4, 6, 6, 0, 0, 2, 2],
'upmix41': [1, 2, 3, 4, 5, 6, 7, 0, 1, 2, 3],
'upmix61': [0, 0, 0, 0, 0, 0, 0, 8, 8, 8, 8],
'centeronly': [1, 0, 1, 0, 1, 0, 1, 8, 9, 8, 9],
'lfeonly': [0, 2, 2, 0, 0, 2, 2, 8, 8, 10, 10],
'rearonly': [1, 2, 3, 0, 1, 2, 3, 8, 9, 10, 11],
}
mode_names = [
'normal',
'amix',
'repeat',
'bmix',
'composite',
'tvmix',
'upmix21',
'upmix41',
'upmix61',
'centeronly',
'lfeonly',
'rearonly',
]
def identifier(self) -> str:
return f'bus[{self.index}].mode'
def get(self):
states = [
(
int.from_bytes(
self.public_packets[NBS.zero].busstate[self.index], 'little'
)
& val
)
>> 4
for val in self._modes.modevals
"""Get current bus mode using ChannelState for clean bit extraction."""
mode_cache_items = [
(k, v)
for k, v in self._remote.cache.items()
if k.startswith(f'{self.identifier}.') and v == 1
]
for k, v in modestates.items():
if states == v:
return k
if mode_cache_items:
latest_cached = mode_cache_items[-1][0]
mode_name = latest_cached.split('.')[-1]
return mode_name
bus_state = self.public_packets[NBS.zero].states.bus[self.index]
# Extract bus mode from bits 4-7 (mask 0xF0, shift right by 4)
mode_value = (bus_state._state & 0x000000F0) >> 4
return mode_names[mode_value] if mode_value < len(mode_names) else 'normal'
return type(
'BusModeMixin',
(IRemote,),
{
'identifier': property(identifier),
'modestates': modestates,
**{mode.name: bus_mode_prop(mode.name) for mode in BusModes},
'get': get,
},
@@ -188,7 +181,8 @@ def bus_factory(phys_bus, remote, i) -> Union[PhysicalBus, VirtualBus]:
'eq': BusEQ.make(remote, i),
'levels': BusLevel(remote, i),
'mode': BUSMODEMIXIN_cls(remote, i),
**{param: channel_bool_prop(param) for param in ['mute', 'mono']},
**{param: channel_bool_prop(param) for param in ('mute',)},
**{param: channel_int_prop(param) for param in ('mono',)},
'label': channel_label_prop(),
},
)(remote, i)

View File

@@ -1,7 +1,7 @@
from functools import partial
from .enums import NBS
from .util import cache_bool, cache_float, cache_string
from .enums import NBS, BusModes
from .util import cache_bool, cache_float, cache_int, cache_string
def channel_bool_prop(param):
@@ -11,17 +11,23 @@ def channel_bool_prop(param):
def fget(self):
cmd = self._cmd(param)
self.logger.debug(f'getter: {cmd}')
return (
not int.from_bytes(
getattr(
self.public_packets[NBS.zero],
f'{"strip" if "strip" in type(self).__name__.lower() else "bus"}state',
)[self.index],
'little',
)
& getattr(self._modes, f'_{param.lower()}')
== 0
states = self.public_packets[NBS.zero].states
channel_states = (
states.strip if 'strip' in type(self).__name__.lower() else states.bus
)
channel_state = channel_states[self.index]
if param.lower() == 'mute':
return channel_state.mute
elif param.lower() == 'solo':
return channel_state.solo
elif param.lower() == 'mono':
return channel_state.mono
elif param.lower() == 'mc':
return channel_state.mc
else:
return channel_state.get_mode(getattr(self._modes, f'_{param.lower()}'))
def fset(self, val):
self.setter(param, 1 if val else 0)
@@ -29,18 +35,46 @@ def channel_bool_prop(param):
return property(fget, fset)
def channel_int_prop(param):
"""meta function for channel integer parameters"""
@partial(cache_int, param=param)
def fget(self):
cmd = self._cmd(param)
self.logger.debug(f'getter: {cmd}')
states = self.public_packets[NBS.zero].states
channel_states = (
states.strip if 'strip' in type(self).__name__.lower() else states.bus
)
channel_state = channel_states[self.index]
# Special case: bus mono is an integer (0-2) encoded using bits 2 and 9
if param.lower() == 'mono' and 'bus' in type(self).__name__.lower():
bit_2 = (channel_state._state >> 2) & 1
bit_9 = (channel_state._state >> 9) & 1
return (bit_9 << 1) | bit_2
else:
return channel_state.get_mode_int(getattr(self._modes, f'_{param.lower()}'))
def fset(self, val):
self.setter(param, val)
return property(fget, fset)
def channel_label_prop():
"""meta function for channel label parameters"""
@partial(cache_string, param='label')
def fget(self) -> str:
return getattr(
self.public_packets[NBS.zero],
f'{"strip" if "strip" in type(self).__name__.lower() else "bus"}labels',
)[self.index]
if 'strip' in type(self).__name__.lower():
return self.public_packets[NBS.zero].labels.strip[self.index]
else:
return self.public_packets[NBS.zero].labels.bus[self.index]
def fset(self, val: str):
self.setter('label', str(val))
self.setter('label', f'"{val}"')
return property(fget, fset)
@@ -52,13 +86,10 @@ def strip_output_prop(param):
def fget(self):
cmd = self._cmd(param)
self.logger.debug(f'getter: {cmd}')
return (
not int.from_bytes(
self.public_packets[NBS.zero].stripstate[self.index], 'little'
)
& getattr(self._modes, f'_bus{param.lower()}')
== 0
)
strip_state = self.public_packets[NBS.zero].states.strip[self.index]
return strip_state.get_mode(getattr(self._modes, f'_bus{param.lower()}'))
def fset(self, val):
self.setter(param, 1 if val else 0)
@@ -73,16 +104,15 @@ def bus_mode_prop(param):
def fget(self):
cmd = self._cmd(param)
self.logger.debug(f'getter: {cmd}')
return [
(
int.from_bytes(
self.public_packets[NBS.zero].busstate[self.index], 'little'
)
& val
)
>> 4
for val in self._modes.modevals
] == self.modestates[param]
bus_state = self.public_packets[NBS.zero].states.bus[self.index]
# Extract current bus mode from bits 4-7
current_mode = (bus_state._state & 0x000000F0) >> 4
expected_mode = getattr(BusModes, param.lower())
return current_mode == expected_mode
def fset(self, val):
self.setter(param, 1 if val else 0)

184
vban_cmd/packet/headers.py Normal file
View File

@@ -0,0 +1,184 @@
from dataclasses import dataclass
from vban_cmd.enums import NBS
from vban_cmd.kinds import KindMapClass
VBAN_PROTOCOL_TXT = 0x40
VBAN_PROTOCOL_SERVICE = 0x60
VBAN_SERVICE_RTPACKETREGISTER = 32
VBAN_SERVICE_RTPACKET = 33
VBAN_SERVICE_MASK = 0xE0
MAX_PACKET_SIZE = 1436
HEADER_SIZE = 4 + 1 + 1 + 1 + 1 + 16
@dataclass
class VbanPacket:
"""Represents the header of an incoming VBAN data packet"""
nbs: NBS
_kind: KindMapClass
_voicemeeterType: bytes
_reserved: bytes
_buffersize: bytes
_voicemeeterVersion: bytes
_optionBits: bytes
_samplerate: bytes
@property
def voicemeetertype(self) -> str:
"""returns voicemeeter type as a string"""
return ['', 'basic', 'banana', 'potato'][
int.from_bytes(self._voicemeeterType, 'little')
]
@property
def voicemeeterversion(self) -> tuple:
"""returns voicemeeter version as a tuple"""
return tuple(self._voicemeeterVersion[i] for i in range(3, -1, -1))
@property
def samplerate(self) -> int:
"""returns samplerate as an int"""
return int.from_bytes(self._samplerate, 'little')
@dataclass
class VbanSubscribeHeader:
"""Represents the header of a subscription packet"""
nbs: NBS = NBS.zero
name: str = 'Register-RTP'
timeout: int = 15
@property
def vban(self) -> bytes:
return b'VBAN'
@property
def format_sr(self) -> bytes:
return VBAN_PROTOCOL_SERVICE.to_bytes(1, 'little')
@property
def format_nbs(self) -> bytes:
return (self.nbs.value & 0xFF).to_bytes(1, 'little')
@property
def format_nbc(self) -> bytes:
return VBAN_SERVICE_RTPACKETREGISTER.to_bytes(1, 'little')
@property
def format_bit(self) -> bytes:
return (self.timeout & 0xFF).to_bytes(1, 'little')
@property
def streamname(self) -> bytes:
return self.name.encode('ascii') + bytes(16 - len(self.name))
@classmethod
def to_bytes(cls, nbs: NBS, framecounter: int) -> bytes:
header = cls(nbs=nbs)
data = bytearray()
data.extend(header.vban)
data.extend(header.format_sr)
data.extend(header.format_nbs)
data.extend(header.format_nbc)
data.extend(header.format_bit)
data.extend(header.streamname)
data.extend(framecounter.to_bytes(4, 'little'))
return bytes(data)
@dataclass
class VbanResponseHeader:
"""Represents the header of a response packet"""
name: str = 'Voicemeeter-RTP'
format_sr: int = VBAN_PROTOCOL_SERVICE
format_nbs: int = 0
format_nbc: int = VBAN_SERVICE_RTPACKET
format_bit: int = 0
@property
def vban(self) -> bytes:
return b'VBAN'
@property
def streamname(self) -> bytes:
return self.name.encode('ascii') + bytes(16 - len(self.name))
@classmethod
def from_bytes(cls, data: bytes):
if len(data) < HEADER_SIZE:
raise ValueError('Data is too short to be a valid VbanResponseHeader')
name = data[8:24].rstrip(b'\x00').decode('utf-8')
return cls(
name=name,
format_sr=data[4] & VBAN_SERVICE_MASK,
format_nbs=data[5],
format_nbc=data[6],
format_bit=data[7],
)
@dataclass
class VbanRequestHeader:
"""Represents the header of a request packet"""
name: str
bps_index: int
channel: int
framecounter: int = 0
@property
def vban(self) -> bytes:
return b'VBAN'
@property
def sr(self) -> bytes:
return (VBAN_PROTOCOL_TXT + self.bps_index).to_bytes(1, 'little')
@property
def nbs(self) -> bytes:
return (0).to_bytes(1, 'little')
@property
def nbc(self) -> bytes:
return (self.channel).to_bytes(1, 'little')
@property
def bit(self) -> bytes:
return (0x10).to_bytes(1, 'little')
@property
def streamname(self) -> bytes:
return self.name.encode() + bytes(16 - len(self.name))
@classmethod
def to_bytes(
cls, name: str, bps_index: int, channel: int, framecounter: int
) -> bytes:
header = cls(
name=name, bps_index=bps_index, channel=channel, framecounter=framecounter
)
data = bytearray()
data.extend(header.vban)
data.extend(header.sr)
data.extend(header.nbs)
data.extend(header.nbc)
data.extend(header.bit)
data.extend(header.streamname)
data.extend(header.framecounter.to_bytes(4, 'little'))
return bytes(data)
@classmethod
def encode_with_payload(
cls, name: str, bps_index: int, channel: int, framecounter: int, payload: str
) -> bytes:
"""Creates the complete packet with header and payload."""
return cls.to_bytes(name, bps_index, channel, framecounter) + payload.encode()

288
vban_cmd/packet/nbs0.py Normal file
View File

@@ -0,0 +1,288 @@
from dataclasses import dataclass
from typing import NamedTuple
from vban_cmd.enums import NBS
from vban_cmd.kinds import KindMapClass
from vban_cmd.util import comp
from .headers import VbanPacket
class Levels(NamedTuple):
strip: tuple[float, ...]
bus: tuple[float, ...]
class ChannelState:
"""Represents the processed state of a single strip or bus channel"""
def __init__(self, state_bytes: bytes):
# Convert 4-byte state to integer once for efficient lookups
self._state = int.from_bytes(state_bytes, 'little')
def get_mode(self, mode_value: int) -> bool:
"""Get boolean state for a specific mode"""
return (self._state & mode_value) != 0
def get_mode_int(self, mode_value: int) -> int:
"""Get integer state for a specific mode"""
return self._state & mode_value
# Common boolean modes
@property
def mute(self) -> bool:
return (self._state & 0x00000001) != 0
@property
def solo(self) -> bool:
return (self._state & 0x00000002) != 0
@property
def mono(self) -> bool:
return (self._state & 0x00000004) != 0
@property
def mc(self) -> bool:
return (self._state & 0x00000008) != 0
# EQ modes
@property
def eq_on(self) -> bool:
return (self._state & 0x00000100) != 0
@property
def eq_ab(self) -> bool:
return (self._state & 0x00000800) != 0
# Bus assignments (strip to bus routing)
@property
def busa1(self) -> bool:
return (self._state & 0x00001000) != 0
@property
def busa2(self) -> bool:
return (self._state & 0x00002000) != 0
@property
def busa3(self) -> bool:
return (self._state & 0x00004000) != 0
@property
def busa4(self) -> bool:
return (self._state & 0x00008000) != 0
@property
def busb1(self) -> bool:
return (self._state & 0x00010000) != 0
@property
def busb2(self) -> bool:
return (self._state & 0x00020000) != 0
@property
def busb3(self) -> bool:
return (self._state & 0x00040000) != 0
class States(NamedTuple):
strip: tuple[ChannelState, ...]
bus: tuple[ChannelState, ...]
class Labels(NamedTuple):
strip: tuple[str, ...]
bus: tuple[str, ...]
@dataclass
class VbanPacketNBS0(VbanPacket):
"""Represents the body of a VBAN data packet with ident:0"""
_inputLeveldB100: bytes
_outputLeveldB100: bytes
_TransportBit: bytes
_stripState: bytes
_busState: bytes
_stripGaindB100Layer1: bytes
_stripGaindB100Layer2: bytes
_stripGaindB100Layer3: bytes
_stripGaindB100Layer4: bytes
_stripGaindB100Layer5: bytes
_stripGaindB100Layer6: bytes
_stripGaindB100Layer7: bytes
_stripGaindB100Layer8: bytes
_busGaindB100: bytes
_stripLabelUTF8c60: bytes
_busLabelUTF8c60: bytes
@classmethod
def from_bytes(cls, nbs: NBS, kind: KindMapClass, data: bytes):
return cls(
nbs=nbs,
_kind=kind,
_voicemeeterType=data[28:29],
_reserved=data[29:30],
_buffersize=data[30:32],
_voicemeeterVersion=data[32:36],
_optionBits=data[36:40],
_samplerate=data[40:44],
_inputLeveldB100=data[44:112],
_outputLeveldB100=data[112:240],
_TransportBit=data[240:244],
_stripState=data[244:276],
_busState=data[276:308],
_stripGaindB100Layer1=data[308:324],
_stripGaindB100Layer2=data[324:340],
_stripGaindB100Layer3=data[340:356],
_stripGaindB100Layer4=data[356:372],
_stripGaindB100Layer5=data[372:388],
_stripGaindB100Layer6=data[388:404],
_stripGaindB100Layer7=data[404:420],
_stripGaindB100Layer8=data[420:436],
_busGaindB100=data[436:452],
_stripLabelUTF8c60=data[452:932],
_busLabelUTF8c60=data[932:1412],
)
def pdirty(self, other) -> bool:
"""True iff any defined parameter has changed"""
self_gains = (
self._stripGaindB100Layer1
+ self._stripGaindB100Layer2
+ self._stripGaindB100Layer3
+ self._stripGaindB100Layer4
+ self._stripGaindB100Layer5
+ self._stripGaindB100Layer6
+ self._stripGaindB100Layer7
+ self._stripGaindB100Layer8
)
other_gains = (
other._stripGaindB100Layer1
+ other._stripGaindB100Layer2
+ other._stripGaindB100Layer3
+ other._stripGaindB100Layer4
+ other._stripGaindB100Layer5
+ other._stripGaindB100Layer6
+ other._stripGaindB100Layer7
+ other._stripGaindB100Layer8
)
return (
self._stripState != other._stripState
or self._busState != other._busState
or self_gains != other_gains
or self._busGaindB100 != other._busGaindB100
or self._stripLabelUTF8c60 != other._stripLabelUTF8c60
or self._busLabelUTF8c60 != other._busLabelUTF8c60
)
def ldirty(self, strip_cache, bus_cache) -> bool:
"""True iff any level has changed, ignoring changes when levels are very quiet"""
self._strip_comp, self._bus_comp = (
tuple(not val for val in comp(strip_cache, self.strip_levels)),
tuple(not val for val in comp(bus_cache, self.bus_levels)),
)
return any(self._strip_comp) or any(self._bus_comp)
@property
def strip_levels(self) -> tuple[float, ...]:
"""Returns strip levels in dB"""
return tuple(
round(
int.from_bytes(self._inputLeveldB100[i : i + 2], 'little', signed=True)
* 0.01,
1,
)
for i in range(0, len(self._inputLeveldB100), 2)
)[: self._kind.num_strip_levels]
@property
def bus_levels(self) -> tuple[float, ...]:
"""Returns bus levels in dB"""
return tuple(
round(
int.from_bytes(self._outputLeveldB100[i : i + 2], 'little', signed=True)
* 0.01,
1,
)
for i in range(0, len(self._outputLeveldB100), 2)
)[: self._kind.num_bus_levels]
@property
def levels(self) -> Levels:
"""Returns strip and bus levels as a namedtuple"""
return Levels(strip=self.strip_levels, bus=self.bus_levels)
@property
def states(self) -> States:
"""returns States object with processed strip and bus channel states"""
return States(
strip=tuple(
ChannelState(self._stripState[i : i + 4]) for i in range(0, 32, 4)
),
bus=tuple(ChannelState(self._busState[i : i + 4]) for i in range(0, 32, 4)),
)
@property
def gainlayers(self) -> tuple:
"""returns tuple of all strip gain layers as tuples"""
return tuple(
tuple(
round(
int.from_bytes(
getattr(self, f'_stripGaindB100Layer{layer}')[i : i + 2],
'little',
signed=True,
)
* 0.01,
2,
)
for i in range(0, 16, 2)
)
for layer in range(1, 9)
)
@property
def busgain(self) -> tuple:
"""returns tuple of bus gains"""
return tuple(
round(
int.from_bytes(self._busGaindB100[i : i + 2], 'little', signed=True)
* 0.01,
2,
)
for i in range(0, 16, 2)
)
@property
def labels(self) -> Labels:
"""returns Labels namedtuple of strip and bus labels"""
def _extract_labels_from_bytes(label_bytes: bytes) -> tuple[str, ...]:
"""Extract null-terminated UTF-8 labels from 60-byte chunks"""
labels = []
for i in range(0, len(label_bytes), 60):
chunk = label_bytes[i : i + 60]
null_pos = chunk.find(b'\x00')
if null_pos == -1:
try:
label = chunk.decode('utf-8', errors='replace').rstrip('\x00')
except UnicodeDecodeError:
label = ''
else:
try:
label = (
chunk[:null_pos].decode('utf-8', errors='replace')
if null_pos > 0
else ''
)
except UnicodeDecodeError:
label = ''
labels.append(label)
return tuple(labels)
return Labels(
strip=_extract_labels_from_bytes(self._stripLabelUTF8c60),
bus=_extract_labels_from_bytes(self._busLabelUTF8c60),
)

View File

@@ -2,222 +2,14 @@ import struct
from dataclasses import dataclass
from typing import NamedTuple
from .enums import NBS
from .kinds import KindMapClass
from .util import comp
from vban_cmd.enums import NBS
from vban_cmd.kinds import KindMapClass
VBAN_PROTOCOL_TXT = 0x40
VBAN_PROTOCOL_SERVICE = 0x60
from .headers import VbanPacket
VBAN_SERVICE_RTPACKETREGISTER = 32
VBAN_SERVICE_RTPACKET = 33
VBAN_SERVICE_MASK = 0xE0
MAX_PACKET_SIZE = 1436
HEADER_SIZE = 4 + 1 + 1 + 1 + 1 + 16
VMPARAMSTRIP_SIZE = 174
@dataclass
class VbanRtPacket:
"""Represents the body of a VBAN RT data packet"""
nbs: NBS
_kind: KindMapClass
_voicemeeterType: bytes
_reserved: bytes
_buffersize: bytes
_voicemeeterVersion: bytes
_optionBits: bytes
_samplerate: bytes
@dataclass
class VbanRtPacketNBS0(VbanRtPacket):
"""Represents the body of a VBAN RT data packet with NBS 0"""
_inputLeveldB100: bytes
_outputLeveldB100: bytes
_TransportBit: bytes
_stripState: bytes
_busState: bytes
_stripGaindB100Layer1: bytes
_stripGaindB100Layer2: bytes
_stripGaindB100Layer3: bytes
_stripGaindB100Layer4: bytes
_stripGaindB100Layer5: bytes
_stripGaindB100Layer6: bytes
_stripGaindB100Layer7: bytes
_stripGaindB100Layer8: bytes
_busGaindB100: bytes
_stripLabelUTF8c60: bytes
_busLabelUTF8c60: bytes
@classmethod
def from_bytes(cls, nbs: NBS, kind: KindMapClass, data: bytes):
return cls(
nbs=nbs,
_kind=kind,
_voicemeeterType=data[28:29],
_reserved=data[29:30],
_buffersize=data[30:32],
_voicemeeterVersion=data[32:36],
_optionBits=data[36:40],
_samplerate=data[40:44],
_inputLeveldB100=data[44:112],
_outputLeveldB100=data[112:240],
_TransportBit=data[240:244],
_stripState=data[244:276],
_busState=data[276:308],
_stripGaindB100Layer1=data[308:324],
_stripGaindB100Layer2=data[324:340],
_stripGaindB100Layer3=data[340:356],
_stripGaindB100Layer4=data[356:372],
_stripGaindB100Layer5=data[372:388],
_stripGaindB100Layer6=data[388:404],
_stripGaindB100Layer7=data[404:420],
_stripGaindB100Layer8=data[420:436],
_busGaindB100=data[436:452],
_stripLabelUTF8c60=data[452:932],
_busLabelUTF8c60=data[932:1412],
)
def _generate_levels(self, levelarray) -> tuple:
return tuple(
int.from_bytes(levelarray[i : i + 2], 'little')
for i in range(0, len(levelarray), 2)
)
@property
def strip_levels(self):
return self._generate_levels(self._inputLeveldB100)
@property
def bus_levels(self):
return self._generate_levels(self._outputLeveldB100)
def pdirty(self, other) -> bool:
"""True iff any defined parameter has changed"""
return not (
self._stripState == other._stripState
and self._busState == other._busState
and self._stripGaindB100Layer1 == other._stripGaindB100Layer1
and self._stripGaindB100Layer2 == other._stripGaindB100Layer2
and self._stripGaindB100Layer3 == other._stripGaindB100Layer3
and self._stripGaindB100Layer4 == other._stripGaindB100Layer4
and self._stripGaindB100Layer5 == other._stripGaindB100Layer5
and self._stripGaindB100Layer6 == other._stripGaindB100Layer6
and self._stripGaindB100Layer7 == other._stripGaindB100Layer7
and self._stripGaindB100Layer8 == other._stripGaindB100Layer8
and self._busGaindB100 == other._busGaindB100
and self._stripLabelUTF8c60 == other._stripLabelUTF8c60
and self._busLabelUTF8c60 == other._busLabelUTF8c60
)
def ldirty(self, strip_cache, bus_cache) -> bool:
self._strip_comp, self._bus_comp = (
tuple(not val for val in comp(strip_cache, self.strip_levels)),
tuple(not val for val in comp(bus_cache, self.bus_levels)),
)
return any(any(li) for li in (self._strip_comp, self._bus_comp))
@property
def voicemeetertype(self) -> str:
"""returns voicemeeter type as a string"""
type_ = ('basic', 'banana', 'potato')
return type_[int.from_bytes(self._voicemeeterType, 'little') - 1]
@property
def voicemeeterversion(self) -> tuple:
"""returns voicemeeter version as a tuple"""
return tuple(
reversed(
tuple(
int.from_bytes(self._voicemeeterVersion[i : i + 1], 'little')
for i in range(4)
)
)
)
@property
def samplerate(self) -> int:
"""returns samplerate as an int"""
return int.from_bytes(self._samplerate, 'little')
@property
def inputlevels(self) -> tuple:
"""returns the entire level array across all inputs for a kind"""
return self.strip_levels[0 : self._kind.num_strip_levels]
@property
def outputlevels(self) -> tuple:
"""returns the entire level array across all outputs for a kind"""
return self.bus_levels[0 : self._kind.num_bus_levels]
@property
def stripstate(self) -> tuple:
"""returns tuple of strip states accessable through bit modes"""
return tuple(self._stripState[i : i + 4] for i in range(0, 32, 4))
@property
def busstate(self) -> tuple:
"""returns tuple of bus states accessable through bit modes"""
return tuple(self._busState[i : i + 4] for i in range(0, 32, 4))
"""
these functions return an array of gainlayers[i] across all strips
ie stripgainlayer1 = [strip[0].gainlayer[0], strip[1].gainlayer[0], strip[2].gainlayer[0]...]
"""
@property
def gainlayers(self) -> tuple:
"""returns tuple of all strip gain layers as tuples"""
return tuple(
tuple(
round(
int.from_bytes(
getattr(self, f'_stripGaindB100Layer{layer}')[i : i + 2],
'little',
signed=True,
)
* 0.01,
2,
)
for i in range(0, 16, 2)
)
for layer in range(1, 9)
)
@property
def busgain(self) -> tuple:
"""returns tuple of bus gains"""
return tuple(
round(
int.from_bytes(self._busGaindB100[i : i + 2], 'little', signed=True)
* 0.01,
2,
)
for i in range(0, 16, 2)
)
@property
def striplabels(self) -> tuple:
"""returns tuple of strip labels"""
return tuple(
self._stripLabelUTF8c60[i : i + 60].decode().split('\x00')[0]
for i in range(0, 480, 60)
)
@property
def buslabels(self) -> tuple:
"""returns tuple of bus labels"""
return tuple(
self._busLabelUTF8c60[i : i + 60].decode().split('\x00')[0]
for i in range(0, 480, 60)
)
class Audibility(NamedTuple):
knob: float
comp: float
@@ -535,8 +327,8 @@ class VbanVMParamStrip:
@dataclass
class VbanRtPacketNBS1(VbanRtPacket):
"""Represents the body of a VBAN RT data packet with NBS 1"""
class VbanPacketNBS1(VbanPacket):
"""Represents the body of a VBAN data packet with ident:1"""
strips: tuple[VbanVMParamStrip, ...]
@@ -563,135 +355,3 @@ class VbanRtPacketNBS1(VbanRtPacket):
for i in range(16)
),
)
@dataclass
class SubscribeHeader:
"""Represents the header of an RT subscription packet"""
nbs: NBS = NBS.zero
name: str = 'Register-RTP'
timeout: int = 15
@property
def vban(self) -> bytes:
return b'VBAN'
@property
def format_sr(self) -> bytes:
return VBAN_PROTOCOL_SERVICE.to_bytes(1, 'little')
@property
def format_nbs(self) -> bytes:
return (self.nbs.value & 0xFF).to_bytes(1, 'little')
@property
def format_nbc(self) -> bytes:
return VBAN_SERVICE_RTPACKETREGISTER.to_bytes(1, 'little')
@property
def format_bit(self) -> bytes:
return (self.timeout & 0xFF).to_bytes(1, 'little')
@property
def streamname(self) -> bytes:
return self.name.encode('ascii') + bytes(16 - len(self.name))
@classmethod
def to_bytes(cls, nbs: NBS, framecounter: int) -> bytes:
header = cls(nbs=nbs)
data = bytearray()
data.extend(header.vban)
data.extend(header.format_sr)
data.extend(header.format_nbs)
data.extend(header.format_nbc)
data.extend(header.format_bit)
data.extend(header.streamname)
data.extend(framecounter.to_bytes(4, 'little'))
return bytes(data)
@dataclass
class VbanRtPacketHeader:
"""Represents the header of an RT response packet"""
name: str = 'Voicemeeter-RTP'
format_sr: int = VBAN_PROTOCOL_SERVICE
format_nbs: int = 0
format_nbc: int = VBAN_SERVICE_RTPACKET
format_bit: int = 0
@property
def vban(self) -> bytes:
return b'VBAN'
@property
def streamname(self) -> bytes:
return self.name.encode('ascii') + bytes(16 - len(self.name))
@classmethod
def from_bytes(cls, data: bytes):
if len(data) < HEADER_SIZE:
raise ValueError('Data is too short to be a valid VbanRTPPacketHeader')
name = data[8:24].rstrip(b'\x00').decode('utf-8')
return cls(
name=name,
format_sr=data[4] & VBAN_SERVICE_MASK,
format_nbs=data[5],
format_nbc=data[6],
format_bit=data[7],
)
@dataclass
class RequestHeader:
"""Represents the header of an RT request packet"""
name: str
bps_index: int
channel: int
framecounter: int = 0
@property
def vban(self) -> bytes:
return b'VBAN'
@property
def sr(self) -> bytes:
return (VBAN_PROTOCOL_TXT + self.bps_index).to_bytes(1, 'little')
@property
def nbs(self) -> bytes:
return (0).to_bytes(1, 'little')
@property
def nbc(self) -> bytes:
return (self.channel).to_bytes(1, 'little')
@property
def bit(self) -> bytes:
return (0x10).to_bytes(1, 'little')
@property
def streamname(self) -> bytes:
return self.name.encode() + bytes(16 - len(self.name))
@classmethod
def to_bytes(
cls, name: str, bps_index: int, channel: int, framecounter: int
) -> bytes:
header = cls(
name=name, bps_index=bps_index, channel=channel, framecounter=framecounter
)
data = bytearray()
data.extend(header.vban)
data.extend(header.sr)
data.extend(header.nbs)
data.extend(header.nbc)
data.extend(header.bit)
data.extend(header.streamname)
data.extend(header.framecounter.to_bytes(4, 'little'))
return bytes(data)

View File

@@ -540,22 +540,11 @@ class StripLevel(IRemote):
def getter(self):
"""Returns a tuple of level values for the channel."""
def fget(i):
return round((((1 << 16) - 1) - i) * -0.01, 1)
if not self._remote.stopped() and self._remote.event.ldirty:
return tuple(
fget(i)
for i in self._remote.cache['strip_level'][
return self._remote.cache['strip_level'][self.range[0] : self.range[-1]]
return self.public_packets[NBS.zero].levels.strip[
self.range[0] : self.range[-1]
]
)
return tuple(
fget(i)
for i in self._remote._get_levels(self.public_packets[NBS.zero])[0][
self.range[0] : self.range[-1]
]
)
@property
def identifier(self) -> str:

View File

@@ -15,13 +15,27 @@ def cache_bool(func, param):
return wrapper
def cache_int(func, param):
"""Check cache for an int prop"""
def wrapper(*args, **kwargs):
self, *rem = args
if self._cmd(param) in self._remote.cache:
return self._remote.cache.pop(self._cmd(param))
if self._remote.sync:
self._remote.clear_dirty()
return func(*args, **kwargs)
return wrapper
def cache_string(func, param):
"""Check cache for a string prop"""
def wrapper(*args, **kwargs):
self, *rem = args
if self._cmd(param) in self._remote.cache:
return self._remote.cache.pop(self._cmd(param))
return self._remote.cache.pop(self._cmd(param)).strip('"')
if self._remote.sync:
self._remote.clear_dirty()
return func(*args, **kwargs)
@@ -72,16 +86,18 @@ def script(func):
def comp(t0: tuple, t1: tuple) -> Iterator[bool]:
"""
Generator function, accepts two tuples.
Generator function, accepts two tuples of dB values.
Evaluates equality of each member in both tuples.
Only ignores changes when levels are very quiet (below -72 dB).
"""
for a, b in zip(t0, t1):
if ((1 << 16) - 1) - b <= 7200:
yield a == b
# If both values are very quiet (below -72dB), ignore small changes
if a <= -72.0 and b <= -72.0:
yield a == b # Both quiet, check if they're equal
else:
yield True
yield a != b # At least one has significant level, detect changes
def deep_merge(dict1, dict2):

View File

@@ -5,12 +5,12 @@ import threading
import time
from pathlib import Path
from queue import Queue
from typing import Iterable, Union
from typing import Union
from .enums import NBS
from .error import VBANCMDError
from .event import Event
from .packet import RequestHeader
from .packet.headers import VbanRequestHeader
from .subject import Subject
from .util import bump_framecounter, deep_merge, script
from .worker import Producer, Subscriber, Updater
@@ -120,37 +120,29 @@ class VbanCmd(abc.ABC):
def stopped(self):
return self.stop_event is None or self.stop_event.is_set()
def _set_rt(self, cmd: str, val: Union[str, float]):
"""Sends a string request command over a network."""
req_packet = RequestHeader.to_bytes(
def _send_request(self, payload: str) -> None:
"""Sends a request packet over the network and bumps the framecounter."""
self.sock.sendto(
VbanRequestHeader.encode_with_payload(
name=self.streamname,
bps_index=self.BPS_OPTS.index(self.bps),
channel=self.channel,
framecounter=self._framecounter,
)
self.sock.sendto(
req_packet + f'{cmd}={val};'.encode(),
payload=payload,
),
(socket.gethostbyname(self.ip), self.port),
)
self._framecounter = bump_framecounter(self._framecounter)
def _set_rt(self, cmd: str, val: Union[str, float]):
"""Sends a string request command over a network."""
self._send_request(f'{cmd}={val};')
self.cache[cmd] = val
@script
def sendtext(self, script):
"""Sends a multiple parameter string over a network."""
req_packet = RequestHeader.to_bytes(
name=self.streamname,
bps_index=self.BPS_OPTS.index(self.bps),
channel=self.channel,
framecounter=self._framecounter,
)
self.sock.sendto(
req_packet + script.encode(),
(socket.gethostbyname(self.ip), self.port),
)
self._framecounter = bump_framecounter(self._framecounter)
self._send_request(script)
self.logger.debug(f'sendtext: {script}')
time.sleep(self.DELAY)
@@ -184,17 +176,6 @@ class VbanCmd(abc.ABC):
while self.pdirty:
time.sleep(self.DELAY)
def _get_levels(self, packet) -> Iterable:
"""
returns both level arrays (strip_levels, bus_levels) BEFORE math conversion
strip levels in PREFADER mode.
"""
return (
packet.inputlevels,
packet.outputlevels,
)
def apply(self, data: dict):
"""
Sets all parameters of a dict

View File

@@ -5,16 +5,16 @@ import time
from .enums import NBS
from .error import VBANCMDConnectionError
from .packet import (
from .packet.headers import (
HEADER_SIZE,
VBAN_PROTOCOL_SERVICE,
VBAN_SERVICE_RTPACKET,
SubscribeHeader,
VbanRtPacket,
VbanRtPacketHeader,
VbanRtPacketNBS0,
VbanRtPacketNBS1,
VbanPacket,
VbanResponseHeader,
VbanSubscribeHeader,
)
from .packet.nbs0 import VbanPacketNBS0
from .packet.nbs1 import VbanPacketNBS1
from .util import bump_framecounter
logger = logging.getLogger(__name__)
@@ -34,7 +34,7 @@ class Subscriber(threading.Thread):
while not self.stopped():
try:
for nbs in NBS:
sub_packet = SubscribeHeader().to_bytes(nbs, self._framecounter)
sub_packet = VbanSubscribeHeader().to_bytes(nbs, self._framecounter)
self._remote.sock.sendto(
sub_packet, (self._remote.ip, self._remote.port)
)
@@ -75,22 +75,22 @@ class Producer(threading.Thread):
(
self._remote.cache['strip_level'],
self._remote.cache['bus_level'],
) = self._remote._get_levels(self._remote.public_packets[NBS.zero])
) = self._remote.public_packets[NBS.zero].levels
def _get_rt(self) -> VbanRtPacket:
def _get_rt(self) -> VbanPacket:
"""Attempt to fetch data packet until a valid one found"""
while True:
if resp := self._fetch_rt_packet():
return resp
def _fetch_rt_packet(self) -> VbanRtPacket | None:
def _fetch_rt_packet(self) -> VbanPacket | None:
try:
data, _ = self._remote.sock.recvfrom(2048)
if len(data) < HEADER_SIZE:
return
response_header = VbanRtPacketHeader.from_bytes(data[:HEADER_SIZE])
response_header = VbanResponseHeader.from_bytes(data[:HEADER_SIZE])
if (
response_header.format_sr != VBAN_PROTOCOL_SERVICE
or response_header.format_nbc != VBAN_SERVICE_RTPACKET
@@ -99,12 +99,12 @@ class Producer(threading.Thread):
match response_header.format_nbs:
case NBS.zero:
return VbanRtPacketNBS0.from_bytes(
return VbanPacketNBS0.from_bytes(
nbs=NBS.zero, kind=self._remote.kind, data=data
)
case NBS.one:
return VbanRtPacketNBS1.from_bytes(
return VbanPacketNBS1.from_bytes(
nbs=NBS.one, kind=self._remote.kind, data=data
)
return None
@@ -140,7 +140,7 @@ class Producer(threading.Thread):
self.queue.put('pdirty')
if self._remote.event.ldirty:
self.queue.put('ldirty')
time.sleep(self._remote.ratelimit)
# time.sleep(self._remote.ratelimit)
self.logger.debug(f'terminating {self.name} thread')
self.queue.put(None)
@@ -177,9 +177,6 @@ class Updater(threading.Thread):
(
self._remote.cache['strip_level'],
self._remote.cache['bus_level'],
) = (
self._remote._public_packets[NBS.zero].inputlevels,
self._remote._public_packets[NBS.zero].outputlevels,
)
) = self._remote.public_packets[NBS.zero].levels
self._remote.subject.notify(event)
self.logger.debug(f'terminating {self.name} thread')