first commit

This commit is contained in:
2025-04-19 20:15:26 +01:00
commit 8ce9a80eed
23 changed files with 1317 additions and 0 deletions

4
obsws_cli/__about__.py Normal file
View File

@@ -0,0 +1,4 @@
# SPDX-FileCopyrightText: 2025-present onyx-and-iris <code@onyxandiris.online>
#
# SPDX-License-Identifier: MIT
__version__ = "0.1.0"

7
obsws_cli/__init__.py Normal file
View File

@@ -0,0 +1,7 @@
# SPDX-FileCopyrightText: 2025-present onyx-and-iris <code@onyxandiris.online>
#
# SPDX-License-Identifier: MIT
from .app import app
__all__ = ["app"]

23
obsws_cli/alias.py Normal file
View File

@@ -0,0 +1,23 @@
"""module defining a custom group class for handling command name aliases."""
import re
import typer
class AliasGroup(typer.core.TyperGroup):
"""A custom group class to handle command name aliases."""
_CMD_SPLIT_P = re.compile(r' ?[,|] ?')
def get_command(self, ctx, cmd_name):
"""Get a command by name."""
cmd_name = self._group_cmd_name(cmd_name)
return super().get_command(ctx, cmd_name)
def _group_cmd_name(self, default_name):
for cmd in self.commands.values():
name = cmd.name
if name and default_name in self._CMD_SPLIT_P.split(name):
return name
return default_name

78
obsws_cli/app.py Normal file
View File

@@ -0,0 +1,78 @@
"""Command line interface for the OBS WebSocket API."""
from pathlib import Path
from typing import Annotated
import obsws_python as obsws
import typer
from pydantic import ConfigDict
from pydantic_settings import BaseSettings
from . import group, input, item, record, scene, stream
class Settings(BaseSettings):
"""Settings for the OBS WebSocket client."""
model_config = ConfigDict(
env_file=(
'.env',
Path.home() / '.config' / 'obsws-cli' / 'obsws.env',
),
env_file_encoding='utf-8',
env_prefix='OBSWS_',
)
HOST: str = 'localhost'
PORT: int = 4455
PASSWORD: str = '' # No password by default
TIMEOUT: int = 5 # Timeout for requests in seconds
app = typer.Typer()
app.add_typer(scene.app, name='scene')
app.add_typer(item.app, name='item')
app.add_typer(group.app, name='group')
app.add_typer(input.app, name='input')
app.add_typer(record.app, name='record')
app.add_typer(stream.app, name='stream')
@app.command()
def version(ctx: typer.Context):
"""Get the OBS Client and WebSocket versions."""
resp = ctx.obj['obsws'].get_version()
typer.echo(
f'OBS Client version: {resp.obs_version} with WebSocket version: {resp.obs_web_socket_version}'
)
@app.callback()
def main(
ctx: typer.Context,
host: Annotated[str, typer.Option(help='WebSocket host')] = None,
port: Annotated[int, typer.Option(help='WebSocket port')] = None,
password: Annotated[str, typer.Option(help='WebSocket password')] = None,
timeout: Annotated[int, typer.Option(help='WebSocket timeout')] = None,
):
"""obsws_cli is a command line interface for the OBS WebSocket API."""
settings = Settings()
# Allow overriding settings with command line options
if host:
settings.HOST = host
if port:
settings.PORT = port
if password:
settings.PASSWORD = password
if timeout:
settings.TIMEOUT = timeout
ctx.obj = ctx.ensure_object(dict)
ctx.obj['obsws'] = ctx.with_resource(
obsws.ReqClient(
host=settings.HOST,
port=settings.PORT,
password=settings.PASSWORD,
timeout=settings.TIMEOUT,
)
)

18
obsws_cli/errors.py Normal file
View File

@@ -0,0 +1,18 @@
"""Exceptions for obsws_cli."""
class ObswsCliError(Exception):
"""Base class for all exceptions raised by obsws_cli."""
def __init__(self, message: str):
"""Initialize the exception with a message."""
message = (
message.split('With message: ')[1]
if 'With message: ' in message
else message
)
super().__init__(message)
class ObswsCliBadParameter(ObswsCliError):
"""Exception raised when a bad parameter is passed to a command."""

81
obsws_cli/group.py Normal file
View File

@@ -0,0 +1,81 @@
"""module containing commands for manipulating groups in scenes."""
import obsws_python as obsws
import typer
from .alias import AliasGroup
from .errors import ObswsCliBadParameter
from .protocols import DataclassProtocol
app = typer.Typer(cls=AliasGroup)
@app.callback()
def main():
"""Control groups in OBS scenes."""
def _get_group(group_name: str, resp: DataclassProtocol) -> dict | None:
"""Get a group from the scene item list response."""
group = next(
(
item
for item in resp.scene_items
if item.get('sourceName') == group_name and item.get('isGroup')
),
None,
)
return group
@app.command()
def show(ctx: typer.Context, scene_name: str, group_name: str):
"""Show a group in a scene."""
try:
resp = ctx.obj['obsws'].get_scene_item_list(scene_name)
if (group := _get_group(group_name, resp)) is None:
raise ObswsCliBadParameter(f"Group '{group_name}' not found in scene.")
ctx.obj['obsws'].set_scene_item_enabled(
scene_name=scene_name,
item_id=int(group.get('sceneItemId')),
enabled=True,
)
except obsws.error.OBSSDKRequestError as e:
if e.code == 600:
raise ObswsCliBadParameter(str(e)) from e
raise
@app.command()
def hide(ctx: typer.Context, scene_name: str, group_name: str):
"""Hide a group in a scene."""
try:
resp = ctx.obj['obsws'].get_scene_item_list(scene_name)
if (group := _get_group(group_name, resp)) is None:
raise ObswsCliBadParameter(f"Group '{group_name}' not found in scene.")
ctx.obj['obsws'].set_scene_item_enabled(
scene_name=scene_name,
item_id=int(group.get('sceneItemId')),
enabled=False,
)
except obsws.error.OBSSDKRequestError as e:
if e.code == 600:
raise ObswsCliBadParameter(str(e)) from e
raise
@app.command('list | ls')
def list(ctx: typer.Context, scene_name: str):
"""List groups in a scene."""
try:
resp = ctx.obj['obsws'].get_scene_item_list(scene_name)
groups = (
item.get('sourceName') for item in resp.scene_items if item.get('isGroup')
)
typer.echo('\n'.join(groups))
except obsws.error.OBSSDKRequestError as e:
if e.code == 600:
raise ObswsCliBadParameter(str(e)) from e
raise

89
obsws_cli/input.py Normal file
View File

@@ -0,0 +1,89 @@
"""module containing commands for manipulating inputs."""
import obsws_python as obsws
import typer
from .alias import AliasGroup
from .errors import ObswsCliBadParameter
from .protocols import DataclassProtocol
app = typer.Typer(cls=AliasGroup)
@app.callback()
def main():
"""Control inputs in OBS."""
@app.command('ls')
def list(ctx: typer.Context):
"""List all inputs."""
resp = ctx.obj['obsws'].get_input_list()
inputs = (input.get('inputName') for input in resp.inputs)
typer.echo('\n'.join(inputs))
def _get_input(input_name: str, resp: DataclassProtocol) -> dict | None:
"""Get an input from the input list response."""
input_ = next(
(input_ for input_ in resp.inputs if input_.get('inputName') == input_name),
None,
)
return input_
@app.command()
def mute(ctx: typer.Context, input_name: str):
"""Mute an input."""
try:
resp = ctx.obj['obsws'].get_input_list()
if (input_ := _get_input(input_name, resp)) is None:
raise ObswsCliBadParameter(f"Input '{input_name}' not found.")
ctx.obj['obsws'].set_input_mute(
name=input_.get('inputName'),
muted=True,
)
except obsws.error.OBSSDKRequestError as e:
if e.code == 600:
raise ObswsCliBadParameter(str(e)) from e
raise
@app.command()
def unmute(ctx: typer.Context, input_name: str):
"""Unmute an input."""
try:
resp = ctx.obj['obsws'].get_input_list()
if (input_ := _get_input(input_name, resp)) is None:
raise ObswsCliBadParameter(f"Input '{input_name}' not found.")
ctx.obj['obsws'].set_input_mute(
name=input_.get('inputName'),
muted=False,
)
except obsws.error.OBSSDKRequestError as e:
if e.code == 600:
raise ObswsCliBadParameter(str(e)) from e
raise
@app.command()
def toggle(ctx: typer.Context, input_name: str):
"""Toggle an input."""
try:
resp = ctx.obj['obsws'].get_input_list()
if (input_ := _get_input(input_name, resp)) is None:
raise ObswsCliBadParameter(f"Input '{input_name}' not found.")
resp = ctx.obj['obsws'].get_input_mute(name=input_.get('inputName'))
ctx.obj['obsws'].set_input_mute(
name=input_.get('inputName'),
muted=not resp.input_muted,
)
except obsws.error.OBSSDKRequestError as e:
if e.code == 600:
raise ObswsCliBadParameter(str(e)) from e
raise

61
obsws_cli/item.py Normal file
View File

@@ -0,0 +1,61 @@
"""module containing commands for manipulating items in scenes."""
import obsws_python as obsws
import typer
from .alias import AliasGroup
from .errors import ObswsCliBadParameter
app = typer.Typer(cls=AliasGroup)
@app.callback()
def main():
"""Control items in OBS scenes."""
@app.command()
def show(ctx: typer.Context, scene_name: str, item_name: str):
"""Show an item in a scene."""
try:
resp = ctx.obj['obsws'].get_scene_item_id(scene_name, item_name)
ctx.obj['obsws'].set_scene_item_enabled(
scene_name=scene_name,
item_id=int(resp.scene_item_id),
enabled=True,
)
except obsws.error.OBSSDKRequestError as e:
if e.code == 600:
raise ObswsCliBadParameter(str(e)) from e
raise
@app.command()
def hide(ctx: typer.Context, scene_name: str, item_name: str):
"""Hide an item in a scene."""
try:
resp = ctx.obj['obsws'].get_scene_item_id(scene_name, item_name)
ctx.obj['obsws'].set_scene_item_enabled(
scene_name=scene_name,
item_id=int(resp.scene_item_id),
enabled=False,
)
except obsws.error.OBSSDKRequestError as e:
if e.code == 600:
raise ObswsCliBadParameter(str(e)) from e
raise
@app.command('list | ls')
def list(ctx: typer.Context, scene_name: str):
"""List all items in a scene."""
try:
resp = ctx.obj['obsws'].get_scene_item_list(scene_name)
items = (item.get('sourceName') for item in resp.scene_items)
typer.echo('\n'.join(items))
except obsws.error.OBSSDKRequestError as e:
if e.code == 600:
raise ObswsCliBadParameter(str(e)) from e
raise

10
obsws_cli/protocols.py Normal file
View File

@@ -0,0 +1,10 @@
"""module defining protocols for type hinting."""
from dataclasses import dataclass
from typing import Protocol, runtime_checkable
@runtime_checkable
@dataclass
class DataclassProtocol(Protocol):
"""A protocol for dataclass-like structures."""

110
obsws_cli/record.py Normal file
View File

@@ -0,0 +1,110 @@
"""module for controlling OBS recording functionality."""
import obsws_python as obsws
import typer
from .errors import ObswsCliError
app = typer.Typer()
@app.callback()
def main():
"""Control OBS recording functionality."""
@app.command()
def start(ctx: typer.Context):
"""Start recording."""
try:
ctx.obj['obsws'].start_record()
typer.echo('Recording started successfully.')
except obsws.error.OBSSDKRequestError as e:
if e.code == 500:
raise ObswsCliError(
'Recording is already in progress, cannot start.'
) from e
raise
@app.command()
def stop(ctx: typer.Context):
"""Stop recording."""
try:
ctx.obj['obsws'].stop_record()
typer.echo('Recording stopped successfully.')
except obsws.error.OBSSDKRequestError as e:
if e.code == 501:
raise ObswsCliError('Recording is not in progress, cannot stop.') from e
raise
def _get_recording_status(ctx: typer.Context) -> tuple:
"""Get recording status."""
resp = ctx.obj['obsws'].get_record_status()
return resp.output_active, resp.output_paused
@app.command()
def status(ctx: typer.Context):
"""Get recording status."""
active, paused = _get_recording_status(ctx)
if active:
if paused:
typer.echo('Recording is in progress and paused.')
else:
typer.echo('Recording is in progress.')
else:
typer.echo('Recording is not in progress.')
@app.command()
def toggle(ctx: typer.Context):
"""Toggle recording."""
active, _ = _get_recording_status(ctx)
if active:
try:
ctx.obj['obsws'].stop_record()
typer.echo('Recording stopped successfully.')
except obsws.error.OBSSDKRequestError as e:
raise ObswsCliError(str(e)) from e
else:
try:
ctx.obj['obsws'].start_record()
typer.echo('Recording started successfully.')
except obsws.error.OBSSDKRequestError as e:
raise ObswsCliError(str(e)) from e
@app.command()
def resume(ctx: typer.Context):
"""Resume recording."""
active, paused = _get_recording_status(ctx)
if not active:
raise ObswsCliError('Recording is not in progress, cannot resume.')
if not paused:
raise ObswsCliError('Recording is in progress but not paused, cannot resume.')
try:
ctx.obj['obsws'].resume_record()
typer.echo('Recording resumed successfully.')
except obsws.error.OBSSDKRequestError as e:
raise ObswsCliError(str(e)) from e
@app.command()
def pause(ctx: typer.Context):
"""Pause recording."""
active, paused = _get_recording_status(ctx)
if not active:
raise ObswsCliError('Recording is not in progress, cannot pause.')
if paused:
raise ObswsCliError(
'Recording is in progress but already paused, cannot pause.'
)
try:
ctx.obj['obsws'].pause_record()
typer.echo('Recording paused successfully.')
except obsws.error.OBSSDKRequestError as e:
raise ObswsCliError(str(e)) from e

40
obsws_cli/scene.py Normal file
View File

@@ -0,0 +1,40 @@
"""module containing commands for controlling OBS scenes."""
import obsws_python as obsws
import typer
from .alias import AliasGroup
from .errors import ObswsCliBadParameter
app = typer.Typer(cls=AliasGroup)
@app.callback()
def main():
"""Control OBS scenes."""
@app.command('list | ls')
def list(ctx: typer.Context):
"""List all scenes."""
resp = ctx.obj['obsws'].get_scene_list()
scenes = (scene.get('sceneName') for scene in reversed(resp.scenes))
typer.echo('\n'.join(scenes))
@app.command('current | get')
def current(ctx: typer.Context):
"""Get the current program scene."""
resp = ctx.obj['obsws'].get_current_program_scene()
typer.echo(resp.current_program_scene_name)
@app.command('switch | set')
def switch(ctx: typer.Context, scene_name: str):
"""Switch to a scene."""
try:
ctx.obj['obsws'].set_current_program_scene(scene_name)
except obsws.error.OBSSDKRequestError as e:
if e.code == 600:
raise ObswsCliBadParameter(f"Scene '{scene_name}' not found.")
raise

87
obsws_cli/stream.py Normal file
View File

@@ -0,0 +1,87 @@
"""module for controlling OBS stream functionality."""
import obsws_python as obsws
import typer
from .errors import ObswsCliError
app = typer.Typer()
@app.callback()
def main():
"""Control OBS stream functionality."""
@app.command()
def start(ctx: typer.Context):
"""Start streaming."""
try:
ctx.obj['obsws'].start_stream()
typer.echo('Streaming started successfully.')
except obsws.error.OBSSDKRequestError as e:
if e.code == 500:
raise ObswsCliError(
'Streaming is already in progress, cannot start.'
) from e
raise
@app.command()
def stop(ctx: typer.Context):
"""Stop streaming."""
try:
ctx.obj['obsws'].stop_stream()
typer.echo('Streaming stopped successfully.')
except obsws.error.OBSSDKRequestError as e:
if e.code == 501:
raise ObswsCliError('Streaming is not in progress, cannot stop.') from e
raise
def _get_streaming_status(ctx: typer.Context) -> tuple:
"""Get streaming status."""
resp = ctx.obj['obsws'].get_stream_status()
return resp.output_active, resp.output_duration
@app.command()
def status(ctx: typer.Context):
"""Get streaming status."""
active, duration = _get_streaming_status(ctx)
if active:
if duration > 0:
seconds = duration / 1000
minutes = int(seconds // 60)
seconds = int(seconds % 60)
if minutes > 0:
typer.echo(
f'Streaming is in progress for {minutes} minutes and {seconds} seconds.'
)
else:
if seconds > 0:
typer.echo(f'Streaming is in progress for {seconds} seconds.')
else:
typer.echo('Streaming is in progress for less than a second.')
else:
typer.echo('Streaming is in progress.')
else:
typer.echo('Streaming is not in progress.')
@app.command()
def toggle(ctx: typer.Context):
"""Toggle streaming."""
active, _ = _get_streaming_status(ctx)
if active:
try:
ctx.obj['obsws'].stop_stream()
typer.echo('Streaming stopped successfully.')
except obsws.error.OBSSDKRequestError as e:
raise ObswsCliError(str(e)) from e
else:
try:
ctx.obj['obsws'].start_stream()
typer.echo('Streaming started successfully.')
except obsws.error.OBSSDKRequestError as e:
raise ObswsCliError(str(e)) from e