52 Commits

Author SHA1 Message Date
4cd420dc44 fix workflow dir name 2025-06-27 12:13:05 +01:00
06c9fa236d add release workflow 2025-06-27 12:09:05 +01:00
6490b5aa54 move style section and add imgs 2025-06-27 12:05:53 +01:00
f7345155f1 disable --help wrapping, realign the text
add --help shortname -h

patch bump
2025-06-27 12:05:25 +01:00
1c2d1abb2a add --style validation
add Disabled class to style registry

patch bump
2025-06-22 12:36:29 +01:00
fe3a975ba3 leave it up to rich... if no style is set we still get colour on stderr, but this can be overriden with NO_COLOR manually
patch bump
2025-06-22 11:33:52 +01:00
d8622ab037 set default values for no_colour style to 'none'.
This fixes rich markup error

ensure errors are written without colour if NO_COLOR is set
2025-06-22 10:49:05 +01:00
10cb9777fa add 0.11.0 to CHANGELOG 2025-06-22 02:57:31 +01:00
c48f7a49ac add Style section to README 2025-06-22 02:54:42 +01:00
b14d9b7610 ensure studio mode is disabled on test cleanup 2025-06-22 02:54:08 +01:00
864751ecc9 remove terminaltables3 dep, add rich 2025-06-22 02:53:46 +01:00
c02ffac403 replace terminaltables with rich tables.
allow rich to handle all console output.

util.check_mark is now used to pass back colourless check/cross marks if NO_COLOR is set or --style/SLOBS_STYLE was not set.
2025-06-22 02:52:27 +01:00
6bcdd8391c add --style and --no-border flags to root command 2025-06-22 02:39:28 +01:00
b0d311dad9 define styles 2025-06-22 02:38:56 +01:00
46159a0ca4 add missing for-else comment 2025-06-15 08:40:36 +01:00
23d3118e6a keep names consistent 2025-06-13 14:30:50 +01:00
c369f4e3d5 add 0.10.0 to CHANGELOG 2025-06-13 14:19:22 +01:00
129c3f57f2 minor bump 2025-06-13 14:14:16 +01:00
42519ba294 slow down the scene tests in an attempt to avoid rate limiting issues 2025-06-13 14:11:16 +01:00
09a44b2dea add SlobsCliProtocolError for wrapping ProtocolError
handle ProtocolError(s) and reraise as SlobsCliProtocolError. This has the following benefits:
A user friendly error message
A non-zero exit code
2025-06-13 14:10:54 +01:00
f4421b3351 call next(iter()) on excgroup.exceptions to convery intent a little better
(we dont intend to iterate through them, we just want to raise the first one)
2025-06-13 14:04:05 +01:00
f282f86e72 make test a composite command so we can avoid failing fast 2025-06-13 05:46:30 +01:00
81d0072148 revert default to localhost.
the slow resolution was due to Docker desktop...

patch bump
2025-06-13 04:34:15 +01:00
f3c94d1dee add comments to for-else blocks 2025-06-13 00:52:27 +01:00
3c09f9fd5b upd CHANGELOG 2025-06-12 23:47:35 +01:00
51d49c9c93 add status to audio section 2025-06-12 23:47:23 +01:00
bc43c8483a add audio unit tests
add audio status command

patch bump
2025-06-12 23:47:07 +01:00
57e31a7e49 arguments appear to be required by default. 2025-06-12 23:25:07 +01:00
db6a9b5e84 add studiomode unit tests 2025-06-12 23:10:01 +01:00
03f1dac8ea rename leftmost column heading for audio list
patch bump
2025-06-12 22:17:48 +01:00
535b22bf8e add 0.9.0 to CHANGELOG 2025-06-12 22:13:58 +01:00
6e95e4d670 add scenecollection command group
minor bump
2025-06-12 22:13:46 +01:00
519db1b46e add Scene Collection to README 2025-06-12 22:04:15 +01:00
582587bed5 add ruff config
run files through formatter

add dosctrings to satisfy the linter
2025-06-12 20:34:14 +01:00
fecd13d345 patch bump 2025-06-12 18:50:43 +01:00
a1a22d0d00 scene list and audio list now print tables
patch bump
2025-06-12 18:49:54 +01:00
2a18b94b11 add 0.8.3 to CHANGELOG
pumb pyslobs dep

patch bump
2025-06-12 13:47:18 +01:00
409aa2333f upd scene/audio commands 2025-06-12 13:46:32 +01:00
80da58bd6f add --id option, this allows a user to optionally show the source ids 2025-06-12 13:45:38 +01:00
0c5bbc114f fix regression
patch bump
2025-06-12 05:42:30 +01:00
564f4116d1 remove exception handling, we no longer raise exception on empty sources list.
split long line

patch bump
2025-06-12 05:34:36 +01:00
48201e4bbb add 0.8.0 to CHANGELOG 2025-06-12 05:29:48 +01:00
00273ff461 upd tests to check exit_code and err output instead of exceptions 2025-06-12 05:29:22 +01:00
d33c209d7c add custom error class SlobsCliError
add exception group handling for all commands that may raise exceptions

add comments to for-else blocks
2025-06-12 05:28:54 +01:00
a8bed0f4d9 swap localhost for 127.0.0.1 in examples 2025-06-11 19:02:29 +01:00
51923dc8a8 ensure tox tests run setup/teardown scripts
bump min python version to 3.11 (because of ExceptionGroup in tests)

patch bump
2025-06-11 16:37:02 +01:00
37364e7243 rename model variables
patch bump
2025-06-11 16:34:36 +01:00
377a9df824 add pre_test script
ensure teardown removes the test scenes
2025-06-11 13:34:31 +01:00
e7a561c7b4 add --version/-v flag 2025-06-11 02:09:32 +01:00
cb0a87df68 fix readme commands link 2025-06-11 01:42:05 +01:00
ed08d7eff2 Create LICENSE 2025-06-11 01:32:24 +01:00
4ab2706475 add special thanks 2025-06-11 00:33:51 +01:00
35 changed files with 1622 additions and 426 deletions

26
.github/workflows/release.yml vendored Normal file
View File

@@ -0,0 +1,26 @@
name: Release
on:
release:
types: [published]
push:
tags:
- 'v*.*.*'
jobs:
pypi-publish:
name: upload release to PyPI
runs-on: ubuntu-latest
environment: pypi
permissions:
# This permission is needed for private repositories.
contents: read
# IMPORTANT: this permission is mandatory for trusted publishing
id-token: write
steps:
- uses: actions/checkout@v4
- uses: pdm-project/setup-pdm@v4
- name: Publish package distributions to PyPI
run: pdm publish

View File

@@ -5,6 +5,57 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
# [0.11.0] - 2025-06-22
### Added
- Various colouring styles, see [Style](https://github.com/onyx-and-iris/slobs-cli/tree/main?tab=readme-ov-file#style)
- colouring is applied to list tables as well as highlighted information in stdout/stderr output.
- table border styling may be optionally disabled with the --no-border flag.
# [0.10.0] - 2025-06-13
### Changed
- scene commands are prone to raise ProtocolErrors if called too quickly in succession. To make these errors a little more user friendly the following changes have been made:
- They print error messages to stderr
- They return exit code 2
# [0.9.0] - 2025-06-12
### Added
- scenecollection command group, see [Scene Collection](https://github.com/onyx-and-iris/slobs-cli/tree/main?tab=readme-ov-file#scene-collection)
- add audio status commmand.
# [0.8.4] - 2025-06-12
### Added
- custom error class SlobsCliError.
- scene switch command: if the --preview flag is not passed and we're in studio mode then write a message to the user to indicate the transition is being forced.
### Changed
- scene list now shows which scene is the current active scene.
- audio list now shows mute states.
- scene list and audio list commands now print as tables
- --id option added to scene and audio commands to show the source ID in the output
- by default this is no longer displayed
- erroneous paths now print error messages and return 1 instead of raising exceptions.
- unit tests updated to reflect the changes.
# [0.7.6] - 2025-06-11
### Added
- --version/-v command.
### Changed
- --token/-t is now required.
# [0.7.3] - 2025-06-10
### Added

21
LICENSE Normal file
View File

@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2025 Onyx and Iris
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View File

@@ -14,12 +14,13 @@ For an outline of past/future changes refer to: [CHANGELOG](CHANGELOG.md)
- [Installation](#installation)
- [Configuration](#configuration)
- [Commands](#root-typer)
- [Style](#style)
- [Commands](#commands)
- [License](#license)
## Requirements
- Python 3.10 or greater
- Python 3.11 or greater
- [Streamlabs Desktop][sl-desktop]
- A websocket token: Settings > Remote Control > API Token
@@ -46,6 +47,7 @@ The CLI should now be discoverable as `slobs-cli`
- --domain/-d: Streamlabs client domain
- --port/-p Streamlabs client port
- --token/-t: API Token
- --version/-v: Print the slobs-cli version
Pass `--domain`, `--port` and `--token` as flags on the root command, for example:
@@ -67,17 +69,49 @@ Flags can be used to override environment variables.
[sl-desktop]: https://streamlabs.com/streamlabs-live-streaming-software?srsltid=AfmBOopnswGBgEyvVSi2DIc_vsGovKn2HQZyLw1Cg6LEo51OJhONXnAX
## Style
Styling is opt-in, by default you will get a colourless output:
![colourless](./img/colourless.png)
You may enable styling with the --style/-s flag:
```console
slobs-cli --style="yellow" audio list
```
Available styles: _red, magenta, purple, blue, cyan, green, yellow, orange, white, grey, navy, black_
![coloured](./img/coloured-border.png)
Optionally you may disable the border colouring with the --no-border flag:
![coloured-no-border](./img/coloured-no-border.png)
```console
slobs-cli --style="yellow' --no-border audio list
```
## Commands
#### Scene
- list: List all available scenes.
- flags:
*optional*
- --id: Include scene IDs in the output.
```console
slobs-cli scene list
```
- current: Show the currently active scene.
- flags:
*optional*
- --id: Include scene IDs in the output.
```console
slobs-cli scene current
@@ -87,7 +121,8 @@ slobs-cli scene current
- flags:
*optional*
- --preview: Switch the preview scene only.
- --id: Include scene IDs in the output.
- --preview: Switch the preview scene.
- args: <scene_name>
```console
@@ -149,6 +184,10 @@ slobs-cli record toggle
#### Audio
- list: List all audio sources.
- flags:
*optional*
- --id: Include audio source IDs in the output.
```console
slobs-cli audio list
@@ -175,6 +214,12 @@ slobs-cli audio unmute "Mic/Aux"
slobs-cli audio toggle "Mic/Aux"
```
- status: Get the mute status of an audio source by name.
```console
slobs-cli audio status "Mic/Aux"
```
#### Replay Buffer
- start: Start the replay buffer.
@@ -232,3 +277,51 @@ slobs-cli studiomode status
```console
slobs-cli studiomode force-transition
```
#### Scene Collection
- list: List all scene collections.
- flags:
*optional*
- --id: Include scenecollection IDs in the output.
```console
slobs-cli scenecollection list
```
- create: Create a new scene collection.
- args: <scenecollection_name>
```console
slobs-cli scenecollection create "NewCollection"
```
- delete: Delete a scene collection by name.
- args: <scenecollection_name>
```console
slobs-cli scenecollection delete "ExistingCollection"
```
- load: Load a scene collection by name.
- args: <scenecollection_name>
```console
slobs-cli scenecollection load "ExistingCollection"
```
- rename: Rename a scene collection.
- args: <scenecollection_name> <new_name>
```console
slobs-cli scenecollection rename "ExistingCollection" "NewName"
```
## Special Thanks
- [Julian-0](https://github.com/Julian-O) For writing the [PySLOBS wrapper](https://github.com/Julian-O/PySLOBS) on which this CLI depends.
## License
`slobs-cli` is distributed under the terms of the [GPL-3.0-or-later](https://spdx.org/licenses/GPL-3.0-or-later.html) license.

BIN
img/coloured-border.png Executable file

Binary file not shown.

After

Width:  |  Height:  |  Size: 4.4 KiB

BIN
img/coloured-no-border.png Executable file

Binary file not shown.

After

Width:  |  Height:  |  Size: 4.4 KiB

BIN
img/colourless.png Executable file

Binary file not shown.

After

Width:  |  Height:  |  Size: 3.7 KiB

178
pdm.lock generated
View File

@@ -5,10 +5,10 @@
groups = ["default", "dev"]
strategy = ["inherit_metadata"]
lock_version = "4.5.0"
content_hash = "sha256:b032bb4d22d3d3b10233c543cd182ac8e7ec052aa9dc03a3034a967066e85db2"
content_hash = "sha256:9be0832aae27a9da3f885900d367836aa05a12e9c0459d751c319d1bd329c33c"
[[metadata.targets]]
requires_python = ">=3.10"
requires_python = ">=3.11"
[[package]]
name = "anyio"
@@ -45,13 +45,13 @@ files = [
[[package]]
name = "cachetools"
version = "6.0.0"
version = "6.1.0"
requires_python = ">=3.9"
summary = "Extensible memoizing collections and decorators"
groups = ["dev"]
files = [
{file = "cachetools-6.0.0-py3-none-any.whl", hash = "sha256:82e73ba88f7b30228b5507dce1a1f878498fc669d972aef2dde4f3a3c24f103e"},
{file = "cachetools-6.0.0.tar.gz", hash = "sha256:f225782b84438f828328fc2ad74346522f27e5b1440f4e9fd18b20ebfd1aa2cf"},
{file = "cachetools-6.1.0-py3-none-any.whl", hash = "sha256:1c7bb3cf9193deaf3508b7c5f2a79986c13ea38965c5adcff1f84519cf39163e"},
{file = "cachetools-6.1.0.tar.gz", hash = "sha256:b4c4f404392848db3ce7aac34950d17be4d864da4b8b66911008e430bc544587"},
]
[[package]]
@@ -86,21 +86,6 @@ files = [
{file = "distlib-0.3.9.tar.gz", hash = "sha256:a60f20dea646b8a33f3e7772f74dc0b2d0772d2837ee1342a00645c81edf9403"},
]
[[package]]
name = "exceptiongroup"
version = "1.3.0"
requires_python = ">=3.7"
summary = "Backport of PEP 654 (exception groups)"
groups = ["default", "dev"]
marker = "python_version < \"3.11\""
dependencies = [
"typing-extensions>=4.6.0; python_version < \"3.13\"",
]
files = [
{file = "exceptiongroup-1.3.0-py3-none-any.whl", hash = "sha256:4d111e6e0c13d0644cad6ddaa7ed0261a0b36971f6d23e7ec9b4b9097da78a10"},
{file = "exceptiongroup-1.3.0.tar.gz", hash = "sha256:b241f5885f560bc56a59ee63ca4c6a8bfa46ae4ad651af316d4e81817bb9fd88"},
]
[[package]]
name = "filelock"
version = "3.18.0"
@@ -134,6 +119,31 @@ files = [
{file = "iniconfig-2.1.0.tar.gz", hash = "sha256:3abbd2e30b36733fee78f9c7f7308f2d0050e88f0087fd25c2645f63c773e1c7"},
]
[[package]]
name = "markdown-it-py"
version = "3.0.0"
requires_python = ">=3.8"
summary = "Python port of markdown-it. Markdown parsing, done right!"
groups = ["default"]
dependencies = [
"mdurl~=0.1",
]
files = [
{file = "markdown-it-py-3.0.0.tar.gz", hash = "sha256:e3f60a94fa066dc52ec76661e37c851cb232d92f9886b15cb560aaada2df8feb"},
{file = "markdown_it_py-3.0.0-py3-none-any.whl", hash = "sha256:355216845c60bd96232cd8d8c40e8f9765cc86f46880e43a8fd22dc1a1a8cab1"},
]
[[package]]
name = "mdurl"
version = "0.1.2"
requires_python = ">=3.7"
summary = "Markdown URL utilities"
groups = ["default"]
files = [
{file = "mdurl-0.1.2-py3-none-any.whl", hash = "sha256:84008a41e51615a49fc9966191ff91509e3c40b939176e643fd50a5c2196b8f8"},
{file = "mdurl-0.1.2.tar.gz", hash = "sha256:bb413d29f5eea38f31dd4754dd7377d4465116fb207585f97bf925588687c1ba"},
]
[[package]]
name = "packaging"
version = "25.0"
@@ -180,13 +190,13 @@ files = [
[[package]]
name = "pygments"
version = "2.19.1"
version = "2.19.2"
requires_python = ">=3.8"
summary = "Pygments is a syntax highlighting package written in Python."
groups = ["dev"]
groups = ["default", "dev"]
files = [
{file = "pygments-2.19.1-py3-none-any.whl", hash = "sha256:9ea1544ad55cecf4b8242fab6dd35a93bbce657034b0611ee383099054ab6d8c"},
{file = "pygments-2.19.1.tar.gz", hash = "sha256:61c16d2a8576dc0649d9f39e089b5f02bcd27fba10d8fb4dcc28173f7a45151f"},
{file = "pygments-2.19.2-py3-none-any.whl", hash = "sha256:86540386c03d588bb81d44bc3928634ff26449851e99741617ecb9037ee5ec0b"},
{file = "pygments-2.19.2.tar.gz", hash = "sha256:636cb2477cec7f8952536970bc533bc43743542f70392ae026374600add5b887"},
]
[[package]]
@@ -206,7 +216,7 @@ files = [
[[package]]
name = "pyslobs"
version = "2.0.4"
version = "2.0.5"
requires_python = ">=3.9"
summary = "Python wrapper to StreamLabs Desktop API"
groups = ["default"]
@@ -214,13 +224,13 @@ dependencies = [
"websocket-client",
]
files = [
{file = "PySLOBS-2.0.4-py3-none-any.whl", hash = "sha256:c9cc8cd3f5f22800f23589ae0607d78d8c4257c7c6a9770d651c63a76f512784"},
{file = "pyslobs-2.0.4.tar.gz", hash = "sha256:a1e855b62cf4dd1208131fd58d925022dde8f057e33a0cb4933fd55efee876c7"},
{file = "pyslobs-2.0.5-py3-none-any.whl", hash = "sha256:b33d774dda484ffe48e63592a3cdf66c79f77d04947a6e8c8b6da58db13ab156"},
{file = "pyslobs-2.0.5.tar.gz", hash = "sha256:53cb083cbe71e37f3604ad0e5ca9712b3f26a29824ce0d437fd66e3d7937fee6"},
]
[[package]]
name = "pytest"
version = "8.4.0"
version = "8.4.1"
requires_python = ">=3.9"
summary = "pytest: simple powerful testing with Python"
groups = ["dev"]
@@ -234,8 +244,66 @@ dependencies = [
"tomli>=1; python_version < \"3.11\"",
]
files = [
{file = "pytest-8.4.0-py3-none-any.whl", hash = "sha256:f40f825768ad76c0977cbacdf1fd37c6f7a468e460ea6a0636078f8972d4517e"},
{file = "pytest-8.4.0.tar.gz", hash = "sha256:14d920b48472ea0dbf68e45b96cd1ffda4705f33307dcc86c676c1b5104838a6"},
{file = "pytest-8.4.1-py3-none-any.whl", hash = "sha256:539c70ba6fcead8e78eebbf1115e8b589e7565830d7d006a8723f19ac8a0afb7"},
{file = "pytest-8.4.1.tar.gz", hash = "sha256:7c67fd69174877359ed9371ec3af8a3d2b04741818c51e5e99cc1742251fa93c"},
]
[[package]]
name = "pytest-randomly"
version = "3.16.0"
requires_python = ">=3.9"
summary = "Pytest plugin to randomly order tests and control random.seed."
groups = ["dev"]
dependencies = [
"importlib-metadata>=3.6; python_version < \"3.10\"",
"pytest",
]
files = [
{file = "pytest_randomly-3.16.0-py3-none-any.whl", hash = "sha256:8633d332635a1a0983d3bba19342196807f6afb17c3eef78e02c2f85dade45d6"},
{file = "pytest_randomly-3.16.0.tar.gz", hash = "sha256:11bf4d23a26484de7860d82f726c0629837cf4064b79157bd18ec9d41d7feb26"},
]
[[package]]
name = "rich"
version = "14.0.0"
requires_python = ">=3.8.0"
summary = "Render rich text, tables, progress bars, syntax highlighting, markdown and more to the terminal"
groups = ["default"]
dependencies = [
"markdown-it-py>=2.2.0",
"pygments<3.0.0,>=2.13.0",
"typing-extensions<5.0,>=4.0.0; python_version < \"3.11\"",
]
files = [
{file = "rich-14.0.0-py3-none-any.whl", hash = "sha256:1c9491e1951aac09caffd42f448ee3d04e58923ffe14993f6e83068dc395d7e0"},
{file = "rich-14.0.0.tar.gz", hash = "sha256:82f1bc23a6a21ebca4ae0c45af9bdbc492ed20231dcb63f297d6d1021a9d5725"},
]
[[package]]
name = "ruff"
version = "0.12.1"
requires_python = ">=3.7"
summary = "An extremely fast Python linter and code formatter, written in Rust."
groups = ["dev"]
files = [
{file = "ruff-0.12.1-py3-none-linux_armv6l.whl", hash = "sha256:6013a46d865111e2edb71ad692fbb8262e6c172587a57c0669332a449384a36b"},
{file = "ruff-0.12.1-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:b3f75a19e03a4b0757d1412edb7f27cffb0c700365e9d6b60bc1b68d35bc89e0"},
{file = "ruff-0.12.1-py3-none-macosx_11_0_arm64.whl", hash = "sha256:9a256522893cb7e92bb1e1153283927f842dea2e48619c803243dccc8437b8be"},
{file = "ruff-0.12.1-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:069052605fe74c765a5b4272eb89880e0ff7a31e6c0dbf8767203c1fbd31c7ff"},
{file = "ruff-0.12.1-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:a684f125a4fec2d5a6501a466be3841113ba6847827be4573fddf8308b83477d"},
{file = "ruff-0.12.1-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:bdecdef753bf1e95797593007569d8e1697a54fca843d78f6862f7dc279e23bd"},
{file = "ruff-0.12.1-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:70d52a058c0e7b88b602f575d23596e89bd7d8196437a4148381a3f73fcd5010"},
{file = "ruff-0.12.1-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:84d0a69d1e8d716dfeab22d8d5e7c786b73f2106429a933cee51d7b09f861d4e"},
{file = "ruff-0.12.1-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:6cc32e863adcf9e71690248607ccdf25252eeeab5193768e6873b901fd441fed"},
{file = "ruff-0.12.1-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:7fd49a4619f90d5afc65cf42e07b6ae98bb454fd5029d03b306bd9e2273d44cc"},
{file = "ruff-0.12.1-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:ed5af6aaaea20710e77698e2055b9ff9b3494891e1b24d26c07055459bb717e9"},
{file = "ruff-0.12.1-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:801d626de15e6bf988fbe7ce59b303a914ff9c616d5866f8c79eb5012720ae13"},
{file = "ruff-0.12.1-py3-none-musllinux_1_2_i686.whl", hash = "sha256:2be9d32a147f98a1972c1e4df9a6956d612ca5f5578536814372113d09a27a6c"},
{file = "ruff-0.12.1-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:49b7ce354eed2a322fbaea80168c902de9504e6e174fd501e9447cad0232f9e6"},
{file = "ruff-0.12.1-py3-none-win32.whl", hash = "sha256:d973fa626d4c8267848755bd0414211a456e99e125dcab147f24daa9e991a245"},
{file = "ruff-0.12.1-py3-none-win_amd64.whl", hash = "sha256:9e1123b1c033f77bd2590e4c1fe7e8ea72ef990a85d2484351d408224d603013"},
{file = "ruff-0.12.1-py3-none-win_arm64.whl", hash = "sha256:78ad09a022c64c13cc6077707f036bab0fac8cd7088772dcd1e5be21c5002efc"},
{file = "ruff-0.12.1.tar.gz", hash = "sha256:806bbc17f1104fd57451a98a58df35388ee3ab422e029e8f5cf30aa4af2c138c"},
]
[[package]]
@@ -249,51 +317,9 @@ files = [
{file = "sniffio-1.3.1.tar.gz", hash = "sha256:f4324edc670a0f49750a81b895f35c3adb843cca46f0530f79fc1babb23789dc"},
]
[[package]]
name = "tomli"
version = "2.2.1"
requires_python = ">=3.8"
summary = "A lil' TOML parser"
groups = ["dev"]
marker = "python_version < \"3.11\""
files = [
{file = "tomli-2.2.1-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:678e4fa69e4575eb77d103de3df8a895e1591b48e740211bd1067378c69e8249"},
{file = "tomli-2.2.1-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:023aa114dd824ade0100497eb2318602af309e5a55595f76b626d6d9f3b7b0a6"},
{file = "tomli-2.2.1-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:ece47d672db52ac607a3d9599a9d48dcb2f2f735c6c2d1f34130085bb12b112a"},
{file = "tomli-2.2.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6972ca9c9cc9f0acaa56a8ca1ff51e7af152a9f87fb64623e31d5c83700080ee"},
{file = "tomli-2.2.1-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:c954d2250168d28797dd4e3ac5cf812a406cd5a92674ee4c8f123c889786aa8e"},
{file = "tomli-2.2.1-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:8dd28b3e155b80f4d54beb40a441d366adcfe740969820caf156c019fb5c7ec4"},
{file = "tomli-2.2.1-cp311-cp311-musllinux_1_2_i686.whl", hash = "sha256:e59e304978767a54663af13c07b3d1af22ddee3bb2fb0618ca1593e4f593a106"},
{file = "tomli-2.2.1-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:33580bccab0338d00994d7f16f4c4ec25b776af3ffaac1ed74e0b3fc95e885a8"},
{file = "tomli-2.2.1-cp311-cp311-win32.whl", hash = "sha256:465af0e0875402f1d226519c9904f37254b3045fc5084697cefb9bdde1ff99ff"},
{file = "tomli-2.2.1-cp311-cp311-win_amd64.whl", hash = "sha256:2d0f2fdd22b02c6d81637a3c95f8cd77f995846af7414c5c4b8d0545afa1bc4b"},
{file = "tomli-2.2.1-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:4a8f6e44de52d5e6c657c9fe83b562f5f4256d8ebbfe4ff922c495620a7f6cea"},
{file = "tomli-2.2.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:8d57ca8095a641b8237d5b079147646153d22552f1c637fd3ba7f4b0b29167a8"},
{file = "tomli-2.2.1-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:4e340144ad7ae1533cb897d406382b4b6fede8890a03738ff1683af800d54192"},
{file = "tomli-2.2.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:db2b95f9de79181805df90bedc5a5ab4c165e6ec3fe99f970d0e302f384ad222"},
{file = "tomli-2.2.1-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:40741994320b232529c802f8bc86da4e1aa9f413db394617b9a256ae0f9a7f77"},
{file = "tomli-2.2.1-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:400e720fe168c0f8521520190686ef8ef033fb19fc493da09779e592861b78c6"},
{file = "tomli-2.2.1-cp312-cp312-musllinux_1_2_i686.whl", hash = "sha256:02abe224de6ae62c19f090f68da4e27b10af2b93213d36cf44e6e1c5abd19fdd"},
{file = "tomli-2.2.1-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:b82ebccc8c8a36f2094e969560a1b836758481f3dc360ce9a3277c65f374285e"},
{file = "tomli-2.2.1-cp312-cp312-win32.whl", hash = "sha256:889f80ef92701b9dbb224e49ec87c645ce5df3fa2cc548664eb8a25e03127a98"},
{file = "tomli-2.2.1-cp312-cp312-win_amd64.whl", hash = "sha256:7fc04e92e1d624a4a63c76474610238576942d6b8950a2d7f908a340494e67e4"},
{file = "tomli-2.2.1-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:f4039b9cbc3048b2416cc57ab3bda989a6fcf9b36cf8937f01a6e731b64f80d7"},
{file = "tomli-2.2.1-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:286f0ca2ffeeb5b9bd4fcc8d6c330534323ec51b2f52da063b11c502da16f30c"},
{file = "tomli-2.2.1-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a92ef1a44547e894e2a17d24e7557a5e85a9e1d0048b0b5e7541f76c5032cb13"},
{file = "tomli-2.2.1-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:9316dc65bed1684c9a98ee68759ceaed29d229e985297003e494aa825ebb0281"},
{file = "tomli-2.2.1-cp313-cp313-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:e85e99945e688e32d5a35c1ff38ed0b3f41f43fad8df0bdf79f72b2ba7bc5272"},
{file = "tomli-2.2.1-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:ac065718db92ca818f8d6141b5f66369833d4a80a9d74435a268c52bdfa73140"},
{file = "tomli-2.2.1-cp313-cp313-musllinux_1_2_i686.whl", hash = "sha256:d920f33822747519673ee656a4b6ac33e382eca9d331c87770faa3eef562aeb2"},
{file = "tomli-2.2.1-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:a198f10c4d1b1375d7687bc25294306e551bf1abfa4eace6650070a5c1ae2744"},
{file = "tomli-2.2.1-cp313-cp313-win32.whl", hash = "sha256:d3f5614314d758649ab2ab3a62d4f2004c825922f9e370b29416484086b264ec"},
{file = "tomli-2.2.1-cp313-cp313-win_amd64.whl", hash = "sha256:a38aa0308e754b0e3c67e344754dff64999ff9b513e691d0e786265c93583c69"},
{file = "tomli-2.2.1-py3-none-any.whl", hash = "sha256:cb55c73c5f4408779d0cf3eef9f762b9c9f147a77de7b258bef0a5628adc85cc"},
{file = "tomli-2.2.1.tar.gz", hash = "sha256:cd45e1dc79c835ce60f7404ec8119f2eb06d38b1deba146f07ced3bbc44505ff"},
]
[[package]]
name = "tox"
version = "4.26.0"
version = "4.27.0"
requires_python = ">=3.9"
summary = "tox is a generic virtualenv management and test command line tool"
groups = ["dev"]
@@ -311,8 +337,8 @@ dependencies = [
"virtualenv>=20.31",
]
files = [
{file = "tox-4.26.0-py3-none-any.whl", hash = "sha256:75f17aaf09face9b97bd41645028d9f722301e912be8b4c65a3f938024560224"},
{file = "tox-4.26.0.tar.gz", hash = "sha256:a83b3b67b0159fa58e44e646505079e35a43317a62d2ae94725e0586266faeca"},
{file = "tox-4.27.0-py3-none-any.whl", hash = "sha256:2b8a7fb986b82aa2c830c0615082a490d134e0626dbc9189986da46a313c4f20"},
{file = "tox-4.27.0.tar.gz", hash = "sha256:b97d5ecc0c0d5755bcc5348387fef793e1bfa68eb33746412f4c60881d7f5f57"},
]
[[package]]
@@ -335,7 +361,7 @@ name = "typing-extensions"
version = "4.14.0"
requires_python = ">=3.9"
summary = "Backported and Experimental Type Hints for Python 3.9+"
groups = ["default", "dev"]
groups = ["default"]
marker = "python_version < \"3.13\""
files = [
{file = "typing_extensions-4.14.0-py3-none-any.whl", hash = "sha256:a1514509136dd0b477638fc68d6a91497af5076466ad0fa6c338e44e359944af"},

View File

@@ -1,12 +1,12 @@
[project]
name = "slobs-cli"
version = "0.7.5"
description = "A command line application for Streamlabs Desktop"
authors = [{ name = "onyx-and-iris", email = "code@onyxandiris.online" }]
dependencies = ["pyslobs>=2.0.4", "asyncclick>=8.1.8"]
requires-python = ">=3.10"
dependencies = ["pyslobs>=2.0.5", "asyncclick>=8.1.8", "rich>=14.0.0"]
requires-python = ">=3.11"
readme = "README.md"
license = { text = "MIT" }
dynamic = ["version"]
[project.scripts]
slobs-cli = "slobs_cli.cli:run"
@@ -19,17 +19,28 @@ build-backend = "pdm.backend"
[tool.pdm]
distribution = true
[tool.pdm.scripts]
cli.cmd = "slobs-cli {args}"
cli.env_file = ".env"
[tool.pdm.version]
source = "file"
path = "src/slobs_cli/__about__.py"
test.cmd = "pytest {args}"
test.env_file = ".env"
post_test.cmd = "python tests/teardown.py"
[tool.pdm.scripts]
_.env_file = ".env"
cli.cmd = "slobs-cli {args}"
_setup.cmd = "python tests/setup.py"
_teardown.cmd = "python tests/teardown.py"
test.composite = ["_setup", "pytest {args}", "_teardown"]
test.keep_going = true
fmt.cmd = "ruff format {args}"
post_fmt.cmd = "ruff check {args}"
[dependency-groups]
dev = [
"tox-pdm>=0.7.2",
"pytest>=8.4.0",
"pytest-randomly>=3.16.0",
"virtualenv-pyenv>=0.5.0",
"ruff>=0.11.13",
]

79
ruff.toml Normal file
View File

@@ -0,0 +1,79 @@
# Exclude a variety of commonly ignored directories.
exclude = [
".bzr",
".direnv",
".eggs",
".git",
".git-rewrite",
".hatch",
".hg",
".ipynb_checkpoints",
".mypy_cache",
".nox",
".pants.d",
".pyenv",
".pytest_cache",
".pytype",
".ruff_cache",
".svn",
".tox",
".venv",
".vscode",
"__pypackages__",
"_build",
"buck-out",
"build",
"dist",
"node_modules",
"site-packages",
"venv",
]
# Same as Black.
line-length = 88
indent-width = 4
# Assume Python 3.11
target-version = "py311"
[lint]
# Enable Pyflakes (`F`) and a subset of the pycodestyle (`E`) codes by default.
# Unlike Flake8, Ruff doesn't enable pycodestyle warnings (`W`) or
# McCabe complexity (`C901`) by default.
# Enable pydocstyle (`D`) codes by default.
select = ["E4", "E7", "E9", "F", "D"]
ignore = ["D203", "D213"]
# Allow fix for all enabled rules (when `--fix`) is provided.
fixable = ["ALL"]
unfixable = []
# Allow unused variables when underscore-prefixed.
dummy-variable-rgx = "^(_+|(_+[a-zA-Z0-9_]*[a-zA-Z0-9]+?))$"
[format]
# Unlike Black, use single quotes for strings.
quote-style = "single"
# Like Black, indent with spaces, rather than tabs.
indent-style = "space"
# Like Black, respect magic trailing commas.
skip-magic-trailing-comma = false
# Like Black, automatically detect the appropriate line ending.
line-ending = "auto"
# Enable auto-formatting of code examples in docstrings. Markdown,
# reStructuredText code/literal blocks and doctests are all supported.
#
# This is currently disabled by default, but it is planned for this
# to be opt-out in the future.
docstring-code-format = false
# Set the line length limit used when formatting code snippets in
# docstrings.
#
# This only has an effect when the `docstring-code-format` setting is
# enabled.
docstring-code-line-length = "dynamic"

View File

@@ -0,0 +1,3 @@
"""module for package metadata."""
__version__ = '0.11.4'

View File

@@ -1,9 +1,21 @@
"""Package slobs_cli provides a command-line interface for interacting with SLOBS (Streamlabs OBS)."""
from .audio import audio
from .cli import cli
from .record import record
from .replaybuffer import replaybuffer
from .scene import scene
from .scenecollection import scenecollection
from .stream import stream
from .studiomode import studiomode
__all__ = ["cli", "scene", "stream", "record", "audio", "replaybuffer", "studiomode"]
__all__ = [
'cli',
'scene',
'stream',
'record',
'audio',
'replaybuffer',
'studiomode',
'scenecollection',
]

View File

@@ -1,34 +1,67 @@
"""module for managing audio sources in Slobs CLI."""
import asyncclick as click
from anyio import create_task_group
from pyslobs import AudioService
from rich.table import Table
from rich.text import Text
from . import console, util
from .cli import cli
from .errors import SlobsCliError
@cli.group()
def audio():
"""Audio management commands."""
"""Manage audio sources in Slobs CLI."""
@audio.command()
@click.option('--id', is_flag=True, help='Include audio source IDs in the output.')
@click.pass_context
async def list(ctx: click.Context):
async def list(ctx: click.Context, id: bool = False):
"""List all audio sources."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
as_ = AudioService(conn)
async def _run():
sources = await as_.get_sources()
if not sources:
console.out.print('No audio sources found.')
conn.close()
click.Abort(click.style("No audio sources found.", fg="red"))
return
style = ctx.obj['style']
table = Table(
show_header=True, header_style=style.header, border_style=style.border
)
if id:
columns = [
('Audio Source Name', 'left'),
('Muted', 'center'),
('ID', 'left'),
]
else:
columns = [
('Audio Source Name', 'left'),
('Muted', 'center'),
]
for heading, justify in columns:
table.add_column(Text(heading, justify='center'), justify=justify)
for source in sources:
model = await source.get_model()
click.echo(
f"Source ID: {source.source_id}, Name: {model.name}, Muted: {model.muted}"
)
to_append = [Text(model.name, style=style.cell)]
to_append.append(util.check_mark(ctx, model.muted))
if id:
to_append.append(Text(model.source_id, style=style.cell))
table.add_row(*to_append)
console.out.print(table)
conn.close()
async with create_task_group() as tg:
@@ -37,12 +70,11 @@ async def list(ctx: click.Context):
@audio.command()
@click.argument("source_name")
@click.argument('source_name')
@click.pass_context
async def mute(ctx: click.Context, source_name: str):
"""Mute an audio source by name."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
as_ = AudioService(conn)
async def _run():
@@ -51,28 +83,29 @@ async def mute(ctx: click.Context, source_name: str):
model = await source.get_model()
if model.name.lower() == source_name.lower():
break
else:
else: # If no source by the given name was found
conn.close()
raise click.Abort(
click.style(f"Source '{source_name}' not found.", fg="red")
)
raise SlobsCliError(f'Audio source "{source_name}" not found.')
await source.set_muted(True)
click.echo(f"Muted audio source: {source_name}")
console.out.print(f'{console.highlight(ctx, source_name)} muted successfully.')
conn.close()
async with create_task_group() as tg:
tg.start_soon(conn.background_processing)
tg.start_soon(_run)
try:
async with create_task_group() as tg:
tg.start_soon(conn.background_processing)
tg.start_soon(_run)
except* SlobsCliError as excgroup:
raisable = next(iter(excgroup.exceptions))
raise raisable
@audio.command()
@click.argument("source_name")
@click.argument('source_name')
@click.pass_context
async def unmute(ctx: click.Context, source_name: str):
"""Unmute an audio source by name."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
as_ = AudioService(conn)
async def _run():
@@ -81,28 +114,31 @@ async def unmute(ctx: click.Context, source_name: str):
model = await source.get_model()
if model.name.lower() == source_name.lower():
break
else:
else: # If no source by the given name was found
conn.close()
raise click.Abort(
click.style(f"Source '{source_name}' not found.", fg="red")
)
raise SlobsCliError(f'Audio source "{source_name}" not found.')
await source.set_muted(False)
click.echo(f"Unmuted audio source: {source_name}")
console.out.print(
f'{console.highlight(ctx, source_name)} unmuted successfully.'
)
conn.close()
async with create_task_group() as tg:
tg.start_soon(conn.background_processing)
tg.start_soon(_run)
try:
async with create_task_group() as tg:
tg.start_soon(conn.background_processing)
tg.start_soon(_run)
except* SlobsCliError as excgroup:
raisable = next(iter(excgroup.exceptions))
raise raisable
@audio.command()
@click.argument("source_name")
@click.argument('source_name')
@click.pass_context
async def toggle(ctx: click.Context, source_name: str):
"""Toggle mute state of an audio source by name."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
as_ = AudioService(conn)
async def _run():
@@ -112,18 +148,55 @@ async def toggle(ctx: click.Context, source_name: str):
if model.name.lower() == source_name.lower():
if model.muted:
await source.set_muted(False)
click.echo(f"Unmuted audio source: {source_name}")
console.out.print(
f'{console.highlight(ctx, source_name)} unmuted successfully.'
)
else:
await source.set_muted(True)
click.echo(f"Muted audio source: {source_name}")
console.out.print(
f'{console.highlight(ctx, source_name)} muted successfully.'
)
conn.close()
break
else:
else: # If no source by the given name was found
conn.close()
raise click.Abort(
click.style(f"Source '{source_name}' not found.", fg="red")
)
raise SlobsCliError(f'Audio source "{source_name}" not found.')
async with create_task_group() as tg:
tg.start_soon(conn.background_processing)
tg.start_soon(_run)
try:
async with create_task_group() as tg:
tg.start_soon(conn.background_processing)
tg.start_soon(_run)
except* SlobsCliError as excgroup:
raisable = next(iter(excgroup.exceptions))
raise raisable
@audio.command()
@click.argument('source_name')
@click.pass_context
async def status(ctx: click.Context, source_name: str):
"""Get the mute status of an audio source by name."""
conn = ctx.obj['connection']
as_ = AudioService(conn)
async def _run():
sources = await as_.get_sources()
for source in sources:
model = await source.get_model()
if model.name.lower() == source_name.lower():
console.out.print(
f'{console.highlight(ctx, source_name)} is {"muted" if model.muted else "unmuted"}.'
)
conn.close()
return
else: # If no source by the given name was found
conn.close()
raise SlobsCliError(f'Audio source "{source_name}" not found.')
try:
async with create_task_group() as tg:
tg.start_soon(conn.background_processing)
tg.start_soon(_run)
except* SlobsCliError as excgroup:
raisable = next(iter(excgroup.exceptions))
raise raisable

View File

@@ -1,37 +1,81 @@
"""module defining the entry point for the Streamlabs Desktop CLI application."""
import anyio
import asyncclick as click
from pyslobs import ConnectionConfig, SlobsConnection
from . import styles
from .__about__ import __version__ as version
@click.group()
@click.option(
"-d",
"--domain",
default="127.0.0.1",
show_default=True,
show_envvar=True,
help="The domain of the SLOBS server.",
envvar="SLOBS_DOMAIN",
def validate_style(ctx: click.Context, param: click.Parameter, value: str) -> str:
"""Validate the style option."""
if value not in styles.registry:
raise click.BadParameter(
f"Invalid style '{value}'. Available styles: {', '.join(styles.registry.keys())}"
)
return value
CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help'])
@click.group(
context_settings=CONTEXT_SETTINGS,
)
@click.option(
"-p",
"--port",
'-d',
'--domain',
default='localhost',
envvar='SLOBS_DOMAIN',
show_default=True,
show_envvar=True,
help='\b\nStreamlabs Desktop WebSocket domain or IP address.\t',
)
@click.option(
'-p',
'--port',
default=59650,
envvar='SLOBS_PORT',
show_default=True,
show_envvar=True,
help="The port of the SLOBS server.",
envvar="SLOBS_PORT",
help='\b\nStreamlabs Desktop WebSocket port.\t\t\t',
)
@click.option(
"-t",
"--token",
help="The token for the SLOBS server.",
envvar="SLOBS_TOKEN",
'-t',
'--token',
envvar='SLOBS_TOKEN',
show_envvar=True,
required=True,
help='\b\nStreamlabs Desktop WebSocket authentication token.\t',
)
@click.option(
'-s',
'--style',
default='disabled',
envvar='SLOBS_STYLE',
show_default=True,
show_envvar=True,
help='\b\nThe style to use for output.\t\t\t\t',
callback=validate_style,
)
@click.option(
'-b',
'--no-border',
is_flag=True,
default=False,
envvar='SLOBS_STYLE_NO_BORDER',
show_default=True,
show_envvar=True,
help='\b\nDisable borders in the output.\t\t\t\t',
)
@click.version_option(
version, '-v', '--version', message='%(prog)s version: %(version)s'
)
@click.pass_context
async def cli(ctx: click.Context, domain: str, port: int, token: str | None):
async def cli(
ctx: click.Context, domain: str, port: int, token: str, style: str, no_border: bool
):
"""Command line interface for Streamlabs Desktop."""
ctx.ensure_object(dict)
config = ConnectionConfig(
@@ -39,7 +83,8 @@ async def cli(ctx: click.Context, domain: str, port: int, token: str | None):
port=port,
token=token,
)
ctx.obj["connection"] = SlobsConnection(config)
ctx.obj['connection'] = SlobsConnection(config)
ctx.obj['style'] = styles.request_style_obj(style, no_border)
def run():

17
src/slobs_cli/console.py Normal file
View File

@@ -0,0 +1,17 @@
"""module for console output handling."""
import asyncclick as click
from rich.console import Console
out = Console()
err = Console(stderr=True, style='bold red')
def highlight(ctx: click.Context, text: str) -> str:
"""Highlight text for console output."""
return f'[{ctx.obj["style"].highlight}]{text}[/{ctx.obj["style"].highlight}]'
def warning(ctx: click.Context, text: str) -> str:
"""Format warning text for console output."""
return f'[{ctx.obj["style"].warning}]{text}[/{ctx.obj["style"].warning}]'

46
src/slobs_cli/errors.py Normal file
View File

@@ -0,0 +1,46 @@
"""module for custom exceptions in Slobs CLI."""
import json
import asyncclick as click
from . import console
class SlobsCliError(click.ClickException):
"""Base class for all Slobs CLI errors."""
def __init__(self, message: str):
"""Initialize the SlobsCliError with a message."""
super().__init__(message)
self.exit_code = 1
def show(self):
"""Display the error message in red and write to stderr."""
console.err.print(f'Error: {self.message}')
class SlobsCliProtocolError(SlobsCliError):
"""Converts pyslobs ProtocolError to a SlobsCliProtocolError."""
def __init__(self, message: str):
"""Initialize the SlobsCliProtocolError with a message."""
protocol_message_to_dict = json.loads(
str(message).replace('"', '\\"').replace("'", '"')
)
super().__init__(
protocol_message_to_dict.get('message', 'Unable to parse error message')
)
self.exit_code = 2
self.protocol_code = protocol_message_to_dict.get('code', 'Unknown error code')
def show(self):
"""Display the protocol error message in red."""
match self.protocol_code:
case -32600:
console.err.print(
'Oops! Looks like we hit a rate limit for this command. Please try again later.'
)
case _:
# Fall back to the base error display for unknown protocol codes
super().show()

View File

@@ -1,83 +1,92 @@
"""module for managing recording commands in Slobs CLI."""
import asyncclick as click
from anyio import create_task_group
from pyslobs import StreamingService
from . import console
from .cli import cli
from .errors import SlobsCliError
@cli.group()
def record():
"""Recording management commands."""
"""Manage recording in Slobs CLI."""
@record.command()
@click.pass_context
async def start(ctx: click.Context):
"""Start recording."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ss = StreamingService(conn)
async def _run():
current_state = await ss.get_model()
active = current_state.recording_status != "offline"
model = await ss.get_model()
active = model.recording_status != 'offline'
if active:
conn.close()
raise click.Abort(click.style("Recording is already active.", fg="red"))
raise SlobsCliError('Recording is already active.')
await ss.toggle_recording()
click.echo("Recording started.")
console.out.print('Recording started.')
conn.close()
async with create_task_group() as tg:
tg.start_soon(conn.background_processing)
tg.start_soon(_run)
try:
async with create_task_group() as tg:
tg.start_soon(conn.background_processing)
tg.start_soon(_run)
except* SlobsCliError as excgroup:
raisable = next(iter(excgroup.exceptions))
raise raisable
@record.command()
@click.pass_context
async def stop(ctx: click.Context):
"""Stop recording."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ss = StreamingService(conn)
async def _run():
current_state = await ss.get_model()
active = current_state.recording_status != "offline"
model = await ss.get_model()
active = model.recording_status != 'offline'
if not active:
conn.close()
raise click.Abort(click.style("Recording is already inactive.", fg="red"))
raise SlobsCliError('Recording is already inactive.')
await ss.toggle_recording()
click.echo("Recording stopped.")
console.out.print('Recording stopped.')
conn.close()
async with create_task_group() as tg:
tg.start_soon(conn.background_processing)
tg.start_soon(_run)
try:
async with create_task_group() as tg:
tg.start_soon(conn.background_processing)
tg.start_soon(_run)
except* SlobsCliError as excgroup:
raisable = next(iter(excgroup.exceptions))
raise raisable
@record.command()
@click.pass_context
async def status(ctx: click.Context):
"""Get recording status."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ss = StreamingService(conn)
async def _run():
current_state = await ss.get_model()
active = current_state.recording_status != "offline"
model = await ss.get_model()
active = model.recording_status != 'offline'
if active:
click.echo("Recording is currently active.")
console.out.print('Recording is currently active.')
else:
click.echo("Recording is currently inactive.")
console.out.print('Recording is currently inactive.')
conn.close()
@@ -90,20 +99,18 @@ async def status(ctx: click.Context):
@click.pass_context
async def toggle(ctx: click.Context):
"""Toggle recording status."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ss = StreamingService(conn)
async def _run():
current_state = await ss.get_model()
active = current_state.recording_status != "offline"
model = await ss.get_model()
active = model.recording_status != 'offline'
await ss.toggle_recording()
if active:
await ss.toggle_recording()
click.echo("Recording stopped.")
console.out.print('Recording stopped.')
else:
await ss.toggle_recording()
click.echo("Recording started.")
console.out.print('Recording started.')
conn.close()

View File

@@ -1,82 +1,89 @@
"""module for managing the replay buffer in Slobs CLI."""
import asyncclick as click
from anyio import create_task_group
from pyslobs import StreamingService
from . import console
from .cli import cli
from .errors import SlobsCliError
@cli.group()
def replaybuffer():
"""Replay buffer management commands."""
"""Manage the replay buffer in Slobs CLI."""
@replaybuffer.command()
@click.pass_context
async def start(ctx: click.Context):
"""Start the replay buffer."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ss = StreamingService(conn)
async def _run():
current_state = await ss.get_model()
active = current_state.replay_buffer_status != "offline"
model = await ss.get_model()
active = model.replay_buffer_status != 'offline'
if active:
conn.close()
raise click.Abort(click.style("Replay buffer is already active.", fg="red"))
raise SlobsCliError('Replay buffer is already active.')
await ss.start_replay_buffer()
click.echo("Replay buffer started.")
console.out.print('Replay buffer started.')
conn.close()
async with create_task_group() as tg:
tg.start_soon(conn.background_processing)
tg.start_soon(_run)
try:
async with create_task_group() as tg:
tg.start_soon(conn.background_processing)
tg.start_soon(_run)
except* SlobsCliError as excgroup:
raisable = next(iter(excgroup.exceptions))
raise raisable
@replaybuffer.command()
@click.pass_context
async def stop(ctx: click.Context):
"""Stop the replay buffer."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ss = StreamingService(conn)
async def _run():
current_state = await ss.get_model()
active = current_state.replay_buffer_status != "offline"
model = await ss.get_model()
active = model.replay_buffer_status != 'offline'
if not active:
conn.close()
raise click.Abort(
click.style("Replay buffer is already inactive.", fg="red")
)
raise SlobsCliError('Replay buffer is already inactive.')
await ss.stop_replay_buffer()
click.echo("Replay buffer stopped.")
console.out.print('Replay buffer stopped.')
conn.close()
async with create_task_group() as tg:
tg.start_soon(conn.background_processing)
tg.start_soon(_run)
try:
async with create_task_group() as tg:
tg.start_soon(conn.background_processing)
tg.start_soon(_run)
except* SlobsCliError as excgroup:
raisable = next(iter(excgroup.exceptions))
raise raisable
@replaybuffer.command()
@click.pass_context
async def status(ctx: click.Context):
"""Get the current status of the replay buffer."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ss = StreamingService(conn)
async def _run():
current_state = await ss.get_model()
active = current_state.replay_buffer_status != "offline"
model = await ss.get_model()
active = model.replay_buffer_status != 'offline'
if active:
click.echo("Replay buffer is currently active.")
console.out.print('Replay buffer is currently active.')
else:
click.echo("Replay buffer is currently inactive.")
console.out.print('Replay buffer is currently inactive.')
conn.close()
async with create_task_group() as tg:
@@ -88,13 +95,12 @@ async def status(ctx: click.Context):
@click.pass_context
async def save(ctx: click.Context):
"""Save the current replay buffer."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ss = StreamingService(conn)
async def _run():
await ss.save_replay()
click.echo("Replay buffer saved.")
console.out.print('Replay buffer saved.')
conn.close()
async with create_task_group() as tg:

View File

@@ -1,75 +1,124 @@
"""module for managing scenes in Slobs CLI."""
import asyncclick as click
from anyio import create_task_group
from pyslobs import ScenesService, TransitionsService
from pyslobs import ProtocolError, ScenesService, TransitionsService
from rich.table import Table
from rich.text import Text
from . import console, util
from .cli import cli
from .errors import SlobsCliError, SlobsCliProtocolError
@cli.group()
def scene():
"""Scene management commands."""
"""Manage scenes in Slobs CLI."""
@scene.command()
@click.option('--id', is_flag=True, help='Include scene IDs in the output.')
@click.pass_context
async def list(ctx: click.Context):
async def list(ctx: click.Context, id: bool = False):
"""List all available scenes."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ss = ScenesService(conn)
async def _run():
scenes = await ss.get_scenes()
if not scenes:
click.echo("No scenes found.")
console.out.print('No scenes found.')
conn.close()
return
click.echo("Available scenes:")
active_scene = await ss.active_scene()
style = ctx.obj['style']
table = Table(
show_header=True,
header_style=style.header,
border_style=style.border,
)
if id:
columns = [
('Scene Name', 'left'),
('Active', 'center'),
('ID', 'left'),
]
else:
columns = [
('Scene Name', 'left'),
('Active', 'center'),
]
for heading, justify in columns:
table.add_column(Text(heading, justify='center'), justify=justify)
for scene in scenes:
click.echo(f"- {click.style(scene.name, fg='blue')} (ID: {scene.id})")
to_append = [Text(scene.name, style=style.cell)]
to_append.append(
util.check_mark(ctx, scene.id == active_scene.id, empty_if_false=True)
)
if id:
to_append.append(Text(scene.id, style=style.cell))
table.add_row(*to_append)
console.out.print(table)
conn.close()
async with create_task_group() as tg:
tg.start_soon(conn.background_processing)
tg.start_soon(_run)
try:
async with create_task_group() as tg:
tg.start_soon(conn.background_processing)
tg.start_soon(_run)
except* ProtocolError as excgroup:
p_error = next(iter(excgroup.exceptions))
raisable = SlobsCliProtocolError(str(p_error))
raise raisable
@scene.command()
@click.option('--id', is_flag=True, help='Include scene IDs in the output.')
@click.pass_context
async def current(ctx: click.Context):
async def current(ctx: click.Context, id: bool = False):
"""Show the currently active scene."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ss = ScenesService(conn)
async def _run():
active_scene = await ss.active_scene()
if active_scene:
click.echo(
f"Current active scene: {click.style(active_scene.name, fg='green')} (ID: {active_scene.id})"
)
else:
click.echo("No active scene found.")
console.out.print(
f'Current active scene: {console.highlight(ctx, active_scene.name)} '
f'{f"(ID: {console.highlight(ctx, active_scene.id)})" if id else ""}'
)
conn.close()
async with create_task_group() as tg:
tg.start_soon(conn.background_processing)
tg.start_soon(_run)
try:
async with create_task_group() as tg:
tg.start_soon(conn.background_processing)
tg.start_soon(_run)
except* ProtocolError as excgroup:
p_error = next(iter(excgroup.exceptions))
raisable = SlobsCliProtocolError(str(p_error))
raise raisable
@scene.command()
@click.argument("scene_name", type=str)
@click.option('--id', is_flag=True, help='Include scene IDs in the output.')
@click.argument('scene_name')
@click.option(
"--preview",
'--preview',
is_flag=True,
help="Switch the preview scene only.",
help='Switch the preview scene only.',
)
@click.pass_context
async def switch(ctx: click.Context, scene_name: str, preview: bool = False):
async def switch(
ctx: click.Context, scene_name: str, preview: bool = False, id: bool = False
):
"""Switch to a scene by its name."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ss = ScenesService(conn)
ts = TransitionsService(conn)
@@ -77,41 +126,54 @@ async def switch(ctx: click.Context, scene_name: str, preview: bool = False):
scenes = await ss.get_scenes()
for scene in scenes:
if scene.name == scene_name:
current_state = await ts.get_model()
model = await ts.get_model()
if current_state.studio_mode:
if model.studio_mode:
await ss.make_scene_active(scene.id)
if preview:
click.echo(
f"Switched to scene: {click.style(scene.name, fg='blue')} (ID: {scene.id}) in preview mode."
console.out.print(
f'Switched to preview scene: {console.highlight(ctx, scene.name)} '
f'{f"(ID: {console.highlight(ctx, scene.id)})" if id else ""}'
)
else:
await ts.execute_studio_mode_transition()
click.echo(
f"Switched to scene: {click.style(scene.name, fg='blue')} (ID: {scene.id}) in active mode."
console.out.print(
f'Switched to scene: {console.highlight(ctx, scene.name)} '
f'{f"(ID: {console.highlight(ctx, scene.id)})" if id else ""}'
)
console.err.print(
console.warning(
ctx,
'Warning: You are in studio mode. The scene switch is not active yet.\n'
'use `slobs-cli studiomode force-transition` to activate the scene switch.',
)
)
else:
if preview:
conn.close()
raise click.Abort(
click.style(
"Cannot switch to preview scene in non-studio mode.",
fg="red",
)
raise SlobsCliError(
'Cannot switch the preview scene in non-studio mode.'
)
await ss.make_scene_active(scene.id)
click.echo(
f"Switched to scene: {click.style(scene.name, fg='blue')} (ID: {scene.id}) in active mode."
console.out.print(
f'Switched to scene: {console.highlight(ctx, scene.name)} '
f'{f"(ID: {console.highlight(ctx, scene.id)})" if id else ""}'
)
break
else:
conn.close()
raise click.ClickException(
click.style(f"Scene '{scene_name}' not found.", fg="red")
)
conn.close()
async with create_task_group() as tg:
tg.start_soon(conn.background_processing)
tg.start_soon(_run)
conn.close()
break
else: # If no scene by the given name was found
conn.close()
raise SlobsCliError(f'Scene "{scene_name}" not found.')
try:
async with create_task_group() as tg:
tg.start_soon(conn.background_processing)
tg.start_soon(_run)
except* SlobsCliError as excgroup:
raisable = next(iter(excgroup.exceptions))
raise raisable
except* ProtocolError as excgroup:
p_error = next(iter(excgroup.exceptions))
raisable = SlobsCliProtocolError(str(p_error))
raise raisable

View File

@@ -0,0 +1,192 @@
"""module for scene collection management in SLOBS CLI."""
import asyncclick as click
from anyio import create_task_group
from pyslobs import ISceneCollectionCreateOptions, SceneCollectionsService
from rich.table import Table
from rich.text import Text
from . import console, util
from .cli import cli
from .errors import SlobsCliError
@cli.group()
def scenecollection():
"""Manage scene collections in Slobs CLI."""
@scenecollection.command()
@click.option('--id', is_flag=True, help='Include scene collection IDs in the output.')
@click.pass_context
async def list(ctx: click.Context, id: bool):
"""List all scene collections."""
conn = ctx.obj['connection']
scs = SceneCollectionsService(conn)
async def _run():
collections = await scs.collections()
if not collections:
console.out.print('No scene collections found.')
conn.close()
return
active_collection = await scs.active_collection()
style = ctx.obj['style']
table = Table(
show_header=True,
header_style=style.header,
border_style=style.border,
)
if id:
columns = [
('Scene Collection Name', 'left'),
('Active', 'center'),
('ID', 'left'),
]
else:
columns = [
('Scene Collection Name', 'left'),
('Active', 'center'),
]
for heading, justify in columns:
table.add_column(Text(heading, justify='center'), justify=justify)
for collection in collections:
to_append = [Text(collection.name, style=style.cell)]
to_append.append(
util.check_mark(
ctx, collection.id == active_collection.id, empty_if_false=True
)
)
if id:
to_append.append(Text(collection.id, style=style.cell))
table.add_row(*to_append)
console.out.print(table)
conn.close()
async with create_task_group() as tg:
tg.start_soon(conn.background_processing)
tg.start_soon(_run)
@scenecollection.command()
@click.argument('scenecollection_name')
@click.pass_context
async def load(ctx: click.Context, scenecollection_name: str):
"""Load a scene collection by name."""
conn = ctx.obj['connection']
scs = SceneCollectionsService(conn)
async def _run():
collections = await scs.collections()
for collection in collections:
if collection.name == scenecollection_name:
break
else: # If no collection by the given name was found
conn.close()
raise SlobsCliError(f'Scene collection "{scenecollection_name}" not found.')
await scs.load(collection.id)
console.out.print(
f'Scene collection {console.highlight(scenecollection_name)} loaded successfully.'
)
conn.close()
try:
async with create_task_group() as tg:
tg.start_soon(conn.background_processing)
tg.start_soon(_run)
except* SlobsCliError as excgroup:
raisable = next(iter(excgroup.exceptions))
raise raisable
@scenecollection.command()
@click.argument('scenecollection_name')
@click.pass_context
async def create(ctx: click.Context, scenecollection_name: str):
"""Create a new scene collection."""
conn = ctx.obj['connection']
scs = SceneCollectionsService(conn)
async def _run():
await scs.create(ISceneCollectionCreateOptions(scenecollection_name))
console.out.print(
f'Scene collection {console.highlight(scenecollection_name)} created successfully.'
)
conn.close()
async with create_task_group() as tg:
tg.start_soon(conn.background_processing)
tg.start_soon(_run)
@scenecollection.command()
@click.argument('scenecollection_name')
@click.pass_context
async def delete(ctx: click.Context, scenecollection_name: str):
"""Delete a scene collection by name."""
conn = ctx.obj['connection']
scs = SceneCollectionsService(conn)
async def _run():
collections = await scs.collections()
for collection in collections:
if collection.name == scenecollection_name:
break
else: # If no collection by the given name was found
conn.close()
raise SlobsCliError(f'Scene collection "{scenecollection_name}" not found.')
await scs.delete(collection.id)
console.out.print(
f'Scene collection {console.highlight(scenecollection_name)} deleted successfully.'
)
conn.close()
try:
async with create_task_group() as tg:
tg.start_soon(conn.background_processing)
tg.start_soon(_run)
except* SlobsCliError as excgroup:
raisable = next(iter(excgroup.exceptions))
raise raisable
@scenecollection.command()
@click.argument('scenecollection_name')
@click.argument('new_name')
@click.pass_context
async def rename(ctx: click.Context, scenecollection_name: str, new_name: str):
"""Rename a scene collection."""
conn = ctx.obj['connection']
scs = SceneCollectionsService(conn)
async def _run():
collections = await scs.collections()
for collection in collections:
if collection.name == scenecollection_name:
break
else: # If no collection by the given name was found
conn.close()
raise SlobsCliError(f'Scene collection "{scenecollection_name}" not found.')
await scs.rename(new_name, collection.id)
console.out.print(
f'Scene collection {console.highlight(scenecollection_name)} renamed to {console.highlight(new_name)}.'
)
conn.close()
try:
async with create_task_group() as tg:
tg.start_soon(conn.background_processing)
tg.start_soon(_run)
except* SlobsCliError as excgroup:
raisable = next(iter(excgroup.exceptions))
raise raisable

View File

@@ -1,81 +1,90 @@
"""module for managing the replay buffer in Slobs CLI."""
import asyncclick as click
from anyio import create_task_group
from pyslobs import StreamingService
from . import console
from .cli import cli
from .errors import SlobsCliError
@cli.group()
def stream():
"""Stream management commands."""
"""Manage streaming in Slobs CLI."""
@stream.command()
@click.pass_context
async def start(ctx: click.Context):
"""Start the stream."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ss = StreamingService(conn)
async def _run():
current_state = await ss.get_model()
active = current_state.streaming_status != "offline"
model = await ss.get_model()
active = model.streaming_status != 'offline'
if active:
conn.close()
raise click.Abort(click.style("Stream is already active.", fg="red"))
raise SlobsCliError('Stream is already active.')
await ss.toggle_streaming()
click.echo("Stream started.")
console.out.print('Stream started.')
conn.close()
async with create_task_group() as tg:
tg.start_soon(conn.background_processing)
tg.start_soon(_run)
try:
async with create_task_group() as tg:
tg.start_soon(conn.background_processing)
tg.start_soon(_run)
except* SlobsCliError as excgroup:
raisable = next(iter(excgroup.exceptions))
raise raisable
@stream.command()
@click.pass_context
async def stop(ctx: click.Context):
"""Stop the stream."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ss = StreamingService(conn)
async def _run():
current_state = await ss.get_model()
active = current_state.streaming_status != "offline"
model = await ss.get_model()
active = model.streaming_status != 'offline'
if not active:
conn.close()
raise click.Abort(click.style("Stream is already inactive.", fg="red"))
raise SlobsCliError('Stream is already inactive.')
await ss.toggle_streaming()
click.echo("Stream stopped.")
console.out.print('Stream stopped.')
conn.close()
async with create_task_group() as tg:
tg.start_soon(conn.background_processing)
tg.start_soon(_run)
try:
async with create_task_group() as tg:
tg.start_soon(conn.background_processing)
tg.start_soon(_run)
except* SlobsCliError as excgroup:
raisable = next(iter(excgroup.exceptions))
raise raisable
@stream.command()
@click.pass_context
async def status(ctx: click.Context):
"""Get the current stream status."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ss = StreamingService(conn)
async def _run():
current_state = await ss.get_model()
active = current_state.streaming_status != "offline"
model = await ss.get_model()
active = model.streaming_status != 'offline'
if active:
click.echo("Stream is currently active.")
console.out.print('Stream is currently active.')
else:
click.echo("Stream is currently inactive.")
console.out.print('Stream is currently inactive.')
conn.close()
async with create_task_group() as tg:
@@ -87,19 +96,18 @@ async def status(ctx: click.Context):
@click.pass_context
async def toggle(ctx: click.Context):
"""Toggle the stream status."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ss = StreamingService(conn)
async def _run():
current_state = await ss.get_model()
active = current_state.streaming_status != "offline"
model = await ss.get_model()
active = model.streaming_status != 'offline'
await ss.toggle_streaming()
if active:
click.echo("Stream stopped.")
console.out.print('Stream stopped.')
else:
click.echo("Stream started.")
console.out.print('Stream started.')
conn.close()

View File

@@ -1,75 +1,84 @@
"""module for managing studio mode in Slobs CLI."""
import asyncclick as click
from anyio import create_task_group
from pyslobs import TransitionsService
from . import console
from .cli import cli
from .errors import SlobsCliError
@cli.group()
def studiomode():
"""Studio mode management commands."""
"""Manage studio mode in Slobs CLI."""
@studiomode.command()
@click.pass_context
async def enable(ctx: click.Context):
"""Enable studio mode."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ts = TransitionsService(conn)
async def _run():
current_state = await ts.get_model()
if current_state.studio_mode:
model = await ts.get_model()
if model.studio_mode:
conn.close()
raise click.Abort(click.style("Studio mode is already enabled.", fg="red"))
raise SlobsCliError('Studio mode is already enabled.')
await ts.enable_studio_mode()
click.echo("Studio mode enabled successfully.")
console.out.print('Studio mode enabled successfully.')
conn.close()
async with create_task_group() as tg:
tg.start_soon(conn.background_processing)
tg.start_soon(_run)
try:
async with create_task_group() as tg:
tg.start_soon(conn.background_processing)
tg.start_soon(_run)
except* SlobsCliError as excgroup:
raisable = next(iter(excgroup.exceptions))
raise raisable
@studiomode.command()
@click.pass_context
async def disable(ctx: click.Context):
"""Disable studio mode."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ts = TransitionsService(conn)
async def _run():
current_state = await ts.get_model()
if not current_state.studio_mode:
model = await ts.get_model()
if not model.studio_mode:
conn.close()
raise click.Abort(click.style("Studio mode is already disabled.", fg="red"))
raise SlobsCliError('Studio mode is already disabled.')
await ts.disable_studio_mode()
click.echo("Studio mode disabled successfully.")
console.out.print('Studio mode disabled successfully.')
conn.close()
async with create_task_group() as tg:
tg.start_soon(conn.background_processing)
tg.start_soon(_run)
try:
async with create_task_group() as tg:
tg.start_soon(conn.background_processing)
tg.start_soon(_run)
except* SlobsCliError as excgroup:
raisable = next(iter(excgroup.exceptions))
raise raisable
@studiomode.command()
@click.pass_context
async def status(ctx: click.Context):
"""Check the status of studio mode."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ts = TransitionsService(conn)
async def _run():
current_state = await ts.get_model()
if current_state.studio_mode:
click.echo("Studio mode is currently enabled.")
model = await ts.get_model()
if model.studio_mode:
console.out.print('Studio mode is currently enabled.')
else:
click.echo("Studio mode is currently disabled.")
console.out.print('Studio mode is currently disabled.')
conn.close()
async with create_task_group() as tg:
@@ -81,18 +90,17 @@ async def status(ctx: click.Context):
@click.pass_context
async def toggle(ctx: click.Context):
"""Toggle studio mode."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ts = TransitionsService(conn)
async def _run():
current_state = await ts.get_model()
if current_state.studio_mode:
model = await ts.get_model()
if model.studio_mode:
await ts.disable_studio_mode()
click.echo("Studio mode disabled successfully.")
console.out.print('Studio mode disabled successfully.')
else:
await ts.enable_studio_mode()
click.echo("Studio mode enabled successfully.")
console.out.print('Studio mode enabled successfully.')
conn.close()
async with create_task_group() as tg:
@@ -104,20 +112,23 @@ async def toggle(ctx: click.Context):
@click.pass_context
async def force_transition(ctx: click.Context):
"""Force a transition in studio mode."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ts = TransitionsService(conn)
async def _run():
current_state = await ts.get_model()
if not current_state.studio_mode:
model = await ts.get_model()
if not model.studio_mode:
conn.close()
raise click.Abort(click.style("Studio mode is not enabled.", fg="red"))
raise SlobsCliError('Studio mode is not enabled.')
await ts.execute_studio_mode_transition()
click.echo("Forced studio mode transition.")
console.out.print('Forced studio mode transition.')
conn.close()
async with create_task_group() as tg:
tg.start_soon(conn.background_processing)
tg.start_soon(_run)
try:
async with create_task_group() as tg:
tg.start_soon(conn.background_processing)
tg.start_soon(_run)
except* SlobsCliError as excgroup:
raisable = next(iter(excgroup.exceptions))
raise raisable

211
src/slobs_cli/styles.py Normal file
View File

@@ -0,0 +1,211 @@
"""module containing style management for Slobs CLI."""
import os
from dataclasses import dataclass
registry = {}
def register_style(cls):
"""Register a style class."""
key = cls.__name__.lower()
if key in registry:
raise ValueError(f'Style {key} is already registered.')
registry[key] = cls
return cls
@dataclass
class Style:
"""Base class for styles."""
name: str
border: str
header: str
cell: str
highlight: str
warning: str
no_border: bool = False
def __post_init__(self):
"""Post-initialization to set default values and normalize the name."""
self.name = self.name.lower()
if self.no_border:
self.border = None
@register_style
@dataclass
class Disabled(Style):
"""Disabled style."""
name: str = 'disabled'
header: str = ''
border: str = 'none'
cell: str = 'none'
highlight: str = 'none'
warning: str = 'none'
@register_style
@dataclass
class Red(Style):
"""Red style."""
name: str = 'red'
header: str = ''
border: str = 'dark_red'
cell: str = 'red'
highlight: str = 'red3'
warning: str = 'magenta'
@register_style
@dataclass
class Magenta(Style):
"""Magenta style."""
name: str = 'magenta'
header: str = ''
border: str = 'dark_magenta'
cell: str = 'magenta'
highlight: str = 'magenta3'
warning: str = 'magenta'
@register_style
@dataclass
class Purple(Style):
"""Purple style."""
name: str = 'purple'
header: str = ''
border: str = 'purple'
cell: str = 'medium_orchid'
highlight: str = 'medium_orchid'
warning: str = 'magenta'
@register_style
@dataclass
class Blue(Style):
"""Blue style."""
name: str = 'blue'
header: str = ''
border: str = 'dark_blue'
cell: str = 'blue'
highlight: str = 'blue3'
warning: str = 'magenta'
@register_style
@dataclass
class Cyan(Style):
"""Cyan style."""
name: str = 'cyan'
header: str = ''
border: str = 'dark_cyan'
cell: str = 'cyan'
highlight: str = 'cyan3'
warning: str = 'magenta'
@register_style
@dataclass
class Green(Style):
"""Green style."""
name: str = 'green'
header: str = ''
border: str = 'dark_green'
cell: str = 'green'
highlight: str = 'green3'
warning: str = 'magenta'
@register_style
@dataclass
class Yellow(Style):
"""Yellow style."""
name: str = 'yellow'
header: str = ''
border: str = 'yellow3'
cell: str = 'wheat1'
highlight: str = 'yellow3'
warning: str = 'magenta'
@register_style
@dataclass
class Orange(Style):
"""Orange style."""
name: str = 'orange'
header: str = ''
border: str = 'dark_orange'
cell: str = 'orange'
highlight: str = 'orange3'
warning: str = 'magenta'
@register_style
@dataclass
class White(Style):
"""White style."""
name: str = 'white'
header: str = ''
border: str = 'white'
cell: str = 'white'
highlight: str = 'white'
warning: str = 'magenta'
@register_style
@dataclass
class Grey(Style):
"""Grey style."""
name: str = 'grey'
header: str = ''
border: str = 'grey50'
cell: str = 'grey70'
highlight: str = 'grey90'
warning: str = 'magenta'
@register_style
@dataclass
class Navy(Style):
"""Navy style."""
name: str = 'navy'
header: str = ''
border: str = 'deep_sky_blue4'
cell: str = 'light_sky_blue3'
highlight: str = 'light_sky_blue3'
warning: str = 'magenta'
@register_style
@dataclass
class Black(Style):
"""Black style."""
name: str = 'black'
header: str = ''
border: str = 'black'
cell: str = 'grey30'
highlight: str = 'grey30'
warning: str = 'magenta'
def request_style_obj(style_name: str, no_border: bool) -> Style:
"""Request a style object by name."""
if style_name == 'disabled':
os.environ['NO_COLOR'] = '1'
return registry[style_name.lower()](no_border=no_border)

19
src/slobs_cli/util.py Normal file
View File

@@ -0,0 +1,19 @@
"""module containing utility functions for Slobs CLI."""
import os
import asyncclick as click
def check_mark(ctx: click.Context, value: bool, empty_if_false: bool = False) -> str:
"""Return a check mark or cross mark based on the boolean value."""
if empty_if_false and not value:
return ''
# rich gracefully handles the absence of colour throughout the rest of the application,
# but here we must handle it manually.
# If NO_COLOR is set, we return plain text symbols.
# Otherwise, we return coloured symbols.
if os.getenv('NO_COLOR', '') != '':
return '' if value else ''
return '' if value else ''

View File

@@ -0,0 +1 @@
"""Test suite for the slobs_cli package."""

View File

@@ -1,6 +1,9 @@
"""pytest configuration for async tests using anyio."""
import pytest
@pytest.fixture
def anyio_backend():
return "asyncio"
"""Return the backend to use for async tests."""
return 'asyncio'

41
tests/setup.py Normal file
View File

@@ -0,0 +1,41 @@
"""Create test scenes in Streamlabs.
Usage:
Run this script as a standalone program to setup the test environment.
Requires 'SLOBS_DOMAIN' and 'SLOBS_TOKEN' environment variables to be set.
"""
import os
import anyio
from anyio import create_task_group
from pyslobs import ConnectionConfig, ScenesService, SlobsConnection
async def setup(conn: SlobsConnection):
"""Set up test scenes in Streamlabs OBS."""
ss = ScenesService(conn)
await ss.create_scene('slobs-test-scene-1')
await ss.create_scene('slobs-test-scene-2')
await ss.create_scene('slobs-test-scene-3')
conn.close()
async def main():
"""Establish connection and set up scenes."""
conn = SlobsConnection(
ConnectionConfig(
domain=os.environ['SLOBS_DOMAIN'],
port=59650,
token=os.environ['SLOBS_TOKEN'],
)
)
async with create_task_group() as tg:
tg.start_soon(conn.background_processing)
tg.start_soon(setup, conn)
if __name__ == '__main__':
anyio.run(main)

View File

@@ -1,29 +1,55 @@
"""Remove test scenes in Streamlabs, disable streaming, recording, and replay buffer.
Usage:
Run this script as a standalone program to tear down the test environment.
Requires 'SLOBS_DOMAIN' and 'SLOBS_TOKEN' environment variables to be set.
"""
import os
import anyio
from anyio import create_task_group
from pyslobs import ConnectionConfig, SlobsConnection, StreamingService
from pyslobs import (
ConnectionConfig,
ScenesService,
SlobsConnection,
StreamingService,
TransitionsService,
)
async def cleanup(conn: SlobsConnection):
"""Clean up test scenes and ensure streaming, recording, and replay buffer are stopped."""
ss = ScenesService(conn)
scenes = await ss.get_scenes()
for scene in scenes:
if scene.name.startswith('slobs-test-scene-'):
await ss.remove_scene(scene.id)
ss = StreamingService(conn)
current_state = await ss.get_model()
if current_state.streaming_status != "offline":
model = await ss.get_model()
if model.streaming_status != 'offline':
await ss.toggle_streaming()
if current_state.replay_buffer_status != "offline":
if model.replay_buffer_status != 'offline':
await ss.stop_replay_buffer()
if current_state.recording_status != "offline":
if model.recording_status != 'offline':
await ss.toggle_recording()
ts = TransitionsService(conn)
model = await ts.get_model()
if model.studio_mode:
await ts.disable_studio_mode()
conn.close()
async def main():
"""Establish connection and clean up test scenes."""
conn = SlobsConnection(
ConnectionConfig(
domain=os.environ["SLOBS_DOMAIN"],
domain=os.environ['SLOBS_DOMAIN'],
port=59650,
token=os.environ["SLOBS_TOKEN"],
token=os.environ['SLOBS_TOKEN'],
)
)
@@ -32,5 +58,5 @@ async def main():
tg.start_soon(cleanup, conn)
if __name__ == "__main__":
if __name__ == '__main__':
anyio.run(main)

43
tests/test_audio.py Normal file
View File

@@ -0,0 +1,43 @@
"""Test cases for audio commands in slobs_cli."""
import pytest
from asyncclick.testing import CliRunner
from slobs_cli import cli
@pytest.mark.anyio
async def test_audio_list():
"""Test the list audio sources command."""
runner = CliRunner()
result = await runner.invoke(cli, ['audio', 'list'])
assert result.exit_code == 0
assert 'Desktop Audio' in result.output
assert 'Mic/Aux' in result.output
@pytest.mark.anyio
async def test_audio_mute():
"""Test the mute audio source command."""
runner = CliRunner()
result = await runner.invoke(cli, ['audio', 'mute', 'Mic/Aux'])
assert result.exit_code == 0
assert 'Mic/Aux muted successfully' in result.output
@pytest.mark.anyio
async def test_audio_unmute():
"""Test the unmute audio source command."""
runner = CliRunner()
result = await runner.invoke(cli, ['audio', 'unmute', 'Mic/Aux'])
assert result.exit_code == 0
assert 'Mic/Aux unmuted successfully' in result.output
@pytest.mark.anyio
async def test_audio_invalid_source():
"""Test handling of invalid audio source."""
runner = CliRunner()
result = await runner.invoke(cli, ['audio', 'mute', 'InvalidSource'])
assert result.exit_code != 0
assert 'Audio source "InvalidSource" not found' in result.output

View File

@@ -1,5 +1,6 @@
"""Test cases for the recording commands of the slobs_cli CLI application."""
import anyio
import asyncclick as click
import pytest
from asyncclick.testing import CliRunner
@@ -8,43 +9,35 @@ from slobs_cli import cli
@pytest.mark.anyio
async def test_record_start():
"""Test the start recording command."""
runner = CliRunner()
result = await runner.invoke(cli, ["record", "status"])
result = await runner.invoke(cli, ['record', 'status'])
assert result.exit_code == 0
active = "Recording is currently active." in result.output
active = 'Recording is currently active.' in result.output
result = await runner.invoke(cli, ['record', 'start'])
if not active:
result = await runner.invoke(cli, ["record", "start"])
assert result.exit_code == 0
assert "Recording started" in result.output
await anyio.sleep(1) # Allow some time for the recording to start
assert 'Recording started' in result.output
await anyio.sleep(0.2) # Allow some time for the recording to start
else:
with pytest.raises(ExceptionGroup) as exc_info:
result = await runner.invoke(
cli, ["record", "start"], catch_exceptions=False
)
assert exc_info.group_contains(
click.Abort, match="Recording is already active."
)
assert result.exit_code != 0
assert 'Recording is already active.' in result.output
@pytest.mark.anyio
async def test_record_stop():
"""Test the stop recording command."""
runner = CliRunner()
result = await runner.invoke(cli, ["record", "status"])
result = await runner.invoke(cli, ['record', 'status'])
assert result.exit_code == 0
active = "Recording is currently active." in result.output
active = 'Recording is currently active.' in result.output
result = await runner.invoke(cli, ['record', 'stop'])
if active:
result = await runner.invoke(cli, ["record", "stop"])
assert result.exit_code == 0
assert "Recording stopped" in result.output
await anyio.sleep(1) # Allow some time for the recording to stop
assert 'Recording stopped' in result.output
await anyio.sleep(0.2) # Allow some time for the recording to stop
else:
with pytest.raises(ExceptionGroup) as exc_info:
result = await runner.invoke(
cli, ["record", "stop"], catch_exceptions=False
)
assert exc_info.group_contains(
click.Abort, match="Recording is already inactive."
)
assert result.exit_code != 0
assert 'Recording is already inactive.' in result.output

View File

@@ -1,5 +1,6 @@
"""Test cases for the replay buffer commands in slobs_cli."""
import anyio
import asyncclick as click
import pytest
from asyncclick.testing import CliRunner
@@ -8,43 +9,35 @@ from slobs_cli import cli
@pytest.mark.anyio
async def test_replaybuffer_start():
"""Test the start replay buffer command."""
runner = CliRunner()
result = await runner.invoke(cli, ["replaybuffer", "status"])
result = await runner.invoke(cli, ['replaybuffer', 'status'])
assert result.exit_code == 0
active = "Replay buffer is currently active." in result.output
active = 'Replay buffer is currently active.' in result.output
result = await runner.invoke(cli, ['replaybuffer', 'start'])
if not active:
result = await runner.invoke(cli, ["replaybuffer", "start"])
assert result.exit_code == 0
assert "Replay buffer started" in result.output
await anyio.sleep(1)
assert 'Replay buffer started' in result.output
await anyio.sleep(0.2) # Allow some time for the replay buffer to start
else:
with pytest.raises(ExceptionGroup) as exc_info:
result = await runner.invoke(
cli, ["replaybuffer", "start"], catch_exceptions=False
)
assert exc_info.group_contains(
click.Abort, match="Replay buffer is already active."
)
assert result.exit_code != 0
assert 'Replay buffer is already active.' in result.output
@pytest.mark.anyio
async def test_replaybuffer_stop():
"""Test the stop replay buffer command."""
runner = CliRunner()
result = await runner.invoke(cli, ["replaybuffer", "status"])
result = await runner.invoke(cli, ['replaybuffer', 'status'])
assert result.exit_code == 0
active = "Replay buffer is currently active." in result.output
active = 'Replay buffer is currently active.' in result.output
result = await runner.invoke(cli, ['replaybuffer', 'stop'])
if active:
result = await runner.invoke(cli, ["replaybuffer", "stop"])
assert result.exit_code == 0
assert "Replay buffer stopped" in result.output
await anyio.sleep(1)
assert 'Replay buffer stopped' in result.output
await anyio.sleep(0.2) # Allow some time for the replay buffer to stop
else:
with pytest.raises(ExceptionGroup) as exc_info:
result = await runner.invoke(
cli, ["replaybuffer", "stop"], catch_exceptions=False
)
assert exc_info.group_contains(
click.Abort, match="Replay buffer is already inactive."
)
assert result.exit_code != 0
assert 'Replay buffer is already inactive.' in result.output

View File

@@ -1,3 +1,6 @@
"""Test cases for scene commands in slobs_cli."""
import anyio
import pytest
from asyncclick.testing import CliRunner
@@ -6,20 +9,35 @@ from slobs_cli import cli
@pytest.mark.anyio
async def test_scene_list():
"""Test the list scenes command."""
runner = CliRunner()
result = await runner.invoke(cli, ["scene", "list"])
result = await runner.invoke(cli, ['scene', 'list'])
assert result.exit_code == 0
assert "slobs-test-scene-1" in result.output
assert "slobs-test-scene-2" in result.output
assert "slobs-test-scene-3" in result.output
assert 'slobs-test-scene-1' in result.output
assert 'slobs-test-scene-2' in result.output
assert 'slobs-test-scene-3' in result.output
await anyio.sleep(0.2) # Avoid rate limiting issues
@pytest.mark.anyio
async def test_scene_current():
"""Test the current scene command."""
runner = CliRunner()
result = await runner.invoke(cli, ["scene", "switch", "slobs-test-scene-2"])
result = await runner.invoke(cli, ['scene', 'switch', 'slobs-test-scene-2'])
assert result.exit_code == 0
await anyio.sleep(0.2) # Avoid rate limiting issues
result = await runner.invoke(cli, ["scene", "current"])
result = await runner.invoke(cli, ['scene', 'current'])
assert result.exit_code == 0
assert "Current active scene: slobs-test-scene-2" in result.output
assert 'Current active scene: slobs-test-scene-2' in result.output
await anyio.sleep(0.2) # Avoid rate limiting issues
@pytest.mark.anyio
async def test_scene_invalid_switch():
"""Test switching to an invalid scene."""
runner = CliRunner()
result = await runner.invoke(cli, ['scene', 'switch', 'invalid-scene'])
assert result.exit_code != 0
assert 'Scene "invalid-scene" not found' in result.output
await anyio.sleep(0.2) # Avoid rate limiting issues

View File

@@ -1,5 +1,6 @@
"""Tests for the stream commands in slobs_cli."""
import anyio
import asyncclick as click
import pytest
from asyncclick.testing import CliRunner
@@ -8,39 +9,35 @@ from slobs_cli import cli
@pytest.mark.anyio
async def test_stream_start():
"""Test the start stream command."""
runner = CliRunner()
result = await runner.invoke(cli, ["stream", "status"])
result = await runner.invoke(cli, ['stream', 'status'])
assert result.exit_code == 0
active = "Stream is currently active." in result.output
active = 'Stream is currently active.' in result.output
result = await runner.invoke(cli, ['stream', 'start'])
if not active:
result = await runner.invoke(cli, ["stream", "start"])
assert result.exit_code == 0
assert "Stream started" in result.output
await anyio.sleep(1) # Allow some time for the stream to start
assert 'Stream started' in result.output
await anyio.sleep(0.2) # Allow some time for the stream to start
else:
with pytest.raises(ExceptionGroup) as exc_info:
result = await runner.invoke(
cli, ["stream", "start"], catch_exceptions=False
)
assert exc_info.group_contains(click.Abort, match="Stream is already active.")
assert result.exit_code != 0
assert 'Stream is already active.' in result.output
@pytest.mark.anyio
async def test_stream_stop():
"""Test the stop stream command."""
runner = CliRunner()
result = await runner.invoke(cli, ["stream", "status"])
result = await runner.invoke(cli, ['stream', 'status'])
assert result.exit_code == 0
active = "Stream is currently active." in result.output
active = 'Stream is currently active.' in result.output
result = await runner.invoke(cli, ['stream', 'stop'])
if active:
result = await runner.invoke(cli, ["stream", "stop"])
assert result.exit_code == 0
assert "Stream stopped" in result.output
await anyio.sleep(1) # Allow some time for the stream to stop
assert 'Stream stopped' in result.output
await anyio.sleep(0.2) # Allow some time for the stream to stop
else:
with pytest.raises(ExceptionGroup) as exc_info:
result = await runner.invoke(
cli, ["stream", "stop"], catch_exceptions=False
)
assert exc_info.group_contains(click.Abort, match="Stream is already inactive.")
assert result.exit_code != 0
assert 'Stream is already inactive.' in result.output

60
tests/test_studiomode.py Normal file
View File

@@ -0,0 +1,60 @@
"""Test cases for the studio mode commands of the slobs_cli CLI application."""
import pytest
from asyncclick.testing import CliRunner
from slobs_cli import cli
@pytest.mark.anyio
async def test_studiomode_enable():
"""Test the enable studio mode command."""
runner = CliRunner()
result = await runner.invoke(cli, ['studiomode', 'status'])
assert result.exit_code == 0
active = 'Studio mode is currently enabled.' in result.output
result = await runner.invoke(cli, ['studiomode', 'enable'])
if active:
assert result.exit_code != 0
assert 'Studio mode is already enabled.' in result.output
else:
assert result.exit_code == 0
assert 'Studio mode enabled successfully.' in result.output
@pytest.mark.anyio
async def test_studiomode_disable():
"""Test the disable studio mode command."""
runner = CliRunner()
result = await runner.invoke(cli, ['studiomode', 'status'])
assert result.exit_code == 0
active = 'Studio mode is currently enabled.' in result.output
result = await runner.invoke(cli, ['studiomode', 'disable'])
if not active:
assert result.exit_code != 0
assert 'Studio mode is already disabled.' in result.output
else:
assert result.exit_code == 0
assert 'Studio mode disabled successfully.' in result.output
@pytest.mark.anyio
async def test_studiomode_toggle():
"""Test the toggle studio mode command."""
runner = CliRunner()
result = await runner.invoke(cli, ['studiomode', 'status'])
assert result.exit_code == 0
active = 'Studio mode is currently enabled.' in result.output
result = await runner.invoke(cli, ['studiomode', 'toggle'])
if active:
assert result.exit_code == 0
assert 'Studio mode disabled successfully.' in result.output
else:
assert result.exit_code == 0
assert 'Studio mode enabled successfully.' in result.output

View File

@@ -1,9 +1,11 @@
[tox]
env_list = py{310,311,312}
env_list = py{311,312,313}
[testenv]
passenv = *
setenv = VIRTUALENV_DISCOVERY=pyenv
groups = dev
commands =
python tests/setup.py
pytest tests
python tests/teardown.py