6 Commits

Author SHA1 Message Date
105aaf29b7 keep the exit codes simple (0 or 1) 2025-07-17 04:34:44 +01:00
eb34a1833f convert filter sub app 2025-07-17 04:29:21 +01:00
abbab5c746 raise OBSWSCLIError 2025-07-17 04:28:59 +01:00
f0eb518609 implement --version + debug validator
run() now handles exceptions/exit codes
2025-07-17 04:28:40 +01:00
032b957670 add custom error class + exit codes 2025-07-17 04:28:10 +01:00
8349a196e8 root meta app + scene sub_app converted
this could take a while...

note, --version flag not implemented
2025-07-17 03:18:04 +01:00
32 changed files with 308 additions and 760 deletions

View File

@@ -5,13 +5,6 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
# [0.22.0] - 2026-01-09
### Added
- new subcommands added to input, see [Input](https://github.com/onyx-and-iris/obsws-cli?tab=readme-ov-file#input)
# [0.20.0] - 2025-07-14
### Added

View File

@@ -298,20 +298,6 @@ obsws-cli group status START "test_group"
#### Input
- create: Create a new input.
- args: <input_name> <input_kind>
```console
obsws-cli input create 'stream mix' 'wasapi_input_capture'
```
- remove: Remove an input.
- args: <input_name>
```console
obsws-cli input remove 'stream mix'
```
- list: List all inputs.
- flags:
@@ -329,12 +315,6 @@ obsws-cli input list
obsws-cli input list --input --colour
```
- list-kinds: List all input kinds.
```console
obsws-cli input list-kinds
```
- mute: Mute an input.
- args: <input_name>
@@ -355,32 +335,6 @@ obsws-cli input unmute "Mic/Aux"
obsws-cli input toggle "Mic/Aux"
```
- volume: Set the volume of an input.
- args: <input_name> <volume>
```console
obsws-cli input volume -- 'Desktop Audio' -38.9
```
- show: Show information for an input in the current scene.
- args: <input_name>
- flags:
*optional*
- --verbose: List all available input devices.
```console
obsws-cli input show 'Mic/Aux' --verbose
```
- update: Name of the input to update.
- args: <input_name> <device_name>
```console
obsws-cli input update 'Mic/Aux' 'Voicemeeter Out B1 (VB-Audio Voicemeeter VAIO)'
```
#### Text
- current: Get the current text for a text input.

View File

@@ -1,4 +1,4 @@
# SPDX-FileCopyrightText: 2025-present onyx-and-iris <code@onyxandiris.online>
#
# SPDX-License-Identifier: MIT
__version__ = '0.22.0'
__version__ = '0.20.2'

View File

@@ -2,6 +2,6 @@
#
# SPDX-License-Identifier: MIT
from .app import app
from .app import run
__all__ = ['app']
__all__ = ['run']

View File

@@ -2,166 +2,116 @@
import importlib
import logging
from typing import Annotated
from dataclasses import dataclass
from typing import Annotated, Any
import obsws_python as obsws
import typer
from cyclopts import App, Group, Parameter, config
from obsws_cli.__about__ import __version__ as version
from . import console, settings, styles
from .alias import RootTyperAliasGroup
from . import console, styles
from .context import Context
from .error import OBSWSCLIError
app = typer.Typer(cls=RootTyperAliasGroup)
for sub_typer in (
app = App(
config=config.Env(
'OBS_'
), # Environment variable prefix for configuration parameters
version=version,
)
app.meta.group_parameters = Group('Session Parameters', sort_key=0)
for sub_app in (
'filter',
'group',
'hotkey',
'input',
'profile',
'projector',
'record',
'replaybuffer',
'scene',
'scenecollection',
'sceneitem',
'screenshot',
'stream',
'studiomode',
'text',
'virtualcam',
):
module = importlib.import_module(f'.{sub_typer}', package=__package__)
app.add_typer(module.app, name=sub_typer)
module = importlib.import_module(f'.{sub_app}', package=__package__)
app.command(module.app)
def version_callback(value: bool):
"""Show the version of the CLI."""
if value:
console.out.print(f'obsws-cli version: {version}')
raise typer.Exit()
@Parameter(name='*')
@dataclass
class OBSConfig:
"""Dataclass to hold OBS connection parameters."""
host: str = 'localhost'
port: int = 4455
password: str = ''
def setup_logging(debug: bool):
@dataclass
class StyleConfig:
"""Dataclass to hold style parameters."""
name: str = 'disabled'
no_border: bool = False
def setup_logging(type_, value: Any):
"""Set up logging for the application."""
log_level = logging.DEBUG if debug else logging.CRITICAL
log_level = logging.DEBUG if value else logging.CRITICAL
logging.basicConfig(
level=log_level,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
def validate_style(value: str):
"""Validate and return the style."""
if value not in styles.registry:
raise typer.BadParameter(
f'Invalid style: {value}. Available styles: {", ".join(styles.registry.keys())}'
)
return value
@app.callback()
def main(
ctx: typer.Context,
host: Annotated[
str,
typer.Option(
'--host',
'-H',
envvar='OBS_HOST',
help='WebSocket host',
show_default='localhost',
@app.meta.default
def launcher(
*tokens: Annotated[str, Parameter(show=False, allow_leading_hyphen=True)],
obs_config: OBSConfig = Annotated[
OBSConfig,
Parameter(
show=False, allow_leading_hyphen=True, help='OBS connection parameters'
),
] = settings.get('host'),
port: Annotated[
int,
typer.Option(
'--port',
'-P',
envvar='OBS_PORT',
help='WebSocket port',
show_default=4455,
),
] = settings.get('port'),
password: Annotated[
str,
typer.Option(
'--password',
'-p',
envvar='OBS_PASSWORD',
help='WebSocket password',
show_default=False,
),
] = settings.get('password'),
timeout: Annotated[
int,
typer.Option(
'--timeout',
'-T',
envvar='OBS_TIMEOUT',
help='WebSocket timeout',
show_default=5,
),
] = settings.get('timeout'),
style: Annotated[
str,
typer.Option(
'--style',
'-s',
envvar='OBS_STYLE',
help='Set the style for the CLI output',
show_default='disabled',
callback=validate_style,
),
] = settings.get('style'),
no_border: Annotated[
bool,
typer.Option(
'--no-border',
'-b',
envvar='OBS_STYLE_NO_BORDER',
help='Disable table border styling in the CLI output',
show_default=False,
),
] = settings.get('style_no_border'),
version: Annotated[
bool,
typer.Option(
'--version',
'-v',
is_eager=True,
help='Show the CLI version and exit',
show_default=False,
callback=version_callback,
),
] = False,
],
style_config: StyleConfig = Annotated[
StyleConfig,
Parameter(show=False, allow_leading_hyphen=True, help='Style parameters'),
],
debug: Annotated[
bool,
typer.Option(
'--debug',
'-d',
envvar='OBS_DEBUG',
is_eager=True,
help='Enable debug logging',
show_default=False,
callback=setup_logging,
hidden=True,
),
] = settings.get('debug'),
Parameter(validator=setup_logging),
] = False,
):
"""obsws_cli is a command line interface for the OBS WebSocket API."""
ctx.ensure_object(dict)
ctx.obj['obsws'] = ctx.with_resource(
obsws.ReqClient(host=host, port=port, password=password, timeout=timeout)
"""Initialize the OBS WebSocket client and return the context."""
with obsws.ReqClient(
host=obs_config.host,
port=obs_config.port,
password=obs_config.password,
) as client:
additional_kwargs = {}
command, bound, ignored = app.parse_args(tokens)
if 'ctx' in ignored:
# If 'ctx' is in ignored, it means it was not passed as an argument
# and we need to add it to the bound arguments.
additional_kwargs['ctx'] = ignored['ctx'](
client,
styles.request_style_obj(style_config.name, style_config.no_border),
)
ctx.obj['style'] = styles.request_style_obj(style, no_border)
return command(*bound.args, **bound.kwargs, **additional_kwargs)
@app.command()
def obs_version(ctx: typer.Context):
@app.command
def obs_version(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Get the OBS Client and WebSocket versions."""
resp = ctx.obj['obsws'].get_version()
resp = ctx.client.get_version()
console.out.print(
f'OBS Client version: {console.highlight(ctx, resp.obs_version)}'
f' with WebSocket version: {console.highlight(ctx, resp.obs_web_socket_version)}'
)
def run():
"""Run the OBS WebSocket CLI application.
Handles exceptions and prints error messages to the console.
"""
try:
app.meta()
except OBSWSCLIError as e:
console.err.print(f'Error: {e}')
return e.code

View File

@@ -1,12 +1,13 @@
"""module for console output handling in obsws_cli."""
import typer
from rich.console import Console
from .context import Context
out = Console()
err = Console(stderr=True, style='bold red')
def highlight(ctx: typer.Context, text: str) -> str:
def highlight(ctx: Context, text: str) -> str:
"""Highlight text using the current context's style."""
return f'[{ctx.obj["style"].highlight}]{text}[/{ctx.obj["style"].highlight}]'
return f'[{ctx.style.highlight}]{text}[/{ctx.style.highlight}]'

15
obsws_cli/context.py Normal file
View File

@@ -0,0 +1,15 @@
"""module for managing the application context."""
from dataclasses import dataclass
import obsws_python as obsws
from . import styles
@dataclass
class Context:
"""Context for the application, holding OBS and style configurations."""
client: obsws.ReqClient
style: styles.Style

10
obsws_cli/enum.py Normal file
View File

@@ -0,0 +1,10 @@
"""module for exit codes used in the application."""
from enum import IntEnum, auto
class ExitCode(IntEnum):
"""Exit codes for the application."""
SUCCESS = 0
ERROR = auto()

11
obsws_cli/error.py Normal file
View File

@@ -0,0 +1,11 @@
"""module containing error handling for OBS WebSocket CLI."""
class OBSWSCLIError(Exception):
"""Base class for OBS WebSocket CLI errors."""
def __init__(self, message: str, code: int = 1):
"""Initialize the error with a message and an optional code."""
super().__init__(message)
self.message = message
self.code = code

View File

@@ -3,44 +3,42 @@
from typing import Annotated, Optional
import obsws_python as obsws
import typer
from cyclopts import App, Argument, Parameter
from rich.table import Table
from rich.text import Text
from . import console, util
from .alias import SubTyperAliasGroup
from .context import Context
from .enum import ExitCode
from .error import OBSWSCLIError
app = typer.Typer(cls=SubTyperAliasGroup)
app = App(name='filter')
@app.callback()
def main():
"""Control filters in OBS scenes."""
@app.command('list | ls')
@app.command(name=['list', 'ls'])
def list_(
ctx: typer.Context,
source_name: Annotated[
Optional[str],
typer.Argument(
show_default='The current scene',
help='The source to list filters for',
Argument(
hint='The source to list filters for',
),
] = None,
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""List filters for a source."""
if not source_name:
source_name = ctx.obj['obsws'].get_current_program_scene().scene_name
source_name = ctx.client.get_current_program_scene().scene_name
try:
resp = ctx.obj['obsws'].get_source_filter_list(source_name)
resp = ctx.client.get_source_filter_list(source_name)
except obsws.error.OBSSDKRequestError as e:
if e.code == 600:
console.err.print(
f'No source was found by the name of [yellow]{source_name}[/yellow].'
raise OBSWSCLIError(
f'No source found by the name of [yellow]{source_name}[/yellow].',
code=ExitCode.ERROR,
)
raise typer.Exit(1)
else:
raise
@@ -48,25 +46,25 @@ def list_(
console.out.print(
f'No filters found for source {console.highlight(ctx, source_name)}'
)
raise typer.Exit()
return
table = Table(
title=f'Filters for Source: {source_name}',
padding=(0, 2),
border_style=ctx.obj['style'].border,
border_style=ctx.style.border,
)
columns = [
(Text('Filter Name', justify='center'), 'left', ctx.obj['style'].column),
(Text('Kind', justify='center'), 'left', ctx.obj['style'].column),
(Text('Filter Name', justify='center'), 'left', ctx.style.column),
(Text('Kind', justify='center'), 'left', ctx.style.column),
(Text('Enabled', justify='center'), 'center', None),
(Text('Settings', justify='center'), 'center', ctx.obj['style'].column),
(Text('Settings', justify='center'), 'center', ctx.style.column),
]
for heading, justify, style in columns:
table.add_column(heading, justify=justify, style=style)
for filter in resp.filters:
resp = ctx.obj['obsws'].get_source_filter_default_settings(filter['filterKind'])
resp = ctx.client.get_source_filter_default_settings(filter['filterKind'])
settings = resp.default_filter_settings | filter['filterSettings']
table.add_row(
@@ -84,93 +82,85 @@ def list_(
console.out.print(table)
def _get_filter_enabled(ctx: typer.Context, source_name: str, filter_name: str):
def _get_filter_enabled(ctx: Context, source_name: str, filter_name: str):
"""Get the status of a filter for a source."""
resp = ctx.obj['obsws'].get_source_filter(source_name, filter_name)
resp = ctx.client.get_source_filter(source_name, filter_name)
return resp.filter_enabled
@app.command('enable | on')
@app.command(name=['enable', 'on'])
def enable(
ctx: typer.Context,
source_name: Annotated[
str,
typer.Argument(
..., show_default=False, help='The source to enable the filter for'
),
Argument(hint='The source to enable the filter for'),
],
filter_name: Annotated[
str,
typer.Argument(
..., show_default=False, help='The name of the filter to enable'
),
Argument(hint='The name of the filter to enable'),
],
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Enable a filter for a source."""
if _get_filter_enabled(ctx, source_name, filter_name):
console.err.print(
f'Filter [yellow]{filter_name}[/yellow] is already enabled for source [yellow]{source_name}[/yellow]'
raise OBSWSCLIError(
f'Filter [yellow]{filter_name}[/yellow] is already enabled for source [yellow]{source_name}[/yellow]',
code=ExitCode.ERROR,
)
raise typer.Exit(1)
ctx.obj['obsws'].set_source_filter_enabled(source_name, filter_name, enabled=True)
ctx.client.set_source_filter_enabled(source_name, filter_name, enabled=True)
console.out.print(
f'Enabled filter {console.highlight(ctx, filter_name)} for source {console.highlight(ctx, source_name)}'
)
@app.command('disable | off')
@app.command(name=['disable', 'off'])
def disable(
ctx: typer.Context,
source_name: Annotated[
str,
typer.Argument(
..., show_default=False, help='The source to disable the filter for'
),
Argument(hint='The source to disable the filter for'),
],
filter_name: Annotated[
str,
typer.Argument(
..., show_default=False, help='The name of the filter to disable'
),
Argument(hint='The name of the filter to disable'),
],
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Disable a filter for a source."""
if not _get_filter_enabled(ctx, source_name, filter_name):
console.err.print(
f'Filter [yellow]{filter_name}[/yellow] is already disabled for source [yellow]{source_name}[/yellow]'
raise OBSWSCLIError(
f'Filter [yellow]{filter_name}[/yellow] is already disabled for source [yellow]{source_name}[/yellow]',
code=ExitCode.ERROR,
)
raise typer.Exit(1)
ctx.obj['obsws'].set_source_filter_enabled(source_name, filter_name, enabled=False)
ctx.client.set_source_filter_enabled(source_name, filter_name, enabled=False)
console.out.print(
f'Disabled filter {console.highlight(ctx, filter_name)} for source {console.highlight(ctx, source_name)}'
)
@app.command('toggle | tg')
@app.command(name=['toggle', 'tg'])
def toggle(
ctx: typer.Context,
source_name: Annotated[
str,
typer.Argument(
..., show_default=False, help='The source to toggle the filter for'
),
Argument(hint='The source to toggle the filter for'),
],
filter_name: Annotated[
str,
typer.Argument(
..., show_default=False, help='The name of the filter to toggle'
),
Argument(hint='The name of the filter to toggle'),
],
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Toggle a filter for a source."""
is_enabled = _get_filter_enabled(ctx, source_name, filter_name)
new_state = not is_enabled
ctx.obj['obsws'].set_source_filter_enabled(
source_name, filter_name, enabled=new_state
)
ctx.client.set_source_filter_enabled(source_name, filter_name, enabled=new_state)
if new_state:
console.out.print(
f'Enabled filter {console.highlight(ctx, filter_name)} for source {console.highlight(ctx, source_name)}'
@@ -181,21 +171,19 @@ def toggle(
)
@app.command('status | ss')
@app.command(name=['status', 'ss'])
def status(
ctx: typer.Context,
source_name: Annotated[
str,
typer.Argument(
..., show_default=False, help='The source to get the filter status for'
),
Argument(hint='The source to get the filter status for'),
],
filter_name: Annotated[
str,
typer.Argument(
..., show_default=False, help='The name of the filter to get the status for'
),
Argument(hint='The name of the filter to get the status for'),
],
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Get the status of a filter for a source."""
is_enabled = _get_filter_enabled(ctx, source_name, filter_name)

View File

@@ -24,10 +24,6 @@ def list_(
"""List all hotkeys."""
resp = ctx.obj['obsws'].get_hotkey_list()
if not resp.hotkeys:
console.out.print('No hotkeys found.')
raise typer.Exit()
table = Table(
title='Hotkeys',
padding=(0, 2),

View File

@@ -18,69 +18,6 @@ def main():
"""Control inputs in OBS."""
@app.command('create | add')
def create(
ctx: typer.Context,
input_name: Annotated[
str,
typer.Argument(
...,
show_default=False,
help='Name of the input to create.',
callback=validate.input_not_in_inputs,
),
],
input_kind: Annotated[
str,
typer.Argument(
...,
show_default=False,
help='Kind of the input to create.',
callback=validate.kind_in_input_kinds,
),
],
):
"""Create a new input."""
current_scene = (
ctx.obj['obsws'].get_current_program_scene().current_program_scene_name
)
try:
ctx.obj['obsws'].create_input(
inputName=input_name,
inputKind=input_kind,
sceneItemEnabled=True,
sceneName=current_scene,
inputSettings={},
)
except obsws.error.OBSSDKRequestError as e:
console.err.print(f'Failed to create input: [yellow]{e}[/yellow]')
raise typer.Exit(1)
console.out.print(
f'Input {console.highlight(ctx, input_name)} of kind '
f'{console.highlight(ctx, input_kind)} created.',
)
@app.command('remove | rm')
def remove(
ctx: typer.Context,
input_name: Annotated[
str,
typer.Argument(
...,
show_default=False,
help='Name of the input to remove.',
callback=validate.input_in_inputs,
),
],
):
"""Remove an input."""
ctx.obj['obsws'].remove_input(name=input_name)
console.out.print(f'Input {console.highlight(ctx, input_name)} removed.')
@app.command('list | ls')
def list_(
ctx: typer.Context,
@@ -168,47 +105,18 @@ def list_(
console.out.print(table)
@app.command('list-kinds | ls-k')
def list_kinds(
ctx: typer.Context,
):
"""List all input kinds."""
resp = ctx.obj['obsws'].get_input_kind_list(False)
kinds = sorted(resp.input_kinds)
if not kinds:
console.out.print('No input kinds found.')
raise typer.Exit()
table = Table(
title='Input Kinds', padding=(0, 2), border_style=ctx.obj['style'].border
)
table.add_column(
Text('Input Kind', justify='center'),
justify='left',
style=ctx.obj['style'].column,
)
for kind in kinds:
table.add_row(util.snakecase_to_titlecase(kind))
console.out.print(table)
@app.command('mute | m')
def mute(
ctx: typer.Context,
input_name: Annotated[
str,
typer.Argument(
...,
show_default=False,
help='Name of the input to mute.',
callback=validate.input_in_inputs,
),
str, typer.Argument(..., show_default=False, help='Name of the input to mute.')
],
):
"""Mute an input."""
if not validate.input_in_inputs(ctx, input_name):
console.err.print(f'Input [yellow]{input_name}[/yellow] not found.')
raise typer.Exit(1)
ctx.obj['obsws'].set_input_mute(
name=input_name,
muted=True,
@@ -222,15 +130,14 @@ def unmute(
ctx: typer.Context,
input_name: Annotated[
str,
typer.Argument(
...,
show_default=False,
help='Name of the input to unmute.',
callback=validate.input_in_inputs,
),
typer.Argument(..., show_default=False, help='Name of the input to unmute.'),
],
):
"""Unmute an input."""
if not validate.input_in_inputs(ctx, input_name):
console.err.print(f'Input [yellow]{input_name}[/yellow] not found.')
raise typer.Exit(1)
ctx.obj['obsws'].set_input_mute(
name=input_name,
muted=False,
@@ -244,15 +151,14 @@ def toggle(
ctx: typer.Context,
input_name: Annotated[
str,
typer.Argument(
...,
show_default=False,
help='Name of the input to toggle.',
callback=validate.input_in_inputs,
),
typer.Argument(..., show_default=False, help='Name of the input to toggle.'),
],
):
"""Toggle an input."""
if not validate.input_in_inputs(ctx, input_name):
console.err.print(f'Input [yellow]{input_name}[/yellow] not found.')
raise typer.Exit(1)
resp = ctx.obj['obsws'].get_input_mute(name=input_name)
new_state = not resp.input_muted
@@ -269,188 +175,3 @@ def toggle(
console.out.print(
f'Input {console.highlight(ctx, input_name)} unmuted.',
)
@app.command('volume | vol')
def volume(
ctx: typer.Context,
input_name: Annotated[
str,
typer.Argument(
...,
show_default=False,
help='Name of the input to set volume for.',
callback=validate.input_in_inputs,
),
],
volume: Annotated[
float,
typer.Argument(
...,
show_default=False,
help='Volume level to set (-90 to 0).',
min=-90,
max=0,
),
],
):
"""Set the volume of an input."""
ctx.obj['obsws'].set_input_volume(
name=input_name,
vol_db=volume,
)
console.out.print(
f'Input {console.highlight(ctx, input_name)} volume set to {console.highlight(ctx, volume)}.',
)
@app.command('show | s')
def show(
ctx: typer.Context,
input_name: Annotated[
str,
typer.Argument(
...,
show_default=False,
help='Name of the input to show.',
callback=validate.input_in_inputs,
),
],
verbose: Annotated[
bool, typer.Option(help='List all available input devices.')
] = False,
):
"""Show information for an input in the current scene."""
input_list = ctx.obj['obsws'].get_input_list()
for input_ in input_list.inputs:
if input_['inputName'] == input_name:
input_kind = input_['inputKind']
break
for prop in ['device', 'device_id']:
try:
device_id = (
ctx.obj['obsws']
.get_input_settings(
name=input_name,
)
.input_settings.get(prop)
)
if device_id:
break
except obsws.error.OBSSDKRequestError:
continue
else:
device_id = '(N/A)'
for device in (
ctx.obj['obsws']
.get_input_properties_list_property_items(
input_name=input_name,
prop_name=prop,
)
.property_items
):
if device.get('itemValue') == device_id:
device_id = device.get('itemName')
break
table = Table(
title='Input Information', padding=(0, 2), border_style=ctx.obj['style'].border
)
columns = [
(Text('Input Name', justify='center'), 'left', ctx.obj['style'].column),
(Text('Kind', justify='center'), 'left', ctx.obj['style'].column),
(Text('Device', justify='center'), 'left', ctx.obj['style'].column),
]
for heading, justify, style in columns:
table.add_column(heading, justify=justify, style=style)
table.add_row(
input_name,
util.snakecase_to_titlecase(input_kind),
device_id,
)
console.out.print(table)
if verbose:
resp = ctx.obj['obsws'].get_input_properties_list_property_items(
input_name=input_name,
prop_name=prop,
)
table = Table(
title='Devices',
padding=(0, 2),
border_style=ctx.obj['style'].border,
)
columns = [
(Text('Name', justify='center'), 'left', ctx.obj['style'].column),
]
for heading, justify, style in columns:
table.add_column(heading, justify=justify, style=style)
for i, item in enumerate(resp.property_items):
table.add_row(
item.get('itemName'),
style='' if i % 2 == 0 else 'dim',
)
console.out.print(table)
@app.command('update | upd')
def update(
ctx: typer.Context,
input_name: Annotated[
str,
typer.Argument(
...,
show_default=False,
help='Name of the input to update.',
callback=validate.input_in_inputs,
),
],
device_name: Annotated[
str,
typer.Argument(
...,
show_default=False,
help='Name of the device to set for the input.',
),
],
):
"""Update a setting for an input."""
device_id = None
for prop in ['device', 'device_id']:
try:
for device in (
ctx.obj['obsws']
.get_input_properties_list_property_items(
input_name=input_name,
prop_name=prop,
)
.property_items
):
if device.get('itemName') == device_name:
device_id = device.get('itemValue')
break
except obsws.error.OBSSDKRequestError:
continue
if device_id:
break
if not device_id:
console.err.print(
f'Failed to find device ID for device '
f'{console.highlight(ctx, device_name)}.',
)
raise typer.Exit(1)
ctx.obj['obsws'].set_input_settings(
name=input_name, settings={prop: device_id}, overlay=True
)
console.out.print(
f'Input {console.highlight(ctx, input_name)} updated to use device '
f'{console.highlight(ctx, device_name)}.',
)

View File

@@ -22,10 +22,6 @@ def list_(ctx: typer.Context):
"""List profiles."""
resp = ctx.obj['obsws'].get_profile_list()
if not resp.profiles:
console.out.print('No profiles found.')
raise typer.Exit()
table = Table(
title='Profiles', padding=(0, 2), border_style=ctx.obj['style'].border
)

View File

@@ -21,15 +21,16 @@ def main():
def list_monitors(ctx: typer.Context):
"""List available monitors."""
resp = ctx.obj['obsws'].get_monitor_list()
if not resp.monitors:
console.out.print('No monitors found.')
return
monitors = sorted(
((m['monitorIndex'], m['monitorName']) for m in resp.monitors),
key=lambda m: m[0],
)
if not monitors:
console.out.print('No monitors found.')
raise typer.Exit()
table = Table(
title='Available Monitors',
padding=(0, 2),

View File

@@ -2,49 +2,43 @@
from typing import Annotated
import typer
from cyclopts import App, Argument, Parameter
from rich.table import Table
from rich.text import Text
from . import console, util, validate
from .alias import SubTyperAliasGroup
from .context import Context
from .enum import ExitCode
from .error import OBSWSCLIError
app = typer.Typer(cls=SubTyperAliasGroup)
app = App(name='scene')
@app.callback()
def main():
"""Control OBS scenes."""
@app.command('list | ls')
@app.command(name=['list', 'ls'])
def list_(
ctx: typer.Context,
uuid: Annotated[bool, typer.Option(help='Show UUIDs of scenes')] = False,
uuid: Annotated[bool, Parameter(help='Show UUIDs of scenes')] = False,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""List all scenes."""
resp = ctx.obj['obsws'].get_scene_list()
resp = ctx.client.get_scene_list()
scenes = (
(scene.get('sceneName'), scene.get('sceneUuid'))
for scene in reversed(resp.scenes)
)
if not scenes:
console.out.print('No scenes found.')
raise typer.Exit()
active_scene = ctx.client.get_current_program_scene().scene_name
active_scene = ctx.obj['obsws'].get_current_program_scene().scene_name
table = Table(title='Scenes', padding=(0, 2), border_style=ctx.obj['style'].border)
table = Table(title='Scenes', padding=(0, 2), border_style=ctx.style.border)
if uuid:
columns = [
(Text('Scene Name', justify='center'), 'left', ctx.obj['style'].column),
(Text('Scene Name', justify='center'), 'left', ctx.style.column),
(Text('Active', justify='center'), 'center', None),
(Text('UUID', justify='center'), 'left', ctx.obj['style'].column),
(Text('UUID', justify='center'), 'left', ctx.style.column),
]
else:
columns = [
(Text('Scene Name', justify='center'), 'left', ctx.obj['style'].column),
(Text('Scene Name', justify='center'), 'left', ctx.style.column),
(Text('Active', justify='center'), 'center', None),
]
for heading, justify, style in columns:
@@ -66,57 +60,64 @@ def list_(
console.out.print(table)
@app.command('current | get')
@app.command(name=['current', 'get'])
def current(
ctx: typer.Context,
preview: Annotated[
bool, typer.Option(help='Get the preview scene instead of the program scene')
bool, Parameter(help='Get the preview scene instead of the program scene')
] = False,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Get the current program scene or preview scene."""
if preview and not validate.studio_mode_enabled(ctx):
console.err.print('Studio mode is not enabled, cannot get preview scene.')
raise typer.Exit(1)
raise OBSWSCLIError(
'Studio mode is not enabled, cannot get preview scene.',
code=ExitCode.ERROR,
)
if preview:
resp = ctx.obj['obsws'].get_current_preview_scene()
resp = ctx.client.get_current_preview_scene()
console.out.print(
f'Current Preview Scene: {console.highlight(ctx, resp.current_preview_scene_name)}'
)
else:
resp = ctx.obj['obsws'].get_current_program_scene()
resp = ctx.client.get_current_program_scene()
console.out.print(
f'Current Program Scene: {console.highlight(ctx, resp.current_program_scene_name)}'
)
@app.command('switch | set')
@app.command(name=['switch', 'set'])
def switch(
ctx: typer.Context,
scene_name: Annotated[
str, typer.Argument(..., help='Name of the scene to switch to')
],
scene_name: Annotated[str, Argument(hint='Name of the scene to switch to')],
/,
preview: Annotated[
bool,
typer.Option(help='Switch to the preview scene instead of the program scene'),
Parameter(help='Switch to the preview scene instead of the program scene'),
] = False,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Switch to a scene."""
if preview and not validate.studio_mode_enabled(ctx):
console.err.print('Studio mode is not enabled, cannot set the preview scene.')
raise typer.Exit(1)
raise OBSWSCLIError(
'Studio mode is not enabled, cannot switch to preview scene.',
code=ExitCode.ERROR,
)
if not validate.scene_in_scenes(ctx, scene_name):
console.err.print(f'Scene [yellow]{scene_name}[/yellow] not found.')
raise typer.Exit(1)
raise OBSWSCLIError(
f'Scene [yellow]{scene_name}[/yellow] not found.',
code=ExitCode.ERROR,
)
if preview:
ctx.obj['obsws'].set_current_preview_scene(scene_name)
ctx.client.set_current_preview_scene(scene_name)
console.out.print(
f'Switched to preview scene: {console.highlight(ctx, scene_name)}'
)
else:
ctx.obj['obsws'].set_current_program_scene(scene_name)
ctx.client.set_current_program_scene(scene_name)
console.out.print(
f'Switched to program scene: {console.highlight(ctx, scene_name)}'
)

View File

@@ -21,10 +21,6 @@ def list_(ctx: typer.Context):
"""List all scene collections."""
resp = ctx.obj['obsws'].get_scene_collection_list()
if not resp.scene_collections:
console.out.print('No scene collections found.')
raise typer.Exit()
table = Table(
title='Scene Collections',
padding=(0, 2),

View File

@@ -18,14 +18,13 @@ def main():
@app.command('current | get')
def current(
ctx: typer.Context,
input_name: Annotated[
str,
typer.Argument(
help='Name of the text input to get.', callback=validate.input_in_inputs
),
],
input_name: Annotated[str, typer.Argument(help='Name of the text input to get.')],
):
"""Get the current text for a text input."""
if not validate.input_in_inputs(ctx, input_name):
console.err.print(f'Input [yellow]{input_name}[/yellow] not found.')
raise typer.Exit(1)
resp = ctx.obj['obsws'].get_input_settings(name=input_name)
if not resp.input_kind.startswith('text_'):
console.err.print(
@@ -45,10 +44,7 @@ def current(
def update(
ctx: typer.Context,
input_name: Annotated[
str,
typer.Argument(
help='Name of the text input to update.', callback=validate.input_in_inputs
),
str, typer.Argument(help='Name of the text input to update.')
],
new_text: Annotated[
Optional[str],
@@ -58,6 +54,10 @@ def update(
] = None,
):
"""Update the text of a text input."""
if not validate.input_in_inputs(ctx, input_name):
console.err.print(f'Input [yellow]{input_name}[/yellow] not found.')
raise typer.Exit(1)
resp = ctx.obj['obsws'].get_input_settings(name=input_name)
if not resp.input_kind.startswith('text_'):
console.err.print(

View File

@@ -2,76 +2,53 @@
import typer
from . import console
from .context import Context
# type alias for an option that is skipped when the command is run
skipped_option = typer.Option(parser=lambda _: _, hidden=True, expose_value=False)
def input_in_inputs(ctx: typer.Context, input_name: str) -> bool:
"""Ensure the given input exists in the list of inputs."""
resp = ctx.obj['obsws'].get_input_list()
if not any(input.get('inputName') == input_name for input in resp.inputs):
console.err.print(f'Input [yellow]{input_name}[/yellow] does not exist.')
raise typer.Exit(1)
return input_name
def input_in_inputs(ctx: Context, input_name: str) -> bool:
"""Check if an input is in the input list."""
inputs = ctx.client.get_input_list().inputs
return any(input_.get('inputName') == input_name for input_ in inputs)
def input_not_in_inputs(ctx: typer.Context, input_name: str) -> bool:
"""Ensure an input does not already exist in the list of inputs."""
resp = ctx.obj['obsws'].get_input_list()
if any(input.get('inputName') == input_name for input in resp.inputs):
console.err.print(f'Input [yellow]{input_name}[/yellow] already exists.')
raise typer.Exit(1)
return input_name
def scene_in_scenes(ctx: typer.Context, scene_name: str) -> bool:
def scene_in_scenes(ctx: Context, scene_name: str) -> bool:
"""Check if a scene exists in the list of scenes."""
resp = ctx.obj['obsws'].get_scene_list()
resp = ctx.client.get_scene_list()
return any(scene.get('sceneName') == scene_name for scene in resp.scenes)
def studio_mode_enabled(ctx: typer.Context) -> bool:
def studio_mode_enabled(ctx: Context) -> bool:
"""Check if studio mode is enabled."""
resp = ctx.obj['obsws'].get_studio_mode_enabled()
resp = ctx.client.get_studio_mode_enabled()
return resp.studio_mode_enabled
def scene_collection_in_scene_collections(
ctx: typer.Context, scene_collection_name: str
ctx: Context, scene_collection_name: str
) -> bool:
"""Check if a scene collection exists."""
resp = ctx.obj['obsws'].get_scene_collection_list()
resp = ctx.client.get_scene_collection_list()
return any(
collection == scene_collection_name for collection in resp.scene_collections
)
def item_in_scene_item_list(
ctx: typer.Context, scene_name: str, item_name: str
) -> bool:
def item_in_scene_item_list(ctx: Context, scene_name: str, item_name: str) -> bool:
"""Check if an item exists in a scene."""
resp = ctx.obj['obsws'].get_scene_item_list(scene_name)
resp = ctx.client.get_scene_item_list(scene_name)
return any(item.get('sourceName') == item_name for item in resp.scene_items)
def profile_exists(ctx: typer.Context, profile_name: str) -> bool:
def profile_exists(ctx: Context, profile_name: str) -> bool:
"""Check if a profile exists."""
resp = ctx.obj['obsws'].get_profile_list()
resp = ctx.client.get_profile_list()
return any(profile == profile_name for profile in resp.profiles)
def monitor_exists(ctx: typer.Context, monitor_index: int) -> bool:
def monitor_exists(ctx: Context, monitor_index: int) -> bool:
"""Check if a monitor exists."""
resp = ctx.obj['obsws'].get_monitor_list()
resp = ctx.client.get_monitor_list()
return any(monitor['monitorIndex'] == monitor_index for monitor in resp.monitors)
def kind_in_input_kinds(ctx: typer.Context, input_kind: str) -> str:
"""Check if an input kind is valid."""
resp = ctx.obj['obsws'].get_input_kind_list(False)
if not any(kind == input_kind for kind in resp.input_kinds):
console.err.print(f'Input kind [yellow]{input_kind}[/yellow] not found.')
raise typer.Exit(1)
return input_kind

View File

@@ -21,7 +21,12 @@ classifiers = [
"Programming Language :: Python :: Implementation :: CPython",
"Programming Language :: Python :: Implementation :: PyPy",
]
dependencies = ["typer>=0.21.1", "obsws-python>=1.8.0", "python-dotenv>=1.1.0"]
dependencies = [
"cyclopts>=3.22.2",
"typer>=0.16.0",
"obsws-python>=1.8.0",
"python-dotenv>=1.1.0",
]
[project.urls]
@@ -30,7 +35,7 @@ Issues = "https://github.com/onyx-and-iris/obsws-cli/issues"
Source = "https://github.com/onyx-and-iris/obsws-cli"
[project.scripts]
obsws-cli = "obsws_cli:app"
obsws-cli = "obsws_cli:run"
[tool.hatch.version]
path = "obsws_cli/__about__.py"
@@ -42,6 +47,9 @@ dependencies = ["click-man>=0.5.1"]
cli = "obsws-cli {args:}"
man = "python man/generate.py --output=./man"
[tool.hatch.envs.lazyimports.scripts]
cli = "obsws-cli {args:}"
[tool.hatch.envs.hatch-test]
randomize = true

View File

@@ -1,7 +1,6 @@
"""pytest configuration file."""
import os
import time
import obsws_python as obsws
from dotenv import find_dotenv, load_dotenv
@@ -45,54 +44,9 @@ def pytest_sessionstart(session):
},
)
session.obsws.create_profile('pytest_profile')
time.sleep(0.1) # Wait for the profile to be created
session.obsws.set_profile_parameter(
'SimpleOutput',
'RecRB',
'true',
)
# hack to ensure the replay buffer is enabled
session.obsws.set_current_profile('Untitled')
session.obsws.set_current_profile('pytest_profile')
session.obsws.set_current_scene_collection('test-collection')
session.obsws.create_scene('pytest_scene')
# Ensure Desktop Audio is created.
desktop_audio_kinds = {
'windows': 'wasapi_output_capture',
'linux': 'pulse_output_capture',
'darwin': 'coreaudio_output_capture',
}
platform = os.environ.get('OBS_TESTS_PLATFORM', os.uname().sysname.lower())
try:
session.obsws.create_input(
sceneName='pytest_scene',
inputName='Desktop Audio',
inputKind=desktop_audio_kinds[platform],
inputSettings={'device_id': 'default'},
sceneItemEnabled=True,
)
except obsws.error.OBSSDKRequestError as e:
if e.code == 601:
"""input already exists, continue."""
# Ensure Mic/Aux is created.
mic_kinds = {
'windows': 'wasapi_input_capture',
'linux': 'pulse_input_capture',
'darwin': 'coreaudio_input_capture',
}
try:
session.obsws.create_input(
sceneName='pytest_scene',
inputName='Mic/Aux',
inputKind=mic_kinds[platform],
inputSettings={'device_id': 'default'},
sceneItemEnabled=True,
)
except obsws.error.OBSSDKRequestError as e:
if e.code == 601:
"""input already exists, continue."""
session.obsws.create_input(
sceneName='pytest_scene',
inputName='pytest_input',
@@ -177,7 +131,7 @@ def pytest_sessionfinish(session, exitstatus):
session.obsws.remove_scene('pytest_scene')
session.obsws.set_current_scene_collection('Untitled')
session.obsws.set_current_scene_collection('default')
resp = session.obsws.get_stream_status()
if resp.output_active:
@@ -195,8 +149,6 @@ def pytest_sessionfinish(session, exitstatus):
if resp.studio_mode_enabled:
session.obsws.set_studio_mode_enabled(False)
session.obsws.remove_profile('pytest_profile')
# Close the OBS WebSocket client connection
session.obsws.disconnect()

View File

@@ -4,7 +4,7 @@ from typer.testing import CliRunner
from obsws_cli.app import app
runner = CliRunner()
runner = CliRunner(mix_stderr=False)
def test_filter_list():

View File

@@ -1,18 +1,10 @@
"""Unit tests for the group command in the OBS WebSocket CLI."""
import os
import pytest
from typer.testing import CliRunner
from obsws_cli.app import app
runner = CliRunner()
if os.environ.get('OBS_TESTS_SKIP_GROUP_TESTS'):
pytest.skip(
'Skipping group tests as per environment variable', allow_module_level=True
)
runner = CliRunner(mix_stderr=False)
def test_group_list():

View File

@@ -4,7 +4,7 @@ from typer.testing import CliRunner
from obsws_cli.app import app
runner = CliRunner()
runner = CliRunner(mix_stderr=False)
def test_hotkey_list():

View File

@@ -4,7 +4,7 @@ from typer.testing import CliRunner
from obsws_cli.app import app
runner = CliRunner()
runner = CliRunner(mix_stderr=False)
def test_input_list():
@@ -13,7 +13,10 @@ def test_input_list():
assert result.exit_code == 0
assert 'Desktop Audio' in result.stdout
assert 'Mic/Aux' in result.stdout
assert all(item in result.stdout for item in ('pytest_input', 'pytest_input_2'))
assert all(
item in result.stdout
for item in ('Colour Source', 'Colour Source 2', 'Colour Source 3')
)
def test_input_list_filter_input():
@@ -36,6 +39,9 @@ def test_input_list_filter_colour():
"""Test the input list command with colour filter."""
result = runner.invoke(app, ['input', 'list', '--colour'])
assert result.exit_code == 0
assert all(item in result.stdout for item in ('pytest_input', 'pytest_input_2'))
assert all(
item in result.stdout
for item in ('Colour Source', 'Colour Source 2', 'Colour Source 3')
)
assert 'Desktop Audio' not in result.stdout
assert 'Mic/Aux' not in result.stdout

View File

@@ -6,7 +6,7 @@ from typer.testing import CliRunner
from obsws_cli.app import app
runner = CliRunner()
runner = CliRunner(mix_stderr=False)
def test_record_start():
@@ -49,9 +49,7 @@ def test_record_toggle():
result = runner.invoke(app, ['record', 'toggle'])
assert result.exit_code == 0
time.sleep(0.5) # Wait for the recording to toggle
if active:
assert 'Recording stopped successfully.' in result.stdout
else:

View File

@@ -1,20 +1,10 @@
"""Unit tests for the replaybuffer command in the OBS WebSocket CLI."""
import os
import time
import pytest
from typer.testing import CliRunner
from obsws_cli.app import app
runner = CliRunner()
if os.environ.get('OBS_TESTS_SKIP_REPLAYBUFFER_TESTS'):
pytest.skip(
'Skipping replaybuffer tests as per environment variable',
allow_module_level=True,
)
runner = CliRunner(mix_stderr=False)
def test_replaybuffer_start():
@@ -24,9 +14,6 @@ def test_replaybuffer_start():
active = 'Replay buffer is active.' in resp.stdout
resp = runner.invoke(app, ['replaybuffer', 'start'])
time.sleep(0.5) # Wait for the replay buffer to start
if active:
assert resp.exit_code != 0
assert 'Replay buffer is already active.' in resp.stderr
@@ -42,9 +29,6 @@ def test_replaybuffer_stop():
active = 'Replay buffer is active.' in resp.stdout
resp = runner.invoke(app, ['replaybuffer', 'stop'])
time.sleep(0.5) # Wait for the replay buffer to stop
if not active:
assert resp.exit_code != 0
assert 'Replay buffer is not active.' in resp.stderr
@@ -60,11 +44,9 @@ def test_replaybuffer_toggle():
active = 'Replay buffer is active.' in resp.stdout
resp = runner.invoke(app, ['replaybuffer', 'toggle'])
assert resp.exit_code == 0
time.sleep(0.5) # Wait for the replay buffer to toggle
if active:
assert resp.exit_code == 0
assert 'Replay buffer is not active.' in resp.stdout
else:
assert resp.exit_code == 0
assert 'Replay buffer is active.' in resp.stdout

View File

@@ -4,7 +4,7 @@ from typer.testing import CliRunner
from obsws_cli.app import app
runner = CliRunner()
runner = CliRunner(mix_stderr=False)
def test_scene_list():

View File

@@ -4,7 +4,7 @@ from typer.testing import CliRunner
from obsws_cli.app import app
runner = CliRunner()
runner = CliRunner(mix_stderr=False)
def test_sceneitem_list():

View File

@@ -6,7 +6,7 @@ from typer.testing import CliRunner
from obsws_cli.app import app
runner = CliRunner()
runner = CliRunner(mix_stderr=False)
def test_stream_start():
@@ -23,7 +23,7 @@ def test_stream_start():
else:
assert result.exit_code == 0
assert 'Streaming started successfully.' in result.stdout
time.sleep(0.5) # Wait for the streaming to start
time.sleep(1) # Wait for the streaming to start
def test_stream_stop():
@@ -37,7 +37,7 @@ def test_stream_stop():
if active:
assert result.exit_code == 0
assert 'Streaming stopped successfully.' in result.stdout
time.sleep(0.5) # Wait for the streaming to stop
time.sleep(1) # Wait for the streaming to stop
else:
assert result.exit_code != 0
assert 'Streaming is not in progress, cannot stop.' in result.stderr
@@ -52,7 +52,7 @@ def test_stream_toggle():
result = runner.invoke(app, ['stream', 'toggle'])
assert result.exit_code == 0
time.sleep(0.5) # Wait for the stream to toggle
time.sleep(1) # Wait for the stream to toggle
if active:
assert 'Streaming stopped successfully.' in result.stdout

View File

@@ -4,7 +4,7 @@ from typer.testing import CliRunner
from obsws_cli.app import app
runner = CliRunner()
runner = CliRunner(mix_stderr=False)
def test_studio_enable():

View File

@@ -4,7 +4,7 @@ from typer.testing import CliRunner
from obsws_cli.app import app
runner = CliRunner()
runner = CliRunner(mix_stderr=False)
def test_text_update():

View File

@@ -4,7 +4,7 @@ from typer.testing import CliRunner
from obsws_cli.app import app
runner = CliRunner()
runner = CliRunner(mix_stderr=False)
def test_version():