62 Commits

Author SHA1 Message Date
3bce50701e add v5.4, v5.5 methods
v5.4:
- get_source_filter_kind_list
- get_scene_item_source

v5.5:
- split_record_file
- create_record_chapter

minor bump
2025-06-23 06:25:31 +01:00
8e8062d5c8 Merge pull request #57 from onyx-and-iris/dev
Return response class for toggle_record
2025-05-14 20:19:49 +01:00
6f64e884d8 md fix 2025-05-07 22:00:19 +01:00
90abc4f9ee upd test_get_hot_key_list
- check hotkey list is not empty
- check it has at least one OBSBasic. hotkey.
2025-05-07 19:13:09 +01:00
f564f53c69 return response class for toggle_record()
patch bump
2025-05-07 18:42:29 +01:00
4d9dfa9d11 Merge pull request #55 from onyx-and-iris/dev
Update tests
2025-02-11 14:19:27 +00:00
9e361f0f8a fix hatch link 2025-02-11 10:16:56 +00:00
797161a6f2 import Callable, Iterable from collections.abs instead of typing.
update tests to reflect changes in the API.

reorganise hatch envs

add black,isort configs to pyproject.toml

add pre-commit config
2025-02-11 09:51:00 +00:00
0fe78197fc Merge branch 'aatikturk:main' into dev 2025-02-10 12:27:07 +00:00
Adem
9c4c5a1df9 Merge pull request #53 from Zynthasius39/dev-zynt
Fix Trigger Hotkey Methods
2025-02-10 15:02:36 +03:00
f52ac163b8 patch bump version 2025-02-10 09:12:09 +00:00
Zynthasius39
197a60a7cd Fix trigger_hot_key_by_key_sequence() method 2025-02-08 18:31:12 +04:00
Zynthasius39
633093ead4 Fix trigger_hot_key_by_name() method 2025-02-08 17:59:45 +04:00
935392a0b6 Merge branch 'aatikturk:main' into dev 2025-01-25 22:34:36 +00:00
d2f2926334 Merge pull request #51 from marzeq/patch-1
Fix project.license field in pyproject.toml so that setup.py doesn't fail
2025-01-25 21:48:55 +00:00
marzeq
58cd50dd6c Fix project.license field in pyproject.toml so that setup.py doesn't fail 2025-01-25 21:34:27 +01:00
7614cdfe4a add py12 to test matrix 2024-02-21 14:15:40 +00:00
Adem
9402f2e472 Merge pull request #43 from onyx-and-iris/fix-disconnect
Add disconnect() methods. Default ws timeout to None for event thread.
2024-01-21 15:45:06 +03:00
ef8df5cf4d bump to 1.7.0 2024-01-21 12:34:10 +00:00
1abca0c7e4 bump to 1.7.0b0 2024-01-09 15:37:33 +00:00
85180c1d94 upd variable name 2024-01-09 12:17:47 +00:00
f4db1ad95c fix prompt 2024-01-07 14:37:15 +00:00
efaee7594e should a socket operation be attempted after socket closed
then catch and log OSError and close thread.
2024-01-07 12:35:20 +00:00
2cebd5eedb upd examples, they now use context managers 2024-01-07 11:21:01 +00:00
cac236c004 removes timeout for socket before starting worker thread 2024-01-07 11:19:33 +00:00
6aa6db09eb adds an event object and listens until its set
sets the event object on WebSocketConnectionClosedException

adds __enter__(), __exit__() methods

adds disconnect() to event client. aliases it as unsubscribe

checks for non-empty response with:
`if r := self.base_client.ws.recv()`
before attempting to json.load() it.
2024-01-05 09:57:08 +00:00
f1c2efa4a1 adds disconnect() method to ReqClient
now calling disconnect() in __exit__()
2024-01-05 09:36:02 +00:00
Adem
4654d2529f Merge pull request #39 from onyx-and-iris/dev
patch bump for PR #37
2023-10-23 14:58:56 +03:00
1494208f63 patch bump for issue #37 2023-10-23 12:43:59 +01:00
Adem
d217630289 Merge pull request #37 from aatikturk/implement_v5.3_methods
Update reqs.py

implemented  set_record_directory method. (only availabe for obs websocket v5.3 or higher)
2023-10-23 14:00:08 +03:00
Adem
5bfe792fa6 Update reqs.py
added set_record_directory  method to ReqClient.
2023-10-23 09:29:16 +03:00
3c36619173 Merge pull request #36 from onyx-and-iris/add-projector-methods
Add projector methods
2023-10-10 17:38:53 +01:00
c4cf817042 split at full stop 2023-10-09 22:34:05 +01:00
ba5da8dfef upd obsbasic hotkey list in tests 2023-10-09 22:29:18 +01:00
83577e2d61 adds projector methods with a deprecation warning
patch bump

closes #35
2023-10-09 22:06:18 +01:00
Adem
8aa2e78ba6 Merge pull request #32 from onyx-and-iris/add-request-error-class
Error handling with base error class
2023-08-14 14:38:43 +03:00
780f07e25f minor version bump 2023-08-14 12:18:29 +01:00
70a422696e expand the Requests section in README
add a section about the {ReqClient}.send() method.
2023-08-14 11:11:46 +01:00
a7ef61018b refactor OBSSDKRequestError
reword error section in README
2023-08-14 00:44:59 +01:00
013cf15024 check req_name and code
for OBSSDKRequestError class
2023-08-12 14:51:44 +01:00
f88e8ee3a6 Errors section in readme updated 2023-08-11 22:35:25 +01:00
6fa24fe609 error tests added 2023-08-11 22:33:56 +01:00
ffd215aadf send now raises an OBSSDKRequestError
it is then logged and rethrown
2023-08-11 22:33:41 +01:00
f3e75c0ddf OBSSDKError is now the base custom error class
OBSSDKTimeoutError and OBSSDKRequestError subclass it

req_name and error code set as error class attributes.
2023-08-11 22:32:50 +01:00
5db7a705c5 log and rethrow TimeoutError on connection
we can just encode challenge here.

shorten opcode != 2 message
2023-08-11 22:31:03 +01:00
ca72b92eb3 Merge pull request #30 from aatikturk/client_auth_loggers
auth logger for clients
2023-07-04 17:17:44 +01:00
98b17b6749 add .python-version to .gitignore 2023-06-30 22:44:50 +01:00
5462c47b65 log errors raised in authenticate() 2023-06-28 17:56:56 +01:00
126e5cb0a4 raise OBSSDKError if auth reponse opcode != 2 2023-06-28 17:56:29 +01:00
Adem
4ced7193df patch bump 2023-06-23 01:53:02 +03:00
Adem
468c63f697 auth logger for clients
added RpcVersion in auth loggers for both requests and events clients.
removed the check in baseclient auth function and returned the whole response.
2023-06-23 01:48:45 +03:00
Adem
24f8487d93 Merge pull request #29 from onyx-and-iris/dev
added module level loggers.
2023-06-23 00:26:30 +03:00
2c07f242ad added module level loggers.
class loggers implemented as child loggers.

patch bump
2023-06-22 22:17:20 +01:00
4e45de17ea Merge pull request #27 from aatikturk/25-question-set-timeout-for-connection-request
Added 'timeout' option for  baseclient
2023-06-19 18:25:12 +01:00
491a26aaf7 minor ver bump 2023-06-19 17:51:16 +01:00
d84d30b752 update readme Errors section 2023-06-19 17:46:43 +01:00
9e3c1d3f37 raise timeout errors.
added some error/exception logging.

added timeout parameter to repr methods.
2023-06-19 17:45:49 +01:00
82b6cdcd04 add error class OBSSDKTimeoutError 2023-06-19 17:44:10 +01:00
Adem
64a7c2b753 update readme and base client 2023-06-14 01:09:44 +03:00
Adem
15559fdb33 updated readme 2023-05-29 10:48:41 +00:00
Adem
3adf094481 Added 'timeout' option for baseclient. bumped version 2023-05-29 10:34:40 +00:00
Adem
9c41f2bb59 Merge pull request #24 from onyx-and-iris/dev
check user home directory for config.toml
2023-03-11 22:48:43 +03:00
17 changed files with 474 additions and 169 deletions

12
.gitignore vendored
View File

@@ -45,6 +45,14 @@ env.bak/
venv.bak/
.hatch
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
.python-version
# Test/config
quick.py
config.toml
test-*.py
config.toml
obsws.log
.vscode/

10
.pre-commit-config.yaml Normal file
View File

@@ -0,0 +1,10 @@
repos:
- repo: local
hooks:
- id: format
name: format
entry: hatch run style:fmt
language: system
pass_filenames: false
verbose: true
files: \.(py)$

View File

@@ -1,5 +1,6 @@
[![PyPI version](https://badge.fury.io/py/obsws-python.svg)](https://badge.fury.io/py/obsws-python)
[![License: GPL v3](https://img.shields.io/badge/License-GPLv3-blue.svg)](https://github.com/aatikturk/obsstudio_sdk/blob/main/LICENSE)
[![Hatch project](https://img.shields.io/badge/%F0%9F%A5%9A-Hatch-4051b5.svg)](https://github.com/pypa/hatch)
[![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black)
[![Imports: isort](https://img.shields.io/badge/%20imports-isort-%231674b1?style=flat&labelColor=ef8336)](https://pycqa.github.io/isort/)
@@ -9,10 +10,10 @@ Not all endpoints in the official documentation are implemented.
## Requirements
- [OBS Studio](https://obsproject.com/)
- [OBS Websocket v5 Plugin](https://github.com/obsproject/obs-websocket/releases/tag/5.0.0)
- With the release of OBS Studio version 28, Websocket plugin is included by default. But it should be manually installed for earlier versions of OBS.
- Python 3.9 or greater
- [OBS Studio](https://obsproject.com/)
- [OBS Websocket v5 Plugin](https://github.com/obsproject/obs-websocket/releases/tag/5.0.0)
- With the release of OBS Studio version 28, Websocket plugin is included by default. But it should be manually installed for earlier versions of OBS.
- Python 3.9 or greater
### How to install using pip
@@ -24,9 +25,10 @@ pip install obsws-python
By default the clients connect with parameters:
- `host`: "localhost"
- `port`: 4455
- `password`: ""
- `host`: "localhost"
- `port`: 4455
- `password`: ""
- `timeout`: None
You may override these parameters by storing them in a toml config file or passing them as keyword arguments.
@@ -53,7 +55,7 @@ Example `__main__.py`:
import obsws_python as obs
# pass conn info if not in config.toml
cl = obs.ReqClient(host='localhost', port=4455, password='mystrongpass')
cl = obs.ReqClient(host='localhost', port=4455, password='mystrongpass', timeout=3)
# Toggle the mute state of your Mic input
cl.toggle_input_mute('Mic/Aux')
@@ -61,7 +63,7 @@ cl.toggle_input_mute('Mic/Aux')
### Requests
Method names for requests match the API calls but snake cased.
Method names for requests match the API calls but snake cased. If a successful call is made with the Request client and the response is expected to contain fields then a response object will be returned. You may then access the response fields as class attributes. They will be snake cased.
example:
@@ -69,14 +71,29 @@ example:
# load conn info from config.toml
cl = obs.ReqClient()
# GetVersion
# GetVersion, returns a response object
resp = cl.get_version()
# Access it's field as an attribute
print(f"OBS Version: {resp.obs_version}")
# SetCurrentProgramScene
cl.set_current_program_scene("BRB")
```
For a full list of requests refer to [Requests](https://github.com/obsproject/obs-websocket/blob/master/docs/generated/protocol.md#requests)
#### `send(param, data=None, raw=False)`
If you prefer to work with the JSON data directly the {ReqClient}.send() method accepts an argument, `raw`. If set to True the raw response data will be returned, instead of a response object.
example:
```python
resp = cl_req.send("GetVersion", raw=True)
print(f"response data: {resp}")
```
For a full list of requests refer to [Requests][obsws-reqs]
### Events
@@ -109,7 +126,7 @@ cl.callback.deregister(on_input_mute_state_changed)
`register(fns)` and `deregister(fns)` accept both single functions and lists of functions.
For a full list of events refer to [Events](https://github.com/obsproject/obs-websocket/blob/master/docs/generated/protocol.md#events)
For a full list of events refer to [Events][obsws-events]
### Attributes
@@ -127,9 +144,13 @@ def on_scene_created(data):
### Errors
If a request fails an `OBSSDKError` will be raised with a status code.
For a full list of status codes refer to [Codes](https://github.com/obsproject/obs-websocket/blob/master/docs/generated/protocol.md#requeststatus)
- `OBSSDKError`: Base error class.
- `OBSSDKTimeoutError`: Raised if a timeout occurs during sending/receiving a request or receiving an event
- `OBSSDKRequestError`: Raised when a request returns an error code.
- The following attributes are available:
- `req_name`: name of the request.
- `code`: request status code.
- For a full list of status codes refer to [Codes][obsws-codes]
### Logging
@@ -148,18 +169,21 @@ logging.basicConfig(level=logging.DEBUG)
### Tests
First install development dependencies:
`pip install -e .['dev']`
To run all tests:
Install [hatch][hatch-install] and then:
```
pytest -v
hatch test
```
### Official Documentation
For the full documentation:
- [OBS Websocket SDK](https://github.com/obsproject/obs-websocket/blob/master/docs/generated/protocol.md#obs-websocket-501-protocol)
- [OBS Websocket SDK][obsws-pro]
[obsws-reqs]: https://github.com/obsproject/obs-websocket/blob/master/docs/generated/protocol.md#requests
[obsws-events]: https://github.com/obsproject/obs-websocket/blob/master/docs/generated/protocol.md#events
[obsws-codes]: https://github.com/obsproject/obs-websocket/blob/master/docs/generated/protocol.md#requeststatus
[obsws-pro]: https://github.com/obsproject/obs-websocket/blob/master/docs/generated/protocol.md#obs-websocket-501-protocol
[hatch-install]: https://hatch.pypa.io/latest/install/

View File

@@ -17,6 +17,12 @@ class Observer:
print(f"Registered events: {self._client.callback.get()}")
self.running = True
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, exc_traceback):
self._client.disconnect()
def on_current_program_scene_changed(self, data):
"""The current program scene has changed."""
print(f"Switched to scene {data.scene_name}")
@@ -31,13 +37,11 @@ class Observer:
def on_exit_started(self, _):
"""OBS has begun the shutdown process."""
print(f"OBS closing!")
self._client.unsubscribe()
print("OBS closing!")
self.running = False
if __name__ == "__main__":
observer = Observer()
while observer.running:
time.sleep(0.1)
with Observer() as observer:
while observer.running:
time.sleep(0.1)

View File

@@ -1,6 +1,7 @@
import inspect
import keyboard
import obsws_python as obs
@@ -10,6 +11,12 @@ class Observer:
self._client.callback.register(self.on_current_program_scene_changed)
print(f"Registered events: {self._client.callback.get()}")
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, exc_traceback):
self._client.disconnect()
@property
def event_identifier(self):
return inspect.stack()[1].function
@@ -31,13 +38,12 @@ def set_scene(scene, *args):
if __name__ == "__main__":
req_client = obs.ReqClient()
observer = Observer()
with obs.ReqClient() as req_client:
with Observer() as observer:
keyboard.add_hotkey("0", version)
keyboard.add_hotkey("1", set_scene, args=("START",))
keyboard.add_hotkey("2", set_scene, args=("BRB",))
keyboard.add_hotkey("3", set_scene, args=("END",))
keyboard.add_hotkey("0", version)
keyboard.add_hotkey("1", set_scene, args=("START",))
keyboard.add_hotkey("2", set_scene, args=("BRB",))
keyboard.add_hotkey("3", set_scene, args=("END",))
print("press ctrl+enter to quit")
keyboard.wait("ctrl+enter")
print("press ctrl+enter to quit")
keyboard.wait("ctrl+enter")

View File

@@ -9,6 +9,8 @@ LEVELTYPE = IntEnum(
start=0,
)
DEVICE = "Desktop Audio"
def on_input_mute_state_changed(data):
"""An input's mute state has changed."""
@@ -32,15 +34,14 @@ def on_input_volume_meters(data):
def main():
client = obs.EventClient(subs=(obs.Subs.LOW_VOLUME | obs.Subs.INPUTVOLUMEMETERS))
client.callback.register([on_input_volume_meters, on_input_mute_state_changed])
with obs.EventClient(
subs=(obs.Subs.LOW_VOLUME | obs.Subs.INPUTVOLUMEMETERS)
) as client:
client.callback.register([on_input_volume_meters, on_input_mute_state_changed])
while cmd := input("<Enter> to exit>\n"):
if not cmd:
break
while _ := input("Press <Enter> to exit\n"):
pass
if __name__ == "__main__":
DEVICE = "Desktop Audio"
main()

View File

@@ -7,15 +7,23 @@ from random import randint
from typing import Optional
import websocket
from websocket import WebSocketTimeoutException
from .error import OBSSDKError
from .error import OBSSDKError, OBSSDKTimeoutError
logger = logging.getLogger(__name__)
class ObsClient:
logger = logging.getLogger("baseclient.obsclient")
def __init__(self, **kwargs):
defaultkwargs = {"host": "localhost", "port": 4455, "password": "", "subs": 0}
self.logger = logger.getChild(self.__class__.__name__)
defaultkwargs = {
"host": "localhost",
"port": 4455,
"password": "",
"subs": 0,
"timeout": None,
}
if not any(key in kwargs for key in ("host", "port", "password")):
kwargs |= self._conn_from_toml()
kwargs = defaultkwargs | kwargs
@@ -23,14 +31,21 @@ class ObsClient:
setattr(self, attr, val)
self.logger.info(
"Connecting with parameters: host='{host}' port={port} password='{password}' subs={subs}".format(
"Connecting with parameters: host='{host}' port={port} password='{password}' subs={subs} timeout={timeout}".format(
**self.__dict__
)
)
self.ws = websocket.WebSocket()
self.ws.connect(f"ws://{self.host}:{self.port}")
self.server_hello = json.loads(self.ws.recv())
try:
self.ws = websocket.WebSocket()
self.ws.connect(f"ws://{self.host}:{self.port}", timeout=self.timeout)
self.server_hello = json.loads(self.ws.recv())
except ValueError as e:
self.logger.error(f"{type(e).__name__}: {e}")
raise
except (ConnectionRefusedError, TimeoutError, WebSocketTimeoutException) as e:
self.logger.exception(f"{type(e).__name__}: {e}")
raise
def _conn_from_toml(self) -> dict:
try:
@@ -81,10 +96,8 @@ class ObsClient:
auth = base64.b64encode(
hashlib.sha256(
(
secret.decode()
+ self.server_hello["d"]["authentication"]["challenge"]
).encode()
secret
+ self.server_hello["d"]["authentication"]["challenge"].encode()
).digest()
).decode()
@@ -93,9 +106,15 @@ class ObsClient:
self.ws.send(json.dumps(payload))
try:
response = json.loads(self.ws.recv())
return response["op"] == 2
if response["op"] != 2:
raise OBSSDKError(
"failed to identify client with the server, expected response with OpCode 2"
)
return response["d"]
except json.decoder.JSONDecodeError:
raise OBSSDKError("failed to identify client with the server")
raise OBSSDKError(
"failed to identify client with the server, please check connection settings"
)
def req(self, req_type, req_data=None):
payload = {
@@ -105,7 +124,11 @@ class ObsClient:
if req_data:
payload["d"]["requestData"] = req_data
self.logger.debug(f"Sending request {payload}")
self.ws.send(json.dumps(payload))
response = json.loads(self.ws.recv())
try:
self.ws.send(json.dumps(payload))
response = json.loads(self.ws.recv())
except WebSocketTimeoutException as e:
self.logger.exception(f"{type(e).__name__}: {e}")
raise OBSSDKTimeoutError("Timeout while trying to send the request") from e
self.logger.debug(f"Response received {response}")
return response["d"]

View File

@@ -1,4 +1,5 @@
from typing import Callable, Iterable, Union
from collections.abc import Callable, Iterable
from typing import Union
from .util import as_dataclass, to_camel_case, to_snake_case

View File

@@ -1,4 +1,18 @@
class OBSSDKError(Exception):
"""general errors"""
"""Base class for OBSSDK errors"""
pass
class OBSSDKTimeoutError(OBSSDKError):
"""Exception raised when a connection times out"""
class OBSSDKRequestError(OBSSDKError):
"""Exception raised when a request returns an error code"""
def __init__(self, req_name, code, comment):
self.req_name = req_name
self.code = code
message = f"Request {self.req_name} returned code {self.code}."
if comment:
message += f" With message: {comment}"
super().__init__(message)

View File

@@ -1,10 +1,12 @@
import json
import logging
import time
from threading import Thread
import threading
from websocket import WebSocketConnectionClosedException, WebSocketTimeoutException
from .baseclient import ObsClient
from .callback import Callback
from .error import OBSSDKError, OBSSDKTimeoutError
from .subs import Subs
"""
@@ -13,24 +15,36 @@ defined in official github repo
https://github.com/obsproject/obs-websocket/blob/master/docs/generated/protocol.md#events
"""
logger = logging.getLogger(__name__)
class EventClient:
logger = logging.getLogger("events.eventclient")
DELAY = 0.001
def __init__(self, **kwargs):
self.logger = logger.getChild(self.__class__.__name__)
defaultkwargs = {"subs": Subs.LOW_VOLUME}
kwargs = defaultkwargs | kwargs
self.base_client = ObsClient(**kwargs)
if self.base_client.authenticate():
self.logger.info(f"Successfully identified {self} with the server")
try:
success = self.base_client.authenticate()
self.logger.info(
f"Successfully identified {self} with the server using RPC version:{success['negotiatedRpcVersion']}"
)
except OBSSDKError as e:
self.logger.error(f"{type(e).__name__}: {e}")
raise
self.callback = Callback()
self.subscribe()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, exc_traceback):
self.disconnect()
def __repr__(self):
return type(
self
).__name__ + "(host='{host}', port={port}, password='{password}', subs={subs})".format(
).__name__ + "(host='{host}', port={port}, password='{password}', subs={subs}, timeout={timeout})".format(
**self.base_client.__dict__,
)
@@ -38,29 +52,40 @@ class EventClient:
return type(self).__name__
def subscribe(self):
worker = Thread(target=self.trigger, daemon=True)
worker.start()
self.base_client.ws.settimeout(None)
stop_event = threading.Event()
self.worker = threading.Thread(
target=self.trigger, daemon=True, args=(stop_event,)
)
self.worker.start()
def trigger(self):
def trigger(self, stop_event):
"""
Continuously listen for events.
Triggers a callback on event received.
"""
self.running = True
while self.running:
event = json.loads(self.base_client.ws.recv())
self.logger.debug(f"Event received {event}")
type_, data = (
event["d"].get("eventType"),
event["d"].get("eventData"),
)
self.callback.trigger(type_, data if data else {})
time.sleep(self.DELAY)
while not stop_event.is_set():
try:
if response := self.base_client.ws.recv():
event = json.loads(response)
self.logger.debug(f"Event received {event}")
type_, data = (
event["d"].get("eventType"),
event["d"].get("eventData"),
)
self.callback.trigger(type_, data if data else {})
except WebSocketTimeoutException as e:
self.logger.exception(f"{type(e).__name__}: {e}")
raise OBSSDKTimeoutError("Timeout while waiting for event") from e
except (WebSocketConnectionClosedException, OSError) as e:
self.logger.debug(f"{type(e).__name__} terminating the event thread")
stop_event.set()
def disconnect(self):
"""stop listening for events"""
def unsubscribe(self):
"""
stop listening for events
"""
self.running = False
self.base_client.ws.close()
self.worker.join()
unsubscribe = disconnect

View File

@@ -1,7 +1,8 @@
import logging
from warnings import warn
from .baseclient import ObsClient
from .error import OBSSDKError
from .error import OBSSDKError, OBSSDKRequestError
from .util import as_dataclass
"""
@@ -10,40 +11,53 @@ defined in official github repo
https://github.com/obsproject/obs-websocket/blob/master/docs/generated/protocol.md#Requests
"""
logger = logging.getLogger(__name__)
class ReqClient:
logger = logging.getLogger("reqs.reqclient")
def __init__(self, **kwargs):
self.logger = logger.getChild(self.__class__.__name__)
self.base_client = ObsClient(**kwargs)
if self.base_client.authenticate():
self.logger.info(f"Successfully identified {self} with the server")
try:
success = self.base_client.authenticate()
self.logger.info(
f"Successfully identified {self} with the server using RPC version:{success['negotiatedRpcVersion']}"
)
except OBSSDKError as e:
self.logger.error(f"{type(e).__name__}: {e}")
raise
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, exc_traceback):
self.base_client.ws.close()
self.disconnect()
def __repr__(self):
return type(
self
).__name__ + "(host='{host}', port={port}, password='{password}')".format(
).__name__ + "(host='{host}', port={port}, password='{password}', timeout={timeout})".format(
**self.base_client.__dict__,
)
def __str__(self):
return type(self).__name__
def disconnect(self):
self.base_client.ws.close()
def send(self, param, data=None, raw=False):
response = self.base_client.req(param, data)
if not response["requestStatus"]["result"]:
error = (
f"Request {response['requestType']} returned code {response['requestStatus']['code']}",
)
if "comment" in response["requestStatus"]:
error += (f"With message: {response['requestStatus']['comment']}",)
raise OBSSDKError("\n".join(error))
try:
response = self.base_client.req(param, data)
if not response["requestStatus"]["result"]:
raise OBSSDKRequestError(
response["requestType"],
response["requestStatus"]["code"],
response["requestStatus"].get("comment"),
)
except OBSSDKRequestError as e:
self.logger.exception(f"{type(e).__name__}: {e}")
raise
if "responseData" in response:
if raw:
return response["responseData"]
@@ -124,39 +138,39 @@ class ReqClient:
get_hotkey_list = get_hot_key_list
def trigger_hot_key_by_name(self, hotkeyName):
def trigger_hot_key_by_name(self, hotkeyName, contextName=None):
"""
Triggers a hotkey using its name. For hotkey names
See GetHotkeyList
:param hotkeyName: Name of the hotkey to trigger
:type hotkeyName: str
:param contextName: Name of context of the hotkey to trigger
:type contextName: str, optional
"""
payload = {"hotkeyName": hotkeyName}
payload = {"hotkeyName": hotkeyName, "contextName": contextName}
self.send("TriggerHotkeyByName", payload)
trigger_hotkey_by_name = trigger_hot_key_by_name
def trigger_hot_key_by_key_sequence(
self, keyId, pressShift, pressCtrl, pressAlt, pressCmd
self, keyId, pressShift=None, pressCtrl=None, pressAlt=None, pressCmd=None
):
"""
Triggers a hotkey using a sequence of keys.
:param keyId: The OBS key ID to use. See https://github.com/obsproject/obs-studio/blob/master/libobs/obs-hotkeys.h
:type keyId: str
:param keyModifiers: Object containing key modifiers to apply.
:type keyModifiers: dict
:param keyModifiers.shift: Press Shift
:type keyModifiers.shift: bool
:param keyModifiers.control: Press CTRL
:type keyModifiers.control: bool
:param keyModifiers.alt: Press ALT
:type keyModifiers.alt: bool
:param keyModifiers.cmd: Press CMD (Mac)
:type keyModifiers.cmd: bool
:param pressShift: Press Shift
:type pressShift: bool, optional
:param pressCtrl: Press CTRL
:type pressCtrl: bool, optional
:param pressAlt: Press ALT
:type pressAlt: bool, optional
:param pressCmd: Press CMD (Mac)
:type pressCmd: bool, optional
"""
@@ -422,6 +436,19 @@ class ReqClient:
"""
return self.send("GetRecordDirectory")
def set_record_directory(self, recordDirectory):
"""
Sets the current directory that the record output writes files to.
IMPORTANT NOTE: Requires obs websocket v5.3 or higher.
:param recordDirectory: Output directory
:type recordDirectory: str
"""
payload = {
"recordDirectory": recordDirectory,
}
return self.send("SetRecordDirectory", payload)
def get_source_active(self, name):
"""
Gets the active and show state of a source
@@ -1073,6 +1100,14 @@ class ReqClient:
payload = {"position": pos, "release": release}
self.send("SetTBarPosition", payload)
def get_source_filter_kind_list(self):
"""
Gets an array of all available source filter kinds.
"""
return self.send("GetSourceFilterKindList")
def get_source_filter_list(self, name):
"""
Gets a list of all of a source's filters.
@@ -1284,6 +1319,23 @@ class ReqClient:
}
return self.send("GetSceneItemId", payload)
def get_scene_item_source(self, scene_name, scene_item_id):
"""
Gets the source associated with a scene item.
:param scene_item_id: Numeric ID of the scene item (>= 0)
:type scene_item_id: int
:param scene_name: Name of the scene the item is in.
:type scene_name: str
"""
payload = {
"sceneItemId": scene_item_id,
"sceneName": scene_name,
}
return self.send("GetSceneItemSource", payload)
def create_scene_item(self, scene_name, source_name, enabled=None):
"""
Creates a new scene item using a source.
@@ -1757,7 +1809,7 @@ class ReqClient:
"""
self.send("ToggleRecord")
return self.send("ToggleRecord")
def start_record(self):
"""
@@ -1799,6 +1851,28 @@ class ReqClient:
"""
self.send("ResumeRecord")
def split_record_file(self):
"""
Splits the current file being recorded into a new file.
"""
self.send("SplitRecordFile")
def create_record_chapter(self, chapter_name=None):
"""
Adds a new chapter marker to the file currently being recorded.
Note: As of OBS 30.2.0, the only file format supporting this feature is Hybrid MP4.
:param chapter_name: Name of the new chapter
:type chapter_name: str
"""
payload = {"chapterName": chapter_name}
self.send("CreateRecordChapter", payload)
def get_media_input_status(self, name):
"""
Gets the status of a media input.
@@ -1928,3 +2002,66 @@ class ReqClient:
"""
return self.send("GetMonitorList")
def open_video_mix_projector(
self, video_mix_type, monitor_index=-1, projector_geometry=None
):
"""
Opens a projector for a specific output video mix.
The available mix types are:
OBS_WEBSOCKET_VIDEO_MIX_TYPE_PREVIEW
OBS_WEBSOCKET_VIDEO_MIX_TYPE_PROGRAM
OBS_WEBSOCKET_VIDEO_MIX_TYPE_MULTIVIEW
:param video_mix_type: Type of mix to open.
:type video_mix_type: str
:param monitor_index: Monitor index, use GetMonitorList to obtain index
:type monitor_index: int
:param projector_geometry:
Size/Position data for a windowed projector, in Qt Base64 encoded format.
Mutually exclusive with monitorIndex
:type projector_geometry: str
"""
warn(
"open_video_mix_projector request serves to provide feature parity with 4.x. "
"It is very likely to be changed/deprecated in a future release.",
DeprecationWarning,
stacklevel=2,
)
payload = {
"videoMixType": video_mix_type,
"monitorIndex": monitor_index,
"projectorGeometry": projector_geometry,
}
self.send("OpenVideoMixProjector", payload)
def open_source_projector(
self, source_name, monitor_index=-1, projector_geometry=None
):
"""
Opens a projector for a source.
:param source_name: Name of the source to open a projector for
:type source_name: str
:param monitor_index: Monitor index, use GetMonitorList to obtain index
:type monitor_index: int
:param projector_geometry:
Size/Position data for a windowed projector, in Qt Base64 encoded format.
Mutually exclusive with monitorIndex
:type projector_geometry: str
"""
warn(
"open_source_projector request serves to provide feature parity with 4.x. "
"It is very likely to be changed/deprecated in a future release.",
DeprecationWarning,
stacklevel=2,
)
payload = {
"sourceName": source_name,
"monitorIndex": monitor_index,
"projectorGeometry": projector_geometry,
}
self.send("OpenSourceProjector", payload)

View File

@@ -1 +1 @@
version = "1.4.2"
version = "1.8.0"

View File

@@ -7,7 +7,7 @@ name = "obsws-python"
dynamic = ["version"]
description = "A Python SDK for OBS Studio WebSocket v5.0"
readme = "README.md"
license = "GPL-3.0-only"
license = { text = "GPL-3.0-only" }
requires-python = ">=3.9"
authors = [
{ name = "Adem Atikturk", email = "aatikturk@gmail.com" },
@@ -17,14 +17,6 @@ dependencies = [
"websocket-client",
]
[project.optional-dependencies]
dev = [
"black",
"isort",
"pytest",
"pytest-randomly",
]
[project.urls]
Homepage = "https://github.com/aatikturk/obsws-python"
@@ -36,19 +28,58 @@ include = [
"/obsws_python",
]
[tool.hatch.envs.default]
dependencies = ["pre-commit"]
[tool.hatch.envs.e]
dependencies = ["keyboard"]
[tool.hatch.envs.e.scripts]
events = "python {root}\\examples\\events\\."
hotkeys = "python {root}\\examples\\hotkeys\\."
levels = "python {root}\\examples\\levels\\."
scene_rotate = "python {root}\\examples\\scene_rotate\\."
[tool.hatch.envs.test]
[tool.hatch.envs.hatch-test]
randomize = true
[tool.hatch.envs.hatch-test.scripts]
run = "pytest{env:HATCH_TEST_ARGS:} {args}"
[[tool.hatch.envs.hatch-test.matrix]]
python = ["313", "312", "311", "310", "39"]
[tool.hatch.envs.style]
detached = true
dependencies = [
"pytest",
"black",
"isort",
]
[tool.hatch.envs.test.scripts]
run = 'pytest -v'
[tool.hatch.envs.style.scripts]
check = [
"black --check --diff .",
"isort --check-only --diff .",
]
fmt = [
"isort .",
"black .",
]
[[tool.hatch.envs.test.matrix]]
python = ["39", "310", "311"]
[tool.black]
line-length = 88
include = '\.pyi?$'
# 'extend-exclude' excludes files or directories in addition to the defaults
extend-exclude = '''
(
^/\.git/ # exclude all files in the .git directory
^/\.hatch/ # exclude all files in the .hatch directory
^/\.pytest_cache/ # exclude all files in the .pytest_cache directory
| .*_pb2.py # exclude autogenerated Protocol Buffer files anywhere in the project
)
'''
[tool.isort]
profile = "black"
skip = [".gitignore", ".dockerignore"]
skip_glob = [".git/*", ".hatch/*", ".pytest_cache/*"]

View File

@@ -18,7 +18,12 @@ class TestAttrs:
def test_get_current_program_scene_attrs(self):
resp = req_cl.get_current_program_scene()
assert resp.attrs() == ["current_program_scene_name"]
assert resp.attrs() == [
"current_program_scene_name",
"current_program_scene_uuid",
"scene_name",
"scene_uuid",
]
def test_get_transition_kind_list_attrs(self):
resp = req_cl.get_transition_kind_list()

View File

@@ -1,4 +1,5 @@
import pytest
from obsws_python.callback import Callback

35
tests/test_error.py Normal file
View File

@@ -0,0 +1,35 @@
import pytest
import obsws_python as obsws
from tests import req_cl
class TestErrors:
__test__ = True
def test_it_raises_an_obssdk_error_on_incorrect_password(self):
bad_conn = {"host": "localhost", "port": 4455, "password": "incorrectpassword"}
with pytest.raises(
obsws.error.OBSSDKError,
match="failed to identify client with the server, please check connection settings",
):
obsws.ReqClient(**bad_conn)
def test_it_raises_an_obssdk_error_if_auth_enabled_but_no_password_provided(self):
bad_conn = {"host": "localhost", "port": 4455, "password": ""}
with pytest.raises(
obsws.error.OBSSDKError,
match="authentication enabled but no password provided",
):
obsws.ReqClient(**bad_conn)
def test_it_raises_a_request_error_on_invalid_request(self):
with pytest.raises(
obsws.error.OBSSDKRequestError,
match="Request SetCurrentProgramScene returned code 600. With message: No source was found by the name of `invalid`.",
) as exc_info:
req_cl.set_current_program_scene("invalid")
e = exc_info.value
assert e.req_name == "SetCurrentProgramScene"
assert e.code == 600

View File

@@ -13,33 +13,8 @@ class TestRequests:
def test_get_hot_key_list(self):
resp = req_cl.get_hot_key_list()
obsbasic_hotkey_list = [
"OBSBasic.SelectScene",
"OBSBasic.SelectScene",
"OBSBasic.SelectScene",
"OBSBasic.SelectScene",
"OBSBasic.StartStreaming",
"OBSBasic.StopStreaming",
"OBSBasic.ForceStopStreaming",
"OBSBasic.StartRecording",
"OBSBasic.StopRecording",
"OBSBasic.PauseRecording",
"OBSBasic.UnpauseRecording",
"OBSBasic.StartReplayBuffer",
"OBSBasic.StopReplayBuffer",
"OBSBasic.StartVirtualCam",
"OBSBasic.StopVirtualCam",
"OBSBasic.EnablePreview",
"OBSBasic.DisablePreview",
"OBSBasic.ShowContextBar",
"OBSBasic.HideContextBar",
"OBSBasic.TogglePreviewProgram",
"OBSBasic.Transition",
"OBSBasic.ResetStats",
"OBSBasic.Screenshot",
"OBSBasic.SelectedSourceScreenshot",
]
assert all(x in resp.hotkeys for x in obsbasic_hotkey_list)
assert resp.hotkeys
assert any(x.startswith("OBSBasic.") for x in resp.hotkeys)
@pytest.mark.parametrize(
"name,data",
@@ -53,6 +28,7 @@ class TestRequests:
resp = req_cl.get_persistent_data("OBS_WEBSOCKET_DATA_REALM_PROFILE", name)
assert resp.slot_value == data
@pytest.mark.skip(reason="possible bug in obs-websocket, needs checking")
def test_profile_list(self):
req_cl.create_profile("test")
resp = req_cl.get_profile_list()
@@ -95,11 +71,15 @@ class TestRequests:
"START_TEST", "test", "color_source_v3", {"color": 4294945535}, True
)
resp = req_cl.get_input_list()
assert {
"inputKind": "color_source_v3",
"inputName": "test",
"unversionedInputKind": "color_source",
} in resp.inputs
for input_item in resp.inputs:
if input_item["inputName"] == "test":
assert input_item["inputKind"] == "color_source_v3"
assert input_item["unversionedInputKind"] == "color_source"
break
else:
# This else block is executed if the for loop completes without finding the input_item with inputName "test"
raise AssertionError("Input with inputName 'test' not found")
resp = req_cl.get_input_settings("test")
assert resp.input_kind == "color_source_v3"
assert resp.input_settings == {"color": 4294945535}