move voicemeetertype(), voicemeeterversion() and samplerate() properties into VbanPacket

add NamedTuples for Levels, Labels and States.

refactor the levels properties

update the math in util.comp()

StripLevel/BusLevel getters updated according to changes in VbanPacketNBS0

remove {VbanCmd}._get_levels(), it's no longer necessary.
This commit is contained in:
2026-03-01 00:25:22 +00:00
parent a8ef82166c
commit d689b3a301
7 changed files with 157 additions and 120 deletions

View File

@@ -10,10 +10,10 @@ from .packet import (
VBAN_PROTOCOL_SERVICE,
VBAN_SERVICE_RTPACKET,
SubscribeHeader,
VbanRtPacket,
VbanPacket,
VbanPacketNBS0,
VbanPacketNBS1,
VbanRtPacketHeader,
VbanRtPacketNBS0,
VbanRtPacketNBS1,
)
from .util import bump_framecounter
@@ -75,16 +75,16 @@ class Producer(threading.Thread):
(
self._remote.cache['strip_level'],
self._remote.cache['bus_level'],
) = self._remote._get_levels(self._remote.public_packets[NBS.zero])
) = self._remote.public_packets[NBS.zero].levels
def _get_rt(self) -> VbanRtPacket:
def _get_rt(self) -> VbanPacket:
"""Attempt to fetch data packet until a valid one found"""
while True:
if resp := self._fetch_rt_packet():
return resp
def _fetch_rt_packet(self) -> VbanRtPacket | None:
def _fetch_rt_packet(self) -> VbanPacket | None:
try:
data, _ = self._remote.sock.recvfrom(2048)
if len(data) < HEADER_SIZE:
@@ -99,12 +99,12 @@ class Producer(threading.Thread):
match response_header.format_nbs:
case NBS.zero:
return VbanRtPacketNBS0.from_bytes(
return VbanPacketNBS0.from_bytes(
nbs=NBS.zero, kind=self._remote.kind, data=data
)
case NBS.one:
return VbanRtPacketNBS1.from_bytes(
return VbanPacketNBS1.from_bytes(
nbs=NBS.one, kind=self._remote.kind, data=data
)
return None
@@ -177,9 +177,6 @@ class Updater(threading.Thread):
(
self._remote.cache['strip_level'],
self._remote.cache['bus_level'],
) = (
self._remote._public_packets[NBS.zero].inputlevels,
self._remote._public_packets[NBS.zero].outputlevels,
)
) = self._remote.public_packets[NBS.zero].levels
self._remote.subject.notify(event)
self.logger.debug(f'terminating {self.name} thread')