3 Commits

Author SHA1 Message Date
cbcca14481 rename until_stopped() to wait_until_stopped() 2023-08-05 13:36:36 +01:00
f584d53835 patch bump 2023-08-05 13:34:56 +01:00
72d182a488 use Threading.Event object to terminate threads
until_stopped() added to Subscriber thread
2023-08-04 23:13:58 +01:00
5 changed files with 45 additions and 23 deletions

View File

@@ -1,6 +1,6 @@
[tool.poetry] [tool.poetry]
name = "vban-cmd" name = "vban-cmd"
version = "2.4.2" version = "2.4.3"
description = "Python interface for the VBAN RT Packet Service (Sendtext)" description = "Python interface for the VBAN RT Packet Service (Sendtext)"
authors = ["onyx-and-iris <code@onyxandiris.online>"] authors = ["onyx-and-iris <code@onyxandiris.online>"]
license = "MIT" license = "MIT"

View File

@@ -102,7 +102,7 @@ class BusLevel(IRemote):
def fget(i): def fget(i):
return round((((1 << 16) - 1) - i) * -0.01, 1) return round((((1 << 16) - 1) - i) * -0.01, 1)
if self._remote.running and self._remote.event.ldirty: if not self._remote.stopped() and self._remote.event.ldirty:
return tuple( return tuple(
fget(i) fget(i)
for i in self._remote.cache["bus_level"][self.range[0] : self.range[-1]] for i in self._remote.cache["bus_level"][self.range[0] : self.range[-1]]

View File

@@ -296,7 +296,7 @@ class StripLevel(IRemote):
def fget(i): def fget(i):
return round((((1 << 16) - 1) - i) * -0.01, 1) return round((((1 << 16) - 1) - i) * -0.01, 1)
if self._remote.running and self._remote.event.ldirty: if not self._remote.stopped() and self._remote.event.ldirty:
return tuple( return tuple(
fget(i) fget(i)
for i in self._remote.cache["strip_level"][ for i in self._remote.cache["strip_level"][

View File

@@ -1,5 +1,6 @@
import logging import logging
import socket import socket
import threading
import time import time
from abc import ABCMeta, abstractmethod from abc import ABCMeta, abstractmethod
from pathlib import Path from pathlib import Path
@@ -88,16 +89,17 @@ class VbanCmd(metaclass=ABCMeta):
def login(self) -> None: def login(self) -> None:
"""Starts the subscriber and updater threads (unless in outbound mode)""" """Starts the subscriber and updater threads (unless in outbound mode)"""
if not self.outbound: if not self.outbound:
self.running = True
self.event.info() self.event.info()
self.subscriber = Subscriber(self) self.stop_event = threading.Event()
self.stop_event.clear()
self.subscriber = Subscriber(self, self.stop_event)
self.subscriber.start() self.subscriber.start()
queue = Queue() queue = Queue()
self.updater = Updater(self, queue) self.updater = Updater(self, queue)
self.updater.start() self.updater.start()
self.producer = Producer(self, queue) self.producer = Producer(self, queue, self.stop_event)
self.producer.start() self.producer.start()
self.logger.info( self.logger.info(
@@ -106,6 +108,9 @@ class VbanCmd(metaclass=ABCMeta):
) )
) )
def stopped(self):
return self.stop_event.is_set()
def _set_rt(self, cmd: str, val: Union[str, float]): def _set_rt(self, cmd: str, val: Union[str, float]):
"""Sends a string request command over a network.""" """Sends a string request command over a network."""
self.socks[Socket.request].sendto( self.socks[Socket.request].sendto(
@@ -213,8 +218,10 @@ class VbanCmd(metaclass=ABCMeta):
self.logger.info(f"Profile '{name}' applied!") self.logger.info(f"Profile '{name}' applied!")
def logout(self) -> None: def logout(self) -> None:
self.running = False if not self.stopped():
time.sleep(0.2) self.logger.debug("events thread shutdown started")
self.stop_event.set()
self.subscriber.join() # wait for subscriber thread to complete cycle
[sock.close() for sock in self.socks] [sock.close() for sock in self.socks]
self.logger.info(f"{type(self).__name__}: Successfully logged out of {self}") self.logger.info(f"{type(self).__name__}: Successfully logged out of {self}")

View File

@@ -14,14 +14,15 @@ logger = logging.getLogger(__name__)
class Subscriber(threading.Thread): class Subscriber(threading.Thread):
"""fire a subscription packet every 10 seconds""" """fire a subscription packet every 10 seconds"""
def __init__(self, remote): def __init__(self, remote, stop_event):
super().__init__(name="subscriber", daemon=True) super().__init__(name="subscriber", daemon=False)
self._remote = remote self._remote = remote
self.stop_event = stop_event
self.logger = logger.getChild(self.__class__.__name__) self.logger = logger.getChild(self.__class__.__name__)
self.packet = SubscribeHeader() self.packet = SubscribeHeader()
def run(self): def run(self):
while self._remote.running: while not self.stopped():
try: try:
self._remote.socks[Socket.register].sendto( self._remote.socks[Socket.register].sendto(
self.packet.header, self.packet.header,
@@ -30,23 +31,39 @@ class Subscriber(threading.Thread):
self.packet.framecounter = ( self.packet.framecounter = (
int.from_bytes(self.packet.framecounter, "little") + 1 int.from_bytes(self.packet.framecounter, "little") + 1
).to_bytes(4, "little") ).to_bytes(4, "little")
time.sleep(10) self.wait_until_stopped(10)
except socket.gaierror as e: except socket.gaierror as e:
self.logger.exception(f"{type(e).__name__}: {e}") self.logger.exception(f"{type(e).__name__}: {e}")
raise VBANCMDConnectionError( raise VBANCMDConnectionError(
f"unable to resolve hostname {self._remote.ip}" f"unable to resolve hostname {self._remote.ip}"
) from e ) from e
self.logger.debug(f"terminating {self.name} thread")
def stopped(self):
return self.stop_event.is_set()
def wait_until_stopped(self, timeout, period=0.2):
must_end = time.time() + timeout
while time.time() < must_end:
if self.stopped():
break
time.sleep(period)
class Producer(threading.Thread): class Producer(threading.Thread):
"""Continously send job queue to the Updater thread at a rate of self._remote.ratelimit.""" """Continously send job queue to the Updater thread at a rate of self._remote.ratelimit."""
def __init__(self, remote, queue): def __init__(self, remote, queue, stop_event):
super().__init__(name="producer", daemon=True) super().__init__(name="producer", daemon=False)
self._remote = remote self._remote = remote
self.queue = queue self.queue = queue
self.stop_event = stop_event
self.logger = logger.getChild(self.__class__.__name__) self.logger = logger.getChild(self.__class__.__name__)
self.packet_expected = VbanRtPacketHeader() self.packet_expected = VbanRtPacketHeader()
self._remote.socks[Socket.response].settimeout(self._remote.timeout)
self._remote.socks[Socket.response].bind(
(socket.gethostbyname(socket.gethostname()), self._remote.port)
)
self._remote._public_packet = self._get_rt() self._remote._public_packet = self._get_rt()
( (
self._remote.cache["strip_level"], self._remote.cache["strip_level"],
@@ -60,7 +77,6 @@ class Producer(threading.Thread):
data = None data = None
while not data: while not data:
data = self._fetch_rt_packet() data = self._fetch_rt_packet()
time.sleep(self._remote.DELAY)
return data return data
return fget() return fget()
@@ -68,10 +84,10 @@ class Producer(threading.Thread):
def _fetch_rt_packet(self) -> Optional[VbanRtPacket]: def _fetch_rt_packet(self) -> Optional[VbanRtPacket]:
try: try:
data, _ = self._remote.socks[Socket.response].recvfrom(2048) data, _ = self._remote.socks[Socket.response].recvfrom(2048)
# check for packet data # do we have packet data?
if len(data) > HEADER_SIZE: if len(data) > HEADER_SIZE:
# check if packet is of type rt packet response # is the packet of type VBAN RT response?
if self.packet_expected.header == data[: HEADER_SIZE - 4]: if self.packet_expected.header == data[:HEADER_SIZE]:
return VbanRtPacket( return VbanRtPacket(
_kind=self._remote.kind, _kind=self._remote.kind,
_voicemeeterType=data[28:29], _voicemeeterType=data[28:29],
@@ -103,8 +119,11 @@ class Producer(threading.Thread):
f"timeout waiting for RtPacket from {self._remote.ip}" f"timeout waiting for RtPacket from {self._remote.ip}"
) from e ) from e
def stopped(self):
return self.stop_event.is_set()
def run(self): def run(self):
while self._remote.running: while not self.stopped():
_pp = self._get_rt() _pp = self._get_rt()
pdirty = _pp.pdirty(self._remote.public_packet) pdirty = _pp.pdirty(self._remote.public_packet)
ldirty = _pp.ldirty( ldirty = _pp.ldirty(
@@ -137,10 +156,6 @@ class Updater(threading.Thread):
self._remote = remote self._remote = remote
self.queue = queue self.queue = queue
self.logger = logger.getChild(self.__class__.__name__) self.logger = logger.getChild(self.__class__.__name__)
self._remote.socks[Socket.response].settimeout(self._remote.timeout)
self._remote.socks[Socket.response].bind(
(socket.gethostbyname(socket.gethostname()), self._remote.port)
)
self._remote._strip_comp = [False] * (self._remote.kind.num_strip_levels) self._remote._strip_comp = [False] * (self._remote.kind.num_strip_levels)
self._remote._bus_comp = [False] * (self._remote.kind.num_bus_levels) self._remote._bus_comp = [False] * (self._remote.kind.num_bus_levels)