dynamically load commands from obsws_cli.commands

no changes to files other than imports

patch bump
This commit is contained in:
2026-01-24 22:29:33 +00:00
parent 8bec6908e5
commit 1fc0bef237
20 changed files with 44 additions and 60 deletions

View File

@@ -0,0 +1,209 @@
"""module containing commands for manipulating filters in scenes."""
from typing import Annotated, Optional
import obsws_python as obsws
import typer
from rich.table import Table
from rich.text import Text
from obsws_cli import console, util
from obsws_cli.alias import SubTyperAliasGroup
app = typer.Typer(cls=SubTyperAliasGroup)
@app.callback()
def main():
"""Control filters in OBS scenes."""
@app.command('list | ls')
def list_(
ctx: typer.Context,
source_name: Annotated[
Optional[str],
typer.Argument(
show_default='The current scene',
help='The source to list filters for',
),
] = None,
):
"""List filters for a source."""
if not source_name:
source_name = ctx.obj['obsws'].get_current_program_scene().scene_name
try:
resp = ctx.obj['obsws'].get_source_filter_list(source_name)
except obsws.error.OBSSDKRequestError as e:
if e.code == 600:
console.err.print(
f'No source was found by the name of [yellow]{source_name}[/yellow].'
)
raise typer.Exit(1)
else:
raise
if not resp.filters:
console.out.print(
f'No filters found for source {console.highlight(ctx, source_name)}'
)
raise typer.Exit()
table = Table(
title=f'Filters for Source: {source_name}',
padding=(0, 2),
border_style=ctx.obj['style'].border,
)
columns = [
(Text('Filter Name', justify='center'), 'left', ctx.obj['style'].column),
(Text('Kind', justify='center'), 'left', ctx.obj['style'].column),
(Text('Enabled', justify='center'), 'center', None),
(Text('Settings', justify='center'), 'center', ctx.obj['style'].column),
]
for heading, justify, style in columns:
table.add_column(heading, justify=justify, style=style)
for filter in resp.filters:
resp = ctx.obj['obsws'].get_source_filter_default_settings(filter['filterKind'])
settings = resp.default_filter_settings | filter['filterSettings']
table.add_row(
filter['filterName'],
util.snakecase_to_titlecase(filter['filterKind']),
util.check_mark(filter['filterEnabled']),
'\n'.join(
[
f'{util.snakecase_to_titlecase(k):<20} {v:>10}'
for k, v in settings.items()
]
),
)
console.out.print(table)
def _get_filter_enabled(ctx: typer.Context, source_name: str, filter_name: str):
"""Get the status of a filter for a source."""
resp = ctx.obj['obsws'].get_source_filter(source_name, filter_name)
return resp.filter_enabled
@app.command('enable | on')
def enable(
ctx: typer.Context,
source_name: Annotated[
str,
typer.Argument(
..., show_default=False, help='The source to enable the filter for'
),
],
filter_name: Annotated[
str,
typer.Argument(
..., show_default=False, help='The name of the filter to enable'
),
],
):
"""Enable a filter for a source."""
if _get_filter_enabled(ctx, source_name, filter_name):
console.err.print(
f'Filter [yellow]{filter_name}[/yellow] is already enabled for source [yellow]{source_name}[/yellow]'
)
raise typer.Exit(1)
ctx.obj['obsws'].set_source_filter_enabled(source_name, filter_name, enabled=True)
console.out.print(
f'Enabled filter {console.highlight(ctx, filter_name)} for source {console.highlight(ctx, source_name)}'
)
@app.command('disable | off')
def disable(
ctx: typer.Context,
source_name: Annotated[
str,
typer.Argument(
..., show_default=False, help='The source to disable the filter for'
),
],
filter_name: Annotated[
str,
typer.Argument(
..., show_default=False, help='The name of the filter to disable'
),
],
):
"""Disable a filter for a source."""
if not _get_filter_enabled(ctx, source_name, filter_name):
console.err.print(
f'Filter [yellow]{filter_name}[/yellow] is already disabled for source [yellow]{source_name}[/yellow]'
)
raise typer.Exit(1)
ctx.obj['obsws'].set_source_filter_enabled(source_name, filter_name, enabled=False)
console.out.print(
f'Disabled filter {console.highlight(ctx, filter_name)} for source {console.highlight(ctx, source_name)}'
)
@app.command('toggle | tg')
def toggle(
ctx: typer.Context,
source_name: Annotated[
str,
typer.Argument(
..., show_default=False, help='The source to toggle the filter for'
),
],
filter_name: Annotated[
str,
typer.Argument(
..., show_default=False, help='The name of the filter to toggle'
),
],
):
"""Toggle a filter for a source."""
is_enabled = _get_filter_enabled(ctx, source_name, filter_name)
new_state = not is_enabled
ctx.obj['obsws'].set_source_filter_enabled(
source_name, filter_name, enabled=new_state
)
if new_state:
console.out.print(
f'Enabled filter {console.highlight(ctx, filter_name)} for source {console.highlight(ctx, source_name)}'
)
else:
console.out.print(
f'Disabled filter {console.highlight(ctx, filter_name)} for source {console.highlight(ctx, source_name)}'
)
@app.command('status | ss')
def status(
ctx: typer.Context,
source_name: Annotated[
str,
typer.Argument(
..., show_default=False, help='The source to get the filter status for'
),
],
filter_name: Annotated[
str,
typer.Argument(
..., show_default=False, help='The name of the filter to get the status for'
),
],
):
"""Get the status of a filter for a source."""
is_enabled = _get_filter_enabled(ctx, source_name, filter_name)
if is_enabled:
console.out.print(
f'Filter {console.highlight(ctx, filter_name)} is enabled for source {console.highlight(ctx, source_name)}'
)
else:
console.out.print(
f'Filter {console.highlight(ctx, filter_name)} is disabled for source {console.highlight(ctx, source_name)}'
)

222
obsws_cli/commands/group.py Normal file
View File

@@ -0,0 +1,222 @@
"""module containing commands for manipulating groups in scenes."""
from typing import Annotated, Optional
import typer
from rich.table import Table
from rich.text import Text
from obsws_cli import console, util, validate
from obsws_cli.alias import SubTyperAliasGroup
from obsws_cli.protocols import DataclassProtocol
app = typer.Typer(cls=SubTyperAliasGroup)
@app.callback()
def main():
"""Control groups in OBS scenes."""
@app.command('list | ls')
def list_(
ctx: typer.Context,
scene_name: Annotated[
Optional[str],
typer.Argument(
show_default='The current scene',
help='Scene name to list groups for',
callback=validate.scene_in_scenes,
),
] = None,
):
"""List groups in a scene."""
if scene_name is None:
scene_name = ctx.obj['obsws'].get_current_program_scene().scene_name
resp = ctx.obj['obsws'].get_scene_item_list(scene_name)
groups = [
(item.get('sceneItemId'), item.get('sourceName'), item.get('sceneItemEnabled'))
for item in resp.scene_items
if item.get('isGroup')
]
if not groups:
console.out.print(
f'No groups found in scene {console.highlight(ctx, scene_name)}.'
)
raise typer.Exit()
table = Table(
title=f'Groups in Scene: {scene_name}',
padding=(0, 2),
border_style=ctx.obj['style'].border,
)
columns = [
(Text('ID', justify='center'), 'center', ctx.obj['style'].column),
(Text('Group Name', justify='center'), 'left', ctx.obj['style'].column),
(Text('Enabled', justify='center'), 'center', None),
]
for heading, justify, style in columns:
table.add_column(heading, justify=justify, style=style)
for item_id, group_name, is_enabled in groups:
table.add_row(
str(item_id),
group_name,
util.check_mark(is_enabled),
)
console.out.print(table)
def _get_group(group_name: str, resp: DataclassProtocol) -> dict | None:
"""Get a group from the scene item list response."""
group = next(
(
item
for item in resp.scene_items
if item.get('sourceName') == group_name and item.get('isGroup')
),
None,
)
return group
@app.command('show | sh')
def show(
ctx: typer.Context,
scene_name: Annotated[
str,
typer.Argument(
...,
show_default=False,
help='Scene name the group is in',
callback=validate.scene_in_scenes,
),
],
group_name: Annotated[
str, typer.Argument(..., show_default=False, help='Group name to show')
],
):
"""Show a group in a scene."""
resp = ctx.obj['obsws'].get_scene_item_list(scene_name)
if (group := _get_group(group_name, resp)) is None:
console.err.print(
f'Group [yellow]{group_name}[/yellow] not found in scene [yellow]{scene_name}[/yellow].'
)
raise typer.Exit(1)
ctx.obj['obsws'].set_scene_item_enabled(
scene_name=scene_name,
item_id=int(group.get('sceneItemId')),
enabled=True,
)
console.out.print(f'Group {console.highlight(ctx, group_name)} is now visible.')
@app.command('hide | h')
def hide(
ctx: typer.Context,
scene_name: Annotated[
str,
typer.Argument(
...,
show_default=False,
help='Scene name the group is in',
callback=validate.scene_in_scenes,
),
],
group_name: Annotated[
str, typer.Argument(..., show_default=False, help='Group name to hide')
],
):
"""Hide a group in a scene."""
resp = ctx.obj['obsws'].get_scene_item_list(scene_name)
if (group := _get_group(group_name, resp)) is None:
console.err.print(
f'Group [yellow]{group_name}[/yellow] not found in scene [yellow]{scene_name}[/yellow].'
)
raise typer.Exit(1)
ctx.obj['obsws'].set_scene_item_enabled(
scene_name=scene_name,
item_id=int(group.get('sceneItemId')),
enabled=False,
)
console.out.print(f'Group {console.highlight(ctx, group_name)} is now hidden.')
@app.command('toggle | tg')
def toggle(
ctx: typer.Context,
scene_name: Annotated[
str,
typer.Argument(
...,
show_default=False,
help='Scene name the group is in',
callback=validate.scene_in_scenes,
),
],
group_name: Annotated[
str, typer.Argument(..., show_default=False, help='Group name to toggle')
],
):
"""Toggle a group in a scene."""
resp = ctx.obj['obsws'].get_scene_item_list(scene_name)
if (group := _get_group(group_name, resp)) is None:
console.err.print(
f'Group [yellow]{group_name}[/yellow] not found in scene [yellow]{scene_name}[/yellow].'
)
raise typer.Exit(1)
new_state = not group.get('sceneItemEnabled')
ctx.obj['obsws'].set_scene_item_enabled(
scene_name=scene_name,
item_id=int(group.get('sceneItemId')),
enabled=new_state,
)
if new_state:
console.out.print(f'Group {console.highlight(ctx, group_name)} is now visible.')
else:
console.out.print(f'Group {console.highlight(ctx, group_name)} is now hidden.')
@app.command('status | ss')
def status(
ctx: typer.Context,
scene_name: Annotated[
str,
typer.Argument(
...,
show_default=False,
help='Scene name the group is in',
callback=validate.scene_in_scenes,
),
],
group_name: Annotated[
str, typer.Argument(..., show_default=False, help='Group name to check status')
],
):
"""Get the status of a group in a scene."""
resp = ctx.obj['obsws'].get_scene_item_list(scene_name)
if (group := _get_group(group_name, resp)) is None:
console.err.print(
f'Group [yellow]{group_name}[/yellow] not found in scene [yellow]{scene_name}[/yellow].'
)
raise typer.Exit(1)
enabled = ctx.obj['obsws'].get_scene_item_enabled(
scene_name=scene_name,
item_id=int(group.get('sceneItemId')),
)
if enabled.scene_item_enabled:
console.out.print(f'Group {console.highlight(ctx, group_name)} is now visible.')
else:
console.out.print(f'Group {console.highlight(ctx, group_name)} is now hidden.')

View File

@@ -0,0 +1,84 @@
"""module containing commands for hotkey management."""
from typing import Annotated
import typer
from rich.table import Table
from rich.text import Text
from obsws_cli import console
from obsws_cli.alias import SubTyperAliasGroup
app = typer.Typer(cls=SubTyperAliasGroup)
@app.callback()
def main():
"""Control hotkeys in OBS."""
@app.command('list | ls')
def list_(
ctx: typer.Context,
):
"""List all hotkeys."""
resp = ctx.obj['obsws'].get_hotkey_list()
if not resp.hotkeys:
console.out.print('No hotkeys found.')
raise typer.Exit()
table = Table(
title='Hotkeys',
padding=(0, 2),
border_style=ctx.obj['style'].border,
)
table.add_column(
Text('Hotkey Name', justify='center'),
justify='left',
style=ctx.obj['style'].column,
)
for i, hotkey in enumerate(resp.hotkeys):
table.add_row(hotkey, style='' if i % 2 == 0 else 'dim')
console.out.print(table)
@app.command('trigger | tr')
def trigger(
ctx: typer.Context,
hotkey: Annotated[
str, typer.Argument(..., show_default=False, help='The hotkey to trigger')
],
):
"""Trigger a hotkey by name."""
ctx.obj['obsws'].trigger_hotkey_by_name(hotkey)
@app.command('trigger-sequence | trs')
def trigger_sequence(
ctx: typer.Context,
key_id: Annotated[
str,
typer.Argument(
...,
show_default=False,
help='The OBS key ID to trigger, see https://github.com/onyx-and-iris/obsws-cli?tab=readme-ov-file#hotkey for more info',
),
],
shift: Annotated[
bool, typer.Option(..., help='Press shift when triggering the hotkey')
] = False,
ctrl: Annotated[
bool, typer.Option(..., help='Press control when triggering the hotkey')
] = False,
alt: Annotated[
bool, typer.Option(..., help='Press alt when triggering the hotkey')
] = False,
cmd: Annotated[
bool, typer.Option(..., help='Press cmd when triggering the hotkey')
] = False,
):
"""Trigger a hotkey by sequence."""
ctx.obj['obsws'].trigger_hotkey_by_key_sequence(key_id, shift, ctrl, alt, cmd)

456
obsws_cli/commands/input.py Normal file
View File

@@ -0,0 +1,456 @@
"""module containing commands for manipulating inputs."""
from typing import Annotated
import obsws_python as obsws
import typer
from rich.table import Table
from rich.text import Text
from obsws_cli import console, util, validate
from obsws_cli.alias import SubTyperAliasGroup
app = typer.Typer(cls=SubTyperAliasGroup)
@app.callback()
def main():
"""Control inputs in OBS."""
@app.command('create | add')
def create(
ctx: typer.Context,
input_name: Annotated[
str,
typer.Argument(
...,
show_default=False,
help='Name of the input to create.',
callback=validate.input_not_in_inputs,
),
],
input_kind: Annotated[
str,
typer.Argument(
...,
show_default=False,
help='Kind of the input to create.',
callback=validate.kind_in_input_kinds,
),
],
):
"""Create a new input."""
current_scene = (
ctx.obj['obsws'].get_current_program_scene().current_program_scene_name
)
try:
ctx.obj['obsws'].create_input(
inputName=input_name,
inputKind=input_kind,
sceneItemEnabled=True,
sceneName=current_scene,
inputSettings={},
)
except obsws.error.OBSSDKRequestError as e:
console.err.print(f'Failed to create input: [yellow]{e}[/yellow]')
raise typer.Exit(1)
console.out.print(
f'Input {console.highlight(ctx, input_name)} of kind '
f'{console.highlight(ctx, input_kind)} created.',
)
@app.command('remove | rm')
def remove(
ctx: typer.Context,
input_name: Annotated[
str,
typer.Argument(
...,
show_default=False,
help='Name of the input to remove.',
callback=validate.input_in_inputs,
),
],
):
"""Remove an input."""
ctx.obj['obsws'].remove_input(name=input_name)
console.out.print(f'Input {console.highlight(ctx, input_name)} removed.')
@app.command('list | ls')
def list_(
ctx: typer.Context,
input: Annotated[bool, typer.Option(help='Filter by input type.')] = False,
output: Annotated[bool, typer.Option(help='Filter by output type.')] = False,
colour: Annotated[bool, typer.Option(help='Filter by colour source type.')] = False,
ffmpeg: Annotated[bool, typer.Option(help='Filter by ffmpeg source type.')] = False,
vlc: Annotated[bool, typer.Option(help='Filter by VLC source type.')] = False,
uuid: Annotated[bool, typer.Option(help='Show UUIDs of inputs.')] = False,
):
"""List all inputs."""
resp = ctx.obj['obsws'].get_input_list()
kinds = []
if input:
kinds.append('input')
if output:
kinds.append('output')
if colour:
kinds.append('color')
if ffmpeg:
kinds.append('ffmpeg')
if vlc:
kinds.append('vlc')
if not any([input, output, colour, ffmpeg, vlc]):
kinds = ctx.obj['obsws'].get_input_kind_list(False).input_kinds
inputs = sorted(
(
(input_.get('inputName'), input_.get('inputKind'), input_.get('inputUuid'))
for input_ in filter(
lambda input_: any(kind in input_.get('inputKind') for kind in kinds),
resp.inputs,
)
),
key=lambda x: x[0], # Sort by input name
)
if not inputs:
console.out.print('No inputs found.')
raise typer.Exit()
table = Table(title='Inputs', padding=(0, 2), border_style=ctx.obj['style'].border)
if uuid:
columns = [
(Text('Input Name', justify='center'), 'left', ctx.obj['style'].column),
(Text('Kind', justify='center'), 'center', ctx.obj['style'].column),
(Text('Muted', justify='center'), 'center', None),
(Text('UUID', justify='center'), 'left', ctx.obj['style'].column),
]
else:
columns = [
(Text('Input Name', justify='center'), 'left', ctx.obj['style'].column),
(Text('Kind', justify='center'), 'center', ctx.obj['style'].column),
(Text('Muted', justify='center'), 'center', None),
]
for heading, justify, style in columns:
table.add_column(heading, justify=justify, style=style)
for input_name, input_kind, input_uuid in inputs:
input_mark = ''
try:
input_muted = ctx.obj['obsws'].get_input_mute(name=input_name).input_muted
input_mark = util.check_mark(input_muted)
except obsws.error.OBSSDKRequestError as e:
if e.code == 604: # Input does not support audio
input_mark = 'N/A'
else:
raise
if uuid:
table.add_row(
input_name,
util.snakecase_to_titlecase(input_kind),
input_mark,
input_uuid,
)
else:
table.add_row(
input_name,
util.snakecase_to_titlecase(input_kind),
input_mark,
)
console.out.print(table)
@app.command('list-kinds | ls-k')
def list_kinds(
ctx: typer.Context,
):
"""List all input kinds."""
resp = ctx.obj['obsws'].get_input_kind_list(False)
kinds = sorted(resp.input_kinds)
if not kinds:
console.out.print('No input kinds found.')
raise typer.Exit()
table = Table(
title='Input Kinds', padding=(0, 2), border_style=ctx.obj['style'].border
)
table.add_column(
Text('Input Kind', justify='center'),
justify='left',
style=ctx.obj['style'].column,
)
for kind in kinds:
table.add_row(util.snakecase_to_titlecase(kind))
console.out.print(table)
@app.command('mute | m')
def mute(
ctx: typer.Context,
input_name: Annotated[
str,
typer.Argument(
...,
show_default=False,
help='Name of the input to mute.',
callback=validate.input_in_inputs,
),
],
):
"""Mute an input."""
ctx.obj['obsws'].set_input_mute(
name=input_name,
muted=True,
)
console.out.print(f'Input {console.highlight(ctx, input_name)} muted.')
@app.command('unmute | um')
def unmute(
ctx: typer.Context,
input_name: Annotated[
str,
typer.Argument(
...,
show_default=False,
help='Name of the input to unmute.',
callback=validate.input_in_inputs,
),
],
):
"""Unmute an input."""
ctx.obj['obsws'].set_input_mute(
name=input_name,
muted=False,
)
console.out.print(f'Input {console.highlight(ctx, input_name)} unmuted.')
@app.command('toggle | tg')
def toggle(
ctx: typer.Context,
input_name: Annotated[
str,
typer.Argument(
...,
show_default=False,
help='Name of the input to toggle.',
callback=validate.input_in_inputs,
),
],
):
"""Toggle an input."""
resp = ctx.obj['obsws'].get_input_mute(name=input_name)
new_state = not resp.input_muted
ctx.obj['obsws'].set_input_mute(
name=input_name,
muted=new_state,
)
if new_state:
console.out.print(
f'Input {console.highlight(ctx, input_name)} muted.',
)
else:
console.out.print(
f'Input {console.highlight(ctx, input_name)} unmuted.',
)
@app.command('volume | vol')
def volume(
ctx: typer.Context,
input_name: Annotated[
str,
typer.Argument(
...,
show_default=False,
help='Name of the input to set volume for.',
callback=validate.input_in_inputs,
),
],
volume: Annotated[
float,
typer.Argument(
...,
show_default=False,
help='Volume level to set (-90 to 0).',
min=-90,
max=0,
),
],
):
"""Set the volume of an input."""
ctx.obj['obsws'].set_input_volume(
name=input_name,
vol_db=volume,
)
console.out.print(
f'Input {console.highlight(ctx, input_name)} volume set to {console.highlight(ctx, volume)}.',
)
@app.command('show | s')
def show(
ctx: typer.Context,
input_name: Annotated[
str,
typer.Argument(
...,
show_default=False,
help='Name of the input to show.',
callback=validate.input_in_inputs,
),
],
verbose: Annotated[
bool, typer.Option(help='List all available input devices.')
] = False,
):
"""Show information for an input in the current scene."""
input_list = ctx.obj['obsws'].get_input_list()
for input_ in input_list.inputs:
if input_['inputName'] == input_name:
input_kind = input_['inputKind']
break
for prop in ['device', 'device_id']:
try:
device_id = (
ctx.obj['obsws']
.get_input_settings(
name=input_name,
)
.input_settings.get(prop)
)
if device_id:
break
except obsws.error.OBSSDKRequestError:
continue
else:
device_id = '(N/A)'
for device in (
ctx.obj['obsws']
.get_input_properties_list_property_items(
input_name=input_name,
prop_name=prop,
)
.property_items
):
if device.get('itemValue') == device_id:
device_id = device.get('itemName')
break
table = Table(
title='Input Information', padding=(0, 2), border_style=ctx.obj['style'].border
)
columns = [
(Text('Input Name', justify='center'), 'left', ctx.obj['style'].column),
(Text('Kind', justify='center'), 'left', ctx.obj['style'].column),
(Text('Device', justify='center'), 'left', ctx.obj['style'].column),
]
for heading, justify, style in columns:
table.add_column(heading, justify=justify, style=style)
table.add_row(
input_name,
util.snakecase_to_titlecase(input_kind),
device_id,
)
console.out.print(table)
if verbose:
resp = ctx.obj['obsws'].get_input_properties_list_property_items(
input_name=input_name,
prop_name=prop,
)
table = Table(
title='Devices',
padding=(0, 2),
border_style=ctx.obj['style'].border,
)
columns = [
(Text('Name', justify='center'), 'left', ctx.obj['style'].column),
]
for heading, justify, style in columns:
table.add_column(heading, justify=justify, style=style)
for i, item in enumerate(resp.property_items):
table.add_row(
item.get('itemName'),
style='' if i % 2 == 0 else 'dim',
)
console.out.print(table)
@app.command('update | upd')
def update(
ctx: typer.Context,
input_name: Annotated[
str,
typer.Argument(
...,
show_default=False,
help='Name of the input to update.',
callback=validate.input_in_inputs,
),
],
device_name: Annotated[
str,
typer.Argument(
...,
show_default=False,
help='Name of the device to set for the input.',
),
],
):
"""Update a setting for an input."""
device_id = None
for prop in ['device', 'device_id']:
try:
for device in (
ctx.obj['obsws']
.get_input_properties_list_property_items(
input_name=input_name,
prop_name=prop,
)
.property_items
):
if device.get('itemName') == device_name:
device_id = device.get('itemValue')
break
except obsws.error.OBSSDKRequestError:
continue
if device_id:
break
if not device_id:
console.err.print(
f'Failed to find device ID for device '
f'{console.highlight(ctx, device_name)}.',
)
raise typer.Exit(1)
ctx.obj['obsws'].set_input_settings(
name=input_name, settings={prop: device_id}, overlay=True
)
console.out.print(
f'Input {console.highlight(ctx, input_name)} updated to use device '
f'{console.highlight(ctx, device_name)}.',
)

101
obsws_cli/commands/media.py Normal file
View File

@@ -0,0 +1,101 @@
"""module containing commands for media inputs."""
from typing import Annotated, Optional
import typer
from obsws_cli import console, util, validate
from obsws_cli.alias import SubTyperAliasGroup
app = typer.Typer(cls=SubTyperAliasGroup)
@app.callback()
def main():
"""Commands for media inputs."""
@app.command('cursor | c')
def cursor(
ctx: typer.Context,
input_name: Annotated[
str, typer.Argument(..., help='The name of the media input.')
],
timecode: Annotated[
Optional[str],
typer.Argument(
...,
help='The timecode to set the cursor to (format: HH:MM:SS).',
callback=validate.timecode_format,
),
] = None,
):
"""Get/set the cursor position of a media input."""
if timecode is None:
resp = ctx.obj['obsws'].get_media_input_status(input_name)
console.out.print(
f'Cursor for {console.highlight(ctx, input_name)} is at {util.milliseconds_to_timecode(resp.media_cursor)}.'
)
return
cursor_position = util.timecode_to_milliseconds(timecode)
ctx.obj['obsws'].set_media_input_cursor(input_name, cursor_position)
console.out.print(
f'Cursor for {console.highlight(ctx, input_name)} set to {timecode}.'
)
@app.command('play | p')
def play(
ctx: typer.Context,
input_name: Annotated[
str, typer.Argument(..., help='The name of the media input.')
],
):
"""Get/set the playing status of a media input."""
ctx.obj['obsws'].trigger_media_input_action(
input_name, 'OBS_WEBSOCKET_MEDIA_INPUT_ACTION_PLAY'
)
console.out.print(f'Playing media input {console.highlight(ctx, input_name)}.')
@app.command('pause | pa')
def pause(
ctx: typer.Context,
input_name: Annotated[
str, typer.Argument(..., help='The name of the media input.')
],
):
"""Pause a media input."""
ctx.obj['obsws'].trigger_media_input_action(
input_name, 'OBS_WEBSOCKET_MEDIA_INPUT_ACTION_PAUSE'
)
console.out.print(f'Paused media input {console.highlight(ctx, input_name)}.')
@app.command('stop | s')
def stop(
ctx: typer.Context,
input_name: Annotated[
str, typer.Argument(..., help='The name of the media input.')
],
):
"""Stop a media input."""
ctx.obj['obsws'].trigger_media_input_action(
input_name, 'OBS_WEBSOCKET_MEDIA_INPUT_ACTION_STOP'
)
console.out.print(f'Stopped media input {console.highlight(ctx, input_name)}.')
@app.command('restart | r')
def restart(
ctx: typer.Context,
input_name: Annotated[
str, typer.Argument(..., help='The name of the media input.')
],
):
"""Restart a media input."""
ctx.obj['obsws'].trigger_media_input_action(
input_name, 'OBS_WEBSOCKET_MEDIA_INPUT_ACTION_RESTART'
)
console.out.print(f'Restarted media input {console.highlight(ctx, input_name)}.')

View File

@@ -0,0 +1,117 @@
"""module containing commands for manipulating profiles in OBS."""
from typing import Annotated
import typer
from rich.table import Table
from rich.text import Text
from obsws_cli import console, util, validate
from obsws_cli.alias import SubTyperAliasGroup
app = typer.Typer(cls=SubTyperAliasGroup)
@app.callback()
def main():
"""Control profiles in OBS."""
@app.command('list | ls')
def list_(ctx: typer.Context):
"""List profiles."""
resp = ctx.obj['obsws'].get_profile_list()
if not resp.profiles:
console.out.print('No profiles found.')
raise typer.Exit()
table = Table(
title='Profiles', padding=(0, 2), border_style=ctx.obj['style'].border
)
columns = [
(Text('Profile Name', justify='center'), 'left', ctx.obj['style'].column),
(Text('Current', justify='center'), 'center', None),
]
for heading, justify, style in columns:
table.add_column(heading, justify=justify, style=style)
for profile in resp.profiles:
table.add_row(
profile,
util.check_mark(
ctx, profile == resp.current_profile_name, empty_if_false=True
),
)
console.out.print(table)
@app.command('current | get')
def current(ctx: typer.Context):
"""Get the current profile."""
resp = ctx.obj['obsws'].get_profile_list()
console.out.print(
f'Current profile: {console.highlight(ctx, resp.current_profile_name)}'
)
@app.command('switch | set')
def switch(
ctx: typer.Context,
profile_name: Annotated[
str,
typer.Argument(
...,
show_default=False,
help='Name of the profile to switch to',
callback=validate.profile_exists,
),
],
):
"""Switch to a profile."""
resp = ctx.obj['obsws'].get_profile_list()
if resp.current_profile_name == profile_name:
console.err.print(
f'Profile [yellow]{profile_name}[/yellow] is already the current profile.'
)
raise typer.Exit(1)
ctx.obj['obsws'].set_current_profile(profile_name)
console.out.print(f'Switched to profile {console.highlight(ctx, profile_name)}.')
@app.command('create | new')
def create(
ctx: typer.Context,
profile_name: Annotated[
str,
typer.Argument(
...,
show_default=False,
help='Name of the profile to create.',
callback=validate.profile_not_exists,
),
],
):
"""Create a new profile."""
ctx.obj['obsws'].create_profile(profile_name)
console.out.print(f'Created profile {console.highlight(ctx, profile_name)}.')
@app.command('remove | rm')
def remove(
ctx: typer.Context,
profile_name: Annotated[
str,
typer.Argument(
...,
show_default=False,
help='Name of the profile to remove.',
callback=validate.profile_exists,
),
],
):
"""Remove a profile."""
ctx.obj['obsws'].remove_profile(profile_name)
console.out.print(f'Removed profile {console.highlight(ctx, profile_name)}.')

View File

@@ -0,0 +1,88 @@
"""module containing commands for manipulating projectors in OBS."""
from typing import Annotated
import typer
from rich.table import Table
from rich.text import Text
from obsws_cli import console
from obsws_cli.alias import SubTyperAliasGroup
app = typer.Typer(cls=SubTyperAliasGroup)
@app.callback()
def main():
"""Control projectors in OBS."""
@app.command('list-monitors | ls-m')
def list_monitors(ctx: typer.Context):
"""List available monitors."""
resp = ctx.obj['obsws'].get_monitor_list()
monitors = sorted(
((m['monitorIndex'], m['monitorName']) for m in resp.monitors),
key=lambda m: m[0],
)
if not monitors:
console.out.print('No monitors found.')
raise typer.Exit()
table = Table(
title='Available Monitors',
padding=(0, 2),
border_style=ctx.obj['style'].border,
)
table.add_column(
Text('Index', justify='center'), justify='center', style=ctx.obj['style'].column
)
table.add_column(
Text('Name', justify='center'), justify='left', style=ctx.obj['style'].column
)
for index, monitor in monitors:
table.add_row(str(index), monitor)
console.out.print(table)
@app.command('open | o')
def open(
ctx: typer.Context,
monitor_index: Annotated[
int,
typer.Option(help='Index of the monitor to open the projector on.'),
] = 0,
source_name: Annotated[
str,
typer.Argument(
show_default='The current scene',
help='Name of the source to project.',
),
] = '',
):
"""Open a fullscreen projector for a source on a specific monitor."""
if not source_name:
source_name = ctx.obj['obsws'].get_current_program_scene().scene_name
monitors = ctx.obj['obsws'].get_monitor_list().monitors
for monitor in monitors:
if monitor['monitorIndex'] == monitor_index:
ctx.obj['obsws'].open_source_projector(
source_name=source_name,
monitor_index=monitor_index,
)
console.out.print(
f'Opened projector for source {console.highlight(ctx, source_name)} on monitor {console.highlight(ctx, monitor["monitorName"])}.'
)
break
else:
console.err.print(
f'Monitor with index [yellow]{monitor_index}[/yellow] not found. '
f'Use [yellow]obsws-cli projector ls-m[/yellow] to see available monitors.'
)
raise typer.Exit(code=1)

View File

@@ -0,0 +1,172 @@
"""module for controlling OBS recording functionality."""
from pathlib import Path
from typing import Annotated, Optional
import typer
from obsws_cli import console
from obsws_cli.alias import SubTyperAliasGroup
app = typer.Typer(cls=SubTyperAliasGroup)
@app.callback()
def main():
"""Control OBS recording functionality."""
def _get_recording_status(ctx: typer.Context) -> tuple:
"""Get recording status."""
resp = ctx.obj['obsws'].get_record_status()
return resp.output_active, resp.output_paused
@app.command('start | s')
def start(ctx: typer.Context):
"""Start recording."""
active, paused = _get_recording_status(ctx)
if active:
err_msg = 'Recording is already in progress, cannot start.'
if paused:
err_msg += ' Try resuming it.'
console.err.print(err_msg)
raise typer.Exit(1)
ctx.obj['obsws'].start_record()
console.out.print('Recording started successfully.')
@app.command('stop | st')
def stop(ctx: typer.Context):
"""Stop recording."""
active, _ = _get_recording_status(ctx)
if not active:
console.err.print('Recording is not in progress, cannot stop.')
raise typer.Exit(1)
resp = ctx.obj['obsws'].stop_record()
console.out.print(
f'Recording stopped successfully. Saved to: {console.highlight(ctx, resp.output_path)}'
)
@app.command('toggle | tg')
def toggle(ctx: typer.Context):
"""Toggle recording."""
resp = ctx.obj['obsws'].toggle_record()
if resp.output_active:
console.out.print('Recording started successfully.')
else:
console.out.print('Recording stopped successfully.')
@app.command('status | ss')
def status(ctx: typer.Context):
"""Get recording status."""
active, paused = _get_recording_status(ctx)
if active:
if paused:
console.out.print('Recording is in progress and paused.')
else:
console.out.print('Recording is in progress.')
else:
console.out.print('Recording is not in progress.')
@app.command('resume | r')
def resume(ctx: typer.Context):
"""Resume recording."""
active, paused = _get_recording_status(ctx)
if not active:
console.err.print('Recording is not in progress, cannot resume.')
raise typer.Exit(1)
if not paused:
console.err.print('Recording is in progress but not paused, cannot resume.')
raise typer.Exit(1)
ctx.obj['obsws'].resume_record()
console.out.print('Recording resumed successfully.')
@app.command('pause | p')
def pause(ctx: typer.Context):
"""Pause recording."""
active, paused = _get_recording_status(ctx)
if not active:
console.err.print('Recording is not in progress, cannot pause.')
raise typer.Exit(1)
if paused:
console.err.print('Recording is in progress but already paused, cannot pause.')
raise typer.Exit(1)
ctx.obj['obsws'].pause_record()
console.out.print('Recording paused successfully.')
@app.command('directory | d')
def directory(
ctx: typer.Context,
record_directory: Annotated[
Optional[Path],
# Since the CLI and OBS may be running on different platforms,
# we won't validate the path here.
typer.Argument(
file_okay=False,
dir_okay=True,
help='Directory to set for recording.',
),
] = None,
):
"""Get or set the recording directory."""
if record_directory is not None:
ctx.obj['obsws'].set_record_directory(str(record_directory))
console.out.print(
f'Recording directory updated to: {console.highlight(ctx, record_directory)}'
)
else:
resp = ctx.obj['obsws'].get_record_directory()
console.out.print(
f'Recording directory: {console.highlight(ctx, resp.record_directory)}'
)
@app.command('split | sp')
def split(ctx: typer.Context):
"""Split the current recording."""
active, paused = _get_recording_status(ctx)
if not active:
console.err.print('Recording is not in progress, cannot split.')
raise typer.Exit(1)
if paused:
console.err.print('Recording is paused, cannot split.')
raise typer.Exit(1)
ctx.obj['obsws'].split_record_file()
console.out.print('Recording split successfully.')
@app.command('chapter | ch')
def chapter(
ctx: typer.Context,
chapter_name: Annotated[
Optional[str],
typer.Argument(
help='Name of the chapter to create.',
),
] = None,
):
"""Create a chapter in the current recording."""
active, paused = _get_recording_status(ctx)
if not active:
console.err.print('Recording is not in progress, cannot create chapter.')
raise typer.Exit(1)
if paused:
console.err.print('Recording is paused, cannot create chapter.')
raise typer.Exit(1)
ctx.obj['obsws'].create_record_chapter(chapter_name)
console.out.print(
f'Chapter {console.highlight(ctx, chapter_name or "unnamed")} created successfully.'
)

View File

@@ -0,0 +1,64 @@
"""module containing commands for manipulating the replay buffer in OBS."""
import typer
from obsws_cli import console
from obsws_cli.alias import SubTyperAliasGroup
app = typer.Typer(cls=SubTyperAliasGroup)
@app.callback()
def main():
"""Control profiles in OBS."""
@app.command('start | s')
def start(ctx: typer.Context):
"""Start the replay buffer."""
resp = ctx.obj['obsws'].get_replay_buffer_status()
if resp.output_active:
console.err.print('Replay buffer is already active.')
raise typer.Exit(1)
ctx.obj['obsws'].start_replay_buffer()
console.out.print('Replay buffer started.')
@app.command('stop | st')
def stop(ctx: typer.Context):
"""Stop the replay buffer."""
resp = ctx.obj['obsws'].get_replay_buffer_status()
if not resp.output_active:
console.err.print('Replay buffer is not active.')
raise typer.Exit(1)
ctx.obj['obsws'].stop_replay_buffer()
console.out.print('Replay buffer stopped.')
@app.command('toggle | tg')
def toggle(ctx: typer.Context):
"""Toggle the replay buffer."""
resp = ctx.obj['obsws'].toggle_replay_buffer()
if resp.output_active:
console.out.print('Replay buffer is active.')
else:
console.out.print('Replay buffer is not active.')
@app.command('status | ss')
def status(ctx: typer.Context):
"""Get the status of the replay buffer."""
resp = ctx.obj['obsws'].get_replay_buffer_status()
if resp.output_active:
console.out.print('Replay buffer is active.')
else:
console.out.print('Replay buffer is not active.')
@app.command('save | sv')
def save(ctx: typer.Context):
"""Save the replay buffer."""
ctx.obj['obsws'].save_replay_buffer()
console.out.print('Replay buffer saved.')

122
obsws_cli/commands/scene.py Normal file
View File

@@ -0,0 +1,122 @@
"""module containing commands for controlling OBS scenes."""
from typing import Annotated
import typer
from rich.table import Table
from rich.text import Text
from obsws_cli import console, util, validate
from obsws_cli.alias import SubTyperAliasGroup
app = typer.Typer(cls=SubTyperAliasGroup)
@app.callback()
def main():
"""Control OBS scenes."""
@app.command('list | ls')
def list_(
ctx: typer.Context,
uuid: Annotated[bool, typer.Option(help='Show UUIDs of scenes')] = False,
):
"""List all scenes."""
resp = ctx.obj['obsws'].get_scene_list()
scenes = (
(scene.get('sceneName'), scene.get('sceneUuid'))
for scene in reversed(resp.scenes)
)
if not scenes:
console.out.print('No scenes found.')
raise typer.Exit()
active_scene = ctx.obj['obsws'].get_current_program_scene().scene_name
table = Table(title='Scenes', padding=(0, 2), border_style=ctx.obj['style'].border)
if uuid:
columns = [
(Text('Scene Name', justify='center'), 'left', ctx.obj['style'].column),
(Text('Active', justify='center'), 'center', None),
(Text('UUID', justify='center'), 'left', ctx.obj['style'].column),
]
else:
columns = [
(Text('Scene Name', justify='center'), 'left', ctx.obj['style'].column),
(Text('Active', justify='center'), 'center', None),
]
for heading, justify, style in columns:
table.add_column(heading, justify=justify, style=style)
for scene_name, scene_uuid in scenes:
if uuid:
table.add_row(
scene_name,
util.check_mark(scene_name == active_scene, empty_if_false=True),
scene_uuid,
)
else:
table.add_row(
scene_name,
util.check_mark(scene_name == active_scene, empty_if_false=True),
)
console.out.print(table)
@app.command('current | get')
def current(
ctx: typer.Context,
preview: Annotated[
bool,
typer.Option(
help='Get the preview scene instead of the program scene',
callback=validate.studio_mode_enabled,
),
] = False,
):
"""Get the current program scene or preview scene."""
if preview:
resp = ctx.obj['obsws'].get_current_preview_scene()
console.out.print(
f'Current Preview Scene: {console.highlight(ctx, resp.current_preview_scene_name)}'
)
else:
resp = ctx.obj['obsws'].get_current_program_scene()
console.out.print(
f'Current Program Scene: {console.highlight(ctx, resp.current_program_scene_name)}'
)
@app.command('switch | set')
def switch(
ctx: typer.Context,
scene_name: Annotated[
str,
typer.Argument(
...,
help='Name of the scene to switch to',
callback=validate.scene_in_scenes,
),
],
preview: Annotated[
bool,
typer.Option(
help='Switch to the preview scene instead of the program scene',
callback=validate.studio_mode_enabled,
),
] = False,
):
"""Switch to a scene."""
if preview:
ctx.obj['obsws'].set_current_preview_scene(scene_name)
console.out.print(
f'Switched to preview scene: {console.highlight(ctx, scene_name)}'
)
else:
ctx.obj['obsws'].set_current_program_scene(scene_name)
console.out.print(
f'Switched to program scene: {console.highlight(ctx, scene_name)}'
)

View File

@@ -0,0 +1,96 @@
"""module containing commands for manipulating scene collections."""
from typing import Annotated
import typer
from rich.table import Table
from obsws_cli import console, validate
from obsws_cli.alias import SubTyperAliasGroup
app = typer.Typer(cls=SubTyperAliasGroup)
@app.callback()
def main():
"""Control scene collections in OBS."""
@app.command('list | ls')
def list_(ctx: typer.Context):
"""List all scene collections."""
resp = ctx.obj['obsws'].get_scene_collection_list()
if not resp.scene_collections:
console.out.print('No scene collections found.')
raise typer.Exit()
table = Table(
title='Scene Collections',
padding=(0, 2),
border_style=ctx.obj['style'].border,
)
table.add_column(
'Scene Collection Name', justify='left', style=ctx.obj['style'].column
)
for scene_collection_name in resp.scene_collections:
table.add_row(scene_collection_name)
console.out.print(table)
@app.command('current | get')
def current(ctx: typer.Context):
"""Get the current scene collection."""
resp = ctx.obj['obsws'].get_scene_collection_list()
console.out.print(
f'Current scene collection: {console.highlight(ctx, resp.current_scene_collection_name)}'
)
@app.command('switch | set')
def switch(
ctx: typer.Context,
scene_collection_name: Annotated[
str,
typer.Argument(
...,
help='Name of the scene collection to switch to',
callback=validate.scene_collection_in_scene_collections,
),
],
):
"""Switch to a scene collection."""
current_scene_collection = (
ctx.obj['obsws'].get_scene_collection_list().current_scene_collection_name
)
if scene_collection_name == current_scene_collection:
console.err.print(
f'Scene collection [yellow]{scene_collection_name}[/yellow] is already active.'
)
raise typer.Exit(1)
ctx.obj['obsws'].set_current_scene_collection(scene_collection_name)
console.out.print(
f'Switched to scene collection {console.highlight(ctx, scene_collection_name)}.'
)
@app.command('create | new')
def create(
ctx: typer.Context,
scene_collection_name: Annotated[
str,
typer.Argument(
...,
help='Name of the scene collection to create',
callback=validate.scene_collection_not_in_scene_collections,
),
],
):
"""Create a new scene collection."""
ctx.obj['obsws'].create_scene_collection(scene_collection_name)
console.out.print(
f'Created scene collection {console.highlight(ctx, scene_collection_name)}.'
)

View File

@@ -0,0 +1,499 @@
"""module containing commands for manipulating items in scenes."""
from typing import Annotated, Optional
import typer
from rich.table import Table
from obsws_cli import console, util, validate
from obsws_cli.alias import SubTyperAliasGroup
app = typer.Typer(cls=SubTyperAliasGroup)
@app.callback()
def main():
"""Control items in OBS scenes."""
@app.command('list | ls')
def list_(
ctx: typer.Context,
scene_name: Annotated[
Optional[str],
typer.Argument(
show_default='The current scene',
help='Scene name to list items for',
callback=validate.scene_in_scenes,
),
] = None,
uuid: Annotated[bool, typer.Option(help='Show UUIDs of scene items')] = False,
):
"""List all items in a scene."""
if scene_name is None:
scene_name = ctx.obj['obsws'].get_current_program_scene().scene_name
resp = ctx.obj['obsws'].get_scene_item_list(scene_name)
items = sorted(
(
(
item.get('sceneItemId'),
item.get('sourceName'),
item.get('isGroup'),
item.get('sceneItemEnabled'),
item.get('sourceUuid', 'N/A'), # Include source UUID
)
for item in resp.scene_items
),
key=lambda x: x[0], # Sort by sceneItemId
)
if not items:
console.out.print(
f'No items found in scene {console.highlight(ctx, scene_name)}.'
)
raise typer.Exit()
table = Table(
title=f'Items in Scene: {scene_name}',
padding=(0, 2),
border_style=ctx.obj['style'].border,
)
if uuid:
columns = [
('Item ID', 'center', ctx.obj['style'].column),
('Item Name', 'left', ctx.obj['style'].column),
('In Group', 'left', ctx.obj['style'].column),
('Enabled', 'center', None),
('UUID', 'left', ctx.obj['style'].column),
]
else:
columns = [
('Item ID', 'center', ctx.obj['style'].column),
('Item Name', 'left', ctx.obj['style'].column),
('In Group', 'left', ctx.obj['style'].column),
('Enabled', 'center', None),
]
# Add columns to the table
for heading, justify, style in columns:
table.add_column(heading, justify=justify, style=style)
for item_id, item_name, is_group, is_enabled, source_uuid in items:
if is_group:
resp = ctx.obj['obsws'].get_group_scene_item_list(item_name)
group_items = sorted(
(
(
gi.get('sceneItemId'),
gi.get('sourceName'),
gi.get('sceneItemEnabled'),
gi.get('sourceUuid', 'N/A'), # Include source UUID
)
for gi in resp.scene_items
),
key=lambda x: x[0], # Sort by sceneItemId
)
for (
group_item_id,
group_item_name,
group_item_enabled,
group_item_source_uuid,
) in group_items:
if uuid:
table.add_row(
str(group_item_id),
group_item_name,
item_name,
util.check_mark(is_enabled and group_item_enabled),
group_item_source_uuid,
)
else:
table.add_row(
str(group_item_id),
group_item_name,
item_name,
util.check_mark(is_enabled and group_item_enabled),
)
else:
if uuid:
table.add_row(
str(item_id),
item_name,
'',
util.check_mark(is_enabled),
source_uuid,
)
else:
table.add_row(
str(item_id),
item_name,
'',
util.check_mark(is_enabled),
)
console.out.print(table)
def _validate_sources(
ctx: typer.Context,
scene_name: str,
item_name: str,
group: Optional[str] = None,
) -> bool:
"""Validate the scene name and item name."""
if not validate.scene_in_scenes(ctx, scene_name):
console.err.print(f'Scene [yellow]{scene_name}[/yellow] not found.')
return False
if group:
if not validate.item_in_scene_item_list(ctx, scene_name, group):
console.err.print(
f'Group [yellow]{group}[/yellow] not found in scene [yellow]{scene_name}[/yellow].'
)
return False
else:
if not validate.item_in_scene_item_list(ctx, scene_name, item_name):
console.err.print(
f'Item [yellow]{item_name}[/yellow] not found in scene [yellow]{scene_name}[/yellow]. Is the item in a group? '
f'If so use the [yellow]--group[/yellow] option to specify the parent group.\n'
'Use [yellow]obsws-cli sceneitem ls[/yellow] for a list of items in the scene.'
)
return False
return True
def _get_scene_name_and_item_id(
ctx: typer.Context, scene_name: str, item_name: str, group: Optional[str] = None
):
"""Get the scene name and item ID for the given scene and item name."""
if group:
resp = ctx.obj['obsws'].get_group_scene_item_list(group)
for item in resp.scene_items:
if item.get('sourceName') == item_name:
scene_name = group
scene_item_id = item.get('sceneItemId')
break
else:
console.err.print(
f'Item [yellow]{item_name}[/yellow] not found in group [yellow]{group}[/yellow].'
)
raise typer.Exit(1)
else:
resp = ctx.obj['obsws'].get_scene_item_id(scene_name, item_name)
scene_item_id = resp.scene_item_id
return scene_name, scene_item_id
@app.command('show | sh')
def show(
ctx: typer.Context,
scene_name: Annotated[
str, typer.Argument(..., show_default=False, help='Scene name the item is in')
],
item_name: Annotated[
str,
typer.Argument(..., show_default=False, help='Item name to show in the scene'),
],
group: Annotated[Optional[str], typer.Option(help='Parent group name')] = None,
):
"""Show an item in a scene."""
if not _validate_sources(ctx, scene_name, item_name, group):
raise typer.Exit(1)
old_scene_name = scene_name
scene_name, scene_item_id = _get_scene_name_and_item_id(
ctx, scene_name, item_name, group
)
ctx.obj['obsws'].set_scene_item_enabled(
scene_name=scene_name,
item_id=int(scene_item_id),
enabled=True,
)
if group:
console.out.print(
f'Item {console.highlight(ctx, item_name)} in group {console.highlight(ctx, group)} '
f'in scene {console.highlight(ctx, old_scene_name)} has been shown.'
)
else:
# If not in a parent group, just show the scene name
# This is to avoid confusion with the parent group name
# which is not the same as the scene name
# and is not needed in this case
console.out.print(
f'Item {console.highlight(ctx, item_name)} in scene {console.highlight(ctx, scene_name)} has been shown.'
)
@app.command('hide | h')
def hide(
ctx: typer.Context,
scene_name: Annotated[
str, typer.Argument(..., show_default=False, help='Scene name the item is in')
],
item_name: Annotated[
str,
typer.Argument(..., show_default=False, help='Item name to hide in the scene'),
],
group: Annotated[Optional[str], typer.Option(help='Parent group name')] = None,
):
"""Hide an item in a scene."""
if not _validate_sources(ctx, scene_name, item_name, group):
raise typer.Exit(1)
old_scene_name = scene_name
scene_name, scene_item_id = _get_scene_name_and_item_id(
ctx, scene_name, item_name, group
)
ctx.obj['obsws'].set_scene_item_enabled(
scene_name=scene_name,
item_id=int(scene_item_id),
enabled=False,
)
if group:
console.out.print(
f'Item {console.highlight(ctx, item_name)} in group {console.highlight(ctx, group)} in scene {console.highlight(ctx, old_scene_name)} has been hidden.'
)
else:
# If not in a parent group, just show the scene name
# This is to avoid confusion with the parent group name
# which is not the same as the scene name
# and is not needed in this case
console.out.print(
f'Item {console.highlight(ctx, item_name)} in scene {console.highlight(ctx, scene_name)} has been hidden.'
)
@app.command('toggle | tg')
def toggle(
ctx: typer.Context,
scene_name: Annotated[
str, typer.Argument(..., show_default=False, help='Scene name the item is in')
],
item_name: Annotated[
str,
typer.Argument(
..., show_default=False, help='Item name to toggle in the scene'
),
],
group: Annotated[Optional[str], typer.Option(help='Parent group name')] = None,
):
"""Toggle an item in a scene."""
if not _validate_sources(ctx, scene_name, item_name, group):
raise typer.Exit(1)
old_scene_name = scene_name
scene_name, scene_item_id = _get_scene_name_and_item_id(
ctx, scene_name, item_name, group
)
enabled = ctx.obj['obsws'].get_scene_item_enabled(
scene_name=scene_name,
item_id=int(scene_item_id),
)
new_state = not enabled.scene_item_enabled
ctx.obj['obsws'].set_scene_item_enabled(
scene_name=scene_name,
item_id=int(scene_item_id),
enabled=new_state,
)
if group:
if new_state:
console.out.print(
f'Item {console.highlight(ctx, item_name)} in group {console.highlight(ctx, group)} '
f'in scene {console.highlight(ctx, old_scene_name)} has been shown.'
)
else:
console.out.print(
f'Item {console.highlight(ctx, item_name)} in group {console.highlight(ctx, group)} '
f'in scene {console.highlight(ctx, old_scene_name)} has been hidden.'
)
else:
# If not in a parent group, just show the scene name
# This is to avoid confusion with the parent group name
# which is not the same as the scene name
# and is not needed in this case
if new_state:
console.out.print(
f'Item {console.highlight(ctx, item_name)} in scene {console.highlight(ctx, scene_name)} has been shown.'
)
else:
console.out.print(
f'Item {console.highlight(ctx, item_name)} in scene {console.highlight(ctx, scene_name)} has been hidden.'
)
@app.command('visible | v')
def visible(
ctx: typer.Context,
scene_name: Annotated[
str, typer.Argument(..., show_default=False, help='Scene name the item is in')
],
item_name: Annotated[
str,
typer.Argument(
..., show_default=False, help='Item name to check visibility in the scene'
),
],
group: Annotated[Optional[str], typer.Option(help='Parent group name')] = None,
):
"""Check if an item in a scene is visible."""
if not _validate_sources(ctx, scene_name, item_name, group):
raise typer.Exit(1)
old_scene_name = scene_name
scene_name, scene_item_id = _get_scene_name_and_item_id(
ctx, scene_name, item_name, group
)
enabled = ctx.obj['obsws'].get_scene_item_enabled(
scene_name=scene_name,
item_id=int(scene_item_id),
)
if group:
console.out.print(
f'Item {console.highlight(ctx, item_name)} in group {console.highlight(ctx, group)} '
f'in scene {console.highlight(ctx, old_scene_name)} is currently {"visible" if enabled.scene_item_enabled else "hidden"}.'
)
else:
# If not in a parent group, just show the scene name
# This is to avoid confusion with the parent group name
# which is not the same as the scene name
# and is not needed in this case
console.out.print(
f'Item {console.highlight(ctx, item_name)} in scene {console.highlight(ctx, scene_name)} '
f'is currently {"visible" if enabled.scene_item_enabled else "hidden"}.'
)
@app.command('transform | t')
def transform(
ctx: typer.Context,
scene_name: Annotated[
str, typer.Argument(..., show_default=False, help='Scene name the item is in')
],
item_name: Annotated[
str,
typer.Argument(
..., show_default=False, help='Item name to transform in the scene'
),
],
group: Annotated[Optional[str], typer.Option(help='Parent group name')] = None,
alignment: Annotated[
Optional[int], typer.Option(help='Alignment of the item in the scene')
] = None,
bounds_alignment: Annotated[
Optional[int], typer.Option(help='Bounds alignment of the item in the scene')
] = None,
bounds_height: Annotated[
Optional[float], typer.Option(help='Height of the item in the scene')
] = None,
bounds_type: Annotated[
Optional[str], typer.Option(help='Type of bounds for the item in the scene')
] = None,
bounds_width: Annotated[
Optional[float], typer.Option(help='Width of the item in the scene')
] = None,
crop_to_bounds: Annotated[
Optional[bool], typer.Option(help='Crop the item to the bounds')
] = None,
crop_bottom: Annotated[
Optional[float], typer.Option(help='Bottom crop of the item in the scene')
] = None,
crop_left: Annotated[
Optional[float], typer.Option(help='Left crop of the item in the scene')
] = None,
crop_right: Annotated[
Optional[float], typer.Option(help='Right crop of the item in the scene')
] = None,
crop_top: Annotated[
Optional[float], typer.Option(help='Top crop of the item in the scene')
] = None,
position_x: Annotated[
Optional[float], typer.Option(help='X position of the item in the scene')
] = None,
position_y: Annotated[
Optional[float], typer.Option(help='Y position of the item in the scene')
] = None,
rotation: Annotated[
Optional[float], typer.Option(help='Rotation of the item in the scene')
] = None,
scale_x: Annotated[
Optional[float], typer.Option(help='X scale of the item in the scene')
] = None,
scale_y: Annotated[
Optional[float], typer.Option(help='Y scale of the item in the scene')
] = None,
):
"""Set the transform of an item in a scene."""
if not _validate_sources(ctx, scene_name, item_name, group):
raise typer.Exit(1)
old_scene_name = scene_name
scene_name, scene_item_id = _get_scene_name_and_item_id(
ctx, scene_name, item_name, group
)
transform = {}
if alignment is not None:
transform['alignment'] = alignment
if bounds_alignment is not None:
transform['boundsAlignment'] = bounds_alignment
if bounds_height is not None:
transform['boundsHeight'] = bounds_height
if bounds_type is not None:
transform['boundsType'] = bounds_type
if bounds_width is not None:
transform['boundsWidth'] = bounds_width
if crop_to_bounds is not None:
transform['cropToBounds'] = crop_to_bounds
if crop_bottom is not None:
transform['cropBottom'] = crop_bottom
if crop_left is not None:
transform['cropLeft'] = crop_left
if crop_right is not None:
transform['cropRight'] = crop_right
if crop_top is not None:
transform['cropTop'] = crop_top
if position_x is not None:
transform['positionX'] = position_x
if position_y is not None:
transform['positionY'] = position_y
if rotation is not None:
transform['rotation'] = rotation
if scale_x is not None:
transform['scaleX'] = scale_x
if scale_y is not None:
transform['scaleY'] = scale_y
if not transform:
console.err.print('No transform options provided.')
raise typer.Exit(1)
transform = ctx.obj['obsws'].set_scene_item_transform(
scene_name=scene_name,
item_id=int(scene_item_id),
transform=transform,
)
if group:
console.out.print(
f'Item {console.highlight(ctx, item_name)} in group {console.highlight(ctx, group)} '
f'in scene {console.highlight(ctx, old_scene_name)} has been transformed.'
)
else:
# If not in a parent group, just show the scene name
# This is to avoid confusion with the parent group name
# which is not the same as the scene name
# and is not needed in this case
console.out.print(
f'Item {console.highlight(ctx, item_name)} in scene {console.highlight(ctx, scene_name)} has been transformed.'
)

View File

@@ -0,0 +1,90 @@
"""module for taking screenshots using OBS WebSocket API."""
from pathlib import Path
from typing import Annotated
import obsws_python as obsws
import typer
from obsws_cli import console
from obsws_cli.alias import SubTyperAliasGroup
app = typer.Typer(cls=SubTyperAliasGroup)
@app.callback()
def main():
"""Take screenshots using OBS."""
@app.command('save | sv')
def save(
ctx: typer.Context,
source_name: Annotated[
str,
typer.Argument(
...,
show_default=False,
help='Name of the source to take a screenshot of.',
),
],
output_path: Annotated[
Path,
# Since the CLI and OBS may be running on different platforms,
# we won't validate the path here.
typer.Argument(
...,
show_default=False,
file_okay=True,
dir_okay=False,
help='Path to save the screenshot (must include file name and extension).',
),
],
width: Annotated[
float,
typer.Option(
help='Width of the screenshot.',
),
] = 1920,
height: Annotated[
float,
typer.Option(
help='Height of the screenshot.',
),
] = 1080,
quality: Annotated[
float,
typer.Option(
min=-1,
max=100,
help='Quality of the screenshot.',
),
] = -1,
):
"""Take a screenshot and save it to a file."""
try:
ctx.obj['obsws'].save_source_screenshot(
name=source_name,
img_format=output_path.suffix.lstrip('.').lower(),
file_path=str(output_path),
width=width,
height=height,
quality=quality,
)
except obsws.error.OBSSDKRequestError as e:
match e.code:
case 403:
console.err.print(
'The [yellow]image format[/yellow] (file extension) must be included in the file name, '
"for example: '/path/to/screenshot.png'.",
)
raise typer.Exit(1)
case 600:
console.err.print(
f'No source was found by the name of [yellow]{source_name}[/yellow]'
)
raise typer.Exit(1)
case _:
raise
console.out.print(f'Screenshot saved to {console.highlight(ctx, output_path)}.')

View File

@@ -0,0 +1,337 @@
"""module for settings management."""
from typing import Annotated, Optional
import typer
from rich.table import Table
from rich.text import Text
from obsws_cli import console, util
from obsws_cli.alias import SubTyperAliasGroup
app = typer.Typer(cls=SubTyperAliasGroup)
@app.callback()
def main():
"""Manage OBS settings."""
@app.command('show | sh')
def show(
ctx: typer.Context,
video: Annotated[
bool, typer.Option('--video', '-v', help='Show video settings.')
] = False,
record: Annotated[
bool, typer.Option('--record', '-r', help='Show recording settings.')
] = False,
profile: Annotated[
bool, typer.Option('--profile', '-p', help='Show profile settings.')
] = False,
):
"""Show current OBS settings."""
if not any([video, record, profile]):
video = True
record = True
profile = True
resp = ctx.obj['obsws'].get_video_settings()
video_table = Table(
title='Video Settings',
padding=(0, 2),
border_style=ctx.obj['style'].border,
)
video_columns = (
('Setting', 'left', ctx.obj['style'].column),
('Value', 'left', ctx.obj['style'].column),
)
for header_text, justify, style in video_columns:
video_table.add_column(
Text(header_text, justify='center'),
justify=justify,
style=style,
)
for setting in resp.attrs():
video_table.add_row(
util.snakecase_to_titlecase(setting),
str(getattr(resp, setting)),
style='' if video_table.row_count % 2 == 0 else 'dim',
)
if video:
console.out.print(video_table)
resp = ctx.obj['obsws'].get_record_directory()
record_table = Table(
title='Recording Settings',
padding=(0, 2),
border_style=ctx.obj['style'].border,
)
record_columns = (
('Setting', 'left', ctx.obj['style'].column),
('Value', 'left', ctx.obj['style'].column),
)
for header_text, justify, style in record_columns:
record_table.add_column(
Text(header_text, justify='center'),
justify=justify,
style=style,
)
record_table.add_row(
'Directory',
resp.record_directory,
style='' if record_table.row_count % 2 == 0 else 'dim',
)
if record:
console.out.print(record_table)
profile_table = Table(
title='Profile Settings',
padding=(0, 2),
border_style=ctx.obj['style'].border,
)
profile_columns = (
('Setting', 'left', ctx.obj['style'].column),
('Value', 'left', ctx.obj['style'].column),
)
for header_text, justify, style in profile_columns:
profile_table.add_column(
Text(header_text, justify='center'),
justify=justify,
style=style,
)
params = [
('Output', 'Mode', 'Output Mode'),
('SimpleOutput', 'StreamEncoder', 'Simple Streaming Encoder'),
('SimpleOutput', 'RecEncoder', 'Simple Recording Encoder'),
('SimpleOutput', 'RecFormat2', 'Simple Recording Video Format'),
('SimpleOutput', 'RecAudioEncoder', 'Simple Recording Audio Format'),
('SimpleOutput', 'RecQuality', 'Simple Recording Quality'),
('AdvOut', 'Encoder', 'Advanced Streaming Encoder'),
('AdvOut', 'RecEncoder', 'Advanced Recording Encoder'),
('AdvOut', 'RecType', 'Advanced Recording Type'),
('AdvOut', 'RecFormat2', 'Advanced Recording Video Format'),
('AdvOut', 'RecAudioEncoder', 'Advanced Recording Audio Format'),
]
for category, name, display_name in params:
resp = ctx.obj['obsws'].get_profile_parameter(
category=category,
name=name,
)
if resp.parameter_value is not None:
profile_table.add_row(
display_name,
str(resp.parameter_value),
style='' if profile_table.row_count % 2 == 0 else 'dim',
)
if profile:
console.out.print(profile_table)
@app.command('profile | pr')
def profile(
ctx: typer.Context,
category: Annotated[
str,
typer.Argument(
...,
help='Profile parameter category (e.g., SimpleOutput, AdvOut).',
),
],
name: Annotated[
str,
typer.Argument(
...,
help='Profile parameter name (e.g., StreamEncoder, RecFormat2).',
),
],
value: Annotated[
Optional[str],
typer.Argument(
...,
help='Value to set for the profile parameter. If omitted, the current value is retrieved.',
),
] = None,
):
"""Get/set OBS profile settings."""
if value is None:
resp = ctx.obj['obsws'].get_profile_parameter(
category=category,
name=name,
)
console.out.print(
f'Parameter Value for [bold]{name}[/bold]: '
f'[green]{resp.parameter_value}[/green]'
)
else:
ctx.obj['obsws'].set_profile_parameter(
category=category,
name=name,
value=value,
)
console.out.print(
f'Set Parameter [bold]{name}[/bold] to [green]{value}[/green]'
)
@app.command('stream-service | ss')
def stream_service(
ctx: typer.Context,
type_: Annotated[
Optional[str],
typer.Argument(
...,
help='Stream service type (e.g., Twitch, YouTube). If omitted, current settings are retrieved.',
),
] = None,
key: Annotated[
Optional[str],
typer.Option('--key', '-k', help='Stream key to set. Optional.'),
] = None,
server: Annotated[
Optional[str],
typer.Option('--server', '-s', help='Stream server to set. Optional.'),
] = None,
):
"""Get/set OBS stream service settings."""
if type_ is None:
resp = ctx.obj['obsws'].get_stream_service_settings()
table = Table(
title='Stream Service Settings',
padding=(0, 2),
border_style=ctx.obj['style'].border,
)
columns = (
('Setting', 'left', ctx.obj['style'].column),
('Value', 'left', ctx.obj['style'].column),
)
for header_text, justify, style in columns:
table.add_column(
Text(header_text, justify='center'),
justify=justify,
style=style,
)
table.add_row(
'Type',
resp.stream_service_type,
style='' if table.row_count % 2 == 0 else 'dim',
)
table.add_row(
'Server',
resp.stream_service_settings.get('server', ''),
style='' if table.row_count % 2 == 0 else 'dim',
)
table.add_row(
'Key',
resp.stream_service_settings.get('key', ''),
style='' if table.row_count % 2 == 0 else 'dim',
)
console.out.print(table)
else:
current_settings = ctx.obj['obsws'].get_stream_service_settings()
if key is None:
key = current_settings.stream_service_settings.get('key', '')
if server is None:
server = current_settings.stream_service_settings.get('server', '')
ctx.obj['obsws'].set_stream_service_settings(
ss_type=type_,
ss_settings={'key': key, 'server': server},
)
console.out.print('Stream service settings updated.')
@app.command('video | vi')
def video(
ctx: typer.Context,
base_width: Annotated[
Optional[int],
typer.Option('--base-width', '-bw', help='Set base (canvas) width.'),
] = None,
base_height: Annotated[
Optional[int],
typer.Option('--base-height', '-bh', help='Set base (canvas) height.'),
] = None,
output_width: Annotated[
Optional[int],
typer.Option('--output-width', '-ow', help='Set output (scaled) width.'),
] = None,
output_height: Annotated[
Optional[int],
typer.Option('--output-height', '-oh', help='Set output (scaled) height.'),
] = None,
fps_num: Annotated[
Optional[int],
typer.Option('--fps-num', '-fn', help='Set FPS numerator.'),
] = None,
fps_den: Annotated[
Optional[int],
typer.Option('--fps-den', '-fd', help='Set FPS denominator.'),
] = None,
):
"""Get/set OBS video settings."""
if not any(
[
base_width,
base_height,
output_width,
output_height,
fps_num,
fps_den,
]
):
resp = ctx.obj['obsws'].get_video_settings()
table = Table(
title='Video Settings',
padding=(0, 2),
border_style=ctx.obj['style'].border,
)
columns = (
('Setting', 'left', ctx.obj['style'].column),
('Value', 'left', ctx.obj['style'].column),
)
for header_text, justify, style in columns:
table.add_column(
Text(header_text, justify='center'),
justify=justify,
style=style,
)
for setting in resp.attrs():
table.add_row(
util.snakecase_to_titlecase(setting),
str(getattr(resp, setting)),
style='' if table.row_count % 2 == 0 else 'dim',
)
console.out.print(table)
else:
current_settings = ctx.obj['obsws'].get_video_settings()
if base_width is None:
base_width = current_settings.base_width
if base_height is None:
base_height = current_settings.base_height
if output_width is None:
output_width = current_settings.output_width
if output_height is None:
output_height = current_settings.output_height
if fps_num is None:
fps_num = current_settings.fps_num
if fps_den is None:
fps_den = current_settings.fps_den
ctx.obj['obsws'].set_video_settings(
base_width=base_width,
base_height=base_height,
out_width=output_width,
out_height=output_height,
numerator=fps_num,
denominator=fps_den,
)
console.out.print('Video settings updated.')

View File

@@ -0,0 +1,81 @@
"""module for controlling OBS stream functionality."""
import typer
from obsws_cli import console
from obsws_cli.alias import SubTyperAliasGroup
app = typer.Typer(cls=SubTyperAliasGroup)
@app.callback()
def main():
"""Control OBS stream functionality."""
def _get_streaming_status(ctx: typer.Context) -> tuple:
"""Get streaming status."""
resp = ctx.obj['obsws'].get_stream_status()
return resp.output_active, resp.output_duration
@app.command('start | s')
def start(ctx: typer.Context):
"""Start streaming."""
active, _ = _get_streaming_status(ctx)
if active:
console.err.print('Streaming is already in progress, cannot start.')
raise typer.Exit(1)
ctx.obj['obsws'].start_stream()
console.out.print('Streaming started successfully.')
@app.command('stop | st')
def stop(ctx: typer.Context):
"""Stop streaming."""
active, _ = _get_streaming_status(ctx)
if not active:
console.err.print('Streaming is not in progress, cannot stop.')
raise typer.Exit(1)
ctx.obj['obsws'].stop_stream()
console.out.print('Streaming stopped successfully.')
@app.command('toggle | tg')
def toggle(ctx: typer.Context):
"""Toggle streaming."""
resp = ctx.obj['obsws'].toggle_stream()
if resp.output_active:
console.out.print('Streaming started successfully.')
else:
console.out.print('Streaming stopped successfully.')
@app.command('status | ss')
def status(ctx: typer.Context):
"""Get streaming status."""
active, duration = _get_streaming_status(ctx)
if active:
if duration > 0:
seconds = duration / 1000
minutes = int(seconds // 60)
seconds = int(seconds % 60)
if minutes > 0:
console.out.print(
f'Streaming is in progress for {minutes} minutes and {seconds} seconds.'
)
else:
if seconds > 0:
console.out.print(
f'Streaming is in progress for {seconds} seconds.'
)
else:
console.out.print(
'Streaming is in progress for less than a second.'
)
else:
console.out.print('Streaming is in progress.')
else:
console.out.print('Streaming is not in progress.')

View File

@@ -0,0 +1,49 @@
"""module containing commands for manipulating studio mode in OBS."""
import typer
from obsws_cli import console
from obsws_cli.alias import SubTyperAliasGroup
app = typer.Typer(cls=SubTyperAliasGroup)
@app.callback()
def main():
"""Control studio mode in OBS."""
@app.command('enable | on')
def enable(ctx: typer.Context):
"""Enable studio mode."""
ctx.obj['obsws'].set_studio_mode_enabled(True)
console.out.print('Studio mode has been enabled.')
@app.command('disable | off')
def disable(ctx: typer.Context):
"""Disable studio mode."""
ctx.obj['obsws'].set_studio_mode_enabled(False)
console.out.print('Studio mode has been disabled.')
@app.command('toggle | tg')
def toggle(ctx: typer.Context):
"""Toggle studio mode."""
resp = ctx.obj['obsws'].get_studio_mode_enabled()
if resp.studio_mode_enabled:
ctx.obj['obsws'].set_studio_mode_enabled(False)
console.out.print('Studio mode is now disabled.')
else:
ctx.obj['obsws'].set_studio_mode_enabled(True)
console.out.print('Studio mode is now enabled.')
@app.command('status | ss')
def status(ctx: typer.Context):
"""Get the status of studio mode."""
resp = ctx.obj['obsws'].get_studio_mode_enabled()
if resp.studio_mode_enabled:
console.out.print('Studio mode is enabled.')
else:
console.out.print('Studio mode is disabled.')

View File

@@ -0,0 +1,78 @@
"""module containing commands for manipulating text inputs."""
from typing import Annotated, Optional
import typer
from obsws_cli import console, validate
from obsws_cli.alias import SubTyperAliasGroup
app = typer.Typer(cls=SubTyperAliasGroup)
@app.callback()
def main():
"""Control text inputs in OBS."""
@app.command('current | get')
def current(
ctx: typer.Context,
input_name: Annotated[
str,
typer.Argument(
help='Name of the text input to get.', callback=validate.input_in_inputs
),
],
):
"""Get the current text for a text input."""
resp = ctx.obj['obsws'].get_input_settings(name=input_name)
if not resp.input_kind.startswith('text_'):
console.err.print(
f'Input [yellow]{input_name}[/yellow] is not a text input.',
)
raise typer.Exit(1)
current_text = resp.input_settings.get('text', '')
if not current_text:
current_text = '(empty)'
console.out.print(
f'Current text for input {console.highlight(ctx, input_name)}: {current_text}',
)
@app.command('update | set')
def update(
ctx: typer.Context,
input_name: Annotated[
str,
typer.Argument(
help='Name of the text input to update.', callback=validate.input_in_inputs
),
],
new_text: Annotated[
Optional[str],
typer.Argument(
help='The new text to set for the input.',
),
] = None,
):
"""Update the text of a text input."""
resp = ctx.obj['obsws'].get_input_settings(name=input_name)
if not resp.input_kind.startswith('text_'):
console.err.print(
f'Input [yellow]{input_name}[/yellow] is not a text input.',
)
raise typer.Exit(1)
ctx.obj['obsws'].set_input_settings(
name=input_name,
settings={'text': new_text},
overlay=True,
)
if not new_text:
new_text = '(empty)'
console.out.print(
f'Text for input {console.highlight(ctx, input_name)} updated to: {new_text}',
)

View File

@@ -0,0 +1,47 @@
"""module containing commands for manipulating virtual camera in OBS."""
import typer
from obsws_cli import console
from obsws_cli.alias import SubTyperAliasGroup
app = typer.Typer(cls=SubTyperAliasGroup)
@app.callback()
def main():
"""Control virtual camera in OBS."""
@app.command('start | s')
def start(ctx: typer.Context):
"""Start the virtual camera."""
ctx.obj['obsws'].start_virtual_cam()
console.out.print('Virtual camera started.')
@app.command('stop | p')
def stop(ctx: typer.Context):
"""Stop the virtual camera."""
ctx.obj['obsws'].stop_virtual_cam()
console.out.print('Virtual camera stopped.')
@app.command('toggle | tg')
def toggle(ctx: typer.Context):
"""Toggle the virtual camera."""
resp = ctx.obj['obsws'].toggle_virtual_cam()
if resp.output_active:
console.out.print('Virtual camera is enabled.')
else:
console.out.print('Virtual camera is disabled.')
@app.command('status | ss')
def status(ctx: typer.Context):
"""Get the status of the virtual camera."""
resp = ctx.obj['obsws'].get_virtual_cam_status()
if resp.output_active:
console.out.print('Virtual camera is enabled.')
else:
console.out.print('Virtual camera is disabled.')