Files
OpenBB/cli/openbb_cli/controllers/script_parser.py
Andrew Barnes 1788e77fe1 [BugFix] Remove unnecessary eval() calls in script_parser.py (#7403)
* [BugFix] Remove unnecessary eval() calls in script_parser.py

Replace eval() with direct dict access and slice() construction.
Follows the same pattern as #7390 which removed eval() from utils.py.
No behavior change - all replacements are equivalent to the eval'd
expressions.

* style: format script_parser with black

---------

Co-authored-by: Danglewood <85772166+deeleeramone@users.noreply.github.com>
2026-03-13 04:58:52 +00:00

473 lines
19 KiB
Python

"""Routine functions for OpenBB Platform CLI."""
import re
from datetime import datetime, timedelta
from re import Match
from dateutil.relativedelta import relativedelta
from openbb_cli.session import Session
# Module-level CLI session; its console is used further down in this module
# to print parser warnings (see the `session.console.print` calls).
session = Session()
# pylint: disable=too-many-statements,consider-iterating-dictionary
# pylint: disable=too-many-branches,too-many-return-statements
# Necessary for OpenBB keywords
# Month name (upper case) -> calendar month number, January == 1.
MONTHS_VALUE = {
    month_name: month_number
    for month_number, month_name in enumerate(
        (
            "JANUARY",
            "FEBRUARY",
            "MARCH",
            "APRIL",
            "MAY",
            "JUNE",
            "JULY",
            "AUGUST",
            "SEPTEMBER",
            "OCTOBER",
            "NOVEMBER",
            "DECEMBER",
        ),
        start=1,
    )
}
# Weekday name (upper case) -> index compared against datetime.weekday(),
# Monday == 0.
WEEKDAY_VALUE = {
    day_name: day_index
    for day_index, day_name in enumerate(
        (
            "MONDAY",
            "TUESDAY",
            "WEDNESDAY",
            "THURSDAY",
            "FRIDAY",
            "SATURDAY",
            "SUNDAY",
        )
    )
}
def is_reset(command: str) -> bool:
    """Tell whether a command should be treated as a reset command.

    Parameters
    ----------
    command : str
        The command to test

    Returns
    -------
    answer : bool
        True when the text contains "reset" anywhere, or is exactly the
        short form "r" (optionally followed by a newline)
    """
    short_forms = ("r", "r\n")
    return "reset" in command or command in short_forms
def match_and_return_openbb_keyword_date(keyword: str) -> str:  # noqa: PLR0911
    """Return OpenBB keyword into date.

    Three keyword families are recognised, all relative to the current local
    time:

    * ``$<N><UNIT>AGO`` / ``$<N><UNIT>FROMNOW`` where UNIT is DAY(S), WEEK(S),
      MONTH(S) or YEAR(S), e.g. ``$1MONTHAGO`` or ``$3YEARSFROMNOW``.
    * ``$LAST<MONTH>`` / ``$NEXT<MONTH>``: first day of the closest such month
      strictly before / after the current month.
    * ``$LAST<WEEKDAY>`` / ``$NEXT<WEEKDAY>``: closest such weekday strictly
      before / after today.

    Parameters
    ----------
    keyword : str
        String with potential OpenBB keyword, including the leading ``$``
        (e.g. $1MONTHAGO, $LASTFRIDAY, $3YEARSFROMNOW, $NEXTTUESDAY)

    Returns
    ----------
    str: Date with format YYYY-MM-DD, or an empty string when the keyword is
        not recognised
    """
    now = datetime.now()

    # Exact unit names mapped to the relativedelta keyword they stand for.
    # An exact lookup (instead of a substring test on plural names) lets the
    # singular forms work and keeps unknown units such as "WEEKDAYS" from
    # being forwarded to relativedelta as an invalid keyword argument.
    relative_units = {
        "DAY": "days",
        "DAYS": "days",
        "WEEK": "weeks",
        "WEEKS": "weeks",
        "MONTH": "months",
        "MONTHS": "months",
        "YEAR": "years",
        "YEARS": "years",
    }

    for i, regex in enumerate([r"^\$(\d+)([A-Z]+)AGO$", r"^\$(\d+)([A-Z]+)FROMNOW$"]):
        match = re.match(regex, keyword)
        if match:
            delta_unit = relative_units.get(match.group(2))
            if delta_unit:
                kwargs = {delta_unit: int(match.group(1))}
                # First pattern looks into the past, second into the future
                if i == 0:
                    return (now - relativedelta(**kwargs)).strftime("%Y-%m-%d")  # type: ignore
                return (now + relativedelta(**kwargs)).strftime("%Y-%m-%d")  # type: ignore

    match = re.search(r"\$LAST(\w+)", keyword)
    if match:
        time_unit = match.group(1)
        # Check if it corresponds to a month
        if time_unit in MONTHS_VALUE:
            target_month = MONTHS_VALUE[time_unit]
            # The month already went by this year only if it is strictly
            # before the current month; otherwise take last year's
            the_year = now.year - 1 if now.month <= target_month else now.year
            return datetime(the_year, target_month, 1).strftime("%Y-%m-%d")
        # Check if it corresponds to a week day
        if time_unit in WEEKDAY_VALUE:
            days_back = datetime.weekday(now) - WEEKDAY_VALUE[time_unit]
            # Same weekday as today (or later in the week) means the
            # occurrence in the previous week
            if days_back <= 0:
                days_back += 7
            return (now - timedelta(days_back)).strftime("%Y-%m-%d")

    match = re.search(r"\$NEXT(\w+)", keyword)
    if match:
        time_unit = match.group(1)
        # Check if it corresponds to a month
        if time_unit in MONTHS_VALUE:
            target_month = MONTHS_VALUE[time_unit]
            # If the current month is before the target month, the next
            # occurrence is still in this year; otherwise it is next year
            the_year = now.year if now.month < target_month else now.year + 1
            return datetime(the_year, target_month, 1).strftime("%Y-%m-%d")
        # Check if it corresponds to a week day
        if time_unit in WEEKDAY_VALUE:
            days_ahead = WEEKDAY_VALUE[time_unit] - datetime.weekday(now)
            # Same weekday as today (or earlier in the week) means the
            # occurrence in the following week
            if days_ahead <= 0:
                days_ahead += 7
            return (now + timedelta(days_ahead)).strftime("%Y-%m-%d")

    return ""
def parse_openbb_script( # noqa: PLR0911,PLR0912
raw_lines: list[str],
script_inputs: list[str] | None = None,
) -> tuple[str, str]:
"""Parse .openbb script.
Parameters
----------
raw_lines : List[str]
Lines from .openbb script
script_inputs: str, optional
Inputs to the script that come externally
Returns
-------
str
Error that occurred - if empty means no error
str
Processed string from .openbb script that can be run by the OpenBB Platform CLI
"""
ROUTINE_VARS: dict[str, str | list[str]] = dict()
if script_inputs:
ROUTINE_VARS["$ARGV"] = script_inputs
## PRE PROCESSING
# Remove reset commands, comments, empty lines and trailing/leading whitespaces
raw_lines = [
x.strip()
for x in raw_lines
if (not is_reset(x)) and ("#" not in x) and x.strip()
]
## LOOK FOR NEW VARIABLES BEING DECLARED FROM USERS
lines_without_declarations = list()
for line in raw_lines:
# Check if this line has a variable attribution
# This currently allows user to override ARGV parameter
if "$" in line and "=" in line:
match = re.search(r"\$(\w+)\s*=\s*([\w\d,-.\s]+)", line)
if match:
VAR_NAME = match.group(1)
VAR_VALUES = match.group(2)
ROUTINE_VARS["$" + VAR_NAME] = (
VAR_VALUES if "," not in VAR_VALUES else VAR_VALUES.split(",")
)
# Just throw a warning when user uses wrong convention
numdollars = len(re.findall(r"\$", line))
if numdollars > 1:
session.console.print(
f"The variable {VAR_NAME} should not be declared as "
f"{'$' * numdollars}{VAR_NAME}. Instead it will be "
f"converted into ${VAR_NAME}."
)
else:
lines_without_declarations.append(line)
else:
lines_without_declarations.append(line)
# At this stage our ROUTINE_VARS should be completed coming from external AND from internal
# Now we want to replace the ROUTINE_VARS to where applicable throughout the .openbb script
# Due to this implementation, a variable declared at the end will still be effective
lines_with_vars_replaced = list()
foreach_loop_found = False
for line in lines_without_declarations:
# Save temporary line to ensure that all vars get replaced by correct vars
templine = line
# Found 'end' keyword which means that a loop has terminated
if re.match(r"^\s*end\s*$", line, re.IGNORECASE):
# Check whether the foreach loop has started or not
if not foreach_loop_found:
return (
"[red]The script has a foreach loop that terminates before it gets started. "
"Add the keyword 'foreach' to explicitly start loop[/red]",
"",
)
foreach_loop_found = False
else:
# Found 'foreach' keyword which means there needs to be a matching 'end'
if re.search(r"foreach", line, re.IGNORECASE):
foreach_loop_found = True
# Regular expression pattern to match variables starting with $
pattern = r"(?<!\$)(\$(\w+)(\[[^\]]*\])?)(?=(?:[^\]]*\]*))"
# Find all matches of the pattern in the line
matches: list[Match[str]] | None = re.findall(pattern, line)
if matches:
for match in matches:
if match:
VAR_NAME = "$" + match[1]
VAR_SLICE = match[2][1:-1] if match[2] else ""
# Within a list refers to a single element
if VAR_SLICE.isdigit():
# This is an edge case for when the user has a variable such as $DATE = 2022-01-01
# We want the user to be able to access it with $DATE or $DATE[0] and the latest
# in python will only take the first '2'
if VAR_SLICE == "0":
if VAR_NAME in ROUTINE_VARS:
values = ROUTINE_VARS[VAR_NAME]
if isinstance(values, list):
templine = templine.replace(
match[0],
values[int(VAR_SLICE)],
)
else:
templine = templine.replace(match[0], values)
else:
return (
f"[red]Variable {VAR_NAME} not given for current routine script.[/red]",
"",
)
# Only enters here when any other index from 0 is used
elif VAR_NAME in ROUTINE_VARS:
variable = ROUTINE_VARS[VAR_NAME]
length_variable = (
len(variable) if isinstance(variable, list) else 1
)
# We use <= because we are using 0 index based lists
if length_variable <= int(VAR_SLICE):
return (
f"[red]Variable {VAR_NAME} only has "
f"{length_variable} elements and there "
f"was an attempt to access it with index {VAR_SLICE}.[/red]",
"",
)
templine = templine.replace(
match[0],
variable[int(VAR_SLICE)],
)
else:
return (
f"[red]Variable {VAR_NAME} not given for current routine script.[/red]",
"",
)
# Involves slicing which is a bit more tricky to use eval on
elif (
":" in VAR_SLICE
and len(VAR_SLICE.split(":")) == 2
and (
VAR_SLICE.split(":")[0].isdigit()
or VAR_SLICE.split(":")[1].isdigit()
)
):
parts = VAR_SLICE.split(":")
start = (
int(parts[0])
if parts[0] and parts[0].lstrip("-").isdigit()
else None
)
stop = (
int(parts[1])
if len(parts) > 1
and parts[1]
and parts[1].lstrip("-").isdigit()
else None
)
vars_to_loop = ROUTINE_VARS[VAR_NAME][slice(start, stop)]
# Check whether the slicing was successful or not
if vars_to_loop:
templine = templine.replace(
match[0],
",".join(vars_to_loop),
)
else:
return (
f"[red]The foreach loop cannot run with input: {match[0]}.[/red]",
"",
)
# Just replace value without slicing or list
else:
if VAR_SLICE:
# Check if the string starts with a minus sign
if VAR_SLICE.startswith("-"):
if not VAR_SLICE[1:].isdigit():
return (
f"[red]Index '{VAR_SLICE}' is not a value[/red]",
"",
)
if int(VAR_SLICE) < 0:
return (
f"[red]Negative index on {VAR_NAME} is not allowed[/red]",
"",
)
if not VAR_SLICE.isdigit():
return (
f"[red]Index '{VAR_SLICE}' is not a value[/red]",
"",
)
if VAR_NAME in ROUTINE_VARS:
value = ROUTINE_VARS[VAR_NAME]
# If the value is a list, we want to replace it with the whole list
if isinstance(value, list):
templine = templine.replace(
match[0],
",".join(value),
)
else:
templine = templine.replace(match[0], value)
else:
# Check if this is an OpenBB keyword variable like
# 1MONTHAGO,LASTFRIDAY,3YEARSFROMNOW,NEXTTUESDAY
# and decode it into the right date if it exists
potential_date_match = (
match_and_return_openbb_keyword_date(VAR_NAME)
)
if potential_date_match:
templine = templine.replace(
match[0], potential_date_match
)
else:
return (
f"[red]Variable {VAR_NAME} not given for current routine script.[/red]",
"",
)
lines_with_vars_replaced.append(templine)
# If this flags ends in True it means that the script routine has a foreach loop that never terminates
if foreach_loop_found:
return (
"[red]The script has a foreach loop that doesn't terminate. "
"Add the keyword 'end' to explicitly terminate loop[/red]",
"",
)
# Finally the only remaining thing to address are the foreach loops. For that we'll go through
# those lines and unroll the arguments that will be iterated by.
# Note that the fact that we checked before that the amount of foreach and end matches allow us
# to be confident that the script has no clear issues.
within_foreach = False
foreach_lines_loop: list[str] = list()
parsed_script = ""
final_lines = list()
varname = "VAR"
varused_inside = False
for line in lines_with_vars_replaced:
# Found 'foreach' header associated with loop
match = re.search(
r"foreach \$\$([A-Za-z\_]+) in ([A-Za-z0-9,-.]+)", line, re.IGNORECASE
)
if match:
varname = match.group(1)
foreach_loop = match.group(2).split(",")
within_foreach = True
# We are inside a loop and this is a line that we will want to replicate,
# so we need to temporarily store it until we reach the end
elif within_foreach:
# Found 'end' keyword which means that the foreach loop has reached the end
if re.match(r"^\s*end\s*$", line, re.IGNORECASE):
# Now we want to process what we were waiting for before
# Iterate through main foreach header
for var in foreach_loop:
# Iterate through all lines within foreach and end loop
for foreach_line_loop in foreach_lines_loop:
if f"$${varname}" in foreach_line_loop:
final_lines.append(
foreach_line_loop.replace(f"$${varname}", var).strip()
)
varused_inside = True
elif "$$" in foreach_line_loop:
return (
"[red]The script has a foreach loop that iterates through "
f"{','.join(foreach_loop)} with variable $${varname} "
"but another var name is being utilized instead[/red]",
"",
)
else:
final_lines.append(foreach_line_loop.strip())
if not varused_inside:
session.console.print(
f"The variable {varname} was used in foreach header but it wasn't used inside the loop."
)
varused_inside = False
# Since this has been processed we reset the foreach loop lines
within_foreach = False
foreach_lines_loop = list()
else:
foreach_lines_loop.append(line)
else:
final_lines.append(line)
# If the list is non null, then we want to convert this into a parsed string that is
# recognized by the OpenBB Platform CLI
if final_lines:
parsed_script = f"{'/'.join([line.rstrip() for line in final_lines])}".replace(
"//", "/home/"
)
if parsed_script[0] == "/":
# If the user had added a / at the beginning, then it was converted to //home/
# and we need to remove it
if parsed_script.startswith("//home"):
parsed_script = parsed_script[6:]
else:
# We want the script to start from the home menu, hence we add it if the user
# didn't add it
parsed_script = "/" + parsed_script
# If the script finishes with // it means that we converted it to /home/
# This means that we are expecting a command to follow, but since this is
# the end of the script, we need to remove the trailing /
if parsed_script.endswith("/home/"):
parsed_script = parsed_script[:-1]
return "", parsed_script