mirror of
https://github.com/OpenBB-finance/OpenBB.git
synced 2026-05-07 22:40:49 +08:00
* remove python 3.9 support and code * black * more cli lint * more linting * more lint * fix for tests * docstring grammar police * add lock to to build function to avoid async import race conditions * grammar police * lots more linting * relock
421 lines
17 KiB
Python
421 lines
17 KiB
Python
"""Nested completer for completion of OpenBB hierarchical data structures."""
|
|
|
|
from collections.abc import Callable, Iterable, Mapping
|
|
from re import Pattern
|
|
from typing import (
|
|
Any,
|
|
)
|
|
|
|
from prompt_toolkit.completion import CompleteEvent, Completer, Completion
|
|
from prompt_toolkit.document import Document
|
|
from prompt_toolkit.formatted_text import AnyFormattedText
|
|
from prompt_toolkit.history import FileHistory
|
|
|
|
NestedDict = Mapping[str, Any | set[str] | None | Completer]
|
|
|
|
# pylint: disable=too-many-arguments,global-statement,too-many-branches,global-variable-not-assigned
|
|
|
|
|
|
class WordCompleter(Completer):
|
|
"""Simple autocompletion on a list of words.
|
|
|
|
:param words: List of words or callable that returns a list of words.
|
|
:param ignore_case: If True, case-insensitive completion.
|
|
:param meta_dict: Optional dict mapping words to their meta-text. (This
|
|
should map strings to strings or formatted text.)
|
|
:param WORD: When True, use WORD characters.
|
|
:param sentence: When True, don't complete by comparing the word before the
|
|
cursor, but by comparing all the text before the cursor. In this case,
|
|
the list of words is just a list of strings, where each string can
|
|
contain spaces. (Can not be used together with the WORD option.)
|
|
:param match_middle: When True, match not only the start, but also in the
|
|
middle of the word.
|
|
:param pattern: Optional compiled regex for finding the word before
|
|
the cursor to complete. When given, use this regex pattern instead of
|
|
default one (see document._FIND_WORD_RE)
|
|
"""
|
|
|
|
def __init__( # pylint: disable=R0917
|
|
self,
|
|
words: list[str] | Callable[[], list[str]],
|
|
ignore_case: bool = False,
|
|
display_dict: Mapping[str, AnyFormattedText] | None = None,
|
|
meta_dict: Mapping[str, AnyFormattedText] | None = None,
|
|
WORD: bool = True,
|
|
sentence: bool = False,
|
|
match_middle: bool = False,
|
|
pattern: Pattern[str] | None = None,
|
|
) -> None:
|
|
"""Initialize the WordCompleter."""
|
|
assert not (WORD and sentence) # noqa: S101
|
|
|
|
self.words = words
|
|
self.ignore_case = ignore_case
|
|
self.display_dict = display_dict or {}
|
|
self.meta_dict = meta_dict or {}
|
|
self.WORD = WORD
|
|
self.sentence = sentence
|
|
self.match_middle = match_middle
|
|
self.pattern = pattern
|
|
|
|
def get_completions(
|
|
self,
|
|
document: Document,
|
|
_complete_event: CompleteEvent,
|
|
) -> Iterable[Completion]:
|
|
"""Get completions."""
|
|
# Get list of words.
|
|
words = self.words
|
|
if callable(words):
|
|
words = words()
|
|
|
|
# Get word/text before cursor.
|
|
if self.sentence:
|
|
word_before_cursor = document.text_before_cursor
|
|
else:
|
|
word_before_cursor = document.get_word_before_cursor(
|
|
WORD=self.WORD, pattern=self.pattern
|
|
)
|
|
if (
|
|
"--" in document.text_before_cursor
|
|
and document.text_before_cursor.rfind(" --")
|
|
>= document.text_before_cursor.rfind(" -")
|
|
):
|
|
word_before_cursor = f"--{document.text_before_cursor.split('--')[-1]}"
|
|
elif f"--{word_before_cursor}" == document.text_before_cursor:
|
|
word_before_cursor = document.text_before_cursor
|
|
|
|
if self.ignore_case:
|
|
word_before_cursor = word_before_cursor.lower()
|
|
|
|
def word_matches(word: str) -> bool:
|
|
"""Set True when the word before the cursor matches."""
|
|
if self.ignore_case:
|
|
word = word.lower()
|
|
|
|
if self.match_middle:
|
|
return word_before_cursor in word
|
|
return word.startswith(word_before_cursor)
|
|
|
|
for a in words:
|
|
if word_matches(a):
|
|
display = self.display_dict.get(a, a)
|
|
display_meta = self.meta_dict.get(a, "")
|
|
yield Completion(
|
|
text=a,
|
|
start_position=-len(word_before_cursor),
|
|
display=display,
|
|
display_meta=display_meta,
|
|
)
|
|
|
|
|
|
class NestedCompleter(Completer):
|
|
"""Completer which wraps around several other completers, and calls any the
|
|
one that corresponds with the first word of the input.
|
|
|
|
By combining multiple `NestedCompleter` instances, we can achieve multiple
|
|
hierarchical levels of autocompletion. This is useful when `WordCompleter`
|
|
is not sufficient.
|
|
|
|
If you need multiple levels, check out the `from_nested_dict` classmethod.
|
|
"""
|
|
|
|
complementary: list = list()
|
|
|
|
def __init__(
|
|
self, options: dict[str, Completer | None], ignore_case: bool = True
|
|
) -> None:
|
|
"""Initialize the NestedCompleter."""
|
|
self.flags_processed: list = list()
|
|
self.original_options = options
|
|
self.options = options
|
|
self.ignore_case = ignore_case
|
|
self.complementary = list()
|
|
|
|
def __repr__(self) -> str:
|
|
"""Return string representation of NestedCompleter."""
|
|
return f"NestedCompleter({self.options!r}, ignore_case={self.ignore_case!r})"
|
|
|
|
@classmethod
|
|
def from_nested_dict(cls, data: dict) -> "NestedCompleter":
|
|
"""Create a `NestedCompleter`.
|
|
|
|
It starts from a nested dictionary data structure, like this:
|
|
|
|
.. code::
|
|
|
|
data = {
|
|
'show': {
|
|
'version': None,
|
|
'interfaces': None,
|
|
'clock': None,
|
|
'ip': {'interface': {'brief'}}
|
|
},
|
|
'exit': None
|
|
'enable': None
|
|
}
|
|
|
|
The value should be `None` if there is no further completion at some
|
|
point. If all values in the dictionary are None, it is also possible to
|
|
use a set instead.
|
|
|
|
Values in this data structure can be a completers as well.
|
|
"""
|
|
options: dict[str, Any] = {}
|
|
for key, value in data.items():
|
|
if isinstance(value, Completer):
|
|
options[key] = value
|
|
elif isinstance(value, dict):
|
|
options[key] = cls.from_nested_dict(value)
|
|
elif isinstance(value, set):
|
|
options[key] = cls.from_nested_dict({item: None for item in value})
|
|
elif isinstance(key, str) and isinstance(value, str):
|
|
options[key] = options[value]
|
|
else:
|
|
assert value is None # noqa: S101
|
|
options[key] = None
|
|
|
|
for items in cls.complementary:
|
|
if items[0] in options:
|
|
options[items[1]] = options[items[0]]
|
|
elif items[1] in options:
|
|
options[items[0]] = options[items[1]]
|
|
|
|
return cls(options)
|
|
|
|
def get_completions( # noqa: PLR0912
|
|
self, document: Document, complete_event: CompleteEvent
|
|
) -> Iterable[Completion]:
|
|
"""Get completions."""
|
|
# Split document.
|
|
cmd = ""
|
|
text = document.text_before_cursor.lstrip()
|
|
if " " in text:
|
|
cmd = text.split(" ")[0]
|
|
if "-" in text:
|
|
if text.rfind("--") == -1 or text.rfind("-") - 1 > text.rfind("--"):
|
|
unprocessed_text = "-" + text.split("-")[-1]
|
|
else:
|
|
unprocessed_text = "--" + text.split("--")[-1]
|
|
else:
|
|
unprocessed_text = text
|
|
stripped_len = len(document.text_before_cursor) - len(text)
|
|
|
|
# Check if there are multiple flags for the same command
|
|
if self.complementary:
|
|
for same_flags in self.complementary:
|
|
if (
|
|
same_flags[0] in self.flags_processed
|
|
and same_flags[1] not in self.flags_processed
|
|
) or (
|
|
same_flags[1] in self.flags_processed
|
|
and same_flags[0] not in self.flags_processed
|
|
):
|
|
if same_flags[0] in self.flags_processed:
|
|
self.flags_processed.append(same_flags[1])
|
|
elif same_flags[1] in self.flags_processed:
|
|
self.flags_processed.append(same_flags[0])
|
|
|
|
if cmd:
|
|
self.options = {
|
|
k: self.original_options.get(cmd).options[k] # type: ignore
|
|
for k in self.original_options.get(cmd).options # type: ignore
|
|
if k not in self.flags_processed
|
|
}
|
|
else:
|
|
self.options = {
|
|
k: self.original_options[k]
|
|
for k in self.original_options
|
|
if k not in self.flags_processed
|
|
}
|
|
|
|
# If there is a space, check for the first term, and use a subcompleter.
|
|
if " " in unprocessed_text:
|
|
first_term = unprocessed_text.split()[0]
|
|
|
|
# user is updating one of the values
|
|
if unprocessed_text[-1] != " ":
|
|
self.flags_processed = [
|
|
flag for flag in self.flags_processed if flag != first_term
|
|
]
|
|
|
|
if self.complementary:
|
|
for same_flags in self.complementary:
|
|
if (
|
|
same_flags[0] in self.flags_processed
|
|
and same_flags[1] not in self.flags_processed
|
|
) or (
|
|
same_flags[1] in self.flags_processed
|
|
and same_flags[0] not in self.flags_processed
|
|
):
|
|
if same_flags[0] in self.flags_processed:
|
|
self.flags_processed.remove(same_flags[0])
|
|
elif same_flags[1] in self.flags_processed:
|
|
self.flags_processed.remove(same_flags[1])
|
|
|
|
if cmd and self.original_options.get(cmd):
|
|
self.options = self.original_options
|
|
else:
|
|
self.options = {
|
|
k: self.original_options[k]
|
|
for k in self.original_options
|
|
if k not in self.flags_processed
|
|
}
|
|
|
|
if "-" not in text:
|
|
completer = self.options.get(first_term)
|
|
elif cmd in self.options and self.options.get(cmd):
|
|
completer = self.options.get(cmd).options.get(first_term) # type: ignore
|
|
else:
|
|
completer = self.options.get(first_term)
|
|
|
|
# If we have a sub completer, use this for the completions.
|
|
if completer is not None:
|
|
remaining_text = unprocessed_text[len(first_term) :].lstrip()
|
|
move_cursor = len(text) - len(remaining_text) + stripped_len
|
|
|
|
new_document = Document(
|
|
remaining_text,
|
|
cursor_position=document.cursor_position - move_cursor,
|
|
)
|
|
|
|
# Provides auto-completion but if user doesn't take it still keep going
|
|
if " " in new_document.text:
|
|
if (
|
|
new_document.text in [f"{opt} " for opt in self.options]
|
|
or unprocessed_text[-1] == " "
|
|
):
|
|
self.flags_processed.append(first_term)
|
|
if cmd:
|
|
self.options = {
|
|
k: self.original_options.get(cmd).options[k] # type: ignore
|
|
for k in self.original_options.get(cmd).options # type: ignore
|
|
if k not in self.flags_processed
|
|
}
|
|
else:
|
|
self.options = {
|
|
k: self.original_options[k]
|
|
for k in self.original_options
|
|
if k not in self.flags_processed
|
|
}
|
|
|
|
# In case the users inputs a single boolean flag
|
|
elif not completer.options: # type: ignore
|
|
self.flags_processed.append(first_term)
|
|
|
|
if self.complementary:
|
|
for same_flags in self.complementary:
|
|
if (
|
|
same_flags[0] in self.flags_processed
|
|
and same_flags[1] not in self.flags_processed
|
|
) or (
|
|
same_flags[1] in self.flags_processed
|
|
and same_flags[0] not in self.flags_processed
|
|
):
|
|
if same_flags[0] in self.flags_processed:
|
|
self.flags_processed.append(same_flags[1])
|
|
elif same_flags[1] in self.flags_processed:
|
|
self.flags_processed.append(same_flags[0])
|
|
|
|
if cmd:
|
|
self.options = {
|
|
k: self.original_options.get(cmd).options[k] # type: ignore
|
|
for k in self.original_options.get(cmd).options # type: ignore
|
|
if k not in self.flags_processed
|
|
}
|
|
else:
|
|
self.options = {
|
|
k: self.original_options[k]
|
|
for k in self.original_options
|
|
if k not in self.flags_processed
|
|
}
|
|
|
|
else:
|
|
# This is a NestedCompleter
|
|
yield from completer.get_completions(new_document, complete_event)
|
|
|
|
# No space in the input: behave exactly like `WordCompleter`.
|
|
else:
|
|
# check if the prompt has been updated in the meantime
|
|
if " " in text or "-" in text:
|
|
actual_flags_processed = [
|
|
flag for flag in self.flags_processed if flag in text
|
|
]
|
|
|
|
if self.complementary:
|
|
for same_flags in self.complementary:
|
|
if (
|
|
same_flags[0] in actual_flags_processed
|
|
and same_flags[1] not in actual_flags_processed
|
|
) or (
|
|
same_flags[1] in actual_flags_processed
|
|
and same_flags[0] not in actual_flags_processed
|
|
):
|
|
if same_flags[0] in actual_flags_processed:
|
|
actual_flags_processed.append(same_flags[1])
|
|
elif same_flags[1] in actual_flags_processed:
|
|
actual_flags_processed.append(same_flags[0])
|
|
|
|
if len(actual_flags_processed) < len(self.flags_processed):
|
|
self.flags_processed = actual_flags_processed
|
|
if cmd:
|
|
self.options = {
|
|
k: self.original_options.get(cmd).options[k] # type: ignore
|
|
for k in self.original_options.get(cmd).options # type: ignore
|
|
if k not in self.flags_processed
|
|
}
|
|
else:
|
|
self.options = {
|
|
k: self.original_options[k]
|
|
for k in self.original_options
|
|
if k not in self.flags_processed
|
|
}
|
|
|
|
command = self.options.get(cmd)
|
|
options = command.options if command else {} # type: ignore
|
|
command_options = [f"{cmd} {opt}" for opt in options]
|
|
text_list = [text in val for val in command_options]
|
|
if cmd and cmd in self.options and text_list:
|
|
completer = WordCompleter(
|
|
list(self.options.get(cmd).options.keys()), # type: ignore
|
|
ignore_case=self.ignore_case,
|
|
)
|
|
elif bool([val for val in self.options if text in val]):
|
|
completer = WordCompleter(
|
|
list(self.options.keys()), ignore_case=self.ignore_case
|
|
)
|
|
else:
|
|
# The user has delete part of the first command and we need to reset options
|
|
if bool([val for val in self.original_options if text in val]):
|
|
self.options = self.original_options
|
|
self.flags_processed = list()
|
|
completer = WordCompleter(
|
|
list(self.options.keys()), ignore_case=self.ignore_case
|
|
)
|
|
|
|
# This is a WordCompleter
|
|
yield from completer.get_completions(document, complete_event)
|
|
|
|
|
|
class CustomFileHistory(FileHistory):
|
|
"""Filtered file history."""
|
|
|
|
def sanitize_input(self, string: str) -> str:
|
|
"""Sanitize sensitive information from the input string by parsing arguments."""
|
|
keywords = ["--password", "--email", "--pat"]
|
|
string_list = string.split(" ")
|
|
|
|
for kw in keywords:
|
|
if kw in string_list:
|
|
index = string_list.index(kw)
|
|
if len(string_list) > index + 1:
|
|
string_list[index + 1] = "********"
|
|
|
|
result = " ".join(string_list)
|
|
return result
|
|
|
|
def store_string(self, string: str) -> None:
|
|
"""Store string in history."""
|
|
string = self.sanitize_input(string)
|
|
super().store_string(string)
|