# Source code for almanac.core.command_completer

from __future__ import annotations

import re

from typing import Any, Iterable, TYPE_CHECKING

from prompt_toolkit.document import Document
from prompt_toolkit.completion import (
    CompleteEvent,
    Completion,
    Completer,
    merge_completers
)

from ..arguments import FrozenArgument
from ..commands import FrozenCommand
from ..completion import rewrite_completion_stream
from ..errors import NoSuchArgumentError
from ..parsing import (
    IncompleteToken,
    last_incomplete_token,
    parse_cmd_line,
    ParseState,
    Patterns
)
from ..types import is_matching_type

if TYPE_CHECKING:
    from .application import Application


# Compiled once at import time from the parsing module's UNQUOTED_STRING pattern.
# CommandCompleter.get_completions passes it as the ``pattern`` argument of
# Document.get_word_before_cursor to extract the token preceding the cursor.
_compiled_word_re = re.compile(Patterns.UNQUOTED_STRING)


class CommandCompleter(Completer):
    """Completer that proposes command names and argument completions for an app.

    Command names and aliases are looked up in the application's command engine.
    Argument value completions are gathered from the completers attached to each
    argument and from the application's type-to-completers mapping.

    """

    def __init__(self, app: Application) -> None:
        """Remember the application and keep a handle on its command engine."""
        self._app = app
        self._command_engine = app.command_engine

    def _maybe_complete_for_type(
        self,
        annotation: Any,
        document: Document,
        complete_event: CompleteEvent
    ) -> Iterable[Completion]:
        """Yield completions from each registered type whose entry matches.

        Every ``(type, completers)`` pair of the application's
        ``type_completer_mapping`` is tested against ``annotation`` with
        ``is_matching_type``; the completers of each matching pair are merged
        and queried with ``document`` and ``complete_event``.

        """
        registry = self._app.type_completer_mapping

        for registered_type, type_completers in registry.items():
            if not is_matching_type(registered_type, annotation):
                continue

            merged = merge_completers(type_completers)
            yield from merged.get_completions(document, complete_event)

    def _get_command_completions(
        self,
        start_of_command: str
    ) -> Iterable[Completion]:
        """Yield, in sorted order, the command names/aliases starting with the prefix.

        A key equal to the command's own name is annotated with the command's
        abbreviated description; any other key is annotated as an alias of it.

        """
        for candidate in sorted(self._command_engine.keys()):
            if not candidate.startswith(start_of_command):
                continue

            target = self._command_engine[candidate]
            is_canonical_name = candidate == target.name
            meta = (
                target.abbreviated_description
                if is_canonical_name
                else f'(alias for {target.name})'
            )

            yield Completion(
                candidate,
                start_position=-len(start_of_command),
                display_meta=meta
            )

    def _get_completions_for_arg(
        self,
        frozen_arg: FrozenArgument,
        document: Document,
        complete_event: CompleteEvent
    ) -> Iterable[Completion]:
        """Yield value completions for one argument.

        Completions from the argument's own completers come first, followed by
        those from the application-wide type completers matching the argument's
        annotation. Both sources are invoked through the application's
        ``call_as_current_app_sync`` and passed through
        ``rewrite_completion_stream`` with a ``display_meta`` naming the source.

        """
        # First source: completers registered directly on the argument.
        for arg_completer in frozen_arg.completers:
            per_arg_stream = self._app.call_as_current_app_sync(
                arg_completer.get_completions, document, complete_event
            )
            yield from rewrite_completion_stream(
                per_arg_stream,
                display_meta='From per-argument completer.'
            )

        # Second source: application-global completers keyed by type.
        per_type_stream = self._app.call_as_current_app_sync(
            self._maybe_complete_for_type,
            frozen_arg.annotation, document, complete_event
        )
        yield from rewrite_completion_stream(
            per_type_stream,
            display_meta='From global per-type completer.'
        )

    def _get_kw_arg_name_completions(
        self,
        start_of_kw_arg: str,
        unbound_kw_args: Iterable[FrozenArgument]
    ) -> Iterable[Completion]:
        """Yield ``name=`` completions for unbound keyword arguments.

        Arguments flagged ``hidden`` or ``is_var_kw`` are never offered. The
        rest are visited in ``display_name`` order and offered when their
        display name starts with ``start_of_kw_arg``.

        """
        offerable_args = sorted(
            (arg for arg in unbound_kw_args if not (arg.hidden or arg.is_var_kw)),
            key=lambda arg: arg.display_name
        )

        for arg in offerable_args:
            shown_name = arg.display_name
            if not shown_name.startswith(start_of_kw_arg):
                continue

            yield Completion(
                f'{shown_name}=',
                start_position=-len(start_of_kw_arg),
                display_meta=arg.abbreviated_description
            )

    def get_completions(
        self,
        document: Document,
        complete_event: CompleteEvent
    ) -> Iterable[Completion]:
        """Yield the completions applicable at the cursor of ``document``.

        The line is parsed with ``parse_cmd_line`` and then handled as follows:

        * Non-blank line for which the parser reports ``ParseState.NONE``:
          nothing is yielded.
        * Blank line, or the word before the cursor equals the parsed command:
          command name/alias completions are yielded and nothing else.
        * Parsed command absent from the command engine: nothing is yielded.
        * Otherwise, depending on the last incomplete token: keyword argument
          names, values for the next unbound positional argument, and values
          for the keyword argument currently being typed.

        """
        full_text = document.text
        plain_word = document.get_word_before_cursor()
        token_text = document.get_word_before_cursor(pattern=_compiled_word_re)

        parsed, leftover_text, _, status = parse_cmd_line(full_text)

        meaningful_text = full_text.strip()

        # Non-whitespace input that the parser could not make sense of: the line is
        # malformed, so offering completions would only build on the mistake.
        if status == ParseState.NONE and meaningful_text:
            return

        # Still typing the command name itself (or nothing at all yet), so the
        # command engine is the source of candidate names and aliases.
        if not meaningful_text or plain_word == parsed.command:
            yield from self._get_command_completions(token_text)
            return

        # Resolve the command being invoked; unknown names get no completions.
        try:
            command: FrozenCommand = self._command_engine[parsed.command]
        except KeyError:
            return

        last_token: IncompleteToken = last_incomplete_token(document, leftover_text)

        positional_values = list(parsed.positionals)
        keyword_values, _ = command.resolved_kwarg_names(parsed.kv.asDict())

        # The trailing token may turn out to be an unfinished keyword argument
        # rather than a positional value, so do not bind it while that is unclear.
        if (
            positional_values
            and last_token.is_ambiguous_arg
            and last_token.value == positional_values[-1]
        ):
            positional_values.pop()

        # Whatever would remain unbound after binding the current arguments to the
        # command is the pool of candidates for argument-based completions.
        unbound = command.get_unbound_arguments(*positional_values, **keyword_values)
        unbound_kw_args = [arg for arg in unbound if not arg.is_pos_only]
        unbound_pos_args = [arg for arg in unbound if not arg.is_kw_only]

        key_or_pos_value_possible = last_token.is_ambiguous_arg
        if key_or_pos_value_possible:
            key_or_pos_value_possible = (
                token_text == last_token.key
                or not (plain_word or last_token.key)
            )

        # Keyword argument names.
        if key_or_pos_value_possible:
            yield from self._get_kw_arg_name_completions(
                last_token.key, unbound_kw_args
            )

        # Values for the next positional argument.
        if unbound_pos_args and (key_or_pos_value_possible or last_token.is_pos_arg):
            yield from self._get_completions_for_arg(
                unbound_pos_args[0], document, complete_event
            )

        # Values for the keyword argument currently being typed.
        kwarg_name = last_token.key
        if last_token.is_kw_arg:
            try:
                yield from self._get_completions_for_arg(
                    command[kwarg_name], document, complete_event
                )
            except NoSuchArgumentError:
                # No such argument on this command: nothing to offer.
                pass

        # TODO: completions based on the history of the argument?
        #       will probably be a mechanism implemented on the CommandEngine

        # TODO: if we want to inject global styles into the completions generated
        #       here, I think we will need some kind of Completion.replace function,
        #       and re-write properties of each Completion as we yield them