add _send_request() helper method.

This commit is contained in:
onyx-and-iris 2026-03-01 11:09:45 +00:00
parent 8cfeb45fcb
commit 3c3e415d7e

View File

@ -10,7 +10,7 @@ from typing import Union
from .enums import NBS from .enums import NBS
from .error import VBANCMDError from .error import VBANCMDError
from .event import Event from .event import Event
from .packet import RequestHeader from .packet.headers import VbanRequestHeader
from .subject import Subject from .subject import Subject
from .util import bump_framecounter, deep_merge, script from .util import bump_framecounter, deep_merge, script
from .worker import Producer, Subscriber, Updater from .worker import Producer, Subscriber, Updater
@ -120,37 +120,29 @@ class VbanCmd(abc.ABC):
def stopped(self): def stopped(self):
return self.stop_event is None or self.stop_event.is_set() return self.stop_event is None or self.stop_event.is_set()
def _set_rt(self, cmd: str, val: Union[str, float]): def _send_request(self, payload: str) -> None:
"""Sends a string request command over a network.""" """Sends a request packet over the network and bumps the framecounter."""
req_packet = RequestHeader.to_bytes(
name=self.streamname,
bps_index=self.BPS_OPTS.index(self.bps),
channel=self.channel,
framecounter=self._framecounter,
)
self.sock.sendto( self.sock.sendto(
req_packet + f'{cmd}={val};'.encode(), VbanRequestHeader.encode_with_payload(
name=self.streamname,
bps_index=self.BPS_OPTS.index(self.bps),
channel=self.channel,
framecounter=self._framecounter,
payload=payload,
),
(socket.gethostbyname(self.ip), self.port), (socket.gethostbyname(self.ip), self.port),
) )
self._framecounter = bump_framecounter(self._framecounter) self._framecounter = bump_framecounter(self._framecounter)
def _set_rt(self, cmd: str, val: Union[str, float]):
"""Sends a string request command over a network."""
self._send_request(f'{cmd}={val};')
self.cache[cmd] = val self.cache[cmd] = val
@script @script
def sendtext(self, script): def sendtext(self, script):
"""Sends a multiple parameter string over a network.""" """Sends a multiple parameter string over a network."""
req_packet = RequestHeader.to_bytes( self._send_request(script)
name=self.streamname,
bps_index=self.BPS_OPTS.index(self.bps),
channel=self.channel,
framecounter=self._framecounter,
)
self.sock.sendto(
req_packet + script.encode(),
(socket.gethostbyname(self.ip), self.port),
)
self._framecounter = bump_framecounter(self._framecounter)
self.logger.debug(f'sendtext: {script}') self.logger.debug(f'sendtext: {script}')
time.sleep(self.DELAY) time.sleep(self.DELAY)