add ruff config

run files through formatter

add dosctrings to satisfy the linter
This commit is contained in:
2025-06-12 20:34:14 +01:00
parent fecd13d345
commit 582587bed5
21 changed files with 356 additions and 211 deletions

View File

@@ -1 +1,3 @@
__version__ = "0.8.4"
"""module for package metadata."""
__version__ = '0.8.4'

View File

@@ -1,3 +1,5 @@
"""Package slobs_cli provides a command-line interface for interacting with SLOBS (Streamlabs OBS)."""
from .audio import audio
from .cli import cli
from .record import record
@@ -6,4 +8,4 @@ from .scene import scene
from .stream import stream
from .studiomode import studiomode
__all__ = ["cli", "scene", "stream", "record", "audio", "replaybuffer", "studiomode"]
__all__ = ['cli', 'scene', 'stream', 'record', 'audio', 'replaybuffer', 'studiomode']

View File

@@ -1,3 +1,5 @@
"""module for managing audio sources in Slobs CLI."""
import asyncclick as click
from anyio import create_task_group
from pyslobs import AudioService
@@ -9,41 +11,40 @@ from .errors import SlobsCliError
@cli.group()
def audio():
"""Audio management commands."""
"""Manage audio sources in Slobs CLI."""
@audio.command()
@click.option("--id", is_flag=True, help="Include audio source IDs in the output.")
@click.option('--id', is_flag=True, help='Include audio source IDs in the output.')
@click.pass_context
async def list(ctx: click.Context, id: bool = False):
"""List all audio sources."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
as_ = AudioService(conn)
async def _run():
sources = await as_.get_sources()
if not sources:
click.echo("No audio sources found.")
click.echo('No audio sources found.')
conn.close()
return
table_data = [["Audio Name", "ID", "Muted"] if id else ["Name", "Muted"]]
table_data = [['Audio Name', 'ID', 'Muted'] if id else ['Name', 'Muted']]
for source in sources:
model = await source.get_model()
to_append = [f"{click.style(model.name, fg='blue')}"]
to_append = [click.style(model.name, fg='blue')]
if id:
to_append.append(f"{model.source_id}")
to_append.append("" if model.muted else "")
to_append.append(model.source_id)
to_append.append('' if model.muted else '')
table_data.append(to_append)
table = AsciiTable(table_data)
table.justify_columns = {
0: "left",
1: "left" if id else "center",
2: "center" if id else None,
0: 'left',
1: 'left' if id else 'center',
2: 'center' if id else None,
}
click.echo(table.table)
@@ -55,12 +56,11 @@ async def list(ctx: click.Context, id: bool = False):
@audio.command()
@click.argument("source_name")
@click.argument('source_name')
@click.pass_context
async def mute(ctx: click.Context, source_name: str):
"""Mute an audio source by name."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
as_ = AudioService(conn)
async def _run():
@@ -74,7 +74,7 @@ async def mute(ctx: click.Context, source_name: str):
raise SlobsCliError(f"Source '{source_name}' not found.")
await source.set_muted(True)
click.echo(f"Muted audio source: {source_name}")
click.echo(f'Muted audio source: {source_name}')
conn.close()
try:
@@ -87,12 +87,11 @@ async def mute(ctx: click.Context, source_name: str):
@audio.command()
@click.argument("source_name")
@click.argument('source_name')
@click.pass_context
async def unmute(ctx: click.Context, source_name: str):
"""Unmute an audio source by name."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
as_ = AudioService(conn)
async def _run():
@@ -106,7 +105,7 @@ async def unmute(ctx: click.Context, source_name: str):
raise SlobsCliError(f"Source '{source_name}' not found.")
await source.set_muted(False)
click.echo(f"Unmuted audio source: {source_name}")
click.echo(f'Unmuted audio source: {source_name}')
conn.close()
try:
@@ -119,12 +118,11 @@ async def unmute(ctx: click.Context, source_name: str):
@audio.command()
@click.argument("source_name")
@click.argument('source_name')
@click.pass_context
async def toggle(ctx: click.Context, source_name: str):
"""Toggle mute state of an audio source by name."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
as_ = AudioService(conn)
async def _run():
@@ -134,10 +132,10 @@ async def toggle(ctx: click.Context, source_name: str):
if model.name.lower() == source_name.lower():
if model.muted:
await source.set_muted(False)
click.echo(f"Unmuted audio source: {source_name}")
click.echo(f'Unmuted audio source: {source_name}')
else:
await source.set_muted(True)
click.echo(f"Muted audio source: {source_name}")
click.echo(f'Muted audio source: {source_name}')
conn.close()
break
else: # If no source by the given name was found

View File

@@ -1,3 +1,5 @@
"""module defining the entry point for the Streamlabs Desktop CLI application."""
import anyio
import asyncclick as click
from pyslobs import ConnectionConfig, SlobsConnection
@@ -7,33 +9,33 @@ from .__about__ import __version__ as version
@click.group()
@click.option(
"-d",
"--domain",
default="127.0.0.1",
'-d',
'--domain',
default='127.0.0.1',
envvar='SLOBS_DOMAIN',
show_default=True,
show_envvar=True,
help="The domain of the SLOBS server.",
envvar="SLOBS_DOMAIN",
help='The domain of the SLOBS server.',
)
@click.option(
"-p",
"--port",
'-p',
'--port',
default=59650,
envvar='SLOBS_PORT',
show_default=True,
show_envvar=True,
help="The port of the SLOBS server.",
envvar="SLOBS_PORT",
help='The port of the SLOBS server.',
)
@click.option(
"-t",
"--token",
help="The token for the SLOBS server.",
envvar="SLOBS_TOKEN",
'-t',
'--token',
envvar='SLOBS_TOKEN',
show_envvar=True,
required=True,
help='The token for the SLOBS server.',
)
@click.version_option(
version, "-v", "--version", message="%(prog)s version: %(version)s"
version, '-v', '--version', message='%(prog)s version: %(version)s'
)
@click.pass_context
async def cli(ctx: click.Context, domain: str, port: int, token: str):
@@ -44,7 +46,7 @@ async def cli(ctx: click.Context, domain: str, port: int, token: str):
port=port,
token=token,
)
ctx.obj["connection"] = SlobsConnection(config)
ctx.obj['connection'] = SlobsConnection(config)
def run():

View File

@@ -1,3 +1,5 @@
"""module for custom exceptions in Slobs CLI."""
import asyncclick as click
@@ -5,9 +7,10 @@ class SlobsCliError(click.ClickException):
"""Base class for all Slobs CLI errors."""
def __init__(self, message: str):
"""Initialize the SlobsCliError with a message."""
super().__init__(message)
self.exit_code = 1
def show(self):
"""Display the error message in red."""
click.secho(f"Error: {self.message}", fg="red", err=True)
click.secho(f'Error: {self.message}', fg='red', err=True)

View File

@@ -1,3 +1,5 @@
"""module for managing recording commands in Slobs CLI."""
import asyncclick as click
from anyio import create_task_group
from pyslobs import StreamingService
@@ -8,27 +10,26 @@ from .errors import SlobsCliError
@cli.group()
def record():
"""Recording management commands."""
"""Manage recording in Slobs CLI."""
@record.command()
@click.pass_context
async def start(ctx: click.Context):
"""Start recording."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ss = StreamingService(conn)
async def _run():
model = await ss.get_model()
active = model.recording_status != "offline"
active = model.recording_status != 'offline'
if active:
conn.close()
raise SlobsCliError("Recording is already active.")
raise SlobsCliError('Recording is already active.')
await ss.toggle_recording()
click.echo("Recording started.")
click.echo('Recording started.')
conn.close()
@@ -45,20 +46,19 @@ async def start(ctx: click.Context):
@click.pass_context
async def stop(ctx: click.Context):
"""Stop recording."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ss = StreamingService(conn)
async def _run():
model = await ss.get_model()
active = model.recording_status != "offline"
active = model.recording_status != 'offline'
if not active:
conn.close()
raise SlobsCliError("Recording is already inactive.")
raise SlobsCliError('Recording is already inactive.')
await ss.toggle_recording()
click.echo("Recording stopped.")
click.echo('Recording stopped.')
conn.close()
@@ -75,18 +75,17 @@ async def stop(ctx: click.Context):
@click.pass_context
async def status(ctx: click.Context):
"""Get recording status."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ss = StreamingService(conn)
async def _run():
model = await ss.get_model()
active = model.recording_status != "offline"
active = model.recording_status != 'offline'
if active:
click.echo("Recording is currently active.")
click.echo('Recording is currently active.')
else:
click.echo("Recording is currently inactive.")
click.echo('Recording is currently inactive.')
conn.close()
@@ -99,20 +98,19 @@ async def status(ctx: click.Context):
@click.pass_context
async def toggle(ctx: click.Context):
"""Toggle recording status."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ss = StreamingService(conn)
async def _run():
model = await ss.get_model()
active = model.recording_status != "offline"
active = model.recording_status != 'offline'
if active:
await ss.toggle_recording()
click.echo("Recording stopped.")
click.echo('Recording stopped.')
else:
await ss.toggle_recording()
click.echo("Recording started.")
click.echo('Recording started.')
conn.close()

View File

@@ -1,3 +1,5 @@
"""module for managing the replay buffer in Slobs CLI."""
import asyncclick as click
from anyio import create_task_group
from pyslobs import StreamingService
@@ -8,27 +10,26 @@ from .errors import SlobsCliError
@cli.group()
def replaybuffer():
"""Replay buffer management commands."""
"""Manage the replay buffer in Slobs CLI."""
@replaybuffer.command()
@click.pass_context
async def start(ctx: click.Context):
"""Start the replay buffer."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ss = StreamingService(conn)
async def _run():
model = await ss.get_model()
active = model.replay_buffer_status != "offline"
active = model.replay_buffer_status != 'offline'
if active:
conn.close()
raise SlobsCliError("Replay buffer is already active.")
raise SlobsCliError('Replay buffer is already active.')
await ss.start_replay_buffer()
click.echo("Replay buffer started.")
click.echo('Replay buffer started.')
conn.close()
try:
@@ -44,20 +45,19 @@ async def start(ctx: click.Context):
@click.pass_context
async def stop(ctx: click.Context):
"""Stop the replay buffer."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ss = StreamingService(conn)
async def _run():
model = await ss.get_model()
active = model.replay_buffer_status != "offline"
active = model.replay_buffer_status != 'offline'
if not active:
conn.close()
raise SlobsCliError("Replay buffer is already inactive.")
raise SlobsCliError('Replay buffer is already inactive.')
await ss.stop_replay_buffer()
click.echo("Replay buffer stopped.")
click.echo('Replay buffer stopped.')
conn.close()
try:
@@ -73,17 +73,16 @@ async def stop(ctx: click.Context):
@click.pass_context
async def status(ctx: click.Context):
"""Get the current status of the replay buffer."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ss = StreamingService(conn)
async def _run():
model = await ss.get_model()
active = model.replay_buffer_status != "offline"
active = model.replay_buffer_status != 'offline'
if active:
click.echo("Replay buffer is currently active.")
click.echo('Replay buffer is currently active.')
else:
click.echo("Replay buffer is currently inactive.")
click.echo('Replay buffer is currently inactive.')
conn.close()
async with create_task_group() as tg:
@@ -95,13 +94,12 @@ async def status(ctx: click.Context):
@click.pass_context
async def save(ctx: click.Context):
"""Save the current replay buffer."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ss = StreamingService(conn)
async def _run():
await ss.save_replay()
click.echo("Replay buffer saved.")
click.echo('Replay buffer saved.')
conn.close()
async with create_task_group() as tg:

View File

@@ -1,3 +1,5 @@
"""module for managing scenes in Slobs CLI."""
import asyncclick as click
from anyio import create_task_group
from pyslobs import ScenesService, TransitionsService
@@ -9,46 +11,45 @@ from .errors import SlobsCliError
@cli.group()
def scene():
"""Scene management commands."""
"""Manage scenes in Slobs CLI."""
@scene.command()
@click.option("--id", is_flag=True, help="Include scene IDs in the output.")
@click.option('--id', is_flag=True, help='Include scene IDs in the output.')
@click.pass_context
async def list(ctx: click.Context, id: bool = False):
"""List all available scenes."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ss = ScenesService(conn)
async def _run():
scenes = await ss.get_scenes()
if not scenes:
click.echo("No scenes found.")
click.echo('No scenes found.')
conn.close()
return
active_scene = await ss.active_scene()
table_data = [
["Scene Name", "ID", "Active"] if id else ["Scene Name", "Active"]
['Scene Name', 'ID', 'Active'] if id else ['Scene Name', 'Active']
]
for scene in scenes:
if scene.id == active_scene.id:
to_append = [f"{click.style(scene.name, fg='green')}"]
to_append = [click.style(scene.name, fg='green')]
else:
to_append = [f"{click.style(scene.name, fg='blue')}"]
to_append = [click.style(scene.name, fg='blue')]
if id:
to_append.append(f"{scene.id}")
to_append.append("" if scene.id == active_scene.id else "")
to_append.append(scene.id)
to_append.append('' if scene.id == active_scene.id else '')
table_data.append(to_append)
table = AsciiTable(table_data)
table.justify_columns = {
0: "left",
1: "left" if id else "center",
2: "center" if id else None,
0: 'left',
1: 'left' if id else 'center',
2: 'center' if id else None,
}
click.echo(table.table)
@@ -60,19 +61,18 @@ async def list(ctx: click.Context, id: bool = False):
@scene.command()
@click.option("--id", is_flag=True, help="Include scene IDs in the output.")
@click.option('--id', is_flag=True, help='Include scene IDs in the output.')
@click.pass_context
async def current(ctx: click.Context, id: bool = False):
"""Show the currently active scene."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ss = ScenesService(conn)
async def _run():
active_scene = await ss.active_scene()
click.echo(
f"Current active scene: {click.style(active_scene.name, fg='green')} "
f"{f'(ID: {active_scene.id})' if id else ''}"
f'Current active scene: {click.style(active_scene.name, fg="green")} '
f'{f"(ID: {active_scene.id})" if id else ""}'
)
conn.close()
@@ -82,20 +82,19 @@ async def current(ctx: click.Context, id: bool = False):
@scene.command()
@click.option("--id", is_flag=True, help="Include scene IDs in the output.")
@click.argument("scene_name", type=str)
@click.option('--id', is_flag=True, help='Include scene IDs in the output.')
@click.argument('scene_name', type=str)
@click.option(
"--preview",
'--preview',
is_flag=True,
help="Switch the preview scene only.",
help='Switch the preview scene only.',
)
@click.pass_context
async def switch(
ctx: click.Context, scene_name: str, preview: bool = False, id: bool = False
):
"""Switch to a scene by its name."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ss = ScenesService(conn)
ts = TransitionsService(conn)
@@ -109,29 +108,29 @@ async def switch(
await ss.make_scene_active(scene.id)
if preview:
click.echo(
f"Switched to preview scene: {click.style(scene.name, fg='blue')} "
f"{f'(ID: {scene.id}).' if id else ''}"
f'Switched to preview scene: {click.style(scene.name, fg="blue")} '
f'{f"(ID: {scene.id})." if id else ""}'
)
else:
click.echo(
f"Switched to scene: {click.style(scene.name, fg='blue')} "
f"{f'(ID: {scene.id}).' if id else ''}"
f'Switched to scene: {click.style(scene.name, fg="blue")} '
f'{f"(ID: {scene.id})." if id else ""}'
)
await ts.execute_studio_mode_transition()
click.echo(
"Executed studio mode transition to make the scene active."
'Executed studio mode transition to make the scene active.'
)
else:
if preview:
conn.close()
raise SlobsCliError(
"Cannot switch the preview scene in non-studio mode."
'Cannot switch the preview scene in non-studio mode.'
)
await ss.make_scene_active(scene.id)
click.echo(
f"Switched to scene: {click.style(scene.name, fg='blue')} "
f"{f'(ID: {scene.id}).' if id else ''}"
f'Switched to scene: {click.style(scene.name, fg="blue")} '
f'{f"(ID: {scene.id})." if id else ""}'
)
conn.close()

View File

@@ -1,3 +1,5 @@
"""module for managing the replay buffer in Slobs CLI."""
import asyncclick as click
from anyio import create_task_group
from pyslobs import StreamingService
@@ -8,27 +10,26 @@ from .errors import SlobsCliError
@cli.group()
def stream():
"""Stream management commands."""
"""Manage streaming in Slobs CLI."""
@stream.command()
@click.pass_context
async def start(ctx: click.Context):
"""Start the stream."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ss = StreamingService(conn)
async def _run():
model = await ss.get_model()
active = model.streaming_status != "offline"
active = model.streaming_status != 'offline'
if active:
conn.close()
raise SlobsCliError("Stream is already active.")
raise SlobsCliError('Stream is already active.')
await ss.toggle_streaming()
click.echo("Stream started.")
click.echo('Stream started.')
conn.close()
try:
@@ -44,20 +45,19 @@ async def start(ctx: click.Context):
@click.pass_context
async def stop(ctx: click.Context):
"""Stop the stream."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ss = StreamingService(conn)
async def _run():
model = await ss.get_model()
active = model.streaming_status != "offline"
active = model.streaming_status != 'offline'
if not active:
conn.close()
raise SlobsCliError("Stream is already inactive.")
raise SlobsCliError('Stream is already inactive.')
await ss.toggle_streaming()
click.echo("Stream stopped.")
click.echo('Stream stopped.')
conn.close()
try:
@@ -73,18 +73,17 @@ async def stop(ctx: click.Context):
@click.pass_context
async def status(ctx: click.Context):
"""Get the current stream status."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ss = StreamingService(conn)
async def _run():
model = await ss.get_model()
active = model.streaming_status != "offline"
active = model.streaming_status != 'offline'
if active:
click.echo("Stream is currently active.")
click.echo('Stream is currently active.')
else:
click.echo("Stream is currently inactive.")
click.echo('Stream is currently inactive.')
conn.close()
async with create_task_group() as tg:
@@ -96,19 +95,18 @@ async def status(ctx: click.Context):
@click.pass_context
async def toggle(ctx: click.Context):
"""Toggle the stream status."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ss = StreamingService(conn)
async def _run():
model = await ss.get_model()
active = model.streaming_status != "offline"
active = model.streaming_status != 'offline'
await ss.toggle_streaming()
if active:
click.echo("Stream stopped.")
click.echo('Stream stopped.')
else:
click.echo("Stream started.")
click.echo('Stream started.')
conn.close()

View File

@@ -1,3 +1,5 @@
"""module for managing studio mode in Slobs CLI."""
import asyncclick as click
from anyio import create_task_group
from pyslobs import TransitionsService
@@ -8,25 +10,24 @@ from .errors import SlobsCliError
@cli.group()
def studiomode():
"""Studio mode management commands."""
"""Manage studio mode in Slobs CLI."""
@studiomode.command()
@click.pass_context
async def enable(ctx: click.Context):
"""Enable studio mode."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ts = TransitionsService(conn)
async def _run():
model = await ts.get_model()
if model.studio_mode:
conn.close()
raise SlobsCliError("Studio mode is already enabled.")
raise SlobsCliError('Studio mode is already enabled.')
await ts.enable_studio_mode()
click.echo("Studio mode enabled successfully.")
click.echo('Studio mode enabled successfully.')
conn.close()
try:
@@ -42,18 +43,17 @@ async def enable(ctx: click.Context):
@click.pass_context
async def disable(ctx: click.Context):
"""Disable studio mode."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ts = TransitionsService(conn)
async def _run():
model = await ts.get_model()
if not model.studio_mode:
conn.close()
raise SlobsCliError("Studio mode is already disabled.")
raise SlobsCliError('Studio mode is already disabled.')
await ts.disable_studio_mode()
click.echo("Studio mode disabled successfully.")
click.echo('Studio mode disabled successfully.')
conn.close()
try:
@@ -69,16 +69,15 @@ async def disable(ctx: click.Context):
@click.pass_context
async def status(ctx: click.Context):
"""Check the status of studio mode."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ts = TransitionsService(conn)
async def _run():
model = await ts.get_model()
if model.studio_mode:
click.echo("Studio mode is currently enabled.")
click.echo('Studio mode is currently enabled.')
else:
click.echo("Studio mode is currently disabled.")
click.echo('Studio mode is currently disabled.')
conn.close()
async with create_task_group() as tg:
@@ -90,18 +89,17 @@ async def status(ctx: click.Context):
@click.pass_context
async def toggle(ctx: click.Context):
"""Toggle studio mode."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ts = TransitionsService(conn)
async def _run():
model = await ts.get_model()
if model.studio_mode:
await ts.disable_studio_mode()
click.echo("Studio mode disabled successfully.")
click.echo('Studio mode disabled successfully.')
else:
await ts.enable_studio_mode()
click.echo("Studio mode enabled successfully.")
click.echo('Studio mode enabled successfully.')
conn.close()
async with create_task_group() as tg:
@@ -113,18 +111,17 @@ async def toggle(ctx: click.Context):
@click.pass_context
async def force_transition(ctx: click.Context):
"""Force a transition in studio mode."""
conn = ctx.obj["connection"]
conn = ctx.obj['connection']
ts = TransitionsService(conn)
async def _run():
model = await ts.get_model()
if not model.studio_mode:
conn.close()
raise SlobsCliError("Studio mode is not enabled.")
raise SlobsCliError('Studio mode is not enabled.')
await ts.execute_studio_mode_transition()
click.echo("Forced studio mode transition.")
click.echo('Forced studio mode transition.')
conn.close()
try: