mirror of
https://github.com/onyx-and-iris/vban-cmd-python.git
synced 2026-04-18 13:03:31 +00:00
Compare commits
30 Commits
f6d92d1c34
...
add-event-
| Author | SHA1 | Date | |
|---|---|---|---|
| cbcca14481 | |||
| f584d53835 | |||
| 72d182a488 | |||
| ee32f92914 | |||
| 3b65035e50 | |||
| c8b4bde49d | |||
| 47e9203b1e | |||
| d48e7ecd79 | |||
| 7e09a0d321 | |||
| d41ee1a12a | |||
| 1e499cd99d | |||
| 9bf52b5c11 | |||
| 77ba347e99 | |||
| 94fa33cebf | |||
| ef105d878b | |||
| 956f759e73 | |||
| dab519be9f | |||
| a4b91bf5c6 | |||
| 2a98707bf8 | |||
| 8e30c57020 | |||
| 04e18b304b | |||
| 4de384c66c | |||
| 2c8659a4e5 | |||
| 41e427e46b | |||
| fc6fdb44b5 | |||
| b49dc3b9b3 | |||
| 1ad0347478 | |||
| 2c8e4cc87c | |||
| fc3b31dfa7 | |||
| 544e0f2a32 |
39
CHANGELOG.md
39
CHANGELOG.md
@@ -11,6 +11,45 @@ Before any major/minor/patch bump all unit tests will be run to verify they pass
|
||||
|
||||
- [x]
|
||||
|
||||
## [2.3.2] - 2023-07-12
|
||||
|
||||
### Added
|
||||
|
||||
- vban.{instream,outstream} tuples now contain classes that represent MIDI and TEXT streams.
|
||||
|
||||
### Fixed
|
||||
|
||||
- apply_config() now performs a deep merge when extending a config with another.
|
||||
|
||||
## [2.3.0] - 2023-07-11
|
||||
|
||||
### Added
|
||||
|
||||
- user configs may now extend other user configs. check `config extends` section in README.
|
||||
|
||||
## [2.2.0] - 2023-07-08
|
||||
|
||||
### Added
|
||||
|
||||
- button, vban classes implemented
|
||||
- \__repr\__() method added to base class
|
||||
|
||||
## [2.1.2] - 2023-07-05
|
||||
|
||||
### Added
|
||||
|
||||
- `outbound` kwarg let's you disable incoming rt packets. Essentially the interface will work only in one direction.
|
||||
|
||||
This is useful if you are only interested in sending commands out to voicemeeter but don't need to receive parameter states.
|
||||
|
||||
By default outbound is False.
|
||||
|
||||
- sendtext logging added in base class.
|
||||
|
||||
### Fixed
|
||||
|
||||
- Bug in apply() if invoked from a higher class (not base class)
|
||||
|
||||
## [2.0.0] - 2023-06-25
|
||||
|
||||
This update introduces some breaking changes:
|
||||
|
||||
65
README.md
65
README.md
@@ -8,7 +8,7 @@
|
||||
|
||||
# VBAN CMD
|
||||
|
||||
This python interface allows you to get and set Voicemeeter parameter values over a network.
|
||||
This python interface allows you to transmit Voicemeeter parameters over a network.
|
||||
|
||||
It may be used standalone or to extend the [Voicemeeter Remote Python API](https://github.com/onyx-and-iris/voicemeeter-api-python)
|
||||
|
||||
@@ -44,7 +44,7 @@ port = 6980
|
||||
streamname = "Command1"
|
||||
```
|
||||
|
||||
It should be placed next to your `__main__.py` file.
|
||||
It should be placed in \<user home directory\> / "Documents" / "Voicemeeter" / "configs"
|
||||
|
||||
Alternatively you may pass `ip`, `port`, `streamname` as keyword arguments.
|
||||
|
||||
@@ -251,7 +251,6 @@ The following properties are available.
|
||||
example:
|
||||
|
||||
```python
|
||||
vban.bus[4].eq = true
|
||||
print(vban.bus[0].label)
|
||||
```
|
||||
|
||||
@@ -262,6 +261,10 @@ The following properties are available.
|
||||
- `on`: boolean
|
||||
- `ab`: boolean
|
||||
|
||||
```python
|
||||
vban.bus[4].eq.on = true
|
||||
```
|
||||
|
||||
##### Modes
|
||||
|
||||
The following properties are available.
|
||||
@@ -359,8 +362,8 @@ vban.apply(
|
||||
Or for each class you may do:
|
||||
|
||||
```python
|
||||
vban.strip[0].apply(mute: true, gain: 3.2, A1: true)
|
||||
vban.bus[0].apply(A1: true)
|
||||
vban.strip[0].apply({"mute": True, "gain": 3.2, "A1": True})
|
||||
vban.vban.outstream[0].apply({"on": True, "name": "streamname", "bit": 24})
|
||||
```
|
||||
|
||||
## Config Files
|
||||
@@ -369,7 +372,7 @@ vban.bus[0].apply(A1: true)
|
||||
|
||||
You may load config files in TOML format.
|
||||
Three example configs have been included with the package. Remember to save
|
||||
current settings before loading a user config. To set one you may do:
|
||||
current settings before loading a user config. To load one you may do:
|
||||
|
||||
```python
|
||||
import vban_cmd
|
||||
@@ -379,6 +382,27 @@ with vban_cmd.api('banana') as vban:
|
||||
|
||||
will load a config file at configs/banana/example.toml for Voicemeeter Banana.
|
||||
|
||||
Your configs may be located in one of the following paths:
|
||||
- \<current working directory\> / "configs" / kind_id
|
||||
- \<user home directory\> / ".config" / "vban-cmd" / kind_id
|
||||
- \<user home directory\> / "Documents" / "Voicemeeter" / "configs" / kind_id
|
||||
|
||||
If a config with the same name is located in multiple locations, only the first one found is loaded into memory, in the above order.
|
||||
|
||||
#### `config extends`
|
||||
|
||||
You may also load a config that extends another config with overrides or additional parameters.
|
||||
|
||||
You just need to define a key `extends` in the config TOML, that names the config to be extended.
|
||||
|
||||
Three example 'extender' configs are included with the repo. You may load them with:
|
||||
|
||||
```python
|
||||
import voicemeeterlib
|
||||
with voicemeeterlib.api('banana') as vm:
|
||||
vm.apply_config('extender')
|
||||
```
|
||||
|
||||
## Events
|
||||
|
||||
Level updates are considered high volume, by default they are NOT listened for. Use `subs` keyword arg to initialize event updates.
|
||||
@@ -458,8 +482,10 @@ You may pass the following optional keyword arguments:
|
||||
- `ip`: str, ip or hostname of remote machine
|
||||
- `streamname`: str, name of the stream to connect to.
|
||||
- `port`: int=6980, vban udp port of remote machine.
|
||||
- `pdirty`: parameter updates
|
||||
- `ldirty`: level updates
|
||||
- `pdirty`: boolean=False, parameter updates
|
||||
- `ldirty`: boolean=False, level updates
|
||||
- `timeout`: int=5, amount of time (seconds) to wait for an incoming RT data packet (parameter states).
|
||||
- `outbound`: boolean=False, set `True` if you are only interested in sending commands. (no rt packets will be received)
|
||||
|
||||
#### `vban.pdirty`
|
||||
|
||||
@@ -479,14 +505,31 @@ vban.sendtext("Strip[0].Mute=1;Bus[0].Mono=1")
|
||||
|
||||
#### `vban.public_packet`
|
||||
|
||||
Returns a `VbanRtPacket`. Designed to be used internally by the interface but available for parsing through this read only property object. States not guaranteed to be current (requires use of dirty parameters to confirm).
|
||||
Returns a `VbanRtPacket`. Designed to be used internally by the interface but available for parsing through this read only property object.
|
||||
|
||||
### `Errors`
|
||||
States not guaranteed to be current (requires use of dirty parameters to confirm).
|
||||
|
||||
## Errors
|
||||
|
||||
- `errors.VBANCMDError`: Exception raised when general errors occur.
|
||||
- `errors.VBANCMDConnectionError`: Exception raised when connection/timeout errors occur.
|
||||
|
||||
### `Tests`
|
||||
## Logging
|
||||
|
||||
It's possible to see the messages sent by the interface's setters and getters, may be useful for debugging.
|
||||
|
||||
example:
|
||||
```python
|
||||
import vban_cmd
|
||||
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
|
||||
opts = {"ip": "ip.local", "port": 6980, "streamname": "Command1"}
|
||||
with vban_cmd.api('banana', **opts) as vban:
|
||||
...
|
||||
```
|
||||
|
||||
## Tests
|
||||
|
||||
First make sure you installed the [development dependencies](https://github.com/onyx-and-iris/vban-cmd-python#installation)
|
||||
|
||||
|
||||
12
configs/banana/extender.toml
Normal file
12
configs/banana/extender.toml
Normal file
@@ -0,0 +1,12 @@
|
||||
extends = "example"
|
||||
[strip-0]
|
||||
label = "strip0_extended"
|
||||
A1 = false
|
||||
gain = 0.0
|
||||
|
||||
[bus-0]
|
||||
label = "bus0_extended"
|
||||
mute = false
|
||||
|
||||
[vban-in-3]
|
||||
name = "vban_extended"
|
||||
12
configs/basic/extender.toml
Normal file
12
configs/basic/extender.toml
Normal file
@@ -0,0 +1,12 @@
|
||||
extends = "example"
|
||||
[strip-0]
|
||||
label = "strip0_extended"
|
||||
A1 = false
|
||||
gain = 0.0
|
||||
|
||||
[bus-0]
|
||||
label = "bus0_extended"
|
||||
mute = false
|
||||
|
||||
[vban-in-3]
|
||||
name = "vban_extended"
|
||||
12
configs/potato/extender.toml
Normal file
12
configs/potato/extender.toml
Normal file
@@ -0,0 +1,12 @@
|
||||
extends = "example"
|
||||
[strip-0]
|
||||
label = "strip0_extended"
|
||||
A1 = false
|
||||
gain = 0.0
|
||||
|
||||
[bus-0]
|
||||
label = "bus0_extended"
|
||||
mute = false
|
||||
|
||||
[vban-in-3]
|
||||
name = "vban_extended"
|
||||
@@ -8,6 +8,8 @@ from tkinter import ttk
|
||||
|
||||
|
||||
class App(tk.Tk):
|
||||
INDEX = 3
|
||||
|
||||
def __init__(self, vban):
|
||||
super().__init__()
|
||||
self.vban = vban
|
||||
@@ -15,8 +17,8 @@ class App(tk.Tk):
|
||||
self.vban.observer.add(self.on_ldirty)
|
||||
|
||||
# create widget variables
|
||||
self.button_var = tk.BooleanVar(value=vban.strip[3].mute)
|
||||
self.slider_var = tk.DoubleVar(value=vban.strip[3].gain)
|
||||
self.button_var = tk.BooleanVar(value=vban.strip[self.INDEX].mute)
|
||||
self.slider_var = tk.DoubleVar(value=vban.strip[self.INDEX].gain)
|
||||
self.meter_var = tk.DoubleVar(value=self._get_level())
|
||||
self.gainlabel_var = tk.StringVar(value=self.slider_var.get())
|
||||
|
||||
@@ -24,11 +26,12 @@ class App(tk.Tk):
|
||||
self.style = ttk.Style()
|
||||
self.style.theme_use("clam")
|
||||
self.style.configure(
|
||||
"Mute.TButton", foreground="#cd5c5c" if vban.strip[3].mute else "#5a5a5a"
|
||||
"Mute.TButton",
|
||||
foreground="#cd5c5c" if vban.strip[self.INDEX].mute else "#5a5a5a",
|
||||
)
|
||||
|
||||
# create labelframe and grid it onto the mainframe
|
||||
self.labelframe = tk.LabelFrame(text=self.vban.strip[3].label)
|
||||
self.labelframe = tk.LabelFrame(text=self.vban.strip[self.INDEX].label)
|
||||
self.labelframe.grid(padx=1)
|
||||
|
||||
# create slider and grid it onto the labelframe
|
||||
@@ -44,6 +47,7 @@ class App(tk.Tk):
|
||||
column=0,
|
||||
row=0,
|
||||
)
|
||||
slider.bind("<Double-Button-1>", self.on_button_double_click)
|
||||
|
||||
# create level meter and grid it onto the labelframe
|
||||
level_meter = ttk.Progressbar(
|
||||
@@ -72,18 +76,23 @@ class App(tk.Tk):
|
||||
|
||||
def on_slider_move(self, *args):
|
||||
val = round(self.slider_var.get(), 1)
|
||||
self.vban.strip[3].gain = val
|
||||
self.vban.strip[self.INDEX].gain = val
|
||||
self.gainlabel_var.set(val)
|
||||
|
||||
def on_button_press(self):
|
||||
self.button_var.set(not self.button_var.get())
|
||||
self.vban.strip[3].mute = self.button_var.get()
|
||||
self.vban.strip[self.INDEX].mute = self.button_var.get()
|
||||
self.style.configure(
|
||||
"Mute.TButton", foreground="#cd5c5c" if self.button_var.get() else "#5a5a5a"
|
||||
)
|
||||
|
||||
def on_button_double_click(self, e):
|
||||
self.slider_var.set(0)
|
||||
self.gainlabel_var.set(0)
|
||||
self.vban.strip[self.INDEX].gain = 0
|
||||
|
||||
def _get_level(self):
|
||||
val = max(self.vban.strip[3].levels.prefader)
|
||||
val = max(self.vban.strip[self.INDEX].levels.prefader)
|
||||
return 0 if self.button_var.get() else 72 + val - 12 + self.slider_var.get()
|
||||
|
||||
def on_ldirty(self):
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[tool.poetry]
|
||||
name = "vban-cmd"
|
||||
version = "2.0.0"
|
||||
version = "2.4.3"
|
||||
description = "Python interface for the VBAN RT Packet Service (Sendtext)"
|
||||
authors = ["onyx-and-iris <code@onyxandiris.online>"]
|
||||
license = "MIT"
|
||||
|
||||
@@ -14,9 +14,13 @@ class TestRemoteFactories:
|
||||
assert hasattr(vban, "strip")
|
||||
assert hasattr(vban, "bus")
|
||||
assert hasattr(vban, "command")
|
||||
assert hasattr(vban, "button")
|
||||
assert hasattr(vban, "vban")
|
||||
|
||||
assert len(vban.strip) == 3
|
||||
assert len(vban.bus) == 2
|
||||
assert len(vban.button) == 80
|
||||
assert len(vban.vban.instream) == 6 and len(vban.vban.outstream) == 5
|
||||
|
||||
@pytest.mark.skipif(
|
||||
data.name != "banana",
|
||||
@@ -26,9 +30,13 @@ class TestRemoteFactories:
|
||||
assert hasattr(vban, "strip")
|
||||
assert hasattr(vban, "bus")
|
||||
assert hasattr(vban, "command")
|
||||
assert hasattr(vban, "button")
|
||||
assert hasattr(vban, "vban")
|
||||
|
||||
assert len(vban.strip) == 5
|
||||
assert len(vban.bus) == 5
|
||||
assert len(vban.button) == 80
|
||||
assert len(vban.vban.instream) == 10 and len(vban.vban.outstream) == 9
|
||||
|
||||
@pytest.mark.skipif(
|
||||
data.name != "potato",
|
||||
@@ -38,6 +46,10 @@ class TestRemoteFactories:
|
||||
assert hasattr(vban, "strip")
|
||||
assert hasattr(vban, "bus")
|
||||
assert hasattr(vban, "command")
|
||||
assert hasattr(vban, "button")
|
||||
assert hasattr(vban, "vban")
|
||||
|
||||
assert len(vban.strip) == 8
|
||||
assert len(vban.bus) == 8
|
||||
assert len(vban.button) == 80
|
||||
assert len(vban.vban.instream) == 10 and len(vban.vban.outstream) == 9
|
||||
|
||||
@@ -102,7 +102,7 @@ class BusLevel(IRemote):
|
||||
def fget(i):
|
||||
return round((((1 << 16) - 1) - i) * -0.01, 1)
|
||||
|
||||
if self._remote.running and self._remote.event.ldirty:
|
||||
if not self._remote.stopped() and self._remote.event.ldirty:
|
||||
return tuple(
|
||||
fget(i)
|
||||
for i in self._remote.cache["bus_level"][self.range[0] : self.range[-1]]
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
class VBANCMDError(Exception):
|
||||
"""Exception raised when general errors occur"""
|
||||
"""Base VBANCMD Exception class. Raised when general errors occur"""
|
||||
|
||||
|
||||
class VBANCMDConnectionError(Exception):
|
||||
class VBANCMDConnectionError(VBANCMDError):
|
||||
"""Exception raised when connection/timeout errors occur"""
|
||||
|
||||
@@ -2,7 +2,7 @@ import logging
|
||||
from abc import abstractmethod
|
||||
from enum import IntEnum
|
||||
from functools import cached_property
|
||||
from typing import Iterable, NoReturn
|
||||
from typing import Iterable
|
||||
|
||||
from .bus import request_bus_obj as bus
|
||||
from .command import Command
|
||||
@@ -10,7 +10,9 @@ from .config import request_config as configs
|
||||
from .error import VBANCMDError
|
||||
from .kinds import KindMapClass
|
||||
from .kinds import request_kind_map as kindmap
|
||||
from .macrobutton import MacroButton
|
||||
from .strip import request_strip_obj as strip
|
||||
from .vban import request_vban_obj as vban
|
||||
from .vbancmd import VbanCmd
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -23,7 +25,9 @@ class FactoryBuilder:
|
||||
Separates construction from representation.
|
||||
"""
|
||||
|
||||
BuilderProgress = IntEnum("BuilderProgress", "strip bus command", start=0)
|
||||
BuilderProgress = IntEnum(
|
||||
"BuilderProgress", "strip bus command macrobutton vban", start=0
|
||||
)
|
||||
|
||||
def __init__(self, factory, kind: KindMapClass):
|
||||
self._factory = factory
|
||||
@@ -32,10 +36,12 @@ class FactoryBuilder:
|
||||
f"Finished building strips for {self._factory}",
|
||||
f"Finished building buses for {self._factory}",
|
||||
f"Finished building commands for {self._factory}",
|
||||
f"Finished building macrobuttons for {self._factory}",
|
||||
f"Finished building vban in/out streams for {self._factory}",
|
||||
)
|
||||
self.logger = logger.getChild(self.__class__.__name__)
|
||||
|
||||
def _pinfo(self, name: str) -> NoReturn:
|
||||
def _pinfo(self, name: str) -> None:
|
||||
"""prints progress status for each step"""
|
||||
name = name.split("_")[1]
|
||||
self.logger.info(self._info[int(getattr(self.BuilderProgress, name))])
|
||||
@@ -58,6 +64,14 @@ class FactoryBuilder:
|
||||
self._factory.command = Command.make(self._factory)
|
||||
return self
|
||||
|
||||
def make_macrobutton(self):
|
||||
self._factory.button = tuple(MacroButton(self._factory, i) for i in range(80))
|
||||
return self
|
||||
|
||||
def make_vban(self):
|
||||
self._factory.vban = vban(self._factory)
|
||||
return self
|
||||
|
||||
|
||||
class FactoryBase(VbanCmd):
|
||||
"""Base class for factories, subclasses VbanCmd."""
|
||||
@@ -71,6 +85,7 @@ class FactoryBase(VbanCmd):
|
||||
"channel": 0,
|
||||
"ratelimit": 0.01,
|
||||
"timeout": 5,
|
||||
"outbound": False,
|
||||
"sync": False,
|
||||
"pdirty": False,
|
||||
"ldirty": False,
|
||||
@@ -85,12 +100,20 @@ class FactoryBase(VbanCmd):
|
||||
self.builder.make_strip,
|
||||
self.builder.make_bus,
|
||||
self.builder.make_command,
|
||||
self.builder.make_macrobutton,
|
||||
self.builder.make_vban,
|
||||
)
|
||||
self._configs = None
|
||||
|
||||
def __str__(self) -> str:
|
||||
return f"Voicemeeter {self.kind}"
|
||||
|
||||
def __repr__(self):
|
||||
return (
|
||||
type(self).__name__
|
||||
+ f"({self.kind}, ip='{self.ip}', port={self.port}, streamname='{self.streamname}')"
|
||||
)
|
||||
|
||||
@property
|
||||
@abstractmethod
|
||||
def steps(self):
|
||||
|
||||
@@ -102,7 +102,7 @@ class IRemote(metaclass=ABCMeta):
|
||||
def setter(self, param, val):
|
||||
"""Sends a string request RT packet."""
|
||||
self.logger.debug(f"setter: {self._cmd(param)}={val}")
|
||||
self._remote._set_rt(self.identifier, param, val)
|
||||
self._remote._set_rt(self._cmd(param), val)
|
||||
|
||||
def _cmd(self, param):
|
||||
cmd = (self.identifier,)
|
||||
@@ -110,6 +110,7 @@ class IRemote(metaclass=ABCMeta):
|
||||
cmd += (f".{param}",)
|
||||
return "".join(cmd)
|
||||
|
||||
@property
|
||||
@abstractmethod
|
||||
def identifier(self):
|
||||
pass
|
||||
@@ -141,10 +142,10 @@ class IRemote(metaclass=ABCMeta):
|
||||
else:
|
||||
target = getattr(self, attr)
|
||||
target.apply(val)
|
||||
|
||||
self._remote.sendtext(self._remote._script)
|
||||
return self
|
||||
|
||||
def then_wait(self):
|
||||
self.logger.debug(self._remote._script)
|
||||
self._remote.sendtext(self._remote._script)
|
||||
self._remote._script = str()
|
||||
time.sleep(self._remote.DELAY)
|
||||
|
||||
@@ -53,6 +53,14 @@ class KindMapClass(metaclass=SingletonType):
|
||||
def num_bus(self):
|
||||
return sum(self.outs)
|
||||
|
||||
@property
|
||||
def num_strip_levels(self) -> int:
|
||||
return 2 * self.phys_in + 8 * self.virt_in
|
||||
|
||||
@property
|
||||
def num_bus_levels(self) -> int:
|
||||
return 8 * (self.phys_out + self.virt_out)
|
||||
|
||||
def __str__(self) -> str:
|
||||
return self.name.capitalize()
|
||||
|
||||
@@ -62,7 +70,7 @@ class BasicMap(KindMapClass):
|
||||
name: str
|
||||
ins: tuple = (2, 1)
|
||||
outs: tuple = (1, 1)
|
||||
vban: tuple = (4, 4)
|
||||
vban: tuple = (4, 4, 1, 1)
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -70,7 +78,7 @@ class BananaMap(KindMapClass):
|
||||
name: str
|
||||
ins: tuple = (3, 2)
|
||||
outs: tuple = (3, 2)
|
||||
vban: tuple = (8, 8)
|
||||
vban: tuple = (8, 8, 1, 1)
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -78,7 +86,7 @@ class PotatoMap(KindMapClass):
|
||||
name: str
|
||||
ins: tuple = (5, 3)
|
||||
outs: tuple = (5, 3)
|
||||
vban: tuple = (8, 8)
|
||||
vban: tuple = (8, 8, 1, 1)
|
||||
|
||||
|
||||
def kind_factory(kind_id):
|
||||
|
||||
36
vban_cmd/macrobutton.py
Normal file
36
vban_cmd/macrobutton.py
Normal file
@@ -0,0 +1,36 @@
|
||||
from .iremote import IRemote
|
||||
|
||||
|
||||
class MacroButton(IRemote):
|
||||
"""A placeholder class in case this interface is being used interchangeably with the Remote API"""
|
||||
|
||||
def __str__(self):
|
||||
return f"{type(self).__name__}{self._remote.kind}{self.index}"
|
||||
|
||||
@property
|
||||
def identifier(self):
|
||||
return f"command.button[{self.index}]"
|
||||
|
||||
@property
|
||||
def state(self) -> bool:
|
||||
self.logger.warning("button.state commands are not supported over VBAN")
|
||||
|
||||
@state.setter
|
||||
def state(self, _):
|
||||
self.logger.warning("button.state commands are not supported over VBAN")
|
||||
|
||||
@property
|
||||
def stateonly(self) -> bool:
|
||||
self.logger.warning("button.stateonly commands are not supported over VBAN")
|
||||
|
||||
@stateonly.setter
|
||||
def stateonly(self, v):
|
||||
self.logger.warning("button.stateonly commands are not supported over VBAN")
|
||||
|
||||
@property
|
||||
def trigger(self) -> bool:
|
||||
self.logger.warning("button.trigger commands are not supported over VBAN")
|
||||
|
||||
@trigger.setter
|
||||
def trigger(self, _):
|
||||
self.logger.warning("button.trigger commands are not supported over VBAN")
|
||||
@@ -3,10 +3,14 @@ from dataclasses import dataclass
|
||||
from .kinds import KindMapClass
|
||||
from .util import comp
|
||||
|
||||
VBAN_PROTOCOL_TXT = 0x40
|
||||
VBAN_PROTOCOL_SERVICE = 0x60
|
||||
|
||||
VBAN_SERVICE_RTPACKETREGISTER = 32
|
||||
VBAN_SERVICE_RTPACKET = 33
|
||||
|
||||
MAX_PACKET_SIZE = 1436
|
||||
HEADER_SIZE = 4 + 1 + 1 + 1 + 1 + 16 + 4
|
||||
HEADER_SIZE = 4 + 1 + 1 + 1 + 1 + 16
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -14,28 +18,28 @@ class VbanRtPacket:
|
||||
"""Represents the body of a VBAN RT data packet"""
|
||||
|
||||
_kind: KindMapClass
|
||||
_voicemeeterType: bytes
|
||||
_reserved: bytes
|
||||
_buffersize: bytes
|
||||
_voicemeeterVersion: bytes
|
||||
_optionBits: bytes
|
||||
_samplerate: bytes
|
||||
_inputLeveldB100: bytes
|
||||
_outputLeveldB100: bytes
|
||||
_TransportBit: bytes
|
||||
_stripState: bytes
|
||||
_busState: bytes
|
||||
_stripGaindB100Layer1: bytes
|
||||
_stripGaindB100Layer2: bytes
|
||||
_stripGaindB100Layer3: bytes
|
||||
_stripGaindB100Layer4: bytes
|
||||
_stripGaindB100Layer5: bytes
|
||||
_stripGaindB100Layer6: bytes
|
||||
_stripGaindB100Layer7: bytes
|
||||
_stripGaindB100Layer8: bytes
|
||||
_busGaindB100: bytes
|
||||
_stripLabelUTF8c60: bytes
|
||||
_busLabelUTF8c60: bytes
|
||||
_voicemeeterType: bytes # data[28:29]
|
||||
_reserved: bytes # data[29:30]
|
||||
_buffersize: bytes # data[30:32]
|
||||
_voicemeeterVersion: bytes # data[32:36]
|
||||
_optionBits: bytes # data[36:40]
|
||||
_samplerate: bytes # data[40:44]
|
||||
_inputLeveldB100: bytes # data[44:112]
|
||||
_outputLeveldB100: bytes # data[112:240]
|
||||
_TransportBit: bytes # data[240:244]
|
||||
_stripState: bytes # data[244:276]
|
||||
_busState: bytes # data[276:308]
|
||||
_stripGaindB100Layer1: bytes # data[308:324]
|
||||
_stripGaindB100Layer2: bytes # data[324:340]
|
||||
_stripGaindB100Layer3: bytes # data[340:356]
|
||||
_stripGaindB100Layer4: bytes # data[356:372]
|
||||
_stripGaindB100Layer5: bytes # data[372:388]
|
||||
_stripGaindB100Layer6: bytes # data[388:404]
|
||||
_stripGaindB100Layer7: bytes # data[404:420]
|
||||
_stripGaindB100Layer8: bytes # data[420:436]
|
||||
_busGaindB100: bytes # data[436:452]
|
||||
_stripLabelUTF8c60: bytes # data[452:932]
|
||||
_busLabelUTF8c60: bytes # data[932:1412]
|
||||
|
||||
def _generate_levels(self, levelarray) -> tuple:
|
||||
return tuple(
|
||||
@@ -103,12 +107,12 @@ class VbanRtPacket:
|
||||
@property
|
||||
def inputlevels(self) -> tuple:
|
||||
"""returns the entire level array across all inputs for a kind"""
|
||||
return self.strip_levels[0 : (2 * self._kind.phys_in + 8 * self._kind.virt_in)]
|
||||
return self.strip_levels[0 : self._kind.num_strip_levels]
|
||||
|
||||
@property
|
||||
def outputlevels(self) -> tuple:
|
||||
"""returns the entire level array across all outputs for a kind"""
|
||||
return self.bus_levels[0 : 8 * self._kind.num_bus]
|
||||
return self.bus_levels[0 : self._kind.num_bus_levels]
|
||||
|
||||
@property
|
||||
def stripstate(self) -> tuple:
|
||||
@@ -206,13 +210,42 @@ class VbanRtPacket:
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class SubscribeHeader:
|
||||
"""Represents the header an RT Packet Service subscription packet"""
|
||||
|
||||
name = "Register RTP"
|
||||
timeout = 15
|
||||
vban: bytes = "VBAN".encode()
|
||||
format_sr: bytes = (VBAN_PROTOCOL_SERVICE).to_bytes(1, "little")
|
||||
format_nbs: bytes = (0).to_bytes(1, "little")
|
||||
format_nbc: bytes = (VBAN_SERVICE_RTPACKETREGISTER).to_bytes(1, "little")
|
||||
format_bit: bytes = (timeout & 0x000000FF).to_bytes(1, "little") # timeout
|
||||
streamname: bytes = name.encode("ascii") + bytes(16 - len(name))
|
||||
framecounter: bytes = (0).to_bytes(4, "little")
|
||||
|
||||
@property
|
||||
def header(self):
|
||||
header = self.vban
|
||||
header += self.format_sr
|
||||
header += self.format_nbs
|
||||
header += self.format_nbc
|
||||
header += self.format_bit
|
||||
header += self.streamname
|
||||
header += self.framecounter
|
||||
assert (
|
||||
len(header) == HEADER_SIZE + 4
|
||||
), f"expected header size {HEADER_SIZE} bytes + 4 bytes framecounter ({HEADER_SIZE +4} bytes total)"
|
||||
return header
|
||||
|
||||
|
||||
@dataclass
|
||||
class VbanRtPacketHeader:
|
||||
"""Represents the header of VBAN RT data packet"""
|
||||
"""Represents the header of a VBAN RT response packet"""
|
||||
|
||||
name = "Voicemeeter-RTP"
|
||||
vban: bytes = "VBAN".encode()
|
||||
format_sr: bytes = (0x60).to_bytes(1, "little")
|
||||
format_sr: bytes = (VBAN_PROTOCOL_SERVICE).to_bytes(1, "little")
|
||||
format_nbs: bytes = (0).to_bytes(1, "little")
|
||||
format_nbc: bytes = (VBAN_SERVICE_RTPACKET).to_bytes(1, "little")
|
||||
format_bit: bytes = (0).to_bytes(1, "little")
|
||||
@@ -226,13 +259,13 @@ class VbanRtPacketHeader:
|
||||
header += self.format_nbc
|
||||
header += self.format_bit
|
||||
header += self.streamname
|
||||
assert len(header) == HEADER_SIZE - 4, f"Header expected {HEADER_SIZE-4} bytes"
|
||||
assert len(header) == HEADER_SIZE, f"expected header size {HEADER_SIZE} bytes"
|
||||
return header
|
||||
|
||||
|
||||
@dataclass
|
||||
class RequestHeader:
|
||||
"""Represents a REQUEST RT PACKET header"""
|
||||
"""Represents the header of an REQUEST RT PACKET"""
|
||||
|
||||
name: str
|
||||
bps_index: int
|
||||
@@ -244,7 +277,7 @@ class RequestHeader:
|
||||
|
||||
@property
|
||||
def sr(self):
|
||||
return (0x40 + self.bps_index).to_bytes(1, "little")
|
||||
return (VBAN_PROTOCOL_TXT + self.bps_index).to_bytes(1, "little")
|
||||
|
||||
@property
|
||||
def nbc(self):
|
||||
@@ -263,32 +296,7 @@ class RequestHeader:
|
||||
header += self.bit
|
||||
header += self.streamname
|
||||
header += self.framecounter
|
||||
assert len(header) == HEADER_SIZE, f"Header expected {HEADER_SIZE} bytes"
|
||||
return header
|
||||
|
||||
|
||||
@dataclass
|
||||
class SubscribeHeader:
|
||||
"""Represents a packet used to subscribe to the RT Packet Service"""
|
||||
|
||||
name = "Register RTP"
|
||||
timeout = 15
|
||||
vban: bytes = "VBAN".encode()
|
||||
format_sr: bytes = (0x60).to_bytes(1, "little")
|
||||
format_nbs: bytes = (0).to_bytes(1, "little")
|
||||
format_nbc: bytes = (VBAN_SERVICE_RTPACKETREGISTER).to_bytes(1, "little")
|
||||
format_bit: bytes = (timeout & 0x000000FF).to_bytes(1, "little") # timeout
|
||||
streamname: bytes = name.encode("ascii") + bytes(16 - len(name))
|
||||
framecounter: bytes = (0).to_bytes(4, "little")
|
||||
|
||||
@property
|
||||
def header(self):
|
||||
header = self.vban
|
||||
header += self.format_sr
|
||||
header += self.format_nbs
|
||||
header += self.format_nbc
|
||||
header += self.format_bit
|
||||
header += self.streamname
|
||||
header += self.framecounter
|
||||
assert len(header) == HEADER_SIZE, f"Header expected {HEADER_SIZE} bytes"
|
||||
assert (
|
||||
len(header) == HEADER_SIZE + 4
|
||||
), f"expected header size {HEADER_SIZE} bytes + 4 bytes framecounter ({HEADER_SIZE +4} bytes total)"
|
||||
return header
|
||||
|
||||
@@ -296,7 +296,7 @@ class StripLevel(IRemote):
|
||||
def fget(i):
|
||||
return round((((1 << 16) - 1) - i) * -0.01, 1)
|
||||
|
||||
if self._remote.running and self._remote.event.ldirty:
|
||||
if not self._remote.stopped() and self._remote.event.ldirty:
|
||||
return tuple(
|
||||
fget(i)
|
||||
for i in self._remote.cache["strip_level"][
|
||||
|
||||
@@ -73,4 +73,18 @@ def comp(t0: tuple, t1: tuple) -> Iterator[bool]:
|
||||
yield True
|
||||
|
||||
|
||||
def deep_merge(dict1, dict2):
|
||||
"""Generator function for deep merging two dicts"""
|
||||
for k in set(dict1) | set(dict2):
|
||||
if k in dict1 and k in dict2:
|
||||
if isinstance(dict1[k], dict) and isinstance(dict2[k], dict):
|
||||
yield k, dict(deep_merge(dict1[k], dict2[k]))
|
||||
else:
|
||||
yield k, dict2[k]
|
||||
elif k in dict1:
|
||||
yield k, dict1[k]
|
||||
else:
|
||||
yield k, dict2[k]
|
||||
|
||||
|
||||
Socket = IntEnum("Socket", "register request response", start=0)
|
||||
|
||||
242
vban_cmd/vban.py
Normal file
242
vban_cmd/vban.py
Normal file
@@ -0,0 +1,242 @@
|
||||
from abc import abstractmethod
|
||||
|
||||
from .iremote import IRemote
|
||||
from .kinds import kinds_all
|
||||
|
||||
|
||||
class VbanStream(IRemote):
|
||||
"""
|
||||
Implements the common interface
|
||||
|
||||
Defines concrete implementation for vban stream
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def __str__(self):
|
||||
pass
|
||||
|
||||
@property
|
||||
def identifier(self) -> str:
|
||||
return f"vban.{self.direction}stream[{self.index}]"
|
||||
|
||||
@property
|
||||
def on(self) -> bool:
|
||||
return
|
||||
|
||||
@on.setter
|
||||
def on(self, val: bool):
|
||||
self.setter("on", 1 if val else 0)
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
return
|
||||
|
||||
@name.setter
|
||||
def name(self, val: str):
|
||||
self.setter("name", val)
|
||||
|
||||
@property
|
||||
def ip(self) -> str:
|
||||
return
|
||||
|
||||
@ip.setter
|
||||
def ip(self, val: str):
|
||||
self.setter("ip", val)
|
||||
|
||||
@property
|
||||
def port(self) -> int:
|
||||
return
|
||||
|
||||
@port.setter
|
||||
def port(self, val: int):
|
||||
if not 1024 <= val <= 65535:
|
||||
self.logger.warning(
|
||||
f"port got: {val} but expected a value from 1024 to 65535"
|
||||
)
|
||||
self.setter("port", val)
|
||||
|
||||
@property
|
||||
def sr(self) -> int:
|
||||
return
|
||||
|
||||
@sr.setter
|
||||
def sr(self, val: int):
|
||||
opts = (11025, 16000, 22050, 24000, 32000, 44100, 48000, 64000, 88200, 96000)
|
||||
if val not in opts:
|
||||
self.logger.warning(f"sr got: {val} but expected a value in {opts}")
|
||||
self.setter("sr", val)
|
||||
|
||||
@property
|
||||
def channel(self) -> int:
|
||||
return
|
||||
|
||||
@channel.setter
|
||||
def channel(self, val: int):
|
||||
if not 1 <= val <= 8:
|
||||
self.logger.warning(f"channel got: {val} but expected a value from 1 to 8")
|
||||
self.setter("channel", val)
|
||||
|
||||
@property
|
||||
def bit(self) -> int:
|
||||
return
|
||||
|
||||
@bit.setter
|
||||
def bit(self, val: int):
|
||||
if val not in (16, 24):
|
||||
self.logger.warning(f"bit got: {val} but expected value 16 or 24")
|
||||
self.setter("bit", 1 if (val == 16) else 2)
|
||||
|
||||
@property
|
||||
def quality(self) -> int:
|
||||
return
|
||||
|
||||
@quality.setter
|
||||
def quality(self, val: int):
|
||||
if not 0 <= val <= 4:
|
||||
self.logger.warning(f"quality got: {val} but expected a value from 0 to 4")
|
||||
self.setter("quality", val)
|
||||
|
||||
@property
|
||||
def route(self) -> int:
|
||||
return
|
||||
|
||||
@route.setter
|
||||
def route(self, val: int):
|
||||
if not 0 <= val <= 8:
|
||||
self.logger.warning(f"route got: {val} but expected a value from 0 to 8")
|
||||
self.setter("route", val)
|
||||
|
||||
|
||||
class VbanInstream(VbanStream):
|
||||
"""
|
||||
class representing a vban instream
|
||||
|
||||
subclasses VbanStream
|
||||
"""
|
||||
|
||||
def __str__(self):
|
||||
return f"{type(self).__name__}{self._remote.kind}{self.index}"
|
||||
|
||||
@property
|
||||
def direction(self) -> str:
|
||||
return "in"
|
||||
|
||||
@property
|
||||
def sr(self) -> int:
|
||||
return
|
||||
|
||||
@property
|
||||
def channel(self) -> int:
|
||||
return
|
||||
|
||||
@property
|
||||
def bit(self) -> int:
|
||||
return
|
||||
|
||||
|
||||
class VbanAudioInstream(VbanInstream):
|
||||
"""Represents a VBAN Audio Instream"""
|
||||
|
||||
|
||||
class VbanMidiInstream(VbanInstream):
|
||||
"""Represents a VBAN Midi Instream"""
|
||||
|
||||
|
||||
class VbanTextInstream(VbanInstream):
|
||||
"""Represents a VBAN Text Instream"""
|
||||
|
||||
|
||||
class VbanOutstream(VbanStream):
|
||||
"""
|
||||
class representing a vban outstream
|
||||
|
||||
Subclasses VbanStream
|
||||
"""
|
||||
|
||||
def __str__(self):
|
||||
return f"{type(self).__name__}{self._remote.kind}{self.index}"
|
||||
|
||||
@property
|
||||
def direction(self) -> str:
|
||||
return "out"
|
||||
|
||||
|
||||
class VbanAudioOutstream(VbanOutstream):
|
||||
"""Represents a VBAN Audio Outstream"""
|
||||
|
||||
|
||||
class VbanMidiOutstream(VbanOutstream):
|
||||
"""Represents a VBAN Midi Outstream"""
|
||||
|
||||
|
||||
def _make_stream_pair(remote, kind):
|
||||
num_instream, num_outstream, num_midi, num_text = kind.vban
|
||||
|
||||
def _generate_streams(i, dir):
|
||||
"""generator function for creating instream/outstream tuples"""
|
||||
if dir == "in":
|
||||
if i < num_instream:
|
||||
yield VbanAudioInstream
|
||||
elif i < num_instream + num_midi:
|
||||
yield VbanMidiInstream
|
||||
else:
|
||||
yield VbanTextInstream
|
||||
else:
|
||||
if i < num_outstream:
|
||||
yield VbanAudioOutstream
|
||||
else:
|
||||
yield VbanMidiOutstream
|
||||
|
||||
return (
|
||||
tuple(
|
||||
cls(remote, i)
|
||||
for i in range(num_instream + num_midi + num_text)
|
||||
for cls in _generate_streams(i, "in")
|
||||
),
|
||||
tuple(
|
||||
cls(remote, i)
|
||||
for i in range(num_outstream + num_midi)
|
||||
for cls in _generate_streams(i, "out")
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
def _make_stream_pairs(remote):
|
||||
return {kind.name: _make_stream_pair(remote, kind) for kind in kinds_all}
|
||||
|
||||
|
||||
class Vban:
|
||||
"""
|
||||
class representing the vban module
|
||||
|
||||
Contains two tuples, one for each stream type
|
||||
"""
|
||||
|
||||
def __init__(self, remote):
|
||||
self.remote = remote
|
||||
self.instream, self.outstream = _make_stream_pairs(remote)[remote.kind.name]
|
||||
|
||||
def enable(self):
|
||||
"""if VBAN disabled there can be no communication with it"""
|
||||
|
||||
def disable(self):
|
||||
self.remote._set_rt("vban.Enable", 0)
|
||||
|
||||
|
||||
def vban_factory(remote) -> Vban:
|
||||
"""
|
||||
Factory method for vban
|
||||
|
||||
Returns a class that represents the VBAN module.
|
||||
"""
|
||||
VBAN_cls = Vban
|
||||
return type(f"{VBAN_cls.__name__}", (VBAN_cls,), {})(remote)
|
||||
|
||||
|
||||
def request_vban_obj(remote) -> Vban:
|
||||
"""
|
||||
Vban entry point.
|
||||
|
||||
Returns a reference to a Vban class of a kind
|
||||
"""
|
||||
return vban_factory(remote)
|
||||
@@ -1,16 +1,17 @@
|
||||
import logging
|
||||
import socket
|
||||
import threading
|
||||
import time
|
||||
from abc import ABCMeta, abstractmethod
|
||||
from pathlib import Path
|
||||
from queue import Queue
|
||||
from typing import Iterable, Optional, Union
|
||||
from typing import Iterable, Union
|
||||
|
||||
from .error import VBANCMDError
|
||||
from .event import Event
|
||||
from .packet import RequestHeader
|
||||
from .subject import Subject
|
||||
from .util import Socket, script
|
||||
from .util import Socket, deep_merge, script
|
||||
from .worker import Producer, Subscriber, Updater
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -48,6 +49,7 @@ class VbanCmd(metaclass=ABCMeta):
|
||||
self.cache = {}
|
||||
self._pdirty = False
|
||||
self._ldirty = False
|
||||
self._script = str()
|
||||
|
||||
@abstractmethod
|
||||
def __str__(self):
|
||||
@@ -63,8 +65,9 @@ class VbanCmd(metaclass=ABCMeta):
|
||||
def get_filepath():
|
||||
filepaths = [
|
||||
Path.cwd() / "vban.toml",
|
||||
Path.cwd() / "configs" / "vban.toml",
|
||||
Path.home() / ".config" / "vban-cmd" / "vban.toml",
|
||||
Path.home() / "Documents" / "Voicemeeter" / "vban.toml",
|
||||
Path.home() / "Documents" / "Voicemeeter" / "configs" / "vban.toml",
|
||||
]
|
||||
for filepath in filepaths:
|
||||
if filepath.exists():
|
||||
@@ -74,60 +77,62 @@ class VbanCmd(metaclass=ABCMeta):
|
||||
with open(filepath, "rb") as f:
|
||||
conn = tomllib.load(f)
|
||||
assert (
|
||||
"ip" in conn["connection"]
|
||||
), "please provide ip, by kwarg or config"
|
||||
"connection" in conn and "ip" in conn["connection"]
|
||||
), "expected [connection][ip] in vban config"
|
||||
return conn["connection"]
|
||||
else:
|
||||
raise VBANCMDError("no ip provided and no vban.toml located.")
|
||||
raise VBANCMDError("no ip provided and no vban.toml located.")
|
||||
|
||||
def __enter__(self):
|
||||
self.login()
|
||||
return self
|
||||
|
||||
def login(self):
|
||||
"""Starts the subscriber and updater threads"""
|
||||
self.running = True
|
||||
self.event.info()
|
||||
def login(self) -> None:
|
||||
"""Starts the subscriber and updater threads (unless in outbound mode)"""
|
||||
if not self.outbound:
|
||||
self.event.info()
|
||||
|
||||
self.subscriber = Subscriber(self)
|
||||
self.subscriber.start()
|
||||
self.stop_event = threading.Event()
|
||||
self.stop_event.clear()
|
||||
self.subscriber = Subscriber(self, self.stop_event)
|
||||
self.subscriber.start()
|
||||
|
||||
queue = Queue()
|
||||
self.updater = Updater(self, queue)
|
||||
self.updater.start()
|
||||
self.producer = Producer(self, queue)
|
||||
self.producer.start()
|
||||
queue = Queue()
|
||||
self.updater = Updater(self, queue)
|
||||
self.updater.start()
|
||||
self.producer = Producer(self, queue, self.stop_event)
|
||||
self.producer.start()
|
||||
|
||||
self.logger.info(f"{type(self).__name__}: Successfully logged into {self}")
|
||||
self.logger.info(
|
||||
"Successfully logged into VBANCMD {kind} with ip='{ip}', port={port}, streamname='{streamname}'".format(
|
||||
**self.__dict__
|
||||
)
|
||||
)
|
||||
|
||||
def _set_rt(
|
||||
self,
|
||||
id_: str,
|
||||
param: Optional[str] = None,
|
||||
val: Optional[Union[int, float]] = None,
|
||||
):
|
||||
def stopped(self):
|
||||
return self.stop_event.is_set()
|
||||
|
||||
def _set_rt(self, cmd: str, val: Union[str, float]):
|
||||
"""Sends a string request command over a network."""
|
||||
cmd = f"{id_}={val};" if not param else f"{id_}.{param}={val};"
|
||||
self.socks[Socket.request].sendto(
|
||||
self.packet_request.header + cmd.encode(),
|
||||
self.packet_request.header + f"{cmd}={val};".encode(),
|
||||
(socket.gethostbyname(self.ip), self.port),
|
||||
)
|
||||
self.packet_request.framecounter = (
|
||||
int.from_bytes(self.packet_request.framecounter, "little") + 1
|
||||
).to_bytes(4, "little")
|
||||
if param:
|
||||
self.cache[f"{id_}.{param}"] = val
|
||||
self.cache[cmd] = val
|
||||
|
||||
@script
|
||||
def sendtext(self, cmd):
|
||||
def sendtext(self, script):
|
||||
"""Sends a multiple parameter string over a network."""
|
||||
self.socks[Socket.request].sendto(
|
||||
self.packet_request.header + cmd.encode(),
|
||||
self.packet_request.header + script.encode(),
|
||||
(socket.gethostbyname(self.ip), self.port),
|
||||
)
|
||||
self.packet_request.framecounter = (
|
||||
int.from_bytes(self.packet_request.framecounter, "little") + 1
|
||||
).to_bytes(4, "little")
|
||||
self.logger.debug(f"sendtext: {script}")
|
||||
time.sleep(self.DELAY)
|
||||
|
||||
@property
|
||||
@@ -154,7 +159,7 @@ class VbanCmd(metaclass=ABCMeta):
|
||||
def public_packet(self):
|
||||
return self._public_packet
|
||||
|
||||
def clear_dirty(self):
|
||||
def clear_dirty(self) -> None:
|
||||
while self.pdirty:
|
||||
time.sleep(self.DELAY)
|
||||
|
||||
@@ -179,31 +184,46 @@ class VbanCmd(metaclass=ABCMeta):
|
||||
def param(key):
|
||||
obj, m2, *rem = key.split("-")
|
||||
index = int(m2) if m2.isnumeric() else int(*rem)
|
||||
if obj in ("strip", "bus"):
|
||||
if obj in ("strip", "bus", "button"):
|
||||
return getattr(self, obj)[index]
|
||||
else:
|
||||
raise ValueError(obj)
|
||||
elif obj == "vban":
|
||||
return getattr(getattr(self, obj), f"{m2}stream")[index]
|
||||
raise ValueError(obj)
|
||||
|
||||
self._script = str()
|
||||
[param(key).apply(datum).then_wait() for key, datum in data.items()]
|
||||
|
||||
def apply_config(self, name):
|
||||
"""applies a config from memory"""
|
||||
error_msg = (
|
||||
ERR_MSG = (
|
||||
f"No config with name '{name}' is loaded into memory",
|
||||
f"Known configs: {list(self.configs.keys())}",
|
||||
)
|
||||
try:
|
||||
self.apply(self.configs[name])
|
||||
self.logger.info(f"Profile '{name}' applied!")
|
||||
except KeyError:
|
||||
self.logger.error(("\n").join(error_msg))
|
||||
config = self.configs[name]
|
||||
except KeyError as e:
|
||||
self.logger.error(("\n").join(ERR_MSG))
|
||||
raise VBANCMDError(("\n").join(ERR_MSG)) from e
|
||||
|
||||
def logout(self):
|
||||
self.running = False
|
||||
time.sleep(0.2)
|
||||
if "extends" in config:
|
||||
extended = config["extends"]
|
||||
config = {
|
||||
k: v
|
||||
for k, v in deep_merge(self.configs[extended], config)
|
||||
if k not in ("extends")
|
||||
}
|
||||
self.logger.debug(
|
||||
f"profile '{name}' extends '{extended}', profiles merged.."
|
||||
)
|
||||
self.apply(config)
|
||||
self.logger.info(f"Profile '{name}' applied!")
|
||||
|
||||
def logout(self) -> None:
|
||||
if not self.stopped():
|
||||
self.logger.debug("events thread shutdown started")
|
||||
self.stop_event.set()
|
||||
self.subscriber.join() # wait for subscriber thread to complete cycle
|
||||
[sock.close() for sock in self.socks]
|
||||
self.logger.info(f"{type(self).__name__}: Successfully logged out of {self}")
|
||||
|
||||
def __exit__(self, exc_type, exc_value, exc_traceback):
|
||||
def __exit__(self, exc_type, exc_value, exc_traceback) -> None:
|
||||
self.logout()
|
||||
|
||||
@@ -14,14 +14,15 @@ logger = logging.getLogger(__name__)
|
||||
class Subscriber(threading.Thread):
|
||||
"""fire a subscription packet every 10 seconds"""
|
||||
|
||||
def __init__(self, remote):
|
||||
super().__init__(name="subscriber", daemon=True)
|
||||
def __init__(self, remote, stop_event):
|
||||
super().__init__(name="subscriber", daemon=False)
|
||||
self._remote = remote
|
||||
self.stop_event = stop_event
|
||||
self.logger = logger.getChild(self.__class__.__name__)
|
||||
self.packet = SubscribeHeader()
|
||||
|
||||
def run(self):
|
||||
while self._remote.running:
|
||||
while not self.stopped():
|
||||
try:
|
||||
self._remote.socks[Socket.register].sendto(
|
||||
self.packet.header,
|
||||
@@ -30,23 +31,39 @@ class Subscriber(threading.Thread):
|
||||
self.packet.framecounter = (
|
||||
int.from_bytes(self.packet.framecounter, "little") + 1
|
||||
).to_bytes(4, "little")
|
||||
time.sleep(10)
|
||||
self.wait_until_stopped(10)
|
||||
except socket.gaierror as e:
|
||||
self.logger.exception(f"{type(e).__name__}: {e}")
|
||||
raise VBANCMDConnectionError(
|
||||
f"unable to resolve hostname {self._remote.ip}"
|
||||
) from e
|
||||
self.logger.debug(f"terminating {self.name} thread")
|
||||
|
||||
def stopped(self):
|
||||
return self.stop_event.is_set()
|
||||
|
||||
def wait_until_stopped(self, timeout, period=0.2):
|
||||
must_end = time.time() + timeout
|
||||
while time.time() < must_end:
|
||||
if self.stopped():
|
||||
break
|
||||
time.sleep(period)
|
||||
|
||||
|
||||
class Producer(threading.Thread):
|
||||
"""Continously send job queue to the Updater thread at a rate of self._remote.ratelimit."""
|
||||
|
||||
def __init__(self, remote, queue):
|
||||
super().__init__(name="producer", daemon=True)
|
||||
def __init__(self, remote, queue, stop_event):
|
||||
super().__init__(name="producer", daemon=False)
|
||||
self._remote = remote
|
||||
self.queue = queue
|
||||
self.stop_event = stop_event
|
||||
self.logger = logger.getChild(self.__class__.__name__)
|
||||
self.packet_expected = VbanRtPacketHeader()
|
||||
self._remote.socks[Socket.response].settimeout(self._remote.timeout)
|
||||
self._remote.socks[Socket.response].bind(
|
||||
(socket.gethostbyname(socket.gethostname()), self._remote.port)
|
||||
)
|
||||
self._remote._public_packet = self._get_rt()
|
||||
(
|
||||
self._remote.cache["strip_level"],
|
||||
@@ -60,7 +77,6 @@ class Producer(threading.Thread):
|
||||
data = None
|
||||
while not data:
|
||||
data = self._fetch_rt_packet()
|
||||
time.sleep(self._remote.DELAY)
|
||||
return data
|
||||
|
||||
return fget()
|
||||
@@ -68,10 +84,10 @@ class Producer(threading.Thread):
|
||||
def _fetch_rt_packet(self) -> Optional[VbanRtPacket]:
|
||||
try:
|
||||
data, _ = self._remote.socks[Socket.response].recvfrom(2048)
|
||||
# check for packet data
|
||||
# do we have packet data?
|
||||
if len(data) > HEADER_SIZE:
|
||||
# check if packet is of type rt packet response
|
||||
if self.packet_expected.header == data[: HEADER_SIZE - 4]:
|
||||
# is the packet of type VBAN RT response?
|
||||
if self.packet_expected.header == data[:HEADER_SIZE]:
|
||||
return VbanRtPacket(
|
||||
_kind=self._remote.kind,
|
||||
_voicemeeterType=data[28:29],
|
||||
@@ -103,8 +119,11 @@ class Producer(threading.Thread):
|
||||
f"timeout waiting for RtPacket from {self._remote.ip}"
|
||||
) from e
|
||||
|
||||
def stopped(self):
|
||||
return self.stop_event.is_set()
|
||||
|
||||
def run(self):
|
||||
while self._remote.running:
|
||||
while not self.stopped():
|
||||
_pp = self._get_rt()
|
||||
pdirty = _pp.pdirty(self._remote.public_packet)
|
||||
ldirty = _pp.ldirty(
|
||||
@@ -137,13 +156,8 @@ class Updater(threading.Thread):
|
||||
self._remote = remote
|
||||
self.queue = queue
|
||||
self.logger = logger.getChild(self.__class__.__name__)
|
||||
self._remote.socks[Socket.response].settimeout(self._remote.timeout)
|
||||
self._remote.socks[Socket.response].bind(
|
||||
(socket.gethostbyname(socket.gethostname()), self._remote.port)
|
||||
)
|
||||
p_in, v_in = self._remote.kind.ins
|
||||
self._remote._strip_comp = [False] * (2 * p_in + 8 * v_in)
|
||||
self._remote._bus_comp = [False] * (self._remote.kind.num_bus * 8)
|
||||
self._remote._strip_comp = [False] * (self._remote.kind.num_strip_levels)
|
||||
self._remote._bus_comp = [False] * (self._remote.kind.num_bus_levels)
|
||||
|
||||
def run(self):
|
||||
"""
|
||||
@@ -151,12 +165,7 @@ class Updater(threading.Thread):
|
||||
|
||||
Generate _strip_comp, _bus_comp and update level cache if ldirty.
|
||||
"""
|
||||
while True:
|
||||
event = self.queue.get()
|
||||
if event is None:
|
||||
self.logger.debug(f"terminating {self.name} thread")
|
||||
break
|
||||
|
||||
while event := self.queue.get():
|
||||
if event == "pdirty" and self._remote.pdirty:
|
||||
self._remote.subject.notify(event)
|
||||
elif event == "ldirty" and self._remote.ldirty:
|
||||
@@ -172,3 +181,4 @@ class Updater(threading.Thread):
|
||||
self._remote._public_packet.outputlevels,
|
||||
)
|
||||
self._remote.subject.notify(event)
|
||||
self.logger.debug(f"terminating {self.name} thread")
|
||||
|
||||
Reference in New Issue
Block a user