mirror of
https://github.com/onyx-and-iris/vban-cmd-python.git
synced 2026-03-02 16:29:11 +00:00
Compare commits
14 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 23b99cb66b | |||
| 2fd7b8ad8b | |||
| c851cb5abe | |||
| dc681f50d0 | |||
| a0ec00652b | |||
| 69263c22f2 | |||
| ad2cfeaae6 | |||
| 1123fe6432 | |||
| 3c3e415d7e | |||
| 8cfeb45fcb | |||
| 10b38b3fcc | |||
| ff5ac193c8 | |||
| 2f3cd0e07f | |||
| d689b3a301 |
18
CHANGELOG.md
18
CHANGELOG.md
@ -11,6 +11,24 @@ Before any major/minor/patch bump all unit tests will be run to verify they pass
|
||||
|
||||
- [x]
|
||||
|
||||
## [2.7.0] - 2026-03-01
|
||||
|
||||
### Added
|
||||
|
||||
- new kind `matrix` has been added, it does two things:
|
||||
- scales the interface according to `potato` kind, in practice this has no affect but it's required by the builder classes.
|
||||
- disables the rt listener threads since we aren't expecting to receive any from a Matrix VBAN server.
|
||||
- however, matrix responses may still be received with the {VbanCmd}.sendtext() method.
|
||||
|
||||
### Changed
|
||||
|
||||
- `outbound` kwarg has been renamed to `disable_rt_listeners`. Since it's job is to disable the listener threads for incoming RT packets this new name is more descriptive.
|
||||
- dataclasses representing packet headers and packets with ident:0 and ident:1 have been moved into an internal packet module.
|
||||
|
||||
### Removed
|
||||
|
||||
- {VbanCmd}.sendtext() @script decorator removed. It's purpose was to attempt to convert a dictionary to a script but it was poorly implemented and there exists the {VbanCmd}.apply() method already.
|
||||
|
||||
## [2.6.0] - 2026-02-26
|
||||
|
||||
### Added
|
||||
|
||||
63
README.md
63
README.md
@ -8,19 +8,19 @@
|
||||
|
||||
# VBAN CMD
|
||||
|
||||
This python interface allows you to transmit Voicemeeter parameters over a network.
|
||||
This python interface allows you to send Voicemeeter/Matrix commands over a network.
|
||||
|
||||
It may be used standalone or to extend the [Voicemeeter Remote Python API](https://github.com/onyx-and-iris/voicemeeter-api-python)
|
||||
It offers the same public API as [Voicemeeter Remote Python API](https://github.com/onyx-and-iris/voicemeeter-api-python).
|
||||
|
||||
There is no support for audio transfer in this package, only parameters.
|
||||
Only the VBAN SERVICE/TEXT subprotocols are supported, there is no support for AUDIO or MIDI in this package.
|
||||
|
||||
For an outline of past/future changes refer to: [CHANGELOG](CHANGELOG.md)
|
||||
|
||||
## Tested against
|
||||
|
||||
- Basic 1.0.8.8
|
||||
- Banana 2.0.6.8
|
||||
- Potato 3.0.2.8
|
||||
- Basic 1.1.2.2
|
||||
- Banana 2.1.2.2
|
||||
- Potato 3.1.2.2
|
||||
|
||||
## Requirements
|
||||
|
||||
@ -29,7 +29,9 @@ For an outline of past/future changes refer to: [CHANGELOG](CHANGELOG.md)
|
||||
|
||||
## Installation
|
||||
|
||||
`pip install vban-cmd`
|
||||
```console
|
||||
pip install vban-cmd
|
||||
```
|
||||
|
||||
## `Use`
|
||||
|
||||
@ -113,6 +115,8 @@ Pass the kind of Voicemeeter as an argument. KIND_ID may be:
|
||||
- `banana`
|
||||
- `potato`
|
||||
|
||||
A fourth kind `matrix` has been added, if you pass it as a KIND_ID you are expected to use the [{VbanCmd}.sendtext()](https://github.com/onyx-and-iris/vban-cmd-python?tab=readme-ov-file#vbansendtextscript) method for sending text requests.
|
||||
|
||||
## `Available commands`
|
||||
|
||||
### Strip
|
||||
@ -345,6 +349,40 @@ vban.strip[0].fadeto(-10.3, 1000)
|
||||
vban.bus[3].fadeby(-5.6, 500)
|
||||
```
|
||||
|
||||
### Recorder
|
||||
|
||||
The following methods are available
|
||||
|
||||
- `play()`
|
||||
- `stop()`
|
||||
- `pause()`
|
||||
- `record()`
|
||||
- `ff()`
|
||||
- `rew()`
|
||||
- `load(filepath)`: raw string
|
||||
- `goto(time_string)`: time string in format `hh:mm:ss`
|
||||
|
||||
The following properties are available
|
||||
|
||||
- `samplerate`: int, (22050, 24000, 32000, 44100, 48000, 88200, 96000, 176400, 192000)
|
||||
- `bitresolution`: int, (8, 16, 24, 32)
|
||||
- `channel`: int, from 1 to 8
|
||||
- `kbps`: int, (32, 40, 48, 56, 64, 80, 96, 112, 128, 160, 192, 224, 256, 320)
|
||||
- `gain`: float, from -60.0 to 12.0
|
||||
|
||||
example:
|
||||
|
||||
```python
|
||||
vban.recorder.play()
|
||||
vban.recorder.stop()
|
||||
|
||||
# filepath as raw string
|
||||
vban.recorder.load(r'C:\music\mytune.mp3')
|
||||
|
||||
# set the goto time to 1m 30s
|
||||
vban.recorder.goto('00:01:30')
|
||||
```
|
||||
|
||||
### Command
|
||||
|
||||
Certain 'special' commands are defined by the API as performing actions rather than setting values. The following methods are available:
|
||||
@ -511,7 +549,8 @@ You may pass the following optional keyword arguments:
|
||||
- `pdirty`: boolean=False, parameter updates
|
||||
- `ldirty`: boolean=False, level updates
|
||||
- `timeout`: int=5, amount of time (seconds) to wait for an incoming RT data packet (parameter states).
|
||||
- `outbound`: boolean=False, set `True` if you are only interested in sending commands. (no rt packets will be received)
|
||||
- `disable_rt_listeners`: boolean=False, set `True` if you don't wish to receive RT packets.
|
||||
- You can still send Matrix string requests ending with `?` and receive a response.
|
||||
|
||||
#### `vban.pdirty`
|
||||
|
||||
@ -529,6 +568,14 @@ Sends a script block as a string request, for example:
|
||||
vban.sendtext('Strip[0].Mute=1;Bus[0].Mono=1')
|
||||
```
|
||||
|
||||
You can even use it to send matrix commands:
|
||||
|
||||
```python
|
||||
vban.sendtext('Point(ASIO128.IN[1..4],ASIO128.OUT[1]).dBGain = -3.0')
|
||||
|
||||
vban.sendtext('Command.Version = ?')
|
||||
```
|
||||
|
||||
## Errors
|
||||
|
||||
- `errors.VBANCMDError`: Base VBANCMD Exception class.
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
[project]
|
||||
name = "vban-cmd"
|
||||
version = "2.6.0"
|
||||
version = "2.8.1"
|
||||
description = "Python interface for the VBAN RT Packet Service (Sendtext)"
|
||||
authors = [{ name = "Onyx and Iris", email = "code@onyxandiris.online" }]
|
||||
license = { text = "MIT" }
|
||||
@ -9,7 +9,7 @@ requires-python = ">=3.10"
|
||||
dependencies = ["tomli (>=2.0.1,<3.0) ; python_version < '3.11'"]
|
||||
|
||||
[tool.poetry.requires-plugins]
|
||||
poethepoet = "^0.35.0"
|
||||
poethepoet = ">=0.42.0"
|
||||
|
||||
[tool.poetry.group.dev.dependencies]
|
||||
pytest = "^8.3.4"
|
||||
|
||||
@ -4,7 +4,7 @@ from typing import Union
|
||||
|
||||
from .enums import NBS, BusModes
|
||||
from .iremote import IRemote
|
||||
from .meta import bus_mode_prop, channel_bool_prop, channel_label_prop
|
||||
from .meta import bus_mode_prop, channel_bool_prop, channel_int_prop, channel_label_prop
|
||||
|
||||
|
||||
class Bus(IRemote):
|
||||
@ -90,20 +90,9 @@ class BusLevel(IRemote):
|
||||
def getter(self):
|
||||
"""Returns a tuple of level values for the channel."""
|
||||
|
||||
def fget(i):
|
||||
return round((((1 << 16) - 1) - i) * -0.01, 1)
|
||||
|
||||
if not self._remote.stopped() and self._remote.event.ldirty:
|
||||
return tuple(
|
||||
fget(i)
|
||||
for i in self._remote.cache['bus_level'][self.range[0] : self.range[-1]]
|
||||
)
|
||||
return tuple(
|
||||
fget(i)
|
||||
for i in self._remote._get_levels(self.public_packets[NBS.zero])[1][
|
||||
self.range[0] : self.range[-1]
|
||||
]
|
||||
)
|
||||
return self._remote.cache['bus_level'][self.range[0] : self.range[-1]]
|
||||
return self.public_packets[NBS.zero].levels.bus[self.range[0] : self.range[-1]]
|
||||
|
||||
@property
|
||||
def identifier(self) -> str:
|
||||
@ -128,45 +117,49 @@ class BusLevel(IRemote):
|
||||
def _make_bus_mode_mixin():
|
||||
"""Creates a mixin of Bus Modes."""
|
||||
|
||||
modestates = {
|
||||
'normal': [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
|
||||
'amix': [1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1],
|
||||
'repeat': [0, 2, 2, 0, 0, 2, 2, 0, 0, 2, 2],
|
||||
'bmix': [1, 2, 3, 0, 1, 2, 3, 0, 1, 2, 3],
|
||||
'composite': [0, 0, 0, 4, 4, 4, 4, 0, 0, 0, 0],
|
||||
'tvmix': [1, 0, 1, 4, 5, 4, 5, 0, 1, 0, 1],
|
||||
'upmix21': [0, 2, 2, 4, 4, 6, 6, 0, 0, 2, 2],
|
||||
'upmix41': [1, 2, 3, 4, 5, 6, 7, 0, 1, 2, 3],
|
||||
'upmix61': [0, 0, 0, 0, 0, 0, 0, 8, 8, 8, 8],
|
||||
'centeronly': [1, 0, 1, 0, 1, 0, 1, 8, 9, 8, 9],
|
||||
'lfeonly': [0, 2, 2, 0, 0, 2, 2, 8, 8, 10, 10],
|
||||
'rearonly': [1, 2, 3, 0, 1, 2, 3, 8, 9, 10, 11],
|
||||
}
|
||||
mode_names = [
|
||||
'normal',
|
||||
'amix',
|
||||
'repeat',
|
||||
'bmix',
|
||||
'composite',
|
||||
'tvmix',
|
||||
'upmix21',
|
||||
'upmix41',
|
||||
'upmix61',
|
||||
'centeronly',
|
||||
'lfeonly',
|
||||
'rearonly',
|
||||
]
|
||||
|
||||
def identifier(self) -> str:
|
||||
return f'bus[{self.index}].mode'
|
||||
|
||||
def get(self):
|
||||
states = [
|
||||
(
|
||||
int.from_bytes(
|
||||
self.public_packets[NBS.zero].busstate[self.index], 'little'
|
||||
)
|
||||
& val
|
||||
)
|
||||
>> 4
|
||||
for val in self._modes.modevals
|
||||
"""Get current bus mode using ChannelState for clean bit extraction."""
|
||||
mode_cache_items = [
|
||||
(k, v)
|
||||
for k, v in self._remote.cache.items()
|
||||
if k.startswith(f'{self.identifier}.') and v == 1
|
||||
]
|
||||
for k, v in modestates.items():
|
||||
if states == v:
|
||||
return k
|
||||
|
||||
if mode_cache_items:
|
||||
latest_cached = mode_cache_items[-1][0]
|
||||
mode_name = latest_cached.split('.')[-1]
|
||||
return mode_name
|
||||
|
||||
bus_state = self.public_packets[NBS.zero].states.bus[self.index]
|
||||
|
||||
# Extract bus mode from bits 4-7 (mask 0xF0, shift right by 4)
|
||||
mode_value = (bus_state._state & 0x000000F0) >> 4
|
||||
|
||||
return mode_names[mode_value] if mode_value < len(mode_names) else 'normal'
|
||||
|
||||
return type(
|
||||
'BusModeMixin',
|
||||
(IRemote,),
|
||||
{
|
||||
'identifier': property(identifier),
|
||||
'modestates': modestates,
|
||||
**{mode.name: bus_mode_prop(mode.name) for mode in BusModes},
|
||||
'get': get,
|
||||
},
|
||||
@ -188,7 +181,8 @@ def bus_factory(phys_bus, remote, i) -> Union[PhysicalBus, VirtualBus]:
|
||||
'eq': BusEQ.make(remote, i),
|
||||
'levels': BusLevel(remote, i),
|
||||
'mode': BUSMODEMIXIN_cls(remote, i),
|
||||
**{param: channel_bool_prop(param) for param in ['mute', 'mono']},
|
||||
**{param: channel_bool_prop(param) for param in ('mute',)},
|
||||
**{param: channel_int_prop(param) for param in ('mono',)},
|
||||
'label': channel_label_prop(),
|
||||
},
|
||||
)(remote, i)
|
||||
|
||||
@ -11,6 +11,7 @@ from .error import VBANCMDError
|
||||
from .kinds import KindMapClass
|
||||
from .kinds import request_kind_map as kindmap
|
||||
from .macrobutton import MacroButton
|
||||
from .recorder import Recorder
|
||||
from .strip import request_strip_obj as strip
|
||||
from .vban import request_vban_obj as vban
|
||||
from .vbancmd import VbanCmd
|
||||
@ -26,7 +27,7 @@ class FactoryBuilder:
|
||||
"""
|
||||
|
||||
BuilderProgress = IntEnum(
|
||||
'BuilderProgress', 'strip bus command macrobutton vban', start=0
|
||||
'BuilderProgress', 'strip bus command macrobutton vban recorder', start=0
|
||||
)
|
||||
|
||||
def __init__(self, factory, kind: KindMapClass):
|
||||
@ -38,6 +39,7 @@ class FactoryBuilder:
|
||||
f'Finished building commands for {self._factory}',
|
||||
f'Finished building macrobuttons for {self._factory}',
|
||||
f'Finished building vban in/out streams for {self._factory}',
|
||||
f'Finished building recorder for {self._factory}',
|
||||
)
|
||||
self.logger = logger.getChild(self.__class__.__name__)
|
||||
|
||||
@ -72,6 +74,10 @@ class FactoryBuilder:
|
||||
self._factory.vban = vban(self._factory)
|
||||
return self
|
||||
|
||||
def make_recorder(self):
|
||||
self._factory.recorder = Recorder.make(self._factory)
|
||||
return self
|
||||
|
||||
|
||||
class FactoryBase(VbanCmd):
|
||||
"""Base class for factories, subclasses VbanCmd."""
|
||||
@ -85,7 +91,7 @@ class FactoryBase(VbanCmd):
|
||||
'channel': 0,
|
||||
'ratelimit': 0.01,
|
||||
'timeout': 5,
|
||||
'outbound': False,
|
||||
'disable_rt_listeners': False,
|
||||
'sync': False,
|
||||
'pdirty': False,
|
||||
'ldirty': False,
|
||||
@ -166,7 +172,7 @@ class BananaFactory(FactoryBase):
|
||||
@property
|
||||
def steps(self) -> Iterable:
|
||||
"""steps required to build the interface for a kind"""
|
||||
return self._steps
|
||||
return self._steps + (self.builder.make_recorder,)
|
||||
|
||||
|
||||
class PotatoFactory(FactoryBase):
|
||||
@ -188,7 +194,7 @@ class PotatoFactory(FactoryBase):
|
||||
@property
|
||||
def steps(self) -> Iterable:
|
||||
"""steps required to build the interface for a kind"""
|
||||
return self._steps
|
||||
return self._steps + (self.builder.make_recorder,)
|
||||
|
||||
|
||||
def vbancmd_factory(kind_id: str, **kwargs) -> VbanCmd:
|
||||
@ -202,7 +208,13 @@ def vbancmd_factory(kind_id: str, **kwargs) -> VbanCmd:
|
||||
_factory = BasicFactory
|
||||
case 'banana':
|
||||
_factory = BananaFactory
|
||||
case 'potato':
|
||||
case 'potato' | 'matrix':
|
||||
# matrix is a special kind where:
|
||||
# - we don't need to scale the interface with the builder (in other words kind is arbitrary).
|
||||
# - we don't ever need to use real-time listeners, so we disable them to avoid confusion
|
||||
if kind_id == 'matrix':
|
||||
kwargs['disable_rt_listeners'] = True
|
||||
kind_id = 'potato'
|
||||
_factory = PotatoFactory
|
||||
case _:
|
||||
raise ValueError(f"Unknown Voicemeeter kind '{kind_id}'")
|
||||
|
||||
@ -1,7 +1,7 @@
|
||||
from functools import partial
|
||||
|
||||
from .enums import NBS
|
||||
from .util import cache_bool, cache_float, cache_string
|
||||
from .enums import NBS, BusModes
|
||||
from .util import cache_bool, cache_float, cache_int, cache_string
|
||||
|
||||
|
||||
def channel_bool_prop(param):
|
||||
@ -11,17 +11,23 @@ def channel_bool_prop(param):
|
||||
def fget(self):
|
||||
cmd = self._cmd(param)
|
||||
self.logger.debug(f'getter: {cmd}')
|
||||
return (
|
||||
not int.from_bytes(
|
||||
getattr(
|
||||
self.public_packets[NBS.zero],
|
||||
f'{"strip" if "strip" in type(self).__name__.lower() else "bus"}state',
|
||||
)[self.index],
|
||||
'little',
|
||||
)
|
||||
& getattr(self._modes, f'_{param.lower()}')
|
||||
== 0
|
||||
|
||||
states = self.public_packets[NBS.zero].states
|
||||
channel_states = (
|
||||
states.strip if 'strip' in type(self).__name__.lower() else states.bus
|
||||
)
|
||||
channel_state = channel_states[self.index]
|
||||
|
||||
if param.lower() == 'mute':
|
||||
return channel_state.mute
|
||||
elif param.lower() == 'solo':
|
||||
return channel_state.solo
|
||||
elif param.lower() == 'mono':
|
||||
return channel_state.mono
|
||||
elif param.lower() == 'mc':
|
||||
return channel_state.mc
|
||||
else:
|
||||
return channel_state.get_mode(getattr(self._modes, f'_{param.lower()}'))
|
||||
|
||||
def fset(self, val):
|
||||
self.setter(param, 1 if val else 0)
|
||||
@ -29,18 +35,46 @@ def channel_bool_prop(param):
|
||||
return property(fget, fset)
|
||||
|
||||
|
||||
def channel_int_prop(param):
|
||||
"""meta function for channel integer parameters"""
|
||||
|
||||
@partial(cache_int, param=param)
|
||||
def fget(self):
|
||||
cmd = self._cmd(param)
|
||||
self.logger.debug(f'getter: {cmd}')
|
||||
|
||||
states = self.public_packets[NBS.zero].states
|
||||
channel_states = (
|
||||
states.strip if 'strip' in type(self).__name__.lower() else states.bus
|
||||
)
|
||||
channel_state = channel_states[self.index]
|
||||
|
||||
# Special case: bus mono is an integer (0-2) encoded using bits 2 and 9
|
||||
if param.lower() == 'mono' and 'bus' in type(self).__name__.lower():
|
||||
bit_2 = (channel_state._state >> 2) & 1
|
||||
bit_9 = (channel_state._state >> 9) & 1
|
||||
return (bit_9 << 1) | bit_2
|
||||
else:
|
||||
return channel_state.get_mode_int(getattr(self._modes, f'_{param.lower()}'))
|
||||
|
||||
def fset(self, val):
|
||||
self.setter(param, val)
|
||||
|
||||
return property(fget, fset)
|
||||
|
||||
|
||||
def channel_label_prop():
|
||||
"""meta function for channel label parameters"""
|
||||
|
||||
@partial(cache_string, param='label')
|
||||
def fget(self) -> str:
|
||||
return getattr(
|
||||
self.public_packets[NBS.zero],
|
||||
f'{"strip" if "strip" in type(self).__name__.lower() else "bus"}labels',
|
||||
)[self.index]
|
||||
if 'strip' in type(self).__name__.lower():
|
||||
return self.public_packets[NBS.zero].labels.strip[self.index]
|
||||
else:
|
||||
return self.public_packets[NBS.zero].labels.bus[self.index]
|
||||
|
||||
def fset(self, val: str):
|
||||
self.setter('label', str(val))
|
||||
self.setter('label', f'"{val}"')
|
||||
|
||||
return property(fget, fset)
|
||||
|
||||
@ -52,13 +86,10 @@ def strip_output_prop(param):
|
||||
def fget(self):
|
||||
cmd = self._cmd(param)
|
||||
self.logger.debug(f'getter: {cmd}')
|
||||
return (
|
||||
not int.from_bytes(
|
||||
self.public_packets[NBS.zero].stripstate[self.index], 'little'
|
||||
)
|
||||
& getattr(self._modes, f'_bus{param.lower()}')
|
||||
== 0
|
||||
)
|
||||
|
||||
strip_state = self.public_packets[NBS.zero].states.strip[self.index]
|
||||
|
||||
return strip_state.get_mode(getattr(self._modes, f'_bus{param.lower()}'))
|
||||
|
||||
def fset(self, val):
|
||||
self.setter(param, 1 if val else 0)
|
||||
@ -73,16 +104,15 @@ def bus_mode_prop(param):
|
||||
def fget(self):
|
||||
cmd = self._cmd(param)
|
||||
self.logger.debug(f'getter: {cmd}')
|
||||
return [
|
||||
(
|
||||
int.from_bytes(
|
||||
self.public_packets[NBS.zero].busstate[self.index], 'little'
|
||||
)
|
||||
& val
|
||||
)
|
||||
>> 4
|
||||
for val in self._modes.modevals
|
||||
] == self.modestates[param]
|
||||
|
||||
bus_state = self.public_packets[NBS.zero].states.bus[self.index]
|
||||
|
||||
# Extract current bus mode from bits 4-7
|
||||
current_mode = (bus_state._state & 0x000000F0) >> 4
|
||||
|
||||
expected_mode = getattr(BusModes, param.lower())
|
||||
|
||||
return current_mode == expected_mode
|
||||
|
||||
def fset(self, val):
|
||||
self.setter(param, 1 if val else 0)
|
||||
|
||||
266
vban_cmd/packet/headers.py
Normal file
266
vban_cmd/packet/headers.py
Normal file
@ -0,0 +1,266 @@
|
||||
from dataclasses import dataclass
|
||||
|
||||
from vban_cmd.enums import NBS
|
||||
from vban_cmd.kinds import KindMapClass
|
||||
|
||||
VBAN_PROTOCOL_TXT = 0x40
|
||||
VBAN_PROTOCOL_SERVICE = 0x60
|
||||
|
||||
VBAN_SERVICE_RTPACKETREGISTER = 32
|
||||
VBAN_SERVICE_RTPACKET = 33
|
||||
VBAN_SERVICE_MASK = 0xE0
|
||||
VBAN_PROTOCOL_MASK = 0xE0
|
||||
VBAN_SERVICE_REQUESTREPLY = 0x02
|
||||
VBAN_SERVICE_FNCT_REPLY = 0x02
|
||||
|
||||
MAX_PACKET_SIZE = 1436
|
||||
HEADER_SIZE = 4 + 1 + 1 + 1 + 1 + 16
|
||||
|
||||
|
||||
@dataclass
|
||||
class VbanPacket:
|
||||
"""Represents the header of an incoming VBAN data packet"""
|
||||
|
||||
nbs: NBS
|
||||
_kind: KindMapClass
|
||||
_voicemeeterType: bytes
|
||||
_reserved: bytes
|
||||
_buffersize: bytes
|
||||
_voicemeeterVersion: bytes
|
||||
_optionBits: bytes
|
||||
_samplerate: bytes
|
||||
|
||||
@property
|
||||
def voicemeetertype(self) -> str:
|
||||
"""returns voicemeeter type as a string"""
|
||||
return ['', 'basic', 'banana', 'potato'][
|
||||
int.from_bytes(self._voicemeeterType, 'little')
|
||||
]
|
||||
|
||||
@property
|
||||
def voicemeeterversion(self) -> tuple:
|
||||
"""returns voicemeeter version as a tuple"""
|
||||
return tuple(self._voicemeeterVersion[i] for i in range(3, -1, -1))
|
||||
|
||||
@property
|
||||
def samplerate(self) -> int:
|
||||
"""returns samplerate as an int"""
|
||||
return int.from_bytes(self._samplerate, 'little')
|
||||
|
||||
|
||||
@dataclass
|
||||
class VbanSubscribeHeader:
|
||||
"""Represents the header of a subscription packet"""
|
||||
|
||||
nbs: NBS = NBS.zero
|
||||
name: str = 'Register-RTP'
|
||||
timeout: int = 15
|
||||
|
||||
@property
|
||||
def vban(self) -> bytes:
|
||||
return b'VBAN'
|
||||
|
||||
@property
|
||||
def format_sr(self) -> bytes:
|
||||
return VBAN_PROTOCOL_SERVICE.to_bytes(1, 'little')
|
||||
|
||||
@property
|
||||
def format_nbs(self) -> bytes:
|
||||
return (self.nbs.value & 0xFF).to_bytes(1, 'little')
|
||||
|
||||
@property
|
||||
def format_nbc(self) -> bytes:
|
||||
return VBAN_SERVICE_RTPACKETREGISTER.to_bytes(1, 'little')
|
||||
|
||||
@property
|
||||
def format_bit(self) -> bytes:
|
||||
return (self.timeout & 0xFF).to_bytes(1, 'little')
|
||||
|
||||
@property
|
||||
def streamname(self) -> bytes:
|
||||
return self.name.encode('ascii') + bytes(16 - len(self.name))
|
||||
|
||||
@classmethod
|
||||
def to_bytes(cls, nbs: NBS, framecounter: int) -> bytes:
|
||||
header = cls(nbs=nbs)
|
||||
|
||||
data = bytearray()
|
||||
data.extend(header.vban)
|
||||
data.extend(header.format_sr)
|
||||
data.extend(header.format_nbs)
|
||||
data.extend(header.format_nbc)
|
||||
data.extend(header.format_bit)
|
||||
data.extend(header.streamname)
|
||||
data.extend(framecounter.to_bytes(4, 'little'))
|
||||
return bytes(data)
|
||||
|
||||
|
||||
def _parse_vban_service_header(data: bytes) -> dict:
|
||||
"""Common parsing and validation for VBAN service protocol headers."""
|
||||
if len(data) < HEADER_SIZE:
|
||||
raise ValueError('Data is too short to be a valid VBAN header')
|
||||
|
||||
if data[:4] != b'VBAN':
|
||||
raise ValueError('Invalid VBAN magic bytes')
|
||||
|
||||
format_sr = data[4]
|
||||
format_nbs = data[5]
|
||||
format_nbc = data[6]
|
||||
format_bit = data[7]
|
||||
|
||||
# Verify this is a service protocol packet
|
||||
protocol = format_sr & VBAN_PROTOCOL_MASK
|
||||
if protocol != VBAN_PROTOCOL_SERVICE:
|
||||
raise ValueError(f'Not a service protocol packet: {protocol:02x}')
|
||||
|
||||
# Extract stream name and frame counter
|
||||
name = data[8:24].rstrip(b'\x00').decode('utf-8', errors='ignore')
|
||||
framecounter = int.from_bytes(data[24:28], 'little')
|
||||
|
||||
return {
|
||||
'format_sr': format_sr,
|
||||
'format_nbs': format_nbs,
|
||||
'format_nbc': format_nbc,
|
||||
'format_bit': format_bit,
|
||||
'name': name,
|
||||
'framecounter': framecounter,
|
||||
}
|
||||
|
||||
|
||||
@dataclass
|
||||
class VbanResponseHeader:
|
||||
"""Represents the header of a response packet"""
|
||||
|
||||
name: str = 'Voicemeeter-RTP'
|
||||
format_sr: int = VBAN_PROTOCOL_SERVICE
|
||||
format_nbs: int = 0
|
||||
format_nbc: int = VBAN_SERVICE_RTPACKET
|
||||
format_bit: int = 0
|
||||
framecounter: int = 0
|
||||
|
||||
@property
|
||||
def vban(self) -> bytes:
|
||||
return b'VBAN'
|
||||
|
||||
@property
|
||||
def streamname(self) -> bytes:
|
||||
return self.name.encode('ascii') + bytes(16 - len(self.name))
|
||||
|
||||
@classmethod
|
||||
def from_bytes(cls, data: bytes):
|
||||
"""Parse a VbanResponseHeader from bytes."""
|
||||
parsed = _parse_vban_service_header(data)
|
||||
|
||||
# Validate this is an RTPacket response
|
||||
if parsed['format_nbc'] != VBAN_SERVICE_RTPACKET:
|
||||
raise ValueError(
|
||||
f'Not an RTPacket response packet: {parsed["format_nbc"]:02x}'
|
||||
)
|
||||
|
||||
return cls(**parsed)
|
||||
|
||||
|
||||
@dataclass
|
||||
class VbanMatrixResponseHeader:
|
||||
"""Represents the header of a matrix response packet"""
|
||||
|
||||
name: str = 'Request Reply'
|
||||
format_sr: int = VBAN_PROTOCOL_SERVICE
|
||||
format_nbs: int = VBAN_SERVICE_FNCT_REPLY
|
||||
format_nbc: int = VBAN_SERVICE_REQUESTREPLY
|
||||
format_bit: int = 0
|
||||
framecounter: int = 0
|
||||
|
||||
@property
|
||||
def vban(self) -> bytes:
|
||||
return b'VBAN'
|
||||
|
||||
@property
|
||||
def streamname(self) -> bytes:
|
||||
return self.name.encode('ascii')[:16].ljust(16, b'\x00')
|
||||
|
||||
@classmethod
|
||||
def from_bytes(cls, data: bytes):
|
||||
"""Parse a matrix response packet from bytes."""
|
||||
parsed = _parse_vban_service_header(data)
|
||||
|
||||
# Validate this is a service reply packet
|
||||
if parsed['format_nbs'] != VBAN_SERVICE_FNCT_REPLY:
|
||||
raise ValueError(f'Not a service reply packet: {parsed["format_nbs"]:02x}')
|
||||
|
||||
return cls(**parsed)
|
||||
|
||||
@classmethod
|
||||
def extract_payload(cls, data: bytes) -> str:
|
||||
"""Extract the text payload from a matrix response packet."""
|
||||
if len(data) <= HEADER_SIZE:
|
||||
return ''
|
||||
|
||||
payload_bytes = data[HEADER_SIZE:]
|
||||
return payload_bytes.rstrip(b'\x00').decode('utf-8', errors='ignore')
|
||||
|
||||
@classmethod
|
||||
def parse_response(cls, data: bytes) -> tuple['VbanMatrixResponseHeader', str]:
|
||||
"""Parse a complete matrix response packet returning header and payload."""
|
||||
header = cls.from_bytes(data)
|
||||
payload = cls.extract_payload(data)
|
||||
return header, payload
|
||||
|
||||
|
||||
@dataclass
|
||||
class VbanRequestHeader:
|
||||
"""Represents the header of a request packet"""
|
||||
|
||||
name: str
|
||||
bps_index: int
|
||||
channel: int
|
||||
framecounter: int = 0
|
||||
|
||||
@property
|
||||
def vban(self) -> bytes:
|
||||
return b'VBAN'
|
||||
|
||||
@property
|
||||
def sr(self) -> bytes:
|
||||
return (VBAN_PROTOCOL_TXT + self.bps_index).to_bytes(1, 'little')
|
||||
|
||||
@property
|
||||
def nbs(self) -> bytes:
|
||||
return (0).to_bytes(1, 'little')
|
||||
|
||||
@property
|
||||
def nbc(self) -> bytes:
|
||||
return (self.channel).to_bytes(1, 'little')
|
||||
|
||||
@property
|
||||
def bit(self) -> bytes:
|
||||
return (0x10).to_bytes(1, 'little')
|
||||
|
||||
@property
|
||||
def streamname(self) -> bytes:
|
||||
return self.name.encode() + bytes(16 - len(self.name))
|
||||
|
||||
@classmethod
|
||||
def to_bytes(
|
||||
cls, name: str, bps_index: int, channel: int, framecounter: int
|
||||
) -> bytes:
|
||||
header = cls(
|
||||
name=name, bps_index=bps_index, channel=channel, framecounter=framecounter
|
||||
)
|
||||
|
||||
data = bytearray()
|
||||
data.extend(header.vban)
|
||||
data.extend(header.sr)
|
||||
data.extend(header.nbs)
|
||||
data.extend(header.nbc)
|
||||
data.extend(header.bit)
|
||||
data.extend(header.streamname)
|
||||
data.extend(header.framecounter.to_bytes(4, 'little'))
|
||||
return bytes(data)
|
||||
|
||||
@classmethod
|
||||
def encode_with_payload(
|
||||
cls, name: str, bps_index: int, channel: int, framecounter: int, payload: str
|
||||
) -> bytes:
|
||||
"""Creates the complete packet with header and payload."""
|
||||
return cls.to_bytes(name, bps_index, channel, framecounter) + payload.encode()
|
||||
288
vban_cmd/packet/nbs0.py
Normal file
288
vban_cmd/packet/nbs0.py
Normal file
@ -0,0 +1,288 @@
|
||||
from dataclasses import dataclass
|
||||
from typing import NamedTuple
|
||||
|
||||
from vban_cmd.enums import NBS
|
||||
from vban_cmd.kinds import KindMapClass
|
||||
from vban_cmd.util import comp
|
||||
|
||||
from .headers import VbanPacket
|
||||
|
||||
|
||||
class Levels(NamedTuple):
|
||||
strip: tuple[float, ...]
|
||||
bus: tuple[float, ...]
|
||||
|
||||
|
||||
class ChannelState:
|
||||
"""Represents the processed state of a single strip or bus channel"""
|
||||
|
||||
def __init__(self, state_bytes: bytes):
|
||||
# Convert 4-byte state to integer once for efficient lookups
|
||||
self._state = int.from_bytes(state_bytes, 'little')
|
||||
|
||||
def get_mode(self, mode_value: int) -> bool:
|
||||
"""Get boolean state for a specific mode"""
|
||||
return (self._state & mode_value) != 0
|
||||
|
||||
def get_mode_int(self, mode_value: int) -> int:
|
||||
"""Get integer state for a specific mode"""
|
||||
return self._state & mode_value
|
||||
|
||||
# Common boolean modes
|
||||
@property
|
||||
def mute(self) -> bool:
|
||||
return (self._state & 0x00000001) != 0
|
||||
|
||||
@property
|
||||
def solo(self) -> bool:
|
||||
return (self._state & 0x00000002) != 0
|
||||
|
||||
@property
|
||||
def mono(self) -> bool:
|
||||
return (self._state & 0x00000004) != 0
|
||||
|
||||
@property
|
||||
def mc(self) -> bool:
|
||||
return (self._state & 0x00000008) != 0
|
||||
|
||||
# EQ modes
|
||||
@property
|
||||
def eq_on(self) -> bool:
|
||||
return (self._state & 0x00000100) != 0
|
||||
|
||||
@property
|
||||
def eq_ab(self) -> bool:
|
||||
return (self._state & 0x00000800) != 0
|
||||
|
||||
# Bus assignments (strip to bus routing)
|
||||
@property
|
||||
def busa1(self) -> bool:
|
||||
return (self._state & 0x00001000) != 0
|
||||
|
||||
@property
|
||||
def busa2(self) -> bool:
|
||||
return (self._state & 0x00002000) != 0
|
||||
|
||||
@property
|
||||
def busa3(self) -> bool:
|
||||
return (self._state & 0x00004000) != 0
|
||||
|
||||
@property
|
||||
def busa4(self) -> bool:
|
||||
return (self._state & 0x00008000) != 0
|
||||
|
||||
@property
|
||||
def busb1(self) -> bool:
|
||||
return (self._state & 0x00010000) != 0
|
||||
|
||||
@property
|
||||
def busb2(self) -> bool:
|
||||
return (self._state & 0x00020000) != 0
|
||||
|
||||
@property
|
||||
def busb3(self) -> bool:
|
||||
return (self._state & 0x00040000) != 0
|
||||
|
||||
|
||||
class States(NamedTuple):
|
||||
strip: tuple[ChannelState, ...]
|
||||
bus: tuple[ChannelState, ...]
|
||||
|
||||
|
||||
class Labels(NamedTuple):
|
||||
strip: tuple[str, ...]
|
||||
bus: tuple[str, ...]
|
||||
|
||||
|
||||
@dataclass
|
||||
class VbanPacketNBS0(VbanPacket):
|
||||
"""Represents the body of a VBAN data packet with ident:0"""
|
||||
|
||||
_inputLeveldB100: bytes
|
||||
_outputLeveldB100: bytes
|
||||
_TransportBit: bytes
|
||||
_stripState: bytes
|
||||
_busState: bytes
|
||||
_stripGaindB100Layer1: bytes
|
||||
_stripGaindB100Layer2: bytes
|
||||
_stripGaindB100Layer3: bytes
|
||||
_stripGaindB100Layer4: bytes
|
||||
_stripGaindB100Layer5: bytes
|
||||
_stripGaindB100Layer6: bytes
|
||||
_stripGaindB100Layer7: bytes
|
||||
_stripGaindB100Layer8: bytes
|
||||
_busGaindB100: bytes
|
||||
_stripLabelUTF8c60: bytes
|
||||
_busLabelUTF8c60: bytes
|
||||
|
||||
@classmethod
|
||||
def from_bytes(cls, nbs: NBS, kind: KindMapClass, data: bytes):
|
||||
return cls(
|
||||
nbs=nbs,
|
||||
_kind=kind,
|
||||
_voicemeeterType=data[28:29],
|
||||
_reserved=data[29:30],
|
||||
_buffersize=data[30:32],
|
||||
_voicemeeterVersion=data[32:36],
|
||||
_optionBits=data[36:40],
|
||||
_samplerate=data[40:44],
|
||||
_inputLeveldB100=data[44:112],
|
||||
_outputLeveldB100=data[112:240],
|
||||
_TransportBit=data[240:244],
|
||||
_stripState=data[244:276],
|
||||
_busState=data[276:308],
|
||||
_stripGaindB100Layer1=data[308:324],
|
||||
_stripGaindB100Layer2=data[324:340],
|
||||
_stripGaindB100Layer3=data[340:356],
|
||||
_stripGaindB100Layer4=data[356:372],
|
||||
_stripGaindB100Layer5=data[372:388],
|
||||
_stripGaindB100Layer6=data[388:404],
|
||||
_stripGaindB100Layer7=data[404:420],
|
||||
_stripGaindB100Layer8=data[420:436],
|
||||
_busGaindB100=data[436:452],
|
||||
_stripLabelUTF8c60=data[452:932],
|
||||
_busLabelUTF8c60=data[932:1412],
|
||||
)
|
||||
|
||||
def pdirty(self, other) -> bool:
|
||||
"""True iff any defined parameter has changed"""
|
||||
|
||||
self_gains = (
|
||||
self._stripGaindB100Layer1
|
||||
+ self._stripGaindB100Layer2
|
||||
+ self._stripGaindB100Layer3
|
||||
+ self._stripGaindB100Layer4
|
||||
+ self._stripGaindB100Layer5
|
||||
+ self._stripGaindB100Layer6
|
||||
+ self._stripGaindB100Layer7
|
||||
+ self._stripGaindB100Layer8
|
||||
)
|
||||
other_gains = (
|
||||
other._stripGaindB100Layer1
|
||||
+ other._stripGaindB100Layer2
|
||||
+ other._stripGaindB100Layer3
|
||||
+ other._stripGaindB100Layer4
|
||||
+ other._stripGaindB100Layer5
|
||||
+ other._stripGaindB100Layer6
|
||||
+ other._stripGaindB100Layer7
|
||||
+ other._stripGaindB100Layer8
|
||||
)
|
||||
|
||||
return (
|
||||
self._stripState != other._stripState
|
||||
or self._busState != other._busState
|
||||
or self_gains != other_gains
|
||||
or self._busGaindB100 != other._busGaindB100
|
||||
or self._stripLabelUTF8c60 != other._stripLabelUTF8c60
|
||||
or self._busLabelUTF8c60 != other._busLabelUTF8c60
|
||||
)
|
||||
|
||||
def ldirty(self, strip_cache, bus_cache) -> bool:
|
||||
"""True iff any level has changed, ignoring changes when levels are very quiet"""
|
||||
self._strip_comp, self._bus_comp = (
|
||||
tuple(not val for val in comp(strip_cache, self.strip_levels)),
|
||||
tuple(not val for val in comp(bus_cache, self.bus_levels)),
|
||||
)
|
||||
return any(self._strip_comp) or any(self._bus_comp)
|
||||
|
||||
@property
|
||||
def strip_levels(self) -> tuple[float, ...]:
|
||||
"""Returns strip levels in dB"""
|
||||
return tuple(
|
||||
round(
|
||||
int.from_bytes(self._inputLeveldB100[i : i + 2], 'little', signed=True)
|
||||
* 0.01,
|
||||
1,
|
||||
)
|
||||
for i in range(0, len(self._inputLeveldB100), 2)
|
||||
)[: self._kind.num_strip_levels]
|
||||
|
||||
@property
|
||||
def bus_levels(self) -> tuple[float, ...]:
|
||||
"""Returns bus levels in dB"""
|
||||
return tuple(
|
||||
round(
|
||||
int.from_bytes(self._outputLeveldB100[i : i + 2], 'little', signed=True)
|
||||
* 0.01,
|
||||
1,
|
||||
)
|
||||
for i in range(0, len(self._outputLeveldB100), 2)
|
||||
)[: self._kind.num_bus_levels]
|
||||
|
||||
@property
|
||||
def levels(self) -> Levels:
|
||||
"""Returns strip and bus levels as a namedtuple"""
|
||||
return Levels(strip=self.strip_levels, bus=self.bus_levels)
|
||||
|
||||
@property
|
||||
def states(self) -> States:
|
||||
"""returns States object with processed strip and bus channel states"""
|
||||
return States(
|
||||
strip=tuple(
|
||||
ChannelState(self._stripState[i : i + 4]) for i in range(0, 32, 4)
|
||||
),
|
||||
bus=tuple(ChannelState(self._busState[i : i + 4]) for i in range(0, 32, 4)),
|
||||
)
|
||||
|
||||
@property
|
||||
def gainlayers(self) -> tuple:
|
||||
"""returns tuple of all strip gain layers as tuples"""
|
||||
return tuple(
|
||||
tuple(
|
||||
round(
|
||||
int.from_bytes(
|
||||
getattr(self, f'_stripGaindB100Layer{layer}')[i : i + 2],
|
||||
'little',
|
||||
signed=True,
|
||||
)
|
||||
* 0.01,
|
||||
2,
|
||||
)
|
||||
for i in range(0, 16, 2)
|
||||
)
|
||||
for layer in range(1, 9)
|
||||
)
|
||||
|
||||
@property
|
||||
def busgain(self) -> tuple:
|
||||
"""returns tuple of bus gains"""
|
||||
return tuple(
|
||||
round(
|
||||
int.from_bytes(self._busGaindB100[i : i + 2], 'little', signed=True)
|
||||
* 0.01,
|
||||
2,
|
||||
)
|
||||
for i in range(0, 16, 2)
|
||||
)
|
||||
|
||||
@property
|
||||
def labels(self) -> Labels:
|
||||
"""returns Labels namedtuple of strip and bus labels"""
|
||||
|
||||
def _extract_labels_from_bytes(label_bytes: bytes) -> tuple[str, ...]:
|
||||
"""Extract null-terminated UTF-8 labels from 60-byte chunks"""
|
||||
labels = []
|
||||
for i in range(0, len(label_bytes), 60):
|
||||
chunk = label_bytes[i : i + 60]
|
||||
null_pos = chunk.find(b'\x00')
|
||||
if null_pos == -1:
|
||||
try:
|
||||
label = chunk.decode('utf-8', errors='replace').rstrip('\x00')
|
||||
except UnicodeDecodeError:
|
||||
label = ''
|
||||
else:
|
||||
try:
|
||||
label = (
|
||||
chunk[:null_pos].decode('utf-8', errors='replace')
|
||||
if null_pos > 0
|
||||
else ''
|
||||
)
|
||||
except UnicodeDecodeError:
|
||||
label = ''
|
||||
labels.append(label)
|
||||
return tuple(labels)
|
||||
|
||||
return Labels(
|
||||
strip=_extract_labels_from_bytes(self._stripLabelUTF8c60),
|
||||
bus=_extract_labels_from_bytes(self._busLabelUTF8c60),
|
||||
)
|
||||
@ -2,222 +2,14 @@ import struct
|
||||
from dataclasses import dataclass
|
||||
from typing import NamedTuple
|
||||
|
||||
from .enums import NBS
|
||||
from .kinds import KindMapClass
|
||||
from .util import comp
|
||||
from vban_cmd.enums import NBS
|
||||
from vban_cmd.kinds import KindMapClass
|
||||
|
||||
VBAN_PROTOCOL_TXT = 0x40
|
||||
VBAN_PROTOCOL_SERVICE = 0x60
|
||||
from .headers import VbanPacket
|
||||
|
||||
VBAN_SERVICE_RTPACKETREGISTER = 32
|
||||
VBAN_SERVICE_RTPACKET = 33
|
||||
VBAN_SERVICE_MASK = 0xE0
|
||||
|
||||
MAX_PACKET_SIZE = 1436
|
||||
HEADER_SIZE = 4 + 1 + 1 + 1 + 1 + 16
|
||||
VMPARAMSTRIP_SIZE = 174
|
||||
|
||||
|
||||
@dataclass
|
||||
class VbanRtPacket:
|
||||
"""Represents the body of a VBAN RT data packet"""
|
||||
|
||||
nbs: NBS
|
||||
_kind: KindMapClass
|
||||
_voicemeeterType: bytes
|
||||
_reserved: bytes
|
||||
_buffersize: bytes
|
||||
_voicemeeterVersion: bytes
|
||||
_optionBits: bytes
|
||||
_samplerate: bytes
|
||||
|
||||
|
||||
@dataclass
|
||||
class VbanRtPacketNBS0(VbanRtPacket):
|
||||
"""Represents the body of a VBAN RT data packet with NBS 0"""
|
||||
|
||||
_inputLeveldB100: bytes
|
||||
_outputLeveldB100: bytes
|
||||
_TransportBit: bytes
|
||||
_stripState: bytes
|
||||
_busState: bytes
|
||||
_stripGaindB100Layer1: bytes
|
||||
_stripGaindB100Layer2: bytes
|
||||
_stripGaindB100Layer3: bytes
|
||||
_stripGaindB100Layer4: bytes
|
||||
_stripGaindB100Layer5: bytes
|
||||
_stripGaindB100Layer6: bytes
|
||||
_stripGaindB100Layer7: bytes
|
||||
_stripGaindB100Layer8: bytes
|
||||
_busGaindB100: bytes
|
||||
_stripLabelUTF8c60: bytes
|
||||
_busLabelUTF8c60: bytes
|
||||
|
||||
@classmethod
|
||||
def from_bytes(cls, nbs: NBS, kind: KindMapClass, data: bytes):
|
||||
return cls(
|
||||
nbs=nbs,
|
||||
_kind=kind,
|
||||
_voicemeeterType=data[28:29],
|
||||
_reserved=data[29:30],
|
||||
_buffersize=data[30:32],
|
||||
_voicemeeterVersion=data[32:36],
|
||||
_optionBits=data[36:40],
|
||||
_samplerate=data[40:44],
|
||||
_inputLeveldB100=data[44:112],
|
||||
_outputLeveldB100=data[112:240],
|
||||
_TransportBit=data[240:244],
|
||||
_stripState=data[244:276],
|
||||
_busState=data[276:308],
|
||||
_stripGaindB100Layer1=data[308:324],
|
||||
_stripGaindB100Layer2=data[324:340],
|
||||
_stripGaindB100Layer3=data[340:356],
|
||||
_stripGaindB100Layer4=data[356:372],
|
||||
_stripGaindB100Layer5=data[372:388],
|
||||
_stripGaindB100Layer6=data[388:404],
|
||||
_stripGaindB100Layer7=data[404:420],
|
||||
_stripGaindB100Layer8=data[420:436],
|
||||
_busGaindB100=data[436:452],
|
||||
_stripLabelUTF8c60=data[452:932],
|
||||
_busLabelUTF8c60=data[932:1412],
|
||||
)
|
||||
|
||||
def _generate_levels(self, levelarray) -> tuple:
|
||||
return tuple(
|
||||
int.from_bytes(levelarray[i : i + 2], 'little')
|
||||
for i in range(0, len(levelarray), 2)
|
||||
)
|
||||
|
||||
@property
|
||||
def strip_levels(self):
|
||||
return self._generate_levels(self._inputLeveldB100)
|
||||
|
||||
@property
|
||||
def bus_levels(self):
|
||||
return self._generate_levels(self._outputLeveldB100)
|
||||
|
||||
def pdirty(self, other) -> bool:
|
||||
"""True iff any defined parameter has changed"""
|
||||
|
||||
return not (
|
||||
self._stripState == other._stripState
|
||||
and self._busState == other._busState
|
||||
and self._stripGaindB100Layer1 == other._stripGaindB100Layer1
|
||||
and self._stripGaindB100Layer2 == other._stripGaindB100Layer2
|
||||
and self._stripGaindB100Layer3 == other._stripGaindB100Layer3
|
||||
and self._stripGaindB100Layer4 == other._stripGaindB100Layer4
|
||||
and self._stripGaindB100Layer5 == other._stripGaindB100Layer5
|
||||
and self._stripGaindB100Layer6 == other._stripGaindB100Layer6
|
||||
and self._stripGaindB100Layer7 == other._stripGaindB100Layer7
|
||||
and self._stripGaindB100Layer8 == other._stripGaindB100Layer8
|
||||
and self._busGaindB100 == other._busGaindB100
|
||||
and self._stripLabelUTF8c60 == other._stripLabelUTF8c60
|
||||
and self._busLabelUTF8c60 == other._busLabelUTF8c60
|
||||
)
|
||||
|
||||
def ldirty(self, strip_cache, bus_cache) -> bool:
|
||||
self._strip_comp, self._bus_comp = (
|
||||
tuple(not val for val in comp(strip_cache, self.strip_levels)),
|
||||
tuple(not val for val in comp(bus_cache, self.bus_levels)),
|
||||
)
|
||||
return any(any(li) for li in (self._strip_comp, self._bus_comp))
|
||||
|
||||
@property
|
||||
def voicemeetertype(self) -> str:
|
||||
"""returns voicemeeter type as a string"""
|
||||
type_ = ('basic', 'banana', 'potato')
|
||||
return type_[int.from_bytes(self._voicemeeterType, 'little') - 1]
|
||||
|
||||
@property
|
||||
def voicemeeterversion(self) -> tuple:
|
||||
"""returns voicemeeter version as a tuple"""
|
||||
return tuple(
|
||||
reversed(
|
||||
tuple(
|
||||
int.from_bytes(self._voicemeeterVersion[i : i + 1], 'little')
|
||||
for i in range(4)
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
@property
|
||||
def samplerate(self) -> int:
|
||||
"""returns samplerate as an int"""
|
||||
return int.from_bytes(self._samplerate, 'little')
|
||||
|
||||
@property
|
||||
def inputlevels(self) -> tuple:
|
||||
"""returns the entire level array across all inputs for a kind"""
|
||||
return self.strip_levels[0 : self._kind.num_strip_levels]
|
||||
|
||||
@property
|
||||
def outputlevels(self) -> tuple:
|
||||
"""returns the entire level array across all outputs for a kind"""
|
||||
return self.bus_levels[0 : self._kind.num_bus_levels]
|
||||
|
||||
@property
|
||||
def stripstate(self) -> tuple:
|
||||
"""returns tuple of strip states accessable through bit modes"""
|
||||
return tuple(self._stripState[i : i + 4] for i in range(0, 32, 4))
|
||||
|
||||
@property
|
||||
def busstate(self) -> tuple:
|
||||
"""returns tuple of bus states accessable through bit modes"""
|
||||
return tuple(self._busState[i : i + 4] for i in range(0, 32, 4))
|
||||
|
||||
"""
|
||||
these functions return an array of gainlayers[i] across all strips
|
||||
ie stripgainlayer1 = [strip[0].gainlayer[0], strip[1].gainlayer[0], strip[2].gainlayer[0]...]
|
||||
"""
|
||||
|
||||
@property
|
||||
def gainlayers(self) -> tuple:
|
||||
"""returns tuple of all strip gain layers as tuples"""
|
||||
return tuple(
|
||||
tuple(
|
||||
round(
|
||||
int.from_bytes(
|
||||
getattr(self, f'_stripGaindB100Layer{layer}')[i : i + 2],
|
||||
'little',
|
||||
signed=True,
|
||||
)
|
||||
* 0.01,
|
||||
2,
|
||||
)
|
||||
for i in range(0, 16, 2)
|
||||
)
|
||||
for layer in range(1, 9)
|
||||
)
|
||||
|
||||
@property
|
||||
def busgain(self) -> tuple:
|
||||
"""returns tuple of bus gains"""
|
||||
return tuple(
|
||||
round(
|
||||
int.from_bytes(self._busGaindB100[i : i + 2], 'little', signed=True)
|
||||
* 0.01,
|
||||
2,
|
||||
)
|
||||
for i in range(0, 16, 2)
|
||||
)
|
||||
|
||||
@property
|
||||
def striplabels(self) -> tuple:
|
||||
"""returns tuple of strip labels"""
|
||||
return tuple(
|
||||
self._stripLabelUTF8c60[i : i + 60].decode().split('\x00')[0]
|
||||
for i in range(0, 480, 60)
|
||||
)
|
||||
|
||||
@property
|
||||
def buslabels(self) -> tuple:
|
||||
"""returns tuple of bus labels"""
|
||||
return tuple(
|
||||
self._busLabelUTF8c60[i : i + 60].decode().split('\x00')[0]
|
||||
for i in range(0, 480, 60)
|
||||
)
|
||||
|
||||
|
||||
class Audibility(NamedTuple):
|
||||
knob: float
|
||||
comp: float
|
||||
@ -535,8 +327,8 @@ class VbanVMParamStrip:
|
||||
|
||||
|
||||
@dataclass
|
||||
class VbanRtPacketNBS1(VbanRtPacket):
|
||||
"""Represents the body of a VBAN RT data packet with NBS 1"""
|
||||
class VbanPacketNBS1(VbanPacket):
|
||||
"""Represents the body of a VBAN data packet with ident:1"""
|
||||
|
||||
strips: tuple[VbanVMParamStrip, ...]
|
||||
|
||||
@ -563,135 +355,3 @@ class VbanRtPacketNBS1(VbanRtPacket):
|
||||
for i in range(16)
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class SubscribeHeader:
|
||||
"""Represents the header of an RT subscription packet"""
|
||||
|
||||
nbs: NBS = NBS.zero
|
||||
name: str = 'Register-RTP'
|
||||
timeout: int = 15
|
||||
|
||||
@property
|
||||
def vban(self) -> bytes:
|
||||
return b'VBAN'
|
||||
|
||||
@property
|
||||
def format_sr(self) -> bytes:
|
||||
return VBAN_PROTOCOL_SERVICE.to_bytes(1, 'little')
|
||||
|
||||
@property
|
||||
def format_nbs(self) -> bytes:
|
||||
return (self.nbs.value & 0xFF).to_bytes(1, 'little')
|
||||
|
||||
@property
|
||||
def format_nbc(self) -> bytes:
|
||||
return VBAN_SERVICE_RTPACKETREGISTER.to_bytes(1, 'little')
|
||||
|
||||
@property
|
||||
def format_bit(self) -> bytes:
|
||||
return (self.timeout & 0xFF).to_bytes(1, 'little')
|
||||
|
||||
@property
|
||||
def streamname(self) -> bytes:
|
||||
return self.name.encode('ascii') + bytes(16 - len(self.name))
|
||||
|
||||
@classmethod
|
||||
def to_bytes(cls, nbs: NBS, framecounter: int) -> bytes:
|
||||
header = cls(nbs=nbs)
|
||||
|
||||
data = bytearray()
|
||||
data.extend(header.vban)
|
||||
data.extend(header.format_sr)
|
||||
data.extend(header.format_nbs)
|
||||
data.extend(header.format_nbc)
|
||||
data.extend(header.format_bit)
|
||||
data.extend(header.streamname)
|
||||
data.extend(framecounter.to_bytes(4, 'little'))
|
||||
return bytes(data)
|
||||
|
||||
|
||||
@dataclass
|
||||
class VbanRtPacketHeader:
|
||||
"""Represents the header of an RT response packet"""
|
||||
|
||||
name: str = 'Voicemeeter-RTP'
|
||||
format_sr: int = VBAN_PROTOCOL_SERVICE
|
||||
format_nbs: int = 0
|
||||
format_nbc: int = VBAN_SERVICE_RTPACKET
|
||||
format_bit: int = 0
|
||||
|
||||
@property
|
||||
def vban(self) -> bytes:
|
||||
return b'VBAN'
|
||||
|
||||
@property
|
||||
def streamname(self) -> bytes:
|
||||
return self.name.encode('ascii') + bytes(16 - len(self.name))
|
||||
|
||||
@classmethod
|
||||
def from_bytes(cls, data: bytes):
|
||||
if len(data) < HEADER_SIZE:
|
||||
raise ValueError('Data is too short to be a valid VbanRTPPacketHeader')
|
||||
|
||||
name = data[8:24].rstrip(b'\x00').decode('utf-8')
|
||||
return cls(
|
||||
name=name,
|
||||
format_sr=data[4] & VBAN_SERVICE_MASK,
|
||||
format_nbs=data[5],
|
||||
format_nbc=data[6],
|
||||
format_bit=data[7],
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class RequestHeader:
|
||||
"""Represents the header of an RT request packet"""
|
||||
|
||||
name: str
|
||||
bps_index: int
|
||||
channel: int
|
||||
framecounter: int = 0
|
||||
|
||||
@property
|
||||
def vban(self) -> bytes:
|
||||
return b'VBAN'
|
||||
|
||||
@property
|
||||
def sr(self) -> bytes:
|
||||
return (VBAN_PROTOCOL_TXT + self.bps_index).to_bytes(1, 'little')
|
||||
|
||||
@property
|
||||
def nbs(self) -> bytes:
|
||||
return (0).to_bytes(1, 'little')
|
||||
|
||||
@property
|
||||
def nbc(self) -> bytes:
|
||||
return (self.channel).to_bytes(1, 'little')
|
||||
|
||||
@property
|
||||
def bit(self) -> bytes:
|
||||
return (0x10).to_bytes(1, 'little')
|
||||
|
||||
@property
|
||||
def streamname(self) -> bytes:
|
||||
return self.name.encode() + bytes(16 - len(self.name))
|
||||
|
||||
@classmethod
|
||||
def to_bytes(
|
||||
cls, name: str, bps_index: int, channel: int, framecounter: int
|
||||
) -> bytes:
|
||||
header = cls(
|
||||
name=name, bps_index=bps_index, channel=channel, framecounter=framecounter
|
||||
)
|
||||
|
||||
data = bytearray()
|
||||
data.extend(header.vban)
|
||||
data.extend(header.sr)
|
||||
data.extend(header.nbs)
|
||||
data.extend(header.nbc)
|
||||
data.extend(header.bit)
|
||||
data.extend(header.streamname)
|
||||
data.extend(header.framecounter.to_bytes(4, 'little'))
|
||||
return bytes(data)
|
||||
138
vban_cmd/recorder.py
Normal file
138
vban_cmd/recorder.py
Normal file
@ -0,0 +1,138 @@
|
||||
import os
|
||||
import re
|
||||
|
||||
from .error import VBANCMDError
|
||||
from .iremote import IRemote
|
||||
from .meta import action_fn
|
||||
|
||||
|
||||
class Recorder(IRemote):
|
||||
"""
|
||||
Implements the common interface
|
||||
|
||||
Defines concrete implementation for recorder
|
||||
"""
|
||||
|
||||
@classmethod
|
||||
def make(cls, remote):
|
||||
"""
|
||||
Factory function for recorder class.
|
||||
|
||||
Returns a Recorder class of a kind.
|
||||
"""
|
||||
Recorder_cls = type(
|
||||
f'Recorder{remote.kind}',
|
||||
(cls,),
|
||||
{
|
||||
**{
|
||||
param: action_fn(param)
|
||||
for param in [
|
||||
'play',
|
||||
'stop',
|
||||
'pause',
|
||||
'replay',
|
||||
'record',
|
||||
'ff',
|
||||
'rew',
|
||||
]
|
||||
},
|
||||
},
|
||||
)
|
||||
return Recorder_cls(remote)
|
||||
|
||||
def __str__(self):
|
||||
return f'{type(self).__name__}'
|
||||
|
||||
@property
|
||||
def identifier(self) -> str:
|
||||
return 'recorder'
|
||||
|
||||
@property
|
||||
def samplerate(self) -> int:
|
||||
return
|
||||
|
||||
@samplerate.setter
|
||||
def samplerate(self, val: int):
|
||||
opts = (22050, 24000, 32000, 44100, 48000, 88200, 96000, 176400, 192000)
|
||||
if val not in opts:
|
||||
self.logger.warning(f'samplerate got: {val} but expected a value in {opts}')
|
||||
self.setter('samplerate', val)
|
||||
|
||||
@property
|
||||
def bitresolution(self) -> int:
|
||||
return
|
||||
|
||||
@bitresolution.setter
|
||||
def bitresolution(self, val: int):
|
||||
opts = (8, 16, 24, 32)
|
||||
if val not in opts:
|
||||
self.logger.warning(
|
||||
f'bitresolution got: {val} but expected a value in {opts}'
|
||||
)
|
||||
self.setter('bitresolution', val)
|
||||
|
||||
@property
|
||||
def channel(self) -> int:
|
||||
return
|
||||
|
||||
@channel.setter
|
||||
def channel(self, val: int):
|
||||
if not 1 <= val <= 8:
|
||||
self.logger.warning(f'channel got: {val} but expected a value from 1 to 8')
|
||||
self.setter('channel', val)
|
||||
|
||||
@property
|
||||
def kbps(self):
|
||||
return
|
||||
|
||||
@kbps.setter
|
||||
def kbps(self, val: int):
|
||||
opts = (32, 40, 48, 56, 64, 80, 96, 112, 128, 160, 192, 224, 256, 320)
|
||||
if val not in opts:
|
||||
self.logger.warning(f'kbps got: {val} but expected a value in {opts}')
|
||||
self.setter('kbps', val)
|
||||
|
||||
@property
|
||||
def gain(self) -> float:
|
||||
return
|
||||
|
||||
@gain.setter
|
||||
def gain(self, val: float):
|
||||
self.setter('gain', val)
|
||||
|
||||
def load(self, file: os.PathLike):
|
||||
try:
|
||||
# Convert to string, use forward slashes, and wrap in quotes for spaces
|
||||
file_path = f'"{os.fspath(file).replace(chr(92), "/")}"'
|
||||
self.setter('load', file_path)
|
||||
except UnicodeError:
|
||||
raise VBANCMDError('File full directory must be a raw string')
|
||||
|
||||
def goto(self, time_str):
|
||||
def get_sec():
|
||||
"""Get seconds from time string"""
|
||||
h, m, s = time_str.split(':')
|
||||
return int(h) * 3600 + int(m) * 60 + int(s)
|
||||
|
||||
time_str = str(time_str) # coerce the type
|
||||
if (
|
||||
re.match(
|
||||
r'^(?:[01]\d|2[0123]):(?:[012345]\d):(?:[012345]\d)$',
|
||||
time_str,
|
||||
)
|
||||
is not None
|
||||
):
|
||||
self.setter('goto', get_sec())
|
||||
else:
|
||||
self.logger.warning(
|
||||
"goto expects a string that matches the format 'hh:mm:ss'"
|
||||
)
|
||||
|
||||
def filetype(self, val: str):
|
||||
opts = {'wav': 1, 'aiff': 2, 'bwf': 3, 'mp3': 100}
|
||||
try:
|
||||
self.setter('filetype', opts[val.lower()])
|
||||
except KeyError:
|
||||
self.logger.warning(
|
||||
f'filetype got: {val} but expected a value in {list(opts.keys())}'
|
||||
)
|
||||
@ -540,22 +540,11 @@ class StripLevel(IRemote):
|
||||
def getter(self):
|
||||
"""Returns a tuple of level values for the channel."""
|
||||
|
||||
def fget(i):
|
||||
return round((((1 << 16) - 1) - i) * -0.01, 1)
|
||||
|
||||
if not self._remote.stopped() and self._remote.event.ldirty:
|
||||
return tuple(
|
||||
fget(i)
|
||||
for i in self._remote.cache['strip_level'][
|
||||
self.range[0] : self.range[-1]
|
||||
]
|
||||
)
|
||||
return tuple(
|
||||
fget(i)
|
||||
for i in self._remote._get_levels(self.public_packets[NBS.zero])[0][
|
||||
self.range[0] : self.range[-1]
|
||||
]
|
||||
)
|
||||
return self._remote.cache['strip_level'][self.range[0] : self.range[-1]]
|
||||
return self.public_packets[NBS.zero].levels.strip[
|
||||
self.range[0] : self.range[-1]
|
||||
]
|
||||
|
||||
@property
|
||||
def identifier(self) -> str:
|
||||
|
||||
@ -15,13 +15,27 @@ def cache_bool(func, param):
|
||||
return wrapper
|
||||
|
||||
|
||||
def cache_int(func, param):
|
||||
"""Check cache for an int prop"""
|
||||
|
||||
def wrapper(*args, **kwargs):
|
||||
self, *rem = args
|
||||
if self._cmd(param) in self._remote.cache:
|
||||
return self._remote.cache.pop(self._cmd(param))
|
||||
if self._remote.sync:
|
||||
self._remote.clear_dirty()
|
||||
return func(*args, **kwargs)
|
||||
|
||||
return wrapper
|
||||
|
||||
|
||||
def cache_string(func, param):
|
||||
"""Check cache for a string prop"""
|
||||
|
||||
def wrapper(*args, **kwargs):
|
||||
self, *rem = args
|
||||
if self._cmd(param) in self._remote.cache:
|
||||
return self._remote.cache.pop(self._cmd(param))
|
||||
return self._remote.cache.pop(self._cmd(param)).strip('"')
|
||||
if self._remote.sync:
|
||||
self._remote.clear_dirty()
|
||||
return func(*args, **kwargs)
|
||||
@ -49,39 +63,20 @@ def depth(d):
|
||||
return 0
|
||||
|
||||
|
||||
def script(func):
|
||||
"""Convert dictionary to script"""
|
||||
|
||||
def wrapper(*args):
|
||||
remote, script = args
|
||||
if isinstance(script, dict):
|
||||
params = ''
|
||||
for key, val in script.items():
|
||||
obj, m2, *rem = key.split('-')
|
||||
index = int(m2) if m2.isnumeric() else int(*rem)
|
||||
params += ';'.join(
|
||||
f'{obj}{f".{m2}stream" if not m2.isnumeric() else ""}[{index}].{k}={int(v) if isinstance(v, bool) else v}'
|
||||
for k, v in val.items()
|
||||
)
|
||||
params += ';'
|
||||
script = params
|
||||
return func(remote, script)
|
||||
|
||||
return wrapper
|
||||
|
||||
|
||||
def comp(t0: tuple, t1: tuple) -> Iterator[bool]:
|
||||
"""
|
||||
Generator function, accepts two tuples.
|
||||
Generator function, accepts two tuples of dB values.
|
||||
|
||||
Evaluates equality of each member in both tuples.
|
||||
Only ignores changes when levels are very quiet (below -72 dB).
|
||||
"""
|
||||
|
||||
for a, b in zip(t0, t1):
|
||||
if ((1 << 16) - 1) - b <= 7200:
|
||||
yield a == b
|
||||
# If both values are very quiet (below -72dB), ignore small changes
|
||||
if a <= -72.0 and b <= -72.0:
|
||||
yield a == b # Both quiet, check if they're equal
|
||||
else:
|
||||
yield True
|
||||
yield a != b # At least one has significant level, detect changes
|
||||
|
||||
|
||||
def deep_merge(dict1, dict2):
|
||||
|
||||
@ -5,14 +5,14 @@ import threading
|
||||
import time
|
||||
from pathlib import Path
|
||||
from queue import Queue
|
||||
from typing import Iterable, Union
|
||||
from typing import Union
|
||||
|
||||
from .enums import NBS
|
||||
from .error import VBANCMDError
|
||||
from .event import Event
|
||||
from .packet import RequestHeader
|
||||
from .packet.headers import VbanMatrixResponseHeader, VbanRequestHeader
|
||||
from .subject import Subject
|
||||
from .util import bump_framecounter, deep_merge, script
|
||||
from .util import bump_framecounter, deep_merge
|
||||
from .worker import Producer, Subscriber, Updater
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@ -86,8 +86,8 @@ class VbanCmd(abc.ABC):
|
||||
self.logout()
|
||||
|
||||
def login(self) -> None:
|
||||
"""Starts the subscriber and updater threads (unless in outbound mode)"""
|
||||
if not self.outbound:
|
||||
"""Starts the subscriber and updater threads (unless disable_rt_listeners is True) and logs into Voicemeeter."""
|
||||
if not self.disable_rt_listeners:
|
||||
self.event.info()
|
||||
|
||||
self.stop_event = threading.Event()
|
||||
@ -120,38 +120,39 @@ class VbanCmd(abc.ABC):
|
||||
def stopped(self):
|
||||
return self.stop_event is None or self.stop_event.is_set()
|
||||
|
||||
def _send_request(self, payload: str) -> None:
|
||||
"""Sends a request packet over the network and bumps the framecounter."""
|
||||
self.sock.sendto(
|
||||
VbanRequestHeader.encode_with_payload(
|
||||
name=self.streamname,
|
||||
bps_index=self.BPS_OPTS.index(self.bps),
|
||||
channel=self.channel,
|
||||
framecounter=self._framecounter,
|
||||
payload=payload,
|
||||
),
|
||||
(socket.gethostbyname(self.ip), self.port),
|
||||
)
|
||||
self._framecounter = bump_framecounter(self._framecounter)
|
||||
|
||||
def _set_rt(self, cmd: str, val: Union[str, float]):
|
||||
"""Sends a string request command over a network."""
|
||||
req_packet = RequestHeader.to_bytes(
|
||||
name=self.streamname,
|
||||
bps_index=self.BPS_OPTS.index(self.bps),
|
||||
channel=self.channel,
|
||||
framecounter=self._framecounter,
|
||||
)
|
||||
self.sock.sendto(
|
||||
req_packet + f'{cmd}={val};'.encode(),
|
||||
(socket.gethostbyname(self.ip), self.port),
|
||||
)
|
||||
self._framecounter = bump_framecounter(self._framecounter)
|
||||
|
||||
self._send_request(f'{cmd}={val};')
|
||||
self.cache[cmd] = val
|
||||
|
||||
@script
|
||||
def sendtext(self, script):
|
||||
def sendtext(self, script) -> str | None:
|
||||
"""Sends a multiple parameter string over a network."""
|
||||
req_packet = RequestHeader.to_bytes(
|
||||
name=self.streamname,
|
||||
bps_index=self.BPS_OPTS.index(self.bps),
|
||||
channel=self.channel,
|
||||
framecounter=self._framecounter,
|
||||
)
|
||||
self.sock.sendto(
|
||||
req_packet + script.encode(),
|
||||
(socket.gethostbyname(self.ip), self.port),
|
||||
)
|
||||
self._framecounter = bump_framecounter(self._framecounter)
|
||||
|
||||
self._send_request(script)
|
||||
self.logger.debug(f'sendtext: {script}')
|
||||
|
||||
if self.disable_rt_listeners and script.endswith(('?', '?;')):
|
||||
try:
|
||||
response = VbanMatrixResponseHeader.extract_payload(
|
||||
self.sock.recv(1024)
|
||||
)
|
||||
return response
|
||||
except ValueError as e:
|
||||
self.logger.warning(f'Error extracting matrix response: {e}')
|
||||
|
||||
time.sleep(self.DELAY)
|
||||
|
||||
@property
|
||||
@ -184,17 +185,6 @@ class VbanCmd(abc.ABC):
|
||||
while self.pdirty:
|
||||
time.sleep(self.DELAY)
|
||||
|
||||
def _get_levels(self, packet) -> Iterable:
|
||||
"""
|
||||
returns both level arrays (strip_levels, bus_levels) BEFORE math conversion
|
||||
|
||||
strip levels in PREFADER mode.
|
||||
"""
|
||||
return (
|
||||
packet.inputlevels,
|
||||
packet.outputlevels,
|
||||
)
|
||||
|
||||
def apply(self, data: dict):
|
||||
"""
|
||||
Sets all parameters of a dict
|
||||
|
||||
@ -5,16 +5,14 @@ import time
|
||||
|
||||
from .enums import NBS
|
||||
from .error import VBANCMDConnectionError
|
||||
from .packet import (
|
||||
from .packet.headers import (
|
||||
HEADER_SIZE,
|
||||
VBAN_PROTOCOL_SERVICE,
|
||||
VBAN_SERVICE_RTPACKET,
|
||||
SubscribeHeader,
|
||||
VbanRtPacket,
|
||||
VbanRtPacketHeader,
|
||||
VbanRtPacketNBS0,
|
||||
VbanRtPacketNBS1,
|
||||
VbanPacket,
|
||||
VbanResponseHeader,
|
||||
VbanSubscribeHeader,
|
||||
)
|
||||
from .packet.nbs0 import VbanPacketNBS0
|
||||
from .packet.nbs1 import VbanPacketNBS1
|
||||
from .util import bump_framecounter
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@ -34,7 +32,7 @@ class Subscriber(threading.Thread):
|
||||
while not self.stopped():
|
||||
try:
|
||||
for nbs in NBS:
|
||||
sub_packet = SubscribeHeader().to_bytes(nbs, self._framecounter)
|
||||
sub_packet = VbanSubscribeHeader().to_bytes(nbs, self._framecounter)
|
||||
self._remote.sock.sendto(
|
||||
sub_packet, (self._remote.ip, self._remote.port)
|
||||
)
|
||||
@ -75,45 +73,45 @@ class Producer(threading.Thread):
|
||||
(
|
||||
self._remote.cache['strip_level'],
|
||||
self._remote.cache['bus_level'],
|
||||
) = self._remote._get_levels(self._remote.public_packets[NBS.zero])
|
||||
) = self._remote.public_packets[NBS.zero].levels
|
||||
|
||||
def _get_rt(self) -> VbanRtPacket:
|
||||
def _get_rt(self) -> VbanPacket:
|
||||
"""Attempt to fetch data packet until a valid one found"""
|
||||
|
||||
while True:
|
||||
if resp := self._fetch_rt_packet():
|
||||
return resp
|
||||
|
||||
def _fetch_rt_packet(self) -> VbanRtPacket | None:
|
||||
def _fetch_rt_packet(self) -> VbanPacket | None:
|
||||
try:
|
||||
data, _ = self._remote.sock.recvfrom(2048)
|
||||
if len(data) < HEADER_SIZE:
|
||||
return
|
||||
|
||||
response_header = VbanRtPacketHeader.from_bytes(data[:HEADER_SIZE])
|
||||
if (
|
||||
response_header.format_sr != VBAN_PROTOCOL_SERVICE
|
||||
or response_header.format_nbc != VBAN_SERVICE_RTPACKET
|
||||
):
|
||||
return
|
||||
|
||||
match response_header.format_nbs:
|
||||
case NBS.zero:
|
||||
return VbanRtPacketNBS0.from_bytes(
|
||||
nbs=NBS.zero, kind=self._remote.kind, data=data
|
||||
)
|
||||
|
||||
case NBS.one:
|
||||
return VbanRtPacketNBS1.from_bytes(
|
||||
nbs=NBS.one, kind=self._remote.kind, data=data
|
||||
)
|
||||
return None
|
||||
except TimeoutError as e:
|
||||
self.logger.exception(f'{type(e).__name__}: {e}')
|
||||
raise VBANCMDConnectionError(
|
||||
f'timeout waiting for RtPacket from {self._remote.ip}'
|
||||
f'timeout waiting for response from {self._remote.ip}:{self._remote.port}'
|
||||
) from e
|
||||
|
||||
try:
|
||||
header = VbanResponseHeader.from_bytes(data[:HEADER_SIZE])
|
||||
except ValueError as e:
|
||||
self.logger.debug(f'Error parsing response packet: {e}')
|
||||
return None
|
||||
|
||||
match header.format_nbs:
|
||||
case NBS.zero:
|
||||
return VbanPacketNBS0.from_bytes(
|
||||
nbs=NBS.zero, kind=self._remote.kind, data=data
|
||||
)
|
||||
|
||||
case NBS.one:
|
||||
return VbanPacketNBS1.from_bytes(
|
||||
nbs=NBS.one, kind=self._remote.kind, data=data
|
||||
)
|
||||
|
||||
return None
|
||||
|
||||
def stopped(self):
|
||||
return self.stop_event.is_set()
|
||||
|
||||
@ -140,7 +138,7 @@ class Producer(threading.Thread):
|
||||
self.queue.put('pdirty')
|
||||
if self._remote.event.ldirty:
|
||||
self.queue.put('ldirty')
|
||||
time.sleep(self._remote.ratelimit)
|
||||
# time.sleep(self._remote.ratelimit)
|
||||
self.logger.debug(f'terminating {self.name} thread')
|
||||
self.queue.put(None)
|
||||
|
||||
@ -177,9 +175,6 @@ class Updater(threading.Thread):
|
||||
(
|
||||
self._remote.cache['strip_level'],
|
||||
self._remote.cache['bus_level'],
|
||||
) = (
|
||||
self._remote._public_packets[NBS.zero].inputlevels,
|
||||
self._remote._public_packets[NBS.zero].outputlevels,
|
||||
)
|
||||
) = self._remote.public_packets[NBS.zero].levels
|
||||
self._remote.subject.notify(event)
|
||||
self.logger.debug(f'terminating {self.name} thread')
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user