mirror of
https://github.com/onyx-and-iris/obsws-python.git
synced 2026-04-18 05:53:32 +00:00
namechange ops
This commit is contained in:
4
obsws_python/__init__.py
Normal file
4
obsws_python/__init__.py
Normal file
@@ -0,0 +1,4 @@
|
||||
from .events import EventClient
|
||||
from .reqs import ReqClient
|
||||
|
||||
__ALL__ = ["ReqClient", "EventClient"]
|
||||
90
obsws_python/baseclient.py
Normal file
90
obsws_python/baseclient.py
Normal file
@@ -0,0 +1,90 @@
|
||||
import base64
|
||||
import hashlib
|
||||
import json
|
||||
from pathlib import Path
|
||||
from random import randint
|
||||
|
||||
try:
|
||||
import tomllib
|
||||
except ModuleNotFoundError:
|
||||
import tomli as tomllib
|
||||
|
||||
import websocket
|
||||
|
||||
|
||||
class ObsClient:
|
||||
DELAY = 0.001
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
defaultkwargs = {
|
||||
**{key: None for key in ["host", "port", "password"]},
|
||||
"subs": 0,
|
||||
}
|
||||
kwargs = defaultkwargs | kwargs
|
||||
for attr, val in kwargs.items():
|
||||
setattr(self, attr, val)
|
||||
if not (self.host and self.port and self.password):
|
||||
conn = self._conn_from_toml()
|
||||
self.host = conn["host"]
|
||||
self.port = conn["port"]
|
||||
self.password = conn["password"]
|
||||
|
||||
self.ws = websocket.WebSocket()
|
||||
self.ws.connect(f"ws://{self.host}:{self.port}")
|
||||
self.server_hello = json.loads(self.ws.recv())
|
||||
|
||||
def _conn_from_toml(self):
|
||||
filepath = Path.cwd() / "config.toml"
|
||||
self._conn = dict()
|
||||
with open(filepath, "rb") as f:
|
||||
self._conn = tomllib.load(f)
|
||||
return self._conn["connection"]
|
||||
|
||||
def authenticate(self):
|
||||
secret = base64.b64encode(
|
||||
hashlib.sha256(
|
||||
(
|
||||
self.password + self.server_hello["d"]["authentication"]["salt"]
|
||||
).encode()
|
||||
).digest()
|
||||
)
|
||||
|
||||
auth = base64.b64encode(
|
||||
hashlib.sha256(
|
||||
(
|
||||
secret.decode()
|
||||
+ self.server_hello["d"]["authentication"]["challenge"]
|
||||
).encode()
|
||||
).digest()
|
||||
).decode()
|
||||
|
||||
payload = {
|
||||
"op": 1,
|
||||
"d": {
|
||||
"rpcVersion": 1,
|
||||
"authentication": auth,
|
||||
"eventSubscriptions": self.subs,
|
||||
},
|
||||
}
|
||||
|
||||
self.ws.send(json.dumps(payload))
|
||||
return self.ws.recv()
|
||||
|
||||
def req(self, req_type, req_data=None):
|
||||
if req_data:
|
||||
payload = {
|
||||
"op": 6,
|
||||
"d": {
|
||||
"requestType": req_type,
|
||||
"requestId": randint(1, 1000),
|
||||
"requestData": req_data,
|
||||
},
|
||||
}
|
||||
else:
|
||||
payload = {
|
||||
"op": 6,
|
||||
"d": {"requestType": req_type, "requestId": randint(1, 1000)},
|
||||
}
|
||||
self.ws.send(json.dumps(payload))
|
||||
response = json.loads(self.ws.recv())
|
||||
return response["d"]
|
||||
53
obsws_python/callback.py
Normal file
53
obsws_python/callback.py
Normal file
@@ -0,0 +1,53 @@
|
||||
from typing import Callable, Iterable, Union
|
||||
|
||||
from .util import as_dataclass, to_camel_case, to_snake_case
|
||||
|
||||
|
||||
class Callback:
|
||||
"""Adds support for callbacks"""
|
||||
|
||||
def __init__(self):
|
||||
"""list of current callbacks"""
|
||||
|
||||
self._callbacks = list()
|
||||
|
||||
def get(self) -> list:
|
||||
"""returns a list of registered events"""
|
||||
|
||||
return [to_camel_case(fn.__name__[2:]) for fn in self._callbacks]
|
||||
|
||||
def trigger(self, event, data):
|
||||
"""trigger callback on event"""
|
||||
|
||||
for fn in self._callbacks:
|
||||
if fn.__name__ == f"on_{to_snake_case(event)}":
|
||||
fn(as_dataclass(event, data))
|
||||
|
||||
def register(self, fns: Union[Iterable, Callable]):
|
||||
"""registers callback functions"""
|
||||
|
||||
try:
|
||||
iterator = iter(fns)
|
||||
for fn in iterator:
|
||||
if fn not in self._callbacks:
|
||||
self._callbacks.append(fn)
|
||||
except TypeError as e:
|
||||
if fns not in self._callbacks:
|
||||
self._callbacks.append(fns)
|
||||
|
||||
def deregister(self, fns: Union[Iterable, Callable]):
|
||||
"""deregisters callback functions"""
|
||||
|
||||
try:
|
||||
iterator = iter(fns)
|
||||
for fn in iterator:
|
||||
if fn in self._callbacks:
|
||||
self._callbacks.remove(fn)
|
||||
except TypeError as e:
|
||||
if fns in self._callbacks:
|
||||
self._callbacks.remove(fns)
|
||||
|
||||
def clear(self):
|
||||
"""clears the _callbacks list"""
|
||||
|
||||
self._callbacks.clear()
|
||||
4
obsws_python/error.py
Normal file
4
obsws_python/error.py
Normal file
@@ -0,0 +1,4 @@
|
||||
class OBSSDKError(Exception):
|
||||
"""general errors"""
|
||||
|
||||
pass
|
||||
71
obsws_python/events.py
Normal file
71
obsws_python/events.py
Normal file
@@ -0,0 +1,71 @@
|
||||
import json
|
||||
import time
|
||||
from enum import IntEnum
|
||||
from threading import Thread
|
||||
|
||||
from .baseclient import ObsClient
|
||||
from .callback import Callback
|
||||
|
||||
"""
|
||||
A class to interact with obs-websocket events
|
||||
defined in official github repo
|
||||
https://github.com/obsproject/obs-websocket/blob/master/docs/generated/protocol.md#events
|
||||
"""
|
||||
|
||||
Subs = IntEnum(
|
||||
"Subs",
|
||||
"general config scenes inputs transitions filters outputs sceneitems mediainputs vendors ui",
|
||||
start=0,
|
||||
)
|
||||
|
||||
|
||||
class EventClient:
|
||||
DELAY = 0.001
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
defaultkwargs = {
|
||||
"subs": (
|
||||
(1 << Subs.general)
|
||||
| (1 << Subs.config)
|
||||
| (1 << Subs.scenes)
|
||||
| (1 << Subs.inputs)
|
||||
| (1 << Subs.transitions)
|
||||
| (1 << Subs.filters)
|
||||
| (1 << Subs.outputs)
|
||||
| (1 << Subs.sceneitems)
|
||||
| (1 << Subs.mediainputs)
|
||||
| (1 << Subs.vendors)
|
||||
| (1 << Subs.ui)
|
||||
)
|
||||
}
|
||||
kwargs = defaultkwargs | kwargs
|
||||
self.base_client = ObsClient(**kwargs)
|
||||
self.base_client.authenticate()
|
||||
self.callback = Callback()
|
||||
self.subscribe()
|
||||
|
||||
def subscribe(self):
|
||||
worker = Thread(target=self.trigger, daemon=True)
|
||||
worker.start()
|
||||
|
||||
def trigger(self):
|
||||
"""
|
||||
Continuously listen for events.
|
||||
|
||||
Triggers a callback on event received.
|
||||
"""
|
||||
self.running = True
|
||||
while self.running:
|
||||
self.data = json.loads(self.base_client.ws.recv())
|
||||
event, data = (
|
||||
self.data["d"].get("eventType"),
|
||||
self.data["d"].get("eventData"),
|
||||
)
|
||||
self.callback.trigger(event, data)
|
||||
time.sleep(self.DELAY)
|
||||
|
||||
def unsubscribe(self):
|
||||
"""
|
||||
stop listening for events
|
||||
"""
|
||||
self.running = False
|
||||
1816
obsws_python/reqs.py
Normal file
1816
obsws_python/reqs.py
Normal file
File diff suppressed because it is too large
Load Diff
26
obsws_python/util.py
Normal file
26
obsws_python/util.py
Normal file
@@ -0,0 +1,26 @@
|
||||
import re
|
||||
from dataclasses import dataclass
|
||||
|
||||
|
||||
def to_camel_case(s):
|
||||
return "".join(word.title() for word in s.split("_"))
|
||||
|
||||
|
||||
def to_snake_case(s):
|
||||
return re.sub(r"(?<!^)(?=[A-Z])", "_", s).lower()
|
||||
|
||||
|
||||
def as_dataclass(identifier, data):
|
||||
def attrs():
|
||||
return list(to_snake_case(k) for k in data.keys())
|
||||
|
||||
return dataclass(
|
||||
type(
|
||||
f"{identifier}Dataclass",
|
||||
(),
|
||||
{
|
||||
"attrs": attrs,
|
||||
**{to_snake_case(k): v for k, v in data.items()},
|
||||
},
|
||||
)
|
||||
)
|
||||
Reference in New Issue
Block a user