Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lazy loading support #101

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 93 additions & 0 deletions tests/test_lazy_command.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@

import click
import typing as t
import inspect
import trogon

class LazyCommand(click.Command):
"""
This class is a wrapper meant to only load a command's module (file) when
it's absolutely necessary. This is so we don't have to load potentially
dozens of script files every time we do a CLI command, and also so that if
a script happens to be broken, it will be compartmentalized and not affect
running other scripts.
"""

def __init__(self,
name: str,
callback: t.Callable[[], click.Command],
short_help: str,
params = [],
*args,
**kwargs):
assert len(params) == 0,\
"Additional params were given to a LazyCommand class. "\
"These should be added to the base command to be called. "
assert len(kwargs) == 0 and len(args) == 0,\
"Additional arguments were supplied to a LazyCommand class. "\
"The only allowed arguments are: name, short_help. "\
f"Found: {', '.join(kwargs.keys())}"
super().__init__(name)
self.short_help = short_help
self.callback = callback
self.cmd: click.Command | None = None
self.hidden = False

def to_info_dict(self, ctx: click.Context):
return self._get_cmd().to_info_dict(ctx)

def get_params(self, ctx: click.Context) -> t.List["click.Parameter"]:
return self._get_cmd().get_params(ctx)

def get_usage(self, ctx: click.Context) -> str:
return self._get_cmd().get_usage(ctx)

def get_help(self, ctx: click.Context) -> str:
return self._get_cmd().get_help(ctx)

def parse_args(self, ctx: click.Context, args: t.List[str]) -> t.List[str]:
return self._get_cmd().parse_args(ctx, args)

def invoke(self, ctx: click.Context):
return self._get_cmd().invoke(ctx)

def get_short_help_str(self, limit: int = 45) -> str:
return inspect.cleandoc(self.short_help).strip()

def _get_cmd(self):
if self.cmd is None:
self.cmd = self.callback()
return self.cmd

@trogon.tui(run_if_no_command=True)
@click.group()
def cli():
"""
Super cool and great CLI
"""
pass

@click.command()
@click.option('-t', help="Turns on the trigger")
def cmd_1(t):
"""
cmd_1 finds all the problems you have, and prints them
"""

@click.command()
@click.argument('path')
def cmd_2(path):
"""
cmd_2 fixes all the problems you have, and prints a report, given a PATH
"""

cmd_1_lazy = LazyCommand("cmd_1", lambda: cmd_1, "Really great command")
cmd_2_lazy = LazyCommand("cmd_2", lambda: cmd_2, "Really amazing command")
cli.add_command(cmd_1_lazy)
cli.add_command(cmd_2_lazy)

def test_lazy_commands():
cli()

if __name__ == "__main__":
test_lazy_commands()
216 changes: 140 additions & 76 deletions trogon/introspect.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
from __future__ import annotations

from functools import cached_property
import uuid
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Callable, Sequence, NewType

import click
from click import BaseCommand, ParamType


CommandName = NewType("CommandName", str)


def generate_unique_id():
return f"id_{str(uuid.uuid4())[:8]}"

Expand Down Expand Up @@ -68,17 +73,42 @@ class ArgumentSchema:
nargs: int = 1


@dataclass
class CommandSchema:
name: CommandName
function: Callable[..., Any | None]
key: str = field(default_factory=generate_unique_id)
docstring: str | None = None
options: list[OptionSchema] = field(default_factory=list)
arguments: list[ArgumentSchema] = field(default_factory=list)
subcommands: dict["CommandName", "CommandSchema"] = field(default_factory=dict)
parent: "CommandSchema | None" = None
is_group: bool = False
class CommandSchema(ABC):

def __init__(self, name: CommandName, parent: "CommandSchema | None" = None):
self.name = name
self.parent = parent
self.key = generate_unique_id()

@property
@abstractmethod
def options(self) -> list[OptionSchema]:
pass

@property
@abstractmethod
def arguments(self) -> list[ArgumentSchema]:
pass

@property
@abstractmethod
def subcommands(self) -> dict["CommandName", "CommandSchema"]:
pass

@property
@abstractmethod
def docstring(self) -> str | None:
pass

@property
@abstractmethod
def function(self) -> Callable[..., Any | None]:
pass

@property
@abstractmethod
def is_group(self) -> bool:
pass

@property
def path_from_root(self) -> list["CommandSchema"]:
Expand All @@ -92,7 +122,101 @@ def path_from_root(self) -> list["CommandSchema"]:
return list(reversed(path))


def introspect_click_app(app: BaseCommand) -> dict[CommandName, CommandSchema]:
class ClickCommandSchema(CommandSchema):

def __init__(
self,
cmd_obj: click.Command,
cmd_ctx: click.Context,
cmd_name: CommandName | None = None,
parent: CommandSchema | None = None,
):
super().__init__(cmd_name or cmd_obj.name, parent)
self.cmd_obj = cmd_obj
self.cmd_ctx = cmd_ctx
self._options = None
self._arguments = None
self._subcommands = None
self._docstring = None

@cached_property
def options(self) -> list[OptionSchema]:
options = list[OptionSchema]()
help_option_names = set(self.cmd_obj.get_help_option_names(self.cmd_ctx))
for param in self.cmd_obj.get_params(self.cmd_ctx):
if not isinstance(param, (click.Option, click.core.Group)):
continue
is_help_param = len(help_option_names & set(param.opts)) > 0
if is_help_param:
continue
default = MultiValueParamData.process_cli_option(param.default)
option_data = OptionSchema(
name=param.opts,
type=param.type,
is_flag=param.is_flag,
is_boolean_flag=param.is_bool_flag,
flag_value=param.flag_value,
counting=param.count,
opts=param.opts,
secondary_opts=param.secondary_opts,
required=param.required,
default=default,
help=param.help,
multiple=param.multiple,
nargs=param.nargs,
)
if isinstance(param.type, click.Choice):
option_data.choices = param.type.choices
options.append(option_data)
return options

@cached_property
def arguments(self) -> list[ArgumentSchema]:
arguments = list[ArgumentSchema]()
for param in self.cmd_obj.get_params(self.cmd_ctx):
default = MultiValueParamData.process_cli_option(param.default)
if isinstance(param, click.Argument):
argument_data = ArgumentSchema(
name=param.name,
type=param.type,
required=param.required,
multiple=param.multiple,
default=default,
nargs=param.nargs,
)
if isinstance(param.type, click.Choice):
argument_data.choices = param.type.choices
arguments.append(argument_data)
return arguments

@cached_property
def subcommands(self) -> dict["CommandName", "CommandSchema"]:
subcommands = dict["CommandName", "CommandSchema"]()
if isinstance(self.cmd_obj, click.core.Group):
self.cmd_obj.to_info_dict(self.cmd_ctx)
for subcmd_name, subcmd_obj in self.cmd_obj.commands.items():
subcommands[CommandName(subcmd_name)] = ClickCommandSchema(
cmd_obj=subcmd_obj,
cmd_ctx=self.cmd_ctx,
cmd_name=subcmd_name,
parent=self,
)
return subcommands

@cached_property
def docstring(self) -> str | None:
return self.cmd_obj.get_short_help_str()

@cached_property
def function(self) -> Callable[..., Any | None]:
return self.cmd_obj.callback

@cached_property
def is_group(self) -> bool:
return isinstance(self.cmd_obj, click.Group)


def introspect_click_app(app: BaseCommand, click_context: click.Context) -> dict[CommandName, CommandSchema]:
"""
Introspect a Click application and build a data structure containing
information about all commands, options, arguments, and subcommands,
Expand All @@ -112,80 +236,20 @@ def introspect_click_app(app: BaseCommand) -> dict[CommandName, CommandSchema]:
TypedDicts (OptionData and ArgumentData).
"""

def process_command(
cmd_name: CommandName, cmd_obj: click.Command, parent=None
) -> CommandSchema:
cmd_data = CommandSchema(
name=cmd_name,
docstring=cmd_obj.help,
function=cmd_obj.callback,
options=[],
arguments=[],
subcommands={},
parent=parent,
is_group=isinstance(cmd_obj, click.Group),
)

for param in cmd_obj.params:
default = MultiValueParamData.process_cli_option(param.default)
if isinstance(param, (click.Option, click.core.Group)):
option_data = OptionSchema(
name=param.opts,
type=param.type,
is_flag=param.is_flag,
is_boolean_flag=param.is_bool_flag,
flag_value=param.flag_value,
counting=param.count,
opts=param.opts,
secondary_opts=param.secondary_opts,
required=param.required,
default=default,
help=param.help,
multiple=param.multiple,
nargs=param.nargs,
)
if isinstance(param.type, click.Choice):
option_data.choices = param.type.choices
cmd_data.options.append(option_data)
elif isinstance(param, click.Argument):
argument_data = ArgumentSchema(
name=param.name,
type=param.type,
required=param.required,
multiple=param.multiple,
default=default,
nargs=param.nargs,
)
if isinstance(param.type, click.Choice):
argument_data.choices = param.type.choices
cmd_data.arguments.append(argument_data)

if isinstance(cmd_obj, click.core.Group):
for subcmd_name, subcmd_obj in cmd_obj.commands.items():
cmd_data.subcommands[CommandName(subcmd_name)] = process_command(
CommandName(subcmd_name), subcmd_obj, parent=cmd_data
)

return cmd_data

data: dict[CommandName, CommandSchema] = {}

# Special case for the root group
if isinstance(app, click.Group):
root_cmd_name = CommandName("root")
data[root_cmd_name] = process_command(root_cmd_name, app)
data[root_cmd_name] = ClickCommandSchema(app, click_context, cmd_name=root_cmd_name)
app = data[root_cmd_name]

if isinstance(app, click.Group):
for cmd_name, cmd_obj in app.commands.items():
data[CommandName(cmd_name)] = process_command(
CommandName(cmd_name), cmd_obj
)
data[CommandName(cmd_name)] = ClickCommandSchema(cmd_obj, click_context, cmd_name=CommandName(cmd_name))

elif isinstance(app, click.Command):
cmd_name = CommandName(app.name)
data[cmd_name] = process_command(cmd_name, app)
data[cmd_name] = ClickCommandSchema(cmd_obj, click_context, cmd_name=cmd_name)

return data


CommandName = NewType("CommandName", str)
Loading