34 Commits

Author SHA1 Message Date
2e5fb3800a add Style section to README 2025-06-21 23:40:45 +01:00
3c985f5e9b apply table styling + stdout highlighting
fix issues with table heading alignments
2025-06-21 23:40:30 +01:00
fb17979cb0 add highlight helper function 2025-06-21 23:39:13 +01:00
a1ed208bdf add --style and --no-border flags.
set default values in Settings class
2025-06-21 23:37:20 +01:00
02baa13dba add style definitions 2025-06-21 23:32:02 +01:00
7abbccae99 add RootTyperAliasGroup
improve the output of projector open if the monitor index is invalid (suggests prj ls-m)

fix highlight for sceneitem commands in _validate_sources()

patch bump
2025-06-21 05:19:57 +01:00
23282a60d1 test against empty string to keep it consisten with rich
patch bump
2025-06-21 02:36:44 +01:00
b6ba66db64 update disable colouring section 2025-06-21 02:06:30 +01:00
c4480895a1 add empty_if_false to check_mark
patch bump
2025-06-21 00:41:33 +01:00
fd2e629ec2 print colourless check/cross marks if NO_COLOR is set
patch bump
2025-06-20 23:19:27 +01:00
85b653891d keep the colour cyan
patch bump
2025-06-20 21:09:35 +01:00
bff5d396a4 import console as namespace
patch bump
2025-06-20 07:51:12 +01:00
47324597d7 minor bump 2025-06-20 04:04:35 +01:00
9a0659ae35 add 0.16.11 to CHANGELOG 2025-06-20 02:30:25 +01:00
a726f9699f update scene list, sceneitem list and filter list with --uuid flags 2025-06-20 02:30:02 +01:00
fbea2cb896 import console as namespace
each console object is now a singleton

patch bump
2025-06-20 02:29:36 +01:00
e5040d5ddd add hidden --debug flag for controlling logging output
patch bump
2025-06-20 02:13:50 +01:00
39f1b01926 add --uuid flags to input list, scene list and sceneitem list 2025-06-20 01:32:36 +01:00
e9b3106aa6 if no filters are applied, ensure we include the entire kind list
patch bump
2025-06-19 23:10:52 +01:00
a26ce74151 add lazyimports environment, see https://github.com/fastapi/typer/pull/1128 2025-06-19 20:35:45 +01:00
f1c569f140 remove inline if else 2025-06-08 12:38:28 +01:00
093e9a05d4 add 0.16.8 to CHANGELOG 2025-06-07 20:11:00 +01:00
1a1fbf1da1 sort input list by input name
patch bump
2025-06-07 00:24:48 +01:00
fd2baf3350 remove no filter line 2025-06-07 00:06:53 +01:00
5334879ba9 patch bump 2025-06-06 23:27:45 +01:00
77dbe52ae6 upd input list to include new options 2025-06-06 23:27:31 +01:00
1ff610410a use tuples as records to build the tables
add --fempg and --vlc options to filter list

add Muted column to list table
2025-06-06 23:27:16 +01:00
cd7614bfd6 use tuples as records to build the tables 2025-06-06 23:26:33 +01:00
74503f17e0 upd console colouring
patch bump
2025-06-06 22:27:17 +01:00
32bc4277f2 add 0.16.5 to CHANGELOG 2025-06-06 21:09:33 +01:00
21f1b5e1bb add note about disabling console colouring to README 2025-06-06 20:58:28 +01:00
434f8c0e0c add monitor validate function
upd tests to match console colour changes
2025-06-06 20:58:15 +01:00
81518a14ea error messages now have style bold red
error highlights are now yellow

normal highlights are now green

_validate_scene_name_and_item_name renamed to _validate_sources

its now a normal function and not a decorator

it also returns bool instead of raising typer.Exit()

patch bump
2025-06-06 20:55:35 +01:00
ddb92bb317 upd console colouring
error messages now have style `bold red`
error highlights are now yellow

normal highlights are now green

patch bump
2025-06-06 20:53:35 +01:00
30 changed files with 1121 additions and 538 deletions

View File

@@ -5,6 +5,45 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
# [0.17.3] - 2025-06-20
### Added
- input list, scene list and sceneitem list now accept --uuid flag.
- Active column added to scene list table.
### Changed
- scene list no longer prints the UUIDs by default, enable it with the --uuid flag.
- if NO_COLOR is set, print colourless check and cross marks in tables.
### Fixed
- Issue with input list not printing all inputs if no filters were applied.
# [0.16.8] - 2025-06-07
### Added
- filter list:
- --ffmpeg, --vlc flags
- Muted column to list table
# [0.16.5] - 2025-06-06
### Added
- [Disable Colouring](https://github.com/onyx-and-iris/obsws-cli?tab=readme-ov-file#disable-colouring) section added to README.
### Changed
- error output:
- now printed in bold red.
- highlights are now yellow
- normal output:
- highlights are now green
- help messages:
- removed a lot of the `[default: None]`, this affects optional flags/arguments without default values.
# [0.16.1] - 2025-06-04 # [0.16.1] - 2025-06-04
### Added ### Added

View File

@@ -81,6 +81,10 @@ obsws-cli obs-version
#### Scene #### Scene
- list: List all scenes. - list: List all scenes.
- flags:
*optional*
- --uuid: Show UUIDs of scenes
```console ```console
obsws-cli scene list obsws-cli scene list
@@ -102,6 +106,10 @@ obsws-cli scene switch LIVE
#### Scene Item #### Scene Item
- list: List all items in a scene. - list: List all items in a scene.
- flags:
*optional*
- --uuid: Show UUIDs of scene items
*optional* *optional*
- args: <scene_name> - args: <scene_name>
@@ -265,6 +273,9 @@ obsws-cli group status START "test_group"
- --input: Filter by input type. - --input: Filter by input type.
- --output: Filter by output type. - --output: Filter by output type.
- --colour: Filter by colour source type. - --colour: Filter by colour source type.
- --ffmpeg: Filter by ffmpeg source type.
- --vlc: Filter by VLC source type.
- --uuid: Show UUIDs of inputs.
```console ```console
obsws-cli input list obsws-cli input list
@@ -602,6 +613,34 @@ obsws-cli projector open --monitor-index=1 "test_group"
obsws-cli screenshot save --width=2560 --height=1440 "Scene" "C:\Users\me\Videos\screenshot.png" obsws-cli screenshot save --width=2560 --height=1440 "Scene" "C:\Users\me\Videos\screenshot.png"
``` ```
## Style
By default styling is disabled but you may enable it with:
- --style/-s: Style used in output.
- OBS_STYLE
- --no-border/-b: Disable table border styling in output.
- OBS_STYLE_NO_BORDER
Available styles:
- red
- magenta
- purple
- blue
- cyan
- green
- yellow
- orange
- white
- grey
- navy
- black
```console
obsws-cli --style=cyan --no-border scene list
```
## License ## License
`obsws-cli` is distributed under the terms of the [MIT](https://spdx.org/licenses/MIT.html) license. `obsws-cli` is distributed under the terms of the [MIT](https://spdx.org/licenses/MIT.html) license.
@@ -609,3 +648,4 @@ obsws-cli screenshot save --width=2560 --height=1440 "Scene" "C:\Users\me\Videos
[obs-studio]: https://obsproject.com/ [obs-studio]: https://obsproject.com/
[obs-keyids]: https://github.com/obsproject/obs-studio/blob/master/libobs/obs-hotkeys.h [obs-keyids]: https://github.com/obsproject/obs-studio/blob/master/libobs/obs-hotkeys.h
[no-colour]: https://no-color.org/

View File

@@ -1,4 +1,4 @@
# SPDX-FileCopyrightText: 2025-present onyx-and-iris <code@onyxandiris.online> # SPDX-FileCopyrightText: 2025-present onyx-and-iris <code@onyxandiris.online>
# #
# SPDX-License-Identifier: MIT # SPDX-License-Identifier: MIT
__version__ = "0.16.3" __version__ = "0.17.6"

View File

@@ -5,8 +5,52 @@ import re
import typer import typer
class AliasGroup(typer.core.TyperGroup): class RootTyperAliasGroup(typer.core.TyperGroup):
"""A custom group class to handle command name aliases.""" """A custom group class to handle command name aliases for the root typer."""
def __init__(self, *args, **kwargs):
"""Initialize the AliasGroup."""
super().__init__(*args, **kwargs)
self.no_args_is_help = True
def get_command(self, ctx, cmd_name):
"""Get a command by name."""
match cmd_name:
case 'f':
cmd_name = 'filter'
case 'g':
cmd_name = 'group'
case 'hk':
cmd_name = 'hotkey'
case 'i':
cmd_name = 'input'
case 'prf':
cmd_name = 'profile'
case 'prj':
cmd_name = 'projector'
case 'rc':
cmd_name = 'record'
case 'rb':
cmd_name = 'replaybuffer'
case 'sc':
cmd_name = 'scene'
case 'scc':
cmd_name = 'scenecollection'
case 'si':
cmd_name = 'sceneitem'
case 'ss':
cmd_name = 'screenshot'
case 'st':
cmd_name = 'stream'
case 'sm':
cmd_name = 'studiomode'
case 'vc':
cmd_name = 'virtualcam'
return super().get_command(ctx, cmd_name)
class SubTyperAliasGroup(typer.core.TyperGroup):
"""A custom group class to handle command name aliases for sub typers."""
_CMD_SPLIT_P = re.compile(r' ?[,|] ?') _CMD_SPLIT_P = re.compile(r' ?[,|] ?')
@@ -22,7 +66,6 @@ class AliasGroup(typer.core.TyperGroup):
def _group_cmd_name(self, default_name): def _group_cmd_name(self, default_name):
for cmd in self.commands.values(): for cmd in self.commands.values():
name = cmd.name if cmd.name and default_name in self._CMD_SPLIT_P.split(cmd.name):
if name and default_name in self._CMD_SPLIT_P.split(name): return cmd.name
return name
return default_name return default_name

View File

@@ -1,18 +1,18 @@
"""Command line interface for the OBS WebSocket API.""" """Command line interface for the OBS WebSocket API."""
import importlib import importlib
import logging
from typing import Annotated from typing import Annotated
import obsws_python as obsws import obsws_python as obsws
import typer import typer
from rich.console import Console
from obsws_cli.__about__ import __version__ as obsws_cli_version from obsws_cli.__about__ import __version__ as obsws_cli_version
from . import settings from . import console, settings, styles
from .alias import AliasGroup from .alias import RootTyperAliasGroup
app = typer.Typer(cls=AliasGroup) app = typer.Typer(cls=RootTyperAliasGroup)
for sub_typer in ( for sub_typer in (
'filter', 'filter',
'group', 'group',
@@ -33,17 +33,23 @@ for sub_typer in (
module = importlib.import_module(f'.{sub_typer}', package=__package__) module = importlib.import_module(f'.{sub_typer}', package=__package__)
app.add_typer(module.app, name=sub_typer) app.add_typer(module.app, name=sub_typer)
out_console = Console()
err_console = Console(stderr=True)
def version_callback(value: bool): def version_callback(value: bool):
"""Show the version of the CLI.""" """Show the version of the CLI."""
if value: if value:
out_console.print(f'obsws-cli version: {obsws_cli_version}') console.out.print(f'obsws-cli version: {obsws_cli_version}')
raise typer.Exit() raise typer.Exit()
def setup_logging(debug: bool):
"""Set up logging for the application."""
log_level = logging.DEBUG if debug else logging.CRITICAL
logging.basicConfig(
level=log_level,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
@app.callback() @app.callback()
def main( def main(
ctx: typer.Context, ctx: typer.Context,
@@ -60,7 +66,11 @@ def main(
port: Annotated[ port: Annotated[
int, int,
typer.Option( typer.Option(
'--port', '-P', envvar='OBS_PORT', help='WebSocket port', show_default=4455 '--port',
'-P',
envvar='OBS_PORT',
help='WebSocket port',
show_default=4455,
), ),
] = settings.get('port'), ] = settings.get('port'),
password: Annotated[ password: Annotated[
@@ -94,15 +104,51 @@ def main(
callback=version_callback, callback=version_callback,
), ),
] = False, ] = False,
style: Annotated[
str,
typer.Option(
'--style',
'-s',
envvar='OBS_STYLE',
help='Set the style for the CLI output',
show_default='disabled',
),
] = settings.get('style'),
no_border: Annotated[
bool,
typer.Option(
'--no-border',
'-b',
envvar='OBS_STYLE_NO_BORDER',
help='Disable table border styling in the CLI output',
show_default=False,
),
] = settings.get('style_no_border'),
debug: Annotated[
bool,
typer.Option(
'--debug',
'-d',
envvar='OBS_DEBUG',
is_eager=True,
help='Enable debug logging',
show_default=False,
callback=setup_logging,
hidden=True,
),
] = settings.get('debug'),
): ):
"""obsws_cli is a command line interface for the OBS WebSocket API.""" """obsws_cli is a command line interface for the OBS WebSocket API."""
ctx.obj = ctx.with_resource(obsws.ReqClient(**ctx.params)) ctx.ensure_object(dict)
ctx.obj['obsws'] = ctx.with_resource(obsws.ReqClient(**ctx.params))
ctx.obj['style'] = styles.request_style_obj(style, no_border)
@app.command() @app.command()
def obs_version(ctx: typer.Context): def obs_version(ctx: typer.Context):
"""Get the OBS Client and WebSocket versions.""" """Get the OBS Client and WebSocket versions."""
resp = ctx.obj.get_version() resp = ctx.obj['obsws'].get_version()
out_console.print( console.out.print(
f'OBS Client version: {resp.obs_version} with WebSocket version: {resp.obs_web_socket_version}' f'OBS Client version: {console.highlight(ctx, resp.obs_version)}'
f' with WebSocket version: {console.highlight(ctx, resp.obs_web_socket_version)}'
) )

12
obsws_cli/console.py Normal file
View File

@@ -0,0 +1,12 @@
"""module for console output handling in obsws_cli."""
import typer
from rich.console import Console
out = Console()
err = Console(stderr=True, style='bold red')
def highlight(ctx: typer.Context, text: str) -> str:
"""Highlight text using the current context's style."""
return f'[{ctx.obj["style"].highlight}]{text}[/{ctx.obj["style"].highlight}]'

View File

@@ -4,15 +4,13 @@ from typing import Annotated, Optional
import obsws_python as obsws import obsws_python as obsws
import typer import typer
from rich.console import Console
from rich.table import Table from rich.table import Table
from rich.text import Text
from . import util from . import console, util
from .alias import AliasGroup from .alias import SubTyperAliasGroup
app = typer.Typer(cls=AliasGroup) app = typer.Typer(cls=SubTyperAliasGroup)
out_console = Console()
err_console = Console(stderr=True)
@app.callback() @app.callback()
@@ -33,38 +31,48 @@ def list_(
): ):
"""List filters for a source.""" """List filters for a source."""
if not source_name: if not source_name:
source_name = ctx.obj.get_current_program_scene().scene_name source_name = ctx.obj['obsws'].get_current_program_scene().scene_name
try: try:
resp = ctx.obj.get_source_filter_list(source_name) resp = ctx.obj['obsws'].get_source_filter_list(source_name)
except obsws.error.OBSSDKRequestError as e: except obsws.error.OBSSDKRequestError as e:
if e.code == 600: if e.code == 600:
err_console.print(f"No source was found by the name of '{source_name}'.") console.err.print(
f'No source was found by the name of [yellow]{source_name}[/yellow].'
)
raise typer.Exit(1) raise typer.Exit(1)
else: else:
raise raise
if not resp.filters: if not resp.filters:
out_console.print(f'No filters found for source {source_name}') console.out.print(
f'No filters found for source {console.highlight(ctx, source_name)}'
)
raise typer.Exit() raise typer.Exit()
table = Table(title=f'Filters for Source: {source_name}', padding=(0, 2)) table = Table(
title=f'Filters for Source: {source_name}',
for column in ('Filter Name', 'Kind', 'Enabled', 'Settings'): padding=(0, 2),
table.add_column( border_style=ctx.obj['style'].border,
column,
justify='left' if column in ('Filter Name', 'Kind') else 'center',
style='cyan',
) )
columns = [
(Text('Filter Name', justify='center'), 'left', ctx.obj['style'].column),
(Text('Kind', justify='center'), 'left', ctx.obj['style'].column),
(Text('Enabled', justify='center'), 'center', None),
(Text('Settings', justify='center'), 'center', ctx.obj['style'].column),
]
for name, justify, style in columns:
table.add_column(name, justify=justify, style=style)
for filter in resp.filters: for filter in resp.filters:
resp = ctx.obj.get_source_filter_default_settings(filter['filterKind']) resp = ctx.obj['obsws'].get_source_filter_default_settings(filter['filterKind'])
settings = resp.default_filter_settings | filter['filterSettings'] settings = resp.default_filter_settings | filter['filterSettings']
table.add_row( table.add_row(
filter['filterName'], filter['filterName'],
util.snakecase_to_titlecase(filter['filterKind']), util.snakecase_to_titlecase(filter['filterKind']),
':white_heavy_check_mark:' if filter['filterEnabled'] else ':x:', util.check_mark(filter['filterEnabled']),
'\n'.join( '\n'.join(
[ [
f'{util.snakecase_to_titlecase(k):<20} {v:>10}' f'{util.snakecase_to_titlecase(k):<20} {v:>10}'
@@ -73,12 +81,12 @@ def list_(
), ),
) )
out_console.print(table) console.out.print(table)
def _get_filter_enabled(ctx: typer.Context, source_name: str, filter_name: str): def _get_filter_enabled(ctx: typer.Context, source_name: str, filter_name: str):
"""Get the status of a filter for a source.""" """Get the status of a filter for a source."""
resp = ctx.obj.get_source_filter(source_name, filter_name) resp = ctx.obj['obsws'].get_source_filter(source_name, filter_name)
return resp.filter_enabled return resp.filter_enabled
@@ -100,13 +108,15 @@ def enable(
): ):
"""Enable a filter for a source.""" """Enable a filter for a source."""
if _get_filter_enabled(ctx, source_name, filter_name): if _get_filter_enabled(ctx, source_name, filter_name):
err_console.print( console.err.print(
f'Filter {filter_name} is already enabled for source {source_name}' f'Filter [yellow]{filter_name}[/yellow] is already enabled for source [yellow]{source_name}[/yellow]'
) )
raise typer.Exit(1) raise typer.Exit(1)
ctx.obj.set_source_filter_enabled(source_name, filter_name, enabled=True) ctx.obj['obsws'].set_source_filter_enabled(source_name, filter_name, enabled=True)
out_console.print(f'Enabled filter {filter_name} for source {source_name}') console.out.print(
f'Enabled filter {console.highlight(ctx, filter_name)} for source {console.highlight(ctx, source_name)}'
)
@app.command('disable | off') @app.command('disable | off')
@@ -127,13 +137,15 @@ def disable(
): ):
"""Disable a filter for a source.""" """Disable a filter for a source."""
if not _get_filter_enabled(ctx, source_name, filter_name): if not _get_filter_enabled(ctx, source_name, filter_name):
err_console.print( console.err.print(
f'Filter {filter_name} is already disabled for source {source_name}' f'Filter [yellow]{filter_name}[/yellow] is already disabled for source [yellow]{source_name}[/yellow]'
) )
raise typer.Exit(1) raise typer.Exit(1)
ctx.obj.set_source_filter_enabled(source_name, filter_name, enabled=False) ctx.obj['obsws'].set_source_filter_enabled(source_name, filter_name, enabled=False)
out_console.print(f'Disabled filter {filter_name} for source {source_name}') console.out.print(
f'Disabled filter {console.highlight(ctx, filter_name)} for source {console.highlight(ctx, source_name)}'
)
@app.command('toggle | tg') @app.command('toggle | tg')
@@ -156,11 +168,17 @@ def toggle(
is_enabled = _get_filter_enabled(ctx, source_name, filter_name) is_enabled = _get_filter_enabled(ctx, source_name, filter_name)
new_state = not is_enabled new_state = not is_enabled
ctx.obj.set_source_filter_enabled(source_name, filter_name, enabled=new_state) ctx.obj['obsws'].set_source_filter_enabled(
source_name, filter_name, enabled=new_state
)
if new_state: if new_state:
out_console.print(f'Enabled filter {filter_name} for source {source_name}') console.out.print(
f'Enabled filter {console.highlight(ctx, filter_name)} for source {console.highlight(ctx, source_name)}'
)
else: else:
out_console.print(f'Disabled filter {filter_name} for source {source_name}') console.out.print(
f'Disabled filter {console.highlight(ctx, filter_name)} for source {console.highlight(ctx, source_name)}'
)
@app.command('status | ss') @app.command('status | ss')
@@ -182,6 +200,10 @@ def status(
"""Get the status of a filter for a source.""" """Get the status of a filter for a source."""
is_enabled = _get_filter_enabled(ctx, source_name, filter_name) is_enabled = _get_filter_enabled(ctx, source_name, filter_name)
if is_enabled: if is_enabled:
out_console.print(f'Filter {filter_name} is enabled for source {source_name}') console.out.print(
f'Filter {console.highlight(ctx, filter_name)} is enabled for source {console.highlight(ctx, source_name)}'
)
else: else:
out_console.print(f'Filter {filter_name} is disabled for source {source_name}') console.out.print(
f'Filter {console.highlight(ctx, filter_name)} is disabled for source {console.highlight(ctx, source_name)}'
)

View File

@@ -3,16 +3,14 @@
from typing import Annotated, Optional from typing import Annotated, Optional
import typer import typer
from rich.console import Console
from rich.table import Table from rich.table import Table
from rich.text import Text
from . import validate from . import console, util, validate
from .alias import AliasGroup from .alias import SubTyperAliasGroup
from .protocols import DataclassProtocol from .protocols import DataclassProtocol
app = typer.Typer(cls=AliasGroup) app = typer.Typer(cls=SubTyperAliasGroup)
out_console = Console()
err_console = Console(stderr=True)
@app.callback() @app.callback()
@@ -33,13 +31,13 @@ def list_(
): ):
"""List groups in a scene.""" """List groups in a scene."""
if not scene_name: if not scene_name:
scene_name = ctx.obj.get_current_program_scene().scene_name scene_name = ctx.obj['obsws'].get_current_program_scene().scene_name
if not validate.scene_in_scenes(ctx, scene_name): if not validate.scene_in_scenes(ctx, scene_name):
err_console.print(f"Scene '{scene_name}' not found.") console.err.print(f"Scene '{scene_name}' not found.")
raise typer.Exit(1) raise typer.Exit(1)
resp = ctx.obj.get_scene_item_list(scene_name) resp = ctx.obj['obsws'].get_scene_item_list(scene_name)
groups = [ groups = [
(item.get('sceneItemId'), item.get('sourceName'), item.get('sceneItemEnabled')) (item.get('sceneItemId'), item.get('sourceName'), item.get('sceneItemEnabled'))
for item in resp.scene_items for item in resp.scene_items
@@ -47,24 +45,33 @@ def list_(
] ]
if not groups: if not groups:
out_console.print(f"No groups found in scene '{scene_name}'.") console.out.print(
f'No groups found in scene {console.highlight(ctx, scene_name)}.'
)
raise typer.Exit() raise typer.Exit()
table = Table(title=f'Groups in Scene: {scene_name}', padding=(0, 2)) table = Table(
title=f'Groups in Scene: {scene_name}',
for column in ('ID', 'Group Name', 'Enabled'): padding=(0, 2),
table.add_column( border_style=ctx.obj['style'].border,
column, justify='left' if column == 'Group Name' else 'center', style='cyan'
) )
columns = [
(Text('ID', justify='center'), 'center', ctx.obj['style'].column),
(Text('Group Name', justify='center'), 'left', ctx.obj['style'].column),
(Text('Enabled', justify='center'), 'center', None),
]
for column, justify, style in columns:
table.add_column(column, justify=justify, style=style)
for item_id, group_name, is_enabled in groups: for item_id, group_name, is_enabled in groups:
table.add_row( table.add_row(
str(item_id), str(item_id),
group_name, group_name,
':white_heavy_check_mark:' if is_enabled else ':x:', util.check_mark(is_enabled),
) )
out_console.print(table) console.out.print(table)
def _get_group(group_name: str, resp: DataclassProtocol) -> dict | None: def _get_group(group_name: str, resp: DataclassProtocol) -> dict | None:
@@ -93,21 +100,23 @@ def show(
): ):
"""Show a group in a scene.""" """Show a group in a scene."""
if not validate.scene_in_scenes(ctx, scene_name): if not validate.scene_in_scenes(ctx, scene_name):
err_console.print(f"Scene '{scene_name}' not found.") console.err.print(f"Scene '{scene_name}' not found.")
raise typer.Exit(1) raise typer.Exit(1)
resp = ctx.obj.get_scene_item_list(scene_name) resp = ctx.obj['obsws'].get_scene_item_list(scene_name)
if (group := _get_group(group_name, resp)) is None: if (group := _get_group(group_name, resp)) is None:
err_console.print(f"Group '{group_name}' not found in scene {scene_name}.") console.err.print(
f'Group [yellow]{group_name}[/yellow] not found in scene [yellow]{scene_name}[/yellow].'
)
raise typer.Exit(1) raise typer.Exit(1)
ctx.obj.set_scene_item_enabled( ctx.obj['obsws'].set_scene_item_enabled(
scene_name=scene_name, scene_name=scene_name,
item_id=int(group.get('sceneItemId')), item_id=int(group.get('sceneItemId')),
enabled=True, enabled=True,
) )
out_console.print(f"Group '{group_name}' is now visible.") console.out.print(f'Group {console.highlight(ctx, group_name)} is now visible.')
@app.command('hide | h') @app.command('hide | h')
@@ -122,21 +131,23 @@ def hide(
): ):
"""Hide a group in a scene.""" """Hide a group in a scene."""
if not validate.scene_in_scenes(ctx, scene_name): if not validate.scene_in_scenes(ctx, scene_name):
err_console.print(f"Scene '{scene_name}' not found.") console.err.print(f'Scene [yellow]{scene_name}[/yellow] not found.')
raise typer.Exit(1) raise typer.Exit(1)
resp = ctx.obj.get_scene_item_list(scene_name) resp = ctx.obj['obsws'].get_scene_item_list(scene_name)
if (group := _get_group(group_name, resp)) is None: if (group := _get_group(group_name, resp)) is None:
err_console.print(f"Group '{group_name}' not found in scene {scene_name}.") console.err.print(
f'Group [yellow]{group_name}[/yellow] not found in scene [yellow]{scene_name}[/yellow].'
)
raise typer.Exit(1) raise typer.Exit(1)
ctx.obj.set_scene_item_enabled( ctx.obj['obsws'].set_scene_item_enabled(
scene_name=scene_name, scene_name=scene_name,
item_id=int(group.get('sceneItemId')), item_id=int(group.get('sceneItemId')),
enabled=False, enabled=False,
) )
out_console.print(f"Group '{group_name}' is now hidden.") console.out.print(f'Group {console.highlight(ctx, group_name)} is now hidden.')
@app.command('toggle | tg') @app.command('toggle | tg')
@@ -151,25 +162,27 @@ def toggle(
): ):
"""Toggle a group in a scene.""" """Toggle a group in a scene."""
if not validate.scene_in_scenes(ctx, scene_name): if not validate.scene_in_scenes(ctx, scene_name):
err_console.print(f"Scene '{scene_name}' not found.") console.err.print(f'Scene [yellow]{scene_name}[/yellow] not found.')
raise typer.Exit(1) raise typer.Exit(1)
resp = ctx.obj.get_scene_item_list(scene_name) resp = ctx.obj['obsws'].get_scene_item_list(scene_name)
if (group := _get_group(group_name, resp)) is None: if (group := _get_group(group_name, resp)) is None:
err_console.print(f"Group '{group_name}' not found in scene {scene_name}.") console.err.print(
f'Group [yellow]{group_name}[/yellow] not found in scene [yellow]{scene_name}[/yellow].'
)
raise typer.Exit(1) raise typer.Exit(1)
new_state = not group.get('sceneItemEnabled') new_state = not group.get('sceneItemEnabled')
ctx.obj.set_scene_item_enabled( ctx.obj['obsws'].set_scene_item_enabled(
scene_name=scene_name, scene_name=scene_name,
item_id=int(group.get('sceneItemId')), item_id=int(group.get('sceneItemId')),
enabled=new_state, enabled=new_state,
) )
if new_state: if new_state:
out_console.print(f"Group '{group_name}' is now visible.") console.out.print(f'Group {console.highlight(ctx, group_name)} is now visible.')
else: else:
out_console.print(f"Group '{group_name}' is now hidden.") console.out.print(f'Group {console.highlight(ctx, group_name)} is now hidden.')
@app.command('status | ss') @app.command('status | ss')
@@ -184,20 +197,22 @@ def status(
): ):
"""Get the status of a group in a scene.""" """Get the status of a group in a scene."""
if not validate.scene_in_scenes(ctx, scene_name): if not validate.scene_in_scenes(ctx, scene_name):
err_console.print(f"Scene '{scene_name}' not found.") console.err.print(f'Scene [yellow]{scene_name}[/yellow] not found.')
raise typer.Exit(1) raise typer.Exit(1)
resp = ctx.obj.get_scene_item_list(scene_name) resp = ctx.obj['obsws'].get_scene_item_list(scene_name)
if (group := _get_group(group_name, resp)) is None: if (group := _get_group(group_name, resp)) is None:
err_console.print(f"Group '{group_name}' not found in scene {scene_name}.") console.err.print(
f'Group [yellow]{group_name}[/yellow] not found in scene [yellow]{scene_name}[/yellow].'
)
raise typer.Exit(1) raise typer.Exit(1)
enabled = ctx.obj.get_scene_item_enabled( enabled = ctx.obj['obsws'].get_scene_item_enabled(
scene_name=scene_name, scene_name=scene_name,
item_id=int(group.get('sceneItemId')), item_id=int(group.get('sceneItemId')),
) )
if enabled.scene_item_enabled: if enabled.scene_item_enabled:
out_console.print(f"Group '{group_name}' is now visible.") console.out.print(f'Group {console.highlight(ctx, group_name)} is now visible.')
else: else:
out_console.print(f"Group '{group_name}' is now hidden.") console.out.print(f'Group {console.highlight(ctx, group_name)} is now hidden.')

View File

@@ -3,14 +3,13 @@
from typing import Annotated from typing import Annotated
import typer import typer
from rich.console import Console
from rich.table import Table from rich.table import Table
from rich.text import Text
from .alias import AliasGroup from . import console
from .alias import SubTyperAliasGroup
app = typer.Typer(cls=AliasGroup) app = typer.Typer(cls=SubTyperAliasGroup)
out_console = Console()
err_console = Console(stderr=True)
@app.callback() @app.callback()
@@ -23,15 +22,23 @@ def list_(
ctx: typer.Context, ctx: typer.Context,
): ):
"""List all hotkeys.""" """List all hotkeys."""
resp = ctx.obj.get_hotkey_list() resp = ctx.obj['obsws'].get_hotkey_list()
table = Table(title='Hotkeys', padding=(0, 2)) table = Table(
table.add_column('Hotkey Name', justify='left', style='cyan') title='Hotkeys',
padding=(0, 2),
border_style=ctx.obj['style'].border,
)
table.add_column(
Text('Hotkey Name', justify='center'),
justify='left',
style=ctx.obj['style'].column,
)
for hotkey in resp.hotkeys: for i, hotkey in enumerate(resp.hotkeys):
table.add_row(hotkey) table.add_row(hotkey, style='' if i % 2 == 0 else 'dim')
out_console.print(table) console.out.print(table)
@app.command('trigger | tr') @app.command('trigger | tr')
@@ -42,7 +49,7 @@ def trigger(
], ],
): ):
"""Trigger a hotkey by name.""" """Trigger a hotkey by name."""
ctx.obj.trigger_hotkey_by_name(hotkey) ctx.obj['obsws'].trigger_hotkey_by_name(hotkey)
@app.command('trigger-sequence | trs') @app.command('trigger-sequence | trs')
@@ -70,4 +77,4 @@ def trigger_sequence(
] = False, ] = False,
): ):
"""Trigger a hotkey by sequence.""" """Trigger a hotkey by sequence."""
ctx.obj.trigger_hotkey_by_key_sequence(key_id, shift, ctrl, alt, cmd) ctx.obj['obsws'].trigger_hotkey_by_key_sequence(key_id, shift, ctrl, alt, cmd)

View File

@@ -2,16 +2,15 @@
from typing import Annotated from typing import Annotated
import obsws_python as obsws
import typer import typer
from rich.console import Console
from rich.table import Table from rich.table import Table
from rich.text import Text
from . import util, validate from . import console, util, validate
from .alias import AliasGroup from .alias import SubTyperAliasGroup
app = typer.Typer(cls=AliasGroup) app = typer.Typer(cls=SubTyperAliasGroup)
out_console = Console()
err_console = Console(stderr=True)
@app.callback() @app.callback()
@@ -25,9 +24,12 @@ def list_(
input: Annotated[bool, typer.Option(help='Filter by input type.')] = False, input: Annotated[bool, typer.Option(help='Filter by input type.')] = False,
output: Annotated[bool, typer.Option(help='Filter by output type.')] = False, output: Annotated[bool, typer.Option(help='Filter by output type.')] = False,
colour: Annotated[bool, typer.Option(help='Filter by colour source type.')] = False, colour: Annotated[bool, typer.Option(help='Filter by colour source type.')] = False,
ffmpeg: Annotated[bool, typer.Option(help='Filter by ffmpeg source type.')] = False,
vlc: Annotated[bool, typer.Option(help='Filter by VLC source type.')] = False,
uuid: Annotated[bool, typer.Option(help='Show UUIDs of inputs.')] = False,
): ):
"""List all inputs.""" """List all inputs."""
resp = ctx.obj.get_input_list() resp = ctx.obj['obsws'].get_input_list()
kinds = [] kinds = []
if input: if input:
@@ -36,34 +38,71 @@ def list_(
kinds.append('output') kinds.append('output')
if colour: if colour:
kinds.append('color') kinds.append('color')
if not any([input, output, colour]): if ffmpeg:
kinds = ['input', 'output', 'color'] kinds.append('ffmpeg')
if vlc:
kinds.append('vlc')
if not any([input, output, colour, ffmpeg, vlc]):
kinds = ctx.obj['obsws'].get_input_kind_list(False).input_kinds
inputs = [ inputs = sorted(
(input_.get('inputName'), input_.get('inputKind')) (
(input_.get('inputName'), input_.get('inputKind'), input_.get('inputUuid'))
for input_ in filter( for input_ in filter(
lambda input_: any(kind in input_.get('inputKind') for kind in kinds), lambda input_: any(kind in input_.get('inputKind') for kind in kinds),
resp.inputs, resp.inputs,
) )
] ),
key=lambda x: x[0], # Sort by input name
if not inputs:
out_console.print('No inputs found.')
raise typer.Exit()
table = Table(title='Inputs', padding=(0, 2))
for column in ('Input Name', 'Kind'):
table.add_column(
column, justify='left' if column == 'Input Name' else 'center', style='cyan'
) )
for input_name, input_kind in inputs: if not inputs:
console.out.print('No inputs found.')
raise typer.Exit()
table = Table(title='Inputs', padding=(0, 2), border_style=ctx.obj['style'].border)
if uuid:
columns = [
(Text('Input Name', justify='center'), 'left', ctx.obj['style'].column),
(Text('Kind', justify='center'), 'center', ctx.obj['style'].column),
(Text('Muted', justify='center'), 'center', None),
(Text('UUID', justify='center'), 'left', ctx.obj['style'].column),
]
else:
columns = [
(Text('Input Name', justify='center'), 'left', ctx.obj['style'].column),
(Text('Kind', justify='center'), 'center', ctx.obj['style'].column),
(Text('Muted', justify='center'), 'center', None),
]
for column, justify, style in columns:
table.add_column(column, justify=justify, style=style)
for input_name, input_kind, input_uuid in inputs:
input_mark = ''
try:
input_muted = ctx.obj['obsws'].get_input_mute(name=input_name).input_muted
input_mark = util.check_mark(input_muted)
except obsws.error.OBSSDKRequestError as e:
if e.code == 604: # Input does not support audio
input_mark = 'N/A'
else:
raise
if uuid:
table.add_row( table.add_row(
input_name, input_name,
util.snakecase_to_titlecase(input_kind), util.snakecase_to_titlecase(input_kind),
input_mark,
input_uuid,
)
else:
table.add_row(
input_name,
util.snakecase_to_titlecase(input_kind),
input_mark,
) )
out_console.print(table) console.out.print(table)
@app.command('mute | m') @app.command('mute | m')
@@ -75,15 +114,15 @@ def mute(
): ):
"""Mute an input.""" """Mute an input."""
if not validate.input_in_inputs(ctx, input_name): if not validate.input_in_inputs(ctx, input_name):
err_console.print(f"Input '{input_name}' not found.") console.err.print(f'Input [yellow]{input_name}[/yellow] not found.')
raise typer.Exit(1) raise typer.Exit(1)
ctx.obj.set_input_mute( ctx.obj['obsws'].set_input_mute(
name=input_name, name=input_name,
muted=True, muted=True,
) )
out_console.print(f"Input '{input_name}' muted.") console.out.print(f'Input {console.highlight(ctx, input_name)} muted.')
@app.command('unmute | um') @app.command('unmute | um')
@@ -96,15 +135,15 @@ def unmute(
): ):
"""Unmute an input.""" """Unmute an input."""
if not validate.input_in_inputs(ctx, input_name): if not validate.input_in_inputs(ctx, input_name):
err_console.print(f"Input '{input_name}' not found.") console.err.print(f'Input [yellow]{input_name}[/yellow] not found.')
raise typer.Exit(1) raise typer.Exit(1)
ctx.obj.set_input_mute( ctx.obj['obsws'].set_input_mute(
name=input_name, name=input_name,
muted=False, muted=False,
) )
out_console.print(f"Input '{input_name}' unmuted.") console.out.print(f'Input {console.highlight(ctx, input_name)} unmuted.')
@app.command('toggle | tg') @app.command('toggle | tg')
@@ -117,17 +156,22 @@ def toggle(
): ):
"""Toggle an input.""" """Toggle an input."""
if not validate.input_in_inputs(ctx, input_name): if not validate.input_in_inputs(ctx, input_name):
err_console.print(f"Input '{input_name}' not found.") console.err.print(f'Input [yellow]{input_name}[/yellow] not found.')
raise typer.Exit(1) raise typer.Exit(1)
resp = ctx.obj.get_input_mute(name=input_name) resp = ctx.obj['obsws'].get_input_mute(name=input_name)
new_state = not resp.input_muted new_state = not resp.input_muted
ctx.obj.set_input_mute( ctx.obj['obsws'].set_input_mute(
name=input_name, name=input_name,
muted=new_state, muted=new_state,
) )
out_console.print( if new_state:
f"Input '{input_name}' {'muted' if new_state else 'unmuted'}.", console.out.print(
f'Input {console.highlight(ctx, input_name)} muted.',
)
else:
console.out.print(
f'Input {console.highlight(ctx, input_name)} unmuted.',
) )

View File

@@ -3,15 +3,13 @@
from typing import Annotated from typing import Annotated
import typer import typer
from rich.console import Console
from rich.table import Table from rich.table import Table
from rich.text import Text
from . import validate from . import console, util, validate
from .alias import AliasGroup from .alias import SubTyperAliasGroup
app = typer.Typer(cls=AliasGroup) app = typer.Typer(cls=SubTyperAliasGroup)
out_console = Console()
err_console = Console(stderr=True)
@app.callback() @app.callback()
@@ -22,30 +20,34 @@ def main():
@app.command('list | ls') @app.command('list | ls')
def list_(ctx: typer.Context): def list_(ctx: typer.Context):
"""List profiles.""" """List profiles."""
resp = ctx.obj.get_profile_list() resp = ctx.obj['obsws'].get_profile_list()
table = Table(title='Profiles', padding=(0, 2)) table = Table(
for column in ('Profile Name', 'Current'): title='Profiles', padding=(0, 2), border_style=ctx.obj['style'].border
table.add_column(
column,
justify='left' if column == 'Profile Name' else 'center',
style='cyan',
) )
columns = [
(Text('Profile Name', justify='center'), 'left', ctx.obj['style'].column),
(Text('Current', justify='center'), 'center', None),
]
for column, justify, style in columns:
table.add_column(column, justify=justify, style=style)
for profile in resp.profiles: for profile in resp.profiles:
table.add_row( table.add_row(
profile, profile,
':white_heavy_check_mark:' if profile == resp.current_profile_name else '', util.check_mark(profile == resp.current_profile_name, empty_if_false=True),
) )
out_console.print(table) console.out.print(table)
@app.command('current | get') @app.command('current | get')
def current(ctx: typer.Context): def current(ctx: typer.Context):
"""Get the current profile.""" """Get the current profile."""
resp = ctx.obj.get_profile_list() resp = ctx.obj['obsws'].get_profile_list()
out_console.print(resp.current_profile_name) console.out.print(
f'Current profile: {console.highlight(ctx, resp.current_profile_name)}'
)
@app.command('switch | set') @app.command('switch | set')
@@ -60,16 +62,18 @@ def switch(
): ):
"""Switch to a profile.""" """Switch to a profile."""
if not validate.profile_exists(ctx, profile_name): if not validate.profile_exists(ctx, profile_name):
err_console.print(f"Profile '{profile_name}' not found.") console.err.print(f'Profile [yellow]{profile_name}[/yellow] not found.')
raise typer.Exit(1) raise typer.Exit(1)
resp = ctx.obj.get_profile_list() resp = ctx.obj['obsws'].get_profile_list()
if resp.current_profile_name == profile_name: if resp.current_profile_name == profile_name:
err_console.print(f"Profile '{profile_name}' is already the current profile.") console.err.print(
f'Profile [yellow]{profile_name}[/yellow] is already the current profile.'
)
raise typer.Exit(1) raise typer.Exit(1)
ctx.obj.set_current_profile(profile_name) ctx.obj['obsws'].set_current_profile(profile_name)
out_console.print(f"Switched to profile '{profile_name}'.") console.out.print(f'Switched to profile {console.highlight(ctx, profile_name)}.')
@app.command('create | new') @app.command('create | new')
@@ -82,11 +86,11 @@ def create(
): ):
"""Create a new profile.""" """Create a new profile."""
if validate.profile_exists(ctx, profile_name): if validate.profile_exists(ctx, profile_name):
err_console.print(f"Profile '{profile_name}' already exists.") console.err.print(f'Profile [yellow]{profile_name}[/yellow] already exists.')
raise typer.Exit(1) raise typer.Exit(1)
ctx.obj.create_profile(profile_name) ctx.obj['obsws'].create_profile(profile_name)
out_console.print(f"Created profile '{profile_name}'.") console.out.print(f'Created profile {console.highlight(ctx, profile_name)}.')
@app.command('remove | rm') @app.command('remove | rm')
@@ -99,8 +103,8 @@ def remove(
): ):
"""Remove a profile.""" """Remove a profile."""
if not validate.profile_exists(ctx, profile_name): if not validate.profile_exists(ctx, profile_name):
err_console.print(f"Profile '{profile_name}' not found.") console.err.print(f'Profile [yellow]{profile_name}[/yellow] not found.')
raise typer.Exit(1) raise typer.Exit(1)
ctx.obj.remove_profile(profile_name) ctx.obj['obsws'].remove_profile(profile_name)
out_console.print(f"Removed profile '{profile_name}'.") console.out.print(f'Removed profile {console.highlight(ctx, profile_name)}.')

View File

@@ -3,14 +3,13 @@
from typing import Annotated from typing import Annotated
import typer import typer
from rich.console import Console
from rich.table import Table from rich.table import Table
from rich.text import Text
from .alias import AliasGroup from . import console
from .alias import SubTyperAliasGroup
app = typer.Typer(cls=AliasGroup) app = typer.Typer(cls=SubTyperAliasGroup)
out_console = Console()
err_console = Console(stderr=True)
@app.callback() @app.callback()
@@ -21,10 +20,10 @@ def main():
@app.command('list-monitors | ls-m') @app.command('list-monitors | ls-m')
def list_monitors(ctx: typer.Context): def list_monitors(ctx: typer.Context):
"""List available monitors.""" """List available monitors."""
resp = ctx.obj.get_monitor_list() resp = ctx.obj['obsws'].get_monitor_list()
if not resp.monitors: if not resp.monitors:
out_console.print('No monitors found.') console.out.print('No monitors found.')
return return
monitors = sorted( monitors = sorted(
@@ -32,14 +31,22 @@ def list_monitors(ctx: typer.Context):
key=lambda m: m[0], key=lambda m: m[0],
) )
table = Table(title='Available Monitors', padding=(0, 2)) table = Table(
table.add_column('Index', justify='center', style='cyan') title='Available Monitors',
table.add_column('Name', style='cyan') padding=(0, 2),
border_style=ctx.obj['style'].border,
)
table.add_column(
Text('Index', justify='center'), justify='center', style=ctx.obj['style'].column
)
table.add_column(
Text('Name', justify='center'), justify='left', style=ctx.obj['style'].column
)
for index, monitor in monitors: for index, monitor in monitors:
table.add_row(str(index), monitor) table.add_row(str(index), monitor)
out_console.print(table) console.out.print(table)
@app.command('open | o') @app.command('open | o')
@@ -59,13 +66,24 @@ def open(
): ):
"""Open a fullscreen projector for a source on a specific monitor.""" """Open a fullscreen projector for a source on a specific monitor."""
if not source_name: if not source_name:
source_name = ctx.obj.get_current_program_scene().scene_name source_name = ctx.obj['obsws'].get_current_program_scene().scene_name
ctx.obj.open_source_projector( monitors = ctx.obj['obsws'].get_monitor_list().monitors
for monitor in monitors:
if monitor['monitorIndex'] == monitor_index:
ctx.obj['obsws'].open_source_projector(
source_name=source_name, source_name=source_name,
monitor_index=monitor_index, monitor_index=monitor_index,
) )
out_console.print( console.out.print(
f'Opened projector for source [bold]{source_name}[/] on monitor [bold]{monitor_index}[/].' f'Opened projector for source {console.highlight(ctx, source_name)} on monitor {console.highlight(ctx, monitor["monitorName"])}.'
) )
break
else:
console.err.print(
f'Monitor with index [yellow]{monitor_index}[/yellow] not found. '
f'Use [yellow]obsws-cli projector ls-m[/yellow] to see available monitors.'
)
raise typer.Exit(code=1)

View File

@@ -4,13 +4,11 @@ from pathlib import Path
from typing import Annotated, Optional from typing import Annotated, Optional
import typer import typer
from rich.console import Console
from .alias import AliasGroup from . import console
from .alias import SubTyperAliasGroup
app = typer.Typer(cls=AliasGroup) app = typer.Typer(cls=SubTyperAliasGroup)
out_console = Console()
err_console = Console(stderr=True)
@app.callback() @app.callback()
@@ -20,7 +18,7 @@ def main():
def _get_recording_status(ctx: typer.Context) -> tuple: def _get_recording_status(ctx: typer.Context) -> tuple:
"""Get recording status.""" """Get recording status."""
resp = ctx.obj.get_record_status() resp = ctx.obj['obsws'].get_record_status()
return resp.output_active, resp.output_paused return resp.output_active, resp.output_paused
@@ -33,11 +31,11 @@ def start(ctx: typer.Context):
if paused: if paused:
err_msg += ' Try resuming it.' err_msg += ' Try resuming it.'
err_console.print(err_msg) console.err.print(err_msg)
raise typer.Exit(1) raise typer.Exit(1)
ctx.obj.start_record() ctx.obj['obsws'].start_record()
out_console.print('Recording started successfully.') console.out.print('Recording started successfully.')
@app.command('stop | st') @app.command('stop | st')
@@ -45,21 +43,23 @@ def stop(ctx: typer.Context):
"""Stop recording.""" """Stop recording."""
active, _ = _get_recording_status(ctx) active, _ = _get_recording_status(ctx)
if not active: if not active:
err_console.print('Recording is not in progress, cannot stop.') console.err.print('Recording is not in progress, cannot stop.')
raise typer.Exit(1) raise typer.Exit(1)
resp = ctx.obj.stop_record() resp = ctx.obj['obsws'].stop_record()
out_console.print(f'Recording stopped successfully. Saved to: {resp.output_path}') console.out.print(
f'Recording stopped successfully. Saved to: {console.highlight(ctx, resp.output_path)}'
)
@app.command('toggle | tg') @app.command('toggle | tg')
def toggle(ctx: typer.Context): def toggle(ctx: typer.Context):
"""Toggle recording.""" """Toggle recording."""
resp = ctx.obj.toggle_record() resp = ctx.obj['obsws'].toggle_record()
if resp.output_active: if resp.output_active:
out_console.print('Recording started successfully.') console.out.print('Recording started successfully.')
else: else:
out_console.print('Recording stopped successfully.') console.out.print('Recording stopped successfully.')
@app.command('status | ss') @app.command('status | ss')
@@ -68,11 +68,11 @@ def status(ctx: typer.Context):
active, paused = _get_recording_status(ctx) active, paused = _get_recording_status(ctx)
if active: if active:
if paused: if paused:
out_console.print('Recording is in progress and paused.') console.out.print('Recording is in progress and paused.')
else: else:
out_console.print('Recording is in progress.') console.out.print('Recording is in progress.')
else: else:
out_console.print('Recording is not in progress.') console.out.print('Recording is not in progress.')
@app.command('resume | r') @app.command('resume | r')
@@ -80,14 +80,14 @@ def resume(ctx: typer.Context):
"""Resume recording.""" """Resume recording."""
active, paused = _get_recording_status(ctx) active, paused = _get_recording_status(ctx)
if not active: if not active:
err_console.print('Recording is not in progress, cannot resume.') console.err.print('Recording is not in progress, cannot resume.')
raise typer.Exit(1) raise typer.Exit(1)
if not paused: if not paused:
err_console.print('Recording is in progress but not paused, cannot resume.') console.err.print('Recording is in progress but not paused, cannot resume.')
raise typer.Exit(1) raise typer.Exit(1)
ctx.obj.resume_record() ctx.obj['obsws'].resume_record()
out_console.print('Recording resumed successfully.') console.out.print('Recording resumed successfully.')
@app.command('pause | p') @app.command('pause | p')
@@ -95,14 +95,14 @@ def pause(ctx: typer.Context):
"""Pause recording.""" """Pause recording."""
active, paused = _get_recording_status(ctx) active, paused = _get_recording_status(ctx)
if not active: if not active:
err_console.print('Recording is not in progress, cannot pause.') console.err.print('Recording is not in progress, cannot pause.')
raise typer.Exit(1) raise typer.Exit(1)
if paused: if paused:
err_console.print('Recording is in progress but already paused, cannot pause.') console.err.print('Recording is in progress but already paused, cannot pause.')
raise typer.Exit(1) raise typer.Exit(1)
ctx.obj.pause_record() ctx.obj['obsws'].pause_record()
out_console.print('Recording paused successfully.') console.out.print('Recording paused successfully.')
@app.command('directory | d') @app.command('directory | d')
@@ -121,9 +121,12 @@ def directory(
): ):
"""Get or set the recording directory.""" """Get or set the recording directory."""
if record_directory is not None: if record_directory is not None:
ctx.obj.set_record_directory(str(record_directory)) ctx.obj['obsws'].set_record_directory(str(record_directory))
out_console.print(f'Recording directory updated to: {record_directory}') console.out.print(
else: f'Recording directory updated to: {console.highlight(ctx, record_directory)}'
out_console.print( )
f'Recording directory: {ctx.obj.get_record_directory().record_directory}' else:
resp = ctx.obj['obsws'].get_record_directory()
console.out.print(
f'Recording directory: {console.highlight(ctx, resp.record_directory)}'
) )

View File

@@ -1,13 +1,11 @@
"""module containing commands for manipulating the replay buffer in OBS.""" """module containing commands for manipulating the replay buffer in OBS."""
import typer import typer
from rich.console import Console
from .alias import AliasGroup from . import console
from .alias import SubTyperAliasGroup
app = typer.Typer(cls=AliasGroup) app = typer.Typer(cls=SubTyperAliasGroup)
out_console = Console()
err_console = Console(stderr=True)
@app.callback() @app.callback()
@@ -18,49 +16,49 @@ def main():
@app.command('start | s') @app.command('start | s')
def start(ctx: typer.Context): def start(ctx: typer.Context):
"""Start the replay buffer.""" """Start the replay buffer."""
resp = ctx.obj.get_replay_buffer_status() resp = ctx.obj['obsws'].get_replay_buffer_status()
if resp.output_active: if resp.output_active:
err_console.print('Replay buffer is already active.') console.err.print('Replay buffer is already active.')
raise typer.Exit(1) raise typer.Exit(1)
ctx.obj.start_replay_buffer() ctx.obj['obsws'].start_replay_buffer()
out_console.print('Replay buffer started.') console.out.print('Replay buffer started.')
@app.command('stop | st') @app.command('stop | st')
def stop(ctx: typer.Context): def stop(ctx: typer.Context):
"""Stop the replay buffer.""" """Stop the replay buffer."""
resp = ctx.obj.get_replay_buffer_status() resp = ctx.obj['obsws'].get_replay_buffer_status()
if not resp.output_active: if not resp.output_active:
err_console.print('Replay buffer is not active.') console.err.print('Replay buffer is not active.')
raise typer.Exit(1) raise typer.Exit(1)
ctx.obj.stop_replay_buffer() ctx.obj['obsws'].stop_replay_buffer()
out_console.print('Replay buffer stopped.') console.out.print('Replay buffer stopped.')
@app.command('toggle | tg') @app.command('toggle | tg')
def toggle(ctx: typer.Context): def toggle(ctx: typer.Context):
"""Toggle the replay buffer.""" """Toggle the replay buffer."""
resp = ctx.obj.toggle_replay_buffer() resp = ctx.obj['obsws'].toggle_replay_buffer()
if resp.output_active: if resp.output_active:
out_console.print('Replay buffer is active.') console.out.print('Replay buffer is active.')
else: else:
out_console.print('Replay buffer is not active.') console.out.print('Replay buffer is not active.')
@app.command('status | ss') @app.command('status | ss')
def status(ctx: typer.Context): def status(ctx: typer.Context):
"""Get the status of the replay buffer.""" """Get the status of the replay buffer."""
resp = ctx.obj.get_replay_buffer_status() resp = ctx.obj['obsws'].get_replay_buffer_status()
if resp.output_active: if resp.output_active:
out_console.print('Replay buffer is active.') console.out.print('Replay buffer is active.')
else: else:
out_console.print('Replay buffer is not active.') console.out.print('Replay buffer is not active.')
@app.command('save | sv') @app.command('save | sv')
def save(ctx: typer.Context): def save(ctx: typer.Context):
"""Save the replay buffer.""" """Save the replay buffer."""
ctx.obj.save_replay_buffer() ctx.obj['obsws'].save_replay_buffer()
out_console.print('Replay buffer saved.') console.out.print('Replay buffer saved.')

View File

@@ -3,15 +3,13 @@
from typing import Annotated from typing import Annotated
import typer import typer
from rich.console import Console
from rich.table import Table from rich.table import Table
from rich.text import Text
from . import validate from . import console, util, validate
from .alias import AliasGroup from .alias import SubTyperAliasGroup
app = typer.Typer(cls=AliasGroup) app = typer.Typer(cls=SubTyperAliasGroup)
out_console = Console()
err_console = Console(stderr=True)
@app.callback() @app.callback()
@@ -20,25 +18,48 @@ def main():
@app.command('list | ls') @app.command('list | ls')
def list_(ctx: typer.Context): def list_(
ctx: typer.Context,
uuid: Annotated[bool, typer.Option(help='Show UUIDs of scenes')] = False,
):
"""List all scenes.""" """List all scenes."""
resp = ctx.obj.get_scene_list() resp = ctx.obj['obsws'].get_scene_list()
scenes = ( scenes = (
(scene.get('sceneName'), scene.get('sceneUuid')) (scene.get('sceneName'), scene.get('sceneUuid'))
for scene in reversed(resp.scenes) for scene in reversed(resp.scenes)
) )
table = Table(title='Scenes', padding=(0, 2)) active_scene = ctx.obj['obsws'].get_current_program_scene().scene_name
for column in ('Scene Name', 'UUID'):
table.add_column(column, justify='left', style='cyan') table = Table(title='Scenes', padding=(0, 2), border_style=ctx.obj['style'].border)
if uuid:
columns = [
(Text('Scene Name', justify='center'), 'left', ctx.obj['style'].column),
(Text('Active', justify='center'), 'center', None),
(Text('UUID', justify='center'), 'left', ctx.obj['style'].column),
]
else:
columns = [
(Text('Scene Name', justify='center'), 'left', ctx.obj['style'].column),
(Text('Active', justify='center'), 'center', None),
]
for column, justify, style in columns:
table.add_column(column, justify=justify, style=style)
for scene_name, scene_uuid in scenes: for scene_name, scene_uuid in scenes:
if uuid:
table.add_row( table.add_row(
scene_name, scene_name,
util.check_mark(scene_name == active_scene, empty_if_false=True),
scene_uuid, scene_uuid,
) )
else:
table.add_row(
scene_name,
util.check_mark(scene_name == active_scene, empty_if_false=True),
)
out_console.print(table) console.out.print(table)
@app.command('current | get') @app.command('current | get')
@@ -50,15 +71,19 @@ def current(
): ):
"""Get the current program scene or preview scene.""" """Get the current program scene or preview scene."""
if preview and not validate.studio_mode_enabled(ctx): if preview and not validate.studio_mode_enabled(ctx):
err_console.print('Studio mode is not enabled, cannot get preview scene.') console.err.print('Studio mode is not enabled, cannot get preview scene.')
raise typer.Exit(1) raise typer.Exit(1)
if preview: if preview:
resp = ctx.obj.get_current_preview_scene() resp = ctx.obj['obsws'].get_current_preview_scene()
out_console.print(resp.current_preview_scene_name) console.out.print(
f'Current Preview Scene: {console.highlight(ctx, resp.current_preview_scene_name)}'
)
else: else:
resp = ctx.obj.get_current_program_scene() resp = ctx.obj['obsws'].get_current_program_scene()
out_console.print(resp.current_program_scene_name) console.out.print(
f'Current Program Scene: {console.highlight(ctx, resp.current_program_scene_name)}'
)
@app.command('switch | set') @app.command('switch | set')
@@ -74,16 +99,16 @@ def switch(
): ):
"""Switch to a scene.""" """Switch to a scene."""
if preview and not validate.studio_mode_enabled(ctx): if preview and not validate.studio_mode_enabled(ctx):
err_console.print('Studio mode is not enabled, cannot set the preview scene.') console.err.print('Studio mode is not enabled, cannot set the preview scene.')
raise typer.Exit(1) raise typer.Exit(1)
if not validate.scene_in_scenes(ctx, scene_name): if not validate.scene_in_scenes(ctx, scene_name):
err_console.print(f"Scene '{scene_name}' not found.") console.err.print(f'Scene [yellow]{scene_name}[/yellow] not found.')
raise typer.Exit(1) raise typer.Exit(1)
if preview: if preview:
ctx.obj.set_current_preview_scene(scene_name) ctx.obj['obsws'].set_current_preview_scene(scene_name)
out_console.print(f'Switched to preview scene: {scene_name}') console.out.print(f'Switched to preview scene: [green]{scene_name}[/green]')
else: else:
ctx.obj.set_current_program_scene(scene_name) ctx.obj['obsws'].set_current_program_scene(scene_name)
out_console.print(f'Switched to program scene: {scene_name}') console.out.print(f'Switched to program scene: [green]{scene_name}[/green]')

View File

@@ -3,15 +3,12 @@
from typing import Annotated from typing import Annotated
import typer import typer
from rich.console import Console
from rich.table import Table from rich.table import Table
from . import validate from . import console, validate
from .alias import AliasGroup from .alias import SubTyperAliasGroup
app = typer.Typer(cls=AliasGroup) app = typer.Typer(cls=SubTyperAliasGroup)
out_console = Console()
err_console = Console(stderr=True)
@app.callback() @app.callback()
@@ -22,22 +19,30 @@ def main():
@app.command('list | ls') @app.command('list | ls')
def list_(ctx: typer.Context): def list_(ctx: typer.Context):
"""List all scene collections.""" """List all scene collections."""
resp = ctx.obj.get_scene_collection_list() resp = ctx.obj['obsws'].get_scene_collection_list()
table = Table(title='Scene Collections', padding=(0, 2)) table = Table(
table.add_column('Scene Collection Name', justify='left', style='cyan') title='Scene Collections',
padding=(0, 2),
border_style=ctx.obj['style'].border,
)
table.add_column(
'Scene Collection Name', justify='left', style=ctx.obj['style'].column
)
for scene_collection_name in resp.scene_collections: for scene_collection_name in resp.scene_collections:
table.add_row(scene_collection_name) table.add_row(scene_collection_name)
out_console.print(table) console.out.print(table)
@app.command('current | get') @app.command('current | get')
def current(ctx: typer.Context): def current(ctx: typer.Context):
"""Get the current scene collection.""" """Get the current scene collection."""
resp = ctx.obj.get_scene_collection_list() resp = ctx.obj['obsws'].get_scene_collection_list()
out_console.print(resp.current_scene_collection_name) console.out.print(
f'Current scene collection: {console.highlight(ctx, resp.current_scene_collection_name)}'
)
@app.command('switch | set') @app.command('switch | set')
@@ -49,20 +54,24 @@ def switch(
): ):
"""Switch to a scene collection.""" """Switch to a scene collection."""
if not validate.scene_collection_in_scene_collections(ctx, scene_collection_name): if not validate.scene_collection_in_scene_collections(ctx, scene_collection_name):
err_console.print(f"Scene collection '{scene_collection_name}' not found.") console.err.print(
f'Scene collection [yellow]{scene_collection_name}[/yellow] not found.'
)
raise typer.Exit(1) raise typer.Exit(1)
current_scene_collection = ( current_scene_collection = (
ctx.obj.get_scene_collection_list().current_scene_collection_name ctx.obj['obsws'].get_scene_collection_list().current_scene_collection_name
) )
if scene_collection_name == current_scene_collection: if scene_collection_name == current_scene_collection:
err_console.print( console.err.print(
f'Scene collection "{scene_collection_name}" is already active.' f'Scene collection [yellow]{scene_collection_name}[/yellow] is already active.'
) )
raise typer.Exit(1) raise typer.Exit(1)
ctx.obj.set_current_scene_collection(scene_collection_name) ctx.obj['obsws'].set_current_scene_collection(scene_collection_name)
out_console.print(f"Switched to scene collection '{scene_collection_name}'") console.out.print(
f'Switched to scene collection {console.highlight(ctx, scene_collection_name)}.'
)
@app.command('create | new') @app.command('create | new')
@@ -74,8 +83,12 @@ def create(
): ):
"""Create a new scene collection.""" """Create a new scene collection."""
if validate.scene_collection_in_scene_collections(ctx, scene_collection_name): if validate.scene_collection_in_scene_collections(ctx, scene_collection_name):
err_console.print(f"Scene collection '{scene_collection_name}' already exists.") console.err.print(
f'Scene collection [yellow]{scene_collection_name}[/yellow] already exists.'
)
raise typer.Exit(1) raise typer.Exit(1)
ctx.obj.create_scene_collection(scene_collection_name) ctx.obj['obsws'].create_scene_collection(scene_collection_name)
out_console.print(f'Created scene collection {scene_collection_name}') console.out.print(
f'Created scene collection {console.highlight(ctx, scene_collection_name)}.'
)

View File

@@ -1,19 +1,14 @@
"""module containing commands for manipulating items in scenes.""" """module containing commands for manipulating items in scenes."""
from collections.abc import Callable
from typing import Annotated, Optional from typing import Annotated, Optional
import obsws_python as obsws
import typer import typer
from rich.console import Console
from rich.table import Table from rich.table import Table
from . import validate from . import console, util, validate
from .alias import AliasGroup from .alias import SubTyperAliasGroup
app = typer.Typer(cls=AliasGroup) app = typer.Typer(cls=SubTyperAliasGroup)
out_console = Console()
err_console = Console(stderr=True)
@app.callback() @app.callback()
@@ -31,16 +26,17 @@ def list_(
help='Scene name to list items for', help='Scene name to list items for',
), ),
] = None, ] = None,
uuid: Annotated[bool, typer.Option(help='Show UUIDs of scene items')] = False,
): ):
"""List all items in a scene.""" """List all items in a scene."""
if not scene_name: if not scene_name:
scene_name = ctx.obj.get_current_program_scene().scene_name scene_name = ctx.obj['obsws'].get_current_program_scene().scene_name
if not validate.scene_in_scenes(ctx, scene_name): if not validate.scene_in_scenes(ctx, scene_name):
err_console.print(f"Scene '{scene_name}' not found.") console.err.print(f'Scene [yellow]{scene_name}[/yellow] not found.')
raise typer.Exit(1) raise typer.Exit(1)
resp = ctx.obj.get_scene_item_list(scene_name) resp = ctx.obj['obsws'].get_scene_item_list(scene_name)
items = sorted( items = sorted(
( (
( (
@@ -48,6 +44,7 @@ def list_(
item.get('sourceName'), item.get('sourceName'),
item.get('isGroup'), item.get('isGroup'),
item.get('sceneItemEnabled'), item.get('sceneItemEnabled'),
item.get('sourceUuid', 'N/A'), # Include source UUID
) )
for item in resp.scene_items for item in resp.scene_items
), ),
@@ -55,80 +52,118 @@ def list_(
) )
if not items: if not items:
out_console.print(f"No items found in scene '{scene_name}'.") console.out.print(
f'No items found in scene {console.highlight(ctx, scene_name)}.'
)
raise typer.Exit() raise typer.Exit()
table = Table(title=f'Items in Scene: {scene_name}', padding=(0, 2)) table = Table(
for column in ('Item ID', 'Item Name', 'In Group', 'Enabled'): title=f'Items in Scene: {scene_name}',
table.add_column( padding=(0, 2),
column, justify='left' if column == 'Item Name' else 'center', style='cyan' border_style=ctx.obj['style'].border,
) )
if uuid:
columns = [
('Item ID', 'center', ctx.obj['style'].column),
('Item Name', 'left', ctx.obj['style'].column),
('In Group', 'left', ctx.obj['style'].column),
('Enabled', 'center', None),
('UUID', 'left', ctx.obj['style'].column),
]
else:
columns = [
('Item ID', 'center', ctx.obj['style'].column),
('Item Name', 'left', ctx.obj['style'].column),
('In Group', 'left', ctx.obj['style'].column),
('Enabled', 'center', None),
]
# Add columns to the table
for column, justify, style in columns:
table.add_column(column, justify=justify, style=style)
for item_id, item_name, is_group, is_enabled in items: for item_id, item_name, is_group, is_enabled, source_uuid in items:
if is_group: if is_group:
resp = ctx.obj.get_group_scene_item_list(item_name) resp = ctx.obj['obsws'].get_group_scene_item_list(item_name)
group_items = sorted( group_items = sorted(
( (
( (
gi.get('sceneItemId'), gi.get('sceneItemId'),
gi.get('sourceName'), gi.get('sourceName'),
gi.get('sceneItemEnabled'), gi.get('sceneItemEnabled'),
gi.get('sourceUuid', 'N/A'), # Include source UUID
) )
for gi in resp.scene_items for gi in resp.scene_items
), ),
key=lambda x: x[0], # Sort by sceneItemId key=lambda x: x[0], # Sort by sceneItemId
) )
for group_item_id, group_item_name, group_item_enabled in group_items: for (
group_item_id,
group_item_name,
group_item_enabled,
group_item_source_uuid,
) in group_items:
if uuid:
table.add_row( table.add_row(
str(group_item_id), str(group_item_id),
group_item_name, group_item_name,
item_name, item_name,
':white_heavy_check_mark:' util.check_mark(is_enabled and group_item_enabled),
if is_enabled and group_item_enabled group_item_source_uuid,
else ':x:', )
else:
table.add_row(
str(group_item_id),
group_item_name,
item_name,
util.check_mark(is_enabled and group_item_enabled),
)
else:
if uuid:
table.add_row(
str(item_id),
item_name,
'',
util.check_mark(is_enabled),
source_uuid,
) )
else: else:
table.add_row( table.add_row(
str(item_id), str(item_id),
item_name, item_name,
'', '',
':white_heavy_check_mark:' if is_enabled else ':x:', util.check_mark(is_enabled),
) )
out_console.print(table) console.out.print(table)
def _validate_scene_name_and_item_name( def _validate_sources(
func: Callable,
):
"""Validate the scene name and item name."""
def wrapper(
ctx: typer.Context, ctx: typer.Context,
scene_name: str, scene_name: str,
item_name: str, item_name: str,
group: Optional[str] = None, group: Optional[str] = None,
): ) -> bool:
"""Validate the scene name and item name."""
if not validate.scene_in_scenes(ctx, scene_name): if not validate.scene_in_scenes(ctx, scene_name):
err_console.print(f"Scene '{scene_name}' not found.") console.err.print(f'Scene [yellow]{scene_name}[/yellow] not found.')
raise typer.Exit(1) return False
if group: if group:
if not validate.item_in_scene_item_list(ctx, scene_name, group): if not validate.item_in_scene_item_list(ctx, scene_name, group):
err_console.print( console.err.print(
f"Parent group '{group}' not found in scene '{scene_name}'." f'Group [yellow]{group}[/yellow] not found in scene [yellow]{scene_name}[/yellow].'
) )
raise typer.Exit(1) return False
else: else:
if not validate.item_in_scene_item_list(ctx, scene_name, item_name): if not validate.item_in_scene_item_list(ctx, scene_name, item_name):
err_console.print( console.err.print(
f"Item '{item_name}' not found in scene '{scene_name}'." f'Item [yellow]{item_name}[/yellow] not found in scene [yellow]{scene_name}[/yellow]. Is the item in a group? '
f'If so use the [yellow]--group[/yellow] option to specify the parent group.\n'
'Use [yellow]obsws-cli sceneitem ls[/yellow] for a list of items in the scene.'
) )
raise typer.Exit(1) return False
return func(ctx, scene_name, item_name, group) return True
return wrapper
def _get_scene_name_and_item_id( def _get_scene_name_and_item_id(
@@ -136,187 +171,38 @@ def _get_scene_name_and_item_id(
): ):
"""Get the scene name and item ID for the given scene and item name.""" """Get the scene name and item ID for the given scene and item name."""
if group: if group:
resp = ctx.obj.get_group_scene_item_list(group) resp = ctx.obj['obsws'].get_group_scene_item_list(group)
for item in resp.scene_items: for item in resp.scene_items:
if item.get('sourceName') == item_name: if item.get('sourceName') == item_name:
scene_name = group scene_name = group
scene_item_id = item.get('sceneItemId') scene_item_id = item.get('sceneItemId')
break break
else: else:
err_console.print(f"Item '{item_name}' not found in group '{group}'.") console.err.print(
raise typer.Exit(1) f'Item [yellow]{item_name}[/yellow] not found in group [yellow]{group}[/yellow].'
else:
try:
resp = ctx.obj.get_scene_item_id(scene_name, item_name)
except obsws.error.OBSSDKRequestError as e:
if e.code == 600:
err_console.print(
f"Item '{item_name}' not found in scene '{scene_name}'. Is the item in a group? "
'If so use the --group option to specify the parent group. '
'See `obsws-cli sceneitem list` for a list of items in the scene.'
) )
raise typer.Exit(1) raise typer.Exit(1)
else: else:
raise resp = ctx.obj['obsws'].get_scene_item_id(scene_name, item_name)
scene_item_id = resp.scene_item_id scene_item_id = resp.scene_item_id
return scene_name, scene_item_id return scene_name, scene_item_id
@_validate_scene_name_and_item_name
@app.command('show | sh') @app.command('show | sh')
def show( def show(
ctx: typer.Context, ctx: typer.Context,
scene_name: Annotated[str, typer.Argument(..., help='Scene name the item is in')], scene_name: Annotated[
str, typer.Argument(..., show_default=False, help='Scene name the item is in')
],
item_name: Annotated[ item_name: Annotated[
str, typer.Argument(..., help='Item name to show in the scene') str,
typer.Argument(..., show_default=False, help='Item name to show in the scene'),
], ],
group: Annotated[Optional[str], typer.Option(help='Parent group name')] = None, group: Annotated[Optional[str], typer.Option(help='Parent group name')] = None,
): ):
"""Show an item in a scene.""" """Show an item in a scene."""
scene_name, scene_item_id = _get_scene_name_and_item_id( if not _validate_sources(ctx, scene_name, item_name, group):
ctx, scene_name, item_name, group
)
ctx.obj.set_scene_item_enabled(
scene_name=scene_name,
item_id=int(scene_item_id),
enabled=True,
)
if group:
out_console.print(
f"Item '{item_name}' in group '{group}' in scene '{scene_name}' has been shown."
)
else:
# If not in a parent group, just show the scene name
# This is to avoid confusion with the parent group name
# which is not the same as the scene name
# and is not needed in this case
out_console.print(f"Item '{item_name}' in scene '{scene_name}' has been shown.")
@_validate_scene_name_and_item_name
@app.command('hide | h')
def hide(
ctx: typer.Context,
scene_name: Annotated[str, typer.Argument(..., help='Scene name the item is in')],
item_name: Annotated[
str, typer.Argument(..., help='Item name to hide in the scene')
],
group: Annotated[Optional[str], typer.Option(help='Parent group name')] = None,
):
"""Hide an item in a scene."""
scene_name, scene_item_id = _get_scene_name_and_item_id(
ctx, scene_name, item_name, group
)
ctx.obj.set_scene_item_enabled(
scene_name=scene_name,
item_id=int(scene_item_id),
enabled=False,
)
if group:
out_console.print(
f"Item '{item_name}' in group '{group}' in scene '{scene_name}' has been hidden."
)
else:
# If not in a parent group, just show the scene name
# This is to avoid confusion with the parent group name
# which is not the same as the scene name
# and is not needed in this case
out_console.print(
f"Item '{item_name}' in scene '{scene_name}' has been hidden."
)
@_validate_scene_name_and_item_name
@app.command('toggle | tg')
def toggle(
ctx: typer.Context,
scene_name: Annotated[str, typer.Argument(..., help='Scene name the item is in')],
item_name: Annotated[
str, typer.Argument(..., help='Item name to toggle in the scene')
],
group: Annotated[Optional[str], typer.Option(help='Parent group name')] = None,
):
"""Toggle an item in a scene."""
if not validate.scene_in_scenes(ctx, scene_name):
err_console.print(f"Scene '{scene_name}' not found.")
raise typer.Exit(1)
if group:
if not validate.item_in_scene_item_list(ctx, scene_name, group):
err_console.print(
f"Parent group '{group}' not found in scene '{scene_name}'."
)
raise typer.Exit(1)
else:
if not validate.item_in_scene_item_list(ctx, scene_name, item_name):
err_console.print(f"Item '{item_name}' not found in scene '{scene_name}'.")
raise typer.Exit(1)
scene_name, scene_item_id = _get_scene_name_and_item_id(
ctx, scene_name, item_name, group
)
enabled = ctx.obj.get_scene_item_enabled(
scene_name=scene_name,
item_id=int(scene_item_id),
)
new_state = not enabled.scene_item_enabled
ctx.obj.set_scene_item_enabled(
scene_name=scene_name,
item_id=int(scene_item_id),
enabled=new_state,
)
if group:
if new_state:
out_console.print(
f"Item '{item_name}' in group '{group}' in scene '{scene_name}' has been shown."
)
else:
out_console.print(
f"Item '{item_name}' in group '{group}' in scene '{scene_name}' has been hidden."
)
else:
# If not in a parent group, just show the scene name
# This is to avoid confusion with the parent group name
# which is not the same as the scene name
# and is not needed in this case
if new_state:
out_console.print(
f"Item '{item_name}' in scene '{scene_name}' has been shown."
)
else:
out_console.print(
f"Item '{item_name}' in scene '{scene_name}' has been hidden."
)
@_validate_scene_name_and_item_name
@app.command('visible | v')
def visible(
ctx: typer.Context,
scene_name: Annotated[str, typer.Argument(..., help='Scene name the item is in')],
item_name: Annotated[
str, typer.Argument(..., help='Item name to check visibility in the scene')
],
group: Annotated[Optional[str], typer.Option(help='Parent group name')] = None,
):
"""Check if an item in a scene is visible."""
if group:
if not validate.item_in_scene_item_list(ctx, scene_name, group):
err_console.print(
f"Parent group '{group}' not found in scene '{scene_name}'."
)
raise typer.Exit(1)
else:
if not validate.item_in_scene_item_list(ctx, scene_name, item_name):
err_console.print(f"Item '{item_name}' not found in scene '{scene_name}'.")
raise typer.Exit(1) raise typer.Exit(1)
old_scene_name = scene_name old_scene_name = scene_name
@@ -324,31 +210,185 @@ def visible(
ctx, scene_name, item_name, group ctx, scene_name, item_name, group
) )
enabled = ctx.obj.get_scene_item_enabled( ctx.obj['obsws'].set_scene_item_enabled(
scene_name=scene_name, scene_name=scene_name,
item_id=int(scene_item_id), item_id=int(scene_item_id),
enabled=True,
) )
if group: if group:
out_console.print( console.out.print(
f"Item '{item_name}' in group '{group}' in scene '{old_scene_name}' is currently {'visible' if enabled.scene_item_enabled else 'hidden'}." f'Item {console.highlight(ctx, item_name)} in group {console.highlight(ctx, group)} '
f'in scene {console.highlight(ctx, old_scene_name)} has been shown.'
) )
else: else:
# If not in a parent group, just show the scene name # If not in a parent group, just show the scene name
# This is to avoid confusion with the parent group name # This is to avoid confusion with the parent group name
# which is not the same as the scene name # which is not the same as the scene name
# and is not needed in this case # and is not needed in this case
out_console.print( console.out.print(
f"Item '{item_name}' in scene '{scene_name}' is currently {'visible' if enabled.scene_item_enabled else 'hidden'}." f'Item {console.highlight(ctx, item_name)} in scene {console.highlight(ctx, scene_name)} has been shown.'
)
@app.command('hide | h')
def hide(
ctx: typer.Context,
scene_name: Annotated[
str, typer.Argument(..., show_default=False, help='Scene name the item is in')
],
item_name: Annotated[
str,
typer.Argument(..., show_default=False, help='Item name to hide in the scene'),
],
group: Annotated[Optional[str], typer.Option(help='Parent group name')] = None,
):
"""Hide an item in a scene."""
if not _validate_sources(ctx, scene_name, item_name, group):
raise typer.Exit(1)
old_scene_name = scene_name
scene_name, scene_item_id = _get_scene_name_and_item_id(
ctx, scene_name, item_name, group
)
ctx.obj['obsws'].set_scene_item_enabled(
scene_name=scene_name,
item_id=int(scene_item_id),
enabled=False,
)
if group:
console.out.print(
f'Item {console.highlight(ctx, item_name)} in group {console.highlight(ctx, group)} in scene {console.highlight(ctx, old_scene_name)} has been hidden.'
)
else:
# If not in a parent group, just show the scene name
# This is to avoid confusion with the parent group name
# which is not the same as the scene name
# and is not needed in this case
console.out.print(
f'Item {console.highlight(ctx, item_name)} in scene {console.highlight(ctx, scene_name)} has been hidden.'
)
@app.command('toggle | tg')
def toggle(
ctx: typer.Context,
scene_name: Annotated[
str, typer.Argument(..., show_default=False, help='Scene name the item is in')
],
item_name: Annotated[
str,
typer.Argument(
..., show_default=False, help='Item name to toggle in the scene'
),
],
group: Annotated[Optional[str], typer.Option(help='Parent group name')] = None,
):
"""Toggle an item in a scene."""
if not _validate_sources(ctx, scene_name, item_name, group):
raise typer.Exit(1)
old_scene_name = scene_name
scene_name, scene_item_id = _get_scene_name_and_item_id(
ctx, scene_name, item_name, group
)
enabled = ctx.obj['obsws'].get_scene_item_enabled(
scene_name=scene_name,
item_id=int(scene_item_id),
)
new_state = not enabled.scene_item_enabled
ctx.obj['obsws'].set_scene_item_enabled(
scene_name=scene_name,
item_id=int(scene_item_id),
enabled=new_state,
)
if group:
if new_state:
console.out.print(
f'Item {console.highlight(ctx, item_name)} in group {console.highlight(ctx, group)} '
f'in scene {console.highlight(ctx, old_scene_name)} has been shown.'
)
else:
console.out.print(
f'Item {console.highlight(ctx, item_name)} in group {console.highlight(ctx, group)} '
f'in scene {console.highlight(ctx, old_scene_name)} has been hidden.'
)
else:
# If not in a parent group, just show the scene name
# This is to avoid confusion with the parent group name
# which is not the same as the scene name
# and is not needed in this case
if new_state:
console.out.print(
f'Item {console.highlight(ctx, item_name)} in scene {console.highlight(ctx, scene_name)} has been shown.'
)
else:
console.out.print(
f'Item {console.highlight(ctx, item_name)} in scene {console.highlight(ctx, scene_name)} has been hidden.'
)
@app.command('visible | v')
def visible(
ctx: typer.Context,
scene_name: Annotated[
str, typer.Argument(..., show_default=False, help='Scene name the item is in')
],
item_name: Annotated[
str,
typer.Argument(
..., show_default=False, help='Item name to check visibility in the scene'
),
],
group: Annotated[Optional[str], typer.Option(help='Parent group name')] = None,
):
"""Check if an item in a scene is visible."""
if not _validate_sources(ctx, scene_name, item_name, group):
raise typer.Exit(1)
old_scene_name = scene_name
scene_name, scene_item_id = _get_scene_name_and_item_id(
ctx, scene_name, item_name, group
)
enabled = ctx.obj['obsws'].get_scene_item_enabled(
scene_name=scene_name,
item_id=int(scene_item_id),
)
if group:
console.out.print(
f'Item {console.highlight(ctx, item_name)} in group {console.highlight(ctx, group)} '
f'in scene {console.highlight(ctx, old_scene_name)} is currently {"visible" if enabled.scene_item_enabled else "hidden"}.'
)
else:
# If not in a parent group, just show the scene name
# This is to avoid confusion with the parent group name
# which is not the same as the scene name
# and is not needed in this case
console.out.print(
f'Item {console.highlight(ctx, item_name)} in scene {console.highlight(ctx, scene_name)} '
f'is currently {"visible" if enabled.scene_item_enabled else "hidden"}.'
) )
@_validate_scene_name_and_item_name
@app.command('transform | t') @app.command('transform | t')
def transform( def transform(
ctx: typer.Context, ctx: typer.Context,
scene_name: str, scene_name: Annotated[
item_name: str, str, typer.Argument(..., show_default=False, help='Scene name the item is in')
],
item_name: Annotated[
str,
typer.Argument(
..., show_default=False, help='Item name to transform in the scene'
),
],
group: Annotated[Optional[str], typer.Option(help='Parent group name')] = None, group: Annotated[Optional[str], typer.Option(help='Parent group name')] = None,
alignment: Annotated[ alignment: Annotated[
Optional[int], typer.Option(help='Alignment of the item in the scene') Optional[int], typer.Option(help='Alignment of the item in the scene')
@@ -397,15 +437,7 @@ def transform(
] = None, ] = None,
): ):
"""Set the transform of an item in a scene.""" """Set the transform of an item in a scene."""
if group: if not _validate_sources(ctx, scene_name, item_name, group):
if not validate.item_in_scene_item_list(ctx, scene_name, group):
err_console.print(
f"Parent group '{group}' not found in scene '{scene_name}'."
)
raise typer.Exit(1)
else:
if not validate.item_in_scene_item_list(ctx, scene_name, item_name):
err_console.print(f"Item '{item_name}' not found in scene '{scene_name}'.")
raise typer.Exit(1) raise typer.Exit(1)
old_scene_name = scene_name old_scene_name = scene_name
@@ -446,24 +478,25 @@ def transform(
transform['scaleY'] = scale_y transform['scaleY'] = scale_y
if not transform: if not transform:
err_console.print('No transform options provided.') console.err.print('No transform options provided.')
raise typer.Exit(1) raise typer.Exit(1)
transform = ctx.obj.set_scene_item_transform( transform = ctx.obj['obsws'].set_scene_item_transform(
scene_name=scene_name, scene_name=scene_name,
item_id=int(scene_item_id), item_id=int(scene_item_id),
transform=transform, transform=transform,
) )
if group: if group:
out_console.print( console.out.print(
f"Item '{item_name}' in group '{group}' in scene '{old_scene_name}' has been transformed." f'Item {console.highlight(ctx, item_name)} in group {console.highlight(ctx, group)} '
f'in scene {console.highlight(ctx, old_scene_name)} has been transformed.'
) )
else: else:
# If not in a parent group, just show the scene name # If not in a parent group, just show the scene name
# This is to avoid confusion with the parent group name # This is to avoid confusion with the parent group name
# which is not the same as the scene name # which is not the same as the scene name
# and is not needed in this case # and is not needed in this case
out_console.print( console.out.print(
f"Item '{item_name}' in scene '{scene_name}' has been transformed." f'Item {console.highlight(ctx, item_name)} in scene {console.highlight(ctx, scene_name)} has been transformed.'
) )

View File

@@ -5,15 +5,11 @@ from typing import Annotated
import obsws_python as obsws import obsws_python as obsws
import typer import typer
from rich.console import Console
from .alias import AliasGroup from . import console
from .alias import SubTyperAliasGroup
app = typer.Typer(cls=AliasGroup) app = typer.Typer(cls=SubTyperAliasGroup)
out_console = Console()
err_console = Console(
stderr=True,
)
@app.callback() @app.callback()
@@ -67,7 +63,7 @@ def save(
): ):
"""Take a screenshot and save it to a file.""" """Take a screenshot and save it to a file."""
try: try:
ctx.obj.save_source_screenshot( ctx.obj['obsws'].save_source_screenshot(
name=source_name, name=source_name,
img_format=output_path.suffix.lstrip('.').lower(), img_format=output_path.suffix.lstrip('.').lower(),
file_path=str(output_path), file_path=str(output_path),
@@ -78,15 +74,17 @@ def save(
except obsws.error.OBSSDKRequestError as e: except obsws.error.OBSSDKRequestError as e:
match e.code: match e.code:
case 403: case 403:
err_console.print( console.err.print(
'The image format (file extension) must be included in the file name ' 'The [yellow]image format[/yellow] (file extension) must be included in the file name, '
"for example: '/path/to/screenshot.png'.", "for example: '/path/to/screenshot.png'.",
) )
raise typer.Exit(1) raise typer.Exit(1)
case 600: case 600:
err_console.print(f"No source was found by the name of '{source_name}'") console.err.print(
f'No source was found by the name of [yellow]{source_name}[/yellow]'
)
raise typer.Exit(1) raise typer.Exit(1)
case _: case _:
raise raise
out_console.print(f"Screenshot saved to [bold]'{output_path}'[/bold].") console.out.print(f'Screenshot saved to {console.highlight(ctx, output_path)}.')

View File

@@ -22,6 +22,8 @@ class Settings(UserDict):
""" """
PREFIX = 'OBS_'
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
"""Initialize the Settings object.""" """Initialize the Settings object."""
kwargs.update( kwargs.update(
@@ -34,19 +36,27 @@ class Settings(UserDict):
def __getitem__(self, key: str) -> SettingsValue: def __getitem__(self, key: str) -> SettingsValue:
"""Get a setting value by key.""" """Get a setting value by key."""
if not key.startswith('OBS_'): key = key.upper()
key = f'OBS_{key}' if not key.startswith(Settings.PREFIX):
return self.data[key.upper()] key = f'{Settings.PREFIX}{key}'
return self.data[key]
def __setitem__(self, key: str, value: SettingsValue): def __setitem__(self, key: str, value: SettingsValue):
"""Set a setting value by key.""" """Set a setting value by key."""
if not key.startswith('OBS_'): key = key.upper()
key = f'OBS_{key}' if not key.startswith(Settings.PREFIX):
self.data[key.upper()] = value key = f'{Settings.PREFIX}{key}'
self.data[key] = value
_settings = Settings( _settings = Settings(
OBS_HOST='localhost', OBS_PORT=4455, OBS_PASSWORD='', OBS_TIMEOUT=5 OBS_HOST='localhost',
OBS_PORT=4455,
OBS_PASSWORD='',
OBS_TIMEOUT=5,
OBS_DEBUG=False,
OBS_STYLE='disabled',
OBS_STYLE_NO_BORDER=False,
) )

View File

@@ -1,13 +1,11 @@
"""module for controlling OBS stream functionality.""" """module for controlling OBS stream functionality."""
import typer import typer
from rich.console import Console
from .alias import AliasGroup from . import console
from .alias import SubTyperAliasGroup
app = typer.Typer(cls=AliasGroup) app = typer.Typer(cls=SubTyperAliasGroup)
out_console = Console()
err_console = Console(stderr=True)
@app.callback() @app.callback()
@@ -17,7 +15,7 @@ def main():
def _get_streaming_status(ctx: typer.Context) -> tuple: def _get_streaming_status(ctx: typer.Context) -> tuple:
"""Get streaming status.""" """Get streaming status."""
resp = ctx.obj.get_stream_status() resp = ctx.obj['obsws'].get_stream_status()
return resp.output_active, resp.output_duration return resp.output_active, resp.output_duration
@@ -26,11 +24,11 @@ def start(ctx: typer.Context):
"""Start streaming.""" """Start streaming."""
active, _ = _get_streaming_status(ctx) active, _ = _get_streaming_status(ctx)
if active: if active:
err_console.print('Streaming is already in progress, cannot start.') console.err.print('Streaming is already in progress, cannot start.')
raise typer.Exit(1) raise typer.Exit(1)
ctx.obj.start_stream() ctx.obj['obsws'].start_stream()
out_console.print('Streaming started successfully.') console.out.print('Streaming started successfully.')
@app.command('stop | st') @app.command('stop | st')
@@ -38,21 +36,21 @@ def stop(ctx: typer.Context):
"""Stop streaming.""" """Stop streaming."""
active, _ = _get_streaming_status(ctx) active, _ = _get_streaming_status(ctx)
if not active: if not active:
err_console.print('Streaming is not in progress, cannot stop.') console.err.print('Streaming is not in progress, cannot stop.')
raise typer.Exit(1) raise typer.Exit(1)
ctx.obj.stop_stream() ctx.obj['obsws'].stop_stream()
out_console.print('Streaming stopped successfully.') console.out.print('Streaming stopped successfully.')
@app.command('toggle | tg') @app.command('toggle | tg')
def toggle(ctx: typer.Context): def toggle(ctx: typer.Context):
"""Toggle streaming.""" """Toggle streaming."""
resp = ctx.obj.toggle_stream() resp = ctx.obj['obsws'].toggle_stream()
if resp.output_active: if resp.output_active:
out_console.print('Streaming started successfully.') console.out.print('Streaming started successfully.')
else: else:
out_console.print('Streaming stopped successfully.') console.out.print('Streaming stopped successfully.')
@app.command('status | ss') @app.command('status | ss')
@@ -65,19 +63,19 @@ def status(ctx: typer.Context):
minutes = int(seconds // 60) minutes = int(seconds // 60)
seconds = int(seconds % 60) seconds = int(seconds % 60)
if minutes > 0: if minutes > 0:
out_console.print( console.out.print(
f'Streaming is in progress for {minutes} minutes and {seconds} seconds.' f'Streaming is in progress for {minutes} minutes and {seconds} seconds.'
) )
else: else:
if seconds > 0: if seconds > 0:
out_console.print( console.out.print(
f'Streaming is in progress for {seconds} seconds.' f'Streaming is in progress for {seconds} seconds.'
) )
else: else:
out_console.print( console.out.print(
'Streaming is in progress for less than a second.' 'Streaming is in progress for less than a second.'
) )
else: else:
out_console.print('Streaming is in progress.') console.out.print('Streaming is in progress.')
else: else:
out_console.print('Streaming is not in progress.') console.out.print('Streaming is not in progress.')

View File

@@ -1,13 +1,11 @@
"""module containing commands for manipulating studio mode in OBS.""" """module containing commands for manipulating studio mode in OBS."""
import typer import typer
from rich.console import Console
from .alias import AliasGroup from . import console
from .alias import SubTyperAliasGroup
app = typer.Typer(cls=AliasGroup) app = typer.Typer(cls=SubTyperAliasGroup)
out_console = Console()
err_console = Console(stderr=True)
@app.callback() @app.callback()
@@ -18,34 +16,34 @@ def main():
@app.command('enable | on') @app.command('enable | on')
def enable(ctx: typer.Context): def enable(ctx: typer.Context):
"""Enable studio mode.""" """Enable studio mode."""
ctx.obj.set_studio_mode_enabled(True) ctx.obj['obsws'].set_studio_mode_enabled(True)
out_console.print('Studio mode has been enabled.') console.out.print('Studio mode has been enabled.')
@app.command('disable | off') @app.command('disable | off')
def disable(ctx: typer.Context): def disable(ctx: typer.Context):
"""Disable studio mode.""" """Disable studio mode."""
ctx.obj.set_studio_mode_enabled(False) ctx.obj['obsws'].set_studio_mode_enabled(False)
out_console.print('Studio mode has been disabled.') console.out.print('Studio mode has been disabled.')
@app.command('toggle | tg') @app.command('toggle | tg')
def toggle(ctx: typer.Context): def toggle(ctx: typer.Context):
"""Toggle studio mode.""" """Toggle studio mode."""
resp = ctx.obj.get_studio_mode_enabled() resp = ctx.obj['obsws'].get_studio_mode_enabled()
if resp.studio_mode_enabled: if resp.studio_mode_enabled:
ctx.obj.set_studio_mode_enabled(False) ctx.obj['obsws'].set_studio_mode_enabled(False)
out_console.print('Studio mode is now disabled.') console.out.print('Studio mode is now disabled.')
else: else:
ctx.obj.set_studio_mode_enabled(True) ctx.obj['obsws'].set_studio_mode_enabled(True)
out_console.print('Studio mode is now enabled.') console.out.print('Studio mode is now enabled.')
@app.command('status | ss') @app.command('status | ss')
def status(ctx: typer.Context): def status(ctx: typer.Context):
"""Get the status of studio mode.""" """Get the status of studio mode."""
resp = ctx.obj.get_studio_mode_enabled() resp = ctx.obj['obsws'].get_studio_mode_enabled()
if resp.studio_mode_enabled: if resp.studio_mode_enabled:
out_console.print('Studio mode is enabled.') console.out.print('Studio mode is enabled.')
else: else:
out_console.print('Studio mode is disabled.') console.out.print('Studio mode is disabled.')

191
obsws_cli/styles.py Normal file
View File

@@ -0,0 +1,191 @@
"""module containing styles for the OBS WebSocket CLI."""
import os
from dataclasses import dataclass
_registry = {}
def register_style(cls):
"""Register a style class."""
key = cls.__name__.lower()
if key in _registry:
raise ValueError(f'Style {key} is already registered.')
_registry[key] = cls
return cls
@dataclass
class Style:
"""Base class for styles."""
name: str = 'no_colour'
description: str = 'Style disabled'
border: str | None = None
column: str | None = None
highlight: str | None = None
no_border: bool = False
def __post_init__(self):
"""Post-initialization to set default values and normalize the name."""
self.name = self.name.lower()
if self.no_border:
self.border = None
def __str__(self):
"""Return a string representation of the style."""
return f'{self.name} - {self.description}'
@register_style
@dataclass
class Red(Style):
"""Red style."""
name: str = 'red'
description: str = 'Red text color'
border: str = 'red3'
highlight: str = 'red1'
column: str = 'red1'
@register_style
@dataclass
class Magenta(Style):
"""Magenta style."""
name: str = 'magenta'
description: str = 'Magenta text color'
border: str = 'magenta3'
highlight: str = 'orchid1'
column: str = 'orchid1'
@register_style
@dataclass
class Purple(Style):
"""Purple style."""
name: str = 'purple'
description: str = 'Purple text color'
border: str = 'medium_purple4'
highlight: str = 'medium_purple'
column: str = 'medium_purple'
@register_style
@dataclass
class Blue(Style):
"""Blue style."""
name: str = 'blue'
description: str = 'Blue text color'
border: str = 'cornflower_blue'
highlight: str = 'sky_blue2'
column: str = 'sky_blue2'
@register_style
@dataclass
class Cyan(Style):
"""Cyan style."""
name: str = 'cyan'
description: str = 'Cyan text color'
border: str = 'dark_cyan'
highlight: str = 'cyan'
column: str = 'cyan'
@register_style
@dataclass
class Green(Style):
"""Green style."""
name: str = 'green'
description: str = 'Green text color'
border: str = 'green4'
highlight: str = 'spring_green3'
column: str = 'spring_green3'
@register_style
@dataclass
class Yellow(Style):
"""Yellow style."""
name: str = 'yellow'
description: str = 'Yellow text color'
border: str = 'yellow3'
highlight: str = 'wheat1'
column: str = 'wheat1'
@register_style
@dataclass
class Orange(Style):
"""Orange style."""
name: str = 'orange'
description: str = 'Orange text color'
border: str = 'dark_orange'
highlight: str = 'orange1'
column: str = 'orange1'
@register_style
@dataclass
class White(Style):
"""White style."""
name: str = 'white'
description: str = 'White text color'
border: str = 'grey82'
highlight: str = 'grey100'
column: str = 'grey100'
@register_style
@dataclass
class Grey(Style):
"""Grey style."""
name: str = 'grey'
description: str = 'Grey text color'
border: str = 'grey50'
highlight: str = 'grey70'
column: str = 'grey70'
@register_style
@dataclass
class Navy(Style):
"""Navy Blue style."""
name: str = 'navyblue'
description: str = 'Navy Blue text color'
border: str = 'deep_sky_blue4'
highlight: str = 'light_sky_blue3'
column: str = 'light_sky_blue3'
@register_style
@dataclass
class Black(Style):
"""Black style."""
name: str = 'black'
description: str = 'Black text color'
border: str = 'grey19'
column: str = 'grey11'
def request_style_obj(style_name: str, no_border: bool) -> Style:
"""Entry point for style objects. Returns a Style object based on the style name."""
style_name = str(style_name).lower() # coerce the type to string and lowercase it
if style_name not in _registry:
os.environ['NO_COLOR'] = '1' # Disable colour output
return Style()
return _registry[style_name](no_border=no_border)

View File

@@ -1,6 +1,18 @@
"""module contains utility functions for the obsws_cli package.""" """module contains utility functions for the obsws_cli package."""
import os
def snakecase_to_titlecase(snake_str):
def snakecase_to_titlecase(snake_str: str) -> str:
"""Convert a snake_case string to a title case string.""" """Convert a snake_case string to a title case string."""
return snake_str.replace('_', ' ').title() return snake_str.replace('_', ' ').title()
def check_mark(value: bool, empty_if_false: bool = False) -> str:
"""Return a check mark or cross mark based on the boolean value."""
if empty_if_false and not value:
return ''
if os.getenv('NO_COLOR', '') != '':
return '' if value else ''
return '' if value else ''

View File

@@ -8,19 +8,19 @@ skipped_option = typer.Option(parser=lambda _: _, hidden=True, expose_value=Fals
def input_in_inputs(ctx: typer.Context, input_name: str) -> bool: def input_in_inputs(ctx: typer.Context, input_name: str) -> bool:
"""Check if an input is in the input list.""" """Check if an input is in the input list."""
inputs = ctx.obj.get_input_list().inputs inputs = ctx.obj['obsws'].get_input_list().inputs
return any(input_.get('inputName') == input_name for input_ in inputs) return any(input_.get('inputName') == input_name for input_ in inputs)
def scene_in_scenes(ctx: typer.Context, scene_name: str) -> bool: def scene_in_scenes(ctx: typer.Context, scene_name: str) -> bool:
"""Check if a scene exists in the list of scenes.""" """Check if a scene exists in the list of scenes."""
resp = ctx.obj.get_scene_list() resp = ctx.obj['obsws'].get_scene_list()
return any(scene.get('sceneName') == scene_name for scene in resp.scenes) return any(scene.get('sceneName') == scene_name for scene in resp.scenes)
def studio_mode_enabled(ctx: typer.Context) -> bool: def studio_mode_enabled(ctx: typer.Context) -> bool:
"""Check if studio mode is enabled.""" """Check if studio mode is enabled."""
resp = ctx.obj.get_studio_mode_enabled() resp = ctx.obj['obsws'].get_studio_mode_enabled()
return resp.studio_mode_enabled return resp.studio_mode_enabled
@@ -28,7 +28,7 @@ def scene_collection_in_scene_collections(
ctx: typer.Context, scene_collection_name: str ctx: typer.Context, scene_collection_name: str
) -> bool: ) -> bool:
"""Check if a scene collection exists.""" """Check if a scene collection exists."""
resp = ctx.obj.get_scene_collection_list() resp = ctx.obj['obsws'].get_scene_collection_list()
return any( return any(
collection == scene_collection_name for collection in resp.scene_collections collection == scene_collection_name for collection in resp.scene_collections
) )
@@ -38,11 +38,17 @@ def item_in_scene_item_list(
ctx: typer.Context, scene_name: str, item_name: str ctx: typer.Context, scene_name: str, item_name: str
) -> bool: ) -> bool:
"""Check if an item exists in a scene.""" """Check if an item exists in a scene."""
resp = ctx.obj.get_scene_item_list(scene_name) resp = ctx.obj['obsws'].get_scene_item_list(scene_name)
return any(item.get('sourceName') == item_name for item in resp.scene_items) return any(item.get('sourceName') == item_name for item in resp.scene_items)
def profile_exists(ctx: typer.Context, profile_name: str) -> bool: def profile_exists(ctx: typer.Context, profile_name: str) -> bool:
"""Check if a profile exists.""" """Check if a profile exists."""
resp = ctx.obj.get_profile_list() resp = ctx.obj['obsws'].get_profile_list()
return any(profile == profile_name for profile in resp.profiles) return any(profile == profile_name for profile in resp.profiles)
def monitor_exists(ctx: typer.Context, monitor_index: int) -> bool:
"""Check if a monitor exists."""
resp = ctx.obj['obsws'].get_monitor_list()
return any(monitor['monitorIndex'] == monitor_index for monitor in resp.monitors)

View File

@@ -1,13 +1,11 @@
"""module containing commands for manipulating virtual camera in OBS.""" """module containing commands for manipulating virtual camera in OBS."""
import typer import typer
from rich.console import Console
from .alias import AliasGroup from . import console
from .alias import SubTyperAliasGroup
app = typer.Typer(cls=AliasGroup) app = typer.Typer(cls=SubTyperAliasGroup)
out_console = Console()
err_console = Console(stderr=True)
@app.callback() @app.callback()
@@ -18,32 +16,32 @@ def main():
@app.command('start | s') @app.command('start | s')
def start(ctx: typer.Context): def start(ctx: typer.Context):
"""Start the virtual camera.""" """Start the virtual camera."""
ctx.obj.start_virtual_cam() ctx.obj['obsws'].start_virtual_cam()
out_console.print('Virtual camera started.') console.out.print('Virtual camera started.')
@app.command('stop | p') @app.command('stop | p')
def stop(ctx: typer.Context): def stop(ctx: typer.Context):
"""Stop the virtual camera.""" """Stop the virtual camera."""
ctx.obj.stop_virtual_cam() ctx.obj['obsws'].stop_virtual_cam()
out_console.print('Virtual camera stopped.') console.out.print('Virtual camera stopped.')
@app.command('toggle | tg') @app.command('toggle | tg')
def toggle(ctx: typer.Context): def toggle(ctx: typer.Context):
"""Toggle the virtual camera.""" """Toggle the virtual camera."""
resp = ctx.obj.toggle_virtual_cam() resp = ctx.obj['obsws'].toggle_virtual_cam()
if resp.output_active: if resp.output_active:
out_console.print('Virtual camera is enabled.') console.out.print('Virtual camera is enabled.')
else: else:
out_console.print('Virtual camera is disabled.') console.out.print('Virtual camera is disabled.')
@app.command('status | ss') @app.command('status | ss')
def status(ctx: typer.Context): def status(ctx: typer.Context):
"""Get the status of the virtual camera.""" """Get the status of the virtual camera."""
resp = ctx.obj.get_virtual_cam_status() resp = ctx.obj['obsws'].get_virtual_cam_status()
if resp.output_active: if resp.output_active:
out_console.print('Virtual camera is enabled.') console.out.print('Virtual camera is enabled.')
else: else:
out_console.print('Virtual camera is disabled.') console.out.print('Virtual camera is disabled.')

View File

@@ -42,6 +42,9 @@ dependencies = ["click-man>=0.5.1"]
cli = "obsws-cli {args:}" cli = "obsws-cli {args:}"
man = "python man/generate.py --output=./man" man = "python man/generate.py --output=./man"
[tool.hatch.envs.lazyimports.scripts]
cli = "obsws-cli {args:}"
[tool.hatch.envs.hatch-test] [tool.hatch.envs.hatch-test]
randomize = true randomize = true

View File

@@ -27,4 +27,4 @@ def test_filter_list_invalid_source():
"""Test the filter list command with an invalid source.""" """Test the filter list command with an invalid source."""
result = runner.invoke(app, ['filter', 'list', 'invalid_source']) result = runner.invoke(app, ['filter', 'list', 'invalid_source'])
assert result.exit_code != 0 assert result.exit_code != 0
assert "No source was found by the name of 'invalid_source'" in result.stderr assert 'No source was found by the name of invalid_source' in result.stderr

View File

@@ -18,29 +18,29 @@ def test_group_show():
"""Test the group show command.""" """Test the group show command."""
result = runner.invoke(app, ['group', 'show', 'Scene', 'test_group']) result = runner.invoke(app, ['group', 'show', 'Scene', 'test_group'])
assert result.exit_code == 0 assert result.exit_code == 0
assert "Group 'test_group' is now visible." in result.stdout assert 'Group test_group is now visible.' in result.stdout
def test_group_toggle(): def test_group_toggle():
"""Test the group toggle command.""" """Test the group toggle command."""
result = runner.invoke(app, ['group', 'status', 'Scene', 'test_group']) result = runner.invoke(app, ['group', 'status', 'Scene', 'test_group'])
assert result.exit_code == 0 assert result.exit_code == 0
enabled = "Group 'test_group' is now visible." in result.stdout enabled = 'Group test_group is now visible.' in result.stdout
result = runner.invoke(app, ['group', 'toggle', 'Scene', 'test_group']) result = runner.invoke(app, ['group', 'toggle', 'Scene', 'test_group'])
assert result.exit_code == 0 assert result.exit_code == 0
if enabled: if enabled:
assert "Group 'test_group' is now hidden." in result.stdout assert 'Group test_group is now hidden.' in result.stdout
else: else:
assert "Group 'test_group' is now visible." in result.stdout assert 'Group test_group is now visible.' in result.stdout
def test_group_status(): def test_group_status():
"""Test the group status command.""" """Test the group status command."""
result = runner.invoke(app, ['group', 'show', 'Scene', 'test_group']) result = runner.invoke(app, ['group', 'show', 'Scene', 'test_group'])
assert result.exit_code == 0 assert result.exit_code == 0
assert "Group 'test_group' is now visible." in result.stdout assert 'Group test_group is now visible.' in result.stdout
result = runner.invoke(app, ['group', 'status', 'Scene', 'test_group']) result = runner.invoke(app, ['group', 'status', 'Scene', 'test_group'])
assert result.exit_code == 0 assert result.exit_code == 0
assert "Group 'test_group' is now visible." in result.stdout assert 'Group test_group is now visible.' in result.stdout

View File

@@ -36,3 +36,10 @@ def test_scene_switch():
result = runner.invoke(app, ['scene', 'switch', 'pytest_scene']) result = runner.invoke(app, ['scene', 'switch', 'pytest_scene'])
assert result.exit_code == 0 assert result.exit_code == 0
assert 'Switched to program scene: pytest_scene' in result.stdout assert 'Switched to program scene: pytest_scene' in result.stdout
def test_scene_switch_invalid():
"""Test the scene switch command with an invalid scene."""
result = runner.invoke(app, ['scene', 'switch', 'non_existent_scene'])
assert result.exit_code != 0
assert 'Scene non_existent_scene not found' in result.stderr

View File

@@ -29,6 +29,6 @@ def test_sceneitem_transform():
) )
assert result.exit_code == 0 assert result.exit_code == 0
assert ( assert (
"Item 'pytest_input_2' in scene 'pytest_scene' has been transformed" 'Item pytest_input_2 in scene pytest_scene has been transformed'
in result.stdout in result.stdout
) )