6 Commits

Author SHA1 Message Date
105aaf29b7 keep the exit codes simple (0 or 1) 2025-07-17 04:34:44 +01:00
eb34a1833f convert filter sub app 2025-07-17 04:29:21 +01:00
abbab5c746 raise OBSWSCLIError 2025-07-17 04:28:59 +01:00
f0eb518609 implement --version + debug validator
run() now handles exceptions/exit codes
2025-07-17 04:28:40 +01:00
032b957670 add custom error class + exit codes 2025-07-17 04:28:10 +01:00
8349a196e8 root meta app + scene sub_app converted
this could take a while...

note, --version flag not implemented
2025-07-17 03:18:04 +01:00
28 changed files with 279 additions and 379 deletions

View File

@@ -1,4 +1,4 @@
# SPDX-FileCopyrightText: 2025-present onyx-and-iris <code@onyxandiris.online> # SPDX-FileCopyrightText: 2025-present onyx-and-iris <code@onyxandiris.online>
# #
# SPDX-License-Identifier: MIT # SPDX-License-Identifier: MIT
__version__ = '0.21.0' __version__ = '0.20.2'

View File

@@ -2,6 +2,6 @@
# #
# SPDX-License-Identifier: MIT # SPDX-License-Identifier: MIT
from .app import app from .app import run
__all__ = ['app'] __all__ = ['run']

View File

@@ -2,166 +2,116 @@
import importlib import importlib
import logging import logging
from typing import Annotated from dataclasses import dataclass
from typing import Annotated, Any
import obsws_python as obsws import obsws_python as obsws
import typer from cyclopts import App, Group, Parameter, config
from obsws_cli.__about__ import __version__ as version from obsws_cli.__about__ import __version__ as version
from . import console, settings, styles from . import console, styles
from .alias import RootTyperAliasGroup from .context import Context
from .error import OBSWSCLIError
app = typer.Typer(cls=RootTyperAliasGroup) app = App(
for sub_typer in ( config=config.Env(
'OBS_'
), # Environment variable prefix for configuration parameters
version=version,
)
app.meta.group_parameters = Group('Session Parameters', sort_key=0)
for sub_app in (
'filter', 'filter',
'group',
'hotkey',
'input',
'profile',
'projector',
'record',
'replaybuffer',
'scene', 'scene',
'scenecollection',
'sceneitem',
'screenshot',
'stream',
'studiomode',
'text',
'virtualcam',
): ):
module = importlib.import_module(f'.{sub_typer}', package=__package__) module = importlib.import_module(f'.{sub_app}', package=__package__)
app.add_typer(module.app, name=sub_typer) app.command(module.app)
def version_callback(value: bool): @Parameter(name='*')
"""Show the version of the CLI.""" @dataclass
if value: class OBSConfig:
console.out.print(f'obsws-cli version: {version}') """Dataclass to hold OBS connection parameters."""
raise typer.Exit()
host: str = 'localhost'
port: int = 4455
password: str = ''
def setup_logging(debug: bool): @dataclass
class StyleConfig:
"""Dataclass to hold style parameters."""
name: str = 'disabled'
no_border: bool = False
def setup_logging(type_, value: Any):
"""Set up logging for the application.""" """Set up logging for the application."""
log_level = logging.DEBUG if debug else logging.CRITICAL log_level = logging.DEBUG if value else logging.CRITICAL
logging.basicConfig( logging.basicConfig(
level=log_level, level=log_level,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
) )
def validate_style(value: str): @app.meta.default
"""Validate and return the style.""" def launcher(
if value not in styles.registry: *tokens: Annotated[str, Parameter(show=False, allow_leading_hyphen=True)],
raise typer.BadParameter( obs_config: OBSConfig = Annotated[
f'Invalid style: {value}. Available styles: {", ".join(styles.registry.keys())}' OBSConfig,
) Parameter(
return value show=False, allow_leading_hyphen=True, help='OBS connection parameters'
@app.callback()
def main(
ctx: typer.Context,
host: Annotated[
str,
typer.Option(
'--host',
'-H',
envvar='OBS_HOST',
help='WebSocket host',
show_default='localhost',
), ),
] = settings.get('host'), ],
port: Annotated[ style_config: StyleConfig = Annotated[
int, StyleConfig,
typer.Option( Parameter(show=False, allow_leading_hyphen=True, help='Style parameters'),
'--port', ],
'-P',
envvar='OBS_PORT',
help='WebSocket port',
show_default=4455,
),
] = settings.get('port'),
password: Annotated[
str,
typer.Option(
'--password',
'-p',
envvar='OBS_PASSWORD',
help='WebSocket password',
show_default=False,
),
] = settings.get('password'),
timeout: Annotated[
int,
typer.Option(
'--timeout',
'-T',
envvar='OBS_TIMEOUT',
help='WebSocket timeout',
show_default=5,
),
] = settings.get('timeout'),
style: Annotated[
str,
typer.Option(
'--style',
'-s',
envvar='OBS_STYLE',
help='Set the style for the CLI output',
show_default='disabled',
callback=validate_style,
),
] = settings.get('style'),
no_border: Annotated[
bool,
typer.Option(
'--no-border',
'-b',
envvar='OBS_STYLE_NO_BORDER',
help='Disable table border styling in the CLI output',
show_default=False,
),
] = settings.get('style_no_border'),
version: Annotated[
bool,
typer.Option(
'--version',
'-v',
is_eager=True,
help='Show the CLI version and exit',
show_default=False,
callback=version_callback,
),
] = False,
debug: Annotated[ debug: Annotated[
bool, bool,
typer.Option( Parameter(validator=setup_logging),
'--debug', ] = False,
'-d',
envvar='OBS_DEBUG',
is_eager=True,
help='Enable debug logging',
show_default=False,
callback=setup_logging,
hidden=True,
),
] = settings.get('debug'),
): ):
"""obsws_cli is a command line interface for the OBS WebSocket API.""" """Initialize the OBS WebSocket client and return the context."""
ctx.ensure_object(dict) with obsws.ReqClient(
ctx.obj['obsws'] = ctx.with_resource( host=obs_config.host,
obsws.ReqClient(host=host, port=port, password=password, timeout=timeout) port=obs_config.port,
password=obs_config.password,
) as client:
additional_kwargs = {}
command, bound, ignored = app.parse_args(tokens)
if 'ctx' in ignored:
# If 'ctx' is in ignored, it means it was not passed as an argument
# and we need to add it to the bound arguments.
additional_kwargs['ctx'] = ignored['ctx'](
client,
styles.request_style_obj(style_config.name, style_config.no_border),
) )
ctx.obj['style'] = styles.request_style_obj(style, no_border) return command(*bound.args, **bound.kwargs, **additional_kwargs)
@app.command() @app.command
def obs_version(ctx: typer.Context): def obs_version(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Get the OBS Client and WebSocket versions.""" """Get the OBS Client and WebSocket versions."""
resp = ctx.obj['obsws'].get_version() resp = ctx.client.get_version()
console.out.print( console.out.print(
f'OBS Client version: {console.highlight(ctx, resp.obs_version)}' f'OBS Client version: {console.highlight(ctx, resp.obs_version)}'
f' with WebSocket version: {console.highlight(ctx, resp.obs_web_socket_version)}' f' with WebSocket version: {console.highlight(ctx, resp.obs_web_socket_version)}'
) )
def run():
"""Run the OBS WebSocket CLI application.
Handles exceptions and prints error messages to the console.
"""
try:
app.meta()
except OBSWSCLIError as e:
console.err.print(f'Error: {e}')
return e.code

View File

@@ -1,12 +1,13 @@
"""module for console output handling in obsws_cli.""" """module for console output handling in obsws_cli."""
import typer
from rich.console import Console from rich.console import Console
from .context import Context
out = Console() out = Console()
err = Console(stderr=True, style='bold red') err = Console(stderr=True, style='bold red')
def highlight(ctx: typer.Context, text: str) -> str: def highlight(ctx: Context, text: str) -> str:
"""Highlight text using the current context's style.""" """Highlight text using the current context's style."""
return f'[{ctx.obj["style"].highlight}]{text}[/{ctx.obj["style"].highlight}]' return f'[{ctx.style.highlight}]{text}[/{ctx.style.highlight}]'

15
obsws_cli/context.py Normal file
View File

@@ -0,0 +1,15 @@
"""module for managing the application context."""
from dataclasses import dataclass
import obsws_python as obsws
from . import styles
@dataclass
class Context:
"""Context for the application, holding OBS and style configurations."""
client: obsws.ReqClient
style: styles.Style

10
obsws_cli/enum.py Normal file
View File

@@ -0,0 +1,10 @@
"""module for exit codes used in the application."""
from enum import IntEnum, auto
class ExitCode(IntEnum):
"""Exit codes for the application."""
SUCCESS = 0
ERROR = auto()

11
obsws_cli/error.py Normal file
View File

@@ -0,0 +1,11 @@
"""module containing error handling for OBS WebSocket CLI."""
class OBSWSCLIError(Exception):
"""Base class for OBS WebSocket CLI errors."""
def __init__(self, message: str, code: int = 1):
"""Initialize the error with a message and an optional code."""
super().__init__(message)
self.message = message
self.code = code

View File

@@ -3,44 +3,42 @@
from typing import Annotated, Optional from typing import Annotated, Optional
import obsws_python as obsws import obsws_python as obsws
import typer from cyclopts import App, Argument, Parameter
from rich.table import Table from rich.table import Table
from rich.text import Text from rich.text import Text
from . import console, util from . import console, util
from .alias import SubTyperAliasGroup from .context import Context
from .enum import ExitCode
from .error import OBSWSCLIError
app = typer.Typer(cls=SubTyperAliasGroup) app = App(name='filter')
@app.callback() @app.command(name=['list', 'ls'])
def main():
"""Control filters in OBS scenes."""
@app.command('list | ls')
def list_( def list_(
ctx: typer.Context,
source_name: Annotated[ source_name: Annotated[
Optional[str], Optional[str],
typer.Argument( Argument(
show_default='The current scene', hint='The source to list filters for',
help='The source to list filters for',
), ),
] = None, ] = None,
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
): ):
"""List filters for a source.""" """List filters for a source."""
if not source_name: if not source_name:
source_name = ctx.obj['obsws'].get_current_program_scene().scene_name source_name = ctx.client.get_current_program_scene().scene_name
try: try:
resp = ctx.obj['obsws'].get_source_filter_list(source_name) resp = ctx.client.get_source_filter_list(source_name)
except obsws.error.OBSSDKRequestError as e: except obsws.error.OBSSDKRequestError as e:
if e.code == 600: if e.code == 600:
console.err.print( raise OBSWSCLIError(
f'No source was found by the name of [yellow]{source_name}[/yellow].' f'No source found by the name of [yellow]{source_name}[/yellow].',
code=ExitCode.ERROR,
) )
raise typer.Exit(1)
else: else:
raise raise
@@ -48,25 +46,25 @@ def list_(
console.out.print( console.out.print(
f'No filters found for source {console.highlight(ctx, source_name)}' f'No filters found for source {console.highlight(ctx, source_name)}'
) )
raise typer.Exit() return
table = Table( table = Table(
title=f'Filters for Source: {source_name}', title=f'Filters for Source: {source_name}',
padding=(0, 2), padding=(0, 2),
border_style=ctx.obj['style'].border, border_style=ctx.style.border,
) )
columns = [ columns = [
(Text('Filter Name', justify='center'), 'left', ctx.obj['style'].column), (Text('Filter Name', justify='center'), 'left', ctx.style.column),
(Text('Kind', justify='center'), 'left', ctx.obj['style'].column), (Text('Kind', justify='center'), 'left', ctx.style.column),
(Text('Enabled', justify='center'), 'center', None), (Text('Enabled', justify='center'), 'center', None),
(Text('Settings', justify='center'), 'center', ctx.obj['style'].column), (Text('Settings', justify='center'), 'center', ctx.style.column),
] ]
for heading, justify, style in columns: for heading, justify, style in columns:
table.add_column(heading, justify=justify, style=style) table.add_column(heading, justify=justify, style=style)
for filter in resp.filters: for filter in resp.filters:
resp = ctx.obj['obsws'].get_source_filter_default_settings(filter['filterKind']) resp = ctx.client.get_source_filter_default_settings(filter['filterKind'])
settings = resp.default_filter_settings | filter['filterSettings'] settings = resp.default_filter_settings | filter['filterSettings']
table.add_row( table.add_row(
@@ -84,93 +82,85 @@ def list_(
console.out.print(table) console.out.print(table)
def _get_filter_enabled(ctx: typer.Context, source_name: str, filter_name: str): def _get_filter_enabled(ctx: Context, source_name: str, filter_name: str):
"""Get the status of a filter for a source.""" """Get the status of a filter for a source."""
resp = ctx.obj['obsws'].get_source_filter(source_name, filter_name) resp = ctx.client.get_source_filter(source_name, filter_name)
return resp.filter_enabled return resp.filter_enabled
@app.command('enable | on') @app.command(name=['enable', 'on'])
def enable( def enable(
ctx: typer.Context,
source_name: Annotated[ source_name: Annotated[
str, str,
typer.Argument( Argument(hint='The source to enable the filter for'),
..., show_default=False, help='The source to enable the filter for'
),
], ],
filter_name: Annotated[ filter_name: Annotated[
str, str,
typer.Argument( Argument(hint='The name of the filter to enable'),
..., show_default=False, help='The name of the filter to enable'
),
], ],
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
): ):
"""Enable a filter for a source.""" """Enable a filter for a source."""
if _get_filter_enabled(ctx, source_name, filter_name): if _get_filter_enabled(ctx, source_name, filter_name):
console.err.print( raise OBSWSCLIError(
f'Filter [yellow]{filter_name}[/yellow] is already enabled for source [yellow]{source_name}[/yellow]' f'Filter [yellow]{filter_name}[/yellow] is already enabled for source [yellow]{source_name}[/yellow]',
code=ExitCode.ERROR,
) )
raise typer.Exit(1)
ctx.obj['obsws'].set_source_filter_enabled(source_name, filter_name, enabled=True) ctx.client.set_source_filter_enabled(source_name, filter_name, enabled=True)
console.out.print( console.out.print(
f'Enabled filter {console.highlight(ctx, filter_name)} for source {console.highlight(ctx, source_name)}' f'Enabled filter {console.highlight(ctx, filter_name)} for source {console.highlight(ctx, source_name)}'
) )
@app.command('disable | off') @app.command(name=['disable', 'off'])
def disable( def disable(
ctx: typer.Context,
source_name: Annotated[ source_name: Annotated[
str, str,
typer.Argument( Argument(hint='The source to disable the filter for'),
..., show_default=False, help='The source to disable the filter for'
),
], ],
filter_name: Annotated[ filter_name: Annotated[
str, str,
typer.Argument( Argument(hint='The name of the filter to disable'),
..., show_default=False, help='The name of the filter to disable'
),
], ],
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
): ):
"""Disable a filter for a source.""" """Disable a filter for a source."""
if not _get_filter_enabled(ctx, source_name, filter_name): if not _get_filter_enabled(ctx, source_name, filter_name):
console.err.print( raise OBSWSCLIError(
f'Filter [yellow]{filter_name}[/yellow] is already disabled for source [yellow]{source_name}[/yellow]' f'Filter [yellow]{filter_name}[/yellow] is already disabled for source [yellow]{source_name}[/yellow]',
code=ExitCode.ERROR,
) )
raise typer.Exit(1)
ctx.obj['obsws'].set_source_filter_enabled(source_name, filter_name, enabled=False) ctx.client.set_source_filter_enabled(source_name, filter_name, enabled=False)
console.out.print( console.out.print(
f'Disabled filter {console.highlight(ctx, filter_name)} for source {console.highlight(ctx, source_name)}' f'Disabled filter {console.highlight(ctx, filter_name)} for source {console.highlight(ctx, source_name)}'
) )
@app.command('toggle | tg') @app.command(name=['toggle', 'tg'])
def toggle( def toggle(
ctx: typer.Context,
source_name: Annotated[ source_name: Annotated[
str, str,
typer.Argument( Argument(hint='The source to toggle the filter for'),
..., show_default=False, help='The source to toggle the filter for'
),
], ],
filter_name: Annotated[ filter_name: Annotated[
str, str,
typer.Argument( Argument(hint='The name of the filter to toggle'),
..., show_default=False, help='The name of the filter to toggle'
),
], ],
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
): ):
"""Toggle a filter for a source.""" """Toggle a filter for a source."""
is_enabled = _get_filter_enabled(ctx, source_name, filter_name) is_enabled = _get_filter_enabled(ctx, source_name, filter_name)
new_state = not is_enabled new_state = not is_enabled
ctx.obj['obsws'].set_source_filter_enabled( ctx.client.set_source_filter_enabled(source_name, filter_name, enabled=new_state)
source_name, filter_name, enabled=new_state
)
if new_state: if new_state:
console.out.print( console.out.print(
f'Enabled filter {console.highlight(ctx, filter_name)} for source {console.highlight(ctx, source_name)}' f'Enabled filter {console.highlight(ctx, filter_name)} for source {console.highlight(ctx, source_name)}'
@@ -181,21 +171,19 @@ def toggle(
) )
@app.command('status | ss') @app.command(name=['status', 'ss'])
def status( def status(
ctx: typer.Context,
source_name: Annotated[ source_name: Annotated[
str, str,
typer.Argument( Argument(hint='The source to get the filter status for'),
..., show_default=False, help='The source to get the filter status for'
),
], ],
filter_name: Annotated[ filter_name: Annotated[
str, str,
typer.Argument( Argument(hint='The name of the filter to get the status for'),
..., show_default=False, help='The name of the filter to get the status for'
),
], ],
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
): ):
"""Get the status of a filter for a source.""" """Get the status of a filter for a source."""
is_enabled = _get_filter_enabled(ctx, source_name, filter_name) is_enabled = _get_filter_enabled(ctx, source_name, filter_name)

View File

@@ -24,10 +24,6 @@ def list_(
"""List all hotkeys.""" """List all hotkeys."""
resp = ctx.obj['obsws'].get_hotkey_list() resp = ctx.obj['obsws'].get_hotkey_list()
if not resp.hotkeys:
console.out.print('No hotkeys found.')
raise typer.Exit()
table = Table( table = Table(
title='Hotkeys', title='Hotkeys',
padding=(0, 2), padding=(0, 2),

View File

@@ -22,10 +22,6 @@ def list_(ctx: typer.Context):
"""List profiles.""" """List profiles."""
resp = ctx.obj['obsws'].get_profile_list() resp = ctx.obj['obsws'].get_profile_list()
if not resp.profiles:
console.out.print('No profiles found.')
raise typer.Exit()
table = Table( table = Table(
title='Profiles', padding=(0, 2), border_style=ctx.obj['style'].border title='Profiles', padding=(0, 2), border_style=ctx.obj['style'].border
) )

View File

@@ -21,15 +21,16 @@ def main():
def list_monitors(ctx: typer.Context): def list_monitors(ctx: typer.Context):
"""List available monitors.""" """List available monitors."""
resp = ctx.obj['obsws'].get_monitor_list() resp = ctx.obj['obsws'].get_monitor_list()
if not resp.monitors:
console.out.print('No monitors found.')
return
monitors = sorted( monitors = sorted(
((m['monitorIndex'], m['monitorName']) for m in resp.monitors), ((m['monitorIndex'], m['monitorName']) for m in resp.monitors),
key=lambda m: m[0], key=lambda m: m[0],
) )
if not monitors:
console.out.print('No monitors found.')
raise typer.Exit()
table = Table( table = Table(
title='Available Monitors', title='Available Monitors',
padding=(0, 2), padding=(0, 2),

View File

@@ -2,49 +2,43 @@
from typing import Annotated from typing import Annotated
import typer from cyclopts import App, Argument, Parameter
from rich.table import Table from rich.table import Table
from rich.text import Text from rich.text import Text
from . import console, util, validate from . import console, util, validate
from .alias import SubTyperAliasGroup from .context import Context
from .enum import ExitCode
from .error import OBSWSCLIError
app = typer.Typer(cls=SubTyperAliasGroup) app = App(name='scene')
@app.callback() @app.command(name=['list', 'ls'])
def main():
"""Control OBS scenes."""
@app.command('list | ls')
def list_( def list_(
ctx: typer.Context, uuid: Annotated[bool, Parameter(help='Show UUIDs of scenes')] = False,
uuid: Annotated[bool, typer.Option(help='Show UUIDs of scenes')] = False, *,
ctx: Annotated[Context, Parameter(parse=False)],
): ):
"""List all scenes.""" """List all scenes."""
resp = ctx.obj['obsws'].get_scene_list() resp = ctx.client.get_scene_list()
scenes = ( scenes = (
(scene.get('sceneName'), scene.get('sceneUuid')) (scene.get('sceneName'), scene.get('sceneUuid'))
for scene in reversed(resp.scenes) for scene in reversed(resp.scenes)
) )
if not scenes: active_scene = ctx.client.get_current_program_scene().scene_name
console.out.print('No scenes found.')
raise typer.Exit()
active_scene = ctx.obj['obsws'].get_current_program_scene().scene_name table = Table(title='Scenes', padding=(0, 2), border_style=ctx.style.border)
table = Table(title='Scenes', padding=(0, 2), border_style=ctx.obj['style'].border)
if uuid: if uuid:
columns = [ columns = [
(Text('Scene Name', justify='center'), 'left', ctx.obj['style'].column), (Text('Scene Name', justify='center'), 'left', ctx.style.column),
(Text('Active', justify='center'), 'center', None), (Text('Active', justify='center'), 'center', None),
(Text('UUID', justify='center'), 'left', ctx.obj['style'].column), (Text('UUID', justify='center'), 'left', ctx.style.column),
] ]
else: else:
columns = [ columns = [
(Text('Scene Name', justify='center'), 'left', ctx.obj['style'].column), (Text('Scene Name', justify='center'), 'left', ctx.style.column),
(Text('Active', justify='center'), 'center', None), (Text('Active', justify='center'), 'center', None),
] ]
for heading, justify, style in columns: for heading, justify, style in columns:
@@ -66,57 +60,64 @@ def list_(
console.out.print(table) console.out.print(table)
@app.command('current | get') @app.command(name=['current', 'get'])
def current( def current(
ctx: typer.Context,
preview: Annotated[ preview: Annotated[
bool, typer.Option(help='Get the preview scene instead of the program scene') bool, Parameter(help='Get the preview scene instead of the program scene')
] = False, ] = False,
*,
ctx: Annotated[Context, Parameter(parse=False)],
): ):
"""Get the current program scene or preview scene.""" """Get the current program scene or preview scene."""
if preview and not validate.studio_mode_enabled(ctx): if preview and not validate.studio_mode_enabled(ctx):
console.err.print('Studio mode is not enabled, cannot get preview scene.') raise OBSWSCLIError(
raise typer.Exit(1) 'Studio mode is not enabled, cannot get preview scene.',
code=ExitCode.ERROR,
)
if preview: if preview:
resp = ctx.obj['obsws'].get_current_preview_scene() resp = ctx.client.get_current_preview_scene()
console.out.print( console.out.print(
f'Current Preview Scene: {console.highlight(ctx, resp.current_preview_scene_name)}' f'Current Preview Scene: {console.highlight(ctx, resp.current_preview_scene_name)}'
) )
else: else:
resp = ctx.obj['obsws'].get_current_program_scene() resp = ctx.client.get_current_program_scene()
console.out.print( console.out.print(
f'Current Program Scene: {console.highlight(ctx, resp.current_program_scene_name)}' f'Current Program Scene: {console.highlight(ctx, resp.current_program_scene_name)}'
) )
@app.command('switch | set') @app.command(name=['switch', 'set'])
def switch( def switch(
ctx: typer.Context, scene_name: Annotated[str, Argument(hint='Name of the scene to switch to')],
scene_name: Annotated[ /,
str, typer.Argument(..., help='Name of the scene to switch to')
],
preview: Annotated[ preview: Annotated[
bool, bool,
typer.Option(help='Switch to the preview scene instead of the program scene'), Parameter(help='Switch to the preview scene instead of the program scene'),
] = False, ] = False,
*,
ctx: Annotated[Context, Parameter(parse=False)],
): ):
"""Switch to a scene.""" """Switch to a scene."""
if preview and not validate.studio_mode_enabled(ctx): if preview and not validate.studio_mode_enabled(ctx):
console.err.print('Studio mode is not enabled, cannot set the preview scene.') raise OBSWSCLIError(
raise typer.Exit(1) 'Studio mode is not enabled, cannot switch to preview scene.',
code=ExitCode.ERROR,
)
if not validate.scene_in_scenes(ctx, scene_name): if not validate.scene_in_scenes(ctx, scene_name):
console.err.print(f'Scene [yellow]{scene_name}[/yellow] not found.') raise OBSWSCLIError(
raise typer.Exit(1) f'Scene [yellow]{scene_name}[/yellow] not found.',
code=ExitCode.ERROR,
)
if preview: if preview:
ctx.obj['obsws'].set_current_preview_scene(scene_name) ctx.client.set_current_preview_scene(scene_name)
console.out.print( console.out.print(
f'Switched to preview scene: {console.highlight(ctx, scene_name)}' f'Switched to preview scene: {console.highlight(ctx, scene_name)}'
) )
else: else:
ctx.obj['obsws'].set_current_program_scene(scene_name) ctx.client.set_current_program_scene(scene_name)
console.out.print( console.out.print(
f'Switched to program scene: {console.highlight(ctx, scene_name)}' f'Switched to program scene: {console.highlight(ctx, scene_name)}'
) )

View File

@@ -21,10 +21,6 @@ def list_(ctx: typer.Context):
"""List all scene collections.""" """List all scene collections."""
resp = ctx.obj['obsws'].get_scene_collection_list() resp = ctx.obj['obsws'].get_scene_collection_list()
if not resp.scene_collections:
console.out.print('No scene collections found.')
raise typer.Exit()
table = Table( table = Table(
title='Scene Collections', title='Scene Collections',
padding=(0, 2), padding=(0, 2),

View File

@@ -2,53 +2,53 @@
import typer import typer
from .context import Context
# type alias for an option that is skipped when the command is run # type alias for an option that is skipped when the command is run
skipped_option = typer.Option(parser=lambda _: _, hidden=True, expose_value=False) skipped_option = typer.Option(parser=lambda _: _, hidden=True, expose_value=False)
def input_in_inputs(ctx: typer.Context, input_name: str) -> bool: def input_in_inputs(ctx: Context, input_name: str) -> bool:
"""Check if an input is in the input list.""" """Check if an input is in the input list."""
inputs = ctx.obj['obsws'].get_input_list().inputs inputs = ctx.client.get_input_list().inputs
return any(input_.get('inputName') == input_name for input_ in inputs) return any(input_.get('inputName') == input_name for input_ in inputs)
def scene_in_scenes(ctx: typer.Context, scene_name: str) -> bool: def scene_in_scenes(ctx: Context, scene_name: str) -> bool:
"""Check if a scene exists in the list of scenes.""" """Check if a scene exists in the list of scenes."""
resp = ctx.obj['obsws'].get_scene_list() resp = ctx.client.get_scene_list()
return any(scene.get('sceneName') == scene_name for scene in resp.scenes) return any(scene.get('sceneName') == scene_name for scene in resp.scenes)
def studio_mode_enabled(ctx: typer.Context) -> bool: def studio_mode_enabled(ctx: Context) -> bool:
"""Check if studio mode is enabled.""" """Check if studio mode is enabled."""
resp = ctx.obj['obsws'].get_studio_mode_enabled() resp = ctx.client.get_studio_mode_enabled()
return resp.studio_mode_enabled return resp.studio_mode_enabled
def scene_collection_in_scene_collections( def scene_collection_in_scene_collections(
ctx: typer.Context, scene_collection_name: str ctx: Context, scene_collection_name: str
) -> bool: ) -> bool:
"""Check if a scene collection exists.""" """Check if a scene collection exists."""
resp = ctx.obj['obsws'].get_scene_collection_list() resp = ctx.client.get_scene_collection_list()
return any( return any(
collection == scene_collection_name for collection in resp.scene_collections collection == scene_collection_name for collection in resp.scene_collections
) )
def item_in_scene_item_list( def item_in_scene_item_list(ctx: Context, scene_name: str, item_name: str) -> bool:
ctx: typer.Context, scene_name: str, item_name: str
) -> bool:
"""Check if an item exists in a scene.""" """Check if an item exists in a scene."""
resp = ctx.obj['obsws'].get_scene_item_list(scene_name) resp = ctx.client.get_scene_item_list(scene_name)
return any(item.get('sourceName') == item_name for item in resp.scene_items) return any(item.get('sourceName') == item_name for item in resp.scene_items)
def profile_exists(ctx: typer.Context, profile_name: str) -> bool: def profile_exists(ctx: Context, profile_name: str) -> bool:
"""Check if a profile exists.""" """Check if a profile exists."""
resp = ctx.obj['obsws'].get_profile_list() resp = ctx.client.get_profile_list()
return any(profile == profile_name for profile in resp.profiles) return any(profile == profile_name for profile in resp.profiles)
def monitor_exists(ctx: typer.Context, monitor_index: int) -> bool: def monitor_exists(ctx: Context, monitor_index: int) -> bool:
"""Check if a monitor exists.""" """Check if a monitor exists."""
resp = ctx.obj['obsws'].get_monitor_list() resp = ctx.client.get_monitor_list()
return any(monitor['monitorIndex'] == monitor_index for monitor in resp.monitors) return any(monitor['monitorIndex'] == monitor_index for monitor in resp.monitors)

View File

@@ -21,7 +21,12 @@ classifiers = [
"Programming Language :: Python :: Implementation :: CPython", "Programming Language :: Python :: Implementation :: CPython",
"Programming Language :: Python :: Implementation :: PyPy", "Programming Language :: Python :: Implementation :: PyPy",
] ]
dependencies = ["typer>=0.19.2", "obsws-python>=1.8.0", "python-dotenv>=1.1.0"] dependencies = [
"cyclopts>=3.22.2",
"typer>=0.16.0",
"obsws-python>=1.8.0",
"python-dotenv>=1.1.0",
]
[project.urls] [project.urls]
@@ -30,7 +35,7 @@ Issues = "https://github.com/onyx-and-iris/obsws-cli/issues"
Source = "https://github.com/onyx-and-iris/obsws-cli" Source = "https://github.com/onyx-and-iris/obsws-cli"
[project.scripts] [project.scripts]
obsws-cli = "obsws_cli:app" obsws-cli = "obsws_cli:run"
[tool.hatch.version] [tool.hatch.version]
path = "obsws_cli/__about__.py" path = "obsws_cli/__about__.py"

View File

@@ -1,7 +1,6 @@
"""pytest configuration file.""" """pytest configuration file."""
import os import os
import time
import obsws_python as obsws import obsws_python as obsws
from dotenv import find_dotenv, load_dotenv from dotenv import find_dotenv, load_dotenv
@@ -45,54 +44,9 @@ def pytest_sessionstart(session):
}, },
) )
session.obsws.create_profile('pytest_profile') session.obsws.set_current_scene_collection('test-collection')
time.sleep(0.1) # Wait for the profile to be created
session.obsws.set_profile_parameter(
'SimpleOutput',
'RecRB',
'true',
)
# hack to ensure the replay buffer is enabled
session.obsws.set_current_profile('Untitled')
session.obsws.set_current_profile('pytest_profile')
session.obsws.create_scene('pytest_scene') session.obsws.create_scene('pytest_scene')
# Ensure Desktop Audio is created.
desktop_audio_kinds = {
'windows': 'wasapi_output_capture',
'linux': 'pulse_output_capture',
'darwin': 'coreaudio_output_capture',
}
platform = os.environ.get('OBS_TESTS_PLATFORM', os.uname().sysname.lower())
try:
session.obsws.create_input(
sceneName='pytest_scene',
inputName='Desktop Audio',
inputKind=desktop_audio_kinds[platform],
inputSettings={'device_id': 'default'},
sceneItemEnabled=True,
)
except obsws.error.OBSSDKRequestError as e:
if e.code == 601:
"""input already exists, continue."""
# Ensure Mic/Aux is created.
mic_kinds = {
'windows': 'wasapi_input_capture',
'linux': 'pulse_input_capture',
'darwin': 'coreaudio_input_capture',
}
try:
session.obsws.create_input(
sceneName='pytest_scene',
inputName='Mic/Aux',
inputKind=mic_kinds[platform],
inputSettings={'device_id': 'default'},
sceneItemEnabled=True,
)
except obsws.error.OBSSDKRequestError as e:
if e.code == 601:
"""input already exists, continue."""
session.obsws.create_input( session.obsws.create_input(
sceneName='pytest_scene', sceneName='pytest_scene',
inputName='pytest_input', inputName='pytest_input',
@@ -177,7 +131,7 @@ def pytest_sessionfinish(session, exitstatus):
session.obsws.remove_scene('pytest_scene') session.obsws.remove_scene('pytest_scene')
session.obsws.set_current_scene_collection('Untitled') session.obsws.set_current_scene_collection('default')
resp = session.obsws.get_stream_status() resp = session.obsws.get_stream_status()
if resp.output_active: if resp.output_active:
@@ -195,8 +149,6 @@ def pytest_sessionfinish(session, exitstatus):
if resp.studio_mode_enabled: if resp.studio_mode_enabled:
session.obsws.set_studio_mode_enabled(False) session.obsws.set_studio_mode_enabled(False)
session.obsws.remove_profile('pytest_profile')
# Close the OBS WebSocket client connection # Close the OBS WebSocket client connection
session.obsws.disconnect() session.obsws.disconnect()

View File

@@ -4,7 +4,7 @@ from typer.testing import CliRunner
from obsws_cli.app import app from obsws_cli.app import app
runner = CliRunner() runner = CliRunner(mix_stderr=False)
def test_filter_list(): def test_filter_list():

View File

@@ -1,18 +1,10 @@
"""Unit tests for the group command in the OBS WebSocket CLI.""" """Unit tests for the group command in the OBS WebSocket CLI."""
import os
import pytest
from typer.testing import CliRunner from typer.testing import CliRunner
from obsws_cli.app import app from obsws_cli.app import app
runner = CliRunner() runner = CliRunner(mix_stderr=False)
if os.environ.get('OBS_TESTS_SKIP_GROUP_TESTS'):
pytest.skip(
'Skipping group tests as per environment variable', allow_module_level=True
)
def test_group_list(): def test_group_list():

View File

@@ -4,7 +4,7 @@ from typer.testing import CliRunner
from obsws_cli.app import app from obsws_cli.app import app
runner = CliRunner() runner = CliRunner(mix_stderr=False)
def test_hotkey_list(): def test_hotkey_list():

View File

@@ -4,7 +4,7 @@ from typer.testing import CliRunner
from obsws_cli.app import app from obsws_cli.app import app
runner = CliRunner() runner = CliRunner(mix_stderr=False)
def test_input_list(): def test_input_list():
@@ -13,7 +13,10 @@ def test_input_list():
assert result.exit_code == 0 assert result.exit_code == 0
assert 'Desktop Audio' in result.stdout assert 'Desktop Audio' in result.stdout
assert 'Mic/Aux' in result.stdout assert 'Mic/Aux' in result.stdout
assert all(item in result.stdout for item in ('pytest_input', 'pytest_input_2')) assert all(
item in result.stdout
for item in ('Colour Source', 'Colour Source 2', 'Colour Source 3')
)
def test_input_list_filter_input(): def test_input_list_filter_input():
@@ -36,6 +39,9 @@ def test_input_list_filter_colour():
"""Test the input list command with colour filter.""" """Test the input list command with colour filter."""
result = runner.invoke(app, ['input', 'list', '--colour']) result = runner.invoke(app, ['input', 'list', '--colour'])
assert result.exit_code == 0 assert result.exit_code == 0
assert all(item in result.stdout for item in ('pytest_input', 'pytest_input_2')) assert all(
item in result.stdout
for item in ('Colour Source', 'Colour Source 2', 'Colour Source 3')
)
assert 'Desktop Audio' not in result.stdout assert 'Desktop Audio' not in result.stdout
assert 'Mic/Aux' not in result.stdout assert 'Mic/Aux' not in result.stdout

View File

@@ -6,7 +6,7 @@ from typer.testing import CliRunner
from obsws_cli.app import app from obsws_cli.app import app
runner = CliRunner() runner = CliRunner(mix_stderr=False)
def test_record_start(): def test_record_start():
@@ -49,9 +49,7 @@ def test_record_toggle():
result = runner.invoke(app, ['record', 'toggle']) result = runner.invoke(app, ['record', 'toggle'])
assert result.exit_code == 0 assert result.exit_code == 0
time.sleep(0.5) # Wait for the recording to toggle time.sleep(0.5) # Wait for the recording to toggle
if active: if active:
assert 'Recording stopped successfully.' in result.stdout assert 'Recording stopped successfully.' in result.stdout
else: else:

View File

@@ -1,20 +1,10 @@
"""Unit tests for the replaybuffer command in the OBS WebSocket CLI.""" """Unit tests for the replaybuffer command in the OBS WebSocket CLI."""
import os
import time
import pytest
from typer.testing import CliRunner from typer.testing import CliRunner
from obsws_cli.app import app from obsws_cli.app import app
runner = CliRunner() runner = CliRunner(mix_stderr=False)
if os.environ.get('OBS_TESTS_SKIP_REPLAYBUFFER_TESTS'):
pytest.skip(
'Skipping replaybuffer tests as per environment variable',
allow_module_level=True,
)
def test_replaybuffer_start(): def test_replaybuffer_start():
@@ -24,9 +14,6 @@ def test_replaybuffer_start():
active = 'Replay buffer is active.' in resp.stdout active = 'Replay buffer is active.' in resp.stdout
resp = runner.invoke(app, ['replaybuffer', 'start']) resp = runner.invoke(app, ['replaybuffer', 'start'])
time.sleep(0.5) # Wait for the replay buffer to start
if active: if active:
assert resp.exit_code != 0 assert resp.exit_code != 0
assert 'Replay buffer is already active.' in resp.stderr assert 'Replay buffer is already active.' in resp.stderr
@@ -42,9 +29,6 @@ def test_replaybuffer_stop():
active = 'Replay buffer is active.' in resp.stdout active = 'Replay buffer is active.' in resp.stdout
resp = runner.invoke(app, ['replaybuffer', 'stop']) resp = runner.invoke(app, ['replaybuffer', 'stop'])
time.sleep(0.5) # Wait for the replay buffer to stop
if not active: if not active:
assert resp.exit_code != 0 assert resp.exit_code != 0
assert 'Replay buffer is not active.' in resp.stderr assert 'Replay buffer is not active.' in resp.stderr
@@ -60,11 +44,9 @@ def test_replaybuffer_toggle():
active = 'Replay buffer is active.' in resp.stdout active = 'Replay buffer is active.' in resp.stdout
resp = runner.invoke(app, ['replaybuffer', 'toggle']) resp = runner.invoke(app, ['replaybuffer', 'toggle'])
assert resp.exit_code == 0
time.sleep(0.5) # Wait for the replay buffer to toggle
if active: if active:
assert resp.exit_code == 0
assert 'Replay buffer is not active.' in resp.stdout assert 'Replay buffer is not active.' in resp.stdout
else: else:
assert resp.exit_code == 0
assert 'Replay buffer is active.' in resp.stdout assert 'Replay buffer is active.' in resp.stdout

View File

@@ -4,7 +4,7 @@ from typer.testing import CliRunner
from obsws_cli.app import app from obsws_cli.app import app
runner = CliRunner() runner = CliRunner(mix_stderr=False)
def test_scene_list(): def test_scene_list():

View File

@@ -4,7 +4,7 @@ from typer.testing import CliRunner
from obsws_cli.app import app from obsws_cli.app import app
runner = CliRunner() runner = CliRunner(mix_stderr=False)
def test_sceneitem_list(): def test_sceneitem_list():

View File

@@ -6,7 +6,7 @@ from typer.testing import CliRunner
from obsws_cli.app import app from obsws_cli.app import app
runner = CliRunner() runner = CliRunner(mix_stderr=False)
def test_stream_start(): def test_stream_start():
@@ -23,7 +23,7 @@ def test_stream_start():
else: else:
assert result.exit_code == 0 assert result.exit_code == 0
assert 'Streaming started successfully.' in result.stdout assert 'Streaming started successfully.' in result.stdout
time.sleep(0.5) # Wait for the streaming to start time.sleep(1) # Wait for the streaming to start
def test_stream_stop(): def test_stream_stop():
@@ -37,7 +37,7 @@ def test_stream_stop():
if active: if active:
assert result.exit_code == 0 assert result.exit_code == 0
assert 'Streaming stopped successfully.' in result.stdout assert 'Streaming stopped successfully.' in result.stdout
time.sleep(0.5) # Wait for the streaming to stop time.sleep(1) # Wait for the streaming to stop
else: else:
assert result.exit_code != 0 assert result.exit_code != 0
assert 'Streaming is not in progress, cannot stop.' in result.stderr assert 'Streaming is not in progress, cannot stop.' in result.stderr
@@ -52,7 +52,7 @@ def test_stream_toggle():
result = runner.invoke(app, ['stream', 'toggle']) result = runner.invoke(app, ['stream', 'toggle'])
assert result.exit_code == 0 assert result.exit_code == 0
time.sleep(0.5) # Wait for the stream to toggle time.sleep(1) # Wait for the stream to toggle
if active: if active:
assert 'Streaming stopped successfully.' in result.stdout assert 'Streaming stopped successfully.' in result.stdout

View File

@@ -4,7 +4,7 @@ from typer.testing import CliRunner
from obsws_cli.app import app from obsws_cli.app import app
runner = CliRunner() runner = CliRunner(mix_stderr=False)
def test_studio_enable(): def test_studio_enable():

View File

@@ -4,7 +4,7 @@ from typer.testing import CliRunner
from obsws_cli.app import app from obsws_cli.app import app
runner = CliRunner() runner = CliRunner(mix_stderr=False)
def test_text_update(): def test_text_update():

View File

@@ -4,7 +4,7 @@ from typer.testing import CliRunner
from obsws_cli.app import app from obsws_cli.app import app
runner = CliRunner() runner = CliRunner(mix_stderr=False)
def test_version(): def test_version():