25 Commits

Author SHA1 Message Date
3b184c6531 upd dependencies 2025-07-24 04:21:58 +01:00
8c37ce1fc0 remove typer import 2025-07-24 04:21:48 +01:00
436e4d5345 remove alias, settings 2025-07-24 04:21:40 +01:00
2ef89be184 convert virtualcam commands 2025-07-24 04:09:49 +01:00
506aff833c convert text commands 2025-07-24 04:08:07 +01:00
eb939b735c convert studiomode commands 2025-07-24 04:05:01 +01:00
bb7a468dd5 convert stream commands 2025-07-24 03:59:37 +01:00
e77627b845 convert screenshot commands 2025-07-24 03:52:50 +01:00
93b066090b fix aliases 2025-07-24 03:49:31 +01:00
1ce832dfde convert sceneitem commands 2025-07-24 03:46:11 +01:00
e8664f0117 convert scenecollection commands 2025-07-24 02:34:15 +01:00
a3dff0f739 convert replaybuffer commands 2025-07-24 02:29:58 +01:00
6da9df5ceb convert record commands 2025-07-24 02:26:18 +01:00
75fc18273e convert projector commands 2025-07-24 02:20:17 +01:00
e658819719 convert profile commands 2025-07-24 02:14:38 +01:00
4451fbf22c conver input commands 2025-07-24 02:06:05 +01:00
132b283347 convert hotkey commands 2025-07-24 01:48:55 +01:00
ae8ff20cf4 convert group commands 2025-07-24 01:39:02 +01:00
de1c604c46 update the --help message
add descriptions for filter + scene command groups

Usage now after main CLI description
2025-07-24 01:38:42 +01:00
105aaf29b7 keep the exit codes simple (0 or 1) 2025-07-17 04:34:44 +01:00
eb34a1833f convert filter sub app 2025-07-17 04:29:21 +01:00
abbab5c746 raise OBSWSCLIError 2025-07-17 04:28:59 +01:00
f0eb518609 implement --version + debug validator
run() now handles exceptions/exit codes
2025-07-17 04:28:40 +01:00
032b957670 add custom error class + exit codes 2025-07-17 04:28:10 +01:00
8349a196e8 root meta app + scene sub_app converted
this could take a while...

note, --version flag not implemented
2025-07-17 03:18:04 +01:00
56 changed files with 1766 additions and 3048 deletions

View File

@@ -5,31 +5,6 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
# [0.24.8] - 2026-02-07
### Changed
- --debug flag removed and replaced with --loglevel. See [Flags](https://github.com/onyx-and-iris/obsws-cli/tree/main?tab=readme-ov-file#flags). This gives the user more control over the level of logging. The default level has been set to WARNING.
### Fixed
- shell completion now works, see [Shell Completion](https://github.com/onyx-and-iris/obsws-cli/tree/main?tab=readme-ov-file#shell-completion). Unfortunately, command aliases in the help output are no longer present as it was breaking shell completion. However, the aliases do still work. See [issue #3](https://github.com/onyx-and-iris/obsws-cli/issues/3)
# [0.24.6] - 2026-01-26
### Changed
- environment variables should now be prefixed with 'OBSWS_CLI_', see [Environment Variables](https://github.com/onyx-and-iris/obsws-cli?tab=readme-ov-file#environment-variables)
# [0.24.0] - 2026-01-09
### Added
- new subcommands added to input, see [Input](https://github.com/onyx-and-iris/obsws-cli?tab=readme-ov-file#input)
- settings command group, see [Settings](https://github.com/onyx-and-iris/obsws-cli?tab=readme-ov-file#settings)
- media command group, see [Media](https://github.com/onyx-and-iris/obsws-cli?tab=readme-ov-file#media)
# [0.20.0] - 2025-07-14
### Added

161
README.md
View File

@@ -16,7 +16,6 @@ For an outline of past/future changes refer to: [CHANGELOG](CHANGELOG.md)
- [Configuration](#configuration)
- [Style](#style)
- [Commands](#root-typer)
- [Shell Completion](#shell-completion)
- [License](#license)
## Requirements
@@ -49,8 +48,6 @@ The CLI should now be discoverable as `obsws-cli`
- --password/-p: Websocket password
- --timeout/-T: Websocket timeout
- --version/-v: Print the obsws-cli version
- --loglevel/-l: Set the application's logging level
- One of *DEBUG, INFO, WARNING, ERROR, CRITICAL*
Pass `--host`, `--port` and `--password` as flags on the root command, for example:
@@ -65,10 +62,9 @@ Store and load environment variables from:
- `user home directory / .config / obsws-cli / obsws.env`
```env
OBSWS_CLI_HOST=localhost
OBSWS_CLI_PORT=4455
OBSWS_CLI_PASSWORD=<websocket password>
OBSWS_CLI_LOGLEVEL=DEBUG
OBS_HOST=localhost
OBS_PORT=4455
OBS_PASSWORD=<websocket password>
```
Flags can be used to override environment variables.
@@ -100,8 +96,8 @@ obsws-cli --style="cyan" --no-border sceneitem list
Or with environment variables:
```env
OBSWS_CLI_STYLE=cyan
OBSWS_CLI_STYLE_NO_BORDER=true
OBS_STYLE=cyan
OBS_STYLE_NO_BORDER=true
```
## Root Typer
@@ -302,20 +298,6 @@ obsws-cli group status START "test_group"
#### Input
- create: Create a new input.
- args: <input_name> <input_kind>
```console
obsws-cli input create 'stream mix' 'wasapi_input_capture'
```
- remove: Remove an input.
- args: <input_name>
```console
obsws-cli input remove 'stream mix'
```
- list: List all inputs.
- flags:
@@ -333,12 +315,6 @@ obsws-cli input list
obsws-cli input list --input --colour
```
- list-kinds: List all input kinds.
```console
obsws-cli input list-kinds
```
- mute: Mute an input.
- args: <input_name>
@@ -359,32 +335,6 @@ obsws-cli input unmute "Mic/Aux"
obsws-cli input toggle "Mic/Aux"
```
- volume: Set the volume of an input.
- args: <input_name> <volume>
```console
obsws-cli input volume -- 'Desktop Audio' -38.9
```
- show: Show information for an input in the current scene.
- args: <input_name>
- flags:
*optional*
- --verbose: List all available input devices.
```console
obsws-cli input show 'Mic/Aux' --verbose
```
- update: Name of the input to update.
- args: <input_name> <device_name>
```console
obsws-cli input update 'Mic/Aux' 'Voicemeeter Out B1 (VB-Audio Voicemeeter VAIO)'
```
#### Text
- current: Get the current text for a text input.
@@ -726,107 +676,6 @@ obsws-cli projector open --monitor-index=1 "test_group"
obsws-cli screenshot save --width=2560 --height=1440 "Scene" "C:\Users\me\Videos\screenshot.png"
```
#### Settings
- show: Show current OBS settings.
- flags:
*optional*
- --video: Show video settings.
- --record: Show recording settings.
- --profile: Show profile settings.
```console
obsws-cli settings show --video --record
```
- profile: Get/set OBS profile settings.
- args: <category> <name> <value>
```console
obsws-cli settings profile SimpleOutput VBitrate
obsws-cli settings profile SimpleOutput VBitrate 6000
```
- stream-service: Get/set OBS stream service settings.
- flags:
- --key: Stream key.
- --server: Stream server URL.
*optional*
- args: <type>
```console
obsws-cli settings stream-service
obsws-cli settings stream-service --key='live_xyzxyzxyzxyz' rtmp_common
```
- video: Get/set OBS video settings.
- flags:
*optional*
- --base-width: Base (canvas) width.
- --base-height: Base (canvas) height.
- --output-width: Output (scaled) width.
- --output-height: Output (scaled) height.
- --fps-num: Frames per second numerator.
- --fps-den: Frames per second denominator.
```console
obsws-cli settings video
obsws-cli settings video --base-width=1920 --base-height=1080
```
#### Media
- cursor: Get/set the cursor position of a media input.
- args: InputName
*optional*
- TimeString
```console
obsws-cli media cursor "Media"
obsws-cli media cursor "Media" "00:08:30"
```
- play: Plays a media input.
```console
obsws-cli media play "Media"
```
- pause: Pauses a media input.
```console
obsws-cli media pause "Media"
```
- stop: Stops a media input.
```console
obsws-cli media stop "Media"
```
- restart: Restarts a media input.
```console
obsws-cli media restart "Media"
```
## Shell Completion
```console
obsws-cli --install-completion
```
Currently supported shells: *bash* *zsh* *fish* *powershell*
## License
`obsws-cli` is distributed under the terms of the [MIT](https://spdx.org/licenses/MIT.html) license.

View File

@@ -1,4 +1,4 @@
# SPDX-FileCopyrightText: 2025-present onyx-and-iris <code@onyxandiris.online>
#
# SPDX-License-Identifier: MIT
__version__ = '0.24.8'
__version__ = '0.20.2'

View File

@@ -2,6 +2,6 @@
#
# SPDX-License-Identifier: MIT
from .app import app
from .app import run
__all__ = ['app']
__all__ = ['run']

View File

@@ -1,53 +0,0 @@
"""module defining a custom group class for handling command name aliases."""
import typer
class RootTyperAliasGroup(typer.core.TyperGroup):
"""A custom group class to handle command name aliases for the root typer."""
def __init__(self, *args, **kwargs):
"""Initialize the AliasGroup."""
super().__init__(*args, **kwargs)
self.no_args_is_help = True
def get_command(self, ctx, cmd_name):
"""Get a command by name."""
match cmd_name:
case 'f':
cmd_name = 'filter'
case 'g':
cmd_name = 'group'
case 'hk':
cmd_name = 'hotkey'
case 'i':
cmd_name = 'input'
case 'm':
cmd_name = 'media'
case 'prf':
cmd_name = 'profile'
case 'prj':
cmd_name = 'projector'
case 'rc':
cmd_name = 'record'
case 'rb':
cmd_name = 'replaybuffer'
case 'sc':
cmd_name = 'scene'
case 'scc':
cmd_name = 'scenecollection'
case 'si':
cmd_name = 'sceneitem'
case 'ss':
cmd_name = 'screenshot'
case 'set':
cmd_name = 'settings'
case 'st':
cmd_name = 'stream'
case 'sm':
cmd_name = 'studiomode'
case 't':
cmd_name = 'text'
case 'vc':
cmd_name = 'virtualcam'
return super().get_command(ctx, cmd_name)

View File

@@ -2,155 +2,131 @@
import importlib
import logging
import pkgutil
from typing import Annotated
from dataclasses import dataclass
from typing import Annotated, Any
import obsws_python as obsws
import typer
from cyclopts import App, Group, Parameter, config
from obsws_cli.__about__ import __version__ as version
from . import commands, console, envconfig, styles
from .alias import RootTyperAliasGroup
from . import console, styles
from .context import Context
from .error import OBSWSCLIError
app = typer.Typer(cls=RootTyperAliasGroup)
for importer, modname, ispkg in pkgutil.iter_modules(
commands.__path__, commands.__name__ + '.'
app = App(
config=config.Env(
'OBS_'
), # Environment variable prefix for configuration parameters
version=version,
usage='[bold][yellow]Usage:[/yellow] [white]obsws-cli [OPTIONS] COMMAND [ARGS]...[/white][/bold]',
)
app.meta.group_parameters = Group('Options', sort_key=0)
for sub_app in (
'filter',
'group',
'hotkey',
'input',
'profile',
'projector',
'record',
'replaybuffer',
'scene',
'scenecollection',
'sceneitem',
'screenshot',
'stream',
'studiomode',
'text',
'virtualcam',
):
subtyper = importlib.import_module(modname)
app.add_typer(subtyper.app, name=modname.split('.')[-1])
module = importlib.import_module(f'.{sub_app}', package=__package__)
app.command(module.app)
def version_callback(value: bool):
"""Show the version of the CLI."""
if value:
console.out.print(f'obsws-cli version: {version}')
raise typer.Exit()
@Parameter(name='*')
@dataclass
class OBSConfig:
"""Dataclass to hold OBS connection parameters."""
host: str = 'localhost'
port: int = 4455
password: str = ''
def setup_logging(loglevel: str):
@dataclass
class StyleConfig:
"""Dataclass to hold style parameters."""
name: str = 'disabled'
no_border: bool = False
def setup_logging(type_, value: Any):
"""Set up logging for the application."""
loglevel = loglevel.upper()
if loglevel not in ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']:
raise typer.BadParameter(
f'Invalid log level: {loglevel}. Choose from DEBUG, INFO, WARNING, ERROR, CRITICAL.'
)
log_level = logging.DEBUG if value else logging.CRITICAL
logging.basicConfig(
level=loglevel,
level=log_level,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
def validate_style(value: str):
"""Validate and return the style."""
if value not in styles.registry:
raise typer.BadParameter(
f'Invalid style: {value}. Available styles: {", ".join(styles.registry.keys())}'
)
return value
@app.callback()
def main(
ctx: typer.Context,
host: Annotated[
str,
typer.Option(
'--host',
'-H',
envvar='OBSWS_CLI_HOST',
help='WebSocket host',
show_default='localhost',
@app.meta.default
def launcher(
*tokens: Annotated[str, Parameter(show=False, allow_leading_hyphen=True)],
obs_config: OBSConfig = Annotated[
OBSConfig,
Parameter(
show=False, allow_leading_hyphen=True, help='OBS connection parameters'
),
] = envconfig.get('host'),
port: Annotated[
int,
typer.Option(
'--port',
'-P',
envvar='OBSWS_CLI_PORT',
help='WebSocket port',
show_default=4455,
),
] = envconfig.get('port'),
password: Annotated[
str,
typer.Option(
'--password',
'-p',
envvar='OBSWS_CLI_PASSWORD',
help='WebSocket password',
show_default=False,
),
] = envconfig.get('password'),
timeout: Annotated[
int,
typer.Option(
'--timeout',
'-T',
envvar='OBSWS_CLI_TIMEOUT',
help='WebSocket timeout',
show_default=5,
),
] = envconfig.get('timeout'),
style: Annotated[
str,
typer.Option(
'--style',
'-s',
envvar='OBSWS_CLI_STYLE',
help='Set the style for the CLI output',
show_default='disabled',
callback=validate_style,
),
] = envconfig.get('style'),
no_border: Annotated[
],
style_config: StyleConfig = Annotated[
StyleConfig,
Parameter(show=False, allow_leading_hyphen=True, help='Style parameters'),
],
debug: Annotated[
bool,
typer.Option(
'--no-border',
'-b',
envvar='OBSWS_CLI_STYLE_NO_BORDER',
help='Disable table border styling in the CLI output',
show_default=False,
),
] = envconfig.get('style_no_border'),
version: Annotated[
bool,
typer.Option(
'--version',
'-v',
is_eager=True,
help='Show the CLI version and exit',
show_default=False,
callback=version_callback,
),
Parameter(validator=setup_logging),
] = False,
loglevel: Annotated[
str,
typer.Option(
'--loglevel',
'-l',
envvar='OBSWS_CLI_LOGLEVEL',
is_eager=True,
help='Set the logging level',
show_default=False,
callback=setup_logging,
),
] = envconfig.get('loglevel'),
):
"""obsws_cli is a command line interface for the OBS WebSocket API."""
ctx.ensure_object(dict)
ctx.obj['obsws'] = ctx.with_resource(
obsws.ReqClient(host=host, port=port, password=password, timeout=timeout)
)
ctx.obj['style'] = styles.request_style_obj(style, no_border)
"""Command line interface for the OBS WebSocket API."""
with obsws.ReqClient(
host=obs_config.host,
port=obs_config.port,
password=obs_config.password,
) as client:
additional_kwargs = {}
command, bound, ignored = app.parse_args(tokens)
if 'ctx' in ignored:
# If 'ctx' is in ignored, it means it was not passed as an argument
# and we need to add it to the bound arguments.
additional_kwargs['ctx'] = ignored['ctx'](
client,
styles.request_style_obj(style_config.name, style_config.no_border),
)
return command(*bound.args, **bound.kwargs, **additional_kwargs)
@app.command()
def obs_version(ctx: typer.Context):
@app.command
def obs_version(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Get the OBS Client and WebSocket versions."""
resp = ctx.obj['obsws'].get_version()
resp = ctx.client.get_version()
console.out.print(
f'OBS Client version: {console.highlight(ctx, resp.obs_version)}'
f' with WebSocket version: {console.highlight(ctx, resp.obs_web_socket_version)}'
)
def run():
"""Run the OBS WebSocket CLI application.
Handles exceptions and prints error messages to the console.
"""
try:
app.meta()
except OBSWSCLIError as e:
console.err.print(f'Error: {e}')
return e.code

View File

@@ -1,226 +0,0 @@
"""module containing commands for manipulating groups in scenes."""
from typing import Annotated, Optional
import typer
from rich.table import Table
from rich.text import Text
from obsws_cli import console, util, validate
from obsws_cli.protocols import DataclassProtocol
app = typer.Typer()
@app.callback()
def main():
"""Control groups in OBS scenes."""
@app.command('list')
@app.command('ls', hidden=True)
def list_(
ctx: typer.Context,
scene_name: Annotated[
Optional[str],
typer.Argument(
show_default='The current scene',
help='Scene name to list groups for',
callback=validate.scene_in_scenes,
),
] = None,
):
"""List groups in a scene."""
if scene_name is None:
scene_name = ctx.obj['obsws'].get_current_program_scene().scene_name
resp = ctx.obj['obsws'].get_scene_item_list(scene_name)
groups = [
(item.get('sceneItemId'), item.get('sourceName'), item.get('sceneItemEnabled'))
for item in resp.scene_items
if item.get('isGroup')
]
if not groups:
console.out.print(
f'No groups found in scene {console.highlight(ctx, scene_name)}.'
)
raise typer.Exit()
table = Table(
title=f'Groups in Scene: {scene_name}',
padding=(0, 2),
border_style=ctx.obj['style'].border,
)
columns = [
(Text('ID', justify='center'), 'center', ctx.obj['style'].column),
(Text('Group Name', justify='center'), 'left', ctx.obj['style'].column),
(Text('Enabled', justify='center'), 'center', None),
]
for heading, justify, style in columns:
table.add_column(heading, justify=justify, style=style)
for item_id, group_name, is_enabled in groups:
table.add_row(
str(item_id),
group_name,
util.check_mark(is_enabled),
)
console.out.print(table)
def _get_group(group_name: str, resp: DataclassProtocol) -> dict | None:
"""Get a group from the scene item list response."""
group = next(
(
item
for item in resp.scene_items
if item.get('sourceName') == group_name and item.get('isGroup')
),
None,
)
return group
@app.command('show')
@app.command('sh', hidden=True)
def show(
ctx: typer.Context,
scene_name: Annotated[
str,
typer.Argument(
...,
show_default=False,
help='Scene name the group is in',
callback=validate.scene_in_scenes,
),
],
group_name: Annotated[
str, typer.Argument(..., show_default=False, help='Group name to show')
],
):
"""Show a group in a scene."""
resp = ctx.obj['obsws'].get_scene_item_list(scene_name)
if (group := _get_group(group_name, resp)) is None:
console.err.print(
f'Group [yellow]{group_name}[/yellow] not found in scene [yellow]{scene_name}[/yellow].'
)
raise typer.Exit(1)
ctx.obj['obsws'].set_scene_item_enabled(
scene_name=scene_name,
item_id=int(group.get('sceneItemId')),
enabled=True,
)
console.out.print(f'Group {console.highlight(ctx, group_name)} is now visible.')
@app.command('hide')
@app.command('h', hidden=True)
def hide(
ctx: typer.Context,
scene_name: Annotated[
str,
typer.Argument(
...,
show_default=False,
help='Scene name the group is in',
callback=validate.scene_in_scenes,
),
],
group_name: Annotated[
str, typer.Argument(..., show_default=False, help='Group name to hide')
],
):
"""Hide a group in a scene."""
resp = ctx.obj['obsws'].get_scene_item_list(scene_name)
if (group := _get_group(group_name, resp)) is None:
console.err.print(
f'Group [yellow]{group_name}[/yellow] not found in scene [yellow]{scene_name}[/yellow].'
)
raise typer.Exit(1)
ctx.obj['obsws'].set_scene_item_enabled(
scene_name=scene_name,
item_id=int(group.get('sceneItemId')),
enabled=False,
)
console.out.print(f'Group {console.highlight(ctx, group_name)} is now hidden.')
@app.command('toggle')
@app.command('tg', hidden=True)
def toggle(
ctx: typer.Context,
scene_name: Annotated[
str,
typer.Argument(
...,
show_default=False,
help='Scene name the group is in',
callback=validate.scene_in_scenes,
),
],
group_name: Annotated[
str, typer.Argument(..., show_default=False, help='Group name to toggle')
],
):
"""Toggle a group in a scene."""
resp = ctx.obj['obsws'].get_scene_item_list(scene_name)
if (group := _get_group(group_name, resp)) is None:
console.err.print(
f'Group [yellow]{group_name}[/yellow] not found in scene [yellow]{scene_name}[/yellow].'
)
raise typer.Exit(1)
new_state = not group.get('sceneItemEnabled')
ctx.obj['obsws'].set_scene_item_enabled(
scene_name=scene_name,
item_id=int(group.get('sceneItemId')),
enabled=new_state,
)
if new_state:
console.out.print(f'Group {console.highlight(ctx, group_name)} is now visible.')
else:
console.out.print(f'Group {console.highlight(ctx, group_name)} is now hidden.')
@app.command('status')
@app.command('ss', hidden=True)
def status(
ctx: typer.Context,
scene_name: Annotated[
str,
typer.Argument(
...,
show_default=False,
help='Scene name the group is in',
callback=validate.scene_in_scenes,
),
],
group_name: Annotated[
str, typer.Argument(..., show_default=False, help='Group name to check status')
],
):
"""Get the status of a group in a scene."""
resp = ctx.obj['obsws'].get_scene_item_list(scene_name)
if (group := _get_group(group_name, resp)) is None:
console.err.print(
f'Group [yellow]{group_name}[/yellow] not found in scene [yellow]{scene_name}[/yellow].'
)
raise typer.Exit(1)
enabled = ctx.obj['obsws'].get_scene_item_enabled(
scene_name=scene_name,
item_id=int(group.get('sceneItemId')),
)
if enabled.scene_item_enabled:
console.out.print(f'Group {console.highlight(ctx, group_name)} is now visible.')
else:
console.out.print(f'Group {console.highlight(ctx, group_name)} is now hidden.')

View File

@@ -1,86 +0,0 @@
"""module containing commands for hotkey management."""
from typing import Annotated
import typer
from rich.table import Table
from rich.text import Text
from obsws_cli import console
app = typer.Typer()
@app.callback()
def main():
"""Control hotkeys in OBS."""
@app.command('list')
@app.command('ls', hidden=True)
def list_(
ctx: typer.Context,
):
"""List all hotkeys."""
resp = ctx.obj['obsws'].get_hotkey_list()
if not resp.hotkeys:
console.out.print('No hotkeys found.')
raise typer.Exit()
table = Table(
title='Hotkeys',
padding=(0, 2),
border_style=ctx.obj['style'].border,
)
table.add_column(
Text('Hotkey Name', justify='center'),
justify='left',
style=ctx.obj['style'].column,
)
for i, hotkey in enumerate(resp.hotkeys):
table.add_row(hotkey, style='' if i % 2 == 0 else 'dim')
console.out.print(table)
@app.command('trigger')
@app.command('tr', hidden=True)
def trigger(
ctx: typer.Context,
hotkey: Annotated[
str, typer.Argument(..., show_default=False, help='The hotkey to trigger')
],
):
"""Trigger a hotkey by name."""
ctx.obj['obsws'].trigger_hotkey_by_name(hotkey)
@app.command('trigger-sequence')
@app.command('trs', hidden=True)
def trigger_sequence(
ctx: typer.Context,
key_id: Annotated[
str,
typer.Argument(
...,
show_default=False,
help='The OBS key ID to trigger, see https://github.com/onyx-and-iris/obsws-cli?tab=readme-ov-file#hotkey for more info',
),
],
shift: Annotated[
bool, typer.Option(..., help='Press shift when triggering the hotkey')
] = False,
ctrl: Annotated[
bool, typer.Option(..., help='Press control when triggering the hotkey')
] = False,
alt: Annotated[
bool, typer.Option(..., help='Press alt when triggering the hotkey')
] = False,
cmd: Annotated[
bool, typer.Option(..., help='Press cmd when triggering the hotkey')
] = False,
):
"""Trigger a hotkey by sequence."""
ctx.obj['obsws'].trigger_hotkey_by_key_sequence(key_id, shift, ctrl, alt, cmd)

View File

@@ -1,465 +0,0 @@
"""module containing commands for manipulating inputs."""
from typing import Annotated
import obsws_python as obsws
import typer
from rich.table import Table
from rich.text import Text
from obsws_cli import console, util, validate
app = typer.Typer()
@app.callback()
def main():
"""Control inputs in OBS."""
@app.command('create')
@app.command('cr', hidden=True)
def create(
ctx: typer.Context,
input_name: Annotated[
str,
typer.Argument(
...,
show_default=False,
help='Name of the input to create.',
callback=validate.input_not_in_inputs,
),
],
input_kind: Annotated[
str,
typer.Argument(
...,
show_default=False,
help='Kind of the input to create.',
callback=validate.kind_in_input_kinds,
),
],
):
"""Create a new input."""
current_scene = (
ctx.obj['obsws'].get_current_program_scene().current_program_scene_name
)
try:
ctx.obj['obsws'].create_input(
inputName=input_name,
inputKind=input_kind,
sceneItemEnabled=True,
sceneName=current_scene,
inputSettings={},
)
except obsws.error.OBSSDKRequestError as e:
console.err.print(f'Failed to create input: [yellow]{e}[/yellow]')
raise typer.Exit(1)
console.out.print(
f'Input {console.highlight(ctx, input_name)} of kind '
f'{console.highlight(ctx, input_kind)} created.',
)
@app.command('remove')
@app.command('rm', hidden=True)
def remove(
ctx: typer.Context,
input_name: Annotated[
str,
typer.Argument(
...,
show_default=False,
help='Name of the input to remove.',
callback=validate.input_in_inputs,
),
],
):
"""Remove an input."""
ctx.obj['obsws'].remove_input(name=input_name)
console.out.print(f'Input {console.highlight(ctx, input_name)} removed.')
@app.command('list')
@app.command('ls', hidden=True)
def list_(
ctx: typer.Context,
input: Annotated[bool, typer.Option(help='Filter by input type.')] = False,
output: Annotated[bool, typer.Option(help='Filter by output type.')] = False,
colour: Annotated[bool, typer.Option(help='Filter by colour source type.')] = False,
ffmpeg: Annotated[bool, typer.Option(help='Filter by ffmpeg source type.')] = False,
vlc: Annotated[bool, typer.Option(help='Filter by VLC source type.')] = False,
uuid: Annotated[bool, typer.Option(help='Show UUIDs of inputs.')] = False,
):
"""List all inputs."""
resp = ctx.obj['obsws'].get_input_list()
kinds = []
if input:
kinds.append('input')
if output:
kinds.append('output')
if colour:
kinds.append('color')
if ffmpeg:
kinds.append('ffmpeg')
if vlc:
kinds.append('vlc')
if not any([input, output, colour, ffmpeg, vlc]):
kinds = ctx.obj['obsws'].get_input_kind_list(False).input_kinds
inputs = sorted(
(
(input_.get('inputName'), input_.get('inputKind'), input_.get('inputUuid'))
for input_ in filter(
lambda input_: any(kind in input_.get('inputKind') for kind in kinds),
resp.inputs,
)
),
key=lambda x: x[0], # Sort by input name
)
if not inputs:
console.out.print('No inputs found.')
raise typer.Exit()
table = Table(title='Inputs', padding=(0, 2), border_style=ctx.obj['style'].border)
if uuid:
columns = [
(Text('Input Name', justify='center'), 'left', ctx.obj['style'].column),
(Text('Kind', justify='center'), 'center', ctx.obj['style'].column),
(Text('Muted', justify='center'), 'center', None),
(Text('UUID', justify='center'), 'left', ctx.obj['style'].column),
]
else:
columns = [
(Text('Input Name', justify='center'), 'left', ctx.obj['style'].column),
(Text('Kind', justify='center'), 'center', ctx.obj['style'].column),
(Text('Muted', justify='center'), 'center', None),
]
for heading, justify, style in columns:
table.add_column(heading, justify=justify, style=style)
for input_name, input_kind, input_uuid in inputs:
input_mark = ''
try:
input_muted = ctx.obj['obsws'].get_input_mute(name=input_name).input_muted
input_mark = util.check_mark(input_muted)
except obsws.error.OBSSDKRequestError as e:
if e.code == 604: # Input does not support audio
input_mark = 'N/A'
else:
raise
if uuid:
table.add_row(
input_name,
util.snakecase_to_titlecase(input_kind),
input_mark,
input_uuid,
)
else:
table.add_row(
input_name,
util.snakecase_to_titlecase(input_kind),
input_mark,
)
console.out.print(table)
@app.command('list-kinds')
@app.command('ls-k', hidden=True)
def list_kinds(
ctx: typer.Context,
):
"""List all input kinds."""
resp = ctx.obj['obsws'].get_input_kind_list(False)
kinds = sorted(resp.input_kinds)
if not kinds:
console.out.print('No input kinds found.')
raise typer.Exit()
table = Table(
title='Input Kinds', padding=(0, 2), border_style=ctx.obj['style'].border
)
table.add_column(
Text('Input Kind', justify='center'),
justify='left',
style=ctx.obj['style'].column,
)
for kind in kinds:
table.add_row(util.snakecase_to_titlecase(kind))
console.out.print(table)
@app.command('mute')
@app.command('m', hidden=True)
def mute(
ctx: typer.Context,
input_name: Annotated[
str,
typer.Argument(
...,
show_default=False,
help='Name of the input to mute.',
callback=validate.input_in_inputs,
),
],
):
"""Mute an input."""
ctx.obj['obsws'].set_input_mute(
name=input_name,
muted=True,
)
console.out.print(f'Input {console.highlight(ctx, input_name)} muted.')
@app.command('unmute')
@app.command('um', hidden=True)
def unmute(
ctx: typer.Context,
input_name: Annotated[
str,
typer.Argument(
...,
show_default=False,
help='Name of the input to unmute.',
callback=validate.input_in_inputs,
),
],
):
"""Unmute an input."""
ctx.obj['obsws'].set_input_mute(
name=input_name,
muted=False,
)
console.out.print(f'Input {console.highlight(ctx, input_name)} unmuted.')
@app.command('toggle')
@app.command('tg', hidden=True)
def toggle(
ctx: typer.Context,
input_name: Annotated[
str,
typer.Argument(
...,
show_default=False,
help='Name of the input to toggle.',
callback=validate.input_in_inputs,
),
],
):
"""Toggle an input."""
resp = ctx.obj['obsws'].get_input_mute(name=input_name)
new_state = not resp.input_muted
ctx.obj['obsws'].set_input_mute(
name=input_name,
muted=new_state,
)
if new_state:
console.out.print(
f'Input {console.highlight(ctx, input_name)} muted.',
)
else:
console.out.print(
f'Input {console.highlight(ctx, input_name)} unmuted.',
)
@app.command('volume')
@app.command('vol', hidden=True)
def volume(
ctx: typer.Context,
input_name: Annotated[
str,
typer.Argument(
...,
show_default=False,
help='Name of the input to set volume for.',
callback=validate.input_in_inputs,
),
],
volume: Annotated[
float,
typer.Argument(
...,
show_default=False,
help='Volume level to set (-90 to 0).',
min=-90,
max=0,
),
],
):
"""Set the volume of an input."""
ctx.obj['obsws'].set_input_volume(
name=input_name,
vol_db=volume,
)
console.out.print(
f'Input {console.highlight(ctx, input_name)} volume set to {console.highlight(ctx, volume)}.',
)
@app.command('show')
@app.command('s', hidden=True)
def show(
ctx: typer.Context,
input_name: Annotated[
str,
typer.Argument(
...,
show_default=False,
help='Name of the input to show.',
callback=validate.input_in_inputs,
),
],
verbose: Annotated[
bool, typer.Option(help='List all available input devices.')
] = False,
):
"""Show information for an input in the current scene."""
input_list = ctx.obj['obsws'].get_input_list()
for input_ in input_list.inputs:
if input_['inputName'] == input_name:
input_kind = input_['inputKind']
break
for prop in ['device', 'device_id']:
try:
device_id = (
ctx.obj['obsws']
.get_input_settings(
name=input_name,
)
.input_settings.get(prop)
)
if device_id:
break
except obsws.error.OBSSDKRequestError:
continue
else:
device_id = '(N/A)'
for device in (
ctx.obj['obsws']
.get_input_properties_list_property_items(
input_name=input_name,
prop_name=prop,
)
.property_items
):
if device.get('itemValue') == device_id:
device_id = device.get('itemName')
break
table = Table(
title='Input Information', padding=(0, 2), border_style=ctx.obj['style'].border
)
columns = [
(Text('Input Name', justify='center'), 'left', ctx.obj['style'].column),
(Text('Kind', justify='center'), 'left', ctx.obj['style'].column),
(Text('Device', justify='center'), 'left', ctx.obj['style'].column),
]
for heading, justify, style in columns:
table.add_column(heading, justify=justify, style=style)
table.add_row(
input_name,
util.snakecase_to_titlecase(input_kind),
device_id,
)
console.out.print(table)
if verbose:
resp = ctx.obj['obsws'].get_input_properties_list_property_items(
input_name=input_name,
prop_name=prop,
)
table = Table(
title='Devices',
padding=(0, 2),
border_style=ctx.obj['style'].border,
)
columns = [
(Text('Name', justify='center'), 'left', ctx.obj['style'].column),
]
for heading, justify, style in columns:
table.add_column(heading, justify=justify, style=style)
for i, item in enumerate(resp.property_items):
table.add_row(
item.get('itemName'),
style='' if i % 2 == 0 else 'dim',
)
console.out.print(table)
@app.command('update')
@app.command('upd', hidden=True)
def update(
ctx: typer.Context,
input_name: Annotated[
str,
typer.Argument(
...,
show_default=False,
help='Name of the input to update.',
callback=validate.input_in_inputs,
),
],
device_name: Annotated[
str,
typer.Argument(
...,
show_default=False,
help='Name of the device to set for the input.',
),
],
):
"""Update a setting for an input."""
device_id = None
for prop in ['device', 'device_id']:
try:
for device in (
ctx.obj['obsws']
.get_input_properties_list_property_items(
input_name=input_name,
prop_name=prop,
)
.property_items
):
if device.get('itemName') == device_name:
device_id = device.get('itemValue')
break
except obsws.error.OBSSDKRequestError:
continue
if device_id:
break
if not device_id:
console.err.print(
f'Failed to find device ID for device '
f'{console.highlight(ctx, device_name)}.',
)
raise typer.Exit(1)
ctx.obj['obsws'].set_input_settings(
name=input_name, settings={prop: device_id}, overlay=True
)
console.out.print(
f'Input {console.highlight(ctx, input_name)} updated to use device '
f'{console.highlight(ctx, device_name)}.',
)

View File

@@ -1,105 +0,0 @@
"""module containing commands for media inputs."""
from typing import Annotated, Optional
import typer
from obsws_cli import console, util, validate
app = typer.Typer()
@app.callback()
def main():
"""Commands for media inputs."""
@app.command('cursor')
@app.command('cur', hidden=True)
def cursor(
ctx: typer.Context,
input_name: Annotated[
str, typer.Argument(..., help='The name of the media input.')
],
timecode: Annotated[
Optional[str],
typer.Argument(
...,
help='The timecode to set the cursor to (format: HH:MM:SS).',
callback=validate.timecode_format,
),
] = None,
):
"""Get/set the cursor position of a media input."""
if timecode is None:
resp = ctx.obj['obsws'].get_media_input_status(input_name)
console.out.print(
f'Cursor for {console.highlight(ctx, input_name)} is at {util.milliseconds_to_timecode(resp.media_cursor)}.'
)
return
cursor_position = util.timecode_to_milliseconds(timecode)
ctx.obj['obsws'].set_media_input_cursor(input_name, cursor_position)
console.out.print(
f'Cursor for {console.highlight(ctx, input_name)} set to {timecode}.'
)
@app.command('play')
@app.command('p', hidden=True)
def play(
ctx: typer.Context,
input_name: Annotated[
str, typer.Argument(..., help='The name of the media input.')
],
):
"""Get/set the playing status of a media input."""
ctx.obj['obsws'].trigger_media_input_action(
input_name, 'OBS_WEBSOCKET_MEDIA_INPUT_ACTION_PLAY'
)
console.out.print(f'Playing media input {console.highlight(ctx, input_name)}.')
@app.command('pause')
@app.command('pa', hidden=True)
def pause(
ctx: typer.Context,
input_name: Annotated[
str, typer.Argument(..., help='The name of the media input.')
],
):
"""Pause a media input."""
ctx.obj['obsws'].trigger_media_input_action(
input_name, 'OBS_WEBSOCKET_MEDIA_INPUT_ACTION_PAUSE'
)
console.out.print(f'Paused media input {console.highlight(ctx, input_name)}.')
@app.command('stop')
@app.command('s', hidden=True)
def stop(
ctx: typer.Context,
input_name: Annotated[
str, typer.Argument(..., help='The name of the media input.')
],
):
"""Stop a media input."""
ctx.obj['obsws'].trigger_media_input_action(
input_name, 'OBS_WEBSOCKET_MEDIA_INPUT_ACTION_STOP'
)
console.out.print(f'Stopped media input {console.highlight(ctx, input_name)}.')
@app.command('restart')
@app.command('r', hidden=True)
def restart(
ctx: typer.Context,
input_name: Annotated[
str, typer.Argument(..., help='The name of the media input.')
],
):
"""Restart a media input."""
ctx.obj['obsws'].trigger_media_input_action(
input_name, 'OBS_WEBSOCKET_MEDIA_INPUT_ACTION_RESTART'
)
console.out.print(f'Restarted media input {console.highlight(ctx, input_name)}.')

View File

@@ -1,121 +0,0 @@
"""module containing commands for manipulating profiles in OBS."""
from typing import Annotated
import typer
from rich.table import Table
from rich.text import Text
from obsws_cli import console, util, validate
app = typer.Typer()
@app.callback()
def main():
"""Control profiles in OBS."""
@app.command('list')
@app.command('ls', hidden=True)
def list_(ctx: typer.Context):
"""List profiles."""
resp = ctx.obj['obsws'].get_profile_list()
if not resp.profiles:
console.out.print('No profiles found.')
raise typer.Exit()
table = Table(
title='Profiles', padding=(0, 2), border_style=ctx.obj['style'].border
)
columns = [
(Text('Profile Name', justify='center'), 'left', ctx.obj['style'].column),
(Text('Current', justify='center'), 'center', None),
]
for heading, justify, style in columns:
table.add_column(heading, justify=justify, style=style)
for profile in resp.profiles:
table.add_row(
profile,
util.check_mark(
ctx, profile == resp.current_profile_name, empty_if_false=True
),
)
console.out.print(table)
@app.command('current')
@app.command('get', hidden=True)
def current(ctx: typer.Context):
"""Get the current profile."""
resp = ctx.obj['obsws'].get_profile_list()
console.out.print(
f'Current profile: {console.highlight(ctx, resp.current_profile_name)}'
)
@app.command('switch')
@app.command('set', hidden=True)
def switch(
ctx: typer.Context,
profile_name: Annotated[
str,
typer.Argument(
...,
show_default=False,
help='Name of the profile to switch to',
callback=validate.profile_exists,
),
],
):
"""Switch to a profile."""
resp = ctx.obj['obsws'].get_profile_list()
if resp.current_profile_name == profile_name:
console.err.print(
f'Profile [yellow]{profile_name}[/yellow] is already the current profile.'
)
raise typer.Exit(1)
ctx.obj['obsws'].set_current_profile(profile_name)
console.out.print(f'Switched to profile {console.highlight(ctx, profile_name)}.')
@app.command('create')
@app.command('new', hidden=True)
def create(
ctx: typer.Context,
profile_name: Annotated[
str,
typer.Argument(
...,
show_default=False,
help='Name of the profile to create.',
callback=validate.profile_not_exists,
),
],
):
"""Create a new profile."""
ctx.obj['obsws'].create_profile(profile_name)
console.out.print(f'Created profile {console.highlight(ctx, profile_name)}.')
@app.command('remove')
@app.command('rm', hidden=True)
def remove(
ctx: typer.Context,
profile_name: Annotated[
str,
typer.Argument(
...,
show_default=False,
help='Name of the profile to remove.',
callback=validate.profile_exists,
),
],
):
"""Remove a profile."""
ctx.obj['obsws'].remove_profile(profile_name)
console.out.print(f'Removed profile {console.highlight(ctx, profile_name)}.')

View File

@@ -1,180 +0,0 @@
"""module for controlling OBS recording functionality."""
from pathlib import Path
from typing import Annotated, Optional
import typer
from obsws_cli import console
app = typer.Typer()
@app.callback()
def main():
"""Control OBS recording functionality."""
def _get_recording_status(ctx: typer.Context) -> tuple:
"""Get recording status."""
resp = ctx.obj['obsws'].get_record_status()
return resp.output_active, resp.output_paused
@app.command('start')
@app.command('s', hidden=True)
def start(ctx: typer.Context):
"""Start recording."""
active, paused = _get_recording_status(ctx)
if active:
err_msg = 'Recording is already in progress, cannot start.'
if paused:
err_msg += ' Try resuming it.'
console.err.print(err_msg)
raise typer.Exit(1)
ctx.obj['obsws'].start_record()
console.out.print('Recording started successfully.')
@app.command('stop')
@app.command('st', hidden=True)
def stop(ctx: typer.Context):
"""Stop recording."""
active, _ = _get_recording_status(ctx)
if not active:
console.err.print('Recording is not in progress, cannot stop.')
raise typer.Exit(1)
resp = ctx.obj['obsws'].stop_record()
console.out.print(
f'Recording stopped successfully. Saved to: {console.highlight(ctx, resp.output_path)}'
)
@app.command('toggle')
@app.command('tg', hidden=True)
def toggle(ctx: typer.Context):
"""Toggle recording."""
resp = ctx.obj['obsws'].toggle_record()
if resp.output_active:
console.out.print('Recording started successfully.')
else:
console.out.print('Recording stopped successfully.')
@app.command('status')
@app.command('ss', hidden=True)
def status(ctx: typer.Context):
"""Get recording status."""
active, paused = _get_recording_status(ctx)
if active:
if paused:
console.out.print('Recording is in progress and paused.')
else:
console.out.print('Recording is in progress.')
else:
console.out.print('Recording is not in progress.')
@app.command('resume')
@app.command('r', hidden=True)
def resume(ctx: typer.Context):
"""Resume recording."""
active, paused = _get_recording_status(ctx)
if not active:
console.err.print('Recording is not in progress, cannot resume.')
raise typer.Exit(1)
if not paused:
console.err.print('Recording is in progress but not paused, cannot resume.')
raise typer.Exit(1)
ctx.obj['obsws'].resume_record()
console.out.print('Recording resumed successfully.')
@app.command('pause')
@app.command('p', hidden=True)
def pause(ctx: typer.Context):
"""Pause recording."""
active, paused = _get_recording_status(ctx)
if not active:
console.err.print('Recording is not in progress, cannot pause.')
raise typer.Exit(1)
if paused:
console.err.print('Recording is in progress but already paused, cannot pause.')
raise typer.Exit(1)
ctx.obj['obsws'].pause_record()
console.out.print('Recording paused successfully.')
@app.command('directory')
@app.command('d', hidden=True)
def directory(
ctx: typer.Context,
record_directory: Annotated[
Optional[Path],
# Since the CLI and OBS may be running on different platforms,
# we won't validate the path here.
typer.Argument(
file_okay=False,
dir_okay=True,
help='Directory to set for recording.',
),
] = None,
):
"""Get or set the recording directory."""
if record_directory is not None:
ctx.obj['obsws'].set_record_directory(str(record_directory))
console.out.print(
f'Recording directory updated to: {console.highlight(ctx, record_directory)}'
)
else:
resp = ctx.obj['obsws'].get_record_directory()
console.out.print(
f'Recording directory: {console.highlight(ctx, resp.record_directory)}'
)
@app.command('split')
@app.command('sp', hidden=True)
def split(ctx: typer.Context):
"""Split the current recording."""
active, paused = _get_recording_status(ctx)
if not active:
console.err.print('Recording is not in progress, cannot split.')
raise typer.Exit(1)
if paused:
console.err.print('Recording is paused, cannot split.')
raise typer.Exit(1)
ctx.obj['obsws'].split_record_file()
console.out.print('Recording split successfully.')
@app.command('chapter')
@app.command('ch', hidden=True)
def chapter(
ctx: typer.Context,
chapter_name: Annotated[
Optional[str],
typer.Argument(
help='Name of the chapter to create.',
),
] = None,
):
"""Create a chapter in the current recording."""
active, paused = _get_recording_status(ctx)
if not active:
console.err.print('Recording is not in progress, cannot create chapter.')
raise typer.Exit(1)
if paused:
console.err.print('Recording is paused, cannot create chapter.')
raise typer.Exit(1)
ctx.obj['obsws'].create_record_chapter(chapter_name)
console.out.print(
f'Chapter {console.highlight(ctx, chapter_name or "unnamed")} created successfully.'
)

View File

@@ -1,68 +0,0 @@
"""module containing commands for manipulating the replay buffer in OBS."""
import typer
from obsws_cli import console
app = typer.Typer()
@app.callback()
def main():
"""Control profiles in OBS."""
@app.command('start')
@app.command('s', hidden=True)
def start(ctx: typer.Context):
"""Start the replay buffer."""
resp = ctx.obj['obsws'].get_replay_buffer_status()
if resp.output_active:
console.err.print('Replay buffer is already active.')
raise typer.Exit(1)
ctx.obj['obsws'].start_replay_buffer()
console.out.print('Replay buffer started.')
@app.command('stop')
@app.command('st', hidden=True)
def stop(ctx: typer.Context):
"""Stop the replay buffer."""
resp = ctx.obj['obsws'].get_replay_buffer_status()
if not resp.output_active:
console.err.print('Replay buffer is not active.')
raise typer.Exit(1)
ctx.obj['obsws'].stop_replay_buffer()
console.out.print('Replay buffer stopped.')
@app.command('toggle')
@app.command('tg', hidden=True)
def toggle(ctx: typer.Context):
"""Toggle the replay buffer."""
resp = ctx.obj['obsws'].toggle_replay_buffer()
if resp.output_active:
console.out.print('Replay buffer is active.')
else:
console.out.print('Replay buffer is not active.')
@app.command('status')
@app.command('ss', hidden=True)
def status(ctx: typer.Context):
"""Get the status of the replay buffer."""
resp = ctx.obj['obsws'].get_replay_buffer_status()
if resp.output_active:
console.out.print('Replay buffer is active.')
else:
console.out.print('Replay buffer is not active.')
@app.command('save')
@app.command('sv', hidden=True)
def save(ctx: typer.Context):
"""Save the replay buffer."""
ctx.obj['obsws'].save_replay_buffer()
console.out.print('Replay buffer saved.')

View File

@@ -1,124 +0,0 @@
"""module containing commands for controlling OBS scenes."""
from typing import Annotated
import typer
from rich.table import Table
from rich.text import Text
from obsws_cli import console, util, validate
app = typer.Typer()
@app.callback()
def main():
"""Control OBS scenes."""
@app.command('list')
@app.command('ls', hidden=True)
def list_(
ctx: typer.Context,
uuid: Annotated[bool, typer.Option(help='Show UUIDs of scenes')] = False,
):
"""List all scenes."""
resp = ctx.obj['obsws'].get_scene_list()
scenes = (
(scene.get('sceneName'), scene.get('sceneUuid'))
for scene in reversed(resp.scenes)
)
if not scenes:
console.out.print('No scenes found.')
raise typer.Exit()
active_scene = ctx.obj['obsws'].get_current_program_scene().scene_name
table = Table(title='Scenes', padding=(0, 2), border_style=ctx.obj['style'].border)
if uuid:
columns = [
(Text('Scene Name', justify='center'), 'left', ctx.obj['style'].column),
(Text('Active', justify='center'), 'center', None),
(Text('UUID', justify='center'), 'left', ctx.obj['style'].column),
]
else:
columns = [
(Text('Scene Name', justify='center'), 'left', ctx.obj['style'].column),
(Text('Active', justify='center'), 'center', None),
]
for heading, justify, style in columns:
table.add_column(heading, justify=justify, style=style)
for scene_name, scene_uuid in scenes:
if uuid:
table.add_row(
scene_name,
util.check_mark(scene_name == active_scene, empty_if_false=True),
scene_uuid,
)
else:
table.add_row(
scene_name,
util.check_mark(scene_name == active_scene, empty_if_false=True),
)
console.out.print(table)
@app.command('current')
@app.command('get', hidden=True)
def current(
ctx: typer.Context,
preview: Annotated[
bool,
typer.Option(
help='Get the preview scene instead of the program scene',
callback=validate.studio_mode_enabled,
),
] = False,
):
"""Get the current program scene or preview scene."""
if preview:
resp = ctx.obj['obsws'].get_current_preview_scene()
console.out.print(
f'Current Preview Scene: {console.highlight(ctx, resp.current_preview_scene_name)}'
)
else:
resp = ctx.obj['obsws'].get_current_program_scene()
console.out.print(
f'Current Program Scene: {console.highlight(ctx, resp.current_program_scene_name)}'
)
@app.command('switch')
@app.command('set', hidden=True)
def switch(
ctx: typer.Context,
scene_name: Annotated[
str,
typer.Argument(
...,
help='Name of the scene to switch to',
callback=validate.scene_in_scenes,
),
],
preview: Annotated[
bool,
typer.Option(
help='Switch to the preview scene instead of the program scene',
callback=validate.studio_mode_enabled,
),
] = False,
):
"""Switch to a scene."""
if preview:
ctx.obj['obsws'].set_current_preview_scene(scene_name)
console.out.print(
f'Switched to preview scene: {console.highlight(ctx, scene_name)}'
)
else:
ctx.obj['obsws'].set_current_program_scene(scene_name)
console.out.print(
f'Switched to program scene: {console.highlight(ctx, scene_name)}'
)

View File

@@ -1,99 +0,0 @@
"""module containing commands for manipulating scene collections."""
from typing import Annotated
import typer
from rich.table import Table
from obsws_cli import console, validate
app = typer.Typer()
@app.callback()
def main():
"""Control scene collections in OBS."""
@app.command('list')
@app.command('ls', hidden=True)
def list_(ctx: typer.Context):
"""List all scene collections."""
resp = ctx.obj['obsws'].get_scene_collection_list()
if not resp.scene_collections:
console.out.print('No scene collections found.')
raise typer.Exit()
table = Table(
title='Scene Collections',
padding=(0, 2),
border_style=ctx.obj['style'].border,
)
table.add_column(
'Scene Collection Name', justify='left', style=ctx.obj['style'].column
)
for scene_collection_name in resp.scene_collections:
table.add_row(scene_collection_name)
console.out.print(table)
@app.command('current')
@app.command('get', hidden=True)
def current(ctx: typer.Context):
"""Get the current scene collection."""
resp = ctx.obj['obsws'].get_scene_collection_list()
console.out.print(
f'Current scene collection: {console.highlight(ctx, resp.current_scene_collection_name)}'
)
@app.command('switch')
@app.command('set', hidden=True)
def switch(
ctx: typer.Context,
scene_collection_name: Annotated[
str,
typer.Argument(
...,
help='Name of the scene collection to switch to',
callback=validate.scene_collection_in_scene_collections,
),
],
):
"""Switch to a scene collection."""
current_scene_collection = (
ctx.obj['obsws'].get_scene_collection_list().current_scene_collection_name
)
if scene_collection_name == current_scene_collection:
console.err.print(
f'Scene collection [yellow]{scene_collection_name}[/yellow] is already active.'
)
raise typer.Exit(1)
ctx.obj['obsws'].set_current_scene_collection(scene_collection_name)
console.out.print(
f'Switched to scene collection {console.highlight(ctx, scene_collection_name)}.'
)
@app.command('create')
@app.command('new', hidden=True)
def create(
ctx: typer.Context,
scene_collection_name: Annotated[
str,
typer.Argument(
...,
help='Name of the scene collection to create',
callback=validate.scene_collection_not_in_scene_collections,
),
],
):
"""Create a new scene collection."""
ctx.obj['obsws'].create_scene_collection(scene_collection_name)
console.out.print(
f'Created scene collection {console.highlight(ctx, scene_collection_name)}.'
)

View File

@@ -1,340 +0,0 @@
"""module for settings management."""
from typing import Annotated, Optional
import typer
from rich.table import Table
from rich.text import Text
from obsws_cli import console, util
app = typer.Typer()
@app.callback()
def main():
"""Manage OBS settings."""
@app.command('show')
@app.command('sh', hidden=True)
def show(
ctx: typer.Context,
video: Annotated[
bool, typer.Option('--video', '-v', help='Show video settings.')
] = False,
record: Annotated[
bool, typer.Option('--record', '-r', help='Show recording settings.')
] = False,
profile: Annotated[
bool, typer.Option('--profile', '-p', help='Show profile settings.')
] = False,
):
"""Show current OBS settings."""
if not any([video, record, profile]):
video = True
record = True
profile = True
resp = ctx.obj['obsws'].get_video_settings()
video_table = Table(
title='Video Settings',
padding=(0, 2),
border_style=ctx.obj['style'].border,
)
video_columns = (
('Setting', 'left', ctx.obj['style'].column),
('Value', 'left', ctx.obj['style'].column),
)
for header_text, justify, style in video_columns:
video_table.add_column(
Text(header_text, justify='center'),
justify=justify,
style=style,
)
for setting in resp.attrs():
video_table.add_row(
util.snakecase_to_titlecase(setting),
str(getattr(resp, setting)),
style='' if video_table.row_count % 2 == 0 else 'dim',
)
if video:
console.out.print(video_table)
resp = ctx.obj['obsws'].get_record_directory()
record_table = Table(
title='Recording Settings',
padding=(0, 2),
border_style=ctx.obj['style'].border,
)
record_columns = (
('Setting', 'left', ctx.obj['style'].column),
('Value', 'left', ctx.obj['style'].column),
)
for header_text, justify, style in record_columns:
record_table.add_column(
Text(header_text, justify='center'),
justify=justify,
style=style,
)
record_table.add_row(
'Directory',
resp.record_directory,
style='' if record_table.row_count % 2 == 0 else 'dim',
)
if record:
console.out.print(record_table)
profile_table = Table(
title='Profile Settings',
padding=(0, 2),
border_style=ctx.obj['style'].border,
)
profile_columns = (
('Setting', 'left', ctx.obj['style'].column),
('Value', 'left', ctx.obj['style'].column),
)
for header_text, justify, style in profile_columns:
profile_table.add_column(
Text(header_text, justify='center'),
justify=justify,
style=style,
)
params = [
('Output', 'Mode', 'Output Mode'),
('SimpleOutput', 'StreamEncoder', 'Simple Streaming Encoder'),
('SimpleOutput', 'RecEncoder', 'Simple Recording Encoder'),
('SimpleOutput', 'RecFormat2', 'Simple Recording Video Format'),
('SimpleOutput', 'RecAudioEncoder', 'Simple Recording Audio Format'),
('SimpleOutput', 'RecQuality', 'Simple Recording Quality'),
('AdvOut', 'Encoder', 'Advanced Streaming Encoder'),
('AdvOut', 'RecEncoder', 'Advanced Recording Encoder'),
('AdvOut', 'RecType', 'Advanced Recording Type'),
('AdvOut', 'RecFormat2', 'Advanced Recording Video Format'),
('AdvOut', 'RecAudioEncoder', 'Advanced Recording Audio Format'),
]
for category, name, display_name in params:
resp = ctx.obj['obsws'].get_profile_parameter(
category=category,
name=name,
)
if resp.parameter_value is not None:
profile_table.add_row(
display_name,
str(resp.parameter_value),
style='' if profile_table.row_count % 2 == 0 else 'dim',
)
if profile:
console.out.print(profile_table)
@app.command('profile')
@app.command('pr', hidden=True)
def profile(
ctx: typer.Context,
category: Annotated[
str,
typer.Argument(
...,
help='Profile parameter category (e.g., SimpleOutput, AdvOut).',
),
],
name: Annotated[
str,
typer.Argument(
...,
help='Profile parameter name (e.g., StreamEncoder, RecFormat2).',
),
],
value: Annotated[
Optional[str],
typer.Argument(
...,
help='Value to set for the profile parameter. If omitted, the current value is retrieved.',
),
] = None,
):
"""Get/set OBS profile settings."""
if value is None:
resp = ctx.obj['obsws'].get_profile_parameter(
category=category,
name=name,
)
console.out.print(
f'Parameter Value for [bold]{name}[/bold]: '
f'[green]{resp.parameter_value}[/green]'
)
else:
ctx.obj['obsws'].set_profile_parameter(
category=category,
name=name,
value=value,
)
console.out.print(
f'Set Parameter [bold]{name}[/bold] to [green]{value}[/green]'
)
@app.command('stream-service')
@app.command('ss', hidden=True)
def stream_service(
ctx: typer.Context,
type_: Annotated[
Optional[str],
typer.Argument(
...,
help='Stream service type (e.g., Twitch, YouTube). If omitted, current settings are retrieved.',
),
] = None,
key: Annotated[
Optional[str],
typer.Option('--key', '-k', help='Stream key to set. Optional.'),
] = None,
server: Annotated[
Optional[str],
typer.Option('--server', '-s', help='Stream server to set. Optional.'),
] = None,
):
"""Get/set OBS stream service settings."""
if type_ is None:
resp = ctx.obj['obsws'].get_stream_service_settings()
table = Table(
title='Stream Service Settings',
padding=(0, 2),
border_style=ctx.obj['style'].border,
)
columns = (
('Setting', 'left', ctx.obj['style'].column),
('Value', 'left', ctx.obj['style'].column),
)
for header_text, justify, style in columns:
table.add_column(
Text(header_text, justify='center'),
justify=justify,
style=style,
)
table.add_row(
'Type',
resp.stream_service_type,
style='' if table.row_count % 2 == 0 else 'dim',
)
table.add_row(
'Server',
resp.stream_service_settings.get('server', ''),
style='' if table.row_count % 2 == 0 else 'dim',
)
table.add_row(
'Key',
resp.stream_service_settings.get('key', ''),
style='' if table.row_count % 2 == 0 else 'dim',
)
console.out.print(table)
else:
current_settings = ctx.obj['obsws'].get_stream_service_settings()
if key is None:
key = current_settings.stream_service_settings.get('key', '')
if server is None:
server = current_settings.stream_service_settings.get('server', '')
ctx.obj['obsws'].set_stream_service_settings(
ss_type=type_,
ss_settings={'key': key, 'server': server},
)
console.out.print('Stream service settings updated.')
@app.command('video')
@app.command('vi', hidden=True)
def video(
ctx: typer.Context,
base_width: Annotated[
Optional[int],
typer.Option('--base-width', '-bw', help='Set base (canvas) width.'),
] = None,
base_height: Annotated[
Optional[int],
typer.Option('--base-height', '-bh', help='Set base (canvas) height.'),
] = None,
output_width: Annotated[
Optional[int],
typer.Option('--output-width', '-ow', help='Set output (scaled) width.'),
] = None,
output_height: Annotated[
Optional[int],
typer.Option('--output-height', '-oh', help='Set output (scaled) height.'),
] = None,
fps_num: Annotated[
Optional[int],
typer.Option('--fps-num', '-fn', help='Set FPS numerator.'),
] = None,
fps_den: Annotated[
Optional[int],
typer.Option('--fps-den', '-fd', help='Set FPS denominator.'),
] = None,
):
"""Get/set OBS video settings."""
if not any(
[
base_width,
base_height,
output_width,
output_height,
fps_num,
fps_den,
]
):
resp = ctx.obj['obsws'].get_video_settings()
table = Table(
title='Video Settings',
padding=(0, 2),
border_style=ctx.obj['style'].border,
)
columns = (
('Setting', 'left', ctx.obj['style'].column),
('Value', 'left', ctx.obj['style'].column),
)
for header_text, justify, style in columns:
table.add_column(
Text(header_text, justify='center'),
justify=justify,
style=style,
)
for setting in resp.attrs():
table.add_row(
util.snakecase_to_titlecase(setting),
str(getattr(resp, setting)),
style='' if table.row_count % 2 == 0 else 'dim',
)
console.out.print(table)
else:
current_settings = ctx.obj['obsws'].get_video_settings()
if base_width is None:
base_width = current_settings.base_width
if base_height is None:
base_height = current_settings.base_height
if output_width is None:
output_width = current_settings.output_width
if output_height is None:
output_height = current_settings.output_height
if fps_num is None:
fps_num = current_settings.fps_num
if fps_den is None:
fps_den = current_settings.fps_den
ctx.obj['obsws'].set_video_settings(
base_width=base_width,
base_height=base_height,
out_width=output_width,
out_height=output_height,
numerator=fps_num,
denominator=fps_den,
)
console.out.print('Video settings updated.')

View File

@@ -1,52 +0,0 @@
"""module containing commands for manipulating studio mode in OBS."""
import typer
from obsws_cli import console
app = typer.Typer()
@app.callback()
def main():
"""Control studio mode in OBS."""
@app.command('enable')
@app.command('on', hidden=True)
def enable(ctx: typer.Context):
"""Enable studio mode."""
ctx.obj['obsws'].set_studio_mode_enabled(True)
console.out.print('Studio mode has been enabled.')
@app.command('disable')
@app.command('off', hidden=True)
def disable(ctx: typer.Context):
"""Disable studio mode."""
ctx.obj['obsws'].set_studio_mode_enabled(False)
console.out.print('Studio mode has been disabled.')
@app.command('toggle')
@app.command('tg', hidden=True)
def toggle(ctx: typer.Context):
"""Toggle studio mode."""
resp = ctx.obj['obsws'].get_studio_mode_enabled()
if resp.studio_mode_enabled:
ctx.obj['obsws'].set_studio_mode_enabled(False)
console.out.print('Studio mode is now disabled.')
else:
ctx.obj['obsws'].set_studio_mode_enabled(True)
console.out.print('Studio mode is now enabled.')
@app.command('status')
@app.command('ss', hidden=True)
def status(ctx: typer.Context):
"""Get the status of studio mode."""
resp = ctx.obj['obsws'].get_studio_mode_enabled()
if resp.studio_mode_enabled:
console.out.print('Studio mode is enabled.')
else:
console.out.print('Studio mode is disabled.')

View File

@@ -1,79 +0,0 @@
"""module containing commands for manipulating text inputs."""
from typing import Annotated, Optional
import typer
from obsws_cli import console, validate
app = typer.Typer()
@app.callback()
def main():
"""Control text inputs in OBS."""
@app.command('current')
@app.command('get', hidden=True)
def current(
ctx: typer.Context,
input_name: Annotated[
str,
typer.Argument(
help='Name of the text input to get.', callback=validate.input_in_inputs
),
],
):
"""Get the current text for a text input."""
resp = ctx.obj['obsws'].get_input_settings(name=input_name)
if not resp.input_kind.startswith('text_'):
console.err.print(
f'Input [yellow]{input_name}[/yellow] is not a text input.',
)
raise typer.Exit(1)
current_text = resp.input_settings.get('text', '')
if not current_text:
current_text = '(empty)'
console.out.print(
f'Current text for input {console.highlight(ctx, input_name)}: {current_text}',
)
@app.command('update')
@app.command('set', hidden=True)
def update(
ctx: typer.Context,
input_name: Annotated[
str,
typer.Argument(
help='Name of the text input to update.', callback=validate.input_in_inputs
),
],
new_text: Annotated[
Optional[str],
typer.Argument(
help='The new text to set for the input.',
),
] = None,
):
"""Update the text of a text input."""
resp = ctx.obj['obsws'].get_input_settings(name=input_name)
if not resp.input_kind.startswith('text_'):
console.err.print(
f'Input [yellow]{input_name}[/yellow] is not a text input.',
)
raise typer.Exit(1)
ctx.obj['obsws'].set_input_settings(
name=input_name,
settings={'text': new_text},
overlay=True,
)
if not new_text:
new_text = '(empty)'
console.out.print(
f'Text for input {console.highlight(ctx, input_name)} updated to: {new_text}',
)

View File

@@ -1,50 +0,0 @@
"""module containing commands for manipulating virtual camera in OBS."""
import typer
from obsws_cli import console
app = typer.Typer()
@app.callback()
def main():
"""Control virtual camera in OBS."""
@app.command('start')
@app.command('s', hidden=True)
def start(ctx: typer.Context):
"""Start the virtual camera."""
ctx.obj['obsws'].start_virtual_cam()
console.out.print('Virtual camera started.')
@app.command('stop')
@app.command('p', hidden=True)
def stop(ctx: typer.Context):
"""Stop the virtual camera."""
ctx.obj['obsws'].stop_virtual_cam()
console.out.print('Virtual camera stopped.')
@app.command('toggle')
@app.command('tg', hidden=True)
def toggle(ctx: typer.Context):
"""Toggle the virtual camera."""
resp = ctx.obj['obsws'].toggle_virtual_cam()
if resp.output_active:
console.out.print('Virtual camera is enabled.')
else:
console.out.print('Virtual camera is disabled.')
@app.command('status')
@app.command('ss', hidden=True)
def status(ctx: typer.Context):
"""Get the status of the virtual camera."""
resp = ctx.obj['obsws'].get_virtual_cam_status()
if resp.output_active:
console.out.print('Virtual camera is enabled.')
else:
console.out.print('Virtual camera is disabled.')

View File

@@ -1,12 +1,13 @@
"""module for console output handling in obsws_cli."""
import typer
from rich.console import Console
from .context import Context
out = Console()
err = Console(stderr=True, style='bold red')
def highlight(ctx: typer.Context, text: str) -> str:
def highlight(ctx: Context, text: str) -> str:
"""Highlight text using the current context's style."""
return f'[{ctx.obj["style"].highlight}]{text}[/{ctx.obj["style"].highlight}]'
return f'[{ctx.style.highlight}]{text}[/{ctx.style.highlight}]'

15
obsws_cli/context.py Normal file
View File

@@ -0,0 +1,15 @@
"""module for managing the application context."""
from dataclasses import dataclass
import obsws_python as obsws
from . import styles
@dataclass
class Context:
"""Context for the application, holding OBS and style configurations."""
client: obsws.ReqClient
style: styles.Style

10
obsws_cli/enum.py Normal file
View File

@@ -0,0 +1,10 @@
"""module for exit codes used in the application."""
from enum import IntEnum, auto
class ExitCode(IntEnum):
"""Exit codes for the application."""
SUCCESS = 0
ERROR = auto()

View File

@@ -1,146 +0,0 @@
"""module for settings management for obsws-cli."""
from collections import UserDict
from pathlib import Path
from typing import Any, Union
from dotenv import dotenv_values
ConfigValue = Union[str, int, bool]
class EnvConfig(UserDict):
"""A class to manage .env config for obsws-cli.
This class extends UserDict to provide a dictionary-like interface for config.
It loads config from .env files in the following priority:
1. Local .env file
2. User config file (~/.config/obsws-cli/obsws.env)
3. Default values
Note, typer handles reading from environment variables automatically. They take precedence
over values set in this class.
The config values are expected to be in uppercase and should start with 'OBSWS_CLI_'.
Example:
-------
config = EnvConfig()
host = config['OBSWS_CLI_HOST']
config['OBSWS_CLI_PORT'] = 4455
# Or with defaults
timeout = config.get('OBSWS_CLI_TIMEOUT', 30)
# Keys will be normalised to uppercase with prefix
debug = config.get('debug', False) # Equivalent to 'OBSWS_CLI_DEBUG'
"""
PREFIX = 'OBSWS_CLI_'
def __init__(self, *args, **kwargs):
"""Initialize the Config object with hierarchical loading."""
kwargs.update(
{
**dotenv_values(Path.home() / '.config' / 'obsws-cli' / 'obsws.env'),
**dotenv_values('.env'),
}
)
super().__init__(*args, **self._convert_types(kwargs))
def _convert_types(self, config_data: dict[str, Any]) -> dict[str, ConfigValue]:
"""Convert string values to appropriate types."""
converted = {}
for key, value in config_data.items():
if isinstance(value, str):
if value.lower() in ('true', 'false'):
converted[key] = value.lower() == 'true'
elif value.isdigit():
converted[key] = int(value)
else:
converted[key] = value
else:
converted[key] = value
return converted
def __getitem__(self, key: str) -> ConfigValue:
"""Get a setting value by key."""
normalised_key = self._normalise_key(key)
try:
return self.data[normalised_key]
except KeyError as e:
raise KeyError(
f"Config key '{key}' not found. Available keys: {list(self.data.keys())}"
) from e
def __setitem__(self, key: str, value: ConfigValue):
"""Set a setting value by key."""
normalised_key = self._normalise_key(key)
self.data[normalised_key] = value
def _normalise_key(self, key: str) -> str:
"""Normalise a key to uppercase with OBS_ prefix."""
key = key.upper()
if not key.startswith(self.PREFIX):
key = f'{self.PREFIX}{key}'
return key
def get(self, key: str, default=None) -> ConfigValue:
"""Get a config value with optional default.
Args:
----
key (str): The key to retrieve
default: Default value if key is not found
Returns:
-------
The config value or default
"""
normalised_key = self._normalise_key(key)
if not self.has_key(normalised_key):
return default
return self[normalised_key]
def has_key(self, key: str) -> bool:
"""Check if a config key exists.
Args:
----
key (str): The key to check
Returns:
-------
bool: True if key exists, False otherwise
"""
normalised_key = self._normalise_key(key)
return normalised_key in self.data
_envconfig = EnvConfig(
OBSWS_CLI_HOST='localhost',
OBSWS_CLI_PORT=4455,
OBSWS_CLI_PASSWORD='',
OBSWS_CLI_TIMEOUT=5,
OBSWS_CLI_LOGLEVEL='WARNING',
OBSWS_CLI_STYLE='disabled',
OBSWS_CLI_STYLE_NO_BORDER=False,
)
def get(key: str) -> ConfigValue:
"""Get a setting value by key from the global config instance.
Args:
----
key (str): The key of the config to retrieve.
default: Default value if key is not found.
Returns:
-------
The value of the config or default value.
"""
return _envconfig.get(key)

11
obsws_cli/error.py Normal file
View File

@@ -0,0 +1,11 @@
"""module containing error handling for OBS WebSocket CLI."""
class OBSWSCLIError(Exception):
"""Base class for OBS WebSocket CLI errors."""
def __init__(self, message: str, code: int = 1):
"""Initialize the error with a message and an optional code."""
super().__init__(message)
self.message = message
self.code = code

View File

@@ -3,44 +3,42 @@
from typing import Annotated, Optional
import obsws_python as obsws
import typer
from cyclopts import App, Argument, Parameter
from rich.table import Table
from rich.text import Text
from obsws_cli import console, util
from . import console, util
from .context import Context
from .enum import ExitCode
from .error import OBSWSCLIError
app = typer.Typer()
app = App(name='filter', help='Commands for managing filters in OBS sources')
@app.callback()
def main():
"""Control filters in OBS scenes."""
@app.command('list')
@app.command('ls', hidden=True)
@app.command(name=['list', 'ls'])
def list_(
ctx: typer.Context,
source_name: Annotated[
Optional[str],
typer.Argument(
show_default='The current scene',
help='The source to list filters for',
Argument(
hint='The source to list filters for',
),
] = None,
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""List filters for a source."""
if not source_name:
source_name = ctx.obj['obsws'].get_current_program_scene().scene_name
source_name = ctx.client.get_current_program_scene().scene_name
try:
resp = ctx.obj['obsws'].get_source_filter_list(source_name)
resp = ctx.client.get_source_filter_list(source_name)
except obsws.error.OBSSDKRequestError as e:
if e.code == 600:
console.err.print(
f'No source was found by the name of [yellow]{source_name}[/yellow].'
raise OBSWSCLIError(
f'No source found by the name of [yellow]{source_name}[/yellow].',
code=ExitCode.ERROR,
)
raise typer.Exit(1)
else:
raise
@@ -48,25 +46,25 @@ def list_(
console.out.print(
f'No filters found for source {console.highlight(ctx, source_name)}'
)
raise typer.Exit()
return
table = Table(
title=f'Filters for Source: {source_name}',
padding=(0, 2),
border_style=ctx.obj['style'].border,
border_style=ctx.style.border,
)
columns = [
(Text('Filter Name', justify='center'), 'left', ctx.obj['style'].column),
(Text('Kind', justify='center'), 'left', ctx.obj['style'].column),
(Text('Filter Name', justify='center'), 'left', ctx.style.column),
(Text('Kind', justify='center'), 'left', ctx.style.column),
(Text('Enabled', justify='center'), 'center', None),
(Text('Settings', justify='center'), 'center', ctx.obj['style'].column),
(Text('Settings', justify='center'), 'center', ctx.style.column),
]
for heading, justify, style in columns:
table.add_column(heading, justify=justify, style=style)
for filter in resp.filters:
resp = ctx.obj['obsws'].get_source_filter_default_settings(filter['filterKind'])
resp = ctx.client.get_source_filter_default_settings(filter['filterKind'])
settings = resp.default_filter_settings | filter['filterSettings']
table.add_row(
@@ -84,96 +82,85 @@ def list_(
console.out.print(table)
def _get_filter_enabled(ctx: typer.Context, source_name: str, filter_name: str):
def _get_filter_enabled(ctx: Context, source_name: str, filter_name: str):
"""Get the status of a filter for a source."""
resp = ctx.obj['obsws'].get_source_filter(source_name, filter_name)
resp = ctx.client.get_source_filter(source_name, filter_name)
return resp.filter_enabled
@app.command('enable')
@app.command('on', hidden=True)
@app.command(name=['enable', 'on'])
def enable(
ctx: typer.Context,
source_name: Annotated[
str,
typer.Argument(
..., show_default=False, help='The source to enable the filter for'
),
Argument(hint='The source to enable the filter for'),
],
filter_name: Annotated[
str,
typer.Argument(
..., show_default=False, help='The name of the filter to enable'
),
Argument(hint='The name of the filter to enable'),
],
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Enable a filter for a source."""
if _get_filter_enabled(ctx, source_name, filter_name):
console.err.print(
f'Filter [yellow]{filter_name}[/yellow] is already enabled for source [yellow]{source_name}[/yellow]'
raise OBSWSCLIError(
f'Filter [yellow]{filter_name}[/yellow] is already enabled for source [yellow]{source_name}[/yellow]',
code=ExitCode.ERROR,
)
raise typer.Exit(1)
ctx.obj['obsws'].set_source_filter_enabled(source_name, filter_name, enabled=True)
ctx.client.set_source_filter_enabled(source_name, filter_name, enabled=True)
console.out.print(
f'Enabled filter {console.highlight(ctx, filter_name)} for source {console.highlight(ctx, source_name)}'
)
@app.command('disable')
@app.command('off', hidden=True)
@app.command(name=['disable', 'off'])
def disable(
ctx: typer.Context,
source_name: Annotated[
str,
typer.Argument(
..., show_default=False, help='The source to disable the filter for'
),
Argument(hint='The source to disable the filter for'),
],
filter_name: Annotated[
str,
typer.Argument(
..., show_default=False, help='The name of the filter to disable'
),
Argument(hint='The name of the filter to disable'),
],
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Disable a filter for a source."""
if not _get_filter_enabled(ctx, source_name, filter_name):
console.err.print(
f'Filter [yellow]{filter_name}[/yellow] is already disabled for source [yellow]{source_name}[/yellow]'
raise OBSWSCLIError(
f'Filter [yellow]{filter_name}[/yellow] is already disabled for source [yellow]{source_name}[/yellow]',
code=ExitCode.ERROR,
)
raise typer.Exit(1)
ctx.obj['obsws'].set_source_filter_enabled(source_name, filter_name, enabled=False)
ctx.client.set_source_filter_enabled(source_name, filter_name, enabled=False)
console.out.print(
f'Disabled filter {console.highlight(ctx, filter_name)} for source {console.highlight(ctx, source_name)}'
)
@app.command('toggle')
@app.command('tg', hidden=True)
@app.command(name=['toggle', 'tg'])
def toggle(
ctx: typer.Context,
source_name: Annotated[
str,
typer.Argument(
..., show_default=False, help='The source to toggle the filter for'
),
Argument(hint='The source to toggle the filter for'),
],
filter_name: Annotated[
str,
typer.Argument(
..., show_default=False, help='The name of the filter to toggle'
),
Argument(hint='The name of the filter to toggle'),
],
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Toggle a filter for a source."""
is_enabled = _get_filter_enabled(ctx, source_name, filter_name)
new_state = not is_enabled
ctx.obj['obsws'].set_source_filter_enabled(
source_name, filter_name, enabled=new_state
)
ctx.client.set_source_filter_enabled(source_name, filter_name, enabled=new_state)
if new_state:
console.out.print(
f'Enabled filter {console.highlight(ctx, filter_name)} for source {console.highlight(ctx, source_name)}'
@@ -184,22 +171,19 @@ def toggle(
)
@app.command('status')
@app.command('ss', hidden=True)
@app.command(name=['status', 'ss'])
def status(
ctx: typer.Context,
source_name: Annotated[
str,
typer.Argument(
..., show_default=False, help='The source to get the filter status for'
),
Argument(hint='The source to get the filter status for'),
],
filter_name: Annotated[
str,
typer.Argument(
..., show_default=False, help='The name of the filter to get the status for'
),
Argument(hint='The name of the filter to get the status for'),
],
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Get the status of a filter for a source."""
is_enabled = _get_filter_enabled(ctx, source_name, filter_name)

220
obsws_cli/group.py Normal file
View File

@@ -0,0 +1,220 @@
"""module containing commands for manipulating groups in scenes."""
from typing import Annotated, Optional
from cyclopts import App, Argument, Parameter
from rich.table import Table
from rich.text import Text
from . import console, util, validate
from .context import Context
from .enum import ExitCode
from .error import OBSWSCLIError
from .protocols import DataclassProtocol
app = App(name='group', help='Commands for managing groups in OBS scenes')
@app.command(name=['list', 'ls'])
def list_(
scene_name: Annotated[
Optional[str],
Argument(
hint='Scene name to list groups for',
),
] = None,
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""List groups in a scene."""
if not scene_name:
scene_name = ctx.client.get_current_program_scene().scene_name
if not validate.scene_in_scenes(ctx, scene_name):
raise OBSWSCLIError(
f'Scene [yellow]{scene_name}[/yellow] not found.',
code=ExitCode.ERROR,
)
resp = ctx.client.get_scene_item_list(scene_name)
groups = [
(item.get('sceneItemId'), item.get('sourceName'), item.get('sceneItemEnabled'))
for item in resp.scene_items
if item.get('isGroup')
]
if not groups:
raise OBSWSCLIError(
f'No groups found in scene {console.highlight(ctx, scene_name)}.',
code=ExitCode.SUCCESS,
)
table = Table(
title=f'Groups in Scene: {scene_name}',
padding=(0, 2),
border_style=ctx.style.border,
)
columns = [
(Text('ID', justify='center'), 'center', ctx.style.column),
(Text('Group Name', justify='center'), 'left', ctx.style.column),
(Text('Enabled', justify='center'), 'center', None),
]
for heading, justify, style in columns:
table.add_column(heading, justify=justify, style=style)
for item_id, group_name, is_enabled in groups:
table.add_row(
str(item_id),
group_name,
util.check_mark(is_enabled),
)
console.out.print(table)
def _get_group(group_name: str, resp: DataclassProtocol) -> dict | None:
"""Get a group from the scene item list response."""
group = next(
(
item
for item in resp.scene_items
if item.get('sourceName') == group_name and item.get('isGroup')
),
None,
)
return group
@app.command(name=['show', 'sh'])
def show(
scene_name: Annotated[
str,
Argument(hint='Scene name the group is in'),
],
group_name: Annotated[str, Argument(hint='Group name to show')],
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Show a group in a scene."""
if not validate.scene_in_scenes(ctx, scene_name):
raise OBSWSCLIError(
f'Scene [yellow]{scene_name}[/yellow] not found.',
code=ExitCode.ERROR,
)
resp = ctx.client.get_scene_item_list(scene_name)
if (group := _get_group(group_name, resp)) is None:
raise OBSWSCLIError(
f'Group [yellow]{group_name}[/yellow] not found in scene [yellow]{scene_name}[/yellow].',
code=ExitCode.ERROR,
)
ctx.client.set_scene_item_enabled(
scene_name=scene_name,
item_id=int(group.get('sceneItemId')),
enabled=True,
)
console.out.print(f'Group {console.highlight(ctx, group_name)} is now visible.')
@app.command(name=['hide', 'h'])
def hide(
scene_name: Annotated[str, Argument(hint='Scene name the group is in')],
group_name: Annotated[str, Argument(hint='Group name to hide')],
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Hide a group in a scene."""
if not validate.scene_in_scenes(ctx, scene_name):
raise OBSWSCLIError(
f'Scene [yellow]{scene_name}[/yellow] not found.',
code=ExitCode.ERROR,
)
resp = ctx.client.get_scene_item_list(scene_name)
if (group := _get_group(group_name, resp)) is None:
raise OBSWSCLIError(
f'Group [yellow]{group_name}[/yellow] not found in scene [yellow]{scene_name}[/yellow].',
code=ExitCode.ERROR,
)
ctx.client.set_scene_item_enabled(
scene_name=scene_name,
item_id=int(group.get('sceneItemId')),
enabled=False,
)
console.out.print(f'Group {console.highlight(ctx, group_name)} is now hidden.')
@app.command(name=['toggle', 'tg'])
def toggle(
scene_name: Annotated[str, Argument(hint='Scene name the group is in')],
group_name: Annotated[str, Argument(hint='Group name to toggle')],
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Toggle a group in a scene."""
if not validate.scene_in_scenes(ctx, scene_name):
raise OBSWSCLIError(
f'Scene [yellow]{scene_name}[/yellow] not found.',
code=ExitCode.ERROR,
)
resp = ctx.client.get_scene_item_list(scene_name)
if (group := _get_group(group_name, resp)) is None:
raise OBSWSCLIError(
f'Group [yellow]{group_name}[/yellow] not found in scene [yellow]{scene_name}[/yellow].',
code=ExitCode.ERROR,
)
new_state = not group.get('sceneItemEnabled')
ctx.client.set_scene_item_enabled(
scene_name=scene_name,
item_id=int(group.get('sceneItemId')),
enabled=new_state,
)
if new_state:
console.out.print(f'Group {console.highlight(ctx, group_name)} is now visible.')
else:
console.out.print(f'Group {console.highlight(ctx, group_name)} is now hidden.')
@app.command(name=['status', 'ss'])
def status(
scene_name: Annotated[str, Argument(hint='Scene name the group is in')],
group_name: Annotated[str, Argument(hint='Group name to check status')],
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Get the status of a group in a scene."""
if not validate.scene_in_scenes(ctx, scene_name):
raise OBSWSCLIError(
f'Scene [yellow]{scene_name}[/yellow] not found.',
code=ExitCode.ERROR,
)
resp = ctx.client.get_scene_item_list(scene_name)
if (group := _get_group(group_name, resp)) is None:
raise OBSWSCLIError(
f'Group [yellow]{group_name}[/yellow] not found in scene [yellow]{scene_name}[/yellow].',
code=ExitCode.ERROR,
)
enabled = ctx.client.get_scene_item_enabled(
scene_name=scene_name,
item_id=int(group.get('sceneItemId')),
)
if enabled.scene_item_enabled:
console.out.print(f'Group {console.highlight(ctx, group_name)} is now visible.')
else:
console.out.print(f'Group {console.highlight(ctx, group_name)} is now hidden.')

76
obsws_cli/hotkey.py Normal file
View File

@@ -0,0 +1,76 @@
"""module containing commands for hotkey management."""
from typing import Annotated
from cyclopts import App, Argument, Parameter
from rich.table import Table
from rich.text import Text
from . import console
from .context import Context
app = App(name='hotkey', help='Commands for managing hotkeys in OBS')
@app.command(name=['list', 'ls'])
def list_(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""List all hotkeys."""
resp = ctx.client.get_hotkey_list()
table = Table(
title='Hotkeys',
padding=(0, 2),
border_style=ctx.style.border,
)
table.add_column(
Text('Hotkey Name', justify='center'),
justify='left',
style=ctx.style.column,
)
for i, hotkey in enumerate(resp.hotkeys):
table.add_row(hotkey, style='' if i % 2 == 0 else 'dim')
console.out.print(table)
@app.command(name=['trigger', 'tr'])
def trigger(
hotkey: Annotated[str, Argument(hint='The hotkey to trigger')],
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Trigger a hotkey by name."""
ctx.client.trigger_hotkey_by_name(hotkey)
@app.command(name=['trigger-sequence', 'trs'])
def trigger_sequence(
key_id: Annotated[
str,
Argument(
hint='The OBS key ID to trigger, see https://github.com/onyx-and-iris/obsws-cli?tab=readme-ov-file#hotkey for more info',
),
],
/,
shift: Annotated[
bool, Parameter(help='Press shift when triggering the hotkey')
] = False,
ctrl: Annotated[
bool, Parameter(help='Press control when triggering the hotkey')
] = False,
alt: Annotated[
bool, Parameter(help='Press alt when triggering the hotkey')
] = False,
cmd: Annotated[
bool, Parameter(help='Press cmd when triggering the hotkey')
] = False,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Trigger a hotkey by sequence."""
ctx.client.trigger_hotkey_by_key_sequence(key_id, shift, ctrl, alt, cmd)

181
obsws_cli/input.py Normal file
View File

@@ -0,0 +1,181 @@
"""module containing commands for manipulating inputs."""
from typing import Annotated
import obsws_python as obsws
from cyclopts import App, Argument, Parameter
from rich.table import Table
from rich.text import Text
from . import console, util, validate
from .context import Context
from .enum import ExitCode
from .error import OBSWSCLIError
app = App(name='input', help='Commands for managing inputs in OBS')
@app.command(name=['list', 'ls'])
def list_(
input: Annotated[bool, Parameter(help='Filter by input type.')] = False,
output: Annotated[bool, Parameter(help='Filter by output type.')] = False,
colour: Annotated[bool, Parameter(help='Filter by colour source type.')] = False,
ffmpeg: Annotated[bool, Parameter(help='Filter by ffmpeg source type.')] = False,
vlc: Annotated[bool, Parameter(help='Filter by VLC source type.')] = False,
uuid: Annotated[bool, Parameter(help='Show UUIDs of inputs.')] = False,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""List all inputs."""
resp = ctx.client.get_input_list()
kinds = []
if input:
kinds.append('input')
if output:
kinds.append('output')
if colour:
kinds.append('color')
if ffmpeg:
kinds.append('ffmpeg')
if vlc:
kinds.append('vlc')
if not any([input, output, colour, ffmpeg, vlc]):
kinds = ctx.client.get_input_kind_list(False).input_kinds
inputs = sorted(
(
(input_.get('inputName'), input_.get('inputKind'), input_.get('inputUuid'))
for input_ in filter(
lambda input_: any(kind in input_.get('inputKind') for kind in kinds),
resp.inputs,
)
),
key=lambda x: x[0], # Sort by input name
)
if not inputs:
raise OBSWSCLIError('No inputs found.', code=ExitCode.SUCCESS)
table = Table(title='Inputs', padding=(0, 2), border_style=ctx.style.border)
if uuid:
columns = [
(Text('Input Name', justify='center'), 'left', ctx.style.column),
(Text('Kind', justify='center'), 'center', ctx.style.column),
(Text('Muted', justify='center'), 'center', None),
(Text('UUID', justify='center'), 'left', ctx.style.column),
]
else:
columns = [
(Text('Input Name', justify='center'), 'left', ctx.style.column),
(Text('Kind', justify='center'), 'center', ctx.style.column),
(Text('Muted', justify='center'), 'center', None),
]
for heading, justify, style in columns:
table.add_column(heading, justify=justify, style=style)
for input_name, input_kind, input_uuid in inputs:
input_mark = ''
try:
input_muted = ctx.client.get_input_mute(name=input_name).input_muted
input_mark = util.check_mark(input_muted)
except obsws.error.OBSSDKRequestError as e:
if e.code == 604: # Input does not support audio
input_mark = 'N/A'
else:
raise
if uuid:
table.add_row(
input_name,
util.snakecase_to_titlecase(input_kind),
input_mark,
input_uuid,
)
else:
table.add_row(
input_name,
util.snakecase_to_titlecase(input_kind),
input_mark,
)
console.out.print(table)
@app.command(name=['mute', 'm'])
def mute(
input_name: Annotated[str, Argument(hint='Name of the input to mute.')],
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Mute an input."""
if not validate.input_in_inputs(ctx, input_name):
raise OBSWSCLIError(
f'Input [yellow]{input_name}[/yellow] not found.',
code=ExitCode.ERROR,
)
ctx.client.set_input_mute(
name=input_name,
muted=True,
)
console.out.print(f'Input {console.highlight(ctx, input_name)} muted.')
@app.command(name=['unmute', 'um'])
def unmute(
input_name: Annotated[str, Argument(hint='Name of the input to unmute.')],
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Unmute an input."""
if not validate.input_in_inputs(ctx, input_name):
raise OBSWSCLIError(
f'Input [yellow]{input_name}[/yellow] not found.',
code=ExitCode.ERROR,
)
ctx.client.set_input_mute(
name=input_name,
muted=False,
)
console.out.print(f'Input {console.highlight(ctx, input_name)} unmuted.')
@app.command(name=['toggle', 'tg'])
def toggle(
input_name: Annotated[
str,
Argument(hint='Name of the input to toggle.'),
],
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Toggle an input."""
if not validate.input_in_inputs(ctx, input_name):
raise OBSWSCLIError(
f'Input [yellow]{input_name}[/yellow] not found.',
code=ExitCode.ERROR,
)
resp = ctx.client.get_input_mute(name=input_name)
new_state = not resp.input_muted
ctx.client.set_input_mute(
name=input_name,
muted=new_state,
)
if new_state:
console.out.print(
f'Input {console.highlight(ctx, input_name)} muted.',
)
else:
console.out.print(
f'Input {console.highlight(ctx, input_name)} unmuted.',
)

125
obsws_cli/profile.py Normal file
View File

@@ -0,0 +1,125 @@
"""module containing commands for manipulating profiles in OBS."""
from typing import Annotated
from cyclopts import App, Argument, Parameter
from rich.table import Table
from rich.text import Text
from . import console, util, validate
from .context import Context
from .enum import ExitCode
from .error import OBSWSCLIError
app = App(name='profile', help='Commands for managing profiles in OBS')
@app.command(name=['list', 'ls'])
def list_(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""List profiles."""
resp = ctx.client.get_profile_list()
table = Table(title='Profiles', padding=(0, 2), border_style=ctx.style.border)
columns = [
(Text('Profile Name', justify='center'), 'left', ctx.style.column),
(Text('Current', justify='center'), 'center', None),
]
for heading, justify, style in columns:
table.add_column(heading, justify=justify, style=style)
for profile in resp.profiles:
table.add_row(
profile,
util.check_mark(
ctx, profile == resp.current_profile_name, empty_if_false=True
),
)
console.out.print(table)
@app.command(name=['current', 'get'])
def current(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Get the current profile."""
resp = ctx.client.get_profile_list()
console.out.print(
f'Current profile: {console.highlight(ctx, resp.current_profile_name)}'
)
@app.command(name=['switch', 'set'])
def switch(
profile_name: Annotated[
str,
Argument(hint='Name of the profile to switch to'),
],
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Switch to a profile."""
if not validate.profile_exists(ctx, profile_name):
console.err.print(f'Profile [yellow]{profile_name}[/yellow] not found.')
raise OBSWSCLIError(
f'Profile [yellow]{profile_name}[/yellow] not found.',
code=ExitCode.ERROR,
)
resp = ctx.client.get_profile_list()
if resp.current_profile_name == profile_name:
raise OBSWSCLIError(
f'Profile [yellow]{profile_name}[/yellow] is already the current profile.',
code=ExitCode.ERROR,
)
ctx.client.set_current_profile(profile_name)
console.out.print(f'Switched to profile {console.highlight(ctx, profile_name)}.')
@app.command(name=['create', 'new'])
def create(
profile_name: Annotated[
str,
Argument(hint='Name of the profile to create.'),
],
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Create a new profile."""
if validate.profile_exists(ctx, profile_name):
raise OBSWSCLIError(
f'Profile [yellow]{profile_name}[/yellow] already exists.',
code=ExitCode.ERROR,
)
ctx.client.create_profile(profile_name)
console.out.print(f'Created profile {console.highlight(ctx, profile_name)}.')
@app.command(name=['remove', 'rm'])
def remove(
profile_name: Annotated[
str,
Argument(hint='Name of the profile to remove.'),
],
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Remove a profile."""
if not validate.profile_exists(ctx, profile_name):
console.err.print(f'Profile [yellow]{profile_name}[/yellow] not found.')
raise OBSWSCLIError(
f'Profile [yellow]{profile_name}[/yellow] not found.',
code=ExitCode.ERROR,
)
ctx.client.remove_profile(profile_name)
console.out.print(f'Removed profile {console.highlight(ctx, profile_name)}.')

View File

@@ -2,44 +2,45 @@
from typing import Annotated
import typer
from cyclopts import App, Argument, Parameter
from rich.table import Table
from rich.text import Text
from obsws_cli import console
from . import console
from .context import Context
from .enum import ExitCode
from .error import OBSWSCLIError
app = typer.Typer()
app = App(name='projector', help='Commands for managing projectors in OBS')
@app.callback()
def main():
"""Control projectors in OBS."""
@app.command('list-monitors')
@app.command('ls-m', hidden=True)
def list_monitors(ctx: typer.Context):
@app.command(name=['list-monitors', 'ls-m'])
def list_monitors(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""List available monitors."""
resp = ctx.obj['obsws'].get_monitor_list()
resp = ctx.client.get_monitor_list()
if not resp.monitors:
console.out.print('No monitors found.')
return
monitors = sorted(
((m['monitorIndex'], m['monitorName']) for m in resp.monitors),
key=lambda m: m[0],
)
if not monitors:
console.out.print('No monitors found.')
raise typer.Exit()
table = Table(
title='Available Monitors',
padding=(0, 2),
border_style=ctx.obj['style'].border,
border_style=ctx.style.border,
)
table.add_column(
Text('Index', justify='center'), justify='center', style=ctx.obj['style'].column
Text('Index', justify='center'), justify='center', style=ctx.style.column
)
table.add_column(
Text('Name', justify='center'), justify='left', style=ctx.obj['style'].column
Text('Name', justify='center'), justify='left', style=ctx.style.column
)
for index, monitor in monitors:
@@ -48,30 +49,30 @@ def list_monitors(ctx: typer.Context):
console.out.print(table)
@app.command('open')
@app.command('o', hidden=True)
@app.command(name=['open', 'o'])
def open(
ctx: typer.Context,
monitor_index: Annotated[
int,
typer.Option(help='Index of the monitor to open the projector on.'),
] = 0,
source_name: Annotated[
str,
typer.Argument(
show_default='The current scene',
help='Name of the source to project.',
Argument(
hint='Name of the source to project.',
),
] = '',
/,
monitor_index: Annotated[
int,
Parameter(help='Index of the monitor to open the projector on.'),
] = 0,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Open a fullscreen projector for a source on a specific monitor."""
if not source_name:
source_name = ctx.obj['obsws'].get_current_program_scene().scene_name
source_name = ctx.client.get_current_program_scene().scene_name
monitors = ctx.obj['obsws'].get_monitor_list().monitors
monitors = ctx.client.get_monitor_list().monitors
for monitor in monitors:
if monitor['monitorIndex'] == monitor_index:
ctx.obj['obsws'].open_source_projector(
ctx.client.open_source_projector(
source_name=source_name,
monitor_index=monitor_index,
)
@@ -82,8 +83,8 @@ def open(
break
else:
console.err.print(
raise OBSWSCLIError(
f'Monitor with index [yellow]{monitor_index}[/yellow] not found. '
f'Use [yellow]obsws-cli projector ls-m[/yellow] to see available monitors.'
f'Use [yellow]obsws-cli projector ls-m[/yellow] to see available monitors.',
ExitCode.ERROR,
)
raise typer.Exit(code=1)

196
obsws_cli/record.py Normal file
View File

@@ -0,0 +1,196 @@
"""module for controlling OBS recording functionality."""
from pathlib import Path
from typing import Annotated, Optional
from cyclopts import App, Argument, Parameter
from . import console
from .context import Context
from .enum import ExitCode
from .error import OBSWSCLIError
app = App(name='record', help='Commands for controlling OBS recording functionality.')
def _get_recording_status(ctx: Context) -> tuple:
"""Get recording status."""
resp = ctx.client.get_record_status()
return resp.output_active, resp.output_paused
@app.command(name=['start', 's'])
def start(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Start recording."""
active, paused = _get_recording_status(ctx)
if active:
err_msg = 'Recording is already in progress, cannot start.'
if paused:
err_msg += ' Try resuming it.'
raise OBSWSCLIError(err_msg, ExitCode.ERROR)
ctx.client.start_record()
console.out.print('Recording started successfully.')
@app.command(name=['stop', 'st'])
def stop(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Stop recording."""
active, _ = _get_recording_status(ctx)
if not active:
raise OBSWSCLIError(
'Recording is not in progress, cannot stop.', ExitCode.ERROR
)
resp = ctx.client.stop_record()
console.out.print(
f'Recording stopped successfully. Saved to: {console.highlight(ctx, resp.output_path)}'
)
@app.command(name=['toggle', 'tg'])
def toggle(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Toggle recording."""
resp = ctx.client.toggle_record()
if resp.output_active:
console.out.print('Recording started successfully.')
else:
console.out.print('Recording stopped successfully.')
@app.command(name=['status', 'ss'])
def status(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Get recording status."""
active, paused = _get_recording_status(ctx)
if active:
if paused:
console.out.print('Recording is in progress and paused.')
else:
console.out.print('Recording is in progress.')
else:
console.out.print('Recording is not in progress.')
@app.command(name=['resume', 'r'])
def resume(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Resume recording."""
active, paused = _get_recording_status(ctx)
if not active:
raise OBSWSCLIError(
'Recording is not in progress, cannot resume.', ExitCode.ERROR
)
if not paused:
raise OBSWSCLIError(
'Recording is in progress but not paused, cannot resume.', ExitCode.ERROR
)
ctx.client.resume_record()
console.out.print('Recording resumed successfully.')
@app.command(name=['pause', 'p'])
def pause(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Pause recording."""
active, paused = _get_recording_status(ctx)
if not active:
raise OBSWSCLIError(
'Recording is not in progress, cannot pause.', ExitCode.ERROR
)
if paused:
raise OBSWSCLIError(
'Recording is in progress but already paused, cannot pause.', ExitCode.ERROR
)
ctx.client.pause_record()
console.out.print('Recording paused successfully.')
@app.command(name=['directory', 'd'])
def directory(
record_directory: Annotated[
Optional[Path],
# Since the CLI and OBS may be running on different platforms,
# we won't validate the path here.
Argument(
hint='Directory to set for recording.',
),
] = None,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Get or set the recording directory."""
if record_directory is not None:
ctx.client.set_record_directory(str(record_directory))
console.out.print(
f'Recording directory updated to: {console.highlight(ctx, record_directory)}'
)
else:
resp = ctx.client.get_record_directory()
console.out.print(
f'Recording directory: {console.highlight(ctx, resp.record_directory)}'
)
@app.command(name=['split', 'sp'])
def split(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Split the current recording."""
active, paused = _get_recording_status(ctx)
if not active:
console.err.print('Recording is not in progress, cannot split.')
raise OBSWSCLIError(
'Recording is not in progress, cannot split.', ExitCode.ERROR
)
if paused:
raise OBSWSCLIError('Recording is paused, cannot split.', ExitCode.ERROR)
ctx.client.split_record_file()
console.out.print('Recording split successfully.')
@app.command(name=['chapter', 'ch'])
def chapter(
chapter_name: Annotated[
Optional[str],
Argument(
hint='Name of the chapter to create.',
),
] = None,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Create a chapter in the current recording."""
active, paused = _get_recording_status(ctx)
if not active:
raise OBSWSCLIError(
'Recording is not in progress, cannot create chapter.', ExitCode.ERROR
)
if paused:
raise OBSWSCLIError(
'Recording is paused, cannot create chapter.', ExitCode.ERROR
)
ctx.client.create_record_chapter(chapter_name)
console.out.print(
f'Chapter {console.highlight(ctx, chapter_name or "unnamed")} created successfully.'
)

78
obsws_cli/replaybuffer.py Normal file
View File

@@ -0,0 +1,78 @@
"""module containing commands for manipulating the replay buffer in OBS."""
from typing import Annotated
from cyclopts import App, Parameter
from . import console
from .context import Context
from .enum import ExitCode
from .error import OBSWSCLIError
app = App(
name='replaybuffer', help='Commands for controlling the replay buffer in OBS.'
)
@app.command(name=['start', 's'])
def start(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Start the replay buffer."""
resp = ctx.client.get_replay_buffer_status()
if resp.output_active:
raise OBSWSCLIError('Replay buffer is already active.', ExitCode.ERROR)
ctx.client.start_replay_buffer()
console.out.print('Replay buffer started.')
@app.command(name=['stop', 'st'])
def stop(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Stop the replay buffer."""
resp = ctx.client.get_replay_buffer_status()
if not resp.output_active:
raise OBSWSCLIError('Replay buffer is not active.', ExitCode.ERROR)
ctx.client.stop_replay_buffer()
console.out.print('Replay buffer stopped.')
@app.command(name=['toggle', 'tg'])
def toggle(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Toggle the replay buffer."""
resp = ctx.client.toggle_replay_buffer()
if resp.output_active:
console.out.print('Replay buffer is active.')
else:
console.out.print('Replay buffer is not active.')
@app.command(name=['status', 'ss'])
def status(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Get the status of the replay buffer."""
resp = ctx.client.get_replay_buffer_status()
if resp.output_active:
console.out.print('Replay buffer is active.')
else:
console.out.print('Replay buffer is not active.')
@app.command(name=['save', 'sv'])
def save(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Save the replay buffer."""
ctx.client.save_replay_buffer()
console.out.print('Replay buffer saved.')

123
obsws_cli/scene.py Normal file
View File

@@ -0,0 +1,123 @@
"""module containing commands for controlling OBS scenes."""
from typing import Annotated
from cyclopts import App, Argument, Parameter
from rich.table import Table
from rich.text import Text
from . import console, util, validate
from .context import Context
from .enum import ExitCode
from .error import OBSWSCLIError
app = App(name='scene', help='Commands for managing OBS scenes')
@app.command(name=['list', 'ls'])
def list_(
uuid: Annotated[bool, Parameter(help='Show UUIDs of scenes')] = False,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""List all scenes."""
resp = ctx.client.get_scene_list()
scenes = (
(scene.get('sceneName'), scene.get('sceneUuid'))
for scene in reversed(resp.scenes)
)
active_scene = ctx.client.get_current_program_scene().scene_name
table = Table(title='Scenes', padding=(0, 2), border_style=ctx.style.border)
if uuid:
columns = [
(Text('Scene Name', justify='center'), 'left', ctx.style.column),
(Text('Active', justify='center'), 'center', None),
(Text('UUID', justify='center'), 'left', ctx.style.column),
]
else:
columns = [
(Text('Scene Name', justify='center'), 'left', ctx.style.column),
(Text('Active', justify='center'), 'center', None),
]
for heading, justify, style in columns:
table.add_column(heading, justify=justify, style=style)
for scene_name, scene_uuid in scenes:
if uuid:
table.add_row(
scene_name,
util.check_mark(scene_name == active_scene, empty_if_false=True),
scene_uuid,
)
else:
table.add_row(
scene_name,
util.check_mark(scene_name == active_scene, empty_if_false=True),
)
console.out.print(table)
@app.command(name=['current', 'get'])
def current(
preview: Annotated[
bool, Parameter(help='Get the preview scene instead of the program scene')
] = False,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Get the current program scene or preview scene."""
if preview and not validate.studio_mode_enabled(ctx):
raise OBSWSCLIError(
'Studio mode is not enabled, cannot get preview scene.',
code=ExitCode.ERROR,
)
if preview:
resp = ctx.client.get_current_preview_scene()
console.out.print(
f'Current Preview Scene: {console.highlight(ctx, resp.current_preview_scene_name)}'
)
else:
resp = ctx.client.get_current_program_scene()
console.out.print(
f'Current Program Scene: {console.highlight(ctx, resp.current_program_scene_name)}'
)
@app.command(name=['switch', 'set'])
def switch(
scene_name: Annotated[str, Argument(hint='Name of the scene to switch to')],
/,
preview: Annotated[
bool,
Parameter(help='Switch to the preview scene instead of the program scene'),
] = False,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Switch to a scene."""
if preview and not validate.studio_mode_enabled(ctx):
raise OBSWSCLIError(
'Studio mode is not enabled, cannot switch to preview scene.',
code=ExitCode.ERROR,
)
if not validate.scene_in_scenes(ctx, scene_name):
raise OBSWSCLIError(
f'Scene [yellow]{scene_name}[/yellow] not found.',
code=ExitCode.ERROR,
)
if preview:
ctx.client.set_current_preview_scene(scene_name)
console.out.print(
f'Switched to preview scene: {console.highlight(ctx, scene_name)}'
)
else:
ctx.client.set_current_program_scene(scene_name)
console.out.print(
f'Switched to program scene: {console.highlight(ctx, scene_name)}'
)

View File

@@ -0,0 +1,101 @@
"""module containing commands for manipulating scene collections."""
from typing import Annotated
from cyclopts import App, Argument, Parameter
from rich.table import Table
from . import console, validate
from .context import Context
from .enum import ExitCode
from .error import OBSWSCLIError
app = App(
name='scenecollection', help='Commands for controlling scene collections in OBS.'
)
@app.command(name=['list', 'ls'])
def list_(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""List all scene collections."""
resp = ctx.client.get_scene_collection_list()
table = Table(
title='Scene Collections',
padding=(0, 2),
border_style=ctx.style.border,
)
table.add_column('Scene Collection Name', justify='left', style=ctx.style.column)
for scene_collection_name in resp.scene_collections:
table.add_row(scene_collection_name)
console.out.print(table)
@app.command(name=['current', 'get'])
def current(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Get the current scene collection."""
resp = ctx.client.get_scene_collection_list()
console.out.print(
f'Current scene collection: {console.highlight(ctx, resp.current_scene_collection_name)}'
)
@app.command(name=['switch', 'set'])
def switch(
scene_collection_name: Annotated[
str, Argument(hint='Name of the scene collection to switch to')
],
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Switch to a scene collection."""
if not validate.scene_collection_in_scene_collections(ctx, scene_collection_name):
raise OBSWSCLIError(
f'Scene collection [yellow]{scene_collection_name}[/yellow] not found.',
exit_code=ExitCode.ERROR,
)
current_scene_collection = (
ctx.client.get_scene_collection_list().current_scene_collection_name
)
if scene_collection_name == current_scene_collection:
raise OBSWSCLIError(
f'Scene collection [yellow]{scene_collection_name}[/yellow] is already active.',
exit_code=ExitCode.ERROR,
)
ctx.client.set_current_scene_collection(scene_collection_name)
console.out.print(
f'Switched to scene collection {console.highlight(ctx, scene_collection_name)}.'
)
@app.command(name=['create', 'new'])
def create(
scene_collection_name: Annotated[
str, Argument(hint='Name of the scene collection to create')
],
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Create a new scene collection."""
if validate.scene_collection_in_scene_collections(ctx, scene_collection_name):
raise OBSWSCLIError(
f'Scene collection [yellow]{scene_collection_name}[/yellow] already exists.',
exit_code=ExitCode.ERROR,
)
ctx.client.create_scene_collection(scene_collection_name)
console.out.print(
f'Created scene collection {console.highlight(ctx, scene_collection_name)}.'
)

View File

@@ -2,38 +2,42 @@
from typing import Annotated, Optional
import typer
from cyclopts import App, Argument, Parameter
from rich.table import Table
from obsws_cli import console, util, validate
from . import console, util, validate
from .context import Context
from .enum import ExitCode
from .error import OBSWSCLIError
app = typer.Typer()
app = App(name='sceneitem', help='Commands for controlling scene items in OBS.')
@app.callback()
def main():
"""Control items in OBS scenes."""
@app.command('list')
@app.command('ls', hidden=True)
@app.command(name=['list', 'ls'])
def list_(
ctx: typer.Context,
scene_name: Annotated[
Optional[str],
typer.Argument(
show_default='The current scene',
help='Scene name to list items for',
callback=validate.scene_in_scenes,
Argument(
hint='Scene name to list items for',
),
] = None,
uuid: Annotated[bool, typer.Option(help='Show UUIDs of scene items')] = False,
/,
uuid: Annotated[bool, Parameter(help='Show UUIDs of scene items')] = False,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""List all items in a scene."""
if scene_name is None:
scene_name = ctx.obj['obsws'].get_current_program_scene().scene_name
if not scene_name:
scene_name = ctx.client.get_current_program_scene().scene_name
resp = ctx.obj['obsws'].get_scene_item_list(scene_name)
if not validate.scene_in_scenes(ctx, scene_name):
console.err.print(f'Scene [yellow]{scene_name}[/yellow] not found.')
raise OBSWSCLIError(
f'Scene [yellow]{scene_name}[/yellow] not found.',
exit_code=ExitCode.ERROR,
)
resp = ctx.client.get_scene_item_list(scene_name)
items = sorted(
(
(
@@ -49,10 +53,10 @@ def list_(
)
if not items:
console.out.print(
f'No items found in scene {console.highlight(ctx, scene_name)}.'
raise OBSWSCLIError(
f'No items found in scene [yellow]{scene_name}[/yellow].',
exit_code=ExitCode.SUCCESS,
)
raise typer.Exit()
table = Table(
title=f'Items in Scene: {scene_name}',
@@ -135,36 +139,39 @@ def list_(
def _validate_sources(
ctx: typer.Context,
ctx: Context,
scene_name: str,
item_name: str,
group: Optional[str] = None,
) -> bool:
):
"""Validate the scene name and item name."""
if not validate.scene_in_scenes(ctx, scene_name):
console.err.print(f'Scene [yellow]{scene_name}[/yellow] not found.')
return False
raise OBSWSCLIError(
f'Scene [yellow]{scene_name}[/yellow] not found.',
exit_code=ExitCode.ERROR,
)
if group:
if not validate.item_in_scene_item_list(ctx, scene_name, group):
console.err.print(
f'Group [yellow]{group}[/yellow] not found in scene [yellow]{scene_name}[/yellow].'
raise OBSWSCLIError(
f'Group [yellow]{group}[/yellow] not found in scene [yellow]{scene_name}[/yellow].',
exit_code=ExitCode.ERROR,
)
return False
else:
if not validate.item_in_scene_item_list(ctx, scene_name, item_name):
console.err.print(
raise OBSWSCLIError(
f'Item [yellow]{item_name}[/yellow] not found in scene [yellow]{scene_name}[/yellow]. Is the item in a group? '
f'If so use the [yellow]--group[/yellow] option to specify the parent group.\n'
'Use [yellow]obsws-cli sceneitem ls[/yellow] for a list of items in the scene.'
'Use [yellow]obsws-cli sceneitem ls[/yellow] for a list of items in the scene.',
exit_code=ExitCode.ERROR,
)
return False
return True
def _get_scene_name_and_item_id(
ctx: typer.Context, scene_name: str, item_name: str, group: Optional[str] = None
ctx: Context,
scene_name: str,
item_name: str,
group: Optional[str] = None,
):
"""Get the scene name and item ID for the given scene and item name."""
if group:
@@ -175,10 +182,10 @@ def _get_scene_name_and_item_id(
scene_item_id = item.get('sceneItemId')
break
else:
console.err.print(
f'Item [yellow]{item_name}[/yellow] not found in group [yellow]{group}[/yellow].'
raise OBSWSCLIError(
f'Item [yellow]{item_name}[/yellow] not found in group [yellow]{group}[/yellow].',
exit_code=ExitCode.ERROR,
)
raise typer.Exit(1)
else:
resp = ctx.obj['obsws'].get_scene_item_id(scene_name, item_name)
scene_item_id = resp.scene_item_id
@@ -186,22 +193,20 @@ def _get_scene_name_and_item_id(
return scene_name, scene_item_id
@app.command('show')
@app.command('sh', hidden=True)
@app.command(name=['show', 'sh'])
def show(
ctx: typer.Context,
scene_name: Annotated[
str, typer.Argument(..., show_default=False, help='Scene name the item is in')
],
scene_name: Annotated[str, Argument(hint='Scene name the item is in')],
item_name: Annotated[
str,
typer.Argument(..., show_default=False, help='Item name to show in the scene'),
Argument(hint='Item name to show in the scene'),
],
group: Annotated[Optional[str], typer.Option(help='Parent group name')] = None,
/,
group: Annotated[Optional[str], Parameter(help='Parent group name')] = None,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Show an item in a scene."""
if not _validate_sources(ctx, scene_name, item_name, group):
raise typer.Exit(1)
_validate_sources(ctx, scene_name, item_name, group)
old_scene_name = scene_name
scene_name, scene_item_id = _get_scene_name_and_item_id(
@@ -229,22 +234,20 @@ def show(
)
@app.command('hide')
@app.command('h', hidden=True)
@app.command(name=['hide', 'h'])
def hide(
ctx: typer.Context,
scene_name: Annotated[
str, typer.Argument(..., show_default=False, help='Scene name the item is in')
],
scene_name: Annotated[str, Argument(hint='Scene name the item is in')],
item_name: Annotated[
str,
typer.Argument(..., show_default=False, help='Item name to hide in the scene'),
Argument(hint='Item name to hide in the scene'),
],
group: Annotated[Optional[str], typer.Option(help='Parent group name')] = None,
/,
group: Annotated[Optional[str], Parameter(help='Parent group name')] = None,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Hide an item in a scene."""
if not _validate_sources(ctx, scene_name, item_name, group):
raise typer.Exit(1)
_validate_sources(ctx, scene_name, item_name, group)
old_scene_name = scene_name
scene_name, scene_item_id = _get_scene_name_and_item_id(
@@ -271,37 +274,30 @@ def hide(
)
@app.command('toggle')
@app.command('tg', hidden=True)
@app.command(name=['toggle', 'tg'])
def toggle(
ctx: typer.Context,
scene_name: Annotated[
str, typer.Argument(..., show_default=False, help='Scene name the item is in')
],
item_name: Annotated[
str,
typer.Argument(
..., show_default=False, help='Item name to toggle in the scene'
),
],
group: Annotated[Optional[str], typer.Option(help='Parent group name')] = None,
scene_name: Annotated[str, Argument(hint='Scene name the item is in')],
item_name: Annotated[str, Argument(hint='Item name to toggle in the scene')],
/,
group: Annotated[Optional[str], Parameter(help='Parent group name')] = None,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Toggle an item in a scene."""
if not _validate_sources(ctx, scene_name, item_name, group):
raise typer.Exit(1)
_validate_sources(ctx, scene_name, item_name, group)
old_scene_name = scene_name
scene_name, scene_item_id = _get_scene_name_and_item_id(
ctx, scene_name, item_name, group
)
enabled = ctx.obj['obsws'].get_scene_item_enabled(
enabled = ctx.client.get_scene_item_enabled(
scene_name=scene_name,
item_id=int(scene_item_id),
)
new_state = not enabled.scene_item_enabled
ctx.obj['obsws'].set_scene_item_enabled(
ctx.client.set_scene_item_enabled(
scene_name=scene_name,
item_id=int(scene_item_id),
enabled=new_state,
@@ -333,31 +329,26 @@ def toggle(
)
@app.command('visible')
@app.command('v', hidden=True)
@app.command(name=['visible', 'v'])
def visible(
ctx: typer.Context,
scene_name: Annotated[
str, typer.Argument(..., show_default=False, help='Scene name the item is in')
],
scene_name: Annotated[str, Argument(hint='Scene name the item is in')],
item_name: Annotated[
str,
typer.Argument(
..., show_default=False, help='Item name to check visibility in the scene'
),
str, Argument(hint='Item name to check visibility in the scene')
],
group: Annotated[Optional[str], typer.Option(help='Parent group name')] = None,
/,
group: Annotated[Optional[str], Parameter(help='Parent group name')] = None,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Check if an item in a scene is visible."""
if not _validate_sources(ctx, scene_name, item_name, group):
raise typer.Exit(1)
_validate_sources(ctx, scene_name, item_name, group)
old_scene_name = scene_name
scene_name, scene_item_id = _get_scene_name_and_item_id(
ctx, scene_name, item_name, group
)
enabled = ctx.obj['obsws'].get_scene_item_enabled(
enabled = ctx.client.get_scene_item_enabled(
scene_name=scene_name,
item_id=int(scene_item_id),
)
@@ -378,69 +369,62 @@ def visible(
)
@app.command('transform')
@app.command('t', hidden=True)
@app.command(name=['transform', 't'])
def transform(
ctx: typer.Context,
scene_name: Annotated[
str, typer.Argument(..., show_default=False, help='Scene name the item is in')
],
item_name: Annotated[
str,
typer.Argument(
..., show_default=False, help='Item name to transform in the scene'
),
],
group: Annotated[Optional[str], typer.Option(help='Parent group name')] = None,
scene_name: Annotated[str, Argument(hint='Scene name the item is in')],
item_name: Annotated[str, Argument(hint='Item name to transform in the scene')],
/,
group: Annotated[Optional[str], Parameter(help='Parent group name')] = None,
alignment: Annotated[
Optional[int], typer.Option(help='Alignment of the item in the scene')
Optional[int], Parameter(help='Alignment of the item in the scene')
] = None,
bounds_alignment: Annotated[
Optional[int], typer.Option(help='Bounds alignment of the item in the scene')
Optional[int], Parameter(help='Bounds alignment of the item in the scene')
] = None,
bounds_height: Annotated[
Optional[float], typer.Option(help='Height of the item in the scene')
Optional[float], Parameter(help='Height of the item in the scene')
] = None,
bounds_type: Annotated[
Optional[str], typer.Option(help='Type of bounds for the item in the scene')
Optional[str], Parameter(help='Type of bounds for the item in the scene')
] = None,
bounds_width: Annotated[
Optional[float], typer.Option(help='Width of the item in the scene')
Optional[float], Parameter(help='Width of the item in the scene')
] = None,
crop_to_bounds: Annotated[
Optional[bool], typer.Option(help='Crop the item to the bounds')
Optional[bool], Parameter(help='Crop the item to the bounds')
] = None,
crop_bottom: Annotated[
Optional[float], typer.Option(help='Bottom crop of the item in the scene')
Optional[float], Parameter(help='Bottom crop of the item in the scene')
] = None,
crop_left: Annotated[
Optional[float], typer.Option(help='Left crop of the item in the scene')
Optional[float], Parameter(help='Left crop of the item in the scene')
] = None,
crop_right: Annotated[
Optional[float], typer.Option(help='Right crop of the item in the scene')
Optional[float], Parameter(help='Right crop of the item in the scene')
] = None,
crop_top: Annotated[
Optional[float], typer.Option(help='Top crop of the item in the scene')
Optional[float], Parameter(help='Top crop of the item in the scene')
] = None,
position_x: Annotated[
Optional[float], typer.Option(help='X position of the item in the scene')
Optional[float], Parameter(help='X position of the item in the scene')
] = None,
position_y: Annotated[
Optional[float], typer.Option(help='Y position of the item in the scene')
Optional[float], Parameter(help='Y position of the item in the scene')
] = None,
rotation: Annotated[
Optional[float], typer.Option(help='Rotation of the item in the scene')
Optional[float], Parameter(help='Rotation of the item in the scene')
] = None,
scale_x: Annotated[
Optional[float], typer.Option(help='X scale of the item in the scene')
Optional[float], Parameter(help='X scale of the item in the scene')
] = None,
scale_y: Annotated[
Optional[float], typer.Option(help='Y scale of the item in the scene')
Optional[float], Parameter(help='Y scale of the item in the scene')
] = None,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Set the transform of an item in a scene."""
if not _validate_sources(ctx, scene_name, item_name, group):
raise typer.Exit(1)
_validate_sources(ctx, scene_name, item_name, group)
old_scene_name = scene_name
scene_name, scene_item_id = _get_scene_name_and_item_id(
@@ -480,10 +464,12 @@ def transform(
transform['scaleY'] = scale_y
if not transform:
console.err.print('No transform options provided.')
raise typer.Exit(1)
raise OBSWSCLIError(
'No transform options provided. Use at least one of the transform options.',
exit_code=ExitCode.ERROR,
)
transform = ctx.obj['obsws'].set_scene_item_transform(
transform = ctx.client.set_scene_item_transform(
scene_name=scene_name,
item_id=int(scene_item_id),
transform=transform,

View File

@@ -4,66 +4,57 @@ from pathlib import Path
from typing import Annotated
import obsws_python as obsws
import typer
from cyclopts import App, Argument, Parameter
from obsws_cli import console
from . import console
from .context import Context
from .enum import ExitCode
from .error import OBSWSCLIError
app = typer.Typer()
app = App(name='screenshot', help='Commands for taking screenshots using OBS.')
@app.callback()
def main():
"""Take screenshots using OBS."""
@app.command('save')
@app.command('s', hidden=True)
@app.command(name=['save', 'sv'])
def save(
ctx: typer.Context,
source_name: Annotated[
str,
typer.Argument(
...,
show_default=False,
help='Name of the source to take a screenshot of.',
Argument(
hint='Name of the source to take a screenshot of.',
),
],
output_path: Annotated[
Path,
# Since the CLI and OBS may be running on different platforms,
# we won't validate the path here.
typer.Argument(
...,
show_default=False,
file_okay=True,
dir_okay=False,
help='Path to save the screenshot (must include file name and extension).',
Argument(
hint='Path to save the screenshot (must include file name and extension).',
),
],
/,
width: Annotated[
float,
typer.Option(
Parameter(
help='Width of the screenshot.',
),
] = 1920,
height: Annotated[
float,
typer.Option(
Parameter(
help='Height of the screenshot.',
),
] = 1080,
quality: Annotated[
float,
typer.Option(
min=-1,
max=100,
Parameter(
help='Quality of the screenshot.',
),
] = -1,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Take a screenshot and save it to a file."""
try:
ctx.obj['obsws'].save_source_screenshot(
ctx.client.save_source_screenshot(
name=source_name,
img_format=output_path.suffix.lstrip('.').lower(),
file_path=str(output_path),
@@ -74,16 +65,16 @@ def save(
except obsws.error.OBSSDKRequestError as e:
match e.code:
case 403:
console.err.print(
raise OBSWSCLIError(
'The [yellow]image format[/yellow] (file extension) must be included in the file name, '
"for example: '/path/to/screenshot.png'.",
code=ExitCode.ERROR,
)
raise typer.Exit(1)
case 600:
console.err.print(
f'No source was found by the name of [yellow]{source_name}[/yellow]'
raise OBSWSCLIError(
'No source was found by the name of [yellow]{source_name}[/yellow]',
code=ExitCode.ERROR,
)
raise typer.Exit(1)
case _:
raise

View File

@@ -1,63 +1,75 @@
"""module for controlling OBS stream functionality."""
import typer
from typing import Annotated
from obsws_cli import console
from cyclopts import App, Parameter
app = typer.Typer()
from . import console
from .context import Context
from .enum import ExitCode
from .error import OBSWSCLIError
app = App(name='stream', help='Commands for controlling OBS stream functionality.')
@app.callback()
def main():
"""Control OBS stream functionality."""
def _get_streaming_status(ctx: typer.Context) -> tuple:
def _get_streaming_status(ctx: Context) -> tuple:
"""Get streaming status."""
resp = ctx.obj['obsws'].get_stream_status()
resp = ctx.client.get_stream_status()
return resp.output_active, resp.output_duration
@app.command('start')
@app.command('s', hidden=True)
def start(ctx: typer.Context):
@app.command(name=['start', 's'])
def start(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Start streaming."""
active, _ = _get_streaming_status(ctx)
if active:
console.err.print('Streaming is already in progress, cannot start.')
raise typer.Exit(1)
raise OBSWSCLIError(
'Streaming is already in progress, cannot start.',
code=ExitCode.ERROR,
)
ctx.obj['obsws'].start_stream()
ctx.client.start_stream()
console.out.print('Streaming started successfully.')
@app.command('stop')
@app.command('st', hidden=True)
def stop(ctx: typer.Context):
@app.command(name=['stop', 'st'])
def stop(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Stop streaming."""
active, _ = _get_streaming_status(ctx)
if not active:
console.err.print('Streaming is not in progress, cannot stop.')
raise typer.Exit(1)
raise OBSWSCLIError(
'Streaming is not in progress, cannot stop.',
code=ExitCode.ERROR,
)
ctx.obj['obsws'].stop_stream()
ctx.client.stop_stream()
console.out.print('Streaming stopped successfully.')
@app.command('toggle')
@app.command('tg', hidden=True)
def toggle(ctx: typer.Context):
@app.command(name=['toggle', 'tg'])
def toggle(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Toggle streaming."""
resp = ctx.obj['obsws'].toggle_stream()
resp = ctx.client.toggle_stream()
if resp.output_active:
console.out.print('Streaming started successfully.')
else:
console.out.print('Streaming stopped successfully.')
@app.command('status')
@app.command('ss', hidden=True)
def status(ctx: typer.Context):
@app.command(name=['status', 'ss'])
def status(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Get streaming status."""
active, duration = _get_streaming_status(ctx)
if active:

58
obsws_cli/studiomode.py Normal file
View File

@@ -0,0 +1,58 @@
"""module containing commands for manipulating studio mode in OBS."""
from typing import Annotated
from cyclopts import App, Parameter
from . import console
from .context import Context
app = App(name='studiomode', help='Commands for controlling studio mode in OBS.')
@app.command(name=['enable', 'on'])
def enable(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Enable studio mode."""
ctx.obj['obsws'].set_studio_mode_enabled(True)
console.out.print('Studio mode has been enabled.')
@app.command(name=['disable', 'off'])
def disable(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Disable studio mode."""
ctx.client.set_studio_mode_enabled(False)
console.out.print('Studio mode has been disabled.')
@app.command(name=['toggle', 'tg'])
def toggle(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Toggle studio mode."""
resp = ctx.client.get_studio_mode_enabled()
if resp.studio_mode_enabled:
ctx.client.set_studio_mode_enabled(False)
console.out.print('Studio mode is now disabled.')
else:
ctx.client.set_studio_mode_enabled(True)
console.out.print('Studio mode is now enabled.')
@app.command(name=['status', 'ss'])
def status(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Get the status of studio mode."""
resp = ctx.client.get_studio_mode_enabled()
if resp.studio_mode_enabled:
console.out.print('Studio mode is enabled.')
else:
console.out.print('Studio mode is disabled.')

76
obsws_cli/text.py Normal file
View File

@@ -0,0 +1,76 @@
"""module containing commands for manipulating text inputs."""
from typing import Annotated, Optional
from cyclopts import App, Argument, Parameter
from . import console, validate
from .context import Context
from .enum import ExitCode
from .error import OBSWSCLIError
app = App(name='text', help='Commands for controlling text inputs in OBS.')
@app.command(name=['current', 'get'])
def current(
input_name: Annotated[str, Argument(hint='Name of the text input to get.')],
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Get the current text for a text input."""
if not validate.input_in_inputs(ctx, input_name):
raise OBSWSCLIError(
f'Input [yellow]{input_name}[/yellow] not found.', code=ExitCode.ERROR
)
resp = ctx.client.get_input_settings(name=input_name)
if not resp.input_kind.startswith('text_'):
raise OBSWSCLIError(
f'Input [yellow]{input_name}[/yellow] is not a text input.',
code=ExitCode.ERROR,
)
current_text = resp.input_settings.get('text', '')
if not current_text:
current_text = '(empty)'
console.out.print(
f'Current text for input {console.highlight(ctx, input_name)}: {current_text}',
)
@app.command(name=['update', 'set'])
def update(
input_name: Annotated[str, Argument(hint='Name of the text input to update.')],
new_text: Annotated[
Optional[str],
Argument(hint='The new text to set for the input.'),
] = None,
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Update the text of a text input."""
if not validate.input_in_inputs(ctx, input_name):
raise OBSWSCLIError(
f'Input [yellow]{input_name}[/yellow] not found.', code=ExitCode.ERROR
)
resp = ctx.client.get_input_settings(name=input_name)
if not resp.input_kind.startswith('text_'):
raise OBSWSCLIError(
f'Input [yellow]{input_name}[/yellow] is not a text input.',
code=ExitCode.ERROR,
)
ctx.client.set_input_settings(
name=input_name,
settings={'text': new_text},
overlay=True,
)
if not new_text:
new_text = '(empty)'
console.out.print(
f'Text for input {console.highlight(ctx, input_name)} updated to: {new_text}',
)

View File

@@ -20,28 +20,3 @@ def check_mark(value: bool, empty_if_false: bool = False) -> str:
if os.getenv('NO_COLOR', '') != '':
return '' if value else ''
return '' if value else ''
def timecode_to_milliseconds(timecode: str) -> int:
"""Convert a timecode string (HH:MM:SS) to total milliseconds."""
match timecode.split(':'):
case [mm, ss]:
hours = 0
minutes = int(mm)
seconds = int(ss)
case [hh, mm, ss]:
hours = int(hh)
minutes = int(mm)
seconds = int(ss)
return (hours * 3600 + minutes * 60 + seconds) * 1000
def milliseconds_to_timecode(milliseconds: int) -> str:
"""Convert total milliseconds to a timecode string (HH:MM:SS)."""
total_seconds = milliseconds // 1000
hours = total_seconds // 3600
minutes = (total_seconds % 3600) // 60
seconds = total_seconds % 60
if hours == 0:
return f'{minutes:02}:{seconds:02}'
return f'{hours:02}:{minutes:02}:{seconds:02}'

View File

@@ -1,142 +1,49 @@
"""module containing validation functions."""
from typing import Optional
import typer
from . import console
# type alias for an option that is skipped when the command is run
skipped_option = typer.Option(parser=lambda _: _, hidden=True, expose_value=False)
from .context import Context
def input_in_inputs(ctx: typer.Context, input_name: str) -> str:
"""Ensure the given input exists in the list of inputs."""
resp = ctx.obj['obsws'].get_input_list()
if not any(input.get('inputName') == input_name for input in resp.inputs):
console.err.print(f'Input [yellow]{input_name}[/yellow] does not exist.')
raise typer.Exit(1)
return input_name
def input_in_inputs(ctx: Context, input_name: str) -> bool:
"""Check if an input is in the input list."""
inputs = ctx.client.get_input_list().inputs
return any(input_.get('inputName') == input_name for input_ in inputs)
def input_not_in_inputs(ctx: typer.Context, input_name: str) -> str:
"""Ensure an input does not already exist in the list of inputs."""
resp = ctx.obj['obsws'].get_input_list()
if any(input.get('inputName') == input_name for input in resp.inputs):
console.err.print(f'Input [yellow]{input_name}[/yellow] already exists.')
raise typer.Exit(1)
return input_name
def scene_in_scenes(ctx: typer.Context, scene_name: Optional[str]) -> str | None:
def scene_in_scenes(ctx: Context, scene_name: str) -> bool:
"""Check if a scene exists in the list of scenes."""
if scene_name is None:
return
resp = ctx.obj['obsws'].get_scene_list()
if not any(scene.get('sceneName') == scene_name for scene in resp.scenes):
console.err.print(f'Scene [yellow]{scene_name}[/yellow] not found.')
raise typer.Exit(1)
return scene_name
resp = ctx.client.get_scene_list()
return any(scene.get('sceneName') == scene_name for scene in resp.scenes)
def studio_mode_enabled(ctx: typer.Context, preview: bool) -> bool:
"""Ensure studio mode is enabled if preview option is used."""
resp = ctx.obj['obsws'].get_studio_mode_enabled()
if preview and not resp.studio_mode_enabled:
console.err.print(
'Studio mode is disabled. This action requires it to be enabled.'
)
raise typer.Exit(1)
return preview
def studio_mode_enabled(ctx: Context) -> bool:
"""Check if studio mode is enabled."""
resp = ctx.client.get_studio_mode_enabled()
return resp.studio_mode_enabled
def scene_collection_in_scene_collections(
ctx: typer.Context, scene_collection_name: str
) -> str:
"""Ensure a scene collection exists in the list of scene collections."""
resp = ctx.obj['obsws'].get_scene_collection_list()
if not any(
collection == scene_collection_name for collection in resp.scene_collections
):
console.err.print(
f'Scene collection [yellow]{scene_collection_name}[/yellow] not found.'
)
raise typer.Exit(1)
return scene_collection_name
def scene_collection_not_in_scene_collections(
ctx: typer.Context, scene_collection_name: str
) -> str:
"""Ensure a scene collection does not already exist in the list of scene collections."""
resp = ctx.obj['obsws'].get_scene_collection_list()
if any(
collection == scene_collection_name for collection in resp.scene_collections
):
console.err.print(
f'Scene collection [yellow]{scene_collection_name}[/yellow] already exists.'
)
raise typer.Exit(1)
return scene_collection_name
def item_in_scene_item_list(
ctx: typer.Context, scene_name: str, item_name: str
ctx: Context, scene_collection_name: str
) -> bool:
"""Check if a scene collection exists."""
resp = ctx.client.get_scene_collection_list()
return any(
collection == scene_collection_name for collection in resp.scene_collections
)
def item_in_scene_item_list(ctx: Context, scene_name: str, item_name: str) -> bool:
"""Check if an item exists in a scene."""
resp = ctx.obj['obsws'].get_scene_item_list(scene_name)
resp = ctx.client.get_scene_item_list(scene_name)
return any(item.get('sourceName') == item_name for item in resp.scene_items)
def profile_exists(ctx: typer.Context, profile_name: str) -> str:
"""Ensure a profile exists."""
resp = ctx.obj['obsws'].get_profile_list()
if not any(profile == profile_name for profile in resp.profiles):
console.err.print(f'Profile [yellow]{profile_name}[/yellow] not found.')
raise typer.Exit(1)
return profile_name
def profile_exists(ctx: Context, profile_name: str) -> bool:
"""Check if a profile exists."""
resp = ctx.client.get_profile_list()
return any(profile == profile_name for profile in resp.profiles)
def profile_not_exists(ctx: typer.Context, profile_name: str) -> str:
"""Ensure a profile does not exist."""
resp = ctx.obj['obsws'].get_profile_list()
if any(profile == profile_name for profile in resp.profiles):
console.err.print(f'Profile [yellow]{profile_name}[/yellow] already exists.')
raise typer.Exit(1)
return profile_name
def kind_in_input_kinds(ctx: typer.Context, input_kind: str) -> str:
"""Check if an input kind is valid."""
resp = ctx.obj['obsws'].get_input_kind_list(False)
if not any(kind == input_kind for kind in resp.input_kinds):
console.err.print(f'Input kind [yellow]{input_kind}[/yellow] not found.')
raise typer.Exit(1)
return input_kind
def timecode_format(ctx: typer.Context, timecode: Optional[str]) -> str | None:
"""Validate that a timecode is in HH:MM:SS or MM:SS format."""
if timecode is None:
return
match timecode.split(':'):
case [mm, ss]:
if not (mm.isdigit() and ss.isdigit()):
console.err.print(
f'Timecode [yellow]{timecode}[/yellow] is not valid. Use MM:SS or HH:MM:SS format.'
)
raise typer.Exit(1)
case [hh, mm, ss]:
if not (hh.isdigit() and mm.isdigit() and ss.isdigit()):
console.err.print(
f'Timecode [yellow]{timecode}[/yellow] is not valid. Use MM:SS or HH:MM:SS format.'
)
raise typer.Exit(1)
case _:
console.err.print(
f'Timecode [yellow]{timecode}[/yellow] is not valid. Use MM:SS or HH:MM:SS format.'
)
raise typer.Exit(1)
return timecode
def monitor_exists(ctx: Context, monitor_index: int) -> bool:
"""Check if a monitor exists."""
resp = ctx.client.get_monitor_list()
return any(monitor['monitorIndex'] == monitor_index for monitor in resp.monitors)

52
obsws_cli/virtualcam.py Normal file
View File

@@ -0,0 +1,52 @@
"""module containing commands for manipulating virtual camera in OBS."""
from typing import Annotated
from cyclopts import App, Parameter
from . import console
from .context import Context
app = App(name='virtualcam', help='Commands for controlling the virtual camera in OBS.')
@app.command(name=['start', 's'])
def start(
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Start the virtual camera."""
ctx.client.start_virtual_cam()
console.out.print('Virtual camera started.')
@app.command(name=['stop', 'p'])
def stop(
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Stop the virtual camera."""
ctx.client.stop_virtual_cam()
console.out.print('Virtual camera stopped.')
@app.command(name=['toggle', 'tg'])
def toggle(
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Toggle the virtual camera."""
resp = ctx.client.toggle_virtual_cam()
if resp.output_active:
console.out.print('Virtual camera is enabled.')
else:
console.out.print('Virtual camera is disabled.')
@app.command(name=['status', 'ss'])
def status(
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Get the status of the virtual camera."""
resp = ctx.client.get_virtual_cam_status()
if resp.output_active:
console.out.print('Virtual camera is enabled.')
else:
console.out.print('Virtual camera is disabled.')

View File

@@ -21,7 +21,7 @@ classifiers = [
"Programming Language :: Python :: Implementation :: CPython",
"Programming Language :: Python :: Implementation :: PyPy",
]
dependencies = ["typer>=0.21.1", "obsws-python>=1.8.0", "python-dotenv>=1.1.0"]
dependencies = ["cyclopts>=3.22.2", "obsws-python>=1.8.0"]
[project.urls]
@@ -30,7 +30,7 @@ Issues = "https://github.com/onyx-and-iris/obsws-cli/issues"
Source = "https://github.com/onyx-and-iris/obsws-cli"
[project.scripts]
obsws-cli = "obsws_cli:app"
obsws-cli = "obsws_cli:run"
[tool.hatch.version]
path = "obsws_cli/__about__.py"
@@ -42,6 +42,9 @@ dependencies = ["click-man>=0.5.1"]
cli = "obsws-cli {args:}"
man = "python man/generate.py --output=./man"
[tool.hatch.envs.lazyimports.scripts]
cli = "obsws-cli {args:}"
[tool.hatch.envs.hatch-test]
randomize = true

View File

@@ -1,7 +1,6 @@
"""pytest configuration file."""
import os
import time
import obsws_python as obsws
from dotenv import find_dotenv, load_dotenv
@@ -21,9 +20,9 @@ def pytest_sessionstart(session):
"""
# Initialize the OBS WebSocket client
session.obsws = obsws.ReqClient(
host=os.environ['OBSWS_CLI_HOST'],
port=os.environ['OBSWS_CLI_PORT'],
password=os.environ['OBSWS_CLI_PASSWORD'],
host=os.environ['OBS_HOST'],
port=os.environ['OBS_PORT'],
password=os.environ['OBS_PASSWORD'],
timeout=5,
)
resp = session.obsws.get_version()
@@ -45,54 +44,9 @@ def pytest_sessionstart(session):
},
)
session.obsws.create_profile('pytest_profile')
time.sleep(0.1) # Wait for the profile to be created
session.obsws.set_profile_parameter(
'SimpleOutput',
'RecRB',
'true',
)
# hack to ensure the replay buffer is enabled
session.obsws.set_current_profile('Untitled')
session.obsws.set_current_profile('pytest_profile')
session.obsws.set_current_scene_collection('test-collection')
session.obsws.create_scene('pytest_scene')
# Ensure Desktop Audio is created.
desktop_audio_kinds = {
'windows': 'wasapi_output_capture',
'linux': 'pulse_output_capture',
'darwin': 'coreaudio_output_capture',
}
platform = os.environ.get('OBS_TESTS_PLATFORM', os.uname().sysname.lower())
try:
session.obsws.create_input(
sceneName='pytest_scene',
inputName='Desktop Audio',
inputKind=desktop_audio_kinds[platform],
inputSettings={'device_id': 'default'},
sceneItemEnabled=True,
)
except obsws.error.OBSSDKRequestError as e:
if e.code == 601:
"""input already exists, continue."""
# Ensure Mic/Aux is created.
mic_kinds = {
'windows': 'wasapi_input_capture',
'linux': 'pulse_input_capture',
'darwin': 'coreaudio_input_capture',
}
try:
session.obsws.create_input(
sceneName='pytest_scene',
inputName='Mic/Aux',
inputKind=mic_kinds[platform],
inputSettings={'device_id': 'default'},
sceneItemEnabled=True,
)
except obsws.error.OBSSDKRequestError as e:
if e.code == 601:
"""input already exists, continue."""
session.obsws.create_input(
sceneName='pytest_scene',
inputName='pytest_input',
@@ -177,7 +131,7 @@ def pytest_sessionfinish(session, exitstatus):
session.obsws.remove_scene('pytest_scene')
session.obsws.set_current_scene_collection('Untitled')
session.obsws.set_current_scene_collection('default')
resp = session.obsws.get_stream_status()
if resp.output_active:
@@ -195,8 +149,6 @@ def pytest_sessionfinish(session, exitstatus):
if resp.studio_mode_enabled:
session.obsws.set_studio_mode_enabled(False)
session.obsws.remove_profile('pytest_profile')
# Close the OBS WebSocket client connection
session.obsws.disconnect()

View File

@@ -4,7 +4,7 @@ from typer.testing import CliRunner
from obsws_cli.app import app
runner = CliRunner()
runner = CliRunner(mix_stderr=False)
def test_filter_list():

View File

@@ -1,18 +1,10 @@
"""Unit tests for the group command in the OBS WebSocket CLI."""
import os
import pytest
from typer.testing import CliRunner
from obsws_cli.app import app
runner = CliRunner()
if os.environ.get('OBS_TESTS_SKIP_GROUP_TESTS'):
pytest.skip(
'Skipping group tests as per environment variable', allow_module_level=True
)
runner = CliRunner(mix_stderr=False)
def test_group_list():

View File

@@ -4,7 +4,7 @@ from typer.testing import CliRunner
from obsws_cli.app import app
runner = CliRunner()
runner = CliRunner(mix_stderr=False)
def test_hotkey_list():

View File

@@ -4,7 +4,7 @@ from typer.testing import CliRunner
from obsws_cli.app import app
runner = CliRunner()
runner = CliRunner(mix_stderr=False)
def test_input_list():
@@ -13,7 +13,10 @@ def test_input_list():
assert result.exit_code == 0
assert 'Desktop Audio' in result.stdout
assert 'Mic/Aux' in result.stdout
assert all(item in result.stdout for item in ('pytest_input', 'pytest_input_2'))
assert all(
item in result.stdout
for item in ('Colour Source', 'Colour Source 2', 'Colour Source 3')
)
def test_input_list_filter_input():
@@ -36,6 +39,9 @@ def test_input_list_filter_colour():
"""Test the input list command with colour filter."""
result = runner.invoke(app, ['input', 'list', '--colour'])
assert result.exit_code == 0
assert all(item in result.stdout for item in ('pytest_input', 'pytest_input_2'))
assert all(
item in result.stdout
for item in ('Colour Source', 'Colour Source 2', 'Colour Source 3')
)
assert 'Desktop Audio' not in result.stdout
assert 'Mic/Aux' not in result.stdout

View File

@@ -6,7 +6,7 @@ from typer.testing import CliRunner
from obsws_cli.app import app
runner = CliRunner()
runner = CliRunner(mix_stderr=False)
def test_record_start():
@@ -49,9 +49,7 @@ def test_record_toggle():
result = runner.invoke(app, ['record', 'toggle'])
assert result.exit_code == 0
time.sleep(0.5) # Wait for the recording to toggle
if active:
assert 'Recording stopped successfully.' in result.stdout
else:

View File

@@ -1,20 +1,10 @@
"""Unit tests for the replaybuffer command in the OBS WebSocket CLI."""
import os
import time
import pytest
from typer.testing import CliRunner
from obsws_cli.app import app
runner = CliRunner()
if os.environ.get('OBS_TESTS_SKIP_REPLAYBUFFER_TESTS'):
pytest.skip(
'Skipping replaybuffer tests as per environment variable',
allow_module_level=True,
)
runner = CliRunner(mix_stderr=False)
def test_replaybuffer_start():
@@ -24,9 +14,6 @@ def test_replaybuffer_start():
active = 'Replay buffer is active.' in resp.stdout
resp = runner.invoke(app, ['replaybuffer', 'start'])
time.sleep(0.5) # Wait for the replay buffer to start
if active:
assert resp.exit_code != 0
assert 'Replay buffer is already active.' in resp.stderr
@@ -42,9 +29,6 @@ def test_replaybuffer_stop():
active = 'Replay buffer is active.' in resp.stdout
resp = runner.invoke(app, ['replaybuffer', 'stop'])
time.sleep(0.5) # Wait for the replay buffer to stop
if not active:
assert resp.exit_code != 0
assert 'Replay buffer is not active.' in resp.stderr
@@ -60,11 +44,9 @@ def test_replaybuffer_toggle():
active = 'Replay buffer is active.' in resp.stdout
resp = runner.invoke(app, ['replaybuffer', 'toggle'])
assert resp.exit_code == 0
time.sleep(0.5) # Wait for the replay buffer to toggle
if active:
assert resp.exit_code == 0
assert 'Replay buffer is not active.' in resp.stdout
else:
assert resp.exit_code == 0
assert 'Replay buffer is active.' in resp.stdout

View File

@@ -4,7 +4,7 @@ from typer.testing import CliRunner
from obsws_cli.app import app
runner = CliRunner()
runner = CliRunner(mix_stderr=False)
def test_scene_list():

View File

@@ -4,7 +4,7 @@ from typer.testing import CliRunner
from obsws_cli.app import app
runner = CliRunner()
runner = CliRunner(mix_stderr=False)
def test_sceneitem_list():

View File

@@ -6,7 +6,7 @@ from typer.testing import CliRunner
from obsws_cli.app import app
runner = CliRunner()
runner = CliRunner(mix_stderr=False)
def test_stream_start():
@@ -23,7 +23,7 @@ def test_stream_start():
else:
assert result.exit_code == 0
assert 'Streaming started successfully.' in result.stdout
time.sleep(0.5) # Wait for the streaming to start
time.sleep(1) # Wait for the streaming to start
def test_stream_stop():
@@ -37,7 +37,7 @@ def test_stream_stop():
if active:
assert result.exit_code == 0
assert 'Streaming stopped successfully.' in result.stdout
time.sleep(0.5) # Wait for the streaming to stop
time.sleep(1) # Wait for the streaming to stop
else:
assert result.exit_code != 0
assert 'Streaming is not in progress, cannot stop.' in result.stderr
@@ -52,7 +52,7 @@ def test_stream_toggle():
result = runner.invoke(app, ['stream', 'toggle'])
assert result.exit_code == 0
time.sleep(0.5) # Wait for the stream to toggle
time.sleep(1) # Wait for the stream to toggle
if active:
assert 'Streaming stopped successfully.' in result.stdout

View File

@@ -4,7 +4,7 @@ from typer.testing import CliRunner
from obsws_cli.app import app
runner = CliRunner()
runner = CliRunner(mix_stderr=False)
def test_studio_enable():

View File

@@ -4,7 +4,7 @@ from typer.testing import CliRunner
from obsws_cli.app import app
runner = CliRunner()
runner = CliRunner(mix_stderr=False)
def test_text_update():

View File

@@ -4,7 +4,7 @@ from typer.testing import CliRunner
from obsws_cli.app import app
runner = CliRunner()
runner = CliRunner(mix_stderr=False)
def test_version():