mirror of
https://github.com/OpenBB-finance/OpenBB.git
synced 2026-05-06 22:12:12 +08:00
* few changes * Update get_search_results.md * updates * undo build * Update test_generate_docs.py * Update controller_doc_classes.py * Update params_controller.py * Update params_controller.py
532 lines
18 KiB
Python
532 lines
18 KiB
Python
# pylint: disable=unused-argument
|
|
import argparse
|
|
import contextlib
|
|
import inspect
|
|
import sys
|
|
from datetime import datetime, timedelta
|
|
from importlib.util import module_from_spec, spec_from_file_location
|
|
from inspect import FullArgSpec, getmembers, isclass
|
|
from pathlib import Path
|
|
from types import FunctionType, ModuleType
|
|
from typing import Any, Dict, List, Optional
|
|
from unittest.mock import patch
|
|
|
|
import pandas as pd
|
|
from pandas.tseries.holiday import USFederalHolidayCalendar
|
|
from rich.console import Console, Theme
|
|
|
|
import openbb_terminal
|
|
import openbb_terminal.config_terminal as cfg
|
|
from openbb_terminal.core.plots.backend import plots_backend
|
|
from openbb_terminal.core.plots.plotly_helper import theme
|
|
from openbb_terminal.core.session.current_system import set_system_variable
|
|
from openbb_terminal.core.session.current_user import get_current_user # noqa: F401
|
|
from openbb_terminal.decorators import disable_check_api
|
|
from openbb_terminal.helper_funcs import (
|
|
EXPORT_BOTH_RAW_DATA_AND_FIGURES,
|
|
EXPORT_ONLY_FIGURES_ALLOWED,
|
|
NO_EXPORT,
|
|
set_command_location,
|
|
)
|
|
from openbb_terminal.parent_classes import BaseController
|
|
from openbb_terminal.sdk import openbb
|
|
from openbb_terminal.stocks.comparison_analysis import finviz_compare_model
|
|
|
|
set_system_variable("TEST_MODE", True)
|
|
set_system_variable("LOG_COLLECT", False)
|
|
disable_check_api()
|
|
console = Console(theme=Theme(theme.console_style), highlight=False, soft_wrap=True)
|
|
|
|
|
|
CRYPTO_DATA = openbb.crypto.load("BTC", to_symbol="usd", source="YahooFinance")
|
|
ETF_DATA = openbb.etf.load("SPY")
|
|
ETF_DATA.index.name = "date"
|
|
FOREX_DATA = openbb.forex.load(to_symbol="USD", from_symbol="EUR")
|
|
FOREX_DATA.index.name = "date"
|
|
STOCK_DATA = openbb.stocks.load("AAPL", start_date="2022-01-01")
|
|
STOCK_DATA.index.name = "date"
|
|
|
|
data_dict = {
|
|
"stocks": {
|
|
"data": STOCK_DATA,
|
|
"symbol": "AAPL",
|
|
},
|
|
"crypto": {
|
|
"data": CRYPTO_DATA,
|
|
"symbol": "BTC",
|
|
},
|
|
"forex": {
|
|
"data": FOREX_DATA,
|
|
"symbol": "EURUSD",
|
|
},
|
|
"futures": {
|
|
"data": STOCK_DATA,
|
|
"symbol": "ES",
|
|
},
|
|
"etf": {
|
|
"data": ETF_DATA,
|
|
"symbol": "SPY",
|
|
},
|
|
}
|
|
|
|
sub_folders_abbr = {
|
|
"discovery": "disc",
|
|
"due_diligence": "dd",
|
|
"overview": "ov",
|
|
"alternative": "alt",
|
|
"cryptocurrency": "crypto",
|
|
"behavioural_analysis": "ba",
|
|
"comparison_analysis": "ca",
|
|
"dark_pool_shorts": "dps",
|
|
"portfolio_optimization": "po",
|
|
"companieshouse": "companieshouse",
|
|
"quantitative_analysis": "qa",
|
|
"technical_analysis": "ta",
|
|
"tradinghours": "th",
|
|
"fundamental_analysis": "fa",
|
|
"mutual_funds": "funds",
|
|
"government": "gov",
|
|
"insider": "ins",
|
|
}
|
|
|
|
sub_names_full = {
|
|
"alt": "Alternative",
|
|
"ba": "Behavioural Analysis",
|
|
"ca": "Comparison Analysis",
|
|
"companieshouse": "Companies House",
|
|
"crypto": "Cryptocurrency",
|
|
"dd": "Due Diligence",
|
|
"defi": "DeFi",
|
|
"disc": "Discovery",
|
|
"dps": "Darkpool Shorts",
|
|
"etf": "ETFs",
|
|
"fa": "Fundamental Analysis",
|
|
"forecast": "Forecasting",
|
|
"funds": "Mutual Funds",
|
|
"gov": "Government",
|
|
"ins": "Insiders",
|
|
"keys": "Keys",
|
|
"nft": "NFTs",
|
|
"onchain": "OnChain",
|
|
"ov": "Overview",
|
|
"po": "Portfolio Optimization",
|
|
"qa": "Quantitative Analysis",
|
|
"screener": "Screener",
|
|
"ta": "Technical Analysis",
|
|
"th": "Trading Hours",
|
|
}
|
|
|
|
required_flags = {
|
|
"crypto": {
|
|
"funot": ["-p", "ethereum"],
|
|
},
|
|
"econometrics": {
|
|
"coint": ["-t"], # TODO: add choice
|
|
"norm": ["-v"], # TODO: add choice
|
|
},
|
|
}
|
|
|
|
|
|
def get_expiration_date():
|
|
"""Gets the next expiration date for controller init"""
|
|
dt = datetime.now()
|
|
bdays_indx = pd.bdate_range(
|
|
dt.strftime("%Y-%m-%d"),
|
|
(dt + timedelta(days=20)).strftime("%Y-%m-%d"),
|
|
freq=pd.offsets.CustomBusinessDay(calendar=USFederalHolidayCalendar()),
|
|
).tolist()
|
|
expiration = [x.strftime("%Y-%m-%d") for x in bdays_indx if x.weekday() == 4][0]
|
|
return expiration
|
|
|
|
|
|
start_date = (datetime.now() - timedelta(days=365)).strftime("%Y-%m-%d")
|
|
end_date = datetime.now().strftime("%Y-%m-%d")
|
|
|
|
param_name_to_value = {
|
|
"expiration": get_expiration_date(),
|
|
"start_date": start_date,
|
|
"start": start_date,
|
|
"end_date": end_date,
|
|
"end": end_date,
|
|
"from_symbol": "USD",
|
|
"to_symbol": "EUR",
|
|
"vs": "USDT",
|
|
}
|
|
param_type_to_value = {
|
|
(dict, Dict): {},
|
|
list: [],
|
|
str: "",
|
|
int: 0,
|
|
float: 0.0,
|
|
bool: False,
|
|
}
|
|
|
|
|
|
def get_parameters(
|
|
controller_cls: BaseController, name: str, df_loaded: pd.DataFrame, symbol: str
|
|
) -> Dict[str, Any]:
|
|
"""Gets the parameters of a controller's `__init__` signature. If required parameters are missing,
|
|
we get the type and use a default value for it.
|
|
|
|
Parameters
|
|
----------
|
|
controller_class: Type[BaseController]
|
|
The controller class
|
|
name: str
|
|
The name of the controller
|
|
df_loaded: pd.DataFrame
|
|
The dataframe loaded from the `load` function
|
|
symbol: str
|
|
The symbol to use on controller init
|
|
|
|
Returns
|
|
-------
|
|
dict[str, Any]
|
|
The parameters with their values for controller init
|
|
"""
|
|
signature = inspect.signature(controller_cls) # type: ignore
|
|
kwargs: Dict[str, Any] = {}
|
|
|
|
for param in signature.parameters.values():
|
|
if param.name in ("ticker", "symbol", "coin"):
|
|
kwargs[param.name] = symbol
|
|
elif param.name == "data" and name in ("forecast", "qa"):
|
|
kwargs["data"] = df_loaded
|
|
elif (
|
|
param.default is inspect.Parameter.empty
|
|
and param.kind is not inspect.Parameter.VAR_KEYWORD
|
|
):
|
|
for param_name, value in param_name_to_value.items():
|
|
if param.name == param_name:
|
|
kwargs[param.name] = value
|
|
break
|
|
if param.name not in kwargs:
|
|
for param_type, value in param_type_to_value.items():
|
|
if isinstance(param_type, tuple):
|
|
if param.annotation in param_type:
|
|
kwargs[param.name] = {symbol: df_loaded}
|
|
break
|
|
elif param.annotation is pd.DataFrame:
|
|
kwargs[param.name] = df_loaded
|
|
break
|
|
elif param.annotation is param_type:
|
|
kwargs[param.name] = value
|
|
break
|
|
|
|
return kwargs
|
|
|
|
|
|
class ControllerDoc:
|
|
"""Class that retrieves the ArgumentParser for each command of the Controller and stores it in a dictionary
|
|
for use in auto-generating the documentation.
|
|
|
|
Parameters
|
|
----------
|
|
controller: BaseController
|
|
The controller to get the commands from
|
|
|
|
Attributes
|
|
----------
|
|
controller: BaseController
|
|
The controller to get the commands from
|
|
cmd_parsers: Dict[str, argparse.ArgumentParser]
|
|
A dictionary of the command name and the ArgumentParser for that command
|
|
cmd_funcs: Dict[str, FunctionType]
|
|
A dictionary of the command name and the function for that command
|
|
cmd_fullspec: Dict[str, FullArgSpec]
|
|
A dictionary of the command name and the full argument spec for that command
|
|
ignore: List[str]
|
|
A list of commands to ignore
|
|
commands: List[str]
|
|
A list of commands to document
|
|
|
|
Methods
|
|
-------
|
|
get_commands()
|
|
Get commands
|
|
get_command_parser(command: str)
|
|
Get the parser for a command
|
|
get_all_command_parsers()
|
|
Get all command parsers
|
|
has_commands()
|
|
Checks if controller has commands to document
|
|
"""
|
|
|
|
def __init__(self, controller: BaseController, trailmap: str):
|
|
self.trailmap = trailmap
|
|
self.name = trailmap.split(".")[-1]
|
|
|
|
params = data_dict.get(trailmap.split(".")[0], data_dict["stocks"])
|
|
self.symbol = params.get("symbol", "AAPL")
|
|
self.current_df = params.get("data", STOCK_DATA)
|
|
|
|
self.controller: BaseController = controller( # type: ignore
|
|
**get_parameters(controller, self.name, self.current_df, self.symbol)
|
|
)
|
|
|
|
root_menu = self.controller.path[0]
|
|
sub_menu = self.controller.path[1] if len(self.controller.path) > 1 else None
|
|
if sub_menu and sub_menu:
|
|
console.print(
|
|
f"[bold green]Loading {root_menu}[/] [bold yellow]{sub_menu}[/]"
|
|
)
|
|
else:
|
|
console.print(f"[bold green]Loading {root_menu}[/]")
|
|
|
|
self.cmd_parsers: Dict[str, argparse.ArgumentParser] = {}
|
|
self.cmd_funcs: Dict[str, FunctionType] = {}
|
|
self.cmd_fullspec: Dict[str, FullArgSpec] = {}
|
|
self.image_exportable: Dict[str, bool] = {}
|
|
self.ignore = [
|
|
"call_help",
|
|
"call_new",
|
|
"call_exit",
|
|
"call_clear",
|
|
"call_cls",
|
|
"call_quit",
|
|
"call_reset",
|
|
"call_support",
|
|
"call_wiki",
|
|
"call_record",
|
|
"call_stop",
|
|
"call_screenshot",
|
|
]
|
|
self.commands: List[str] = self.get_commands()
|
|
|
|
if self.name == "options" and hasattr(self.controller, "selected_date"):
|
|
self.controller.selected_date = get_expiration_date()
|
|
elif self.name == "ca" and hasattr(self.controller, "similar"):
|
|
self.controller.similar = finviz_compare_model.get_similar_companies(
|
|
self.symbol, ["Sector", "Industry"]
|
|
)
|
|
if hasattr(self.controller, "current_currency"):
|
|
self.controller.current_currency = "usdt"
|
|
if hasattr(self.controller, "source") and trailmap.split(".")[0] == "crypto":
|
|
self.controller.source = "YahooFinance"
|
|
|
|
for attr in ["ticker", "symbol", "coin", "etf_name"]:
|
|
if hasattr(self.controller, attr):
|
|
setattr(self.controller, attr, self.symbol)
|
|
for attr in ["current_df", "etf_data"]:
|
|
if hasattr(self.controller, attr):
|
|
setattr(self.controller, attr, self.current_df)
|
|
|
|
self.get_all_command_parsers()
|
|
|
|
def get_commands(self) -> List[str]:
|
|
"""Get commands"""
|
|
commands = []
|
|
for name, _ in getmembers(self.controller, predicate=inspect.ismethod):
|
|
if name.startswith("call_") and name not in self.ignore:
|
|
func = getattr(self.controller, name)
|
|
|
|
if hasattr(func, "__wrapped__"):
|
|
while hasattr(func, "__wrapped__"):
|
|
func = func.__wrapped__
|
|
|
|
self.cmd_funcs[name] = func
|
|
self.cmd_fullspec[name] = inspect.getfullargspec(func)
|
|
|
|
if "_" not in self.cmd_fullspec[
|
|
name
|
|
].args and "from openbb_terminal." not in inspect.getsource(func):
|
|
commands.append(name)
|
|
|
|
return commands
|
|
|
|
def get_command_parser(self, command: str) -> Optional[argparse.ArgumentParser]:
|
|
"""Get command parser"""
|
|
if command not in self.cmd_parsers:
|
|
self._get_parser(command)
|
|
|
|
if command in self.cmd_parsers:
|
|
return self.cmd_parsers[command]
|
|
|
|
return None
|
|
|
|
def _get_parser(self, command: str) -> None:
|
|
"""Get parser information from source"""
|
|
self.image_exportable[command] = False
|
|
|
|
def mock_func(fparser: argparse.ArgumentParser, *args, **kwargs):
|
|
"""Mock function to get the parser"""
|
|
allowed = [EXPORT_BOTH_RAW_DATA_AND_FIGURES, EXPORT_ONLY_FIGURES_ALLOWED]
|
|
export = kwargs.get("export_allowed", NO_EXPORT)
|
|
|
|
if export in allowed:
|
|
self.image_exportable[command] = True
|
|
else:
|
|
for arg in args:
|
|
if arg in allowed:
|
|
self.image_exportable[command] = True
|
|
break
|
|
|
|
self.cmd_parsers[command] = fparser
|
|
|
|
try:
|
|
with patch.object(
|
|
self.controller, "parse_known_args_and_warn", new=mock_func
|
|
) as _:
|
|
args = {}
|
|
|
|
fullspec = self.cmd_fullspec[command]
|
|
if "_" in fullspec.args:
|
|
return
|
|
|
|
if len(fullspec.args) > 2:
|
|
args.update({arg: ["1234"] for arg in fullspec.args[2:]})
|
|
with patch(
|
|
"openbb_terminal.rich_config.console.print"
|
|
), contextlib.suppress(SystemExit, AttributeError):
|
|
_ = getattr(self.controller, command)(["--help"], **args)
|
|
|
|
except Exception as e:
|
|
print(e)
|
|
|
|
def run_image_exports(self) -> None:
|
|
"""Run image exports"""
|
|
if not self.image_exportable:
|
|
return
|
|
|
|
for command in self.commands:
|
|
self.get_image_export(command)
|
|
|
|
def get_image_export(self, command: str) -> None:
|
|
"""Get image export"""
|
|
if not self.image_exportable[command]:
|
|
return
|
|
|
|
console.print(
|
|
f"[yellow]Exporting {command} image for {self.name} controller[/]"
|
|
)
|
|
try:
|
|
cmd_name = command.replace("call_", "")
|
|
|
|
other_args = [
|
|
"--export",
|
|
f"{'_'.join(self.controller.path)}_{cmd_name}.png",
|
|
]
|
|
|
|
for action in self.cmd_parsers[command]._actions: # pylint: disable=W0212
|
|
if action.dest == "strike":
|
|
other_args.extend(["--strike", "100"])
|
|
if action.dest == "target_dataset":
|
|
other_args.extend(["--dataset", "AAPL"])
|
|
if action.dest == "ticker":
|
|
other_args.extend(["--ticker", self.symbol])
|
|
if self.name == "forecast" and action.dest == "values":
|
|
other_args.extend(["--values", "AAPL.close"])
|
|
|
|
set_command_location(f"/{'/'.join(self.controller.path)}/{cmd_name}")
|
|
|
|
_ = getattr(self.controller, command)(other_args)
|
|
|
|
except (SystemExit, KeyError):
|
|
pass
|
|
|
|
def get_all_command_parsers(self) -> None:
|
|
"""Get all command parsers"""
|
|
for command in self.commands:
|
|
self.get_command_parser(command)
|
|
|
|
def has_commands(self) -> bool:
|
|
"""Checks if controller has commands"""
|
|
return len(self.commands) > 0
|
|
|
|
|
|
class LoadControllersDoc:
|
|
"""Class that loads all controllers and creates a ControllerDoc class instance for each one
|
|
|
|
|
|
Attributes
|
|
----------
|
|
controller_docs: Dict[str, ControllerDoc]
|
|
A dictionary of the controller name and the ControllerDoc class instance for that controller
|
|
|
|
Methods
|
|
-------
|
|
get_controllers()
|
|
Gets all controllers to create a ControllerDoc class instance for
|
|
get_controller_doc(controller: str)
|
|
Gets the ControllerDoc class instance for a controller
|
|
available_controllers()
|
|
Gets a list of available controllers names
|
|
"""
|
|
|
|
def __init__(self) -> None:
|
|
self.controller_docs: Dict[str, ControllerDoc] = {}
|
|
self.get_controllers()
|
|
|
|
def get_controllers(self) -> None:
|
|
"""Gets all controllers"""
|
|
for trailmap, module in self._get_modules().items():
|
|
for name, obj in getmembers(module):
|
|
if ( # noqa: SIM102
|
|
name != "TerminalController" and "BaseController" not in name
|
|
): # noqa: SIM102
|
|
if (
|
|
isclass(obj)
|
|
and issubclass(obj, BaseController)
|
|
and trailmap not in self.controller_docs
|
|
):
|
|
ctrl = ControllerDoc(obj, trailmap) # type: ignore
|
|
if ctrl.has_commands():
|
|
self.controller_docs[trailmap] = ctrl
|
|
|
|
def _get_modules(self) -> Dict[str, ModuleType]:
|
|
"""Gets all controllers modules"""
|
|
modules = {}
|
|
terminal_path = Path(openbb_terminal.__file__).parent
|
|
|
|
for file in terminal_path.glob("**/*controller.py"):
|
|
spec = spec_from_file_location(file.stem, file)
|
|
if spec is not None and spec.loader is not None:
|
|
module = module_from_spec(spec)
|
|
spec.loader.exec_module(module)
|
|
|
|
ctrl_path = (
|
|
str(file)
|
|
.replace(str(terminal_path), "")
|
|
.replace("\\", "/")
|
|
.split("/")[1:]
|
|
)
|
|
for sub_name, abbr in sub_folders_abbr.items():
|
|
ctrl_path = [
|
|
path.lower().replace(sub_name, abbr) for path in ctrl_path
|
|
]
|
|
|
|
trailmap = ".".join(ctrl_path[:-1])
|
|
if trailmap not in modules:
|
|
modules[trailmap] = module
|
|
|
|
return modules
|
|
|
|
def get_controller_doc(self, controller_name: str) -> ControllerDoc:
|
|
"""Get the ControllerDoc instance for a controller"""
|
|
if controller_name not in self.controller_docs:
|
|
raise KeyError(f"Controller {controller_name} not found")
|
|
|
|
return self.controller_docs[controller_name]
|
|
|
|
def available_controllers(self) -> List[str]:
|
|
"""Get available controllers"""
|
|
return [ctrl for ctrl in self.controller_docs if ctrl != ""]
|
|
|
|
|
|
if __name__ == "__main__":
|
|
try:
|
|
run_image_exports = "--images" in sys.argv
|
|
load_controllers = LoadControllersDoc()
|
|
|
|
for loaded in load_controllers.available_controllers():
|
|
controller_doc = load_controllers.get_controller_doc(loaded)
|
|
controller_doc.get_all_command_parsers()
|
|
if run_image_exports:
|
|
cfg.setup_config_terminal()
|
|
plots_backend().start()
|
|
plots_backend().isatty = True
|
|
get_current_user().preferences.USE_INTERACTIVE_DF = False
|
|
controller_doc.run_image_exports()
|
|
except KeyboardInterrupt:
|
|
sys.exit(0)
|