add ChannelState interface, use it in the meta functions.

reword busmodes bitwise logic.

comment out ratelimit, this will probably get permanently removed.
This commit is contained in:
onyx-and-iris 2026-03-01 03:37:57 +00:00
parent 2f3cd0e07f
commit ff5ac193c8
5 changed files with 191 additions and 96 deletions

View File

@ -117,45 +117,49 @@ class BusLevel(IRemote):
def _make_bus_mode_mixin():
"""Creates a mixin of Bus Modes."""
modestates = {
'normal': [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
'amix': [1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1],
'repeat': [0, 2, 2, 0, 0, 2, 2, 0, 0, 2, 2],
'bmix': [1, 2, 3, 0, 1, 2, 3, 0, 1, 2, 3],
'composite': [0, 0, 0, 4, 4, 4, 4, 0, 0, 0, 0],
'tvmix': [1, 0, 1, 4, 5, 4, 5, 0, 1, 0, 1],
'upmix21': [0, 2, 2, 4, 4, 6, 6, 0, 0, 2, 2],
'upmix41': [1, 2, 3, 4, 5, 6, 7, 0, 1, 2, 3],
'upmix61': [0, 0, 0, 0, 0, 0, 0, 8, 8, 8, 8],
'centeronly': [1, 0, 1, 0, 1, 0, 1, 8, 9, 8, 9],
'lfeonly': [0, 2, 2, 0, 0, 2, 2, 8, 8, 10, 10],
'rearonly': [1, 2, 3, 0, 1, 2, 3, 8, 9, 10, 11],
}
mode_names = [
'normal',
'amix',
'repeat',
'bmix',
'composite',
'tvmix',
'upmix21',
'upmix41',
'upmix61',
'centeronly',
'lfeonly',
'rearonly',
]
def identifier(self) -> str:
return f'bus[{self.index}].mode'
def get(self):
states = [
(
int.from_bytes(
self.public_packets[NBS.zero].busstate[self.index], 'little'
)
& val
)
>> 4
for val in self._modes.modevals
"""Get current bus mode using ChannelState for clean bit extraction."""
mode_cache_items = [
(k, v)
for k, v in self._remote.cache.items()
if k.startswith(f'{self.identifier}.') and v == 1
]
for k, v in modestates.items():
if states == v:
return k
if mode_cache_items:
latest_cached = mode_cache_items[-1][0]
mode_name = latest_cached.split('.')[-1]
return mode_name
bus_state = self.public_packets[NBS.zero].states.bus[self.index]
# Extract bus mode from bits 4-7 (mask 0xF0, shift right by 4)
mode_value = (bus_state._state & 0x000000F0) >> 4
return mode_names[mode_value] if mode_value < len(mode_names) else 'normal'
return type(
'BusModeMixin',
(IRemote,),
{
'identifier': property(identifier),
'modestates': modestates,
**{mode.name: bus_mode_prop(mode.name) for mode in BusModes},
'get': get,
},

View File

@ -1,6 +1,6 @@
from functools import partial
from .enums import NBS
from .enums import NBS, BusModes
from .util import cache_bool, cache_float, cache_int, cache_string
@ -11,17 +11,23 @@ def channel_bool_prop(param):
def fget(self):
cmd = self._cmd(param)
self.logger.debug(f'getter: {cmd}')
return (
not int.from_bytes(
getattr(
self.public_packets[NBS.zero],
f'{"strip" if "strip" in type(self).__name__.lower() else "bus"}state',
)[self.index],
'little',
)
& getattr(self._modes, f'_{param.lower()}')
== 0
states = self.public_packets[NBS.zero].states
channel_states = (
states.strip if 'strip' in type(self).__name__.lower() else states.bus
)
channel_state = channel_states[self.index]
if param.lower() == 'mute':
return channel_state.mute
elif param.lower() == 'solo':
return channel_state.solo
elif param.lower() == 'mono':
return channel_state.mono
elif param.lower() == 'mc':
return channel_state.mc
else:
return channel_state.get_mode(getattr(self._modes, f'_{param.lower()}'))
def fset(self, val):
self.setter(param, 1 if val else 0)
@ -36,13 +42,20 @@ def channel_int_prop(param):
def fget(self):
cmd = self._cmd(param)
self.logger.debug(f'getter: {cmd}')
return int.from_bytes(
getattr(
self.public_packets[NBS.zero],
f'{"strip" if "strip" in type(self).__name__.lower() else "bus"}state',
)[self.index],
'little',
) & getattr(self._modes, f'_{param.lower()}')
states = self.public_packets[NBS.zero].states
channel_states = (
states.strip if 'strip' in type(self).__name__.lower() else states.bus
)
channel_state = channel_states[self.index]
# Special case: bus mono is an integer (0-2) encoded using bits 2 and 9
if param.lower() == 'mono' and 'bus' in type(self).__name__.lower():
bit_2 = (channel_state._state >> 2) & 1
bit_9 = (channel_state._state >> 9) & 1
return (bit_9 << 1) | bit_2
else:
return channel_state.get_mode_int(getattr(self._modes, f'_{param.lower()}'))
def fset(self, val):
self.setter(param, val)
@ -73,13 +86,10 @@ def strip_output_prop(param):
def fget(self):
cmd = self._cmd(param)
self.logger.debug(f'getter: {cmd}')
return (
not int.from_bytes(
self.public_packets[NBS.zero].stripstate[self.index], 'little'
)
& getattr(self._modes, f'_bus{param.lower()}')
== 0
)
strip_state = self.public_packets[NBS.zero].states.strip[self.index]
return strip_state.get_mode(getattr(self._modes, f'_bus{param.lower()}'))
def fset(self, val):
self.setter(param, 1 if val else 0)
@ -94,16 +104,15 @@ def bus_mode_prop(param):
def fget(self):
cmd = self._cmd(param)
self.logger.debug(f'getter: {cmd}')
return [
(
int.from_bytes(
self.public_packets[NBS.zero].busstate[self.index], 'little'
)
& val
)
>> 4
for val in self._modes.modevals
] == self.modestates[param]
bus_state = self.public_packets[NBS.zero].states.bus[self.index]
# Extract current bus mode from bits 4-7
current_mode = (bus_state._state & 0x000000F0) >> 4
expected_mode = getattr(BusModes, param.lower())
return current_mode == expected_mode
def fset(self, val):
self.setter(param, 1 if val else 0)

View File

@ -54,14 +54,85 @@ class Levels(NamedTuple):
bus: tuple[float, ...]
class Labels(NamedTuple):
strip: tuple[str, ...]
bus: tuple[str, ...]
class ChannelState:
"""Represents the processed state of a single strip or bus channel"""
def __init__(self, state_bytes: bytes):
# Convert 4-byte state to integer once for efficient lookups
self._state = int.from_bytes(state_bytes, 'little')
def get_mode(self, mode_value: int) -> bool:
"""Get boolean state for a specific mode"""
return (self._state & mode_value) != 0
def get_mode_int(self, mode_value: int) -> int:
"""Get integer state for a specific mode"""
return self._state & mode_value
# Common boolean modes
@property
def mute(self) -> bool:
return (self._state & 0x00000001) != 0
@property
def solo(self) -> bool:
return (self._state & 0x00000002) != 0
@property
def mono(self) -> bool:
return (self._state & 0x00000004) != 0
@property
def mc(self) -> bool:
return (self._state & 0x00000008) != 0
# EQ modes
@property
def eq_on(self) -> bool:
return (self._state & 0x00000100) != 0
@property
def eq_ab(self) -> bool:
return (self._state & 0x00000800) != 0
# Bus assignments (strip to bus routing)
@property
def busa1(self) -> bool:
return (self._state & 0x00001000) != 0
@property
def busa2(self) -> bool:
return (self._state & 0x00002000) != 0
@property
def busa3(self) -> bool:
return (self._state & 0x00004000) != 0
@property
def busa4(self) -> bool:
return (self._state & 0x00008000) != 0
@property
def busb1(self) -> bool:
return (self._state & 0x00010000) != 0
@property
def busb2(self) -> bool:
return (self._state & 0x00020000) != 0
@property
def busb3(self) -> bool:
return (self._state & 0x00040000) != 0
class States(NamedTuple):
strip: tuple[bytes, ...]
bus: tuple[bytes, ...]
strip: tuple[ChannelState, ...]
bus: tuple[ChannelState, ...]
class Labels(NamedTuple):
strip: tuple[str, ...]
bus: tuple[str, ...]
@dataclass
@ -117,20 +188,34 @@ class VbanPacketNBS0(VbanPacket):
def pdirty(self, other) -> bool:
"""True iff any defined parameter has changed"""
return not (
self._stripState == other._stripState
and self._busState == other._busState
and self._stripGaindB100Layer1 == other._stripGaindB100Layer1
and self._stripGaindB100Layer2 == other._stripGaindB100Layer2
and self._stripGaindB100Layer3 == other._stripGaindB100Layer3
and self._stripGaindB100Layer4 == other._stripGaindB100Layer4
and self._stripGaindB100Layer5 == other._stripGaindB100Layer5
and self._stripGaindB100Layer6 == other._stripGaindB100Layer6
and self._stripGaindB100Layer7 == other._stripGaindB100Layer7
and self._stripGaindB100Layer8 == other._stripGaindB100Layer8
and self._busGaindB100 == other._busGaindB100
and self._stripLabelUTF8c60 == other._stripLabelUTF8c60
and self._busLabelUTF8c60 == other._busLabelUTF8c60
self_gains = (
self._stripGaindB100Layer1
+ self._stripGaindB100Layer2
+ self._stripGaindB100Layer3
+ self._stripGaindB100Layer4
+ self._stripGaindB100Layer5
+ self._stripGaindB100Layer6
+ self._stripGaindB100Layer7
+ self._stripGaindB100Layer8
)
other_gains = (
other._stripGaindB100Layer1
+ other._stripGaindB100Layer2
+ other._stripGaindB100Layer3
+ other._stripGaindB100Layer4
+ other._stripGaindB100Layer5
+ other._stripGaindB100Layer6
+ other._stripGaindB100Layer7
+ other._stripGaindB100Layer8
)
return (
self._stripState != other._stripState
or self._busState != other._busState
or self_gains != other_gains
or self._busGaindB100 != other._busGaindB100
or self._stripLabelUTF8c60 != other._stripLabelUTF8c60
or self._busLabelUTF8c60 != other._busLabelUTF8c60
)
def ldirty(self, strip_cache, bus_cache) -> bool:
@ -171,19 +256,14 @@ class VbanPacketNBS0(VbanPacket):
return Levels(strip=self.strip_levels, bus=self.bus_levels)
@property
def stripstate(self) -> tuple:
"""returns tuple of strip states accessable through bit modes"""
return tuple(self._stripState[i : i + 4] for i in range(0, 32, 4))
@property
def busstate(self) -> tuple:
"""returns tuple of bus states accessable through bit modes"""
return tuple(self._busState[i : i + 4] for i in range(0, 32, 4))
"""
these functions return an array of gainlayers[i] across all strips
ie stripgainlayer1 = [strip[0].gainlayer[0], strip[1].gainlayer[0], strip[2].gainlayer[0]...]
"""
def states(self) -> States:
"""returns States object with processed strip and bus channel states"""
return States(
strip=tuple(
ChannelState(self._stripState[i : i + 4]) for i in range(0, 32, 4)
),
bus=tuple(ChannelState(self._busState[i : i + 4]) for i in range(0, 32, 4)),
)
@property
def gainlayers(self) -> tuple:

View File

@ -95,7 +95,9 @@ def comp(t0: tuple, t1: tuple) -> Iterator[bool]:
for a, b in zip(t0, t1):
# If both values are very quiet (below -72dB), ignore small changes
if a <= -72.0 and b <= -72.0:
yield a == b # At least one has significant level, detect changes
yield a == b # Both quiet, check if they're equal
else:
yield a != b # At least one has significant level, detect changes
def deep_merge(dict1, dict2):

View File

@ -140,7 +140,7 @@ class Producer(threading.Thread):
self.queue.put('pdirty')
if self._remote.event.ldirty:
self.queue.put('ldirty')
time.sleep(self._remote.ratelimit)
# time.sleep(self._remote.ratelimit)
self.logger.debug(f'terminating {self.name} thread')
self.queue.put(None)