28 Commits

Author SHA1 Message Date
c48f7a49ac add Style section to README 2025-06-22 02:54:42 +01:00
b14d9b7610 ensure studio mode is disabled on test cleanup 2025-06-22 02:54:08 +01:00
864751ecc9 remove terminaltables3 dep, add rich 2025-06-22 02:53:46 +01:00
c02ffac403 replace terminaltables with rich tables.
allow rich to handle all console output.

util.check_mark is now used to pass back colourless check/cross marks if NO_COLOR is set or --style/SLOBS_STYLE was not set.
2025-06-22 02:52:27 +01:00
6bcdd8391c add --style and --no-border flags to root command 2025-06-22 02:39:28 +01:00
b0d311dad9 define styles 2025-06-22 02:38:56 +01:00
46159a0ca4 add missing for-else comment 2025-06-15 08:40:36 +01:00
23d3118e6a keep names consistent 2025-06-13 14:30:50 +01:00
c369f4e3d5 add 0.10.0 to CHANGELOG 2025-06-13 14:19:22 +01:00
129c3f57f2 minor bump 2025-06-13 14:14:16 +01:00
42519ba294 slow down the scene tests in an attempt to avoid rate limiting issues 2025-06-13 14:11:16 +01:00
09a44b2dea add SlobsCliProtocolError for wrapping ProtocolError
handle ProtocolError(s) and reraise as SlobsCliProtocolError. This has the following benefits:
A user friendly error message
A non-zero exit code
2025-06-13 14:10:54 +01:00
f4421b3351 call next(iter()) on excgroup.exceptions to convery intent a little better
(we dont intend to iterate through them, we just want to raise the first one)
2025-06-13 14:04:05 +01:00
f282f86e72 make test a composite command so we can avoid failing fast 2025-06-13 05:46:30 +01:00
81d0072148 revert default to localhost.
the slow resolution was due to Docker desktop...

patch bump
2025-06-13 04:34:15 +01:00
f3c94d1dee add comments to for-else blocks 2025-06-13 00:52:27 +01:00
3c09f9fd5b upd CHANGELOG 2025-06-12 23:47:35 +01:00
51d49c9c93 add status to audio section 2025-06-12 23:47:23 +01:00
bc43c8483a add audio unit tests
add audio status command

patch bump
2025-06-12 23:47:07 +01:00
57e31a7e49 arguments appear to be required by default. 2025-06-12 23:25:07 +01:00
db6a9b5e84 add studiomode unit tests 2025-06-12 23:10:01 +01:00
03f1dac8ea rename leftmost column heading for audio list
patch bump
2025-06-12 22:17:48 +01:00
535b22bf8e add 0.9.0 to CHANGELOG 2025-06-12 22:13:58 +01:00
6e95e4d670 add scenecollection command group
minor bump
2025-06-12 22:13:46 +01:00
519db1b46e add Scene Collection to README 2025-06-12 22:04:15 +01:00
582587bed5 add ruff config
run files through formatter

add dosctrings to satisfy the linter
2025-06-12 20:34:14 +01:00
fecd13d345 patch bump 2025-06-12 18:50:43 +01:00
a1a22d0d00 scene list and audio list now print tables
patch bump
2025-06-12 18:49:54 +01:00
29 changed files with 1273 additions and 284 deletions

View File

@@ -5,7 +5,22 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
# [0.8.3] - 2025-06-12
# [0.10.0] - 2025-06-13
### Changed
- scene commands are prone to raise ProtocolErrors if called too quickly in succession. To make these errors a little more user friendly the following changes have been made:
- They print error messages to stderr
- They return exit code 2
# [0.9.0] - 2025-06-12
### Added
- scenecollection command group, see [Scene Collection](https://github.com/onyx-and-iris/slobs-cli/tree/main?tab=readme-ov-file#scene-collection)
- add audio status commmand.
# [0.8.4] - 2025-06-12
### Added
@@ -16,6 +31,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- scene list now shows which scene is the current active scene.
- audio list now shows mute states.
- scene list and audio list commands now print as tables
- --id option added to scene and audio commands to show the source ID in the output
- by default this is no longer displayed

View File

@@ -51,7 +51,7 @@ The CLI should now be discoverable as `slobs-cli`
Pass `--domain`, `--port` and `--token` as flags on the root command, for example:
```console
slobs-cli --domain 127.0.0.1 --port 59650 --token <API token> --help
slobs-cli --domain localhost --port 59650 --token <API token> --help
```
#### Environment Variables
@@ -59,7 +59,7 @@ slobs-cli --domain 127.0.0.1 --port 59650 --token <API token> --help
Load the following values from your environment:
```env
SLOBS_DOMAIN=127.0.0.1
SLOBS_DOMAIN=localhost
SLOBS_PORT=59650
SLOBS_TOKEN=<API Token>
```
@@ -189,6 +189,12 @@ slobs-cli audio unmute "Mic/Aux"
slobs-cli audio toggle "Mic/Aux"
```
- status: Get the mute status of an audio source by name.
```console
slobs-cli audio status "Mic/Aux"
```
#### Replay Buffer
- start: Start the replay buffer.
@@ -247,6 +253,74 @@ slobs-cli studiomode status
slobs-cli studiomode force-transition
```
#### Scene Collection
- list: List all scene collections.
- flags:
*optional*
- --id: Include scenecollection IDs in the output.
```console
slobs-cli scenecollection list
```
- create: Create a new scene collection.
- args: <scenecollection_name>
```console
slobs-cli scenecollection create "NewCollection"
```
- delete: Delete a scene collection by name.
- args: <scenecollection_name>
```console
slobs-cli scenecollection delete "ExistingCollection"
```
- load: Load a scene collection by name.
- args: <scenecollection_name>
```console
slobs-cli scenecollection load "ExistingCollection"
```
- rename: Rename a scene collection.
- args: <scenecollection_name> <new_name>
```console
slobs-cli scenecollection rename "ExistingCollection" "NewName"
```
## Style
By default styling is disabled but you may enable it with:
- --style/-s: Style used in output.
- SLOBS_STYLE
- --no-border/-b: Disable table border styling in output.
- SLOBS_STYLE_NO_BORDER
Available styles:
- red
- magenta
- purple
- blue
- cyan
- green
- yellow
- orange
- white
- grey
- navy
- black
```console
slobs-cli --style=cyan --no-border scene list
```
## Special Thanks
- [Julian-0](https://github.com/Julian-O) For writing the [PySLOBS wrapper](https://github.com/Julian-O/PySLOBS) on which this CLI depends.

96
pdm.lock generated
View File

@@ -5,7 +5,7 @@
groups = ["default", "dev"]
strategy = ["inherit_metadata"]
lock_version = "4.5.0"
content_hash = "sha256:87714b892affeadd7dba57d9430f0af3dc46f50cc9d095942367e4fca103f61e"
content_hash = "sha256:9be0832aae27a9da3f885900d367836aa05a12e9c0459d751c319d1bd329c33c"
[[metadata.targets]]
requires_python = ">=3.11"
@@ -45,13 +45,13 @@ files = [
[[package]]
name = "cachetools"
version = "6.0.0"
version = "6.1.0"
requires_python = ">=3.9"
summary = "Extensible memoizing collections and decorators"
groups = ["dev"]
files = [
{file = "cachetools-6.0.0-py3-none-any.whl", hash = "sha256:82e73ba88f7b30228b5507dce1a1f878498fc669d972aef2dde4f3a3c24f103e"},
{file = "cachetools-6.0.0.tar.gz", hash = "sha256:f225782b84438f828328fc2ad74346522f27e5b1440f4e9fd18b20ebfd1aa2cf"},
{file = "cachetools-6.1.0-py3-none-any.whl", hash = "sha256:1c7bb3cf9193deaf3508b7c5f2a79986c13ea38965c5adcff1f84519cf39163e"},
{file = "cachetools-6.1.0.tar.gz", hash = "sha256:b4c4f404392848db3ce7aac34950d17be4d864da4b8b66911008e430bc544587"},
]
[[package]]
@@ -119,6 +119,31 @@ files = [
{file = "iniconfig-2.1.0.tar.gz", hash = "sha256:3abbd2e30b36733fee78f9c7f7308f2d0050e88f0087fd25c2645f63c773e1c7"},
]
[[package]]
name = "markdown-it-py"
version = "3.0.0"
requires_python = ">=3.8"
summary = "Python port of markdown-it. Markdown parsing, done right!"
groups = ["default"]
dependencies = [
"mdurl~=0.1",
]
files = [
{file = "markdown-it-py-3.0.0.tar.gz", hash = "sha256:e3f60a94fa066dc52ec76661e37c851cb232d92f9886b15cb560aaada2df8feb"},
{file = "markdown_it_py-3.0.0-py3-none-any.whl", hash = "sha256:355216845c60bd96232cd8d8c40e8f9765cc86f46880e43a8fd22dc1a1a8cab1"},
]
[[package]]
name = "mdurl"
version = "0.1.2"
requires_python = ">=3.7"
summary = "Markdown URL utilities"
groups = ["default"]
files = [
{file = "mdurl-0.1.2-py3-none-any.whl", hash = "sha256:84008a41e51615a49fc9966191ff91509e3c40b939176e643fd50a5c2196b8f8"},
{file = "mdurl-0.1.2.tar.gz", hash = "sha256:bb413d29f5eea38f31dd4754dd7377d4465116fb207585f97bf925588687c1ba"},
]
[[package]]
name = "packaging"
version = "25.0"
@@ -165,13 +190,13 @@ files = [
[[package]]
name = "pygments"
version = "2.19.1"
version = "2.19.2"
requires_python = ">=3.8"
summary = "Pygments is a syntax highlighting package written in Python."
groups = ["dev"]
groups = ["default", "dev"]
files = [
{file = "pygments-2.19.1-py3-none-any.whl", hash = "sha256:9ea1544ad55cecf4b8242fab6dd35a93bbce657034b0611ee383099054ab6d8c"},
{file = "pygments-2.19.1.tar.gz", hash = "sha256:61c16d2a8576dc0649d9f39e089b5f02bcd27fba10d8fb4dcc28173f7a45151f"},
{file = "pygments-2.19.2-py3-none-any.whl", hash = "sha256:86540386c03d588bb81d44bc3928634ff26449851e99741617ecb9037ee5ec0b"},
{file = "pygments-2.19.2.tar.gz", hash = "sha256:636cb2477cec7f8952536970bc533bc43743542f70392ae026374600add5b887"},
]
[[package]]
@@ -205,7 +230,7 @@ files = [
[[package]]
name = "pytest"
version = "8.4.0"
version = "8.4.1"
requires_python = ">=3.9"
summary = "pytest: simple powerful testing with Python"
groups = ["dev"]
@@ -219,8 +244,8 @@ dependencies = [
"tomli>=1; python_version < \"3.11\"",
]
files = [
{file = "pytest-8.4.0-py3-none-any.whl", hash = "sha256:f40f825768ad76c0977cbacdf1fd37c6f7a468e460ea6a0636078f8972d4517e"},
{file = "pytest-8.4.0.tar.gz", hash = "sha256:14d920b48472ea0dbf68e45b96cd1ffda4705f33307dcc86c676c1b5104838a6"},
{file = "pytest-8.4.1-py3-none-any.whl", hash = "sha256:539c70ba6fcead8e78eebbf1115e8b589e7565830d7d006a8723f19ac8a0afb7"},
{file = "pytest-8.4.1.tar.gz", hash = "sha256:7c67fd69174877359ed9371ec3af8a3d2b04741818c51e5e99cc1742251fa93c"},
]
[[package]]
@@ -238,6 +263,49 @@ files = [
{file = "pytest_randomly-3.16.0.tar.gz", hash = "sha256:11bf4d23a26484de7860d82f726c0629837cf4064b79157bd18ec9d41d7feb26"},
]
[[package]]
name = "rich"
version = "14.0.0"
requires_python = ">=3.8.0"
summary = "Render rich text, tables, progress bars, syntax highlighting, markdown and more to the terminal"
groups = ["default"]
dependencies = [
"markdown-it-py>=2.2.0",
"pygments<3.0.0,>=2.13.0",
"typing-extensions<5.0,>=4.0.0; python_version < \"3.11\"",
]
files = [
{file = "rich-14.0.0-py3-none-any.whl", hash = "sha256:1c9491e1951aac09caffd42f448ee3d04e58923ffe14993f6e83068dc395d7e0"},
{file = "rich-14.0.0.tar.gz", hash = "sha256:82f1bc23a6a21ebca4ae0c45af9bdbc492ed20231dcb63f297d6d1021a9d5725"},
]
[[package]]
name = "ruff"
version = "0.12.0"
requires_python = ">=3.7"
summary = "An extremely fast Python linter and code formatter, written in Rust."
groups = ["dev"]
files = [
{file = "ruff-0.12.0-py3-none-linux_armv6l.whl", hash = "sha256:5652a9ecdb308a1754d96a68827755f28d5dfb416b06f60fd9e13f26191a8848"},
{file = "ruff-0.12.0-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:05ed0c914fabc602fc1f3b42c53aa219e5736cb030cdd85640c32dbc73da74a6"},
{file = "ruff-0.12.0-py3-none-macosx_11_0_arm64.whl", hash = "sha256:07a7aa9b69ac3fcfda3c507916d5d1bca10821fe3797d46bad10f2c6de1edda0"},
{file = "ruff-0.12.0-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:e7731c3eec50af71597243bace7ec6104616ca56dda2b99c89935fe926bdcd48"},
{file = "ruff-0.12.0-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:952d0630eae628250ab1c70a7fffb641b03e6b4a2d3f3ec6c1d19b4ab6c6c807"},
{file = "ruff-0.12.0-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:c021f04ea06966b02614d442e94071781c424ab8e02ec7af2f037b4c1e01cc82"},
{file = "ruff-0.12.0-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:7d235618283718ee2fe14db07f954f9b2423700919dc688eacf3f8797a11315c"},
{file = "ruff-0.12.0-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:0c0758038f81beec8cc52ca22de9685b8ae7f7cc18c013ec2050012862cc9165"},
{file = "ruff-0.12.0-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:139b3d28027987b78fc8d6cfb61165447bdf3740e650b7c480744873688808c2"},
{file = "ruff-0.12.0-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:68853e8517b17bba004152aebd9dd77d5213e503a5f2789395b25f26acac0da4"},
{file = "ruff-0.12.0-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:3a9512af224b9ac4757f7010843771da6b2b0935a9e5e76bb407caa901a1a514"},
{file = "ruff-0.12.0-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:b08df3d96db798e5beb488d4df03011874aff919a97dcc2dd8539bb2be5d6a88"},
{file = "ruff-0.12.0-py3-none-musllinux_1_2_i686.whl", hash = "sha256:6a315992297a7435a66259073681bb0d8647a826b7a6de45c6934b2ca3a9ed51"},
{file = "ruff-0.12.0-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:1e55e44e770e061f55a7dbc6e9aed47feea07731d809a3710feda2262d2d4d8a"},
{file = "ruff-0.12.0-py3-none-win32.whl", hash = "sha256:7162a4c816f8d1555eb195c46ae0bd819834d2a3f18f98cc63819a7b46f474fb"},
{file = "ruff-0.12.0-py3-none-win_amd64.whl", hash = "sha256:d00b7a157b8fb6d3827b49d3324da34a1e3f93492c1f97b08e222ad7e9b291e0"},
{file = "ruff-0.12.0-py3-none-win_arm64.whl", hash = "sha256:8cd24580405ad8c1cc64d61725bca091d6b6da7eb3d36f72cc605467069d7e8b"},
{file = "ruff-0.12.0.tar.gz", hash = "sha256:4d047db3662418d4a848a3fdbfaf17488b34b62f527ed6f10cb8afd78135bc5c"},
]
[[package]]
name = "sniffio"
version = "1.3.1"
@@ -251,7 +319,7 @@ files = [
[[package]]
name = "tox"
version = "4.26.0"
version = "4.27.0"
requires_python = ">=3.9"
summary = "tox is a generic virtualenv management and test command line tool"
groups = ["dev"]
@@ -269,8 +337,8 @@ dependencies = [
"virtualenv>=20.31",
]
files = [
{file = "tox-4.26.0-py3-none-any.whl", hash = "sha256:75f17aaf09face9b97bd41645028d9f722301e912be8b4c65a3f938024560224"},
{file = "tox-4.26.0.tar.gz", hash = "sha256:a83b3b67b0159fa58e44e646505079e35a43317a62d2ae94725e0586266faeca"},
{file = "tox-4.27.0-py3-none-any.whl", hash = "sha256:2b8a7fb986b82aa2c830c0615082a490d134e0626dbc9189986da46a313c4f20"},
{file = "tox-4.27.0.tar.gz", hash = "sha256:b97d5ecc0c0d5755bcc5348387fef793e1bfa68eb33746412f4c60881d7f5f57"},
]
[[package]]

View File

@@ -2,7 +2,7 @@
name = "slobs-cli"
description = "A command line application for Streamlabs Desktop"
authors = [{ name = "onyx-and-iris", email = "code@onyxandiris.online" }]
dependencies = ["pyslobs>=2.0.5", "asyncclick>=8.1.8"]
dependencies = ["pyslobs>=2.0.5", "asyncclick>=8.1.8", "rich>=14.0.0"]
requires-python = ">=3.11"
readme = "README.md"
license = { text = "MIT" }
@@ -24,18 +24,23 @@ source = "file"
path = "src/slobs_cli/__about__.py"
[tool.pdm.scripts]
cli.cmd = "slobs-cli {args}"
cli.env_file = ".env"
_.env_file = ".env"
pre_test.cmd = "python tests/setup.py"
test.cmd = "pytest {args}"
test.env_file = ".env"
post_test.cmd = "python tests/teardown.py"
cli.cmd = "slobs-cli {args}"
_setup.cmd = "python tests/setup.py"
_teardown.cmd = "python tests/teardown.py"
test.composite = ["_setup", "pytest {args}", "_teardown"]
test.keep_going = true
fmt.cmd = "ruff format {args}"
post_fmt.cmd = "ruff check {args}"
[dependency-groups]
dev = [
"tox-pdm>=0.7.2",
"pytest>=8.4.0",
"virtualenv-pyenv>=0.5.0",
"pytest-randomly>=3.16.0",
"virtualenv-pyenv>=0.5.0",
"ruff>=0.11.13",
]

79
ruff.toml Normal file
View File

@@ -0,0 +1,79 @@
# Exclude a variety of commonly ignored directories.
exclude = [
".bzr",
".direnv",
".eggs",
".git",
".git-rewrite",
".hatch",
".hg",
".ipynb_checkpoints",
".mypy_cache",
".nox",
".pants.d",
".pyenv",
".pytest_cache",
".pytype",
".ruff_cache",
".svn",
".tox",
".venv",
".vscode",
"__pypackages__",
"_build",
"buck-out",
"build",
"dist",
"node_modules",
"site-packages",
"venv",
]
# Same as Black.
line-length = 88
indent-width = 4
# Assume Python 3.11
target-version = "py311"
[lint]
# Enable Pyflakes (`F`) and a subset of the pycodestyle (`E`) codes by default.
# Unlike Flake8, Ruff doesn't enable pycodestyle warnings (`W`) or
# McCabe complexity (`C901`) by default.
# Enable pydocstyle (`D`) codes by default.
select = ["E4", "E7", "E9", "F", "D"]
ignore = ["D203", "D213"]
# Allow fix for all enabled rules (when `--fix`) is provided.
fixable = ["ALL"]
unfixable = []
# Allow unused variables when underscore-prefixed.
dummy-variable-rgx = "^(_+|(_+[a-zA-Z0-9_]*[a-zA-Z0-9]+?))$"
[format]
# Unlike Black, use single quotes for strings.
quote-style = "single"
# Like Black, indent with spaces, rather than tabs.
indent-style = "space"
# Like Black, respect magic trailing commas.
skip-magic-trailing-comma = false
# Like Black, automatically detect the appropriate line ending.
line-ending = "auto"
# Enable auto-formatting of code examples in docstrings. Markdown,
# reStructuredText code/literal blocks and doctests are all supported.
#
# This is currently disabled by default, but it is planned for this
# to be opt-out in the future.
docstring-code-format = false
# Set the line length limit used when formatting code snippets in
# docstrings.
#
# This only has an effect when the `docstring-code-format` setting is
# enabled.
docstring-code-line-length = "dynamic"

View File

@@ -1 +1,3 @@
__version__ = "0.8.3"
"""module for package metadata."""
__version__ = '0.10.0'

View File

@@ -1,9 +1,21 @@
"""Package slobs_cli provides a command-line interface for interacting with SLOBS (Streamlabs OBS)."""
from .audio import audio
from .cli import cli
from .record import record
from .replaybuffer import replaybuffer
from .scene import scene
from .scenecollection import scenecollection
from .stream import stream
from .studiomode import studiomode
__all__ = ["cli", "scene", "stream", "record", "audio", "replaybuffer", "studiomode"]
__all__ = [
'cli',
'scene',
'stream',
'record',
'audio',
'replaybuffer',
'studiomode',
'scenecollection',
]

View File

@@ -1,40 +1,67 @@
"""module for managing audio sources in Slobs CLI."""
import asyncclick as click
from anyio import create_task_group
from pyslobs import AudioService
from rich.table import Table
from rich.text import Text
from . import console, util
from .cli import cli
from .errors import SlobsCliError
@cli.group()
def audio():
"""Audio management commands."""
"""Manage audio sources in Slobs CLI."""
@audio.command()
@click.option("--id", is_flag=True, help="Include audio source IDs in the output.")
@click.option('--id', is_flag=True, help='Include audio source IDs in the output.')
@click.pass_context
async def list(ctx: click.Context, id: bool = False):
"""List all audio sources."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
as_ = AudioService(conn)
async def _run():
sources = await as_.get_sources()
if not sources:
click.echo("No audio sources found.")
console.out.print('No audio sources found.')
conn.close()
return
click.echo("Available audio sources:")
style = ctx.obj['style']
table = Table(
show_header=True, header_style=style.header, border_style=style.border
)
if id:
columns = [
('Audio Source Name', 'left'),
('Muted', 'center'),
('ID', 'left'),
]
else:
columns = [
('Audio Source Name', 'left'),
('Muted', 'center'),
]
for col_name, col_justify in columns:
table.add_column(Text(col_name, justify='center'), justify=col_justify)
for source in sources:
model = await source.get_model()
click.echo(
f"- {click.style(model.name, fg='blue')} "
f"{f'ID: {model.source_id}, ' if id else ''}"
f"Muted: {click.style('', fg='green') if model.muted else click.style('', fg='red')}"
)
to_append = [Text(model.name, style=style.cell)]
to_append.append(util.check_mark(ctx, model.muted))
if id:
to_append.append(Text(model.source_id, style=style.cell))
table.add_row(*to_append)
console.out.print(table)
conn.close()
async with create_task_group() as tg:
@@ -43,12 +70,11 @@ async def list(ctx: click.Context, id: bool = False):
@audio.command()
@click.argument("source_name")
@click.argument('source_name')
@click.pass_context
async def mute(ctx: click.Context, source_name: str):
"""Mute an audio source by name."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
as_ = AudioService(conn)
async def _run():
@@ -59,10 +85,10 @@ async def mute(ctx: click.Context, source_name: str):
break
else: # If no source by the given name was found
conn.close()
raise SlobsCliError(f"Source '{source_name}' not found.")
raise SlobsCliError(f'Audio source "{source_name}" not found.')
await source.set_muted(True)
click.echo(f"Muted audio source: {source_name}")
console.out.print(f'{console.highlight(ctx, source_name)} muted successfully.')
conn.close()
try:
@@ -70,17 +96,16 @@ async def mute(ctx: click.Context, source_name: str):
tg.start_soon(conn.background_processing)
tg.start_soon(_run)
except* SlobsCliError as excgroup:
for e in excgroup.exceptions:
raise e
raisable = next(iter(excgroup.exceptions))
raise raisable
@audio.command()
@click.argument("source_name")
@click.argument('source_name')
@click.pass_context
async def unmute(ctx: click.Context, source_name: str):
"""Unmute an audio source by name."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
as_ = AudioService(conn)
async def _run():
@@ -91,10 +116,12 @@ async def unmute(ctx: click.Context, source_name: str):
break
else: # If no source by the given name was found
conn.close()
raise SlobsCliError(f"Source '{source_name}' not found.")
raise SlobsCliError(f'Audio source "{source_name}" not found.')
await source.set_muted(False)
click.echo(f"Unmuted audio source: {source_name}")
console.out.print(
f'{console.highlight(ctx, source_name)} unmuted successfully.'
)
conn.close()
try:
@@ -102,17 +129,16 @@ async def unmute(ctx: click.Context, source_name: str):
tg.start_soon(conn.background_processing)
tg.start_soon(_run)
except* SlobsCliError as excgroup:
for e in excgroup.exceptions:
raise e
raisable = next(iter(excgroup.exceptions))
raise raisable
@audio.command()
@click.argument("source_name")
@click.argument('source_name')
@click.pass_context
async def toggle(ctx: click.Context, source_name: str):
"""Toggle mute state of an audio source by name."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
as_ = AudioService(conn)
async def _run():
@@ -122,20 +148,55 @@ async def toggle(ctx: click.Context, source_name: str):
if model.name.lower() == source_name.lower():
if model.muted:
await source.set_muted(False)
click.echo(f"Unmuted audio source: {source_name}")
console.out.print(
f'{console.highlight(ctx, source_name)} unmuted successfully.'
)
else:
await source.set_muted(True)
click.echo(f"Muted audio source: {source_name}")
console.out.print(
f'{console.highlight(ctx, source_name)} muted successfully.'
)
conn.close()
break
else: # If no source by the given name was found
conn.close()
raise SlobsCliError(f"Source '{source_name}' not found.")
raise SlobsCliError(f'Audio source "{source_name}" not found.')
try:
async with create_task_group() as tg:
tg.start_soon(conn.background_processing)
tg.start_soon(_run)
except* SlobsCliError as excgroup:
for e in excgroup.exceptions:
raise e
raisable = next(iter(excgroup.exceptions))
raise raisable
@audio.command()
@click.argument('source_name')
@click.pass_context
async def status(ctx: click.Context, source_name: str):
"""Get the mute status of an audio source by name."""
conn = ctx.obj['connection']
as_ = AudioService(conn)
async def _run():
sources = await as_.get_sources()
for source in sources:
model = await source.get_model()
if model.name.lower() == source_name.lower():
console.out.print(
f'{console.highlight(ctx, source_name)} is {"muted" if model.muted else "unmuted"}.'
)
conn.close()
return
else: # If no source by the given name was found
conn.close()
raise SlobsCliError(f'Audio source "{source_name}" not found.')
try:
async with create_task_group() as tg:
tg.start_soon(conn.background_processing)
tg.start_soon(_run)
except* SlobsCliError as excgroup:
raisable = next(iter(excgroup.exceptions))
raise raisable

View File

@@ -1,42 +1,66 @@
"""module defining the entry point for the Streamlabs Desktop CLI application."""
import anyio
import asyncclick as click
from pyslobs import ConnectionConfig, SlobsConnection
from . import styles
from .__about__ import __version__ as version
@click.group()
@click.option(
"-d",
"--domain",
default="127.0.0.1",
'-d',
'--domain',
default='localhost',
envvar='SLOBS_DOMAIN',
show_default=True,
show_envvar=True,
help="The domain of the SLOBS server.",
envvar="SLOBS_DOMAIN",
help='The domain of the SLOBS server.',
)
@click.option(
"-p",
"--port",
'-p',
'--port',
default=59650,
envvar='SLOBS_PORT',
show_default=True,
show_envvar=True,
help="The port of the SLOBS server.",
envvar="SLOBS_PORT",
help='The port of the SLOBS server.',
)
@click.option(
"-t",
"--token",
help="The token for the SLOBS server.",
envvar="SLOBS_TOKEN",
'-t',
'--token',
envvar='SLOBS_TOKEN',
show_envvar=True,
required=True,
help='The token for the SLOBS server.',
)
@click.option(
'-s',
'--style',
default='disabled',
envvar='SLOBS_STYLE',
show_default=True,
show_envvar=True,
help='The style to use for output.',
)
@click.option(
'-b',
'--no-border',
is_flag=True,
default=False,
envvar='SLOBS_STYLE_NO_BORDER',
show_default=True,
show_envvar=True,
help='Disable borders in the output.',
)
@click.version_option(
version, "-v", "--version", message="%(prog)s version: %(version)s"
version, '-v', '--version', message='%(prog)s version: %(version)s'
)
@click.pass_context
async def cli(ctx: click.Context, domain: str, port: int, token: str):
async def cli(
ctx: click.Context, domain: str, port: int, token: str, style: str, no_border: bool
):
"""Command line interface for Streamlabs Desktop."""
ctx.ensure_object(dict)
config = ConnectionConfig(
@@ -44,7 +68,8 @@ async def cli(ctx: click.Context, domain: str, port: int, token: str):
port=port,
token=token,
)
ctx.obj["connection"] = SlobsConnection(config)
ctx.obj['connection'] = SlobsConnection(config)
ctx.obj['style'] = styles.request_style_obj(style, no_border)
def run():

21
src/slobs_cli/console.py Normal file
View File

@@ -0,0 +1,21 @@
"""module for console output handling."""
import asyncclick as click
from rich.console import Console
out = Console()
err = Console(stderr=True, style='bold red')
def highlight(ctx: click.Context, text: str) -> str:
"""Highlight text for console output."""
if ctx.obj['style'].name == 'no_colour':
return text
return f'[{ctx.obj["style"].highlight}]{text}[/{ctx.obj["style"].highlight}]'
def warning(ctx: click.Context, text: str) -> str:
"""Format warning text for console output."""
if ctx.obj['style'].name == 'no_colour':
return text
return f'[magenta]{text}[/magenta]'

View File

@@ -1,13 +1,46 @@
"""module for custom exceptions in Slobs CLI."""
import json
import asyncclick as click
from . import console
class SlobsCliError(click.ClickException):
"""Base class for all Slobs CLI errors."""
def __init__(self, message: str):
"""Initialize the SlobsCliError with a message."""
super().__init__(message)
self.exit_code = 1
def show(self):
"""Display the error message in red."""
click.secho(f"Error: {self.message}", fg="red", err=True)
"""Display the error message in red and write to stderr."""
console.err.print(f'Error: {self.message}')
class SlobsCliProtocolError(SlobsCliError):
"""Converts pyslobs ProtocolError to a SlobsCliProtocolError."""
def __init__(self, message: str):
"""Initialize the SlobsCliProtocolError with a message."""
protocol_message_to_dict = json.loads(
str(message).replace('"', '\\"').replace("'", '"')
)
super().__init__(
protocol_message_to_dict.get('message', 'Unable to parse error message')
)
self.exit_code = 2
self.protocol_code = protocol_message_to_dict.get('code', 'Unknown error code')
def show(self):
"""Display the protocol error message in red."""
match self.protocol_code:
case -32600:
console.err.print(
'Oops! Looks like we hit a rate limit for this command. Please try again later.'
)
case _:
# Fall back to the base error display for unknown protocol codes
super().show()

View File

@@ -1,34 +1,36 @@
"""module for managing recording commands in Slobs CLI."""
import asyncclick as click
from anyio import create_task_group
from pyslobs import StreamingService
from . import console
from .cli import cli
from .errors import SlobsCliError
@cli.group()
def record():
"""Recording management commands."""
"""Manage recording in Slobs CLI."""
@record.command()
@click.pass_context
async def start(ctx: click.Context):
"""Start recording."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ss = StreamingService(conn)
async def _run():
model = await ss.get_model()
active = model.recording_status != "offline"
active = model.recording_status != 'offline'
if active:
conn.close()
raise SlobsCliError("Recording is already active.")
raise SlobsCliError('Recording is already active.')
await ss.toggle_recording()
click.echo("Recording started.")
console.out.print('Recording started.')
conn.close()
@@ -37,28 +39,27 @@ async def start(ctx: click.Context):
tg.start_soon(conn.background_processing)
tg.start_soon(_run)
except* SlobsCliError as excgroup:
for e in excgroup.exceptions:
raise e
raisable = next(iter(excgroup.exceptions))
raise raisable
@record.command()
@click.pass_context
async def stop(ctx: click.Context):
"""Stop recording."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ss = StreamingService(conn)
async def _run():
model = await ss.get_model()
active = model.recording_status != "offline"
active = model.recording_status != 'offline'
if not active:
conn.close()
raise SlobsCliError("Recording is already inactive.")
raise SlobsCliError('Recording is already inactive.')
await ss.toggle_recording()
click.echo("Recording stopped.")
console.out.print('Recording stopped.')
conn.close()
@@ -67,26 +68,25 @@ async def stop(ctx: click.Context):
tg.start_soon(conn.background_processing)
tg.start_soon(_run)
except* SlobsCliError as excgroup:
for e in excgroup.exceptions:
raise e
raisable = next(iter(excgroup.exceptions))
raise raisable
@record.command()
@click.pass_context
async def status(ctx: click.Context):
"""Get recording status."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ss = StreamingService(conn)
async def _run():
model = await ss.get_model()
active = model.recording_status != "offline"
active = model.recording_status != 'offline'
if active:
click.echo("Recording is currently active.")
console.out.print('Recording is currently active.')
else:
click.echo("Recording is currently inactive.")
console.out.print('Recording is currently inactive.')
conn.close()
@@ -99,20 +99,18 @@ async def status(ctx: click.Context):
@click.pass_context
async def toggle(ctx: click.Context):
"""Toggle recording status."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ss = StreamingService(conn)
async def _run():
model = await ss.get_model()
active = model.recording_status != "offline"
active = model.recording_status != 'offline'
await ss.toggle_recording()
if active:
await ss.toggle_recording()
click.echo("Recording stopped.")
console.out.print('Recording stopped.')
else:
await ss.toggle_recording()
click.echo("Recording started.")
console.out.print('Recording started.')
conn.close()

View File

@@ -1,34 +1,36 @@
"""module for managing the replay buffer in Slobs CLI."""
import asyncclick as click
from anyio import create_task_group
from pyslobs import StreamingService
from . import console
from .cli import cli
from .errors import SlobsCliError
@cli.group()
def replaybuffer():
"""Replay buffer management commands."""
"""Manage the replay buffer in Slobs CLI."""
@replaybuffer.command()
@click.pass_context
async def start(ctx: click.Context):
"""Start the replay buffer."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ss = StreamingService(conn)
async def _run():
model = await ss.get_model()
active = model.replay_buffer_status != "offline"
active = model.replay_buffer_status != 'offline'
if active:
conn.close()
raise SlobsCliError("Replay buffer is already active.")
raise SlobsCliError('Replay buffer is already active.')
await ss.start_replay_buffer()
click.echo("Replay buffer started.")
console.out.print('Replay buffer started.')
conn.close()
try:
@@ -36,28 +38,27 @@ async def start(ctx: click.Context):
tg.start_soon(conn.background_processing)
tg.start_soon(_run)
except* SlobsCliError as excgroup:
for e in excgroup.exceptions:
raise e
raisable = next(iter(excgroup.exceptions))
raise raisable
@replaybuffer.command()
@click.pass_context
async def stop(ctx: click.Context):
"""Stop the replay buffer."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ss = StreamingService(conn)
async def _run():
model = await ss.get_model()
active = model.replay_buffer_status != "offline"
active = model.replay_buffer_status != 'offline'
if not active:
conn.close()
raise SlobsCliError("Replay buffer is already inactive.")
raise SlobsCliError('Replay buffer is already inactive.')
await ss.stop_replay_buffer()
click.echo("Replay buffer stopped.")
console.out.print('Replay buffer stopped.')
conn.close()
try:
@@ -65,25 +66,24 @@ async def stop(ctx: click.Context):
tg.start_soon(conn.background_processing)
tg.start_soon(_run)
except* SlobsCliError as excgroup:
for e in excgroup.exceptions:
raise e
raisable = next(iter(excgroup.exceptions))
raise raisable
@replaybuffer.command()
@click.pass_context
async def status(ctx: click.Context):
"""Get the current status of the replay buffer."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ss = StreamingService(conn)
async def _run():
model = await ss.get_model()
active = model.replay_buffer_status != "offline"
active = model.replay_buffer_status != 'offline'
if active:
click.echo("Replay buffer is currently active.")
console.out.print('Replay buffer is currently active.')
else:
click.echo("Replay buffer is currently inactive.")
console.out.print('Replay buffer is currently inactive.')
conn.close()
async with create_task_group() as tg:
@@ -95,13 +95,12 @@ async def status(ctx: click.Context):
@click.pass_context
async def save(ctx: click.Context):
"""Save the current replay buffer."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ss = StreamingService(conn)
async def _run():
await ss.save_replay()
click.echo("Replay buffer saved.")
console.out.print('Replay buffer saved.')
conn.close()
async with create_task_group() as tg:

View File

@@ -1,91 +1,124 @@
"""module for managing scenes in Slobs CLI."""
import asyncclick as click
from anyio import create_task_group
from pyslobs import ScenesService, TransitionsService
from pyslobs import ProtocolError, ScenesService, TransitionsService
from rich.table import Table
from rich.text import Text
from . import console, util
from .cli import cli
from .errors import SlobsCliError
from .errors import SlobsCliError, SlobsCliProtocolError
@cli.group()
def scene():
"""Scene management commands."""
"""Manage scenes in Slobs CLI."""
@scene.command()
@click.option("--id", is_flag=True, help="Include scene IDs in the output.")
@click.option('--id', is_flag=True, help='Include scene IDs in the output.')
@click.pass_context
async def list(ctx: click.Context, id: bool = False):
"""List all available scenes."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ss = ScenesService(conn)
async def _run():
scenes = await ss.get_scenes()
if not scenes:
click.echo("No scenes found.")
console.out.print('No scenes found.')
conn.close()
return
active_scene = await ss.active_scene()
click.echo("Available scenes:")
style = ctx.obj['style']
table = Table(
show_header=True,
header_style=style.header,
border_style=style.border,
)
if id:
columns = [
('Scene Name', 'left'),
('Active', 'center'),
('ID', 'left'),
]
else:
columns = [
('Scene Name', 'left'),
('Active', 'center'),
]
for col_name, col_justify in columns:
table.add_column(Text(col_name, justify='center'), justify=col_justify)
for scene in scenes:
if scene.id == active_scene.id:
click.echo(
f"- {click.style(scene.name, fg='green')} "
f"{f'(ID: {scene.id})' if id else ''} [Active]"
)
else:
click.echo(
f"- {click.style(scene.name, fg='blue')} "
f"{f'(ID: {scene.id})' if id else ''}"
)
to_append = [Text(scene.name, style=style.cell)]
to_append.append(
util.check_mark(ctx, scene.id == active_scene.id, empty_if_false=True)
)
if id:
to_append.append(Text(scene.id, style=style.cell))
table.add_row(*to_append)
console.out.print(table)
conn.close()
async with create_task_group() as tg:
tg.start_soon(conn.background_processing)
tg.start_soon(_run)
try:
async with create_task_group() as tg:
tg.start_soon(conn.background_processing)
tg.start_soon(_run)
except* ProtocolError as excgroup:
p_error = next(iter(excgroup.exceptions))
raisable = SlobsCliProtocolError(str(p_error))
raise raisable
@scene.command()
@click.option("--id", is_flag=True, help="Include scene IDs in the output.")
@click.option('--id', is_flag=True, help='Include scene IDs in the output.')
@click.pass_context
async def current(ctx: click.Context, id: bool = False):
"""Show the currently active scene."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ss = ScenesService(conn)
async def _run():
active_scene = await ss.active_scene()
click.echo(
f"Current active scene: {click.style(active_scene.name, fg='green')} "
f"{f'(ID: {active_scene.id})' if id else ''}"
console.out.print(
f'Current active scene: {console.highlight(ctx, active_scene.name)} '
f'{f"(ID: {console.highlight(ctx, active_scene.id)})" if id else ""}'
)
conn.close()
async with create_task_group() as tg:
tg.start_soon(conn.background_processing)
tg.start_soon(_run)
try:
async with create_task_group() as tg:
tg.start_soon(conn.background_processing)
tg.start_soon(_run)
except* ProtocolError as excgroup:
p_error = next(iter(excgroup.exceptions))
raisable = SlobsCliProtocolError(str(p_error))
raise raisable
@scene.command()
@click.option("--id", is_flag=True, help="Include scene IDs in the output.")
@click.argument("scene_name", type=str)
@click.option('--id', is_flag=True, help='Include scene IDs in the output.')
@click.argument('scene_name')
@click.option(
"--preview",
'--preview',
is_flag=True,
help="Switch the preview scene only.",
help='Switch the preview scene only.',
)
@click.pass_context
async def switch(
ctx: click.Context, scene_name: str, preview: bool = False, id: bool = False
):
"""Switch to a scene by its name."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ss = ScenesService(conn)
ts = TransitionsService(conn)
@@ -98,42 +131,49 @@ async def switch(
if model.studio_mode:
await ss.make_scene_active(scene.id)
if preview:
click.echo(
f"Switched to preview scene: {click.style(scene.name, fg='blue')} "
f"{f'(ID: {scene.id}).' if id else ''}"
console.out.print(
f'Switched to preview scene: {console.highlight(ctx, scene.name)} '
f'{f"(ID: {console.highlight(ctx, scene.id)})" if id else ""}'
)
else:
click.echo(
f"Switched to scene: {click.style(scene.name, fg='blue')} "
f"{f'(ID: {scene.id}).' if id else ''}"
console.out.print(
f'Switched to scene: {console.highlight(ctx, scene.name)} '
f'{f"(ID: {console.highlight(ctx, scene.id)})" if id else ""}'
)
await ts.execute_studio_mode_transition()
click.echo(
"Executed studio mode transition to make the scene active."
console.err.print(
console.warning(
ctx,
'Warning: You are in studio mode. The scene switch is not active yet.\n'
'use `slobs-cli studiomode force-transition` to activate the scene switch.',
)
)
else:
if preview:
conn.close()
raise SlobsCliError(
"Cannot switch the preview scene in non-studio mode."
'Cannot switch the preview scene in non-studio mode.'
)
await ss.make_scene_active(scene.id)
click.echo(
f"Switched to scene: {click.style(scene.name, fg='blue')} "
f"{f'(ID: {scene.id}).' if id else ''}"
console.out.print(
f'Switched to scene: {console.highlight(ctx, scene.name)} '
f'{f"(ID: {console.highlight(ctx, scene.id)})" if id else ""}'
)
conn.close()
break
else: # If no scene by the given name was found
conn.close()
raise SlobsCliError(f"Scene '{scene_name}' not found.")
raise SlobsCliError(f'Scene "{scene_name}" not found.')
try:
async with create_task_group() as tg:
tg.start_soon(conn.background_processing)
tg.start_soon(_run)
except* SlobsCliError as excgroup:
for e in excgroup.exceptions:
raise e
raisable = next(iter(excgroup.exceptions))
raise raisable
except* ProtocolError as excgroup:
p_error = next(iter(excgroup.exceptions))
raisable = SlobsCliProtocolError(str(p_error))
raise raisable

View File

@@ -0,0 +1,192 @@
"""module for scene collection management in SLOBS CLI."""
import asyncclick as click
from anyio import create_task_group
from pyslobs import ISceneCollectionCreateOptions, SceneCollectionsService
from rich.table import Table
from rich.text import Text
from . import console, util
from .cli import cli
from .errors import SlobsCliError
@cli.group()
def scenecollection():
"""Manage scene collections in Slobs CLI."""
@scenecollection.command()
@click.option('--id', is_flag=True, help='Include scene collection IDs in the output.')
@click.pass_context
async def list(ctx: click.Context, id: bool):
"""List all scene collections."""
conn = ctx.obj['connection']
scs = SceneCollectionsService(conn)
async def _run():
collections = await scs.collections()
if not collections:
console.out.print('No scene collections found.')
conn.close()
return
active_collection = await scs.active_collection()
style = ctx.obj['style']
table = Table(
show_header=True,
header_style=style.header,
border_style=style.border,
)
if id:
columns = [
('Scene Collection Name', 'left'),
('Active', 'center'),
('ID', 'left'),
]
else:
columns = [
('Scene Collection Name', 'left'),
('Active', 'center'),
]
for col_name, col_justify in columns:
table.add_column(Text(col_name, justify='center'), justify=col_justify)
for collection in collections:
to_append = [Text(collection.name, style=style.cell)]
to_append.append(
util.check_mark(
ctx, collection.id == active_collection.id, empty_if_false=True
)
)
if id:
to_append.append(Text(collection.id, style=style.cell))
table.add_row(*to_append)
console.out.print(table)
conn.close()
async with create_task_group() as tg:
tg.start_soon(conn.background_processing)
tg.start_soon(_run)
@scenecollection.command()
@click.argument('scenecollection_name')
@click.pass_context
async def load(ctx: click.Context, scenecollection_name: str):
"""Load a scene collection by name."""
conn = ctx.obj['connection']
scs = SceneCollectionsService(conn)
async def _run():
collections = await scs.collections()
for collection in collections:
if collection.name == scenecollection_name:
break
else: # If no collection by the given name was found
conn.close()
raise SlobsCliError(f'Scene collection "{scenecollection_name}" not found.')
await scs.load(collection.id)
console.out.print(
f'Scene collection {console.highlight(scenecollection_name)} loaded successfully.'
)
conn.close()
try:
async with create_task_group() as tg:
tg.start_soon(conn.background_processing)
tg.start_soon(_run)
except* SlobsCliError as excgroup:
raisable = next(iter(excgroup.exceptions))
raise raisable
@scenecollection.command()
@click.argument('scenecollection_name')
@click.pass_context
async def create(ctx: click.Context, scenecollection_name: str):
"""Create a new scene collection."""
conn = ctx.obj['connection']
scs = SceneCollectionsService(conn)
async def _run():
await scs.create(ISceneCollectionCreateOptions(scenecollection_name))
console.out.print(
f'Scene collection {console.highlight(scenecollection_name)} created successfully.'
)
conn.close()
async with create_task_group() as tg:
tg.start_soon(conn.background_processing)
tg.start_soon(_run)
@scenecollection.command()
@click.argument('scenecollection_name')
@click.pass_context
async def delete(ctx: click.Context, scenecollection_name: str):
"""Delete a scene collection by name."""
conn = ctx.obj['connection']
scs = SceneCollectionsService(conn)
async def _run():
collections = await scs.collections()
for collection in collections:
if collection.name == scenecollection_name:
break
else: # If no collection by the given name was found
conn.close()
raise SlobsCliError(f'Scene collection "{scenecollection_name}" not found.')
await scs.delete(collection.id)
console.out.print(
f'Scene collection {console.highlight(scenecollection_name)} deleted successfully.'
)
conn.close()
try:
async with create_task_group() as tg:
tg.start_soon(conn.background_processing)
tg.start_soon(_run)
except* SlobsCliError as excgroup:
raisable = next(iter(excgroup.exceptions))
raise raisable
@scenecollection.command()
@click.argument('scenecollection_name')
@click.argument('new_name')
@click.pass_context
async def rename(ctx: click.Context, scenecollection_name: str, new_name: str):
"""Rename a scene collection."""
conn = ctx.obj['connection']
scs = SceneCollectionsService(conn)
async def _run():
collections = await scs.collections()
for collection in collections:
if collection.name == scenecollection_name:
break
else: # If no collection by the given name was found
conn.close()
raise SlobsCliError(f'Scene collection "{scenecollection_name}" not found.')
await scs.rename(new_name, collection.id)
console.out.print(
f'Scene collection {console.highlight(scenecollection_name)} renamed to {console.highlight(new_name)}.'
)
conn.close()
try:
async with create_task_group() as tg:
tg.start_soon(conn.background_processing)
tg.start_soon(_run)
except* SlobsCliError as excgroup:
raisable = next(iter(excgroup.exceptions))
raise raisable

View File

@@ -1,34 +1,36 @@
"""module for managing the replay buffer in Slobs CLI."""
import asyncclick as click
from anyio import create_task_group
from pyslobs import StreamingService
from . import console
from .cli import cli
from .errors import SlobsCliError
@cli.group()
def stream():
"""Stream management commands."""
"""Manage streaming in Slobs CLI."""
@stream.command()
@click.pass_context
async def start(ctx: click.Context):
"""Start the stream."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ss = StreamingService(conn)
async def _run():
model = await ss.get_model()
active = model.streaming_status != "offline"
active = model.streaming_status != 'offline'
if active:
conn.close()
raise SlobsCliError("Stream is already active.")
raise SlobsCliError('Stream is already active.')
await ss.toggle_streaming()
click.echo("Stream started.")
console.out.print('Stream started.')
conn.close()
try:
@@ -36,28 +38,27 @@ async def start(ctx: click.Context):
tg.start_soon(conn.background_processing)
tg.start_soon(_run)
except* SlobsCliError as excgroup:
for e in excgroup.exceptions:
raise e
raisable = next(iter(excgroup.exceptions))
raise raisable
@stream.command()
@click.pass_context
async def stop(ctx: click.Context):
"""Stop the stream."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ss = StreamingService(conn)
async def _run():
model = await ss.get_model()
active = model.streaming_status != "offline"
active = model.streaming_status != 'offline'
if not active:
conn.close()
raise SlobsCliError("Stream is already inactive.")
raise SlobsCliError('Stream is already inactive.')
await ss.toggle_streaming()
click.echo("Stream stopped.")
console.out.print('Stream stopped.')
conn.close()
try:
@@ -65,26 +66,25 @@ async def stop(ctx: click.Context):
tg.start_soon(conn.background_processing)
tg.start_soon(_run)
except* SlobsCliError as excgroup:
for e in excgroup.exceptions:
raise e
raisable = next(iter(excgroup.exceptions))
raise raisable
@stream.command()
@click.pass_context
async def status(ctx: click.Context):
"""Get the current stream status."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ss = StreamingService(conn)
async def _run():
model = await ss.get_model()
active = model.streaming_status != "offline"
active = model.streaming_status != 'offline'
if active:
click.echo("Stream is currently active.")
console.out.print('Stream is currently active.')
else:
click.echo("Stream is currently inactive.")
console.out.print('Stream is currently inactive.')
conn.close()
async with create_task_group() as tg:
@@ -96,19 +96,18 @@ async def status(ctx: click.Context):
@click.pass_context
async def toggle(ctx: click.Context):
"""Toggle the stream status."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ss = StreamingService(conn)
async def _run():
model = await ss.get_model()
active = model.streaming_status != "offline"
active = model.streaming_status != 'offline'
await ss.toggle_streaming()
if active:
click.echo("Stream stopped.")
console.out.print('Stream stopped.')
else:
click.echo("Stream started.")
console.out.print('Stream started.')
conn.close()

View File

@@ -1,32 +1,34 @@
"""module for managing studio mode in Slobs CLI."""
import asyncclick as click
from anyio import create_task_group
from pyslobs import TransitionsService
from . import console
from .cli import cli
from .errors import SlobsCliError
@cli.group()
def studiomode():
"""Studio mode management commands."""
"""Manage studio mode in Slobs CLI."""
@studiomode.command()
@click.pass_context
async def enable(ctx: click.Context):
"""Enable studio mode."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ts = TransitionsService(conn)
async def _run():
model = await ts.get_model()
if model.studio_mode:
conn.close()
raise SlobsCliError("Studio mode is already enabled.")
raise SlobsCliError('Studio mode is already enabled.')
await ts.enable_studio_mode()
click.echo("Studio mode enabled successfully.")
console.out.print('Studio mode enabled successfully.')
conn.close()
try:
@@ -34,26 +36,25 @@ async def enable(ctx: click.Context):
tg.start_soon(conn.background_processing)
tg.start_soon(_run)
except* SlobsCliError as excgroup:
for e in excgroup.exceptions:
raise e
raisable = next(iter(excgroup.exceptions))
raise raisable
@studiomode.command()
@click.pass_context
async def disable(ctx: click.Context):
"""Disable studio mode."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ts = TransitionsService(conn)
async def _run():
model = await ts.get_model()
if not model.studio_mode:
conn.close()
raise SlobsCliError("Studio mode is already disabled.")
raise SlobsCliError('Studio mode is already disabled.')
await ts.disable_studio_mode()
click.echo("Studio mode disabled successfully.")
console.out.print('Studio mode disabled successfully.')
conn.close()
try:
@@ -61,24 +62,23 @@ async def disable(ctx: click.Context):
tg.start_soon(conn.background_processing)
tg.start_soon(_run)
except* SlobsCliError as excgroup:
for e in excgroup.exceptions:
raise e
raisable = next(iter(excgroup.exceptions))
raise raisable
@studiomode.command()
@click.pass_context
async def status(ctx: click.Context):
"""Check the status of studio mode."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ts = TransitionsService(conn)
async def _run():
model = await ts.get_model()
if model.studio_mode:
click.echo("Studio mode is currently enabled.")
console.out.print('Studio mode is currently enabled.')
else:
click.echo("Studio mode is currently disabled.")
console.out.print('Studio mode is currently disabled.')
conn.close()
async with create_task_group() as tg:
@@ -90,18 +90,17 @@ async def status(ctx: click.Context):
@click.pass_context
async def toggle(ctx: click.Context):
"""Toggle studio mode."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ts = TransitionsService(conn)
async def _run():
model = await ts.get_model()
if model.studio_mode:
await ts.disable_studio_mode()
click.echo("Studio mode disabled successfully.")
console.out.print('Studio mode disabled successfully.')
else:
await ts.enable_studio_mode()
click.echo("Studio mode enabled successfully.")
console.out.print('Studio mode enabled successfully.')
conn.close()
async with create_task_group() as tg:
@@ -113,18 +112,17 @@ async def toggle(ctx: click.Context):
@click.pass_context
async def force_transition(ctx: click.Context):
"""Force a transition in studio mode."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ts = TransitionsService(conn)
async def _run():
model = await ts.get_model()
if not model.studio_mode:
conn.close()
raise SlobsCliError("Studio mode is not enabled.")
raise SlobsCliError('Studio mode is not enabled.')
await ts.execute_studio_mode_transition()
click.echo("Forced studio mode transition.")
console.out.print('Forced studio mode transition.')
conn.close()
try:
@@ -132,5 +130,5 @@ async def force_transition(ctx: click.Context):
tg.start_soon(conn.background_processing)
tg.start_soon(_run)
except* SlobsCliError as excgroup:
for e in excgroup.exceptions:
raise e
raisable = next(iter(excgroup.exceptions))
raise raisable

186
src/slobs_cli/styles.py Normal file
View File

@@ -0,0 +1,186 @@
"""module containing style management for Slobs CLI."""
import os
from dataclasses import dataclass
_registry = {}
def register_style(cls):
"""Register a style class."""
key = cls.__name__.lower()
if key in _registry:
raise ValueError(f'Style {key} is already registered.')
_registry[key] = cls
return cls
@dataclass
class Style:
"""Base class for styles."""
name: str = 'no_colour'
border: str | None = None
header: str | None = None
cell: str | None = None
highlight: str | None = None
no_border: bool = False
def __post_init__(self):
"""Post-initialization to set default values and normalize the name."""
self.name = self.name.lower()
if self.no_border:
self.border = None
@register_style
@dataclass
class Red(Style):
"""Red style."""
name: str = 'red'
header: str = ''
border: str = 'dark_red'
cell: str = 'red'
highlight: str = 'red3'
@register_style
@dataclass
class Magenta(Style):
"""Magenta style."""
name: str = 'magenta'
header: str = ''
border: str = 'dark_magenta'
cell: str = 'magenta'
highlight: str = 'magenta3'
@register_style
@dataclass
class Purple(Style):
"""Purple style."""
name: str = 'purple'
header: str = ''
border: str = 'purple'
cell: str = 'medium_orchid'
highlight: str = 'medium_orchid'
@register_style
@dataclass
class Blue(Style):
"""Blue style."""
name: str = 'blue'
header: str = ''
border: str = 'dark_blue'
cell: str = 'blue'
highlight: str = 'blue3'
@register_style
@dataclass
class Cyan(Style):
"""Cyan style."""
name: str = 'cyan'
header: str = ''
border: str = 'dark_cyan'
cell: str = 'cyan'
highlight: str = 'cyan3'
@register_style
@dataclass
class Green(Style):
"""Green style."""
name: str = 'green'
header: str = ''
border: str = 'dark_green'
cell: str = 'green'
highlight: str = 'green3'
@register_style
@dataclass
class Yellow(Style):
"""Yellow style."""
name: str = 'yellow'
header: str = ''
border: str = 'yellow3'
cell: str = 'wheat1'
highlight: str = 'yellow3'
@register_style
@dataclass
class Orange(Style):
"""Orange style."""
name: str = 'orange'
header: str = ''
border: str = 'dark_orange'
cell: str = 'orange'
highlight: str = 'orange3'
@register_style
@dataclass
class White(Style):
"""White style."""
name: str = 'white'
header: str = ''
border: str = 'white'
cell: str = 'white'
highlight: str = 'white'
@register_style
@dataclass
class Grey(Style):
"""Grey style."""
name: str = 'grey'
header: str = ''
border: str = 'grey50'
cell: str = 'grey70'
highlight: str = 'grey90'
@register_style
@dataclass
class Navy(Style):
"""Navy style."""
name: str = 'navy'
header: str = ''
border: str = 'deep_sky_blue4'
cell: str = 'light_sky_blue3'
highlight: str = 'light_sky_blue3'
@register_style
@dataclass
class Black(Style):
"""Black style."""
name: str = 'black'
header: str = ''
border: str = 'black'
cell: str = 'grey30'
highlight: str = 'grey30'
def request_style_obj(style_name: str, no_border: bool) -> Style:
"""Request a style object by name."""
key = style_name.lower()
if key not in _registry:
os.environ['NO_COLOR'] = '1' # Disable colour output
return Style(no_border=no_border)
return _registry[key](no_border=no_border)

15
src/slobs_cli/util.py Normal file
View File

@@ -0,0 +1,15 @@
"""module containing utility functions for Slobs CLI."""
import os
import asyncclick as click
def check_mark(ctx: click.Context, value: bool, empty_if_false: bool = False) -> str:
"""Return a check mark or cross mark based on the boolean value."""
if empty_if_false and not value:
return ''
if os.getenv('NO_COLOR', '') != '' or ctx.obj['style'].name == 'no_colour':
return '' if value else ''
return '' if value else ''

View File

@@ -0,0 +1 @@
"""Test suite for the slobs_cli package."""

View File

@@ -1,6 +1,9 @@
"""pytest configuration for async tests using anyio."""
import pytest
@pytest.fixture
def anyio_backend():
return "asyncio"
"""Return the backend to use for async tests."""
return 'asyncio'

View File

@@ -1,3 +1,10 @@
"""Create test scenes in Streamlabs.
Usage:
Run this script as a standalone program to setup the test environment.
Requires 'SLOBS_DOMAIN' and 'SLOBS_TOKEN' environment variables to be set.
"""
import os
import anyio
@@ -6,20 +13,22 @@ from pyslobs import ConnectionConfig, ScenesService, SlobsConnection
async def setup(conn: SlobsConnection):
"""Set up test scenes in Streamlabs OBS."""
ss = ScenesService(conn)
await ss.create_scene("slobs-test-scene-1")
await ss.create_scene("slobs-test-scene-2")
await ss.create_scene("slobs-test-scene-3")
await ss.create_scene('slobs-test-scene-1')
await ss.create_scene('slobs-test-scene-2')
await ss.create_scene('slobs-test-scene-3')
conn.close()
async def main():
"""Establish connection and set up scenes."""
conn = SlobsConnection(
ConnectionConfig(
domain=os.environ["SLOBS_DOMAIN"],
domain=os.environ['SLOBS_DOMAIN'],
port=59650,
token=os.environ["SLOBS_TOKEN"],
token=os.environ['SLOBS_TOKEN'],
)
)
@@ -28,5 +37,5 @@ async def main():
tg.start_soon(setup, conn)
if __name__ == "__main__":
if __name__ == '__main__':
anyio.run(main)

View File

@@ -1,35 +1,55 @@
"""Remove test scenes in Streamlabs, disable streaming, recording, and replay buffer.
Usage:
Run this script as a standalone program to tear down the test environment.
Requires 'SLOBS_DOMAIN' and 'SLOBS_TOKEN' environment variables to be set.
"""
import os
import anyio
from anyio import create_task_group
from pyslobs import ConnectionConfig, ScenesService, SlobsConnection, StreamingService
from pyslobs import (
ConnectionConfig,
ScenesService,
SlobsConnection,
StreamingService,
TransitionsService,
)
async def cleanup(conn: SlobsConnection):
"""Clean up test scenes and ensure streaming, recording, and replay buffer are stopped."""
ss = ScenesService(conn)
scenes = await ss.get_scenes()
for scene in scenes:
if scene.name.startswith("slobs-test-scene-"):
if scene.name.startswith('slobs-test-scene-'):
await ss.remove_scene(scene.id)
ss = StreamingService(conn)
model = await ss.get_model()
if model.streaming_status != "offline":
if model.streaming_status != 'offline':
await ss.toggle_streaming()
if model.replay_buffer_status != "offline":
if model.replay_buffer_status != 'offline':
await ss.stop_replay_buffer()
if model.recording_status != "offline":
if model.recording_status != 'offline':
await ss.toggle_recording()
ts = TransitionsService(conn)
model = await ts.get_model()
if model.studio_mode:
await ts.disable_studio_mode()
conn.close()
async def main():
"""Establish connection and clean up test scenes."""
conn = SlobsConnection(
ConnectionConfig(
domain=os.environ["SLOBS_DOMAIN"],
domain=os.environ['SLOBS_DOMAIN'],
port=59650,
token=os.environ["SLOBS_TOKEN"],
token=os.environ['SLOBS_TOKEN'],
)
)
@@ -38,5 +58,5 @@ async def main():
tg.start_soon(cleanup, conn)
if __name__ == "__main__":
if __name__ == '__main__':
anyio.run(main)

43
tests/test_audio.py Normal file
View File

@@ -0,0 +1,43 @@
"""Test cases for audio commands in slobs_cli."""
import pytest
from asyncclick.testing import CliRunner
from slobs_cli import cli
@pytest.mark.anyio
async def test_audio_list():
"""Test the list audio sources command."""
runner = CliRunner()
result = await runner.invoke(cli, ['audio', 'list'])
assert result.exit_code == 0
assert 'Desktop Audio' in result.output
assert 'Mic/Aux' in result.output
@pytest.mark.anyio
async def test_audio_mute():
"""Test the mute audio source command."""
runner = CliRunner()
result = await runner.invoke(cli, ['audio', 'mute', 'Mic/Aux'])
assert result.exit_code == 0
assert 'Mic/Aux muted successfully' in result.output
@pytest.mark.anyio
async def test_audio_unmute():
"""Test the unmute audio source command."""
runner = CliRunner()
result = await runner.invoke(cli, ['audio', 'unmute', 'Mic/Aux'])
assert result.exit_code == 0
assert 'Mic/Aux unmuted successfully' in result.output
@pytest.mark.anyio
async def test_audio_invalid_source():
"""Test handling of invalid audio source."""
runner = CliRunner()
result = await runner.invoke(cli, ['audio', 'mute', 'InvalidSource'])
assert result.exit_code != 0
assert 'Audio source "InvalidSource" not found' in result.output

View File

@@ -1,3 +1,5 @@
"""Test cases for the recording commands of the slobs_cli CLI application."""
import anyio
import pytest
from asyncclick.testing import CliRunner
@@ -7,33 +9,35 @@ from slobs_cli import cli
@pytest.mark.anyio
async def test_record_start():
"""Test the start recording command."""
runner = CliRunner()
result = await runner.invoke(cli, ["record", "status"])
result = await runner.invoke(cli, ['record', 'status'])
assert result.exit_code == 0
active = "Recording is currently active." in result.output
active = 'Recording is currently active.' in result.output
result = await runner.invoke(cli, ["record", "start"])
result = await runner.invoke(cli, ['record', 'start'])
if not active:
assert result.exit_code == 0
assert "Recording started" in result.output
assert 'Recording started' in result.output
await anyio.sleep(0.2) # Allow some time for the recording to start
else:
assert result.exit_code != 0
assert "Recording is already active." in result.output
assert 'Recording is already active.' in result.output
@pytest.mark.anyio
async def test_record_stop():
"""Test the stop recording command."""
runner = CliRunner()
result = await runner.invoke(cli, ["record", "status"])
result = await runner.invoke(cli, ['record', 'status'])
assert result.exit_code == 0
active = "Recording is currently active." in result.output
active = 'Recording is currently active.' in result.output
result = await runner.invoke(cli, ["record", "stop"])
result = await runner.invoke(cli, ['record', 'stop'])
if active:
assert result.exit_code == 0
assert "Recording stopped" in result.output
assert 'Recording stopped' in result.output
await anyio.sleep(0.2) # Allow some time for the recording to stop
else:
assert result.exit_code != 0
assert "Recording is already inactive." in result.output
assert 'Recording is already inactive.' in result.output

View File

@@ -1,3 +1,5 @@
"""Test cases for the replay buffer commands in slobs_cli."""
import anyio
import pytest
from asyncclick.testing import CliRunner
@@ -7,33 +9,35 @@ from slobs_cli import cli
@pytest.mark.anyio
async def test_replaybuffer_start():
"""Test the start replay buffer command."""
runner = CliRunner()
result = await runner.invoke(cli, ["replaybuffer", "status"])
result = await runner.invoke(cli, ['replaybuffer', 'status'])
assert result.exit_code == 0
active = "Replay buffer is currently active." in result.output
active = 'Replay buffer is currently active.' in result.output
result = await runner.invoke(cli, ["replaybuffer", "start"])
result = await runner.invoke(cli, ['replaybuffer', 'start'])
if not active:
assert result.exit_code == 0
assert "Replay buffer started" in result.output
assert 'Replay buffer started' in result.output
await anyio.sleep(0.2) # Allow some time for the replay buffer to start
else:
assert result.exit_code != 0
assert "Replay buffer is already active." in result.output
assert 'Replay buffer is already active.' in result.output
@pytest.mark.anyio
async def test_replaybuffer_stop():
"""Test the stop replay buffer command."""
runner = CliRunner()
result = await runner.invoke(cli, ["replaybuffer", "status"])
result = await runner.invoke(cli, ['replaybuffer', 'status'])
assert result.exit_code == 0
active = "Replay buffer is currently active." in result.output
active = 'Replay buffer is currently active.' in result.output
result = await runner.invoke(cli, ["replaybuffer", "stop"])
result = await runner.invoke(cli, ['replaybuffer', 'stop'])
if active:
assert result.exit_code == 0
assert "Replay buffer stopped" in result.output
assert 'Replay buffer stopped' in result.output
await anyio.sleep(0.2) # Allow some time for the replay buffer to stop
else:
assert result.exit_code != 0
assert "Replay buffer is already inactive." in result.output
assert 'Replay buffer is already inactive.' in result.output

View File

@@ -1,3 +1,6 @@
"""Test cases for scene commands in slobs_cli."""
import anyio
import pytest
from asyncclick.testing import CliRunner
@@ -6,20 +9,35 @@ from slobs_cli import cli
@pytest.mark.anyio
async def test_scene_list():
"""Test the list scenes command."""
runner = CliRunner()
result = await runner.invoke(cli, ["scene", "list"])
result = await runner.invoke(cli, ['scene', 'list'])
assert result.exit_code == 0
assert "slobs-test-scene-1" in result.output
assert "slobs-test-scene-2" in result.output
assert "slobs-test-scene-3" in result.output
assert 'slobs-test-scene-1' in result.output
assert 'slobs-test-scene-2' in result.output
assert 'slobs-test-scene-3' in result.output
await anyio.sleep(0.2) # Avoid rate limiting issues
@pytest.mark.anyio
async def test_scene_current():
"""Test the current scene command."""
runner = CliRunner()
result = await runner.invoke(cli, ["scene", "switch", "slobs-test-scene-2"])
result = await runner.invoke(cli, ['scene', 'switch', 'slobs-test-scene-2'])
assert result.exit_code == 0
await anyio.sleep(0.2) # Avoid rate limiting issues
result = await runner.invoke(cli, ["scene", "current"])
result = await runner.invoke(cli, ['scene', 'current'])
assert result.exit_code == 0
assert "Current active scene: slobs-test-scene-2" in result.output
assert 'Current active scene: slobs-test-scene-2' in result.output
await anyio.sleep(0.2) # Avoid rate limiting issues
@pytest.mark.anyio
async def test_scene_invalid_switch():
"""Test switching to an invalid scene."""
runner = CliRunner()
result = await runner.invoke(cli, ['scene', 'switch', 'invalid-scene'])
assert result.exit_code != 0
assert 'Scene "invalid-scene" not found' in result.output
await anyio.sleep(0.2) # Avoid rate limiting issues

View File

@@ -1,3 +1,5 @@
"""Tests for the stream commands in slobs_cli."""
import anyio
import pytest
from asyncclick.testing import CliRunner
@@ -7,33 +9,35 @@ from slobs_cli import cli
@pytest.mark.anyio
async def test_stream_start():
"""Test the start stream command."""
runner = CliRunner()
result = await runner.invoke(cli, ["stream", "status"])
result = await runner.invoke(cli, ['stream', 'status'])
assert result.exit_code == 0
active = "Stream is currently active." in result.output
active = 'Stream is currently active.' in result.output
result = await runner.invoke(cli, ["stream", "start"])
result = await runner.invoke(cli, ['stream', 'start'])
if not active:
assert result.exit_code == 0
assert "Stream started" in result.output
assert 'Stream started' in result.output
await anyio.sleep(0.2) # Allow some time for the stream to start
else:
assert result.exit_code != 0
assert "Stream is already active." in result.output
assert 'Stream is already active.' in result.output
@pytest.mark.anyio
async def test_stream_stop():
"""Test the stop stream command."""
runner = CliRunner()
result = await runner.invoke(cli, ["stream", "status"])
result = await runner.invoke(cli, ['stream', 'status'])
assert result.exit_code == 0
active = "Stream is currently active." in result.output
active = 'Stream is currently active.' in result.output
result = await runner.invoke(cli, ["stream", "stop"])
result = await runner.invoke(cli, ['stream', 'stop'])
if active:
assert result.exit_code == 0
assert "Stream stopped" in result.output
assert 'Stream stopped' in result.output
await anyio.sleep(0.2) # Allow some time for the stream to stop
else:
assert result.exit_code != 0
assert "Stream is already inactive." in result.output
assert 'Stream is already inactive.' in result.output

60
tests/test_studiomode.py Normal file
View File

@@ -0,0 +1,60 @@
"""Test cases for the studio mode commands of the slobs_cli CLI application."""
import pytest
from asyncclick.testing import CliRunner
from slobs_cli import cli
@pytest.mark.anyio
async def test_studiomode_enable():
"""Test the enable studio mode command."""
runner = CliRunner()
result = await runner.invoke(cli, ['studiomode', 'status'])
assert result.exit_code == 0
active = 'Studio mode is currently enabled.' in result.output
result = await runner.invoke(cli, ['studiomode', 'enable'])
if active:
assert result.exit_code != 0
assert 'Studio mode is already enabled.' in result.output
else:
assert result.exit_code == 0
assert 'Studio mode enabled successfully.' in result.output
@pytest.mark.anyio
async def test_studiomode_disable():
"""Test the disable studio mode command."""
runner = CliRunner()
result = await runner.invoke(cli, ['studiomode', 'status'])
assert result.exit_code == 0
active = 'Studio mode is currently enabled.' in result.output
result = await runner.invoke(cli, ['studiomode', 'disable'])
if not active:
assert result.exit_code != 0
assert 'Studio mode is already disabled.' in result.output
else:
assert result.exit_code == 0
assert 'Studio mode disabled successfully.' in result.output
@pytest.mark.anyio
async def test_studiomode_toggle():
"""Test the toggle studio mode command."""
runner = CliRunner()
result = await runner.invoke(cli, ['studiomode', 'status'])
assert result.exit_code == 0
active = 'Studio mode is currently enabled.' in result.output
result = await runner.invoke(cli, ['studiomode', 'toggle'])
if active:
assert result.exit_code == 0
assert 'Studio mode disabled successfully.' in result.output
else:
assert result.exit_code == 0
assert 'Studio mode enabled successfully.' in result.output