117 Commits

Author SHA1 Message Date
23e358da9c print message and return 0 for list queries returning empty results 2025-07-29 02:52:00 +01:00
417ad54a0c add docstrings containing help output 2025-07-28 11:32:23 +01:00
3b184c6531 upd dependencies 2025-07-24 04:21:58 +01:00
8c37ce1fc0 remove typer import 2025-07-24 04:21:48 +01:00
436e4d5345 remove alias, settings 2025-07-24 04:21:40 +01:00
2ef89be184 convert virtualcam commands 2025-07-24 04:09:49 +01:00
506aff833c convert text commands 2025-07-24 04:08:07 +01:00
eb939b735c convert studiomode commands 2025-07-24 04:05:01 +01:00
bb7a468dd5 convert stream commands 2025-07-24 03:59:37 +01:00
e77627b845 convert screenshot commands 2025-07-24 03:52:50 +01:00
93b066090b fix aliases 2025-07-24 03:49:31 +01:00
1ce832dfde convert sceneitem commands 2025-07-24 03:46:11 +01:00
e8664f0117 convert scenecollection commands 2025-07-24 02:34:15 +01:00
a3dff0f739 convert replaybuffer commands 2025-07-24 02:29:58 +01:00
6da9df5ceb convert record commands 2025-07-24 02:26:18 +01:00
75fc18273e convert projector commands 2025-07-24 02:20:17 +01:00
e658819719 convert profile commands 2025-07-24 02:14:38 +01:00
4451fbf22c conver input commands 2025-07-24 02:06:05 +01:00
132b283347 convert hotkey commands 2025-07-24 01:48:55 +01:00
ae8ff20cf4 convert group commands 2025-07-24 01:39:02 +01:00
de1c604c46 update the --help message
add descriptions for filter + scene command groups

Usage now after main CLI description
2025-07-24 01:38:42 +01:00
105aaf29b7 keep the exit codes simple (0 or 1) 2025-07-17 04:34:44 +01:00
eb34a1833f convert filter sub app 2025-07-17 04:29:21 +01:00
abbab5c746 raise OBSWSCLIError 2025-07-17 04:28:59 +01:00
f0eb518609 implement --version + debug validator
run() now handles exceptions/exit codes
2025-07-17 04:28:40 +01:00
032b957670 add custom error class + exit codes 2025-07-17 04:28:10 +01:00
8349a196e8 root meta app + scene sub_app converted
this could take a while...

note, --version flag not implemented
2025-07-17 03:18:04 +01:00
f852a733c3 upd publish action 2025-07-14 03:27:52 +01:00
44dadcee23 upd publish action 2025-07-14 03:25:52 +01:00
ed4531c305 revert publish action 2025-07-14 03:23:25 +01:00
ec42a4cdd9 patch bump 2025-07-14 03:21:29 +01:00
6123c92d00 upd publish action 2025-07-14 03:21:06 +01:00
1ceb95ab16 fix environment name 2025-07-14 03:12:35 +01:00
f06e2d3982 upd publish action 2025-07-14 03:10:04 +01:00
39dff3cc28 patch bump 2025-07-14 03:02:53 +01:00
967c4ab699 upd publish action 2025-07-14 02:58:25 +01:00
dc128720c7 hatch fmt 2025-07-14 02:48:21 +01:00
2e3f4267cd add workflows 2025-07-14 02:45:13 +01:00
000431ab82 add 0.20.0 to CHANGELOG 2025-07-14 02:32:59 +01:00
ec3e31cc4f add Text section to README 2025-07-14 02:32:21 +01:00
cda0bbedb9 minor bump 2025-07-14 02:32:09 +01:00
d0c96b853d add text unit tests 2025-07-14 02:31:47 +01:00
040a41daa7 add text command group 2025-07-14 02:31:35 +01:00
0c72a10fb7 bump obsws-python version 2025-07-01 09:30:04 +01:00
f882302d16 fixes missing argument 2025-07-01 09:29:56 +01:00
98e0d98cc7 typo 2025-06-27 13:45:24 +01:00
c6b22c7cf2 use console.highlight() 2025-06-27 13:29:39 +01:00
c3e55200db move style section
add link to style section in ToC.

add imgs.
2025-06-27 13:14:54 +01:00
4d37714aaf patch bump 2025-06-27 12:57:49 +01:00
157e1a167c fixes bug when setting --style=disabled (we were stil getting coloured check/cross marks) 2025-06-27 12:57:34 +01:00
d628c5d3a4 rename heading variables 2025-06-27 12:53:10 +01:00
4bf8edb692 add 0.19.0 to CHANGELOG 2025-06-23 09:11:26 +01:00
d68326f37a add record split/chapter to README 2025-06-23 09:11:15 +01:00
a001455dad add record split/chapter commands 2025-06-23 09:10:53 +01:00
4632260961 add --style validation
add Disabled class to style registry

patch bump
2025-06-22 12:35:21 +01:00
55a7da67db reword 2025-06-22 10:19:46 +01:00
7bec573ef9 by setting values in the default style to 'none' we avoid the rich markup errors in console.highlight
add comment to util.check_mark and test only NO_COLOR

patch bump
2025-06-22 10:14:46 +01:00
55e60ff977 in case NO_COLOR is set manually
patch bump
2025-06-22 02:49:32 +01:00
922efddf7a check if we're in colourless mode before passing back highlighted text.
pass context to check_mark so we can do the same there.

Fixes  rich.errors.MarkupError
2025-06-22 01:57:58 +01:00
4a0147aa8a import as version 2025-06-22 00:38:19 +01:00
cec76df1d1 add 0.18.0 to CHANGELOG 2025-06-21 23:47:19 +01:00
2e5fb3800a add Style section to README 2025-06-21 23:40:45 +01:00
3c985f5e9b apply table styling + stdout highlighting
fix issues with table heading alignments
2025-06-21 23:40:30 +01:00
fb17979cb0 add highlight helper function 2025-06-21 23:39:13 +01:00
a1ed208bdf add --style and --no-border flags.
set default values in Settings class
2025-06-21 23:37:20 +01:00
02baa13dba add style definitions 2025-06-21 23:32:02 +01:00
7abbccae99 add RootTyperAliasGroup
improve the output of projector open if the monitor index is invalid (suggests prj ls-m)

fix highlight for sceneitem commands in _validate_sources()

patch bump
2025-06-21 05:19:57 +01:00
23282a60d1 test against empty string to keep it consisten with rich
patch bump
2025-06-21 02:36:44 +01:00
b6ba66db64 update disable colouring section 2025-06-21 02:06:30 +01:00
c4480895a1 add empty_if_false to check_mark
patch bump
2025-06-21 00:41:33 +01:00
fd2e629ec2 print colourless check/cross marks if NO_COLOR is set
patch bump
2025-06-20 23:19:27 +01:00
85b653891d keep the colour cyan
patch bump
2025-06-20 21:09:35 +01:00
bff5d396a4 import console as namespace
patch bump
2025-06-20 07:51:12 +01:00
47324597d7 minor bump 2025-06-20 04:04:35 +01:00
9a0659ae35 add 0.16.11 to CHANGELOG 2025-06-20 02:30:25 +01:00
a726f9699f update scene list, sceneitem list and filter list with --uuid flags 2025-06-20 02:30:02 +01:00
fbea2cb896 import console as namespace
each console object is now a singleton

patch bump
2025-06-20 02:29:36 +01:00
e5040d5ddd add hidden --debug flag for controlling logging output
patch bump
2025-06-20 02:13:50 +01:00
39f1b01926 add --uuid flags to input list, scene list and sceneitem list 2025-06-20 01:32:36 +01:00
e9b3106aa6 if no filters are applied, ensure we include the entire kind list
patch bump
2025-06-19 23:10:52 +01:00
a26ce74151 add lazyimports environment, see https://github.com/fastapi/typer/pull/1128 2025-06-19 20:35:45 +01:00
f1c569f140 remove inline if else 2025-06-08 12:38:28 +01:00
093e9a05d4 add 0.16.8 to CHANGELOG 2025-06-07 20:11:00 +01:00
1a1fbf1da1 sort input list by input name
patch bump
2025-06-07 00:24:48 +01:00
fd2baf3350 remove no filter line 2025-06-07 00:06:53 +01:00
5334879ba9 patch bump 2025-06-06 23:27:45 +01:00
77dbe52ae6 upd input list to include new options 2025-06-06 23:27:31 +01:00
1ff610410a use tuples as records to build the tables
add --fempg and --vlc options to filter list

add Muted column to list table
2025-06-06 23:27:16 +01:00
cd7614bfd6 use tuples as records to build the tables 2025-06-06 23:26:33 +01:00
74503f17e0 upd console colouring
patch bump
2025-06-06 22:27:17 +01:00
32bc4277f2 add 0.16.5 to CHANGELOG 2025-06-06 21:09:33 +01:00
21f1b5e1bb add note about disabling console colouring to README 2025-06-06 20:58:28 +01:00
434f8c0e0c add monitor validate function
upd tests to match console colour changes
2025-06-06 20:58:15 +01:00
81518a14ea error messages now have style bold red
error highlights are now yellow

normal highlights are now green

_validate_scene_name_and_item_name renamed to _validate_sources

its now a normal function and not a decorator

it also returns bool instead of raising typer.Exit()

patch bump
2025-06-06 20:55:35 +01:00
ddb92bb317 upd console colouring
error messages now have style `bold red`
error highlights are now yellow

normal highlights are now green

patch bump
2025-06-06 20:53:35 +01:00
44527b35e2 upd --password show_default 2025-06-06 17:15:27 +01:00
0bcfc2ae14 ensure set/get both enforce OBS_ prefix
add class docstring

patch bump
2025-06-06 17:03:24 +01:00
ab71414d27 no need to create list here 2025-06-04 18:03:29 +01:00
ab0679174b patch bump 2025-06-04 17:34:59 +01:00
37781f6de7 clean up defaults in help messages 2025-06-04 17:34:45 +01:00
5e84becc57 wrap annotations with Annotated 2025-06-04 16:46:29 +01:00
b8dd94ccbc wrap annotations with Annotated 2025-06-04 16:31:43 +01:00
657fa84ea3 wrap scene switch annotations with Annotated 2025-06-04 16:26:27 +01:00
59f52417cd wrap annotations with Annotated 2025-06-04 15:52:35 +01:00
2d351e00b5 wrap annotations with Annotated 2025-06-04 15:49:44 +01:00
5f606b42d0 wrap annotations with Annotated 2025-06-04 15:46:52 +01:00
ae4ec542aa wrap annotations with Annotated 2025-06-04 15:39:53 +01:00
6ac63aa5e8 patch bump 2025-06-04 15:27:03 +01:00
df90614352 add Changed filter list to 0.16.1 2025-06-04 15:25:14 +01:00
d8e89285cc upd Filter section in readme 2025-06-04 15:24:48 +01:00
3e2a1e4663 wrap annotations with Annotated
filter list source_name now optional, defaults to current scene

filter list now prints default values if they are unchanged
2025-06-04 15:24:35 +01:00
723d79e306 dry up the imports 2025-06-04 15:23:13 +01:00
868d40ec8d minor bump 2025-06-04 12:53:20 +01:00
30f19f4d87 add 0.16.0 to CHANGELOG 2025-06-04 12:53:04 +01:00
5b9dd97167 add screenshot sub typer 2025-06-04 12:52:51 +01:00
d41ad994b7 add screenshot section 2025-06-04 12:51:47 +01:00
51a4a60aa6 typo 2025-06-03 18:26:18 +01:00
42 changed files with 2688 additions and 1116 deletions

39
.github/workflows/publish.yml vendored Normal file
View File

@@ -0,0 +1,39 @@
name: Publish to PyPI
on:
release:
types: [published]
push:
tags:
- 'v*.*.*'
jobs:
deploy:
runs-on: ubuntu-latest
environment: pypi
permissions:
# This permission is needed for private repositories.
contents: read
# IMPORTANT: this permission is mandatory for trusted publishing
id-token: write
steps:
- uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.11'
cache: 'pip'
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install hatch
- name: Build package
run: hatch build
- name: Publish on PyPI
uses: pypa/gh-action-pypi-publish@release/v1

19
.github/workflows/ruff.yml vendored Normal file
View File

@@ -0,0 +1,19 @@
name: Ruff
on:
push:
branches: [main]
pull_request:
branches: [main]
workflow_dispatch:
jobs:
ruff:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: astral-sh/ruff-action@v3
with:
args: 'format --check --diff'

View File

@@ -5,6 +5,79 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
# [0.20.0] - 2025-07-14
### Added
- text command group, see [Text](https://github.com/onyx-and-iris/obsws-cli?tab=readme-ov-file#text)
# [0.19.0] - 2025-06-23
### Added
- record split and record chapter commands, see [Record](https://github.com/onyx-and-iris/obsws-cli?tab=readme-ov-file#record)
- As of OBS 30.2.0, the only file format supporting *record chapter* is Hybrid MP4.
# [0.18.0] - 2025-06-21
### Added
- Various colouring styles, see [Style](https://github.com/onyx-and-iris/obsws-cli/tree/main?tab=readme-ov-file#style)
- colouring is applied to list tables as well as highlighted information in stdout/stderr output.
- table border styling may be optionally disabled with the --no-border flag.
# [0.17.3] - 2025-06-20
### Added
- input list, scene list and sceneitem list now accept --uuid flag.
- Active column added to scene list table.
### Changed
- scene list no longer prints the UUIDs by default, enable it with the --uuid flag.
- if NO_COLOR is set, print colourless check and cross marks in tables.
### Fixed
- Issue with input list not printing all inputs if no filters were applied.
# [0.16.8] - 2025-06-07
### Added
- filter list:
- --ffmpeg, --vlc flags
- Muted column to list table
# [0.16.5] - 2025-06-06
### Added
- [Disable Colouring](https://github.com/onyx-and-iris/obsws-cli?tab=readme-ov-file#disable-colouring) section added to README.
### Changed
- error output:
- now printed in bold red.
- highlights are now yellow
- normal output:
- highlights are now green
- help messages:
- removed a lot of the `[default: None]`, this affects optional flags/arguments without default values.
# [0.16.1] - 2025-06-04
### Added
- screenshot save command, see [Screenshot](https://github.com/onyx-and-iris/obsws-cli/tree/main?tab=readme-ov-file#screenshot)
### Changed
- filter list:
- source_name arg is now optional, it defaults to the current scene.
- default values are printed if unmodified.
# [0.15.0] - 2025-06-02
### Added

View File

@@ -14,6 +14,7 @@ For an outline of past/future changes refer to: [CHANGELOG](CHANGELOG.md)
- [Installation](#installation)
- [Configuration](#configuration)
- [Style](#style)
- [Commands](#root-typer)
- [License](#license)
@@ -68,6 +69,37 @@ OBS_PASSWORD=<websocket password>
Flags can be used to override environment variables.
## Style
Styling is opt-in, by default you will get a colourless output:
![colourless](./img/colourless.png)
You may enable styling with the --style/-s flag:
```console
obsws-cli --style="cyan" sceneitem list
```
Available styles: _red, magenta, purple, blue, cyan, green, yellow, orange, white, grey, navy, black_
![coloured](./img/coloured-border.png)
Optionally you may disable border colouring with the --no-border flag:
![coloured-no-border](./img/coloured-no-border.png)
```console
obsws-cli --style="cyan" --no-border sceneitem list
```
Or with environment variables:
```env
OBS_STYLE=cyan
OBS_STYLE_NO_BORDER=true
```
## Root Typer
- obs-version: Get the OBS Client and WebSocket versions.
@@ -81,6 +113,10 @@ obsws-cli obs-version
#### Scene
- list: List all scenes.
- flags:
*optional*
- --uuid: Show UUIDs of scenes
```console
obsws-cli scene list
@@ -102,6 +138,10 @@ obsws-cli scene switch LIVE
#### Scene Item
- list: List all items in a scene.
- flags:
*optional*
- --uuid: Show UUIDs of scene items
*optional*
- args: <scene_name>
@@ -265,6 +305,9 @@ obsws-cli group status START "test_group"
- --input: Filter by input type.
- --output: Filter by output type.
- --colour: Filter by colour source type.
- --ffmpeg: Filter by ffmpeg source type.
- --vlc: Filter by VLC source type.
- --uuid: Show UUIDs of inputs.
```console
obsws-cli input list
@@ -292,6 +335,22 @@ obsws-cli input unmute "Mic/Aux"
obsws-cli input toggle "Mic/Aux"
```
#### Text
- current: Get the current text for a text input.
- args: <input_name>
```console
obsws-cli text current "My Text Input"
```
- update: Update the text of a text input.
- args: <input_name> <new_text>
```console
obsws-cli text update "My Text Input" "hi OBS!"
```
#### Record
- start: Start recording.
@@ -343,6 +402,21 @@ obsws-cli record directory "/home/me/obs-vids/"
obsws-cli record directory "C:/Users/me/Videos"
```
- split: Split the current recording.
```console
obsws-cli record split
```
- chapter: Create a chapter in the current recording.
*optional*
- args: <chapter_name>
```console
obsws-cli record chapter "Chapter Name"
```
#### Stream
- start: Start streaming.
@@ -519,7 +593,10 @@ obsws-cli hotkey trigger-sequence OBS_KEY_F1 --shift --ctrl
#### Filter
- list: List filters for a source.
*optional*
- args: <source_name>
- defaults to current scene
```console
obsws-cli filter list "Mic/Aux"
@@ -573,13 +650,31 @@ obsws-cli projector list-monitors
- defaults to current scene
```console
obsws-cli project open
obsws-cli projector open
obsws-cli projector open --monitor-index=1 "test_scene"
obsws-cli projector open --monitor-index=1 "test_group"
```
#### Screenshot
- save: Take a screenshot and save it to a file.
- flags:
*optional*
- --width:
- defaults to 1920
- --height:
- defaults to 1080
- --quality:
- defaults to -1
- args: <source_name> <output_path>
```console
obsws-cli screenshot save --width=2560 --height=1440 "Scene" "C:\Users\me\Videos\screenshot.png"
```
## License
@@ -588,3 +683,4 @@ obsws-cli projector open --monitor-index=1 "test_group"
[obs-studio]: https://obsproject.com/
[obs-keyids]: https://github.com/obsproject/obs-studio/blob/master/libobs/obs-hotkeys.h
[no-colour]: https://no-color.org/

BIN
img/coloured-border.png Executable file

Binary file not shown.

After

Width:  |  Height:  |  Size: 6.2 KiB

BIN
img/coloured-no-border.png Executable file

Binary file not shown.

After

Width:  |  Height:  |  Size: 6.2 KiB

BIN
img/colourless.png Executable file

Binary file not shown.

After

Width:  |  Height:  |  Size: 5.4 KiB

View File

@@ -1,4 +1,4 @@
# SPDX-FileCopyrightText: 2025-present onyx-and-iris <code@onyxandiris.online>
#
# SPDX-License-Identifier: MIT
__version__ = "0.15.2"
__version__ = '0.20.2'

View File

@@ -2,6 +2,6 @@
#
# SPDX-License-Identifier: MIT
from .app import app
from .app import run
__all__ = ["app"]
__all__ = ['run']

View File

@@ -1,28 +0,0 @@
"""module defining a custom group class for handling command name aliases."""
import re
import typer
class AliasGroup(typer.core.TyperGroup):
"""A custom group class to handle command name aliases."""
_CMD_SPLIT_P = re.compile(r' ?[,|] ?')
def __init__(self, *args, **kwargs):
"""Initialize the AliasGroup."""
super().__init__(*args, **kwargs)
self.no_args_is_help = True
def get_command(self, ctx, cmd_name):
"""Get a command by name."""
cmd_name = self._group_cmd_name(cmd_name)
return super().get_command(ctx, cmd_name)
def _group_cmd_name(self, default_name):
for cmd in self.commands.values():
name = cmd.name
if name and default_name in self._CMD_SPLIT_P.split(name):
return name
return default_name

View File

@@ -1,121 +1,137 @@
"""Command line interface for the OBS WebSocket API."""
from typing import Annotated
import importlib
import logging
from dataclasses import dataclass
from typing import Annotated, Any
import obsws_python as obsws
import typer
from rich.console import Console
from cyclopts import App, Group, Parameter, config
from obsws_cli.__about__ import __version__ as obsws_cli_version
from obsws_cli.__about__ import __version__ as version
from . import (
filter,
group,
hotkey,
input,
profile,
projector,
record,
replaybuffer,
scene,
scenecollection,
sceneitem,
settings,
stream,
studiomode,
virtualcam,
from . import console, styles
from .context import Context
from .error import OBSWSCLIError
app = App(
config=config.Env(
'OBS_'
), # Environment variable prefix for configuration parameters
version=version,
usage='[bold][yellow]Usage:[/yellow] [white]obsws-cli [OPTIONS] COMMAND [ARGS]...[/white][/bold]',
)
from .alias import AliasGroup
app = typer.Typer(cls=AliasGroup)
for module in (
filter,
group,
hotkey,
input,
projector,
profile,
record,
replaybuffer,
scene,
scenecollection,
sceneitem,
stream,
studiomode,
virtualcam,
app.meta.group_parameters = Group('Options', sort_key=0)
for sub_app in (
'filter',
'group',
'hotkey',
'input',
'profile',
'projector',
'record',
'replaybuffer',
'scene',
'scenecollection',
'sceneitem',
'screenshot',
'stream',
'studiomode',
'text',
'virtualcam',
):
app.add_typer(module.app, name=module.__name__.split('.')[-1])
out_console = Console()
err_console = Console(stderr=True)
module = importlib.import_module(f'.{sub_app}', package=__package__)
app.command(module.app)
def version_callback(value: bool):
"""Show the version of the CLI."""
if value:
out_console.print(f'obsws-cli version: {obsws_cli_version}')
raise typer.Exit()
@Parameter(name='*')
@dataclass
class OBSConfig:
"""Dataclass to hold OBS connection parameters.
Attributes:
host (str): The hostname or IP address of the OBS WebSocket server.
port (int): The port number of the OBS WebSocket server.
password (str): The password for the OBS WebSocket server, if required.
"""
host: str = 'localhost'
port: int = 4455
password: str = ''
@app.callback()
def main(
ctx: typer.Context,
host: Annotated[
str,
typer.Option(
'--host',
'-H',
envvar='OBS_HOST',
help='WebSocket host',
show_default='localhost',
),
] = settings.get('OBS_HOST'),
port: Annotated[
int,
typer.Option(
'--port', '-P', envvar='OBS_PORT', help='WebSocket port', show_default=4455
),
] = settings.get('OBS_PORT'),
password: Annotated[
str,
typer.Option(
'--password',
'-p',
envvar='OBS_PASSWORD',
help='WebSocket password',
show_default='',
),
] = settings.get('OBS_PASSWORD'),
timeout: Annotated[
int,
typer.Option(
'--timeout',
'-T',
envvar='OBS_TIMEOUT',
help='WebSocket timeout',
show_default=5,
),
] = settings.get('OBS_TIMEOUT'),
version: Annotated[
@dataclass
class StyleConfig:
"""Dataclass to hold style parameters.
Attributes:
name (str): The name of the style to use for console output.
no_border (bool): Whether to style the borders in the console output.
"""
name: str = 'disabled'
no_border: bool = False
def setup_logging(type_, value: Any):
"""Set up logging for the application."""
log_level = logging.DEBUG if value else logging.CRITICAL
logging.basicConfig(
level=log_level,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
@app.meta.default
def launcher(
*tokens: Annotated[str, Parameter(show=False, allow_leading_hyphen=True)],
obs_config: OBSConfig,
style_config: StyleConfig,
debug: Annotated[
bool,
typer.Option(
'--version',
'-v',
is_eager=True,
help='Show the CLI version and exit',
show_default=False,
callback=version_callback,
),
Parameter(validator=setup_logging, help='Enable debug logging'),
] = False,
):
"""obsws_cli is a command line interface for the OBS WebSocket API."""
ctx.obj = ctx.with_resource(obsws.ReqClient(**ctx.params))
"""Command line interface for the OBS WebSocket API."""
with obsws.ReqClient(
host=obs_config.host,
port=obs_config.port,
password=obs_config.password,
) as client:
additional_kwargs = {}
command, bound, ignored = app.parse_args(tokens)
if 'ctx' in ignored:
# If 'ctx' is in ignored, it means it was not passed as an argument
# and we need to add it to the bound arguments.
additional_kwargs['ctx'] = ignored['ctx'](
client,
styles.request_style_obj(style_config.name, style_config.no_border),
)
return command(*bound.args, **bound.kwargs, **additional_kwargs)
@app.command()
def obs_version(ctx: typer.Context):
@app.command
def obs_version(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Get the OBS Client and WebSocket versions."""
resp = ctx.obj.get_version()
out_console.print(
f'OBS Client version: {resp.obs_version} with WebSocket version: {resp.obs_web_socket_version}'
resp = ctx.client.get_version()
console.out.print(
f'OBS Client version: {console.highlight(ctx, resp.obs_version)}'
f' with WebSocket version: {console.highlight(ctx, resp.obs_web_socket_version)}'
)
def run():
"""Run the OBS WebSocket CLI application.
Handles exceptions and prints error messages to the console.
"""
try:
app.meta()
except OBSWSCLIError as e:
console.err.print(f'Error: {e}')
return e.code

13
obsws_cli/console.py Normal file
View File

@@ -0,0 +1,13 @@
"""module for console output handling in obsws_cli."""
from rich.console import Console
from .context import Context
out = Console()
err = Console(stderr=True, style='bold red')
def highlight(ctx: Context, text: str) -> str:
"""Highlight text using the current context's style."""
return f'[{ctx.style.highlight}]{text}[/{ctx.style.highlight}]'

15
obsws_cli/context.py Normal file
View File

@@ -0,0 +1,15 @@
"""module for managing the application context."""
from dataclasses import dataclass
import obsws_python as obsws
from . import styles
@dataclass
class Context:
"""Context for the application, holding OBS and style configurations."""
client: obsws.ReqClient
style: styles.Style

10
obsws_cli/enum.py Normal file
View File

@@ -0,0 +1,10 @@
"""module for exit codes used in the application."""
from enum import IntEnum, auto
class ExitCode(IntEnum):
"""Exit codes for the application."""
SUCCESS = 0
ERROR = auto()

11
obsws_cli/error.py Normal file
View File

@@ -0,0 +1,11 @@
"""module containing error handling for OBS WebSocket CLI."""
class OBSWSCLIError(Exception):
"""Base class for OBS WebSocket CLI errors."""
def __init__(self, message: str, code: int = 1):
"""Initialize the error with a message and an optional code."""
super().__init__(message)
self.message = message
self.code = code

View File

@@ -1,134 +1,221 @@
"""module containing commands for manipulating filters in scenes."""
from typing import Annotated, Optional
import obsws_python as obsws
import typer
from rich.console import Console
from cyclopts import App, Parameter
from rich.table import Table
from rich.text import Text
from . import util
from .alias import AliasGroup
from . import console, util
from .context import Context
from .enum import ExitCode
from .error import OBSWSCLIError
app = typer.Typer(cls=AliasGroup)
out_console = Console()
err_console = Console(stderr=True)
app = App(name='filter', help='Commands for managing filters in OBS sources')
@app.callback()
def main():
"""Control filters in OBS scenes."""
@app.command(name=['list', 'ls'])
def list_(
source_name: Optional[str] = None,
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""List filters for a source.
Parameters
----------
source_name : str, optional
The name of the source to list filters for. If not provided, the current program scene's source will be used.
ctx : Context
The context containing the OBS client and other settings.
"""
if not source_name:
source_name = ctx.client.get_current_program_scene().scene_name
@app.command('list | ls')
def list(ctx: typer.Context, source_name: str):
"""List filters for a source."""
try:
resp = ctx.obj.get_source_filter_list(source_name)
resp = ctx.client.get_source_filter_list(source_name)
except obsws.error.OBSSDKRequestError as e:
if e.code == 600:
err_console.print(f"No source was found by the name of '{source_name}'.")
raise typer.Exit(1)
raise OBSWSCLIError(
f'No source found by the name of [yellow]{source_name}[/yellow].',
code=ExitCode.ERROR,
)
else:
raise
if not resp.filters:
out_console.print(f'No filters found for source {source_name}')
raise typer.Exit()
table = Table(title=f'Filters for Source: {source_name}', padding=(0, 2))
for column in ('Filter Name', 'Kind', 'Enabled', 'Settings'):
table.add_column(
column,
justify='left' if column in ('Filter Name', 'Kind') else 'center',
style='cyan',
console.out.print(
f'No filters found for source {console.highlight(ctx, source_name)}'
)
return
table = Table(
title=f'Filters for Source: {source_name}',
padding=(0, 2),
border_style=ctx.style.border,
)
columns = [
(Text('Filter Name', justify='center'), 'left', ctx.style.column),
(Text('Kind', justify='center'), 'left', ctx.style.column),
(Text('Enabled', justify='center'), 'center', None),
(Text('Settings', justify='center'), 'center', ctx.style.column),
]
for heading, justify, style in columns:
table.add_column(heading, justify=justify, style=style)
for filter in resp.filters:
resp = ctx.client.get_source_filter_default_settings(filter['filterKind'])
settings = resp.default_filter_settings | filter['filterSettings']
table.add_row(
filter['filterName'],
util.snakecase_to_titlecase(filter['filterKind']),
':white_heavy_check_mark:' if filter['filterEnabled'] else ':x:',
util.check_mark(filter['filterEnabled']),
'\n'.join(
[
f'{util.snakecase_to_titlecase(k):<20} {v:>10}'
for k, v in filter['filterSettings'].items()
for k, v in settings.items()
]
),
)
out_console.print(table)
console.out.print(table)
def _get_filter_enabled(ctx: typer.Context, source_name: str, filter_name: str):
def _get_filter_enabled(ctx: Context, source_name: str, filter_name: str):
"""Get the status of a filter for a source."""
resp = ctx.obj.get_source_filter(source_name, filter_name)
resp = ctx.client.get_source_filter(source_name, filter_name)
return resp.filter_enabled
@app.command('enable | on')
@app.command(name=['enable', 'on'])
def enable(
ctx: typer.Context,
source_name: str = typer.Argument(..., help='The source to enable the filter for'),
filter_name: str = typer.Argument(..., help='The name of the filter to enable'),
source_name: str,
filter_name: str,
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Enable a filter for a source."""
"""Enable a filter for a source.
Parameters
----------
source_name : str
The name of the source to enable the filter for.
filter_name : str
The name of the filter to enable.
ctx : Context
The context containing the OBS client and other settings.
"""
if _get_filter_enabled(ctx, source_name, filter_name):
err_console.print(
f'Filter {filter_name} is already enabled for source {source_name}'
raise OBSWSCLIError(
f'Filter [yellow]{filter_name}[/yellow] is already enabled for source [yellow]{source_name}[/yellow]',
code=ExitCode.ERROR,
)
raise typer.Exit(1)
ctx.obj.set_source_filter_enabled(source_name, filter_name, enabled=True)
out_console.print(f'Enabled filter {filter_name} for source {source_name}')
ctx.client.set_source_filter_enabled(source_name, filter_name, enabled=True)
console.out.print(
f'Enabled filter {console.highlight(ctx, filter_name)} for source {console.highlight(ctx, source_name)}'
)
@app.command('disable | off')
@app.command(name=['disable', 'off'])
def disable(
ctx: typer.Context,
source_name: str = typer.Argument(..., help='The source to disable the filter for'),
filter_name: str = typer.Argument(..., help='The name of the filter to disable'),
source_name: str,
filter_name: str,
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Disable a filter for a source."""
"""Disable a filter for a source.
Parameters
----------
source_name : str
The name of the source to disable the filter for.
filter_name : str
The name of the filter to disable.
ctx : Context
The context containing the OBS client and other settings.
"""
if not _get_filter_enabled(ctx, source_name, filter_name):
err_console.print(
f'Filter {filter_name} is already disabled for source {source_name}'
raise OBSWSCLIError(
f'Filter [yellow]{filter_name}[/yellow] is already disabled for source [yellow]{source_name}[/yellow]',
code=ExitCode.ERROR,
)
raise typer.Exit(1)
ctx.obj.set_source_filter_enabled(source_name, filter_name, enabled=False)
out_console.print(f'Disabled filter {filter_name} for source {source_name}')
ctx.client.set_source_filter_enabled(source_name, filter_name, enabled=False)
console.out.print(
f'Disabled filter {console.highlight(ctx, filter_name)} for source {console.highlight(ctx, source_name)}'
)
@app.command('toggle | tg')
@app.command(name=['toggle', 'tg'])
def toggle(
ctx: typer.Context,
source_name: str = typer.Argument(..., help='The source to toggle the filter for'),
filter_name: str = typer.Argument(..., help='The name of the filter to toggle'),
source_name: str,
filter_name: str,
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Toggle a filter for a source."""
"""Toggle a filter for a source.
Parameters
----------
source_name : str
The name of the source to toggle the filter for.
filter_name : str
The name of the filter to toggle.
ctx : Context
The context containing the OBS client and other settings.
"""
is_enabled = _get_filter_enabled(ctx, source_name, filter_name)
new_state = not is_enabled
ctx.obj.set_source_filter_enabled(source_name, filter_name, enabled=new_state)
ctx.client.set_source_filter_enabled(source_name, filter_name, enabled=new_state)
if new_state:
out_console.print(f'Enabled filter {filter_name} for source {source_name}')
console.out.print(
f'Enabled filter {console.highlight(ctx, filter_name)} for source {console.highlight(ctx, source_name)}'
)
else:
out_console.print(f'Disabled filter {filter_name} for source {source_name}')
console.out.print(
f'Disabled filter {console.highlight(ctx, filter_name)} for source {console.highlight(ctx, source_name)}'
)
@app.command('status | ss')
@app.command(name=['status', 'ss'])
def status(
ctx: typer.Context,
source_name: str = typer.Argument(
..., help='The source to get the filter status for'
),
filter_name: str = typer.Argument(
..., help='The name of the filter to get the status for'
),
source_name: str,
filter_name: str,
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Get the status of a filter for a source."""
"""Get the status of a filter for a source.
Parameters
----------
source_name : str
The name of the source to check the filter status for.
filter_name : str
The name of the filter to check the status for.
ctx : Context
The context containing the OBS client and other settings.
"""
is_enabled = _get_filter_enabled(ctx, source_name, filter_name)
if is_enabled:
out_console.print(f'Filter {filter_name} is enabled for source {source_name}')
console.out.print(
f'Filter {console.highlight(ctx, filter_name)} is enabled for source {console.highlight(ctx, source_name)}'
)
else:
out_console.print(f'Filter {filter_name} is disabled for source {source_name}')
console.out.print(
f'Filter {console.highlight(ctx, filter_name)} is disabled for source {console.highlight(ctx, source_name)}'
)

View File

@@ -1,39 +1,48 @@
"""module containing commands for manipulating groups in scenes."""
import typer
from rich.console import Console
from rich.table import Table
from typing import Annotated, Optional
from . import validate
from .alias import AliasGroup
from cyclopts import App, Parameter
from rich.table import Table
from rich.text import Text
from . import console, util, validate
from .context import Context
from .enum import ExitCode
from .error import OBSWSCLIError
from .protocols import DataclassProtocol
app = typer.Typer(cls=AliasGroup)
out_console = Console()
err_console = Console(stderr=True)
app = App(name='group', help='Commands for managing groups in OBS scenes')
@app.callback()
def main():
"""Control groups in OBS scenes."""
@app.command('list | ls')
def list(
ctx: typer.Context,
scene_name: str = typer.Argument(
None, help='Scene name (optional, defaults to current scene)'
),
@app.command(name=['list', 'ls'])
def list_(
scene_name: Optional[str] = None,
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""List groups in a scene."""
"""List groups in a scene.
Parameters
----------
scene_name : str, optional
The name of the scene to list groups for. If not provided, the current program scene
will be used.
ctx : Context
The context containing the OBS client and other settings.
"""
if not scene_name:
scene_name = ctx.obj.get_current_program_scene().scene_name
scene_name = ctx.client.get_current_program_scene().scene_name
if not validate.scene_in_scenes(ctx, scene_name):
err_console.print(f"Scene '{scene_name}' not found.")
raise typer.Exit(1)
raise OBSWSCLIError(
f'Scene [yellow]{scene_name}[/yellow] not found.',
code=ExitCode.ERROR,
)
resp = ctx.obj.get_scene_item_list(scene_name)
resp = ctx.client.get_scene_item_list(scene_name)
groups = [
(item.get('sceneItemId'), item.get('sourceName'), item.get('sceneItemEnabled'))
for item in resp.scene_items
@@ -41,24 +50,33 @@ def list(
]
if not groups:
out_console.print(f"No groups found in scene '{scene_name}'.")
raise typer.Exit()
table = Table(title=f'Groups in Scene: {scene_name}', padding=(0, 2))
for column in ('ID', 'Group Name', 'Enabled'):
table.add_column(
column, justify='left' if column == 'Group Name' else 'center', style='cyan'
console.out.print(
f'No groups found in scene {console.highlight(ctx, scene_name)}.'
)
return
table = Table(
title=f'Groups in Scene: {scene_name}',
padding=(0, 2),
border_style=ctx.style.border,
)
columns = [
(Text('ID', justify='center'), 'center', ctx.style.column),
(Text('Group Name', justify='center'), 'left', ctx.style.column),
(Text('Enabled', justify='center'), 'center', None),
]
for heading, justify, style in columns:
table.add_column(heading, justify=justify, style=style)
for item_id, group_name, is_enabled in groups:
table.add_row(
str(item_id),
group_name,
':white_heavy_check_mark:' if is_enabled else ':x:',
util.check_mark(is_enabled),
)
out_console.print(table)
console.out.print(table)
def _get_group(group_name: str, resp: DataclassProtocol) -> dict | None:
@@ -74,91 +92,175 @@ def _get_group(group_name: str, resp: DataclassProtocol) -> dict | None:
return group
@app.command('show | sh')
def show(ctx: typer.Context, scene_name: str, group_name: str):
"""Show a group in a scene."""
@app.command(name=['show', 'sh'])
def show(
scene_name: str,
group_name: str,
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Show a group in a scene.
Parameters
----------
scene_name : str
The name of the scene where the group is located.
group_name : str
The name of the group to show.
ctx : Context
The context containing the OBS client and other settings.
"""
if not validate.scene_in_scenes(ctx, scene_name):
err_console.print(f"Scene '{scene_name}' not found.")
raise typer.Exit(1)
raise OBSWSCLIError(
f'Scene [yellow]{scene_name}[/yellow] not found.',
code=ExitCode.ERROR,
)
resp = ctx.obj.get_scene_item_list(scene_name)
resp = ctx.client.get_scene_item_list(scene_name)
if (group := _get_group(group_name, resp)) is None:
err_console.print(f"Group '{group_name}' not found in scene {scene_name}.")
raise typer.Exit(1)
raise OBSWSCLIError(
f'Group [yellow]{group_name}[/yellow] not found in scene [yellow]{scene_name}[/yellow].',
code=ExitCode.ERROR,
)
ctx.obj.set_scene_item_enabled(
ctx.client.set_scene_item_enabled(
scene_name=scene_name,
item_id=int(group.get('sceneItemId')),
enabled=True,
)
out_console.print(f"Group '{group_name}' is now visible.")
console.out.print(f'Group {console.highlight(ctx, group_name)} is now visible.')
@app.command('hide | h')
def hide(ctx: typer.Context, scene_name: str, group_name: str):
"""Hide a group in a scene."""
@app.command(name=['hide', 'h'])
def hide(
scene_name: str,
group_name: str,
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Hide a group in a scene.
Parameters
----------
scene_name : str
The name of the scene where the group is located.
group_name : str
The name of the group to hide.
ctx : Context
The context containing the OBS client and other settings.
"""
if not validate.scene_in_scenes(ctx, scene_name):
err_console.print(f"Scene '{scene_name}' not found.")
raise typer.Exit(1)
raise OBSWSCLIError(
f'Scene [yellow]{scene_name}[/yellow] not found.',
code=ExitCode.ERROR,
)
resp = ctx.obj.get_scene_item_list(scene_name)
resp = ctx.client.get_scene_item_list(scene_name)
if (group := _get_group(group_name, resp)) is None:
err_console.print(f"Group '{group_name}' not found in scene {scene_name}.")
raise typer.Exit(1)
raise OBSWSCLIError(
f'Group [yellow]{group_name}[/yellow] not found in scene [yellow]{scene_name}[/yellow].',
code=ExitCode.ERROR,
)
ctx.obj.set_scene_item_enabled(
ctx.client.set_scene_item_enabled(
scene_name=scene_name,
item_id=int(group.get('sceneItemId')),
enabled=False,
)
out_console.print(f"Group '{group_name}' is now hidden.")
console.out.print(f'Group {console.highlight(ctx, group_name)} is now hidden.')
@app.command('toggle | tg')
def toggle(ctx: typer.Context, scene_name: str, group_name: str):
"""Toggle a group in a scene."""
@app.command(name=['toggle', 'tg'])
def toggle(
scene_name: str,
group_name: str,
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Toggle a group in a scene.
Parameters
----------
scene_name : str
The name of the scene where the group is located.
group_name : str
The name of the group to toggle.
ctx : Context
The context containing the OBS client and other settings.
"""
if not validate.scene_in_scenes(ctx, scene_name):
err_console.print(f"Scene '{scene_name}' not found.")
raise typer.Exit(1)
raise OBSWSCLIError(
f'Scene [yellow]{scene_name}[/yellow] not found.',
code=ExitCode.ERROR,
)
resp = ctx.obj.get_scene_item_list(scene_name)
resp = ctx.client.get_scene_item_list(scene_name)
if (group := _get_group(group_name, resp)) is None:
err_console.print(f"Group '{group_name}' not found in scene {scene_name}.")
raise typer.Exit(1)
raise OBSWSCLIError(
f'Group [yellow]{group_name}[/yellow] not found in scene [yellow]{scene_name}[/yellow].',
code=ExitCode.ERROR,
)
new_state = not group.get('sceneItemEnabled')
ctx.obj.set_scene_item_enabled(
ctx.client.set_scene_item_enabled(
scene_name=scene_name,
item_id=int(group.get('sceneItemId')),
enabled=new_state,
)
if new_state:
out_console.print(f"Group '{group_name}' is now visible.")
console.out.print(f'Group {console.highlight(ctx, group_name)} is now visible.')
else:
out_console.print(f"Group '{group_name}' is now hidden.")
console.out.print(f'Group {console.highlight(ctx, group_name)} is now hidden.')
@app.command('status | ss')
def status(ctx: typer.Context, scene_name: str, group_name: str):
"""Get the status of a group in a scene."""
@app.command(name=['status', 'ss'])
def status(
scene_name: str,
group_name: str,
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Get the status of a group in a scene.
Parameters
----------
scene_name : str
The name of the scene where the group is located.
group_name : str
The name of the group to check.
ctx : Context
The context containing the OBS client and other settings.
"""
if not validate.scene_in_scenes(ctx, scene_name):
err_console.print(f"Scene '{scene_name}' not found.")
raise typer.Exit(1)
raise OBSWSCLIError(
f'Scene [yellow]{scene_name}[/yellow] not found.',
code=ExitCode.ERROR,
)
resp = ctx.obj.get_scene_item_list(scene_name)
resp = ctx.client.get_scene_item_list(scene_name)
if (group := _get_group(group_name, resp)) is None:
err_console.print(f"Group '{group_name}' not found in scene {scene_name}.")
raise typer.Exit(1)
raise OBSWSCLIError(
f'Group [yellow]{group_name}[/yellow] not found in scene [yellow]{scene_name}[/yellow].',
code=ExitCode.ERROR,
)
enabled = ctx.obj.get_scene_item_enabled(
enabled = ctx.client.get_scene_item_enabled(
scene_name=scene_name,
item_id=int(group.get('sceneItemId')),
)
if enabled.scene_item_enabled:
out_console.print(f"Group '{group_name}' is now visible.")
console.out.print(f'Group {console.highlight(ctx, group_name)} is now visible.')
else:
out_console.print(f"Group '{group_name}' is now hidden.")
console.out.print(f'Group {console.highlight(ctx, group_name)} is now hidden.')

View File

@@ -1,57 +1,96 @@
"""module containing commands for hotkey management."""
import typer
from rich.console import Console
from typing import Annotated
from cyclopts import App, Parameter
from rich.table import Table
from rich.text import Text
from .alias import AliasGroup
from . import console
from .context import Context
app = typer.Typer(cls=AliasGroup)
out_console = Console()
err_console = Console(stderr=True)
app = App(name='hotkey', help='Commands for managing hotkeys in OBS')
@app.callback()
def main():
"""Control hotkeys in OBS."""
@app.command('list | ls')
def list(
ctx: typer.Context,
@app.command(name=['list', 'ls'])
def list_(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""List all hotkeys."""
resp = ctx.obj.get_hotkey_list()
"""List all hotkeys.
table = Table(title='Hotkeys', padding=(0, 2))
table.add_column('Hotkey Name', justify='left', style='cyan')
Parameters
----------
ctx : Context
The context containing the OBS client to interact with.
for hotkey in resp.hotkeys:
table.add_row(hotkey)
"""
resp = ctx.client.get_hotkey_list()
out_console.print(table)
table = Table(
title='Hotkeys',
padding=(0, 2),
border_style=ctx.style.border,
)
table.add_column(
Text('Hotkey Name', justify='center'),
justify='left',
style=ctx.style.column,
)
for i, hotkey in enumerate(resp.hotkeys):
table.add_row(hotkey, style='' if i % 2 == 0 else 'dim')
console.out.print(table)
@app.command('trigger | tr')
@app.command(name=['trigger', 'tr'])
def trigger(
ctx: typer.Context,
hotkey: str = typer.Argument(..., help='The hotkey to trigger'),
hotkey: str,
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Trigger a hotkey by name."""
ctx.obj.trigger_hotkey_by_name(hotkey)
"""Trigger a hotkey by name.
Parameters
----------
hotkey : str
The name of the hotkey to trigger.
ctx : Context
The context containing the OBS client to interact with.
"""
ctx.client.trigger_hotkey_by_name(hotkey)
@app.command('trigger-sequence | trs')
@app.command(name=['trigger-sequence', 'trs'])
def trigger_sequence(
ctx: typer.Context,
shift: bool = typer.Option(False, help='Press shift when triggering the hotkey'),
ctrl: bool = typer.Option(False, help='Press control when triggering the hotkey'),
alt: bool = typer.Option(False, help='Press alt when triggering the hotkey'),
cmd: bool = typer.Option(False, help='Press cmd when triggering the hotkey'),
key_id: str = typer.Argument(
...,
help='The OBS key ID to trigger, see https://github.com/onyx-and-iris/obsws-cli?tab=readme-ov-file#hotkey for more info',
),
key_id: str,
/,
shift: bool = False,
ctrl: bool = False,
alt: bool = False,
cmd: bool = False,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Trigger a hotkey by sequence."""
ctx.obj.trigger_hotkey_by_key_sequence(key_id, shift, ctrl, alt, cmd)
"""Trigger a hotkey by sequence.
Parameters
----------
key_id : str
The OBS key ID to trigger, see https://github.com/onyx-and-iris/obsws-cli?tab=readme-ov-file#hotkey for more info
shift : bool, optional
Press shift when triggering the hotkey (default is False)
ctrl : bool, optional
Press control when triggering the hotkey (default is False)
alt : bool, optional
Press alt when triggering the hotkey (default is False)
cmd : bool, optional
Press cmd when triggering the hotkey (default is False)
ctx : Context
The context containing the OBS client to interact with.
"""
ctx.client.trigger_hotkey_by_key_sequence(key_id, shift, ctrl, alt, cmd)

View File

@@ -2,32 +2,51 @@
from typing import Annotated
import typer
from rich.console import Console
import obsws_python as obsws
from cyclopts import App, Parameter
from rich.table import Table
from rich.text import Text
from . import util, validate
from .alias import AliasGroup
from . import console, util, validate
from .context import Context
from .enum import ExitCode
from .error import OBSWSCLIError
app = typer.Typer(cls=AliasGroup)
out_console = Console()
err_console = Console(stderr=True)
app = App(name='input', help='Commands for managing inputs in OBS')
@app.callback()
def main():
"""Control inputs in OBS."""
@app.command('list | ls')
def list(
ctx: typer.Context,
input: Annotated[bool, typer.Option(help='Filter by input type.')] = False,
output: Annotated[bool, typer.Option(help='Filter by output type.')] = False,
colour: Annotated[bool, typer.Option(help='Filter by colour source type.')] = False,
@app.command(name=['list', 'ls'])
def list_(
input: bool = False,
output: bool = False,
colour: bool = False,
ffmpeg: bool = False,
vlc: bool = False,
uuid: bool = False,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""List all inputs."""
resp = ctx.obj.get_input_list()
"""List all inputs.
Parameters
----------
input:
Filter by input type.
output:
Filter by output type.
colour:
Filter by colour source type.
ffmpeg:
Filter by ffmpeg source type.
vlc:
Filter by VLC source type.
uuid:
Show UUIDs of inputs.
ctx:
The context containing the client and style.
"""
resp = ctx.client.get_input_list()
kinds = []
if input:
@@ -36,81 +55,171 @@ def list(
kinds.append('output')
if colour:
kinds.append('color')
if not any([input, output, colour]):
kinds = ['input', 'output', 'color']
if ffmpeg:
kinds.append('ffmpeg')
if vlc:
kinds.append('vlc')
if not any([input, output, colour, ffmpeg, vlc]):
kinds = ctx.client.get_input_kind_list(False).input_kinds
inputs = [
(input_.get('inputName'), input_.get('inputKind'))
for input_ in filter(
lambda input_: any(kind in input_.get('inputKind') for kind in kinds),
resp.inputs,
)
]
inputs = sorted(
(
(input_.get('inputName'), input_.get('inputKind'), input_.get('inputUuid'))
for input_ in filter(
lambda input_: any(kind in input_.get('inputKind') for kind in kinds),
resp.inputs,
)
),
key=lambda x: x[0], # Sort by input name
)
if not inputs:
out_console.print('No inputs found.')
raise typer.Exit()
console.out.print('No inputs found matching the specified filters.')
return
table = Table(title='Inputs', padding=(0, 2))
for column in ('Input Name', 'Kind'):
table.add_column(
column, justify='left' if column == 'Input Name' else 'center', style='cyan'
)
table = Table(title='Inputs', padding=(0, 2), border_style=ctx.style.border)
if uuid:
columns = [
(Text('Input Name', justify='center'), 'left', ctx.style.column),
(Text('Kind', justify='center'), 'center', ctx.style.column),
(Text('Muted', justify='center'), 'center', None),
(Text('UUID', justify='center'), 'left', ctx.style.column),
]
else:
columns = [
(Text('Input Name', justify='center'), 'left', ctx.style.column),
(Text('Kind', justify='center'), 'center', ctx.style.column),
(Text('Muted', justify='center'), 'center', None),
]
for heading, justify, style in columns:
table.add_column(heading, justify=justify, style=style)
for input_name, input_kind in inputs:
table.add_row(
input_name,
util.snakecase_to_titlecase(input_kind),
)
for input_name, input_kind, input_uuid in inputs:
input_mark = ''
try:
input_muted = ctx.client.get_input_mute(name=input_name).input_muted
input_mark = util.check_mark(input_muted)
except obsws.error.OBSSDKRequestError as e:
if e.code == 604: # Input does not support audio
input_mark = 'N/A'
else:
raise
out_console.print(table)
if uuid:
table.add_row(
input_name,
util.snakecase_to_titlecase(input_kind),
input_mark,
input_uuid,
)
else:
table.add_row(
input_name,
util.snakecase_to_titlecase(input_kind),
input_mark,
)
console.out.print(table)
@app.command('mute | m')
def mute(ctx: typer.Context, input_name: str):
"""Mute an input."""
@app.command(name=['mute', 'm'])
def mute(
input_name: str,
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Mute an input.
Parameters
----------
input_name: str
Name of the input to mute.
ctx: Context
The context containing the client and style.
"""
if not validate.input_in_inputs(ctx, input_name):
err_console.print(f"Input '{input_name}' not found.")
raise typer.Exit(1)
raise OBSWSCLIError(
f'Input [yellow]{input_name}[/yellow] not found.',
code=ExitCode.ERROR,
)
ctx.obj.set_input_mute(
ctx.client.set_input_mute(
name=input_name,
muted=True,
)
out_console.print(f"Input '{input_name}' muted.")
console.out.print(f'Input {console.highlight(ctx, input_name)} muted.')
@app.command('unmute | um')
def unmute(ctx: typer.Context, input_name: str):
"""Unmute an input."""
@app.command(name=['unmute', 'um'])
def unmute(
input_name: str,
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Unmute an input.
Parameters
----------
input_name: str
Name of the input to unmute.
ctx: Context
The context containing the client and style.
"""
if not validate.input_in_inputs(ctx, input_name):
err_console.print(f"Input '{input_name}' not found.")
raise typer.Exit(1)
raise OBSWSCLIError(
f'Input [yellow]{input_name}[/yellow] not found.',
code=ExitCode.ERROR,
)
ctx.obj.set_input_mute(
ctx.client.set_input_mute(
name=input_name,
muted=False,
)
out_console.print(f"Input '{input_name}' unmuted.")
console.out.print(f'Input {console.highlight(ctx, input_name)} unmuted.')
@app.command('toggle | tg')
def toggle(ctx: typer.Context, input_name: str):
"""Toggle an input."""
@app.command(name=['toggle', 'tg'])
def toggle(
input_name: str,
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Toggle an input.
Parameters
----------
input_name: str
Name of the input to toggle.
ctx: Context
The context containing the client and style.
"""
if not validate.input_in_inputs(ctx, input_name):
err_console.print(f"Input '{input_name}' not found.")
raise typer.Exit(1)
raise OBSWSCLIError(
f'Input [yellow]{input_name}[/yellow] not found.',
code=ExitCode.ERROR,
)
resp = ctx.obj.get_input_mute(name=input_name)
resp = ctx.client.get_input_mute(name=input_name)
new_state = not resp.input_muted
ctx.obj.set_input_mute(
ctx.client.set_input_mute(
name=input_name,
muted=new_state,
)
out_console.print(
f"Input '{input_name}' {'muted' if new_state else 'unmuted'}.",
)
if new_state:
console.out.print(
f'Input {console.highlight(ctx, input_name)} muted.',
)
else:
console.out.print(
f'Input {console.highlight(ctx, input_name)} unmuted.',
)

View File

@@ -1,84 +1,161 @@
"""module containing commands for manipulating profiles in OBS."""
import typer
from rich.console import Console
from typing import Annotated
from cyclopts import App, Parameter
from rich.table import Table
from rich.text import Text
from . import validate
from .alias import AliasGroup
from . import console, util, validate
from .context import Context
from .enum import ExitCode
from .error import OBSWSCLIError
app = typer.Typer(cls=AliasGroup)
out_console = Console()
err_console = Console(stderr=True)
app = App(name='profile', help='Commands for managing profiles in OBS')
@app.callback()
def main():
"""Control profiles in OBS."""
@app.command(name=['list', 'ls'])
def list_(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""List profiles.
Parameters
----------
ctx: Context
The context containing the client and style.
@app.command('list | ls')
def list(ctx: typer.Context):
"""List profiles."""
resp = ctx.obj.get_profile_list()
"""
resp = ctx.client.get_profile_list()
table = Table(title='Profiles', padding=(0, 2))
for column in ('Profile Name', 'Current'):
table.add_column(
column,
justify='left' if column == 'Profile Name' else 'center',
style='cyan',
)
table = Table(title='Profiles', padding=(0, 2), border_style=ctx.style.border)
columns = [
(Text('Profile Name', justify='center'), 'left', ctx.style.column),
(Text('Current', justify='center'), 'center', None),
]
for heading, justify, style in columns:
table.add_column(heading, justify=justify, style=style)
if not resp.profiles:
console.out.print('No profiles found.')
return
for profile in resp.profiles:
table.add_row(
profile,
':white_heavy_check_mark:' if profile == resp.current_profile_name else '',
util.check_mark(
ctx, profile == resp.current_profile_name, empty_if_false=True
),
)
out_console.print(table)
console.out.print(table)
@app.command('current | get')
def current(ctx: typer.Context):
"""Get the current profile."""
resp = ctx.obj.get_profile_list()
out_console.print(resp.current_profile_name)
@app.command(name=['current', 'get'])
def current(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Get the current profile.
Parameters
----------
ctx: Context
The context containing the client and style.
"""
resp = ctx.client.get_profile_list()
console.out.print(
f'Current profile: {console.highlight(ctx, resp.current_profile_name)}'
)
@app.command('switch | set')
def switch(ctx: typer.Context, profile_name: str):
"""Switch to a profile."""
@app.command(name=['switch', 'set'])
def switch(
profile_name: str,
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Switch to a profile.
Parameters
----------
profile_name: str
Name of the profile to switch to.
ctx: Context
The context containing the client and style.
"""
if not validate.profile_exists(ctx, profile_name):
err_console.print(f"Profile '{profile_name}' not found.")
raise typer.Exit(1)
console.err.print(f'Profile [yellow]{profile_name}[/yellow] not found.')
raise OBSWSCLIError(
f'Profile [yellow]{profile_name}[/yellow] not found.',
code=ExitCode.ERROR,
)
resp = ctx.obj.get_profile_list()
resp = ctx.client.get_profile_list()
if resp.current_profile_name == profile_name:
err_console.print(f"Profile '{profile_name}' is already the current profile.")
raise typer.Exit(1)
raise OBSWSCLIError(
f'Profile [yellow]{profile_name}[/yellow] is already the current profile.',
code=ExitCode.ERROR,
)
ctx.obj.set_current_profile(profile_name)
out_console.print(f"Switched to profile '{profile_name}'.")
ctx.client.set_current_profile(profile_name)
console.out.print(f'Switched to profile {console.highlight(ctx, profile_name)}.')
@app.command('create | new')
def create(ctx: typer.Context, profile_name: str):
"""Create a new profile."""
@app.command(name=['create', 'new'])
def create(
profile_name: str,
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Create a new profile.
Parameters
----------
profile_name: str
Name of the profile to create.
ctx: Context
The context containing the client and style.
"""
if validate.profile_exists(ctx, profile_name):
err_console.print(f"Profile '{profile_name}' already exists.")
raise typer.Exit(1)
raise OBSWSCLIError(
f'Profile [yellow]{profile_name}[/yellow] already exists.',
code=ExitCode.ERROR,
)
ctx.obj.create_profile(profile_name)
out_console.print(f"Created profile '{profile_name}'.")
ctx.client.create_profile(profile_name)
console.out.print(f'Created profile {console.highlight(ctx, profile_name)}.')
@app.command('remove | rm')
def remove(ctx: typer.Context, profile_name: str):
"""Remove a profile."""
@app.command(name=['remove', 'rm'])
def remove(
profile_name: str,
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Remove a profile.
Parameters
----------
profile_name: str
Name of the profile to remove.
ctx: Context
The context containing the client and style.
"""
if not validate.profile_exists(ctx, profile_name):
err_console.print(f"Profile '{profile_name}' not found.")
raise typer.Exit(1)
console.err.print(f'Profile [yellow]{profile_name}[/yellow] not found.')
raise OBSWSCLIError(
f'Profile [yellow]{profile_name}[/yellow] not found.',
code=ExitCode.ERROR,
)
ctx.obj.remove_profile(profile_name)
out_console.print(f"Removed profile '{profile_name}'.")
ctx.client.remove_profile(profile_name)
console.out.print(f'Removed profile {console.highlight(ctx, profile_name)}.')

View File

@@ -1,30 +1,36 @@
"""module containing commands for manipulating projectors in OBS."""
from typing import Annotated
from typing import Annotated, Optional
import typer
from rich.console import Console
from cyclopts import App, Parameter, validators
from rich.table import Table
from rich.text import Text
from .alias import AliasGroup
from . import console
from .context import Context
from .enum import ExitCode
from .error import OBSWSCLIError
app = typer.Typer(cls=AliasGroup)
out_console = Console()
err_console = Console(stderr=True)
app = App(name='projector', help='Commands for managing projectors in OBS')
@app.callback()
def main():
"""Control projectors in OBS."""
@app.command(name=['list-monitors', 'ls-m'])
def list_monitors(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""List available monitors.
Parameters
----------
ctx : Context
The context containing the OBS client and configuration.
@app.command('list-monitors | ls-m')
def list_monitors(ctx: typer.Context):
"""List available monitors."""
resp = ctx.obj.get_monitor_list()
"""
resp = ctx.client.get_monitor_list()
if not resp.monitors:
out_console.print('No monitors found.')
console.out.print('No monitors found.')
return
monitors = sorted(
@@ -32,39 +38,67 @@ def list_monitors(ctx: typer.Context):
key=lambda m: m[0],
)
table = Table(title='Available Monitors', padding=(0, 2))
table.add_column('Index', justify='center', style='cyan')
table.add_column('Name', style='cyan')
if not monitors:
console.out.print('No monitors available.')
return
table = Table(
title='Available Monitors',
padding=(0, 2),
border_style=ctx.style.border,
)
table.add_column(
Text('Index', justify='center'), justify='center', style=ctx.style.column
)
table.add_column(
Text('Name', justify='center'), justify='left', style=ctx.style.column
)
for index, monitor in monitors:
table.add_row(str(index), monitor)
out_console.print(table)
console.out.print(table)
@app.command('open | o')
@app.command(name=['open', 'o'])
def open(
ctx: typer.Context,
monitor_index: Annotated[
int,
typer.Option(help='Index of the monitor to open the projector on.'),
] = 0,
source_name: Annotated[
str,
typer.Argument(
help='Name of the source to project. (optional, defaults to current scene)'
),
] = '',
source_name: Optional[str] = None,
/,
monitor_index: Annotated[int, Parameter(validator=validators.Number(gte=0))] = 0,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Open a fullscreen projector for a source on a specific monitor."""
"""Open a fullscreen projector for a source on a specific monitor.
Parameters
----------
source_name : str, optional
The name of the source to project. If not provided, the current program scene will be used.
monitor_index : int, optional
The index of the monitor to open the projector on. Defaults to 0 (the primary monitor).
ctx : Context
The context containing the OBS client and configuration.
"""
if not source_name:
source_name = ctx.obj.get_current_program_scene().scene_name
source_name = ctx.client.get_current_program_scene().scene_name
ctx.obj.open_source_projector(
source_name=source_name,
monitor_index=monitor_index,
)
monitors = ctx.client.get_monitor_list().monitors
for monitor in monitors:
if monitor['monitorIndex'] == monitor_index:
ctx.client.open_source_projector(
source_name=source_name,
monitor_index=monitor_index,
)
out_console.print(
f'Opened projector for source [bold]{source_name}[/] on monitor [bold]{monitor_index}[/].'
)
console.out.print(
f'Opened projector for source {console.highlight(ctx, source_name)} on monitor {console.highlight(ctx, monitor["monitorName"])}.'
)
break
else:
raise OBSWSCLIError(
f'Monitor with index [yellow]{monitor_index}[/yellow] not found. '
f'Use [yellow]obsws-cli projector ls-m[/yellow] to see available monitors.',
ExitCode.ERROR,
)

View File

@@ -3,127 +3,251 @@
from pathlib import Path
from typing import Annotated, Optional
import typer
from rich.console import Console
from cyclopts import App, Parameter
from .alias import AliasGroup
from . import console
from .context import Context
from .enum import ExitCode
from .error import OBSWSCLIError
app = typer.Typer(cls=AliasGroup)
out_console = Console()
err_console = Console(stderr=True)
app = App(name='record', help='Commands for controlling OBS recording functionality.')
@app.callback()
def main():
"""Control OBS recording functionality."""
def _get_recording_status(ctx: typer.Context) -> tuple:
def _get_recording_status(ctx: Context) -> tuple:
"""Get recording status."""
resp = ctx.obj.get_record_status()
resp = ctx.client.get_record_status()
return resp.output_active, resp.output_paused
@app.command('start | s')
def start(ctx: typer.Context):
"""Start recording."""
@app.command(name=['start', 's'])
def start(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Start recording.
Parameters
----------
ctx: Context
The context containing the OBS client and other settings.
"""
active, paused = _get_recording_status(ctx)
if active:
err_msg = 'Recording is already in progress, cannot start.'
if paused:
err_msg += ' Try resuming it.'
raise OBSWSCLIError(err_msg, ExitCode.ERROR)
err_console.print(err_msg)
raise typer.Exit(1)
ctx.obj.start_record()
out_console.print('Recording started successfully.')
ctx.client.start_record()
console.out.print('Recording started successfully.')
@app.command('stop | st')
def stop(ctx: typer.Context):
"""Stop recording."""
@app.command(name=['stop', 'st'])
def stop(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Stop recording.
Parameters
----------
ctx: Context
The context containing the OBS client and other settings.
"""
active, _ = _get_recording_status(ctx)
if not active:
err_console.print('Recording is not in progress, cannot stop.')
raise typer.Exit(1)
raise OBSWSCLIError(
'Recording is not in progress, cannot stop.', ExitCode.ERROR
)
resp = ctx.obj.stop_record()
out_console.print(f'Recording stopped successfully. Saved to: {resp.output_path}')
resp = ctx.client.stop_record()
console.out.print(
f'Recording stopped successfully. Saved to: {console.highlight(ctx, resp.output_path)}'
)
@app.command('toggle | tg')
def toggle(ctx: typer.Context):
"""Toggle recording."""
resp = ctx.obj.toggle_record()
@app.command(name=['toggle', 'tg'])
def toggle(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Toggle recording.
Parameters
----------
ctx: Context
The context containing the OBS client and other settings.
"""
resp = ctx.client.toggle_record()
if resp.output_active:
out_console.print('Recording started successfully.')
console.out.print('Recording started successfully.')
else:
out_console.print('Recording stopped successfully.')
console.out.print('Recording stopped successfully.')
@app.command('status | ss')
def status(ctx: typer.Context):
"""Get recording status."""
@app.command(name=['status', 'ss'])
def status(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Get recording status.
Parameters
----------
ctx: Context
The context containing the OBS client and other settings.
"""
active, paused = _get_recording_status(ctx)
if active:
if paused:
out_console.print('Recording is in progress and paused.')
console.out.print('Recording is in progress and paused.')
else:
out_console.print('Recording is in progress.')
console.out.print('Recording is in progress.')
else:
out_console.print('Recording is not in progress.')
console.out.print('Recording is not in progress.')
@app.command('resume | r')
def resume(ctx: typer.Context):
"""Resume recording."""
active, paused = _get_recording_status(ctx)
if not active:
err_console.print('Recording is not in progress, cannot resume.')
raise typer.Exit(1)
if not paused:
err_console.print('Recording is in progress but not paused, cannot resume.')
raise typer.Exit(1)
ctx.obj.resume_record()
out_console.print('Recording resumed successfully.')
@app.command('pause | p')
def pause(ctx: typer.Context):
"""Pause recording."""
active, paused = _get_recording_status(ctx)
if not active:
err_console.print('Recording is not in progress, cannot pause.')
raise typer.Exit(1)
if paused:
err_console.print('Recording is in progress but already paused, cannot pause.')
raise typer.Exit(1)
ctx.obj.pause_record()
out_console.print('Recording paused successfully.')
@app.command('directory | d')
def directory(
ctx: typer.Context,
record_directory: Annotated[
Optional[Path],
# Since the CLI and OBS may be running on different platforms,
# we won't validate the path here.
typer.Argument(
file_okay=False,
dir_okay=True,
help='Directory to set for recording.',
),
] = None,
@app.command(name=['resume', 'r'])
def resume(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Get or set the recording directory."""
if record_directory is not None:
ctx.obj.set_record_directory(str(record_directory))
out_console.print(f'Recording directory updated to: {record_directory}')
else:
out_console.print(
f'Recording directory: {ctx.obj.get_record_directory().record_directory}'
"""Resume recording.
Parameters
----------
ctx: Context
The context containing the OBS client and other settings.
"""
active, paused = _get_recording_status(ctx)
if not active:
raise OBSWSCLIError(
'Recording is not in progress, cannot resume.', ExitCode.ERROR
)
if not paused:
raise OBSWSCLIError(
'Recording is in progress but not paused, cannot resume.', ExitCode.ERROR
)
ctx.client.resume_record()
console.out.print('Recording resumed successfully.')
@app.command(name=['pause', 'p'])
def pause(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Pause recording.
Parameters
----------
ctx: Context
The context containing the OBS client and other settings.
"""
active, paused = _get_recording_status(ctx)
if not active:
raise OBSWSCLIError(
'Recording is not in progress, cannot pause.', ExitCode.ERROR
)
if paused:
raise OBSWSCLIError(
'Recording is in progress but already paused, cannot pause.', ExitCode.ERROR
)
ctx.client.pause_record()
console.out.print('Recording paused successfully.')
@app.command(name=['directory', 'd'])
def directory(
# Since the CLI and OBS may be running on different platforms,
# we won't validate the path here.
record_directory: Optional[Path] = None,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Get or set the recording directory.
Parameters
----------
record_directory: Optional[Path]
The directory to set for recording. If not provided, the current recording directory is displayed.
ctx: Context
The context containing the OBS client and other settings.
"""
if record_directory is not None:
ctx.client.set_record_directory(str(record_directory))
console.out.print(
f'Recording directory updated to: {console.highlight(ctx, record_directory)}'
)
else:
resp = ctx.client.get_record_directory()
console.out.print(
f'Recording directory: {console.highlight(ctx, resp.record_directory)}'
)
@app.command(name=['split', 'sp'])
def split(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Split the current recording.
Parameters
----------
ctx: Context
The context containing the OBS client and other settings.
"""
active, paused = _get_recording_status(ctx)
if not active:
console.err.print('Recording is not in progress, cannot split.')
raise OBSWSCLIError(
'Recording is not in progress, cannot split.', ExitCode.ERROR
)
if paused:
raise OBSWSCLIError('Recording is paused, cannot split.', ExitCode.ERROR)
ctx.client.split_record_file()
console.out.print('Recording split successfully.')
@app.command(name=['chapter', 'ch'])
def chapter(
chapter_name: Optional[str] = None,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Create a chapter in the current recording.
Parameters
----------
chapter_name: Optional[str]
The name of the chapter to create. If not provided, an unnamed chapter is created.
ctx: Context
The context containing the OBS client and other settings.
"""
active, paused = _get_recording_status(ctx)
if not active:
raise OBSWSCLIError(
'Recording is not in progress, cannot create chapter.', ExitCode.ERROR
)
if paused:
raise OBSWSCLIError(
'Recording is paused, cannot create chapter.', ExitCode.ERROR
)
ctx.client.create_record_chapter(chapter_name)
console.out.print(
f'Chapter {console.highlight(ctx, chapter_name or "unnamed")} created successfully.'
)

View File

@@ -1,66 +1,113 @@
"""module containing commands for manipulating the replay buffer in OBS."""
import typer
from rich.console import Console
from typing import Annotated
from .alias import AliasGroup
from cyclopts import App, Parameter
app = typer.Typer(cls=AliasGroup)
out_console = Console()
err_console = Console(stderr=True)
from . import console
from .context import Context
from .enum import ExitCode
from .error import OBSWSCLIError
app = App(
name='replaybuffer', help='Commands for controlling the replay buffer in OBS.'
)
@app.callback()
def main():
"""Control profiles in OBS."""
@app.command(name=['start', 's'])
def start(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Start the replay buffer.
Parameters
----------
ctx: Context
The context containing the OBS client and other settings.
@app.command('start | s')
def start(ctx: typer.Context):
"""Start the replay buffer."""
resp = ctx.obj.get_replay_buffer_status()
"""
resp = ctx.client.get_replay_buffer_status()
if resp.output_active:
err_console.print('Replay buffer is already active.')
raise typer.Exit(1)
raise OBSWSCLIError('Replay buffer is already active.', ExitCode.ERROR)
ctx.obj.start_replay_buffer()
out_console.print('Replay buffer started.')
ctx.client.start_replay_buffer()
console.out.print('Replay buffer started.')
@app.command('stop | st')
def stop(ctx: typer.Context):
"""Stop the replay buffer."""
resp = ctx.obj.get_replay_buffer_status()
@app.command(name=['stop', 'st'])
def stop(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Stop the replay buffer.
Parameters
----------
ctx: Context
The context containing the OBS client and other settings.
"""
resp = ctx.client.get_replay_buffer_status()
if not resp.output_active:
err_console.print('Replay buffer is not active.')
raise typer.Exit(1)
raise OBSWSCLIError('Replay buffer is not active.', ExitCode.ERROR)
ctx.obj.stop_replay_buffer()
out_console.print('Replay buffer stopped.')
ctx.client.stop_replay_buffer()
console.out.print('Replay buffer stopped.')
@app.command('toggle | tg')
def toggle(ctx: typer.Context):
"""Toggle the replay buffer."""
resp = ctx.obj.toggle_replay_buffer()
@app.command(name=['toggle', 'tg'])
def toggle(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Toggle the replay buffer.
Parameters
----------
ctx: Context
The context containing the OBS client and other settings.
"""
resp = ctx.client.toggle_replay_buffer()
if resp.output_active:
out_console.print('Replay buffer is active.')
console.out.print('Replay buffer is active.')
else:
out_console.print('Replay buffer is not active.')
console.out.print('Replay buffer is not active.')
@app.command('status | ss')
def status(ctx: typer.Context):
"""Get the status of the replay buffer."""
resp = ctx.obj.get_replay_buffer_status()
@app.command(name=['status', 'ss'])
def status(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Get the status of the replay buffer.
Parameters
----------
ctx: Context
The context containing the OBS client and other settings.
"""
resp = ctx.client.get_replay_buffer_status()
if resp.output_active:
out_console.print('Replay buffer is active.')
console.out.print('Replay buffer is active.')
else:
out_console.print('Replay buffer is not active.')
console.out.print('Replay buffer is not active.')
@app.command('save | sv')
def save(ctx: typer.Context):
"""Save the replay buffer."""
ctx.obj.save_replay_buffer()
out_console.print('Replay buffer saved.')
@app.command(name=['save', 'sv'])
def save(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Save the replay buffer.
Parameters
----------
ctx: Context
The context containing the OBS client and other settings.
"""
ctx.client.save_replay_buffer()
console.out.print('Replay buffer saved.')

View File

@@ -2,86 +2,150 @@
from typing import Annotated
import typer
from rich.console import Console
from cyclopts import App, Parameter
from rich.table import Table
from rich.text import Text
from . import validate
from .alias import AliasGroup
from . import console, util, validate
from .context import Context
from .enum import ExitCode
from .error import OBSWSCLIError
app = typer.Typer(cls=AliasGroup)
out_console = Console()
err_console = Console(stderr=True)
app = App(name='scene', help='Commands for managing OBS scenes')
@app.callback()
def main():
"""Control OBS scenes."""
@app.command(name=['list', 'ls'])
def list_(
uuid: bool = False,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""List all scenes.
Parameters
----------
uuid : bool
Show UUIDs of scenes.
ctx : Context
The context containing the OBS client and configuration.
@app.command('list | ls')
def list(ctx: typer.Context):
"""List all scenes."""
resp = ctx.obj.get_scene_list()
"""
resp = ctx.client.get_scene_list()
scenes = (
(scene.get('sceneName'), scene.get('sceneUuid'))
for scene in reversed(resp.scenes)
)
table = Table(title='Scenes', padding=(0, 2))
for column in ('Scene Name', 'UUID'):
table.add_column(column, justify='left', style='cyan')
if not scenes:
console.out.print('No scenes found.')
return
active_scene = ctx.client.get_current_program_scene().scene_name
table = Table(title='Scenes', padding=(0, 2), border_style=ctx.style.border)
if uuid:
columns = [
(Text('Scene Name', justify='center'), 'left', ctx.style.column),
(Text('Active', justify='center'), 'center', None),
(Text('UUID', justify='center'), 'left', ctx.style.column),
]
else:
columns = [
(Text('Scene Name', justify='center'), 'left', ctx.style.column),
(Text('Active', justify='center'), 'center', None),
]
for heading, justify, style in columns:
table.add_column(heading, justify=justify, style=style)
for scene_name, scene_uuid in scenes:
table.add_row(
scene_name,
scene_uuid,
if uuid:
table.add_row(
scene_name,
util.check_mark(scene_name == active_scene, empty_if_false=True),
scene_uuid,
)
else:
table.add_row(
scene_name,
util.check_mark(scene_name == active_scene, empty_if_false=True),
)
console.out.print(table)
@app.command(name=['current', 'get'])
def current(
preview: bool = False,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Get the current program scene or preview scene.
Parameters
----------
preview : bool
If True, get the preview scene instead of the program scene.
ctx : Context
The context containing the OBS client and configuration.
"""
if preview and not validate.studio_mode_enabled(ctx):
raise OBSWSCLIError(
'Studio mode is not enabled, cannot get preview scene.',
code=ExitCode.ERROR,
)
out_console.print(table)
@app.command('current | get')
def current(
ctx: typer.Context,
preview: Annotated[
bool, typer.Option(help='Get the preview scene instead of the program scene')
] = False,
):
"""Get the current program scene or preview scene."""
if preview and not validate.studio_mode_enabled(ctx):
err_console.print('Studio mode is not enabled, cannot get preview scene.')
raise typer.Exit(1)
if preview:
resp = ctx.obj.get_current_preview_scene()
out_console.print(resp.current_preview_scene_name)
resp = ctx.client.get_current_preview_scene()
console.out.print(
f'Current Preview Scene: {console.highlight(ctx, resp.current_preview_scene_name)}'
)
else:
resp = ctx.obj.get_current_program_scene()
out_console.print(resp.current_program_scene_name)
resp = ctx.client.get_current_program_scene()
console.out.print(
f'Current Program Scene: {console.highlight(ctx, resp.current_program_scene_name)}'
)
@app.command('switch | set')
@app.command(name=['switch', 'set'])
def switch(
ctx: typer.Context,
scene_name: str,
preview: Annotated[
bool,
typer.Option(help='Switch to the preview scene instead of the program scene'),
] = False,
/,
preview: bool = False,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Switch to a scene."""
"""Switch to a scene.
Parameters
----------
scene_name : str
The name of the scene to switch to.
preview : bool
If True, switch to the preview scene instead of the program scene.
ctx : Context
The context containing the OBS client and configuration.
"""
if preview and not validate.studio_mode_enabled(ctx):
err_console.print('Studio mode is not enabled, cannot set the preview scene.')
raise typer.Exit(1)
raise OBSWSCLIError(
'Studio mode is not enabled, cannot switch to preview scene.',
code=ExitCode.ERROR,
)
if not validate.scene_in_scenes(ctx, scene_name):
err_console.print(f"Scene '{scene_name}' not found.")
raise typer.Exit(1)
raise OBSWSCLIError(
f'Scene [yellow]{scene_name}[/yellow] not found.',
code=ExitCode.ERROR,
)
if preview:
ctx.obj.set_current_preview_scene(scene_name)
out_console.print(f'Switched to preview scene: {scene_name}')
ctx.client.set_current_preview_scene(scene_name)
console.out.print(
f'Switched to preview scene: {console.highlight(ctx, scene_name)}'
)
else:
ctx.obj.set_current_program_scene(scene_name)
out_console.print(f'Switched to program scene: {scene_name}')
ctx.client.set_current_program_scene(scene_name)
console.out.print(
f'Switched to program scene: {console.highlight(ctx, scene_name)}'
)

View File

@@ -1,69 +1,133 @@
"""module containing commands for manipulating scene collections."""
import typer
from rich.console import Console
from typing import Annotated
from cyclopts import App, Parameter
from rich.table import Table
from . import validate
from .alias import AliasGroup
from . import console, validate
from .context import Context
from .enum import ExitCode
from .error import OBSWSCLIError
app = typer.Typer(cls=AliasGroup)
out_console = Console()
err_console = Console(stderr=True)
app = App(
name='scenecollection', help='Commands for controlling scene collections in OBS.'
)
@app.callback()
def main():
"""Control scene collections in OBS."""
@app.command(name=['list', 'ls'])
def list_(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""List all scene collections.
Parameters
----------
ctx : Context
The context containing the OBS client and configuration.
@app.command('list | ls')
def list(ctx: typer.Context):
"""List all scene collections."""
resp = ctx.obj.get_scene_collection_list()
"""
resp = ctx.client.get_scene_collection_list()
table = Table(title='Scene Collections', padding=(0, 2))
table.add_column('Scene Collection Name', justify='left', style='cyan')
table = Table(
title='Scene Collections',
padding=(0, 2),
border_style=ctx.style.border,
)
table.add_column('Scene Collection Name', justify='left', style=ctx.style.column)
if not resp.scene_collections:
console.out.print('No scene collections found.')
return
for scene_collection_name in resp.scene_collections:
table.add_row(scene_collection_name)
out_console.print(table)
console.out.print(table)
@app.command('current | get')
def current(ctx: typer.Context):
"""Get the current scene collection."""
resp = ctx.obj.get_scene_collection_list()
out_console.print(resp.current_scene_collection_name)
@app.command(name=['current', 'get'])
def current(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Get the current scene collection.
Parameters
----------
ctx : Context
The context containing the OBS client and configuration.
"""
resp = ctx.client.get_scene_collection_list()
console.out.print(
f'Current scene collection: {console.highlight(ctx, resp.current_scene_collection_name)}'
)
@app.command('switch | set')
def switch(ctx: typer.Context, scene_collection_name: str):
"""Switch to a scene collection."""
@app.command(name=['switch', 'set'])
def switch(
scene_collection_name: str,
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Switch to a scene collection.
Parameters
----------
scene_collection_name : str
The name of the scene collection to switch to.
ctx : Context
The context containing the OBS client and configuration.
"""
if not validate.scene_collection_in_scene_collections(ctx, scene_collection_name):
err_console.print(f"Scene collection '{scene_collection_name}' not found.")
raise typer.Exit(1)
raise OBSWSCLIError(
f'Scene collection [yellow]{scene_collection_name}[/yellow] not found.',
exit_code=ExitCode.ERROR,
)
current_scene_collection = (
ctx.obj.get_scene_collection_list().current_scene_collection_name
ctx.client.get_scene_collection_list().current_scene_collection_name
)
if scene_collection_name == current_scene_collection:
err_console.print(
f'Scene collection "{scene_collection_name}" is already active.'
raise OBSWSCLIError(
f'Scene collection [yellow]{scene_collection_name}[/yellow] is already active.',
exit_code=ExitCode.ERROR,
)
raise typer.Exit(1)
ctx.obj.set_current_scene_collection(scene_collection_name)
out_console.print(f"Switched to scene collection '{scene_collection_name}'")
ctx.client.set_current_scene_collection(scene_collection_name)
console.out.print(
f'Switched to scene collection {console.highlight(ctx, scene_collection_name)}.'
)
@app.command('create | new')
def create(ctx: typer.Context, scene_collection_name: str):
"""Create a new scene collection."""
@app.command(name=['create', 'new'])
def create(
scene_collection_name: str,
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Create a new scene collection.
Parameters
----------
scene_collection_name : str
The name of the scene collection to create.
ctx : Context
The context containing the OBS client and configuration.
"""
if validate.scene_collection_in_scene_collections(ctx, scene_collection_name):
err_console.print(f"Scene collection '{scene_collection_name}' already exists.")
raise typer.Exit(1)
raise OBSWSCLIError(
f'Scene collection [yellow]{scene_collection_name}[/yellow] already exists.',
exit_code=ExitCode.ERROR,
)
ctx.obj.create_scene_collection(scene_collection_name)
out_console.print(f'Created scene collection {scene_collection_name}')
ctx.client.create_scene_collection(scene_collection_name)
console.out.print(
f'Created scene collection {console.highlight(ctx, scene_collection_name)}.'
)

View File

@@ -1,42 +1,50 @@
"""module containing commands for manipulating items in scenes."""
from collections.abc import Callable
from typing import Annotated, Optional
import obsws_python as obsws
import typer
from rich.console import Console
from cyclopts import App, Parameter
from rich.table import Table
from . import validate
from .alias import AliasGroup
from . import console, util, validate
from .context import Context
from .enum import ExitCode
from .error import OBSWSCLIError
app = typer.Typer(cls=AliasGroup)
out_console = Console()
err_console = Console(stderr=True)
app = App(name='sceneitem', help='Commands for controlling scene items in OBS.')
@app.callback()
def main():
"""Control items in OBS scenes."""
@app.command('list | ls')
def list(
ctx: typer.Context,
scene_name: str = typer.Argument(
None, help='Scene name (optional, defaults to current scene)'
),
@app.command(name=['list', 'ls'])
def list_(
scene_name: Optional[str] = None,
/,
uuid: bool = False,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""List all items in a scene."""
"""List all items in a scene.
Parameters
----------
scene_name : str, optional
The name of the scene to list items for. If not provided, the current program scene
will be used.
uuid : bool
Show UUIDs of scene items.
ctx : Context
The context containing the OBS client and configuration.
"""
if not scene_name:
scene_name = ctx.obj.get_current_program_scene().scene_name
scene_name = ctx.client.get_current_program_scene().scene_name
if not validate.scene_in_scenes(ctx, scene_name):
err_console.print(f"Scene '{scene_name}' not found.")
raise typer.Exit(1)
console.err.print(f'Scene [yellow]{scene_name}[/yellow] not found.')
raise OBSWSCLIError(
f'Scene [yellow]{scene_name}[/yellow] not found.',
exit_code=ExitCode.ERROR,
)
resp = ctx.obj.get_scene_item_list(scene_name)
resp = ctx.client.get_scene_item_list(scene_name)
items = sorted(
(
(
@@ -44,6 +52,7 @@ def list(
item.get('sourceName'),
item.get('isGroup'),
item.get('sceneItemEnabled'),
item.get('sourceUuid', 'N/A'), # Include source UUID
)
for item in resp.scene_items
),
@@ -51,213 +60,284 @@ def list(
)
if not items:
out_console.print(f"No items found in scene '{scene_name}'.")
raise typer.Exit()
table = Table(title=f'Items in Scene: {scene_name}', padding=(0, 2))
for column in ('Item ID', 'Item Name', 'In Group', 'Enabled'):
table.add_column(
column, justify='left' if column == 'Item Name' else 'center', style='cyan'
console.out.print(
f'No items found in scene {console.highlight(ctx, scene_name)}.'
)
return
for item_id, item_name, is_group, is_enabled in items:
table = Table(
title=f'Items in Scene: {scene_name}',
padding=(0, 2),
border_style=ctx.style.border,
)
if uuid:
columns = [
('Item ID', 'center', ctx.style.column),
('Item Name', 'left', ctx.style.column),
('In Group', 'left', ctx.style.column),
('Enabled', 'center', None),
('UUID', 'left', ctx.style.column),
]
else:
columns = [
('Item ID', 'center', ctx.style.column),
('Item Name', 'left', ctx.style.column),
('In Group', 'left', ctx.style.column),
('Enabled', 'center', None),
]
# Add columns to the table
for heading, justify, style in columns:
table.add_column(heading, justify=justify, style=style)
for item_id, item_name, is_group, is_enabled, source_uuid in items:
if is_group:
resp = ctx.obj.get_group_scene_item_list(item_name)
resp = ctx.client.get_group_scene_item_list(item_name)
group_items = sorted(
(
(
gi.get('sceneItemId'),
gi.get('sourceName'),
gi.get('sceneItemEnabled'),
gi.get('sourceUuid', 'N/A'), # Include source UUID
)
for gi in resp.scene_items
),
key=lambda x: x[0], # Sort by sceneItemId
)
for group_item_id, group_item_name, group_item_enabled in group_items:
table.add_row(
str(group_item_id),
group_item_name,
item_name,
':white_heavy_check_mark:'
if is_enabled and group_item_enabled
else ':x:',
)
for (
group_item_id,
group_item_name,
group_item_enabled,
group_item_source_uuid,
) in group_items:
if uuid:
table.add_row(
str(group_item_id),
group_item_name,
item_name,
util.check_mark(is_enabled and group_item_enabled),
group_item_source_uuid,
)
else:
table.add_row(
str(group_item_id),
group_item_name,
item_name,
util.check_mark(is_enabled and group_item_enabled),
)
else:
table.add_row(
str(item_id),
item_name,
'',
':white_heavy_check_mark:' if is_enabled else ':x:',
)
if uuid:
table.add_row(
str(item_id),
item_name,
'',
util.check_mark(is_enabled),
source_uuid,
)
else:
table.add_row(
str(item_id),
item_name,
'',
util.check_mark(is_enabled),
)
out_console.print(table)
console.out.print(table)
def _validate_scene_name_and_item_name(
func: Callable,
def _validate_sources(
ctx: Context,
scene_name: str,
item_name: str,
group: Optional[str] = None,
):
"""Validate the scene name and item name."""
if not validate.scene_in_scenes(ctx, scene_name):
raise OBSWSCLIError(
f'Scene [yellow]{scene_name}[/yellow] not found.',
exit_code=ExitCode.ERROR,
)
def wrapper(
ctx: typer.Context,
scene_name: str,
item_name: str,
group: Optional[str] = None,
):
if not validate.scene_in_scenes(ctx, scene_name):
err_console.print(f"Scene '{scene_name}' not found.")
raise typer.Exit(1)
if group:
if not validate.item_in_scene_item_list(ctx, scene_name, group):
err_console.print(
f"Parent group '{group}' not found in scene '{scene_name}'."
)
raise typer.Exit(1)
else:
if not validate.item_in_scene_item_list(ctx, scene_name, item_name):
err_console.print(
f"Item '{item_name}' not found in scene '{scene_name}'."
)
raise typer.Exit(1)
return func(ctx, scene_name, item_name, group)
return wrapper
if group:
if not validate.item_in_scene_item_list(ctx, scene_name, group):
raise OBSWSCLIError(
f'Group [yellow]{group}[/yellow] not found in scene [yellow]{scene_name}[/yellow].',
exit_code=ExitCode.ERROR,
)
else:
if not validate.item_in_scene_item_list(ctx, scene_name, item_name):
raise OBSWSCLIError(
f'Item [yellow]{item_name}[/yellow] not found in scene [yellow]{scene_name}[/yellow]. Is the item in a group? '
f'If so use the [yellow]--group[/yellow] option to specify the parent group.\n'
'Use [yellow]obsws-cli sceneitem ls[/yellow] for a list of items in the scene.',
exit_code=ExitCode.ERROR,
)
def _get_scene_name_and_item_id(
ctx: typer.Context, scene_name: str, item_name: str, group: Optional[str] = None
ctx: Context,
scene_name: str,
item_name: str,
group: Optional[str] = None,
):
"""Get the scene name and item ID for the given scene and item name."""
if group:
resp = ctx.obj.get_group_scene_item_list(group)
resp = ctx.client.get_group_scene_item_list(group)
for item in resp.scene_items:
if item.get('sourceName') == item_name:
scene_name = group
scene_item_id = item.get('sceneItemId')
break
else:
err_console.print(f"Item '{item_name}' not found in group '{group}'.")
raise typer.Exit(1)
raise OBSWSCLIError(
f'Item [yellow]{item_name}[/yellow] not found in group [yellow]{group}[/yellow].',
exit_code=ExitCode.ERROR,
)
else:
try:
resp = ctx.obj.get_scene_item_id(scene_name, item_name)
except obsws.error.OBSSDKRequestError as e:
if e.code == 600:
err_console.print(
f"Item '{item_name}' not found in scene '{scene_name}'. Is the item in a group? "
'If so use the --group option to specify the parent group. '
'See `obsws-cli sceneitem list` for a list of items in the scene.'
)
raise typer.Exit(1)
else:
raise
resp = ctx.client.get_scene_item_id(scene_name, item_name)
scene_item_id = resp.scene_item_id
return scene_name, scene_item_id
@_validate_scene_name_and_item_name
@app.command('show | sh')
@app.command(name=['show', 'sh'])
def show(
ctx: typer.Context,
scene_name: str,
item_name: str,
group: Annotated[Optional[str], typer.Option(help='Parent group name')] = None,
/,
group: Optional[str] = None,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Show an item in a scene."""
"""Show an item in a scene.
Parameters
----------
scene_name : str
The name of the scene the item is in.
item_name : str
The name of the item to show in the scene.
group : str, optional
The name of the parent group the item is in, if applicable.
ctx : Context
The context containing the OBS client and configuration.
"""
_validate_sources(ctx, scene_name, item_name, group)
old_scene_name = scene_name
scene_name, scene_item_id = _get_scene_name_and_item_id(
ctx, scene_name, item_name, group
)
ctx.obj.set_scene_item_enabled(
ctx.client.set_scene_item_enabled(
scene_name=scene_name,
item_id=int(scene_item_id),
enabled=True,
)
if group:
out_console.print(
f"Item '{item_name}' in group '{group}' in scene '{scene_name}' has been shown."
console.out.print(
f'Item {console.highlight(ctx, item_name)} in group {console.highlight(ctx, group)} '
f'in scene {console.highlight(ctx, old_scene_name)} has been shown.'
)
else:
# If not in a parent group, just show the scene name
# This is to avoid confusion with the parent group name
# which is not the same as the scene name
# and is not needed in this case
out_console.print(f"Item '{item_name}' in scene '{scene_name}' has been shown.")
console.out.print(
f'Item {console.highlight(ctx, item_name)} in scene {console.highlight(ctx, scene_name)} has been shown.'
)
@_validate_scene_name_and_item_name
@app.command('hide | h')
@app.command(name=['hide', 'h'])
def hide(
ctx: typer.Context,
scene_name: str,
item_name: str,
group: Annotated[Optional[str], typer.Option(help='Parent group name')] = None,
/,
group: Optional[str] = None,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Hide an item in a scene."""
"""Hide an item in a scene.
Parameters
----------
scene_name : str
The name of the scene the item is in.
item_name : str
The name of the item to hide in the scene.
group : str, optional
The name of the parent group the item is in, if applicable.
ctx : Context
The context containing the OBS client and configuration.
"""
_validate_sources(ctx, scene_name, item_name, group)
old_scene_name = scene_name
scene_name, scene_item_id = _get_scene_name_and_item_id(
ctx, scene_name, item_name, group
)
ctx.obj.set_scene_item_enabled(
ctx.client.set_scene_item_enabled(
scene_name=scene_name,
item_id=int(scene_item_id),
enabled=False,
)
if group:
out_console.print(
f"Item '{item_name}' in group '{group}' in scene '{scene_name}' has been hidden."
console.out.print(
f'Item {console.highlight(ctx, item_name)} in group {console.highlight(ctx, group)} in scene {console.highlight(ctx, old_scene_name)} has been hidden.'
)
else:
# If not in a parent group, just show the scene name
# This is to avoid confusion with the parent group name
# which is not the same as the scene name
# and is not needed in this case
out_console.print(
f"Item '{item_name}' in scene '{scene_name}' has been hidden."
console.out.print(
f'Item {console.highlight(ctx, item_name)} in scene {console.highlight(ctx, scene_name)} has been hidden.'
)
@_validate_scene_name_and_item_name
@app.command('toggle | tg')
@app.command(name=['toggle', 'tg'])
def toggle(
ctx: typer.Context,
scene_name: str,
item_name: str,
group: Annotated[Optional[str], typer.Option(help='Parent group name')] = None,
/,
group: Optional[str] = None,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Toggle an item in a scene."""
if not validate.scene_in_scenes(ctx, scene_name):
err_console.print(f"Scene '{scene_name}' not found.")
raise typer.Exit(1)
"""Toggle an item in a scene.
if group:
if not validate.item_in_scene_item_list(ctx, scene_name, group):
err_console.print(
f"Parent group '{group}' not found in scene '{scene_name}'."
)
raise typer.Exit(1)
else:
if not validate.item_in_scene_item_list(ctx, scene_name, item_name):
err_console.print(f"Item '{item_name}' not found in scene '{scene_name}'.")
raise typer.Exit(1)
Parameters
----------
scene_name : str
The name of the scene the item is in.
item_name : str
The name of the item to toggle in the scene.
group : str, optional
The name of the parent group the item is in, if applicable.
ctx : Context
The context containing the OBS client and configuration.
"""
_validate_sources(ctx, scene_name, item_name, group)
old_scene_name = scene_name
scene_name, scene_item_id = _get_scene_name_and_item_id(
ctx, scene_name, item_name, group
)
enabled = ctx.obj.get_scene_item_enabled(
enabled = ctx.client.get_scene_item_enabled(
scene_name=scene_name,
item_id=int(scene_item_id),
)
new_state = not enabled.scene_item_enabled
ctx.obj.set_scene_item_enabled(
ctx.client.set_scene_item_enabled(
scene_name=scene_name,
item_id=int(scene_item_id),
enabled=new_state,
@@ -265,12 +345,14 @@ def toggle(
if group:
if new_state:
out_console.print(
f"Item '{item_name}' in group '{group}' in scene '{scene_name}' has been shown."
console.out.print(
f'Item {console.highlight(ctx, item_name)} in group {console.highlight(ctx, group)} '
f'in scene {console.highlight(ctx, old_scene_name)} has been shown.'
)
else:
out_console.print(
f"Item '{item_name}' in group '{group}' in scene '{scene_name}' has been hidden."
console.out.print(
f'Item {console.highlight(ctx, item_name)} in group {console.highlight(ctx, group)} '
f'in scene {console.highlight(ctx, old_scene_name)} has been hidden.'
)
else:
# If not in a parent group, just show the scene name
@@ -278,123 +360,135 @@ def toggle(
# which is not the same as the scene name
# and is not needed in this case
if new_state:
out_console.print(
f"Item '{item_name}' in scene '{scene_name}' has been shown."
console.out.print(
f'Item {console.highlight(ctx, item_name)} in scene {console.highlight(ctx, scene_name)} has been shown.'
)
else:
out_console.print(
f"Item '{item_name}' in scene '{scene_name}' has been hidden."
console.out.print(
f'Item {console.highlight(ctx, item_name)} in scene {console.highlight(ctx, scene_name)} has been hidden.'
)
@_validate_scene_name_and_item_name
@app.command('visible | v')
@app.command(name=['visible', 'v'])
def visible(
ctx: typer.Context,
scene_name: str,
item_name: str,
group: Annotated[Optional[str], typer.Option(help='Parent group name')] = None,
/,
group: Optional[str] = None,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Check if an item in a scene is visible."""
if group:
if not validate.item_in_scene_item_list(ctx, scene_name, group):
err_console.print(
f"Parent group '{group}' not found in scene '{scene_name}'."
)
raise typer.Exit(1)
else:
if not validate.item_in_scene_item_list(ctx, scene_name, item_name):
err_console.print(f"Item '{item_name}' not found in scene '{scene_name}'.")
raise typer.Exit(1)
"""Check if an item in a scene is visible.
Parameters
----------
scene_name : str
The name of the scene the item is in.
item_name : str
The name of the item to check visibility in the scene.
group : str, optional
The name of the parent group the item is in, if applicable.
ctx : Context
The context containing the OBS client and configuration.
"""
_validate_sources(ctx, scene_name, item_name, group)
old_scene_name = scene_name
scene_name, scene_item_id = _get_scene_name_and_item_id(
ctx, scene_name, item_name, group
)
enabled = ctx.obj.get_scene_item_enabled(
enabled = ctx.client.get_scene_item_enabled(
scene_name=scene_name,
item_id=int(scene_item_id),
)
if group:
out_console.print(
f"Item '{item_name}' in group '{group}' in scene '{old_scene_name}' is currently {'visible' if enabled.scene_item_enabled else 'hidden'}."
console.out.print(
f'Item {console.highlight(ctx, item_name)} in group {console.highlight(ctx, group)} '
f'in scene {console.highlight(ctx, old_scene_name)} is currently {"visible" if enabled.scene_item_enabled else "hidden"}.'
)
else:
# If not in a parent group, just show the scene name
# This is to avoid confusion with the parent group name
# which is not the same as the scene name
# and is not needed in this case
out_console.print(
f"Item '{item_name}' in scene '{scene_name}' is currently {'visible' if enabled.scene_item_enabled else 'hidden'}."
console.out.print(
f'Item {console.highlight(ctx, item_name)} in scene {console.highlight(ctx, scene_name)} '
f'is currently {"visible" if enabled.scene_item_enabled else "hidden"}.'
)
@_validate_scene_name_and_item_name
@app.command('transform | t')
@app.command(name=['transform', 't'])
def transform(
ctx: typer.Context,
scene_name: str,
item_name: str,
group: Annotated[Optional[str], typer.Option(help='Parent group name')] = None,
alignment: Annotated[
Optional[int], typer.Option(help='Alignment of the item in the scene')
] = None,
bounds_alignment: Annotated[
Optional[int], typer.Option(help='Bounds alignment of the item in the scene')
] = None,
bounds_height: Annotated[
Optional[float], typer.Option(help='Height of the item in the scene')
] = None,
bounds_type: Annotated[
Optional[str], typer.Option(help='Type of bounds for the item in the scene')
] = None,
bounds_width: Annotated[
Optional[float], typer.Option(help='Width of the item in the scene')
] = None,
crop_to_bounds: Annotated[
Optional[bool], typer.Option(help='Crop the item to the bounds')
] = None,
crop_bottom: Annotated[
Optional[float], typer.Option(help='Bottom crop of the item in the scene')
] = None,
crop_left: Annotated[
Optional[float], typer.Option(help='Left crop of the item in the scene')
] = None,
crop_right: Annotated[
Optional[float], typer.Option(help='Right crop of the item in the scene')
] = None,
crop_top: Annotated[
Optional[float], typer.Option(help='Top crop of the item in the scene')
] = None,
position_x: Annotated[
Optional[float], typer.Option(help='X position of the item in the scene')
] = None,
position_y: Annotated[
Optional[float], typer.Option(help='Y position of the item in the scene')
] = None,
rotation: Annotated[
Optional[float], typer.Option(help='Rotation of the item in the scene')
] = None,
scale_x: Annotated[
Optional[float], typer.Option(help='X scale of the item in the scene')
] = None,
scale_y: Annotated[
Optional[float], typer.Option(help='Y scale of the item in the scene')
] = None,
/,
group: Optional[str] = None,
alignment: Optional[int] = None,
bounds_alignment: Optional[int] = None,
bounds_height: Optional[float] = None,
bounds_type: Optional[str] = None,
bounds_width: Optional[float] = None,
crop_to_bounds: Optional[bool] = None,
crop_bottom: Optional[float] = None,
crop_left: Optional[float] = None,
crop_right: Optional[float] = None,
crop_top: Optional[float] = None,
position_x: Optional[float] = None,
position_y: Optional[float] = None,
rotation: Optional[float] = None,
scale_x: Optional[float] = None,
scale_y: Optional[float] = None,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Set the transform of an item in a scene."""
if group:
if not validate.item_in_scene_item_list(ctx, scene_name, group):
err_console.print(
f"Parent group '{group}' not found in scene '{scene_name}'."
)
raise typer.Exit(1)
else:
if not validate.item_in_scene_item_list(ctx, scene_name, item_name):
err_console.print(f"Item '{item_name}' not found in scene '{scene_name}'.")
raise typer.Exit(1)
"""Set the transform of an item in a scene.
Parameters
----------
scene_name : str
The name of the scene the item is in.
item_name : str
The name of the item to transform in the scene.
group : str, optional
The name of the parent group the item is in, if applicable.
alignment : int, optional
Alignment of the item in the scene.
bounds_alignment : int, optional
Bounds alignment of the item in the scene.
bounds_height : float, optional
Height of the item in the scene.
bounds_type : str, optional
Type of bounds for the item in the scene.
bounds_width : float, optional
Width of the item in the scene.
crop_to_bounds : bool, optional
Crop the item to the bounds.
crop_bottom : float, optional
Bottom crop of the item in the scene.
crop_left : float, optional
Left crop of the item in the scene.
crop_right : float, optional
Right crop of the item in the scene.
crop_top : float, optional
Top crop of the item in the scene.
position_x : float, optional
X position of the item in the scene.
position_y : float, optional
Y position of the item in the scene.
rotation : float, optional
Rotation of the item in the scene.
scale_x : float, optional
X scale of the item in the scene.
scale_y : float, optional
Y scale of the item in the scene.
ctx : Context
The context containing the OBS client and configuration.
"""
_validate_sources(ctx, scene_name, item_name, group)
old_scene_name = scene_name
scene_name, scene_item_id = _get_scene_name_and_item_id(
@@ -434,24 +528,27 @@ def transform(
transform['scaleY'] = scale_y
if not transform:
err_console.print('No transform options provided.')
raise typer.Exit(1)
raise OBSWSCLIError(
'No transform options provided. Use at least one of the transform options.',
exit_code=ExitCode.ERROR,
)
transform = ctx.obj.set_scene_item_transform(
transform = ctx.client.set_scene_item_transform(
scene_name=scene_name,
item_id=int(scene_item_id),
transform=transform,
)
if group:
out_console.print(
f"Item '{item_name}' in group '{group}' in scene '{old_scene_name}' has been transformed."
console.out.print(
f'Item {console.highlight(ctx, item_name)} in group {console.highlight(ctx, group)} '
f'in scene {console.highlight(ctx, old_scene_name)} has been transformed.'
)
else:
# If not in a parent group, just show the scene name
# This is to avoid confusion with the parent group name
# which is not the same as the scene name
# and is not needed in this case
out_console.print(
f"Item '{item_name}' in scene '{scene_name}' has been transformed."
console.out.print(
f'Item {console.highlight(ctx, item_name)} in scene {console.highlight(ctx, scene_name)} has been transformed.'
)

75
obsws_cli/screenshot.py Normal file
View File

@@ -0,0 +1,75 @@
"""module for taking screenshots using OBS WebSocket API."""
from pathlib import Path
from typing import Annotated
import obsws_python as obsws
from cyclopts import App, Parameter, validators
from . import console
from .context import Context
from .enum import ExitCode
from .error import OBSWSCLIError
app = App(name='screenshot', help='Commands for taking screenshots using OBS.')
@app.command(name=['save', 'sv'])
def save(
source_name: str,
# Since the CLI and OBS may be running on different platforms,
# we won't validate the path here.
output_path: Path,
/,
width: float = 1920,
height: float = 1080,
quality: Annotated[
float, Parameter(validator=validators.Number(gte=-1, lte=100))
] = -1.0,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Take a screenshot and save it to a file.
Parameters
----------
source_name : str
Name of the source to take a screenshot of.
output_path : Path
Path to save the screenshot (must include file name and extension).
width : float
Width of the screenshot.
height : float
Height of the screenshot.
quality : float
Quality of the screenshot. A value of -1 uses the default quality.
ctx : Context
Context containing the OBS WebSocket client instance.
"""
try:
ctx.client.save_source_screenshot(
name=source_name,
img_format=output_path.suffix.lstrip('.').lower(),
file_path=str(output_path),
width=width,
height=height,
quality=quality,
)
except obsws.error.OBSSDKRequestError as e:
match e.code:
case 403:
raise OBSWSCLIError(
'The [yellow]image format[/yellow] (file extension) must be included in the file name, '
"for example: '/path/to/screenshot.png'.",
code=ExitCode.ERROR,
)
case 600:
raise OBSWSCLIError(
'No source was found by the name of [yellow]{source_name}[/yellow]',
code=ExitCode.ERROR,
)
case _:
raise
console.out.print(f'Screenshot saved to {console.highlight(ctx, output_path)}.')

View File

@@ -1,51 +0,0 @@
"""module for settings management for obsws-cli."""
from collections import UserDict
from pathlib import Path
from dotenv import dotenv_values
class Settings(UserDict):
"""Settings for the OBS WebSocket client."""
def __init__(self, *args, **kwargs):
"""Initialize the Settings object."""
kwargs.update(
{
**dotenv_values('.env'),
**dotenv_values(Path.home() / '.config' / 'obsws-cli' / 'obsws.env'),
}
)
super().__init__(*args, **kwargs)
def __getitem__(self, key):
"""Get a setting value by key."""
if not key.startswith('OBS_'):
key = f'OBS_{key.upper()}'
return self.data[key]
def __setitem__(self, key, value):
"""Set a setting value by key."""
self.data[key] = value
_settings = Settings(
OBS_HOST='localhost', OBS_PORT=4455, OBS_PASSWORD='', OBS_TIMEOUT=5
)
def get(key: str):
"""Get a setting value by key.
Args:
key (str): The key of the setting to retrieve.
Returns:
The value of the setting.
Raises:
KeyError: If the key does not exist in the settings.
"""
return _settings[key]

View File

@@ -1,63 +1,104 @@
"""module for controlling OBS stream functionality."""
import typer
from rich.console import Console
from typing import Annotated
from .alias import AliasGroup
from cyclopts import App, Parameter
app = typer.Typer(cls=AliasGroup)
out_console = Console()
err_console = Console(stderr=True)
from . import console
from .context import Context
from .enum import ExitCode
from .error import OBSWSCLIError
app = App(name='stream', help='Commands for controlling OBS stream functionality.')
@app.callback()
def main():
"""Control OBS stream functionality."""
def _get_streaming_status(ctx: typer.Context) -> tuple:
def _get_streaming_status(ctx: Context) -> tuple:
"""Get streaming status."""
resp = ctx.obj.get_stream_status()
resp = ctx.client.get_stream_status()
return resp.output_active, resp.output_duration
@app.command('start | s')
def start(ctx: typer.Context):
"""Start streaming."""
@app.command(name=['start', 's'])
def start(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Start streaming.
Parameters
----------
ctx : Context
Context containing the OBS WebSocket client instance.
"""
active, _ = _get_streaming_status(ctx)
if active:
err_console.print('Streaming is already in progress, cannot start.')
raise typer.Exit(1)
raise OBSWSCLIError(
'Streaming is already in progress, cannot start.',
code=ExitCode.ERROR,
)
ctx.obj.start_stream()
out_console.print('Streaming started successfully.')
ctx.client.start_stream()
console.out.print('Streaming started successfully.')
@app.command('stop | st')
def stop(ctx: typer.Context):
"""Stop streaming."""
@app.command(name=['stop', 'st'])
def stop(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Stop streaming.
Parameters
----------
ctx : Context
Context containing the OBS WebSocket client instance.
"""
active, _ = _get_streaming_status(ctx)
if not active:
err_console.print('Streaming is not in progress, cannot stop.')
raise typer.Exit(1)
raise OBSWSCLIError(
'Streaming is not in progress, cannot stop.',
code=ExitCode.ERROR,
)
ctx.obj.stop_stream()
out_console.print('Streaming stopped successfully.')
ctx.client.stop_stream()
console.out.print('Streaming stopped successfully.')
@app.command('toggle | tg')
def toggle(ctx: typer.Context):
"""Toggle streaming."""
resp = ctx.obj.toggle_stream()
@app.command(name=['toggle', 'tg'])
def toggle(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Toggle streaming.
Parameters
----------
ctx : Context
Context containing the OBS WebSocket client instance.
"""
resp = ctx.client.toggle_stream()
if resp.output_active:
out_console.print('Streaming started successfully.')
console.out.print('Streaming started successfully.')
else:
out_console.print('Streaming stopped successfully.')
console.out.print('Streaming stopped successfully.')
@app.command('status | ss')
def status(ctx: typer.Context):
"""Get streaming status."""
@app.command(name=['status', 'ss'])
def status(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Get streaming status.
Parameters
----------
ctx : Context
Context containing the OBS WebSocket client instance.
"""
active, duration = _get_streaming_status(ctx)
if active:
if duration > 0:
@@ -65,19 +106,19 @@ def status(ctx: typer.Context):
minutes = int(seconds // 60)
seconds = int(seconds % 60)
if minutes > 0:
out_console.print(
console.out.print(
f'Streaming is in progress for {minutes} minutes and {seconds} seconds.'
)
else:
if seconds > 0:
out_console.print(
console.out.print(
f'Streaming is in progress for {seconds} seconds.'
)
else:
out_console.print(
console.out.print(
'Streaming is in progress for less than a second.'
)
else:
out_console.print('Streaming is in progress.')
console.out.print('Streaming is in progress.')
else:
out_console.print('Streaming is not in progress.')
console.out.print('Streaming is not in progress.')

View File

@@ -1,51 +1,86 @@
"""module containing commands for manipulating studio mode in OBS."""
import typer
from rich.console import Console
from typing import Annotated
from .alias import AliasGroup
from cyclopts import App, Parameter
app = typer.Typer(cls=AliasGroup)
out_console = Console()
err_console = Console(stderr=True)
from . import console
from .context import Context
app = App(name='studiomode', help='Commands for controlling studio mode in OBS.')
@app.callback()
def main():
"""Control studio mode in OBS."""
@app.command(name=['enable', 'on'])
def enable(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Enable studio mode.
Parameters
----------
ctx : Context
Context containing the OBS WebSocket client instance.
"""
ctx.client.set_studio_mode_enabled(True)
console.out.print('Studio mode has been enabled.')
@app.command('enable | on')
def enable(ctx: typer.Context):
"""Enable studio mode."""
ctx.obj.set_studio_mode_enabled(True)
out_console.print('Studio mode has been enabled.')
@app.command(name=['disable', 'off'])
def disable(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Disable studio mode.
Parameters
----------
ctx : Context
Context containing the OBS WebSocket client instance.
"""
ctx.client.set_studio_mode_enabled(False)
console.out.print('Studio mode has been disabled.')
@app.command('disable | off')
def disable(ctx: typer.Context):
"""Disable studio mode."""
ctx.obj.set_studio_mode_enabled(False)
out_console.print('Studio mode has been disabled.')
@app.command(name=['toggle', 'tg'])
def toggle(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Toggle studio mode.
Parameters
----------
ctx : Context
Context containing the OBS WebSocket client instance.
@app.command('toggle | tg')
def toggle(ctx: typer.Context):
"""Toggle studio mode."""
resp = ctx.obj.get_studio_mode_enabled()
"""
resp = ctx.client.get_studio_mode_enabled()
if resp.studio_mode_enabled:
ctx.obj.set_studio_mode_enabled(False)
out_console.print('Studio mode is now disabled.')
ctx.client.set_studio_mode_enabled(False)
console.out.print('Studio mode is now disabled.')
else:
ctx.obj.set_studio_mode_enabled(True)
out_console.print('Studio mode is now enabled.')
ctx.client.set_studio_mode_enabled(True)
console.out.print('Studio mode is now enabled.')
@app.command('status | ss')
def status(ctx: typer.Context):
"""Get the status of studio mode."""
resp = ctx.obj.get_studio_mode_enabled()
@app.command(name=['status', 'ss'])
def status(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Get the status of studio mode.
Parameters
----------
ctx : Context
Context containing the OBS WebSocket client instance.
"""
resp = ctx.client.get_studio_mode_enabled()
if resp.studio_mode_enabled:
out_console.print('Studio mode is enabled.')
console.out.print('Studio mode is enabled.')
else:
out_console.print('Studio mode is disabled.')
console.out.print('Studio mode is disabled.')

183
obsws_cli/styles.py Normal file
View File

@@ -0,0 +1,183 @@
"""module containing styles for the OBS WebSocket CLI."""
import os
from dataclasses import dataclass
registry = {}
def register_style(cls):
"""Register a style class."""
key = cls.__name__.lower()
if key in registry:
raise ValueError(f'Style {key} is already registered.')
registry[key] = cls
return cls
@dataclass
class Style:
"""Base class for styles."""
name: str
border: str
column: str
highlight: str
no_border: bool = False
def __post_init__(self):
"""Post-initialization to set default values and normalize the name."""
self.name = self.name.lower()
if self.no_border:
self.border = None
@register_style
@dataclass
class Disabled(Style):
"""Disabled style."""
name: str = 'disabled'
border: str = 'none'
column: str = 'none'
highlight: str = 'none'
@register_style
@dataclass
class Red(Style):
"""Red style."""
name: str = 'red'
border: str = 'red3'
column: str = 'red1'
highlight: str = 'red1'
@register_style
@dataclass
class Magenta(Style):
"""Magenta style."""
name: str = 'magenta'
border: str = 'magenta3'
column: str = 'orchid1'
highlight: str = 'orchid1'
@register_style
@dataclass
class Purple(Style):
"""Purple style."""
name: str = 'purple'
border: str = 'medium_purple4'
column: str = 'medium_purple'
highlight: str = 'medium_purple'
@register_style
@dataclass
class Blue(Style):
"""Blue style."""
name: str = 'blue'
border: str = 'cornflower_blue'
column: str = 'sky_blue2'
highlight: str = 'sky_blue2'
@register_style
@dataclass
class Cyan(Style):
"""Cyan style."""
name: str = 'cyan'
border: str = 'dark_cyan'
column: str = 'cyan'
highlight: str = 'cyan'
@register_style
@dataclass
class Green(Style):
"""Green style."""
name: str = 'green'
border: str = 'green4'
column: str = 'spring_green3'
highlight: str = 'spring_green3'
@register_style
@dataclass
class Yellow(Style):
"""Yellow style."""
name: str = 'yellow'
border: str = 'yellow3'
column: str = 'wheat1'
highlight: str = 'wheat1'
@register_style
@dataclass
class Orange(Style):
"""Orange style."""
name: str = 'orange'
border: str = 'dark_orange'
column: str = 'orange1'
highlight: str = 'orange1'
@register_style
@dataclass
class White(Style):
"""White style."""
name: str = 'white'
border: str = 'grey82'
column: str = 'grey100'
highlight: str = 'grey100'
@register_style
@dataclass
class Grey(Style):
"""Grey style."""
name: str = 'grey'
border: str = 'grey50'
column: str = 'grey70'
highlight: str = 'grey70'
@register_style
@dataclass
class Navy(Style):
"""Navy Blue style."""
name: str = 'navyblue'
border: str = 'deep_sky_blue4'
column: str = 'light_sky_blue3'
highlight: str = 'light_sky_blue3'
@register_style
@dataclass
class Black(Style):
"""Black style."""
name: str = 'black'
border: str = 'grey19'
column: str = 'grey11'
highlight: str = 'grey11'
def request_style_obj(style_name: str, no_border: bool) -> Style:
"""Entry point for style objects. Returns a Style object based on the style name."""
if style_name == 'disabled':
os.environ['NO_COLOR'] = '1'
return registry[style_name.lower()](no_border=no_border)

94
obsws_cli/text.py Normal file
View File

@@ -0,0 +1,94 @@
"""module containing commands for manipulating text inputs."""
from typing import Annotated, Optional
from cyclopts import App, Parameter
from . import console, validate
from .context import Context
from .enum import ExitCode
from .error import OBSWSCLIError
app = App(name='text', help='Commands for controlling text inputs in OBS.')
@app.command(name=['current', 'get'])
def current(
input_name: str,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Get the current text for a text input.
Parameters
----------
input_name : str
The name of the text input to retrieve the current text from.
ctx : Context
The context containing the OBS client and other settings.
"""
if not validate.input_in_inputs(ctx, input_name):
raise OBSWSCLIError(
f'Input [yellow]{input_name}[/yellow] not found.', code=ExitCode.ERROR
)
resp = ctx.client.get_input_settings(name=input_name)
if not resp.input_kind.startswith('text_'):
raise OBSWSCLIError(
f'Input [yellow]{input_name}[/yellow] is not a text input.',
code=ExitCode.ERROR,
)
current_text = resp.input_settings.get('text', '')
if not current_text:
current_text = '(empty)'
console.out.print(
f'Current text for input {console.highlight(ctx, input_name)}: {current_text}',
)
@app.command(name=['update', 'set'])
def update(
input_name: str,
new_text: Optional[str] = None,
/,
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Update the text of a text input.
Parameters
----------
input_name : str
The name of the text input to update.
new_text : Optional[str]
The new text to set for the input. If not provided, the text will be cleared
(set to an empty string).
ctx : Context
The context containing the OBS client and other settings.
"""
if not validate.input_in_inputs(ctx, input_name):
raise OBSWSCLIError(
f'Input [yellow]{input_name}[/yellow] not found.', code=ExitCode.ERROR
)
resp = ctx.client.get_input_settings(name=input_name)
if not resp.input_kind.startswith('text_'):
raise OBSWSCLIError(
f'Input [yellow]{input_name}[/yellow] is not a text input.',
code=ExitCode.ERROR,
)
ctx.client.set_input_settings(
name=input_name,
settings={'text': new_text},
overlay=True,
)
if not new_text:
new_text = '(empty)'
console.out.print(
f'Text for input {console.highlight(ctx, input_name)} updated to: {new_text}',
)

View File

@@ -1,6 +1,22 @@
"""module contains utility functions for the obsws_cli package."""
import os
def snakecase_to_titlecase(snake_str):
def snakecase_to_titlecase(snake_str: str) -> str:
"""Convert a snake_case string to a title case string."""
return snake_str.replace('_', ' ').title()
def check_mark(value: bool, empty_if_false: bool = False) -> str:
"""Return a check mark or cross mark based on the boolean value."""
if empty_if_false and not value:
return ''
# rich gracefully handles the absence of colour throughout the rest of the application,
# but here we must handle it manually.
# If NO_COLOR is set, we return plain text symbols.
# Otherwise, we return coloured symbols.
if os.getenv('NO_COLOR', '') != '':
return '' if value else ''
return '' if value else ''

View File

@@ -1,48 +1,49 @@
"""module containing validation functions."""
import typer
# type alias for an option that is skipped when the command is run
skipped_option = typer.Option(parser=lambda _: _, hidden=True, expose_value=False)
from .context import Context
def input_in_inputs(ctx: typer.Context, input_name: str) -> bool:
def input_in_inputs(ctx: Context, input_name: str) -> bool:
"""Check if an input is in the input list."""
inputs = ctx.obj.get_input_list().inputs
inputs = ctx.client.get_input_list().inputs
return any(input_.get('inputName') == input_name for input_ in inputs)
def scene_in_scenes(ctx: typer.Context, scene_name: str) -> bool:
def scene_in_scenes(ctx: Context, scene_name: str) -> bool:
"""Check if a scene exists in the list of scenes."""
resp = ctx.obj.get_scene_list()
resp = ctx.client.get_scene_list()
return any(scene.get('sceneName') == scene_name for scene in resp.scenes)
def studio_mode_enabled(ctx: typer.Context) -> bool:
def studio_mode_enabled(ctx: Context) -> bool:
"""Check if studio mode is enabled."""
resp = ctx.obj.get_studio_mode_enabled()
resp = ctx.client.get_studio_mode_enabled()
return resp.studio_mode_enabled
def scene_collection_in_scene_collections(
ctx: typer.Context, scene_collection_name: str
ctx: Context, scene_collection_name: str
) -> bool:
"""Check if a scene collection exists."""
resp = ctx.obj.get_scene_collection_list()
resp = ctx.client.get_scene_collection_list()
return any(
collection == scene_collection_name for collection in resp.scene_collections
)
def item_in_scene_item_list(
ctx: typer.Context, scene_name: str, item_name: str
) -> bool:
def item_in_scene_item_list(ctx: Context, scene_name: str, item_name: str) -> bool:
"""Check if an item exists in a scene."""
resp = ctx.obj.get_scene_item_list(scene_name)
resp = ctx.client.get_scene_item_list(scene_name)
return any(item.get('sourceName') == item_name for item in resp.scene_items)
def profile_exists(ctx: typer.Context, profile_name: str) -> bool:
def profile_exists(ctx: Context, profile_name: str) -> bool:
"""Check if a profile exists."""
resp = ctx.obj.get_profile_list()
resp = ctx.client.get_profile_list()
return any(profile == profile_name for profile in resp.profiles)
def monitor_exists(ctx: Context, monitor_index: int) -> bool:
"""Check if a monitor exists."""
resp = ctx.client.get_monitor_list()
return any(monitor['monitorIndex'] == monitor_index for monitor in resp.monitors)

View File

@@ -1,49 +1,84 @@
"""module containing commands for manipulating virtual camera in OBS."""
import typer
from rich.console import Console
from typing import Annotated
from .alias import AliasGroup
from cyclopts import App, Parameter
app = typer.Typer(cls=AliasGroup)
out_console = Console()
err_console = Console(stderr=True)
from . import console
from .context import Context
app = App(name='virtualcam', help='Commands for controlling the virtual camera in OBS.')
@app.callback()
def main():
"""Control virtual camera in OBS."""
@app.command(name=['start', 's'])
def start(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Start the virtual camera.
Parameters
----------
ctx : Context
The context containing the OBS client and other settings.
"""
ctx.client.start_virtual_cam()
console.out.print('Virtual camera started.')
@app.command('start | s')
def start(ctx: typer.Context):
"""Start the virtual camera."""
ctx.obj.start_virtual_cam()
out_console.print('Virtual camera started.')
@app.command(name=['stop', 'p'])
def stop(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Stop the virtual camera.
Parameters
----------
ctx : Context
The context containing the OBS client and other settings.
"""
ctx.client.stop_virtual_cam()
console.out.print('Virtual camera stopped.')
@app.command('stop | p')
def stop(ctx: typer.Context):
"""Stop the virtual camera."""
ctx.obj.stop_virtual_cam()
out_console.print('Virtual camera stopped.')
@app.command(name=['toggle', 'tg'])
def toggle(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Toggle the virtual camera.
Parameters
----------
ctx : Context
The context containing the OBS client and other settings.
@app.command('toggle | tg')
def toggle(ctx: typer.Context):
"""Toggle the virtual camera."""
resp = ctx.obj.toggle_virtual_cam()
"""
resp = ctx.client.toggle_virtual_cam()
if resp.output_active:
out_console.print('Virtual camera is enabled.')
console.out.print('Virtual camera is enabled.')
else:
out_console.print('Virtual camera is disabled.')
console.out.print('Virtual camera is disabled.')
@app.command('status | ss')
def status(ctx: typer.Context):
"""Get the status of the virtual camera."""
resp = ctx.obj.get_virtual_cam_status()
@app.command(name=['status', 'ss'])
def status(
*,
ctx: Annotated[Context, Parameter(parse=False)],
):
"""Get the status of the virtual camera.
Parameters
----------
ctx : Context
The context containing the OBS client and other settings.
"""
resp = ctx.client.get_virtual_cam_status()
if resp.output_active:
out_console.print('Virtual camera is enabled.')
console.out.print('Virtual camera is enabled.')
else:
out_console.print('Virtual camera is disabled.')
console.out.print('Virtual camera is disabled.')

View File

@@ -21,7 +21,7 @@ classifiers = [
"Programming Language :: Python :: Implementation :: CPython",
"Programming Language :: Python :: Implementation :: PyPy",
]
dependencies = ["typer>=0.16.0", "obsws-python>=1.7.2", "python-dotenv>=1.1.0"]
dependencies = ["cyclopts>=3.22.2", "obsws-python>=1.8.0"]
[project.urls]
@@ -30,7 +30,7 @@ Issues = "https://github.com/onyx-and-iris/obsws-cli/issues"
Source = "https://github.com/onyx-and-iris/obsws-cli"
[project.scripts]
obsws-cli = "obsws_cli:app"
obsws-cli = "obsws_cli:run"
[tool.hatch.version]
path = "obsws_cli/__about__.py"
@@ -42,6 +42,9 @@ dependencies = ["click-man>=0.5.1"]
cli = "obsws-cli {args:}"
man = "python man/generate.py --output=./man"
[tool.hatch.envs.lazyimports.scripts]
cli = "obsws-cli {args:}"
[tool.hatch.envs.hatch-test]
randomize = true

View File

@@ -71,6 +71,13 @@ def pytest_sessionstart(session):
},
sceneItemEnabled=True,
)
session.obsws.create_input(
sceneName='pytest_scene',
inputName='pytest_text_input',
inputKind='text_gdiplus_v3',
inputSettings={'text': 'Hello, OBS!'},
sceneItemEnabled=True,
)
resp = session.obsws.get_scene_item_list('pytest_scene')
for item in resp.scene_items:
if item['sourceName'] == 'pytest_input_2':

View File

@@ -27,4 +27,4 @@ def test_filter_list_invalid_source():
"""Test the filter list command with an invalid source."""
result = runner.invoke(app, ['filter', 'list', 'invalid_source'])
assert result.exit_code != 0
assert "No source was found by the name of 'invalid_source'" in result.stderr
assert 'No source was found by the name of invalid_source' in result.stderr

View File

@@ -18,29 +18,29 @@ def test_group_show():
"""Test the group show command."""
result = runner.invoke(app, ['group', 'show', 'Scene', 'test_group'])
assert result.exit_code == 0
assert "Group 'test_group' is now visible." in result.stdout
assert 'Group test_group is now visible.' in result.stdout
def test_group_toggle():
"""Test the group toggle command."""
result = runner.invoke(app, ['group', 'status', 'Scene', 'test_group'])
assert result.exit_code == 0
enabled = "Group 'test_group' is now visible." in result.stdout
enabled = 'Group test_group is now visible.' in result.stdout
result = runner.invoke(app, ['group', 'toggle', 'Scene', 'test_group'])
assert result.exit_code == 0
if enabled:
assert "Group 'test_group' is now hidden." in result.stdout
assert 'Group test_group is now hidden.' in result.stdout
else:
assert "Group 'test_group' is now visible." in result.stdout
assert 'Group test_group is now visible.' in result.stdout
def test_group_status():
"""Test the group status command."""
result = runner.invoke(app, ['group', 'show', 'Scene', 'test_group'])
assert result.exit_code == 0
assert "Group 'test_group' is now visible." in result.stdout
assert 'Group test_group is now visible.' in result.stdout
result = runner.invoke(app, ['group', 'status', 'Scene', 'test_group'])
assert result.exit_code == 0
assert "Group 'test_group' is now visible." in result.stdout
assert 'Group test_group is now visible.' in result.stdout

View File

@@ -36,3 +36,10 @@ def test_scene_switch():
result = runner.invoke(app, ['scene', 'switch', 'pytest_scene'])
assert result.exit_code == 0
assert 'Switched to program scene: pytest_scene' in result.stdout
def test_scene_switch_invalid():
"""Test the scene switch command with an invalid scene."""
result = runner.invoke(app, ['scene', 'switch', 'non_existent_scene'])
assert result.exit_code != 0
assert 'Scene non_existent_scene not found' in result.stderr

View File

@@ -29,6 +29,6 @@ def test_sceneitem_transform():
)
assert result.exit_code == 0
assert (
"Item 'pytest_input_2' in scene 'pytest_scene' has been transformed"
'Item pytest_input_2 in scene pytest_scene has been transformed'
in result.stdout
)

18
tests/test_text.py Normal file
View File

@@ -0,0 +1,18 @@
"""Unit tests for the text command in the OBS WebSocket CLI."""
from typer.testing import CliRunner
from obsws_cli.app import app
runner = CliRunner(mix_stderr=False)
def test_text_update():
"""Test the text update command."""
result = runner.invoke(app, ['text', 'current', 'pytest_text_input'])
assert result.exit_code == 0
assert 'Current text for input pytest_text_input: Hello, OBS!' in result.stdout
result = runner.invoke(app, ['text', 'update', 'pytest_text_input', 'New Text'])
assert result.exit_code == 0
assert 'Text for input pytest_text_input updated to: New Text' in result.stdout