mirror of
https://github.com/OpenBB-finance/OpenBB.git
synced 2026-06-04 00:54:17 +08:00
[Feature] - Rename terminal folder (#6349)
* rename terminal folder * docstring * Rename to openbb_cli * rename constant * rename .his file * pyproject.toml * Update pre-commit and remove docker-build.yml
This commit is contained in:
947
cli/openbb_cli/controllers/cli_controller.py
Normal file
947
cli/openbb_cli/controllers/cli_controller.py
Normal file
@@ -0,0 +1,947 @@
|
||||
#!/usr/bin/env python
|
||||
"""Main CLI Module."""
|
||||
|
||||
import argparse
|
||||
import contextlib
|
||||
import difflib
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
import time
|
||||
import webbrowser
|
||||
from datetime import datetime
|
||||
from functools import partial, update_wrapper
|
||||
from pathlib import Path
|
||||
from types import MethodType
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
import certifi
|
||||
import pandas as pd
|
||||
import requests
|
||||
from openbb import obb
|
||||
from openbb_cli.argparse_translator.obbject_registry import Registry
|
||||
from openbb_cli.config import constants
|
||||
from openbb_cli.config.completer import NestedCompleter
|
||||
from openbb_cli.config.constants import (
|
||||
ASSETS_DIRECTORY,
|
||||
ENV_FILE_SETTINGS,
|
||||
HOME_DIRECTORY,
|
||||
REPOSITORY_DIRECTORY,
|
||||
)
|
||||
from openbb_cli.config.menu_text import MenuText
|
||||
from openbb_cli.controllers.base_controller import BaseController
|
||||
from openbb_cli.controllers.platform_controller_factory import (
|
||||
PlatformControllerFactory,
|
||||
)
|
||||
from openbb_cli.controllers.script_parser import is_reset, parse_openbb_script
|
||||
from openbb_cli.controllers.utils import (
|
||||
bootup,
|
||||
first_time_user,
|
||||
get_flair_and_username,
|
||||
is_installer,
|
||||
parse_and_split_input,
|
||||
print_goodbye,
|
||||
print_rich_table,
|
||||
reset,
|
||||
suppress_stdout,
|
||||
welcome_message,
|
||||
)
|
||||
from openbb_cli.session import Session
|
||||
from prompt_toolkit.formatted_text import HTML
|
||||
from prompt_toolkit.styles import Style
|
||||
from pydantic import BaseModel
|
||||
|
||||
PLATFORM_ROUTERS = {
|
||||
d: "menu" if not isinstance(getattr(obb, d), BaseModel) else "command"
|
||||
for d in dir(obb)
|
||||
if "_" not in d
|
||||
}
|
||||
NON_DATA_ROUTERS = ["coverage", "account", "reference", "system", "user"]
|
||||
DATA_PROCESSING_ROUTERS = ["technical", "quantitative", "econometrics"]
|
||||
|
||||
# pylint: disable=too-many-public-methods,import-outside-toplevel, too-many-function-args
|
||||
# pylint: disable=too-many-branches,no-member,C0302,too-many-return-statements, inconsistent-return-statements
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
env_file = str(ENV_FILE_SETTINGS)
|
||||
|
||||
if is_installer():
|
||||
# Necessary for installer so that it can locate the correct certificates for
|
||||
# API calls and https
|
||||
# https://stackoverflow.com/questions/27835619/urllib-and-ssl-certificate-verify-failed-error/73270162#73270162
|
||||
os.environ["REQUESTS_CA_BUNDLE"] = certifi.where()
|
||||
os.environ["SSL_CERT_FILE"] = certifi.where()
|
||||
|
||||
|
||||
class CLIController(BaseController):
|
||||
"""CLI Controller class."""
|
||||
|
||||
CHOICES_COMMANDS = ["record", "stop", "exe", "results"]
|
||||
CHOICES_MENUS = [
|
||||
"settings",
|
||||
]
|
||||
|
||||
for router, value in PLATFORM_ROUTERS.items():
|
||||
if value == "menu":
|
||||
CHOICES_MENUS.append(router)
|
||||
else:
|
||||
CHOICES_COMMANDS.append(router)
|
||||
|
||||
PATH = "/"
|
||||
CHOICES_GENERATION = False
|
||||
|
||||
def __init__(self, jobs_cmds: Optional[List[str]] = None):
|
||||
"""Construct CLI controller."""
|
||||
self.ROUTINE_FILES: Dict[str, str] = dict()
|
||||
self.ROUTINE_DEFAULT_FILES: Dict[str, str] = dict()
|
||||
self.ROUTINE_PERSONAL_FILES: Dict[str, str] = dict()
|
||||
self.ROUTINE_CHOICES: Dict[str, Any] = dict()
|
||||
|
||||
super().__init__(jobs_cmds)
|
||||
|
||||
self.queue: List[str] = list()
|
||||
|
||||
if jobs_cmds:
|
||||
self.queue = parse_and_split_input(
|
||||
an_input=" ".join(jobs_cmds), custom_filters=[]
|
||||
)
|
||||
|
||||
self.update_success = False
|
||||
|
||||
self._generate_platform_commands()
|
||||
|
||||
self.update_runtime_choices()
|
||||
|
||||
def _generate_platform_commands(self):
|
||||
"""Generate Platform based commands/menus."""
|
||||
|
||||
def method_call_class(self, _, controller, name, parent_path, target):
|
||||
self.queue = self.load_class(
|
||||
controller, name, parent_path, target, self.queue
|
||||
)
|
||||
|
||||
# pylint: disable=unused-argument
|
||||
def method_call_command(self, _, router: str):
|
||||
"""Call command."""
|
||||
mdl = getattr(obb, router)
|
||||
df = pd.DataFrame.from_dict(mdl.model_dump(), orient="index")
|
||||
return print_rich_table(df, show_index=True)
|
||||
|
||||
for router, value in PLATFORM_ROUTERS.items():
|
||||
target = getattr(obb, router)
|
||||
|
||||
if value == "menu":
|
||||
pcf = PlatformControllerFactory(
|
||||
target, reference=obb.reference["paths"] # type: ignore
|
||||
)
|
||||
DynamicController = pcf.create()
|
||||
|
||||
# Bind the method to the class
|
||||
bound_method = MethodType(method_call_class, self)
|
||||
|
||||
# Update the wrapper and set the attribute
|
||||
bound_method = update_wrapper( # type: ignore
|
||||
partial(
|
||||
bound_method,
|
||||
controller=DynamicController,
|
||||
name=router,
|
||||
target=target,
|
||||
parent_path=self.path,
|
||||
),
|
||||
method_call_class,
|
||||
)
|
||||
else:
|
||||
bound_method = MethodType(method_call_command, self)
|
||||
bound_method = update_wrapper( # type: ignore
|
||||
partial(bound_method, router=router),
|
||||
method_call_command,
|
||||
)
|
||||
|
||||
setattr(self, f"call_{router}", bound_method)
|
||||
|
||||
def update_runtime_choices(self):
|
||||
"""Update runtime choices."""
|
||||
routines_directory = Path(
|
||||
Session().user.preferences.export_directory, "routines"
|
||||
)
|
||||
|
||||
if Session().prompt_session and Session().settings.USE_PROMPT_TOOLKIT:
|
||||
# choices: dict = self.choices_default
|
||||
choices: dict = {c: {} for c in self.controller_choices} # type: ignore
|
||||
choices["hold"] = {c: None for c in ["on", "off", "-s", "--sameaxis"]}
|
||||
choices["hold"]["off"] = {"--title": None}
|
||||
|
||||
self.ROUTINE_FILES = {
|
||||
filepath.name: filepath # type: ignore
|
||||
for filepath in routines_directory.rglob("*.openbb")
|
||||
}
|
||||
self.ROUTINE_DEFAULT_FILES = {
|
||||
filepath.name: filepath # type: ignore
|
||||
for filepath in Path(routines_directory / "hub" / "default").rglob(
|
||||
"*.openbb"
|
||||
)
|
||||
}
|
||||
self.ROUTINE_PERSONAL_FILES = {
|
||||
filepath.name: filepath # type: ignore
|
||||
for filepath in Path(routines_directory / "hub" / "personal").rglob(
|
||||
"*.openbb"
|
||||
)
|
||||
}
|
||||
|
||||
choices["exe"] = {
|
||||
"--file": {
|
||||
filename: {} for filename in list(self.ROUTINE_FILES.keys())
|
||||
},
|
||||
"-f": "--file",
|
||||
"--example": None,
|
||||
"-e": "--example",
|
||||
"--input": None,
|
||||
"-i": "--input",
|
||||
"--url": None,
|
||||
}
|
||||
|
||||
choices["record"] = {
|
||||
"--name": None,
|
||||
"-n": "--name",
|
||||
"--description": None,
|
||||
"-d": "--description",
|
||||
"--public": None,
|
||||
"-p": "--public",
|
||||
"--local": None,
|
||||
"-l": "--local",
|
||||
"--tag1": {c: None for c in constants.SCRIPT_TAGS},
|
||||
"--tag2": {c: None for c in constants.SCRIPT_TAGS},
|
||||
"--tag3": {c: None for c in constants.SCRIPT_TAGS},
|
||||
}
|
||||
|
||||
self.completer = NestedCompleter.from_nested_dict(choices)
|
||||
|
||||
def print_help(self):
|
||||
"""Print help."""
|
||||
mt = MenuText("")
|
||||
mt.add_info("_configure_")
|
||||
mt.add_menu("settings")
|
||||
mt.add_raw("\n")
|
||||
mt.add_info("_scripts_")
|
||||
mt.add_cmd("record")
|
||||
mt.add_cmd("stop")
|
||||
mt.add_cmd("exe")
|
||||
mt.add_raw("\n")
|
||||
mt.add_info("Platform CLI")
|
||||
mt.add_raw(" data\n")
|
||||
for router, value in PLATFORM_ROUTERS.items():
|
||||
if router in NON_DATA_ROUTERS or router in DATA_PROCESSING_ROUTERS:
|
||||
continue
|
||||
if value == "menu":
|
||||
menu_description = (
|
||||
obb.reference["routers"] # type: ignore
|
||||
.get(f"{self.PATH}{router}", {})
|
||||
.get("description")
|
||||
) or ""
|
||||
mt.add_menu(
|
||||
key_menu=router,
|
||||
menu_description=menu_description.split(".")[0].lower(),
|
||||
)
|
||||
else:
|
||||
mt.add_cmd(router)
|
||||
|
||||
if any(router in PLATFORM_ROUTERS for router in DATA_PROCESSING_ROUTERS):
|
||||
mt.add_raw("\n data processing\n")
|
||||
for router, value in PLATFORM_ROUTERS.items():
|
||||
if router not in DATA_PROCESSING_ROUTERS:
|
||||
continue
|
||||
if value == "menu":
|
||||
menu_description = (
|
||||
obb.reference["routers"] # type: ignore
|
||||
.get(f"{self.PATH}{router}", {})
|
||||
.get("description")
|
||||
) or ""
|
||||
mt.add_menu(
|
||||
key_menu=router,
|
||||
menu_description=menu_description.split(".")[0].lower(),
|
||||
)
|
||||
else:
|
||||
mt.add_cmd(router)
|
||||
|
||||
mt.add_raw("\n configuration\n")
|
||||
for router, value in PLATFORM_ROUTERS.items():
|
||||
if router not in NON_DATA_ROUTERS or router == "reference":
|
||||
continue
|
||||
if value == "menu":
|
||||
menu_description = (
|
||||
obb.reference["routers"] # type: ignore
|
||||
.get(f"{self.PATH}{router}", {})
|
||||
.get("description")
|
||||
) or ""
|
||||
mt.add_menu(
|
||||
key_menu=router,
|
||||
menu_description=menu_description.split(".")[0].lower(),
|
||||
)
|
||||
else:
|
||||
mt.add_cmd(router)
|
||||
|
||||
mt.add_raw("\n cached results (OBBjects)\n")
|
||||
mt.add_cmd("results")
|
||||
|
||||
Session().console.print(text=mt.menu_text, menu="Home")
|
||||
self.update_runtime_choices()
|
||||
|
||||
def parse_input(self, an_input: str) -> List:
|
||||
"""Overwrite the BaseController parse_input for `askobb` and 'exe'.
|
||||
|
||||
This will allow us to search for something like "P/E" ratio.
|
||||
"""
|
||||
# Filtering out sorting parameters with forward slashes like P/E
|
||||
sort_filter = r"((\ -q |\ --question|\ ).*?(/))"
|
||||
# Filter out urls
|
||||
url = r"(exe (--url )?(https?://)?my\.openbb\.(dev|co)/u/.*/routine/.*)"
|
||||
custom_filters = [sort_filter, url]
|
||||
return parse_and_split_input(an_input=an_input, custom_filters=custom_filters)
|
||||
|
||||
def call_settings(self, _):
|
||||
"""Process feature flags command."""
|
||||
from openbb_cli.controllers.feature_flags_controller import (
|
||||
FeatureFlagsController,
|
||||
)
|
||||
|
||||
self.queue = self.load_class(FeatureFlagsController, self.queue)
|
||||
|
||||
def call_exe(self, other_args: List[str]):
|
||||
"""Process exe command."""
|
||||
# Merge rest of string path to other_args and remove queue since it is a dir
|
||||
other_args += self.queue
|
||||
|
||||
if not other_args:
|
||||
Session().console.print(
|
||||
"[info]Provide a path to the routine you wish to execute. For an example, please use "
|
||||
"`exe --example` and for documentation and to learn how create your own script "
|
||||
"type `about exe`.\n[/info]"
|
||||
)
|
||||
return
|
||||
parser = argparse.ArgumentParser(
|
||||
add_help=False,
|
||||
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
|
||||
prog="exe",
|
||||
description="Execute automated routine script. For an example, please use "
|
||||
"`exe --example` and for documentation and to learn how create your own script "
|
||||
"type `about exe`.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--file",
|
||||
"-f",
|
||||
help="The path or .openbb file to run.",
|
||||
dest="file",
|
||||
required="-h" not in other_args
|
||||
and "--help" not in other_args
|
||||
and "-e" not in other_args
|
||||
and "--example" not in other_args
|
||||
and "--url" not in other_args
|
||||
and "my.openbb" not in other_args[0],
|
||||
type=str,
|
||||
nargs="+",
|
||||
)
|
||||
parser.add_argument(
|
||||
"-i",
|
||||
"--input",
|
||||
help="Select multiple inputs to be replaced in the routine and separated by commas. E.g. GME,AMC,BTC-USD",
|
||||
dest="routine_args",
|
||||
type=lambda s: [str(item) for item in s.split(",")],
|
||||
)
|
||||
parser.add_argument(
|
||||
"-e",
|
||||
"--example",
|
||||
help="Run an example script to understand how routines can be used.",
|
||||
dest="example",
|
||||
action="store_true",
|
||||
default=False,
|
||||
)
|
||||
parser.add_argument(
|
||||
"--url", help="URL to run openbb script from.", dest="url", type=str
|
||||
)
|
||||
if other_args and "-" not in other_args[0][0]:
|
||||
if other_args[0].startswith("my.") or other_args[0].startswith("http"):
|
||||
other_args.insert(0, "--url")
|
||||
else:
|
||||
other_args.insert(0, "--file")
|
||||
ns_parser = self.parse_known_args_and_warn(parser, other_args)
|
||||
if ns_parser:
|
||||
if ns_parser.example:
|
||||
routine_path = ASSETS_DIRECTORY / "routines" / "routine_example.openbb"
|
||||
Session().console.print(
|
||||
"[info]Executing an example, please type `about exe` "
|
||||
"to learn how to create your own script.[/info]\n"
|
||||
)
|
||||
time.sleep(3)
|
||||
elif ns_parser.url:
|
||||
if not ns_parser.url.startswith(
|
||||
"https"
|
||||
) and not ns_parser.url.startswith("http:"):
|
||||
url = "https://" + ns_parser.url
|
||||
elif ns_parser.url.startswith("http://"):
|
||||
url = ns_parser.url.replace("http://", "https://")
|
||||
else:
|
||||
url = ns_parser.url
|
||||
username = url.split("/")[-3]
|
||||
script_name = url.split("/")[-1]
|
||||
file_name = f"{username}_{script_name}.openbb"
|
||||
final_url = f"{url}?raw=true"
|
||||
response = requests.get(final_url, timeout=10)
|
||||
if response.status_code != 200:
|
||||
Session().console.print(
|
||||
"[red]Could not find the requested script.[/red]"
|
||||
)
|
||||
return
|
||||
routine_text = response.json()["script"]
|
||||
file_path = Path(
|
||||
Session().user.preferences.export_directory, "routines"
|
||||
)
|
||||
routine_path = file_path / file_name
|
||||
with open(routine_path, "w") as file:
|
||||
file.write(routine_text)
|
||||
self.update_runtime_choices()
|
||||
|
||||
elif ns_parser.file:
|
||||
file_path = " ".join(ns_parser.file) # type: ignore
|
||||
# if string is not in this format "default/file.openbb" then check for files in ROUTINE_FILES
|
||||
full_path = file_path
|
||||
hub_routine = file_path.split("/") # type: ignore
|
||||
# Change with: my.openbb.co
|
||||
if hub_routine[0] == "default":
|
||||
routine_path = Path(
|
||||
self.ROUTINE_DEFAULT_FILES.get(hub_routine[1], full_path)
|
||||
)
|
||||
elif hub_routine[0] == "personal":
|
||||
routine_path = Path(
|
||||
self.ROUTINE_PERSONAL_FILES.get(hub_routine[1], full_path)
|
||||
)
|
||||
else:
|
||||
routine_path = Path(self.ROUTINE_FILES.get(file_path, full_path)) # type: ignore
|
||||
else:
|
||||
return
|
||||
|
||||
with open(routine_path) as fp:
|
||||
raw_lines = list(fp)
|
||||
|
||||
# Capture ARGV either as list if args separated by commas or as single value
|
||||
if ns_parser.routine_args:
|
||||
script_inputs = (
|
||||
ns_parser.routine_args
|
||||
if "," not in ns_parser.routine_args
|
||||
else ns_parser.routine_args.split(",")
|
||||
)
|
||||
|
||||
err, parsed_script = parse_openbb_script(
|
||||
raw_lines=raw_lines,
|
||||
script_inputs=script_inputs if ns_parser.routine_args else None,
|
||||
)
|
||||
|
||||
# If there err output is not an empty string then it means there was an
|
||||
# issue in parsing the routine and therefore we don't want to feed it
|
||||
# to the terminal
|
||||
if err:
|
||||
Session().console.print(err)
|
||||
return
|
||||
|
||||
self.queue = [
|
||||
val
|
||||
for val in parse_and_split_input(
|
||||
an_input=parsed_script, custom_filters=[]
|
||||
)
|
||||
if val
|
||||
]
|
||||
|
||||
if "export" in self.queue[0]:
|
||||
export_path = self.queue[0].split(" ")[1]
|
||||
# If the path selected does not start from the user root, give relative location from root
|
||||
if export_path[0] == "~":
|
||||
export_path = export_path.replace(
|
||||
"~", HOME_DIRECTORY.as_posix()
|
||||
)
|
||||
elif export_path[0] != "/":
|
||||
export_path = os.path.join(
|
||||
os.path.dirname(os.path.abspath(__file__)), export_path
|
||||
)
|
||||
|
||||
# Check if the directory exists
|
||||
if os.path.isdir(export_path):
|
||||
Session().console.print(
|
||||
f"Export data to be saved in the selected folder: '{export_path}'"
|
||||
)
|
||||
else:
|
||||
os.makedirs(export_path)
|
||||
Session().console.print(
|
||||
f"[green]Folder '{export_path}' successfully created.[/green]"
|
||||
)
|
||||
self.queue = self.queue[1:]
|
||||
|
||||
def call_results(self, _):
|
||||
"""Process results command."""
|
||||
results = Registry().all
|
||||
if results:
|
||||
df = pd.DataFrame.from_dict(results, orient="index")
|
||||
print_rich_table(
|
||||
df, show_index=True, index_name="stack index", title="OBBject Results"
|
||||
)
|
||||
else:
|
||||
Session().console.print("[info]No results found.[/info]")
|
||||
|
||||
|
||||
def handle_job_cmds(jobs_cmds: Optional[List[str]]) -> Optional[List[str]]:
|
||||
"""Handle job commands."""
|
||||
# If the path selected does not start from the user root,
|
||||
# give relative location from root
|
||||
if jobs_cmds is not None and jobs_cmds:
|
||||
logger.info("INPUT: %s", "/".join(jobs_cmds))
|
||||
|
||||
export_path = ""
|
||||
if jobs_cmds and "export" in jobs_cmds[0]:
|
||||
commands = jobs_cmds[0].split("/")
|
||||
first_split = commands[0].split(" ")
|
||||
if len(first_split) > 1:
|
||||
export_path = first_split[1]
|
||||
jobs_cmds = ["/".join(commands[1:])]
|
||||
if not export_path:
|
||||
return jobs_cmds
|
||||
if export_path[0] == "~":
|
||||
export_path = export_path.replace("~", HOME_DIRECTORY.as_posix())
|
||||
elif export_path[0] != "/":
|
||||
export_path = os.path.join(
|
||||
os.path.dirname(os.path.abspath(__file__)), export_path
|
||||
)
|
||||
|
||||
# Check if the directory exists
|
||||
if os.path.isdir(export_path):
|
||||
Session().console.print(
|
||||
f"Export data to be saved in the selected folder: '{export_path}'"
|
||||
)
|
||||
else:
|
||||
os.makedirs(export_path)
|
||||
Session().console.print(
|
||||
f"[green]Folder '{export_path}' successfully created.[/green]"
|
||||
)
|
||||
return jobs_cmds
|
||||
|
||||
|
||||
# pylint: disable=unused-argument
|
||||
def run_cli(jobs_cmds: Optional[List[str]] = None, test_mode=False):
|
||||
"""Run the CLI menu."""
|
||||
ret_code = 1
|
||||
t_controller = CLIController(jobs_cmds)
|
||||
an_input = ""
|
||||
|
||||
jobs_cmds = handle_job_cmds(jobs_cmds)
|
||||
|
||||
bootup()
|
||||
if not jobs_cmds:
|
||||
welcome_message()
|
||||
|
||||
if first_time_user():
|
||||
with contextlib.suppress(EOFError):
|
||||
webbrowser.open(
|
||||
"https://docs.openbb.co/terminal/usage/overview/structure-and-navigation"
|
||||
)
|
||||
|
||||
t_controller.print_help()
|
||||
|
||||
while ret_code:
|
||||
|
||||
# There is a command in the queue
|
||||
if t_controller.queue and len(t_controller.queue) > 0:
|
||||
# If the command is quitting the menu we want to return in here
|
||||
if t_controller.queue[0] in ("q", "..", "quit"):
|
||||
print_goodbye()
|
||||
break
|
||||
|
||||
# Consume 1 element from the queue
|
||||
an_input = t_controller.queue[0]
|
||||
t_controller.queue = t_controller.queue[1:]
|
||||
|
||||
# Print the current location because this was an instruction and we want user to know what was the action
|
||||
if an_input and an_input.split(" ")[0] in t_controller.CHOICES_COMMANDS:
|
||||
Session().console.print(f"{get_flair_and_username()} / $ {an_input}")
|
||||
|
||||
# Get input command from user
|
||||
else:
|
||||
try:
|
||||
# Get input from user using auto-completion
|
||||
if Session().prompt_session and Session().settings.USE_PROMPT_TOOLKIT:
|
||||
# Check if toolbar hint was enabled
|
||||
if Session().settings.TOOLBAR_HINT:
|
||||
an_input = Session().prompt_session.prompt( # type: ignore[union-attr]
|
||||
f"{get_flair_and_username()} / $ ",
|
||||
completer=t_controller.completer,
|
||||
search_ignore_case=True,
|
||||
bottom_toolbar=HTML(
|
||||
'<style bg="ansiblack" fg="ansiwhite">[h]</style> help menu '
|
||||
'<style bg="ansiblack" fg="ansiwhite">[q]</style> return to previous menu '
|
||||
'<style bg="ansiblack" fg="ansiwhite">[e]</style> exit the program '
|
||||
'<style bg="ansiblack" fg="ansiwhite">[cmd -h]</style> '
|
||||
"see usage and available options "
|
||||
'<style bg="ansiblack" fg="ansiwhite">[about (cmd/menu)]</style> '
|
||||
),
|
||||
style=Style.from_dict(
|
||||
{
|
||||
"bottom-toolbar": "#ffffff bg:#333333",
|
||||
}
|
||||
),
|
||||
)
|
||||
else:
|
||||
an_input = Session().prompt_session.prompt( # type: ignore[union-attr]
|
||||
f"{get_flair_and_username()} / $ ",
|
||||
completer=t_controller.completer,
|
||||
search_ignore_case=True,
|
||||
)
|
||||
|
||||
# Get input from user without auto-completion
|
||||
else:
|
||||
an_input = input(f"{get_flair_and_username()} / $ ")
|
||||
|
||||
except (KeyboardInterrupt, EOFError):
|
||||
print_goodbye()
|
||||
break
|
||||
|
||||
try:
|
||||
# Process the input command
|
||||
t_controller.queue = t_controller.switch(an_input)
|
||||
|
||||
if an_input in ("q", "quit", "..", "exit", "e"):
|
||||
print_goodbye()
|
||||
break
|
||||
|
||||
# Check if the user wants to reset application
|
||||
if an_input in ("r", "reset") or t_controller.update_success:
|
||||
reset(t_controller.queue if t_controller.queue else [])
|
||||
break
|
||||
|
||||
except SystemExit:
|
||||
logger.exception(
|
||||
"The command '%s' doesn't exist on the / menu.",
|
||||
an_input,
|
||||
)
|
||||
Session().console.print(
|
||||
f"[red]The command '{an_input}' doesn't exist on the / menu.[/red]\n",
|
||||
)
|
||||
similar_cmd = difflib.get_close_matches(
|
||||
an_input.split(" ")[0] if " " in an_input else an_input,
|
||||
t_controller.controller_choices,
|
||||
n=1,
|
||||
cutoff=0.7,
|
||||
)
|
||||
if similar_cmd:
|
||||
an_input = similar_cmd[0]
|
||||
if " " in an_input:
|
||||
candidate_input = (
|
||||
f"{similar_cmd[0]} {' '.join(an_input.split(' ')[1:])}"
|
||||
)
|
||||
if candidate_input == an_input:
|
||||
an_input = ""
|
||||
t_controller.queue = []
|
||||
Session().console.print("\n")
|
||||
continue
|
||||
an_input = candidate_input
|
||||
|
||||
Session().console.print(f"[green]Replacing by '{an_input}'.[/green]")
|
||||
t_controller.queue.insert(0, an_input)
|
||||
|
||||
|
||||
def insert_start_slash(cmds: List[str]) -> List[str]:
|
||||
"""Insert a slash at the beginning of a command sequence."""
|
||||
if not cmds[0].startswith("/"):
|
||||
cmds[0] = f"/{cmds[0]}"
|
||||
if cmds[0].startswith("/home"):
|
||||
cmds[0] = f"/{cmds[0][5:]}"
|
||||
return cmds
|
||||
|
||||
|
||||
def run_scripts(
|
||||
path: Path,
|
||||
test_mode: bool = False,
|
||||
verbose: bool = False,
|
||||
routines_args: Optional[List[str]] = None,
|
||||
special_arguments: Optional[Dict[str, str]] = None,
|
||||
output: bool = True,
|
||||
):
|
||||
"""Run given .openbb scripts.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
path : str
|
||||
The location of the .openbb file
|
||||
test_mode : bool
|
||||
Whether the CLI is in test mode
|
||||
verbose : bool
|
||||
Whether to run tests in verbose mode
|
||||
routines_args : List[str]
|
||||
One or multiple inputs to be replaced in the routine and separated by commas.
|
||||
E.g. GME,AMC,BTC-USD
|
||||
special_arguments: Optional[Dict[str, str]]
|
||||
Replace `${key=default}` with `value` for every key in the dictionary
|
||||
output: bool
|
||||
Whether to log tests to txt files
|
||||
"""
|
||||
if not path.exists():
|
||||
Session().console.print(f"File '{path}' doesn't exist. Launching base CLI.\n")
|
||||
if not test_mode:
|
||||
run_cli()
|
||||
|
||||
# THIS NEEDS TO BE REFACTORED!!! - ITS USED FOR TESTING
|
||||
with path.open() as fp:
|
||||
raw_lines = [x for x in fp if (not is_reset(x)) and ("#" not in x) and x]
|
||||
raw_lines = [
|
||||
raw_line.strip("\n") for raw_line in raw_lines if raw_line.strip("\n")
|
||||
]
|
||||
|
||||
if routines_args:
|
||||
lines = []
|
||||
for rawline in raw_lines:
|
||||
templine = rawline
|
||||
for i, arg in enumerate(routines_args):
|
||||
templine = templine.replace(f"$ARGV[{i}]", arg)
|
||||
lines.append(templine)
|
||||
# Handle new testing arguments:
|
||||
elif special_arguments:
|
||||
lines = []
|
||||
for line in raw_lines:
|
||||
new_line = re.sub(
|
||||
r"\${[^{]+=[^{]+}",
|
||||
lambda x: replace_dynamic(x, special_arguments), # type: ignore
|
||||
line,
|
||||
)
|
||||
lines.append(new_line)
|
||||
|
||||
else:
|
||||
lines = raw_lines
|
||||
|
||||
if test_mode and "exit" not in lines[-1]:
|
||||
lines.append("exit")
|
||||
|
||||
# Deals with the export with a path with "/" in it
|
||||
export_folder = ""
|
||||
if "export" in lines[0]:
|
||||
export_folder = lines[0].split("export ")[1].rstrip()
|
||||
lines = lines[1:]
|
||||
|
||||
simulate_argv = f"/{'/'.join([line.rstrip() for line in lines])}"
|
||||
file_cmds = simulate_argv.replace("//", "/home/").split()
|
||||
file_cmds = insert_start_slash(file_cmds) if file_cmds else file_cmds
|
||||
file_cmds = (
|
||||
[f"export {export_folder}{' '.join(file_cmds)}"]
|
||||
if export_folder
|
||||
else [" ".join(file_cmds)]
|
||||
)
|
||||
|
||||
if not test_mode or verbose:
|
||||
run_cli(file_cmds, test_mode=True)
|
||||
else:
|
||||
with suppress_stdout():
|
||||
Session().console.print(f"To ensure: {output}")
|
||||
if output:
|
||||
timestamp = datetime.now().timestamp()
|
||||
stamp_str = str(timestamp).replace(".", "")
|
||||
whole_path = Path(REPOSITORY_DIRECTORY / "integration_test_output")
|
||||
whole_path.mkdir(parents=True, exist_ok=True)
|
||||
first_cmd = file_cmds[0].split("/")[1]
|
||||
with open(
|
||||
whole_path / f"{stamp_str}_{first_cmd}_output.txt", "w"
|
||||
) as output_file, contextlib.redirect_stdout(output_file):
|
||||
run_cli(file_cmds, test_mode=True)
|
||||
else:
|
||||
run_cli(file_cmds, test_mode=True)
|
||||
|
||||
|
||||
def replace_dynamic(match: re.Match, special_arguments: Dict[str, str]) -> str:
|
||||
"""Replace ${key=default} with value in special_arguments if it exists, else with default.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
match: re.Match[str]
|
||||
The match object
|
||||
special_arguments: Dict[str, str]
|
||||
The key value pairs to replace in the scripts
|
||||
|
||||
Returns
|
||||
----------
|
||||
str
|
||||
The new string
|
||||
"""
|
||||
|
||||
cleaned = match[0].replace("{", "").replace("}", "").replace("$", "")
|
||||
key, default = cleaned.split("=")
|
||||
dict_value = special_arguments.get(key, default)
|
||||
if dict_value:
|
||||
return dict_value
|
||||
return default
|
||||
|
||||
|
||||
def run_routine(file: str, routines_args=Optional[str]):
|
||||
"""Execute command routine from .openbb file."""
|
||||
user_routine_path = Path(Session().user.preferences.export_directory, "routines")
|
||||
default_routine_path = ASSETS_DIRECTORY / "routines" / file
|
||||
|
||||
if user_routine_path.exists():
|
||||
run_scripts(path=user_routine_path, routines_args=routines_args)
|
||||
elif default_routine_path.exists():
|
||||
run_scripts(path=default_routine_path, routines_args=routines_args)
|
||||
else:
|
||||
Session().console.print(
|
||||
f"Routine not found, please put your `.openbb` file into : {user_routine_path}."
|
||||
)
|
||||
|
||||
|
||||
# pylint: disable=unused-argument
|
||||
def main(
|
||||
debug: bool,
|
||||
dev: bool,
|
||||
path_list: List[str],
|
||||
routines_args: Optional[List[str]] = None,
|
||||
**kwargs,
|
||||
):
|
||||
"""Run the CLI with various options.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
debug : bool
|
||||
Whether to run the CLI in debug mode
|
||||
dev:
|
||||
Points backend towards development environment instead of production
|
||||
test : bool
|
||||
Whether to run the CLI in integrated test mode
|
||||
filtert : str
|
||||
Filter test files with given string in name
|
||||
paths : List[str]
|
||||
The paths to run for scripts or to test
|
||||
verbose : bool
|
||||
Whether to show output from tests
|
||||
routines_args : List[str]
|
||||
One or multiple inputs to be replaced in the routine and separated by commas.
|
||||
E.g. GME,AMC,BTC-USD
|
||||
"""
|
||||
if debug:
|
||||
Session().settings.DEBUG_MODE = True
|
||||
|
||||
if dev:
|
||||
Session().settings.DEV_BACKEND = True
|
||||
Session().settings.BASE_URL = "https://payments.openbb.dev/"
|
||||
Session().settings.HUB_URL = "https://my.openbb.dev"
|
||||
|
||||
if isinstance(path_list, list) and path_list[0].endswith(".openbb"):
|
||||
run_routine(file=path_list[0], routines_args=routines_args)
|
||||
elif path_list:
|
||||
argv_cmds = list([" ".join(path_list).replace(" /", "/home/")])
|
||||
argv_cmds = insert_start_slash(argv_cmds) if argv_cmds else argv_cmds
|
||||
run_cli(argv_cmds)
|
||||
else:
|
||||
run_cli()
|
||||
|
||||
|
||||
def parse_args_and_run():
|
||||
"""Parse input arguments and run CLI."""
|
||||
parser = argparse.ArgumentParser(
|
||||
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
|
||||
prog="cli",
|
||||
description="The OpenBB Platform CLI.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"-d",
|
||||
"--debug",
|
||||
dest="debug",
|
||||
action="store_true",
|
||||
default=False,
|
||||
help="Runs the CLI in debug mode.",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--dev",
|
||||
dest="dev",
|
||||
action="store_true",
|
||||
default=False,
|
||||
help="Points backend towards development environment instead of production",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--file",
|
||||
help="The path or .openbb file to run.",
|
||||
dest="path",
|
||||
nargs="+",
|
||||
default="",
|
||||
type=str,
|
||||
)
|
||||
parser.add_argument(
|
||||
"-i",
|
||||
"--input",
|
||||
help=(
|
||||
"Select multiple inputs to be replaced in the routine and separated by commas."
|
||||
"E.g. GME,AMC,BTC-USD"
|
||||
),
|
||||
dest="routine_args",
|
||||
type=lambda s: [str(item) for item in s.split(",")],
|
||||
default=None,
|
||||
)
|
||||
parser.add_argument(
|
||||
"-t",
|
||||
"--test",
|
||||
action="store_true",
|
||||
help=(
|
||||
"Run the CLI in testing mode. Also run this option and '-h'"
|
||||
" to see testing argument options."
|
||||
),
|
||||
)
|
||||
# The args -m, -f and --HistoryManager.hist_file are used only in reports menu
|
||||
# by papermill and that's why they have suppress help.
|
||||
parser.add_argument(
|
||||
"-m",
|
||||
help=argparse.SUPPRESS,
|
||||
dest="module",
|
||||
default="",
|
||||
type=str,
|
||||
)
|
||||
parser.add_argument(
|
||||
"-f",
|
||||
help=argparse.SUPPRESS,
|
||||
dest="module_file",
|
||||
default="",
|
||||
type=str,
|
||||
)
|
||||
parser.add_argument(
|
||||
"--HistoryManager.hist_file",
|
||||
help=argparse.SUPPRESS,
|
||||
dest="module_hist_file",
|
||||
default="",
|
||||
type=str,
|
||||
)
|
||||
if sys.argv[1:] and "-" not in sys.argv[1][0]:
|
||||
sys.argv.insert(1, "--file")
|
||||
ns_parser, unknown = parser.parse_known_args()
|
||||
|
||||
# This ensures that if cli.py receives unknown args it will not start.
|
||||
# Use -d flag if you want to see the unknown args.
|
||||
if unknown:
|
||||
if ns_parser.debug:
|
||||
Session().console.print(unknown)
|
||||
else:
|
||||
sys.exit(-1)
|
||||
|
||||
main(
|
||||
ns_parser.debug,
|
||||
ns_parser.dev,
|
||||
ns_parser.path,
|
||||
ns_parser.routine_args,
|
||||
module=ns_parser.module,
|
||||
module_file=ns_parser.module_file,
|
||||
module_hist_file=ns_parser.module_hist_file,
|
||||
)
|
||||
|
||||
|
||||
def launch(
|
||||
debug: bool = False, dev: bool = False, queue: Optional[List[str]] = None
|
||||
) -> None:
|
||||
"""Launch CLI."""
|
||||
|
||||
if queue:
|
||||
main(debug, dev, queue, module="")
|
||||
else:
|
||||
parse_args_and_run()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parse_args_and_run()
|
||||
Reference in New Issue
Block a user