mirror of
https://github.com/onyx-and-iris/obsws-python.git
synced 2026-04-18 05:53:32 +00:00
Compare commits
62 Commits
9c41f2bb59
...
dev
| Author | SHA1 | Date | |
|---|---|---|---|
| f70583d7ca | |||
| 3bce50701e | |||
| 8e8062d5c8 | |||
| 6f64e884d8 | |||
| 90abc4f9ee | |||
| f564f53c69 | |||
| 4d9dfa9d11 | |||
| 9e361f0f8a | |||
| 797161a6f2 | |||
| 0fe78197fc | |||
|
|
9c4c5a1df9 | ||
| f52ac163b8 | |||
|
|
197a60a7cd | ||
|
|
633093ead4 | ||
| 935392a0b6 | |||
| d2f2926334 | |||
|
|
58cd50dd6c | ||
| 7614cdfe4a | |||
|
|
9402f2e472 | ||
| ef8df5cf4d | |||
| 1abca0c7e4 | |||
| 85180c1d94 | |||
| f4db1ad95c | |||
| efaee7594e | |||
| 2cebd5eedb | |||
| cac236c004 | |||
| 6aa6db09eb | |||
| f1c2efa4a1 | |||
|
|
4654d2529f | ||
| 1494208f63 | |||
|
|
d217630289 | ||
|
|
5bfe792fa6 | ||
| 3c36619173 | |||
| c4cf817042 | |||
| ba5da8dfef | |||
| 83577e2d61 | |||
|
|
8aa2e78ba6 | ||
| 780f07e25f | |||
| 70a422696e | |||
| a7ef61018b | |||
| 013cf15024 | |||
| f88e8ee3a6 | |||
| 6fa24fe609 | |||
| ffd215aadf | |||
| f3e75c0ddf | |||
| 5db7a705c5 | |||
| ca72b92eb3 | |||
| 98b17b6749 | |||
| 5462c47b65 | |||
| 126e5cb0a4 | |||
|
|
4ced7193df | ||
|
|
468c63f697 | ||
|
|
24f8487d93 | ||
| 2c07f242ad | |||
| 4e45de17ea | |||
| 491a26aaf7 | |||
| d84d30b752 | |||
| 9e3c1d3f37 | |||
| 82b6cdcd04 | |||
|
|
64a7c2b753 | ||
|
|
15559fdb33 | ||
|
|
3adf094481 |
12
.gitignore
vendored
12
.gitignore
vendored
@@ -45,6 +45,14 @@ env.bak/
|
|||||||
venv.bak/
|
venv.bak/
|
||||||
.hatch
|
.hatch
|
||||||
|
|
||||||
|
# pyenv
|
||||||
|
# For a library or package, you might want to ignore these files since the code is
|
||||||
|
# intended to run in multiple environments; otherwise, check them in:
|
||||||
|
.python-version
|
||||||
|
|
||||||
# Test/config
|
# Test/config
|
||||||
quick.py
|
test-*.py
|
||||||
config.toml
|
config.toml
|
||||||
|
obsws.log
|
||||||
|
|
||||||
|
.vscode/
|
||||||
10
.pre-commit-config.yaml
Normal file
10
.pre-commit-config.yaml
Normal file
@@ -0,0 +1,10 @@
|
|||||||
|
repos:
|
||||||
|
- repo: local
|
||||||
|
hooks:
|
||||||
|
- id: format
|
||||||
|
name: format
|
||||||
|
entry: hatch run style:fmt
|
||||||
|
language: system
|
||||||
|
pass_filenames: false
|
||||||
|
verbose: true
|
||||||
|
files: \.(py)$
|
||||||
68
README.md
68
README.md
@@ -1,5 +1,6 @@
|
|||||||
[](https://badge.fury.io/py/obsws-python)
|
[](https://badge.fury.io/py/obsws-python)
|
||||||
[](https://github.com/aatikturk/obsstudio_sdk/blob/main/LICENSE)
|
[](https://github.com/aatikturk/obsstudio_sdk/blob/main/LICENSE)
|
||||||
|
[](https://github.com/pypa/hatch)
|
||||||
[](https://github.com/psf/black)
|
[](https://github.com/psf/black)
|
||||||
[](https://pycqa.github.io/isort/)
|
[](https://pycqa.github.io/isort/)
|
||||||
|
|
||||||
@@ -9,10 +10,10 @@ Not all endpoints in the official documentation are implemented.
|
|||||||
|
|
||||||
## Requirements
|
## Requirements
|
||||||
|
|
||||||
- [OBS Studio](https://obsproject.com/)
|
- [OBS Studio](https://obsproject.com/)
|
||||||
- [OBS Websocket v5 Plugin](https://github.com/obsproject/obs-websocket/releases/tag/5.0.0)
|
- [OBS Websocket v5 Plugin](https://github.com/obsproject/obs-websocket/releases/tag/5.0.0)
|
||||||
- With the release of OBS Studio version 28, Websocket plugin is included by default. But it should be manually installed for earlier versions of OBS.
|
- With the release of OBS Studio version 28, Websocket plugin is included by default. But it should be manually installed for earlier versions of OBS.
|
||||||
- Python 3.9 or greater
|
- Python 3.9 or greater
|
||||||
|
|
||||||
### How to install using pip
|
### How to install using pip
|
||||||
|
|
||||||
@@ -24,9 +25,10 @@ pip install obsws-python
|
|||||||
|
|
||||||
By default the clients connect with parameters:
|
By default the clients connect with parameters:
|
||||||
|
|
||||||
- `host`: "localhost"
|
- `host`: "localhost"
|
||||||
- `port`: 4455
|
- `port`: 4455
|
||||||
- `password`: ""
|
- `password`: ""
|
||||||
|
- `timeout`: None
|
||||||
|
|
||||||
You may override these parameters by storing them in a toml config file or passing them as keyword arguments.
|
You may override these parameters by storing them in a toml config file or passing them as keyword arguments.
|
||||||
|
|
||||||
@@ -53,7 +55,7 @@ Example `__main__.py`:
|
|||||||
import obsws_python as obs
|
import obsws_python as obs
|
||||||
|
|
||||||
# pass conn info if not in config.toml
|
# pass conn info if not in config.toml
|
||||||
cl = obs.ReqClient(host='localhost', port=4455, password='mystrongpass')
|
cl = obs.ReqClient(host='localhost', port=4455, password='mystrongpass', timeout=3)
|
||||||
|
|
||||||
# Toggle the mute state of your Mic input
|
# Toggle the mute state of your Mic input
|
||||||
cl.toggle_input_mute('Mic/Aux')
|
cl.toggle_input_mute('Mic/Aux')
|
||||||
@@ -61,7 +63,7 @@ cl.toggle_input_mute('Mic/Aux')
|
|||||||
|
|
||||||
### Requests
|
### Requests
|
||||||
|
|
||||||
Method names for requests match the API calls but snake cased.
|
Method names for requests match the API calls but snake cased. If a successful call is made with the Request client and the response is expected to contain fields then a response object will be returned. You may then access the response fields as class attributes. They will be snake cased.
|
||||||
|
|
||||||
example:
|
example:
|
||||||
|
|
||||||
@@ -69,14 +71,29 @@ example:
|
|||||||
# load conn info from config.toml
|
# load conn info from config.toml
|
||||||
cl = obs.ReqClient()
|
cl = obs.ReqClient()
|
||||||
|
|
||||||
# GetVersion
|
# GetVersion, returns a response object
|
||||||
resp = cl.get_version()
|
resp = cl.get_version()
|
||||||
|
# Access it's field as an attribute
|
||||||
|
print(f"OBS Version: {resp.obs_version}")
|
||||||
|
|
||||||
|
|
||||||
# SetCurrentProgramScene
|
# SetCurrentProgramScene
|
||||||
cl.set_current_program_scene("BRB")
|
cl.set_current_program_scene("BRB")
|
||||||
```
|
```
|
||||||
|
|
||||||
For a full list of requests refer to [Requests](https://github.com/obsproject/obs-websocket/blob/master/docs/generated/protocol.md#requests)
|
#### `send(param, data=None, raw=False)`
|
||||||
|
|
||||||
|
If you prefer to work with the JSON data directly the {ReqClient}.send() method accepts an argument, `raw`. If set to True the raw response data will be returned, instead of a response object.
|
||||||
|
|
||||||
|
example:
|
||||||
|
|
||||||
|
```python
|
||||||
|
resp = cl_req.send("GetVersion", raw=True)
|
||||||
|
|
||||||
|
print(f"response data: {resp}")
|
||||||
|
```
|
||||||
|
|
||||||
|
For a full list of requests refer to [Requests][obsws-reqs]
|
||||||
|
|
||||||
### Events
|
### Events
|
||||||
|
|
||||||
@@ -109,7 +126,7 @@ cl.callback.deregister(on_input_mute_state_changed)
|
|||||||
|
|
||||||
`register(fns)` and `deregister(fns)` accept both single functions and lists of functions.
|
`register(fns)` and `deregister(fns)` accept both single functions and lists of functions.
|
||||||
|
|
||||||
For a full list of events refer to [Events](https://github.com/obsproject/obs-websocket/blob/master/docs/generated/protocol.md#events)
|
For a full list of events refer to [Events][obsws-events]
|
||||||
|
|
||||||
### Attributes
|
### Attributes
|
||||||
|
|
||||||
@@ -127,9 +144,13 @@ def on_scene_created(data):
|
|||||||
|
|
||||||
### Errors
|
### Errors
|
||||||
|
|
||||||
If a request fails an `OBSSDKError` will be raised with a status code.
|
- `OBSSDKError`: Base error class.
|
||||||
|
- `OBSSDKTimeoutError`: Raised if a timeout occurs during sending/receiving a request or receiving an event
|
||||||
For a full list of status codes refer to [Codes](https://github.com/obsproject/obs-websocket/blob/master/docs/generated/protocol.md#requeststatus)
|
- `OBSSDKRequestError`: Raised when a request returns an error code.
|
||||||
|
- The following attributes are available:
|
||||||
|
- `req_name`: name of the request.
|
||||||
|
- `code`: request status code.
|
||||||
|
- For a full list of status codes refer to [Codes][obsws-codes]
|
||||||
|
|
||||||
### Logging
|
### Logging
|
||||||
|
|
||||||
@@ -148,18 +169,21 @@ logging.basicConfig(level=logging.DEBUG)
|
|||||||
|
|
||||||
### Tests
|
### Tests
|
||||||
|
|
||||||
First install development dependencies:
|
Install [hatch][hatch-install] and then:
|
||||||
|
|
||||||
`pip install -e .['dev']`
|
|
||||||
|
|
||||||
To run all tests:
|
|
||||||
|
|
||||||
```
|
```
|
||||||
pytest -v
|
hatch test
|
||||||
```
|
```
|
||||||
|
|
||||||
### Official Documentation
|
### Official Documentation
|
||||||
|
|
||||||
For the full documentation:
|
For the full documentation:
|
||||||
|
|
||||||
- [OBS Websocket SDK](https://github.com/obsproject/obs-websocket/blob/master/docs/generated/protocol.md#obs-websocket-501-protocol)
|
- [OBS Websocket SDK][obsws-pro]
|
||||||
|
|
||||||
|
|
||||||
|
[obsws-reqs]: https://github.com/obsproject/obs-websocket/blob/master/docs/generated/protocol.md#requests
|
||||||
|
[obsws-events]: https://github.com/obsproject/obs-websocket/blob/master/docs/generated/protocol.md#events
|
||||||
|
[obsws-codes]: https://github.com/obsproject/obs-websocket/blob/master/docs/generated/protocol.md#requeststatus
|
||||||
|
[obsws-pro]: https://github.com/obsproject/obs-websocket/blob/master/docs/generated/protocol.md#obs-websocket-501-protocol
|
||||||
|
[hatch-install]: https://hatch.pypa.io/latest/install/
|
||||||
|
|||||||
@@ -17,6 +17,12 @@ class Observer:
|
|||||||
print(f"Registered events: {self._client.callback.get()}")
|
print(f"Registered events: {self._client.callback.get()}")
|
||||||
self.running = True
|
self.running = True
|
||||||
|
|
||||||
|
def __enter__(self):
|
||||||
|
return self
|
||||||
|
|
||||||
|
def __exit__(self, exc_type, exc_value, exc_traceback):
|
||||||
|
self._client.disconnect()
|
||||||
|
|
||||||
def on_current_program_scene_changed(self, data):
|
def on_current_program_scene_changed(self, data):
|
||||||
"""The current program scene has changed."""
|
"""The current program scene has changed."""
|
||||||
print(f"Switched to scene {data.scene_name}")
|
print(f"Switched to scene {data.scene_name}")
|
||||||
@@ -31,13 +37,11 @@ class Observer:
|
|||||||
|
|
||||||
def on_exit_started(self, _):
|
def on_exit_started(self, _):
|
||||||
"""OBS has begun the shutdown process."""
|
"""OBS has begun the shutdown process."""
|
||||||
print(f"OBS closing!")
|
print("OBS closing!")
|
||||||
self._client.unsubscribe()
|
|
||||||
self.running = False
|
self.running = False
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
observer = Observer()
|
with Observer() as observer:
|
||||||
|
while observer.running:
|
||||||
while observer.running:
|
time.sleep(0.1)
|
||||||
time.sleep(0.1)
|
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
import inspect
|
import inspect
|
||||||
|
|
||||||
import keyboard
|
import keyboard
|
||||||
|
|
||||||
import obsws_python as obs
|
import obsws_python as obs
|
||||||
|
|
||||||
|
|
||||||
@@ -10,6 +11,12 @@ class Observer:
|
|||||||
self._client.callback.register(self.on_current_program_scene_changed)
|
self._client.callback.register(self.on_current_program_scene_changed)
|
||||||
print(f"Registered events: {self._client.callback.get()}")
|
print(f"Registered events: {self._client.callback.get()}")
|
||||||
|
|
||||||
|
def __enter__(self):
|
||||||
|
return self
|
||||||
|
|
||||||
|
def __exit__(self, exc_type, exc_value, exc_traceback):
|
||||||
|
self._client.disconnect()
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def event_identifier(self):
|
def event_identifier(self):
|
||||||
return inspect.stack()[1].function
|
return inspect.stack()[1].function
|
||||||
@@ -31,13 +38,12 @@ def set_scene(scene, *args):
|
|||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
req_client = obs.ReqClient()
|
with obs.ReqClient() as req_client:
|
||||||
observer = Observer()
|
with Observer() as observer:
|
||||||
|
keyboard.add_hotkey("0", version)
|
||||||
|
keyboard.add_hotkey("1", set_scene, args=("START",))
|
||||||
|
keyboard.add_hotkey("2", set_scene, args=("BRB",))
|
||||||
|
keyboard.add_hotkey("3", set_scene, args=("END",))
|
||||||
|
|
||||||
keyboard.add_hotkey("0", version)
|
print("press ctrl+enter to quit")
|
||||||
keyboard.add_hotkey("1", set_scene, args=("START",))
|
keyboard.wait("ctrl+enter")
|
||||||
keyboard.add_hotkey("2", set_scene, args=("BRB",))
|
|
||||||
keyboard.add_hotkey("3", set_scene, args=("END",))
|
|
||||||
|
|
||||||
print("press ctrl+enter to quit")
|
|
||||||
keyboard.wait("ctrl+enter")
|
|
||||||
|
|||||||
@@ -9,6 +9,8 @@ LEVELTYPE = IntEnum(
|
|||||||
start=0,
|
start=0,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
DEVICE = "Desktop Audio"
|
||||||
|
|
||||||
|
|
||||||
def on_input_mute_state_changed(data):
|
def on_input_mute_state_changed(data):
|
||||||
"""An input's mute state has changed."""
|
"""An input's mute state has changed."""
|
||||||
@@ -32,15 +34,14 @@ def on_input_volume_meters(data):
|
|||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
client = obs.EventClient(subs=(obs.Subs.LOW_VOLUME | obs.Subs.INPUTVOLUMEMETERS))
|
with obs.EventClient(
|
||||||
client.callback.register([on_input_volume_meters, on_input_mute_state_changed])
|
subs=(obs.Subs.LOW_VOLUME | obs.Subs.INPUTVOLUMEMETERS)
|
||||||
|
) as client:
|
||||||
|
client.callback.register([on_input_volume_meters, on_input_mute_state_changed])
|
||||||
|
|
||||||
while cmd := input("<Enter> to exit>\n"):
|
while _ := input("Press <Enter> to exit\n"):
|
||||||
if not cmd:
|
pass
|
||||||
break
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
DEVICE = "Desktop Audio"
|
|
||||||
|
|
||||||
main()
|
main()
|
||||||
|
|||||||
@@ -7,15 +7,23 @@ from random import randint
|
|||||||
from typing import Optional
|
from typing import Optional
|
||||||
|
|
||||||
import websocket
|
import websocket
|
||||||
|
from websocket import WebSocketTimeoutException
|
||||||
|
|
||||||
from .error import OBSSDKError
|
from .error import OBSSDKError, OBSSDKTimeoutError
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class ObsClient:
|
class ObsClient:
|
||||||
logger = logging.getLogger("baseclient.obsclient")
|
|
||||||
|
|
||||||
def __init__(self, **kwargs):
|
def __init__(self, **kwargs):
|
||||||
defaultkwargs = {"host": "localhost", "port": 4455, "password": "", "subs": 0}
|
self.logger = logger.getChild(self.__class__.__name__)
|
||||||
|
defaultkwargs = {
|
||||||
|
"host": "localhost",
|
||||||
|
"port": 4455,
|
||||||
|
"password": "",
|
||||||
|
"subs": 0,
|
||||||
|
"timeout": None,
|
||||||
|
}
|
||||||
if not any(key in kwargs for key in ("host", "port", "password")):
|
if not any(key in kwargs for key in ("host", "port", "password")):
|
||||||
kwargs |= self._conn_from_toml()
|
kwargs |= self._conn_from_toml()
|
||||||
kwargs = defaultkwargs | kwargs
|
kwargs = defaultkwargs | kwargs
|
||||||
@@ -23,14 +31,21 @@ class ObsClient:
|
|||||||
setattr(self, attr, val)
|
setattr(self, attr, val)
|
||||||
|
|
||||||
self.logger.info(
|
self.logger.info(
|
||||||
"Connecting with parameters: host='{host}' port={port} password='{password}' subs={subs}".format(
|
"Connecting with parameters: host='{host}' port={port} password='{password}' subs={subs} timeout={timeout}".format(
|
||||||
**self.__dict__
|
**self.__dict__
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
self.ws = websocket.WebSocket()
|
try:
|
||||||
self.ws.connect(f"ws://{self.host}:{self.port}")
|
self.ws = websocket.WebSocket()
|
||||||
self.server_hello = json.loads(self.ws.recv())
|
self.ws.connect(f"ws://{self.host}:{self.port}", timeout=self.timeout)
|
||||||
|
self.server_hello = json.loads(self.ws.recv())
|
||||||
|
except ValueError as e:
|
||||||
|
self.logger.error(f"{type(e).__name__}: {e}")
|
||||||
|
raise
|
||||||
|
except (ConnectionRefusedError, TimeoutError, WebSocketTimeoutException) as e:
|
||||||
|
self.logger.exception(f"{type(e).__name__}: {e}")
|
||||||
|
raise
|
||||||
|
|
||||||
def _conn_from_toml(self) -> dict:
|
def _conn_from_toml(self) -> dict:
|
||||||
try:
|
try:
|
||||||
@@ -81,10 +96,8 @@ class ObsClient:
|
|||||||
|
|
||||||
auth = base64.b64encode(
|
auth = base64.b64encode(
|
||||||
hashlib.sha256(
|
hashlib.sha256(
|
||||||
(
|
secret
|
||||||
secret.decode()
|
+ self.server_hello["d"]["authentication"]["challenge"].encode()
|
||||||
+ self.server_hello["d"]["authentication"]["challenge"]
|
|
||||||
).encode()
|
|
||||||
).digest()
|
).digest()
|
||||||
).decode()
|
).decode()
|
||||||
|
|
||||||
@@ -93,9 +106,15 @@ class ObsClient:
|
|||||||
self.ws.send(json.dumps(payload))
|
self.ws.send(json.dumps(payload))
|
||||||
try:
|
try:
|
||||||
response = json.loads(self.ws.recv())
|
response = json.loads(self.ws.recv())
|
||||||
return response["op"] == 2
|
if response["op"] != 2:
|
||||||
|
raise OBSSDKError(
|
||||||
|
"failed to identify client with the server, expected response with OpCode 2"
|
||||||
|
)
|
||||||
|
return response["d"]
|
||||||
except json.decoder.JSONDecodeError:
|
except json.decoder.JSONDecodeError:
|
||||||
raise OBSSDKError("failed to identify client with the server")
|
raise OBSSDKError(
|
||||||
|
"failed to identify client with the server, please check connection settings"
|
||||||
|
)
|
||||||
|
|
||||||
def req(self, req_type, req_data=None):
|
def req(self, req_type, req_data=None):
|
||||||
payload = {
|
payload = {
|
||||||
@@ -105,7 +124,11 @@ class ObsClient:
|
|||||||
if req_data:
|
if req_data:
|
||||||
payload["d"]["requestData"] = req_data
|
payload["d"]["requestData"] = req_data
|
||||||
self.logger.debug(f"Sending request {payload}")
|
self.logger.debug(f"Sending request {payload}")
|
||||||
self.ws.send(json.dumps(payload))
|
try:
|
||||||
response = json.loads(self.ws.recv())
|
self.ws.send(json.dumps(payload))
|
||||||
|
response = json.loads(self.ws.recv())
|
||||||
|
except WebSocketTimeoutException as e:
|
||||||
|
self.logger.exception(f"{type(e).__name__}: {e}")
|
||||||
|
raise OBSSDKTimeoutError("Timeout while trying to send the request") from e
|
||||||
self.logger.debug(f"Response received {response}")
|
self.logger.debug(f"Response received {response}")
|
||||||
return response["d"]
|
return response["d"]
|
||||||
|
|||||||
@@ -1,4 +1,5 @@
|
|||||||
from typing import Callable, Iterable, Union
|
from collections.abc import Callable, Iterable
|
||||||
|
from typing import Union
|
||||||
|
|
||||||
from .util import as_dataclass, to_camel_case, to_snake_case
|
from .util import as_dataclass, to_camel_case, to_snake_case
|
||||||
|
|
||||||
|
|||||||
@@ -1,4 +1,18 @@
|
|||||||
class OBSSDKError(Exception):
|
class OBSSDKError(Exception):
|
||||||
"""general errors"""
|
"""Base class for OBSSDK errors"""
|
||||||
|
|
||||||
pass
|
|
||||||
|
class OBSSDKTimeoutError(OBSSDKError):
|
||||||
|
"""Exception raised when a connection times out"""
|
||||||
|
|
||||||
|
|
||||||
|
class OBSSDKRequestError(OBSSDKError):
|
||||||
|
"""Exception raised when a request returns an error code"""
|
||||||
|
|
||||||
|
def __init__(self, req_name, code, comment):
|
||||||
|
self.req_name = req_name
|
||||||
|
self.code = code
|
||||||
|
message = f"Request {self.req_name} returned code {self.code}."
|
||||||
|
if comment:
|
||||||
|
message += f" With message: {comment}"
|
||||||
|
super().__init__(message)
|
||||||
|
|||||||
@@ -1,10 +1,12 @@
|
|||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
import time
|
import threading
|
||||||
from threading import Thread
|
|
||||||
|
from websocket import WebSocketConnectionClosedException, WebSocketTimeoutException
|
||||||
|
|
||||||
from .baseclient import ObsClient
|
from .baseclient import ObsClient
|
||||||
from .callback import Callback
|
from .callback import Callback
|
||||||
|
from .error import OBSSDKError, OBSSDKTimeoutError
|
||||||
from .subs import Subs
|
from .subs import Subs
|
||||||
|
|
||||||
"""
|
"""
|
||||||
@@ -13,24 +15,36 @@ defined in official github repo
|
|||||||
https://github.com/obsproject/obs-websocket/blob/master/docs/generated/protocol.md#events
|
https://github.com/obsproject/obs-websocket/blob/master/docs/generated/protocol.md#events
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class EventClient:
|
class EventClient:
|
||||||
logger = logging.getLogger("events.eventclient")
|
|
||||||
DELAY = 0.001
|
|
||||||
|
|
||||||
def __init__(self, **kwargs):
|
def __init__(self, **kwargs):
|
||||||
|
self.logger = logger.getChild(self.__class__.__name__)
|
||||||
defaultkwargs = {"subs": Subs.LOW_VOLUME}
|
defaultkwargs = {"subs": Subs.LOW_VOLUME}
|
||||||
kwargs = defaultkwargs | kwargs
|
kwargs = defaultkwargs | kwargs
|
||||||
self.base_client = ObsClient(**kwargs)
|
self.base_client = ObsClient(**kwargs)
|
||||||
if self.base_client.authenticate():
|
try:
|
||||||
self.logger.info(f"Successfully identified {self} with the server")
|
success = self.base_client.authenticate()
|
||||||
|
self.logger.info(
|
||||||
|
f"Successfully identified {self} with the server using RPC version:{success['negotiatedRpcVersion']}"
|
||||||
|
)
|
||||||
|
except OBSSDKError as e:
|
||||||
|
self.logger.error(f"{type(e).__name__}: {e}")
|
||||||
|
raise
|
||||||
self.callback = Callback()
|
self.callback = Callback()
|
||||||
self.subscribe()
|
self.subscribe()
|
||||||
|
|
||||||
|
def __enter__(self):
|
||||||
|
return self
|
||||||
|
|
||||||
|
def __exit__(self, exc_type, exc_value, exc_traceback):
|
||||||
|
self.disconnect()
|
||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
return type(
|
return type(
|
||||||
self
|
self
|
||||||
).__name__ + "(host='{host}', port={port}, password='{password}', subs={subs})".format(
|
).__name__ + "(host='{host}', port={port}, password='{password}', subs={subs}, timeout={timeout})".format(
|
||||||
**self.base_client.__dict__,
|
**self.base_client.__dict__,
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -38,29 +52,40 @@ class EventClient:
|
|||||||
return type(self).__name__
|
return type(self).__name__
|
||||||
|
|
||||||
def subscribe(self):
|
def subscribe(self):
|
||||||
worker = Thread(target=self.trigger, daemon=True)
|
self.base_client.ws.settimeout(None)
|
||||||
worker.start()
|
stop_event = threading.Event()
|
||||||
|
self.worker = threading.Thread(
|
||||||
|
target=self.trigger, daemon=True, args=(stop_event,)
|
||||||
|
)
|
||||||
|
self.worker.start()
|
||||||
|
|
||||||
def trigger(self):
|
def trigger(self, stop_event):
|
||||||
"""
|
"""
|
||||||
Continuously listen for events.
|
Continuously listen for events.
|
||||||
|
|
||||||
Triggers a callback on event received.
|
Triggers a callback on event received.
|
||||||
"""
|
"""
|
||||||
self.running = True
|
while not stop_event.is_set():
|
||||||
while self.running:
|
try:
|
||||||
event = json.loads(self.base_client.ws.recv())
|
if response := self.base_client.ws.recv():
|
||||||
self.logger.debug(f"Event received {event}")
|
event = json.loads(response)
|
||||||
type_, data = (
|
self.logger.debug(f"Event received {event}")
|
||||||
event["d"].get("eventType"),
|
type_, data = (
|
||||||
event["d"].get("eventData"),
|
event["d"].get("eventType"),
|
||||||
)
|
event["d"].get("eventData"),
|
||||||
self.callback.trigger(type_, data if data else {})
|
)
|
||||||
time.sleep(self.DELAY)
|
self.callback.trigger(type_, data if data else {})
|
||||||
|
except WebSocketTimeoutException as e:
|
||||||
|
self.logger.exception(f"{type(e).__name__}: {e}")
|
||||||
|
raise OBSSDKTimeoutError("Timeout while waiting for event") from e
|
||||||
|
except (WebSocketConnectionClosedException, OSError) as e:
|
||||||
|
self.logger.debug(f"{type(e).__name__} terminating the event thread")
|
||||||
|
stop_event.set()
|
||||||
|
|
||||||
|
def disconnect(self):
|
||||||
|
"""stop listening for events"""
|
||||||
|
|
||||||
def unsubscribe(self):
|
|
||||||
"""
|
|
||||||
stop listening for events
|
|
||||||
"""
|
|
||||||
self.running = False
|
|
||||||
self.base_client.ws.close()
|
self.base_client.ws.close()
|
||||||
|
self.worker.join()
|
||||||
|
|
||||||
|
unsubscribe = disconnect
|
||||||
|
|||||||
@@ -1,7 +1,8 @@
|
|||||||
import logging
|
import logging
|
||||||
|
from warnings import warn
|
||||||
|
|
||||||
from .baseclient import ObsClient
|
from .baseclient import ObsClient
|
||||||
from .error import OBSSDKError
|
from .error import OBSSDKError, OBSSDKRequestError
|
||||||
from .util import as_dataclass
|
from .util import as_dataclass
|
||||||
|
|
||||||
"""
|
"""
|
||||||
@@ -10,40 +11,53 @@ defined in official github repo
|
|||||||
https://github.com/obsproject/obs-websocket/blob/master/docs/generated/protocol.md#Requests
|
https://github.com/obsproject/obs-websocket/blob/master/docs/generated/protocol.md#Requests
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class ReqClient:
|
class ReqClient:
|
||||||
logger = logging.getLogger("reqs.reqclient")
|
|
||||||
|
|
||||||
def __init__(self, **kwargs):
|
def __init__(self, **kwargs):
|
||||||
|
self.logger = logger.getChild(self.__class__.__name__)
|
||||||
self.base_client = ObsClient(**kwargs)
|
self.base_client = ObsClient(**kwargs)
|
||||||
if self.base_client.authenticate():
|
try:
|
||||||
self.logger.info(f"Successfully identified {self} with the server")
|
success = self.base_client.authenticate()
|
||||||
|
self.logger.info(
|
||||||
|
f"Successfully identified {self} with the server using RPC version:{success['negotiatedRpcVersion']}"
|
||||||
|
)
|
||||||
|
except OBSSDKError as e:
|
||||||
|
self.logger.error(f"{type(e).__name__}: {e}")
|
||||||
|
raise
|
||||||
|
|
||||||
def __enter__(self):
|
def __enter__(self):
|
||||||
return self
|
return self
|
||||||
|
|
||||||
def __exit__(self, exc_type, exc_value, exc_traceback):
|
def __exit__(self, exc_type, exc_value, exc_traceback):
|
||||||
self.base_client.ws.close()
|
self.disconnect()
|
||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
return type(
|
return type(
|
||||||
self
|
self
|
||||||
).__name__ + "(host='{host}', port={port}, password='{password}')".format(
|
).__name__ + "(host='{host}', port={port}, password='{password}', timeout={timeout})".format(
|
||||||
**self.base_client.__dict__,
|
**self.base_client.__dict__,
|
||||||
)
|
)
|
||||||
|
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
return type(self).__name__
|
return type(self).__name__
|
||||||
|
|
||||||
|
def disconnect(self):
|
||||||
|
self.base_client.ws.close()
|
||||||
|
|
||||||
def send(self, param, data=None, raw=False):
|
def send(self, param, data=None, raw=False):
|
||||||
response = self.base_client.req(param, data)
|
try:
|
||||||
if not response["requestStatus"]["result"]:
|
response = self.base_client.req(param, data)
|
||||||
error = (
|
if not response["requestStatus"]["result"]:
|
||||||
f"Request {response['requestType']} returned code {response['requestStatus']['code']}",
|
raise OBSSDKRequestError(
|
||||||
)
|
response["requestType"],
|
||||||
if "comment" in response["requestStatus"]:
|
response["requestStatus"]["code"],
|
||||||
error += (f"With message: {response['requestStatus']['comment']}",)
|
response["requestStatus"].get("comment"),
|
||||||
raise OBSSDKError("\n".join(error))
|
)
|
||||||
|
except OBSSDKRequestError as e:
|
||||||
|
self.logger.exception(f"{type(e).__name__}: {e}")
|
||||||
|
raise
|
||||||
if "responseData" in response:
|
if "responseData" in response:
|
||||||
if raw:
|
if raw:
|
||||||
return response["responseData"]
|
return response["responseData"]
|
||||||
@@ -124,39 +138,39 @@ class ReqClient:
|
|||||||
|
|
||||||
get_hotkey_list = get_hot_key_list
|
get_hotkey_list = get_hot_key_list
|
||||||
|
|
||||||
def trigger_hot_key_by_name(self, hotkeyName):
|
def trigger_hot_key_by_name(self, hotkeyName, contextName=None):
|
||||||
"""
|
"""
|
||||||
Triggers a hotkey using its name. For hotkey names
|
Triggers a hotkey using its name. For hotkey names
|
||||||
See GetHotkeyList
|
See GetHotkeyList
|
||||||
|
|
||||||
:param hotkeyName: Name of the hotkey to trigger
|
:param hotkeyName: Name of the hotkey to trigger
|
||||||
:type hotkeyName: str
|
:type hotkeyName: str
|
||||||
|
:param contextName: Name of context of the hotkey to trigger
|
||||||
|
:type contextName: str, optional
|
||||||
|
|
||||||
|
|
||||||
"""
|
"""
|
||||||
payload = {"hotkeyName": hotkeyName}
|
payload = {"hotkeyName": hotkeyName, "contextName": contextName}
|
||||||
self.send("TriggerHotkeyByName", payload)
|
self.send("TriggerHotkeyByName", payload)
|
||||||
|
|
||||||
trigger_hotkey_by_name = trigger_hot_key_by_name
|
trigger_hotkey_by_name = trigger_hot_key_by_name
|
||||||
|
|
||||||
def trigger_hot_key_by_key_sequence(
|
def trigger_hot_key_by_key_sequence(
|
||||||
self, keyId, pressShift, pressCtrl, pressAlt, pressCmd
|
self, keyId, pressShift=None, pressCtrl=None, pressAlt=None, pressCmd=None
|
||||||
):
|
):
|
||||||
"""
|
"""
|
||||||
Triggers a hotkey using a sequence of keys.
|
Triggers a hotkey using a sequence of keys.
|
||||||
|
|
||||||
:param keyId: The OBS key ID to use. See https://github.com/obsproject/obs-studio/blob/master/libobs/obs-hotkeys.h
|
:param keyId: The OBS key ID to use. See https://github.com/obsproject/obs-studio/blob/master/libobs/obs-hotkeys.h
|
||||||
:type keyId: str
|
:type keyId: str
|
||||||
:param keyModifiers: Object containing key modifiers to apply.
|
:param pressShift: Press Shift
|
||||||
:type keyModifiers: dict
|
:type pressShift: bool, optional
|
||||||
:param keyModifiers.shift: Press Shift
|
:param pressCtrl: Press CTRL
|
||||||
:type keyModifiers.shift: bool
|
:type pressCtrl: bool, optional
|
||||||
:param keyModifiers.control: Press CTRL
|
:param pressAlt: Press ALT
|
||||||
:type keyModifiers.control: bool
|
:type pressAlt: bool, optional
|
||||||
:param keyModifiers.alt: Press ALT
|
:param pressCmd: Press CMD (Mac)
|
||||||
:type keyModifiers.alt: bool
|
:type pressCmd: bool, optional
|
||||||
:param keyModifiers.cmd: Press CMD (Mac)
|
|
||||||
:type keyModifiers.cmd: bool
|
|
||||||
|
|
||||||
|
|
||||||
"""
|
"""
|
||||||
@@ -422,6 +436,19 @@ class ReqClient:
|
|||||||
"""
|
"""
|
||||||
return self.send("GetRecordDirectory")
|
return self.send("GetRecordDirectory")
|
||||||
|
|
||||||
|
def set_record_directory(self, recordDirectory):
|
||||||
|
"""
|
||||||
|
Sets the current directory that the record output writes files to.
|
||||||
|
IMPORTANT NOTE: Requires obs websocket v5.3 or higher.
|
||||||
|
|
||||||
|
:param recordDirectory: Output directory
|
||||||
|
:type recordDirectory: str
|
||||||
|
"""
|
||||||
|
payload = {
|
||||||
|
"recordDirectory": recordDirectory,
|
||||||
|
}
|
||||||
|
return self.send("SetRecordDirectory", payload)
|
||||||
|
|
||||||
def get_source_active(self, name):
|
def get_source_active(self, name):
|
||||||
"""
|
"""
|
||||||
Gets the active and show state of a source
|
Gets the active and show state of a source
|
||||||
@@ -1073,6 +1100,14 @@ class ReqClient:
|
|||||||
payload = {"position": pos, "release": release}
|
payload = {"position": pos, "release": release}
|
||||||
self.send("SetTBarPosition", payload)
|
self.send("SetTBarPosition", payload)
|
||||||
|
|
||||||
|
def get_source_filter_kind_list(self):
|
||||||
|
"""
|
||||||
|
Gets an array of all available source filter kinds.
|
||||||
|
|
||||||
|
|
||||||
|
"""
|
||||||
|
return self.send("GetSourceFilterKindList")
|
||||||
|
|
||||||
def get_source_filter_list(self, name):
|
def get_source_filter_list(self, name):
|
||||||
"""
|
"""
|
||||||
Gets a list of all of a source's filters.
|
Gets a list of all of a source's filters.
|
||||||
@@ -1284,6 +1319,23 @@ class ReqClient:
|
|||||||
}
|
}
|
||||||
return self.send("GetSceneItemId", payload)
|
return self.send("GetSceneItemId", payload)
|
||||||
|
|
||||||
|
def get_scene_item_source(self, scene_name, scene_item_id):
|
||||||
|
"""
|
||||||
|
Gets the source associated with a scene item.
|
||||||
|
|
||||||
|
:param scene_item_id: Numeric ID of the scene item (>= 0)
|
||||||
|
:type scene_item_id: int
|
||||||
|
:param scene_name: Name of the scene the item is in.
|
||||||
|
:type scene_name: str
|
||||||
|
|
||||||
|
|
||||||
|
"""
|
||||||
|
payload = {
|
||||||
|
"sceneItemId": scene_item_id,
|
||||||
|
"sceneName": scene_name,
|
||||||
|
}
|
||||||
|
return self.send("GetSceneItemSource", payload)
|
||||||
|
|
||||||
def create_scene_item(self, scene_name, source_name, enabled=None):
|
def create_scene_item(self, scene_name, source_name, enabled=None):
|
||||||
"""
|
"""
|
||||||
Creates a new scene item using a source.
|
Creates a new scene item using a source.
|
||||||
@@ -1757,7 +1809,7 @@ class ReqClient:
|
|||||||
|
|
||||||
|
|
||||||
"""
|
"""
|
||||||
self.send("ToggleRecord")
|
return self.send("ToggleRecord")
|
||||||
|
|
||||||
def start_record(self):
|
def start_record(self):
|
||||||
"""
|
"""
|
||||||
@@ -1799,6 +1851,28 @@ class ReqClient:
|
|||||||
"""
|
"""
|
||||||
self.send("ResumeRecord")
|
self.send("ResumeRecord")
|
||||||
|
|
||||||
|
def split_record_file(self):
|
||||||
|
"""
|
||||||
|
Splits the current file being recorded into a new file.
|
||||||
|
|
||||||
|
|
||||||
|
"""
|
||||||
|
self.send("SplitRecordFile")
|
||||||
|
|
||||||
|
def create_record_chapter(self, chapter_name=None):
|
||||||
|
"""
|
||||||
|
Adds a new chapter marker to the file currently being recorded.
|
||||||
|
|
||||||
|
Note: As of OBS 30.2.0, the only file format supporting this feature is Hybrid MP4.
|
||||||
|
|
||||||
|
:param chapter_name: Name of the new chapter
|
||||||
|
:type chapter_name: str
|
||||||
|
|
||||||
|
|
||||||
|
"""
|
||||||
|
payload = {"chapterName": chapter_name}
|
||||||
|
self.send("CreateRecordChapter", payload)
|
||||||
|
|
||||||
def get_media_input_status(self, name):
|
def get_media_input_status(self, name):
|
||||||
"""
|
"""
|
||||||
Gets the status of a media input.
|
Gets the status of a media input.
|
||||||
@@ -1928,3 +2002,66 @@ class ReqClient:
|
|||||||
|
|
||||||
"""
|
"""
|
||||||
return self.send("GetMonitorList")
|
return self.send("GetMonitorList")
|
||||||
|
|
||||||
|
def open_video_mix_projector(
|
||||||
|
self, video_mix_type, monitor_index=-1, projector_geometry=None
|
||||||
|
):
|
||||||
|
"""
|
||||||
|
Opens a projector for a specific output video mix.
|
||||||
|
|
||||||
|
The available mix types are:
|
||||||
|
OBS_WEBSOCKET_VIDEO_MIX_TYPE_PREVIEW
|
||||||
|
OBS_WEBSOCKET_VIDEO_MIX_TYPE_PROGRAM
|
||||||
|
OBS_WEBSOCKET_VIDEO_MIX_TYPE_MULTIVIEW
|
||||||
|
|
||||||
|
:param video_mix_type: Type of mix to open.
|
||||||
|
:type video_mix_type: str
|
||||||
|
:param monitor_index: Monitor index, use GetMonitorList to obtain index
|
||||||
|
:type monitor_index: int
|
||||||
|
:param projector_geometry:
|
||||||
|
Size/Position data for a windowed projector, in Qt Base64 encoded format.
|
||||||
|
Mutually exclusive with monitorIndex
|
||||||
|
:type projector_geometry: str
|
||||||
|
|
||||||
|
"""
|
||||||
|
warn(
|
||||||
|
"open_video_mix_projector request serves to provide feature parity with 4.x. "
|
||||||
|
"It is very likely to be changed/deprecated in a future release.",
|
||||||
|
DeprecationWarning,
|
||||||
|
stacklevel=2,
|
||||||
|
)
|
||||||
|
payload = {
|
||||||
|
"videoMixType": video_mix_type,
|
||||||
|
"monitorIndex": monitor_index,
|
||||||
|
"projectorGeometry": projector_geometry,
|
||||||
|
}
|
||||||
|
self.send("OpenVideoMixProjector", payload)
|
||||||
|
|
||||||
|
def open_source_projector(
|
||||||
|
self, source_name, monitor_index=-1, projector_geometry=None
|
||||||
|
):
|
||||||
|
"""
|
||||||
|
Opens a projector for a source.
|
||||||
|
|
||||||
|
:param source_name: Name of the source to open a projector for
|
||||||
|
:type source_name: str
|
||||||
|
:param monitor_index: Monitor index, use GetMonitorList to obtain index
|
||||||
|
:type monitor_index: int
|
||||||
|
:param projector_geometry:
|
||||||
|
Size/Position data for a windowed projector, in Qt Base64 encoded format.
|
||||||
|
Mutually exclusive with monitorIndex
|
||||||
|
:type projector_geometry: str
|
||||||
|
|
||||||
|
"""
|
||||||
|
warn(
|
||||||
|
"open_source_projector request serves to provide feature parity with 4.x. "
|
||||||
|
"It is very likely to be changed/deprecated in a future release.",
|
||||||
|
DeprecationWarning,
|
||||||
|
stacklevel=2,
|
||||||
|
)
|
||||||
|
payload = {
|
||||||
|
"sourceName": source_name,
|
||||||
|
"monitorIndex": monitor_index,
|
||||||
|
"projectorGeometry": projector_geometry,
|
||||||
|
}
|
||||||
|
self.send("OpenSourceProjector", payload)
|
||||||
|
|||||||
@@ -1 +1 @@
|
|||||||
version = "1.4.2"
|
version = "1.8.0"
|
||||||
|
|||||||
@@ -7,7 +7,7 @@ name = "obsws-python"
|
|||||||
dynamic = ["version"]
|
dynamic = ["version"]
|
||||||
description = "A Python SDK for OBS Studio WebSocket v5.0"
|
description = "A Python SDK for OBS Studio WebSocket v5.0"
|
||||||
readme = "README.md"
|
readme = "README.md"
|
||||||
license = "GPL-3.0-only"
|
license = { text = "GPL-3.0-only" }
|
||||||
requires-python = ">=3.9"
|
requires-python = ">=3.9"
|
||||||
authors = [
|
authors = [
|
||||||
{ name = "Adem Atikturk", email = "aatikturk@gmail.com" },
|
{ name = "Adem Atikturk", email = "aatikturk@gmail.com" },
|
||||||
@@ -17,14 +17,6 @@ dependencies = [
|
|||||||
"websocket-client",
|
"websocket-client",
|
||||||
]
|
]
|
||||||
|
|
||||||
[project.optional-dependencies]
|
|
||||||
dev = [
|
|
||||||
"black",
|
|
||||||
"isort",
|
|
||||||
"pytest",
|
|
||||||
"pytest-randomly",
|
|
||||||
]
|
|
||||||
|
|
||||||
[project.urls]
|
[project.urls]
|
||||||
Homepage = "https://github.com/aatikturk/obsws-python"
|
Homepage = "https://github.com/aatikturk/obsws-python"
|
||||||
|
|
||||||
@@ -36,19 +28,58 @@ include = [
|
|||||||
"/obsws_python",
|
"/obsws_python",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[tool.hatch.envs.default]
|
||||||
|
dependencies = ["pre-commit"]
|
||||||
|
|
||||||
|
[tool.hatch.envs.e]
|
||||||
|
dependencies = ["keyboard"]
|
||||||
|
|
||||||
[tool.hatch.envs.e.scripts]
|
[tool.hatch.envs.e.scripts]
|
||||||
events = "python {root}\\examples\\events\\."
|
events = "python {root}\\examples\\events\\."
|
||||||
hotkeys = "python {root}\\examples\\hotkeys\\."
|
hotkeys = "python {root}\\examples\\hotkeys\\."
|
||||||
levels = "python {root}\\examples\\levels\\."
|
levels = "python {root}\\examples\\levels\\."
|
||||||
scene_rotate = "python {root}\\examples\\scene_rotate\\."
|
scene_rotate = "python {root}\\examples\\scene_rotate\\."
|
||||||
|
|
||||||
[tool.hatch.envs.test]
|
[tool.hatch.envs.hatch-test]
|
||||||
|
randomize = true
|
||||||
|
|
||||||
|
[tool.hatch.envs.hatch-test.scripts]
|
||||||
|
run = "pytest{env:HATCH_TEST_ARGS:} {args}"
|
||||||
|
|
||||||
|
[[tool.hatch.envs.hatch-test.matrix]]
|
||||||
|
python = ["313", "312", "311", "310", "39"]
|
||||||
|
|
||||||
|
[tool.hatch.envs.style]
|
||||||
|
detached = true
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"pytest",
|
"black",
|
||||||
|
"isort",
|
||||||
]
|
]
|
||||||
|
|
||||||
[tool.hatch.envs.test.scripts]
|
[tool.hatch.envs.style.scripts]
|
||||||
run = 'pytest -v'
|
check = [
|
||||||
|
"black --check --diff .",
|
||||||
|
"isort --check-only --diff .",
|
||||||
|
]
|
||||||
|
fmt = [
|
||||||
|
"isort .",
|
||||||
|
"black .",
|
||||||
|
]
|
||||||
|
|
||||||
[[tool.hatch.envs.test.matrix]]
|
[tool.black]
|
||||||
python = ["39", "310", "311"]
|
line-length = 88
|
||||||
|
include = '\.pyi?$'
|
||||||
|
# 'extend-exclude' excludes files or directories in addition to the defaults
|
||||||
|
extend-exclude = '''
|
||||||
|
(
|
||||||
|
^/\.git/ # exclude all files in the .git directory
|
||||||
|
^/\.hatch/ # exclude all files in the .hatch directory
|
||||||
|
^/\.pytest_cache/ # exclude all files in the .pytest_cache directory
|
||||||
|
| .*_pb2.py # exclude autogenerated Protocol Buffer files anywhere in the project
|
||||||
|
)
|
||||||
|
'''
|
||||||
|
|
||||||
|
[tool.isort]
|
||||||
|
profile = "black"
|
||||||
|
skip = [".gitignore", ".dockerignore"]
|
||||||
|
skip_glob = [".git/*", ".hatch/*", ".pytest_cache/*"]
|
||||||
@@ -18,7 +18,12 @@ class TestAttrs:
|
|||||||
|
|
||||||
def test_get_current_program_scene_attrs(self):
|
def test_get_current_program_scene_attrs(self):
|
||||||
resp = req_cl.get_current_program_scene()
|
resp = req_cl.get_current_program_scene()
|
||||||
assert resp.attrs() == ["current_program_scene_name"]
|
assert resp.attrs() == [
|
||||||
|
"current_program_scene_name",
|
||||||
|
"current_program_scene_uuid",
|
||||||
|
"scene_name",
|
||||||
|
"scene_uuid",
|
||||||
|
]
|
||||||
|
|
||||||
def test_get_transition_kind_list_attrs(self):
|
def test_get_transition_kind_list_attrs(self):
|
||||||
resp = req_cl.get_transition_kind_list()
|
resp = req_cl.get_transition_kind_list()
|
||||||
|
|||||||
@@ -1,4 +1,5 @@
|
|||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from obsws_python.callback import Callback
|
from obsws_python.callback import Callback
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
35
tests/test_error.py
Normal file
35
tests/test_error.py
Normal file
@@ -0,0 +1,35 @@
|
|||||||
|
import pytest
|
||||||
|
|
||||||
|
import obsws_python as obsws
|
||||||
|
from tests import req_cl
|
||||||
|
|
||||||
|
|
||||||
|
class TestErrors:
|
||||||
|
__test__ = True
|
||||||
|
|
||||||
|
def test_it_raises_an_obssdk_error_on_incorrect_password(self):
|
||||||
|
bad_conn = {"host": "localhost", "port": 4455, "password": "incorrectpassword"}
|
||||||
|
with pytest.raises(
|
||||||
|
obsws.error.OBSSDKError,
|
||||||
|
match="failed to identify client with the server, please check connection settings",
|
||||||
|
):
|
||||||
|
obsws.ReqClient(**bad_conn)
|
||||||
|
|
||||||
|
def test_it_raises_an_obssdk_error_if_auth_enabled_but_no_password_provided(self):
|
||||||
|
bad_conn = {"host": "localhost", "port": 4455, "password": ""}
|
||||||
|
with pytest.raises(
|
||||||
|
obsws.error.OBSSDKError,
|
||||||
|
match="authentication enabled but no password provided",
|
||||||
|
):
|
||||||
|
obsws.ReqClient(**bad_conn)
|
||||||
|
|
||||||
|
def test_it_raises_a_request_error_on_invalid_request(self):
|
||||||
|
with pytest.raises(
|
||||||
|
obsws.error.OBSSDKRequestError,
|
||||||
|
match="Request SetCurrentProgramScene returned code 600. With message: No source was found by the name of `invalid`.",
|
||||||
|
) as exc_info:
|
||||||
|
req_cl.set_current_program_scene("invalid")
|
||||||
|
|
||||||
|
e = exc_info.value
|
||||||
|
assert e.req_name == "SetCurrentProgramScene"
|
||||||
|
assert e.code == 600
|
||||||
@@ -13,33 +13,8 @@ class TestRequests:
|
|||||||
|
|
||||||
def test_get_hot_key_list(self):
|
def test_get_hot_key_list(self):
|
||||||
resp = req_cl.get_hot_key_list()
|
resp = req_cl.get_hot_key_list()
|
||||||
obsbasic_hotkey_list = [
|
assert resp.hotkeys
|
||||||
"OBSBasic.SelectScene",
|
assert any(x.startswith("OBSBasic.") for x in resp.hotkeys)
|
||||||
"OBSBasic.SelectScene",
|
|
||||||
"OBSBasic.SelectScene",
|
|
||||||
"OBSBasic.SelectScene",
|
|
||||||
"OBSBasic.StartStreaming",
|
|
||||||
"OBSBasic.StopStreaming",
|
|
||||||
"OBSBasic.ForceStopStreaming",
|
|
||||||
"OBSBasic.StartRecording",
|
|
||||||
"OBSBasic.StopRecording",
|
|
||||||
"OBSBasic.PauseRecording",
|
|
||||||
"OBSBasic.UnpauseRecording",
|
|
||||||
"OBSBasic.StartReplayBuffer",
|
|
||||||
"OBSBasic.StopReplayBuffer",
|
|
||||||
"OBSBasic.StartVirtualCam",
|
|
||||||
"OBSBasic.StopVirtualCam",
|
|
||||||
"OBSBasic.EnablePreview",
|
|
||||||
"OBSBasic.DisablePreview",
|
|
||||||
"OBSBasic.ShowContextBar",
|
|
||||||
"OBSBasic.HideContextBar",
|
|
||||||
"OBSBasic.TogglePreviewProgram",
|
|
||||||
"OBSBasic.Transition",
|
|
||||||
"OBSBasic.ResetStats",
|
|
||||||
"OBSBasic.Screenshot",
|
|
||||||
"OBSBasic.SelectedSourceScreenshot",
|
|
||||||
]
|
|
||||||
assert all(x in resp.hotkeys for x in obsbasic_hotkey_list)
|
|
||||||
|
|
||||||
@pytest.mark.parametrize(
|
@pytest.mark.parametrize(
|
||||||
"name,data",
|
"name,data",
|
||||||
@@ -53,6 +28,7 @@ class TestRequests:
|
|||||||
resp = req_cl.get_persistent_data("OBS_WEBSOCKET_DATA_REALM_PROFILE", name)
|
resp = req_cl.get_persistent_data("OBS_WEBSOCKET_DATA_REALM_PROFILE", name)
|
||||||
assert resp.slot_value == data
|
assert resp.slot_value == data
|
||||||
|
|
||||||
|
@pytest.mark.skip(reason="possible bug in obs-websocket, needs checking")
|
||||||
def test_profile_list(self):
|
def test_profile_list(self):
|
||||||
req_cl.create_profile("test")
|
req_cl.create_profile("test")
|
||||||
resp = req_cl.get_profile_list()
|
resp = req_cl.get_profile_list()
|
||||||
@@ -95,11 +71,15 @@ class TestRequests:
|
|||||||
"START_TEST", "test", "color_source_v3", {"color": 4294945535}, True
|
"START_TEST", "test", "color_source_v3", {"color": 4294945535}, True
|
||||||
)
|
)
|
||||||
resp = req_cl.get_input_list()
|
resp = req_cl.get_input_list()
|
||||||
assert {
|
for input_item in resp.inputs:
|
||||||
"inputKind": "color_source_v3",
|
if input_item["inputName"] == "test":
|
||||||
"inputName": "test",
|
assert input_item["inputKind"] == "color_source_v3"
|
||||||
"unversionedInputKind": "color_source",
|
assert input_item["unversionedInputKind"] == "color_source"
|
||||||
} in resp.inputs
|
break
|
||||||
|
else:
|
||||||
|
# This else block is executed if the for loop completes without finding the input_item with inputName "test"
|
||||||
|
raise AssertionError("Input with inputName 'test' not found")
|
||||||
|
|
||||||
resp = req_cl.get_input_settings("test")
|
resp = req_cl.get_input_settings("test")
|
||||||
assert resp.input_kind == "color_source_v3"
|
assert resp.input_kind == "color_source_v3"
|
||||||
assert resp.input_settings == {"color": 4294945535}
|
assert resp.input_settings == {"color": 4294945535}
|
||||||
|
|||||||
Reference in New Issue
Block a user