diff --git a/vban_cmd/bus.py b/vban_cmd/bus.py index e9b030c..15464ad 100644 --- a/vban_cmd/bus.py +++ b/vban_cmd/bus.py @@ -4,7 +4,7 @@ from typing import Union from .enums import NBS, BusModes from .iremote import IRemote -from .meta import bus_mode_prop, channel_bool_prop, channel_label_prop +from .meta import bus_mode_prop, channel_bool_prop, channel_int_prop, channel_label_prop class Bus(IRemote): @@ -90,20 +90,9 @@ class BusLevel(IRemote): def getter(self): """Returns a tuple of level values for the channel.""" - def fget(i): - return round((((1 << 16) - 1) - i) * -0.01, 1) - if not self._remote.stopped() and self._remote.event.ldirty: - return tuple( - fget(i) - for i in self._remote.cache['bus_level'][self.range[0] : self.range[-1]] - ) - return tuple( - fget(i) - for i in self._remote._get_levels(self.public_packets[NBS.zero])[1][ - self.range[0] : self.range[-1] - ] - ) + return self._remote.cache['bus_level'][self.range[0] : self.range[-1]] + return self.public_packets[NBS.zero].levels.bus[self.range[0] : self.range[-1]] @property def identifier(self) -> str: @@ -188,7 +177,8 @@ def bus_factory(phys_bus, remote, i) -> Union[PhysicalBus, VirtualBus]: 'eq': BusEQ.make(remote, i), 'levels': BusLevel(remote, i), 'mode': BUSMODEMIXIN_cls(remote, i), - **{param: channel_bool_prop(param) for param in ['mute', 'mono']}, + **{param: channel_bool_prop(param) for param in ('mute',)}, + **{param: channel_int_prop(param) for param in ('mono',)}, 'label': channel_label_prop(), }, )(remote, i) diff --git a/vban_cmd/meta.py b/vban_cmd/meta.py index 4d4aa64..730684a 100644 --- a/vban_cmd/meta.py +++ b/vban_cmd/meta.py @@ -1,7 +1,7 @@ from functools import partial from .enums import NBS -from .util import cache_bool, cache_float, cache_string +from .util import cache_bool, cache_float, cache_int, cache_string def channel_bool_prop(param): @@ -29,18 +29,39 @@ def channel_bool_prop(param): return property(fget, fset) +def channel_int_prop(param): + """meta function for channel integer parameters""" + + @partial(cache_int, param=param) + def fget(self): + cmd = self._cmd(param) + self.logger.debug(f'getter: {cmd}') + return int.from_bytes( + getattr( + self.public_packets[NBS.zero], + f'{"strip" if "strip" in type(self).__name__.lower() else "bus"}state', + )[self.index], + 'little', + ) & getattr(self._modes, f'_{param.lower()}') + + def fset(self, val): + self.setter(param, val) + + return property(fget, fset) + + def channel_label_prop(): """meta function for channel label parameters""" @partial(cache_string, param='label') def fget(self) -> str: - return getattr( - self.public_packets[NBS.zero], - f'{"strip" if "strip" in type(self).__name__.lower() else "bus"}labels', - )[self.index] + if 'strip' in type(self).__name__.lower(): + return self.public_packets[NBS.zero].labels.strip[self.index] + else: + return self.public_packets[NBS.zero].labels.bus[self.index] def fset(self, val: str): - self.setter('label', str(val)) + self.setter('label', f'"{val}"') return property(fget, fset) diff --git a/vban_cmd/packet.py b/vban_cmd/packet.py index 304f2e7..70baeeb 100644 --- a/vban_cmd/packet.py +++ b/vban_cmd/packet.py @@ -19,8 +19,8 @@ VMPARAMSTRIP_SIZE = 174 @dataclass -class VbanRtPacket: - """Represents the body of a VBAN RT data packet""" +class VbanPacket: + """Represents the header of a VBAN data packet""" nbs: NBS _kind: KindMapClass @@ -31,10 +31,42 @@ class VbanRtPacket: _optionBits: bytes _samplerate: bytes + @property + def voicemeetertype(self) -> str: + """returns voicemeeter type as a string""" + return ['', 'basic', 'banana', 'potato'][ + int.from_bytes(self._voicemeeterType, 'little') + ] + + @property + def voicemeeterversion(self) -> tuple: + """returns voicemeeter version as a tuple""" + return tuple(self._voicemeeterVersion[i] for i in range(3, -1, -1)) + + @property + def samplerate(self) -> int: + """returns samplerate as an int""" + return int.from_bytes(self._samplerate, 'little') + + +class Levels(NamedTuple): + strip: tuple[float, ...] + bus: tuple[float, ...] + + +class Labels(NamedTuple): + strip: tuple[str, ...] + bus: tuple[str, ...] + + +class States(NamedTuple): + strip: tuple[bytes, ...] + bus: tuple[bytes, ...] + @dataclass -class VbanRtPacketNBS0(VbanRtPacket): - """Represents the body of a VBAN RT data packet with NBS 0""" +class VbanPacketNBS0(VbanPacket): + """Represents the body of a VBAN data packet with ident:0""" _inputLeveldB100: bytes _outputLeveldB100: bytes @@ -82,20 +114,6 @@ class VbanRtPacketNBS0(VbanRtPacket): _busLabelUTF8c60=data[932:1412], ) - def _generate_levels(self, levelarray) -> tuple: - return tuple( - int.from_bytes(levelarray[i : i + 2], 'little') - for i in range(0, len(levelarray), 2) - ) - - @property - def strip_levels(self): - return self._generate_levels(self._inputLeveldB100) - - @property - def bus_levels(self): - return self._generate_levels(self._outputLeveldB100) - def pdirty(self, other) -> bool: """True iff any defined parameter has changed""" @@ -123,37 +141,34 @@ class VbanRtPacketNBS0(VbanRtPacket): return any(any(li) for li in (self._strip_comp, self._bus_comp)) @property - def voicemeetertype(self) -> str: - """returns voicemeeter type as a string""" - type_ = ('basic', 'banana', 'potato') - return type_[int.from_bytes(self._voicemeeterType, 'little') - 1] - - @property - def voicemeeterversion(self) -> tuple: - """returns voicemeeter version as a tuple""" + def strip_levels(self) -> tuple[int, ...]: + """Returns raw integer strip levels""" return tuple( - reversed( - tuple( - int.from_bytes(self._voicemeeterVersion[i : i + 1], 'little') - for i in range(4) - ) - ) + int.from_bytes(self._inputLeveldB100[i : i + 2], 'little') + for i in range(0, len(self._inputLeveldB100), 2) ) @property - def samplerate(self) -> int: - """returns samplerate as an int""" - return int.from_bytes(self._samplerate, 'little') + def bus_levels(self) -> tuple[int, ...]: + """Returns raw integer bus levels""" + return tuple( + int.from_bytes(self._outputLeveldB100[i : i + 2], 'little') + for i in range(0, len(self._outputLeveldB100), 2) + ) @property - def inputlevels(self) -> tuple: - """returns the entire level array across all inputs for a kind""" - return self.strip_levels[0 : self._kind.num_strip_levels] + def levels(self) -> Levels: + """Returns strip and bus levels converted to dB""" - @property - def outputlevels(self) -> tuple: - """returns the entire level array across all outputs for a kind""" - return self.bus_levels[0 : self._kind.num_bus_levels] + def to_db(raw_levels: tuple[int, ...]) -> tuple[float, ...]: + return tuple( + round((((1 << 16) - 1) - level) * -0.01, 1) for level in raw_levels + ) + + return Levels( + strip=to_db(self.strip_levels)[: self._kind.num_strip_levels], + bus=to_db(self.bus_levels)[: self._kind.num_bus_levels], + ) @property def stripstate(self) -> tuple: @@ -202,19 +217,35 @@ class VbanRtPacketNBS0(VbanRtPacket): ) @property - def striplabels(self) -> tuple: - """returns tuple of strip labels""" - return tuple( - self._stripLabelUTF8c60[i : i + 60].decode().split('\x00')[0] - for i in range(0, 480, 60) - ) + def labels(self) -> Labels: + """returns Labels namedtuple of strip and bus labels""" - @property - def buslabels(self) -> tuple: - """returns tuple of bus labels""" - return tuple( - self._busLabelUTF8c60[i : i + 60].decode().split('\x00')[0] - for i in range(0, 480, 60) + def _extract_labels_from_bytes(label_bytes: bytes) -> tuple[str, ...]: + """Extract null-terminated UTF-8 labels from 60-byte chunks""" + labels = [] + for i in range(0, len(label_bytes), 60): + chunk = label_bytes[i : i + 60] + null_pos = chunk.find(b'\x00') + if null_pos == -1: + try: + label = chunk.decode('utf-8', errors='replace').rstrip('\x00') + except UnicodeDecodeError: + label = '' + else: + try: + label = ( + chunk[:null_pos].decode('utf-8', errors='replace') + if null_pos > 0 + else '' + ) + except UnicodeDecodeError: + label = '' + labels.append(label) + return tuple(labels) + + return Labels( + strip=_extract_labels_from_bytes(self._stripLabelUTF8c60), + bus=_extract_labels_from_bytes(self._busLabelUTF8c60), ) @@ -535,8 +566,8 @@ class VbanVMParamStrip: @dataclass -class VbanRtPacketNBS1(VbanRtPacket): - """Represents the body of a VBAN RT data packet with NBS 1""" +class VbanPacketNBS1(VbanPacket): + """Represents the body of a VBAN data packet with ident:1""" strips: tuple[VbanVMParamStrip, ...] diff --git a/vban_cmd/strip.py b/vban_cmd/strip.py index 57cfd92..9cb7e82 100644 --- a/vban_cmd/strip.py +++ b/vban_cmd/strip.py @@ -540,22 +540,11 @@ class StripLevel(IRemote): def getter(self): """Returns a tuple of level values for the channel.""" - def fget(i): - return round((((1 << 16) - 1) - i) * -0.01, 1) - if not self._remote.stopped() and self._remote.event.ldirty: - return tuple( - fget(i) - for i in self._remote.cache['strip_level'][ - self.range[0] : self.range[-1] - ] - ) - return tuple( - fget(i) - for i in self._remote._get_levels(self.public_packets[NBS.zero])[0][ - self.range[0] : self.range[-1] - ] - ) + return self._remote.cache['strip_level'][self.range[0] : self.range[-1]] + return self.public_packets[NBS.zero].levels.strip[ + self.range[0] : self.range[-1] + ] @property def identifier(self) -> str: diff --git a/vban_cmd/util.py b/vban_cmd/util.py index 9af9f4e..576eb58 100644 --- a/vban_cmd/util.py +++ b/vban_cmd/util.py @@ -15,13 +15,27 @@ def cache_bool(func, param): return wrapper +def cache_int(func, param): + """Check cache for an int prop""" + + def wrapper(*args, **kwargs): + self, *rem = args + if self._cmd(param) in self._remote.cache: + return self._remote.cache.pop(self._cmd(param)) + if self._remote.sync: + self._remote.clear_dirty() + return func(*args, **kwargs) + + return wrapper + + def cache_string(func, param): """Check cache for a string prop""" def wrapper(*args, **kwargs): self, *rem = args if self._cmd(param) in self._remote.cache: - return self._remote.cache.pop(self._cmd(param)) + return self._remote.cache.pop(self._cmd(param)).strip('"') if self._remote.sync: self._remote.clear_dirty() return func(*args, **kwargs) @@ -75,13 +89,19 @@ def comp(t0: tuple, t1: tuple) -> Iterator[bool]: Generator function, accepts two tuples. Evaluates equality of each member in both tuples. + Only ignores changes when levels are very quiet (below noise floor). """ for a, b in zip(t0, t1): - if ((1 << 16) - 1) - b <= 7200: - yield a == b + # Convert to dB-equivalent: higher raw values = quieter audio + a_db_equiv = ((1 << 16) - 1) - a + b_db_equiv = ((1 << 16) - 1) - b + + # If both values are very quiet (> -72dB equivalent), ignore small changes + if a_db_equiv > 7200 and b_db_equiv > 7200: + yield True # Both very quiet, ignore changes else: - yield True + yield a == b # At least one has significant level, detect changes def deep_merge(dict1, dict2): diff --git a/vban_cmd/vbancmd.py b/vban_cmd/vbancmd.py index e963459..9f59dac 100644 --- a/vban_cmd/vbancmd.py +++ b/vban_cmd/vbancmd.py @@ -5,7 +5,7 @@ import threading import time from pathlib import Path from queue import Queue -from typing import Iterable, Union +from typing import Union from .enums import NBS from .error import VBANCMDError @@ -184,17 +184,6 @@ class VbanCmd(abc.ABC): while self.pdirty: time.sleep(self.DELAY) - def _get_levels(self, packet) -> Iterable: - """ - returns both level arrays (strip_levels, bus_levels) BEFORE math conversion - - strip levels in PREFADER mode. - """ - return ( - packet.inputlevels, - packet.outputlevels, - ) - def apply(self, data: dict): """ Sets all parameters of a dict diff --git a/vban_cmd/worker.py b/vban_cmd/worker.py index 606bde3..0adf872 100644 --- a/vban_cmd/worker.py +++ b/vban_cmd/worker.py @@ -10,10 +10,10 @@ from .packet import ( VBAN_PROTOCOL_SERVICE, VBAN_SERVICE_RTPACKET, SubscribeHeader, - VbanRtPacket, + VbanPacket, + VbanPacketNBS0, + VbanPacketNBS1, VbanRtPacketHeader, - VbanRtPacketNBS0, - VbanRtPacketNBS1, ) from .util import bump_framecounter @@ -75,16 +75,16 @@ class Producer(threading.Thread): ( self._remote.cache['strip_level'], self._remote.cache['bus_level'], - ) = self._remote._get_levels(self._remote.public_packets[NBS.zero]) + ) = self._remote.public_packets[NBS.zero].levels - def _get_rt(self) -> VbanRtPacket: + def _get_rt(self) -> VbanPacket: """Attempt to fetch data packet until a valid one found""" while True: if resp := self._fetch_rt_packet(): return resp - def _fetch_rt_packet(self) -> VbanRtPacket | None: + def _fetch_rt_packet(self) -> VbanPacket | None: try: data, _ = self._remote.sock.recvfrom(2048) if len(data) < HEADER_SIZE: @@ -99,12 +99,12 @@ class Producer(threading.Thread): match response_header.format_nbs: case NBS.zero: - return VbanRtPacketNBS0.from_bytes( + return VbanPacketNBS0.from_bytes( nbs=NBS.zero, kind=self._remote.kind, data=data ) case NBS.one: - return VbanRtPacketNBS1.from_bytes( + return VbanPacketNBS1.from_bytes( nbs=NBS.one, kind=self._remote.kind, data=data ) return None @@ -177,9 +177,6 @@ class Updater(threading.Thread): ( self._remote.cache['strip_level'], self._remote.cache['bus_level'], - ) = ( - self._remote._public_packets[NBS.zero].inputlevels, - self._remote._public_packets[NBS.zero].outputlevels, - ) + ) = self._remote.public_packets[NBS.zero].levels self._remote.subject.notify(event) self.logger.debug(f'terminating {self.name} thread')