mirror of
https://github.com/OpenBB-finance/OpenBB.git
synced 2026-06-01 15:33:00 +08:00
* round 1 * black * more black * providers a-thru-fmp * charting settings * charting backend * black * charting_settings * missing nasdaq import * nasdaq mypy * fed money measure * some linting * black * more linting * more linting * charting imports * black * pylint * refactor fred * more linting * rest of the providers * didn't want that file * more cleanup * missed * ta_helpers * router views * chart ta_class * forgot a file * forgot another file * biztoc * package builder * technical router * fred params * bump finviz * fix tmx * more cleanup * try again * currency historical * equity search * missed files in commit * fix some tests * black * fix more test cassettes * more patchy * black * linters * more tests * black * mypy * delete unused model * more linting * forgot files * more linting * make up your mind, black, and stop moving pylint for me * more black * back to where we were before black destroyed it * .......... * .................. * unused fetcher test * remove standard model validator * commodity router * treasury prices * oecd tests * finra * container cleanup * black * forgotten file * linter * remove wrapped "" * fix merge * patches * ecb test * finra cassette * finviz cassettes * nasdaq cassettes * oecd cassettes * yfinance cassettes * more test cassettes * black * merge cleanup * cboe helpers * black * fix typo --------- Co-authored-by: Henrique Joaquim <henriquecjoaquim@gmail.com> Co-authored-by: Igor Radovanovic <74266147+IgorWounds@users.noreply.github.com>
620 lines
17 KiB
Python
620 lines
17 KiB
Python
"""Technical Analysis Helpers."""
|
|
|
|
# pylint: disable=too-many-arguments, too-many-locals
|
|
|
|
from typing import TYPE_CHECKING, Any, List, Literal, Optional, Tuple, Union
|
|
from warnings import warn
|
|
|
|
if TYPE_CHECKING:
|
|
from pandas import DataFrame, Series, Timestamp
|
|
|
|
|
|
def validate_data(data: list, length: Union[int, List[int]]) -> None:
    """Check that ``data`` holds at least as many items as every required length.

    Parameters
    ----------
    data : list
        The data whose size is checked.
    length : Union[int, List[int]]
        A single required length, or several of them.

    Raises
    ------
    ValueError
        If any required length exceeds ``len(data)``. The message reports the
        largest of the required lengths.
    """
    required = [length] if isinstance(length, int) else length
    available = len(data)

    if any(item > available for item in required):
        raise ValueError(
            f"Data length is less than required by parameters: {max(required)}"
        )
|
|
|
|
|
|
def parkinson(
    data: "DataFrame",
    window: int = 30,
    trading_periods: Optional[int] = None,
    is_crypto: bool = False,
    clean=True,
) -> "DataFrame":
    """Estimate rolling, annualized Parkinson volatility.

    Works from the high/low range of each period instead of close-to-close
    moves. Every row contributes ``ln(high / low) ** 2 / (4 * ln(2))``; the
    rolling mean of that quantity is multiplied by ``trading_periods`` and
    square-rooted.

    Parameters
    ----------
    data : DataFrame
        Price data. The "high" and "low" columns are read.
    window : int [default: 30]
        Rolling window length. A value below 1 triggers a warning and is
        replaced by 30.
    trading_periods : Optional[int] [default: None]
        Annualization factor. When not supplied, 365 is used if ``is_crypto``
        is true, otherwise 252.
    is_crypto : bool [default: False]
        Selects 365 periods per year. Ignored, with a warning, when
        ``trading_periods`` is supplied.
    clean : bool [default: True]
        Drop NaN entries from the result when true.

    Returns
    -------
    DataFrame : results
        Rolling volatility estimates.
        NOTE(review): column selection normally yields a pandas Series here,
        despite the annotation - confirm and align.
    """
    # pylint: disable=import-outside-toplevel
    from numpy import log

    if window < 1:
        warn("Error: Window must be at least 1, defaulting to 30.")
        window = 30

    if trading_periods:
        if is_crypto:
            warn("is_crypto is overridden by trading_periods.")
    else:
        trading_periods = 365 if is_crypto else 252

    scale = 1.0 / (4.0 * log(2.0))
    log_range = (data["high"] / data["low"]).apply(log)
    period_variance = scale * log_range**2.0

    def annualize(chunk):
        # Mean variance of the window, scaled to a year, expressed as volatility.
        return (trading_periods * chunk.mean()) ** 0.5

    volatility = period_variance.rolling(window=window, center=False).apply(
        func=annualize
    )

    return volatility.dropna() if clean else volatility
|
|
|
|
|
|
def standard_deviation(
    data: "DataFrame",
    window: int = 30,
    trading_periods: Optional[int] = None,
    is_crypto: bool = False,
    clean: bool = True,
) -> "DataFrame":
    """Compute the rolling, annualized standard deviation of log returns.

    Log returns are taken close-to-close; their rolling standard deviation is
    multiplied by ``sqrt(trading_periods)``.

    Parameters
    ----------
    data : DataFrame
        Price data. The "close" column is read.
    window : int [default: 30]
        Rolling window length. A value below 2 triggers a warning and is
        replaced by 30.
    trading_periods : Optional[int] [default: None]
        Annualization factor. When not supplied, 365 is used if ``is_crypto``
        is true, otherwise 252.
    is_crypto : bool [default: False]
        Selects 365 periods per year. Ignored, with a warning, when
        ``trading_periods`` is supplied.
    clean : bool [default: True]
        Drop NaN entries from the result when true.

    Returns
    -------
    DataFrame : results
        Rolling volatility estimates.
        NOTE(review): column selection normally yields a pandas Series here,
        despite the annotation - confirm and align.
    """
    # pylint: disable=import-outside-toplevel
    from numpy import log, sqrt

    if window < 2:
        warn("Error: Window must be at least 2, defaulting to 30.")
        window = 30

    if trading_periods:
        if is_crypto:
            warn("is_crypto is overridden by trading_periods.")
    else:
        trading_periods = 365 if is_crypto else 252

    closes = data["close"]
    log_returns = (closes / closes.shift(1)).apply(log)

    rolling_std = log_returns.rolling(window=window, center=False).std()
    volatility = rolling_std * sqrt(trading_periods)

    return volatility.dropna() if clean else volatility
|
|
|
|
|
|
def garman_klass(
    data: "DataFrame",
    window: int = 30,
    trading_periods: Optional[int] = None,
    is_crypto: bool = False,
    clean=True,
) -> "DataFrame":
    """Estimate rolling, annualized Garman-Klass volatility.

    Builds on the high/low range used by Parkinson and adds the open-to-close
    move. Each row contributes
    ``0.5 * ln(high / low) ** 2 - (2 * ln(2) - 1) * ln(close / open) ** 2``;
    the rolling mean of that quantity is multiplied by ``trading_periods`` and
    square-rooted.

    Parameters
    ----------
    data : DataFrame
        Price data. The "open", "high", "low" and "close" columns are read.
    window : int [default: 30]
        Rolling window length. A value below 1 triggers a warning and is
        replaced by 30.
    trading_periods : Optional[int] [default: None]
        Annualization factor. When not supplied, 365 is used if ``is_crypto``
        is true, otherwise 252.
    is_crypto : bool [default: False]
        Selects 365 periods per year. Ignored, with a warning, when
        ``trading_periods`` is supplied.
    clean : bool [default: True]
        Drop NaN entries from the result when true.

    Returns
    -------
    DataFrame : results
        Rolling volatility estimates.
        NOTE(review): column selection normally yields a pandas Series here,
        despite the annotation - confirm and align.
    """
    # pylint: disable=import-outside-toplevel
    from numpy import log

    if window < 1:
        warn("Error: Window must be at least 1, defaulting to 30.")
        window = 30

    if trading_periods:
        if is_crypto:
            warn("is_crypto is overridden by trading_periods.")
    else:
        trading_periods = 365 if is_crypto else 252

    log_high_low = (data["high"] / data["low"]).apply(log)
    log_close_open = (data["close"] / data["open"]).apply(log)

    range_term = 0.5 * log_high_low**2
    drift_term = (2 * log(2) - 1) * log_close_open**2
    period_variance = range_term - drift_term

    def annualize(chunk):
        # Mean variance of the window, scaled to a year, expressed as volatility.
        return (trading_periods * chunk.mean()) ** 0.5

    volatility = period_variance.rolling(window=window, center=False).apply(
        func=annualize
    )

    return volatility.dropna() if clean else volatility
|
|
|
|
|
|
def hodges_tompkins(
    data: "DataFrame",
    window: int = 30,
    trading_periods: Optional[int] = None,
    is_crypto: bool = False,
    clean=True,
) -> "DataFrame":
    """Estimate rolling, annualized Hodges-Tompkins volatility.

    Starts from the rolling standard deviation of close-to-close log returns
    and multiplies it by a bias-correction factor for overlapping samples:
    ``1 / (1 - h / n + (h ** 2 - 1) / (3 * n ** 2))``, where ``h`` is the window
    and ``n`` is the number of non-NaN log returns minus ``h`` plus one.

    Parameters
    ----------
    data : DataFrame
        Price data. The "close" column is read.
    window : int [default: 30]
        Rolling window length. A value below 2 triggers a warning and is
        replaced by 30.
    trading_periods : Optional[int] [default: None]
        Annualization factor. When not supplied, 365 is used if ``is_crypto``
        is true, otherwise 252.
    is_crypto : bool [default: False]
        Selects 365 periods per year. Ignored, with a warning, when
        ``trading_periods`` is supplied.
    clean : bool [default: True]
        Drop NaN entries from the result when true.

    Returns
    -------
    DataFrame : results
        Rolling, bias-adjusted volatility estimates.
        NOTE(review): column selection normally yields a pandas Series here,
        despite the annotation - confirm and align.

    Example
    -------
    >>> data = obb.equity.price.historical('BTC-USD')
    >>> df = obb.technical.hodges_tompkins(data, is_crypto = True)
    """
    # pylint: disable=import-outside-toplevel
    from numpy import log, sqrt

    if window < 2:
        warn("Error: Window must be at least 2, defaulting to 30.")
        window = 30

    if trading_periods:
        if is_crypto:
            warn("is_crypto is overridden by trading_periods.")
    else:
        trading_periods = 365 if is_crypto else 252

    closes = data["close"]
    log_returns = (closes / closes.shift(1)).apply(log)

    rolling_std = log_returns.rolling(window=window, center=False).std()
    raw_volatility = rolling_std * sqrt(trading_periods)

    # Number of overlapping windows that fit into the available returns.
    samples = (log_returns.count() - window) + 1
    denominator = 1.0 - (window / samples) + ((window**2 - 1) / (3 * samples**2))
    adjustment = 1.0 / denominator

    volatility = raw_volatility * adjustment

    return volatility.dropna() if clean else volatility
|
|
|
|
|
|
def rogers_satchell(
    data: "DataFrame",
    window: int = 30,
    trading_periods: Optional[int] = None,
    is_crypto: bool = False,
    clean=True,
) -> "Series":
    """Estimate rolling, annualized Rogers-Satchell volatility.

    The estimator allows for a non-zero mean return (drift). Each row
    contributes
    ``ln(h/o) * (ln(h/o) - ln(c/o)) + ln(l/o) * (ln(l/o) - ln(c/o))``;
    the rolling mean of that quantity is multiplied by ``trading_periods`` and
    square-rooted.

    Parameters
    ----------
    data : DataFrame
        Price data. The "open", "high", "low" and "close" columns are read.
    window : int [default: 30]
        Rolling window length. A value below 1 triggers a warning and is
        replaced by 30.
    trading_periods : Optional[int] [default: None]
        Annualization factor. When not supplied, 365 is used if ``is_crypto``
        is true, otherwise 252.
    is_crypto : bool [default: False]
        Selects 365 periods per year. Ignored, with a warning, when
        ``trading_periods`` is supplied.
    clean : bool [default: True]
        Drop NaN entries from the result when true.

    Returns
    -------
    Series : results
        Rolling volatility estimates.
    """
    # pylint: disable=import-outside-toplevel
    from numpy import log

    if window < 1:
        warn("Error: Window must be at least 1, defaulting to 30.")
        window = 30

    if trading_periods:
        if is_crypto:
            warn("is_crypto is overridden by trading_periods.")
    else:
        trading_periods = 365 if is_crypto else 252

    opens = data["open"]
    log_high_open = (data["high"] / opens).apply(log)
    log_low_open = (data["low"] / opens).apply(log)
    log_close_open = (data["close"] / opens).apply(log)

    period_variance = log_high_open * (log_high_open - log_close_open) + (
        log_low_open * (log_low_open - log_close_open)
    )

    def annualize(chunk):
        # Mean variance of the window, scaled to a year, expressed as volatility.
        return (trading_periods * chunk.mean()) ** 0.5

    volatility = period_variance.rolling(window=window, center=False).apply(
        func=annualize
    )

    return volatility.dropna() if clean else volatility
|
|
|
|
|
|
def yang_zhang(
    data: "DataFrame",
    window: int = 30,
    trading_periods: Optional[int] = None,
    is_crypto: bool = False,
    clean=True,
) -> "DataFrame":
    """Estimate rolling, annualized Yang-Zhang volatility.

    Combines three rolling variance terms, each a rolling sum divided by
    ``window - 1``: the overnight (previous close to open) term, the
    close-to-close term and the Rogers-Satchell term. They are mixed as
    ``open + k * close + (1 - k) * rs`` with
    ``k = 0.34 / (1.34 + (window + 1) / (window - 1))``, then square-rooted
    and multiplied by ``sqrt(trading_periods)``.

    Parameters
    ----------
    data : DataFrame
        Price data. The "open", "high", "low" and "close" columns are read.
    window : int [default: 30]
        Rolling window length. A value below 2 triggers a warning and is
        replaced by 30.
    trading_periods : Optional[int] [default: None]
        Annualization factor. When not supplied, 365 is used if ``is_crypto``
        is true, otherwise 252.
    is_crypto : bool [default: False]
        Selects 365 periods per year. Ignored, with a warning, when
        ``trading_periods`` is supplied.
    clean : bool [default: True]
        Drop NaN entries from the result when true.

    Returns
    -------
    DataFrame : results
        Rolling volatility estimates.
        NOTE(review): column selection normally yields a pandas Series here,
        despite the annotation - confirm and align.
    """
    # pylint: disable=import-outside-toplevel
    from numpy import log, sqrt

    if window < 2:
        warn("Error: Window must be at least 2, defaulting to 30.")
        window = 30

    if trading_periods:
        if is_crypto:
            warn("is_crypto is overridden by trading_periods.")
    else:
        trading_periods = 365 if is_crypto else 252

    opens = data["open"]
    closes = data["close"]
    previous_close = closes.shift(1)

    log_high_open = (data["high"] / opens).apply(log)
    log_low_open = (data["low"] / opens).apply(log)
    log_close_open = (closes / opens).apply(log)

    overnight_sq = (opens / previous_close).apply(log) ** 2
    close_to_close_sq = (closes / previous_close).apply(log) ** 2

    rogers_satchell_term = log_high_open * (log_high_open - log_close_open) + (
        log_low_open * (log_low_open - log_close_open)
    )

    # Every rolling sum is normalised by (window - 1).
    norm = 1.0 / (window - 1.0)

    def rolling_variance(series):
        return series.rolling(window=window, center=False).sum() * norm

    close_vol = rolling_variance(close_to_close_sq)
    open_vol = rolling_variance(overnight_sq)
    window_rs = rolling_variance(rogers_satchell_term)

    k = 0.34 / (1.34 + (window + 1) / (window - 1))
    blended = open_vol + k * close_vol + (1 - k) * window_rs
    volatility = blended.apply(sqrt) * sqrt(trading_periods)

    return volatility.dropna() if clean else volatility
|
|
|
|
|
|
def calculate_cones(
    data: "DataFrame",
    lower_q: float,
    upper_q: float,
    is_crypto: bool,
    model: Literal[
        "std",
        "parkinson",
        "garman_klass",
        "hodges_tompkins",
        "rogers_satchell",
        "yang_zhang",
    ],
    trading_periods: Optional[int] = None,
) -> "DataFrame":
    """Calculate volatility cones for a set of fixed rolling windows.

    For each window in a fixed list (3 to 360 periods) the selected volatility
    model is evaluated on ``data`` (sorted by ascending index). Windows whose
    estimate comes back empty are skipped. For every remaining window the
    latest ("realized") value, minimum, lower quantile, median, upper quantile
    and maximum of the estimate are collected.

    Parameters
    ----------
    data : DataFrame
        Price data, passed through to the selected model function.
    lower_q : float
        Lower quantile, in [0, 1).
    upper_q : float
        Upper quantile, in [0, 1). If ``lower_q`` is larger than ``upper_q``
        the two are swapped.
    is_crypto : bool
        Passed through to the model function.
    model : str
        Key selecting the volatility model function.
    trading_periods : Optional[int] [default: None]
        Passed through to the model function.

    Returns
    -------
    DataFrame
        One row per usable window with the columns "window", "realized",
        "min", "lower_<q>%", "median", "upper_<q>%" and "max".

    Raises
    ------
    ValueError
        If either quantile lies outside [0, 1).
    KeyError
        If ``model`` is not one of the supported keys.
    """
    # pylint: disable=import-outside-toplevel
    from pandas import DataFrame

    if lower_q > upper_q:
        lower_q, upper_q = upper_q, lower_q

    # Reject out-of-range quantiles up front instead of failing later inside
    # pandas' quantile() (negative values) or silently mislabelling columns.
    if lower_q < 0 or lower_q >= 1 or upper_q >= 1:
        raise ValueError("Error: lower_q and upper_q must be between 0 and 1")

    lower_q_label = str(int(lower_q * 100))
    upper_q_label = str(int(upper_q * 100))
    windows = [3, 10, 30, 60, 90, 120, 150, 180, 210, 240, 300, 360]

    data = data.sort_index(ascending=True)

    model_functions = {
        "std": standard_deviation,
        "parkinson": parkinson,
        "garman_klass": garman_klass,
        "hodges_tompkins": hodges_tompkins,
        "rogers_satchell": rogers_satchell,
        "yang_zhang": yang_zhang,
    }
    estimate = model_functions[model]

    realized: list = []
    min_: list = []
    bottom_q: list = []
    median: list = []
    top_q: list = []
    max_: list = []
    allowed_windows: list = []

    for window in windows:
        estimator = estimate(  # type: ignore
            window=window,
            data=data,
            is_crypto=is_crypto,
            trading_periods=trading_periods,
        )

        # Not enough rows for this window: nothing to summarise.
        if estimator.empty:
            continue

        min_.append(estimator.min())  # type: ignore
        max_.append(estimator.max())  # type: ignore
        median.append(estimator.median())  # type: ignore
        top_q.append(estimator.quantile(upper_q))  # type: ignore
        bottom_q.append(estimator.quantile(lower_q))  # type: ignore
        # Positional access to the latest value. A plain `[-1]` is a label
        # lookup on integer-indexed data (KeyError) and a deprecated
        # positional fallback on other index types.
        realized.append(estimator.iloc[-1])  # type: ignore

        allowed_windows.append(window)

    row_labels = [
        "realized",
        "min",
        f"lower_{lower_q_label}%",
        "median",
        f"upper_{upper_q_label}%",
        "max",
    ]
    cones = DataFrame(
        [realized, min_, bottom_q, median, top_q, max_], columns=allowed_windows
    ).rename(index=dict(enumerate(row_labels)))

    # Windows become rows; the former column labels land in a "window" column.
    return cones.transpose().reset_index().rename(columns={"index": "window"})
|
|
|
|
|
|
def clenow_momentum(
    values: "Series", window: int = 90
) -> Tuple[float, float, "Series"]:
    """Clenow Volatility Adjusted Momentum.

    Fits a linear regression of the natural log of the last ``window`` values
    against their position (0, 1, 2, ...) and reports the quality of the fit
    together with the annualized slope.

    Parameters
    ----------
    values: Series
        Values to perform regression for
    window: int
        Length of look back period

    Returns
    -------
    float:
        R2 of the fit to the log data
    float:
        Regression slope annualized as ``exp(slope) ** 252 - 1``
    Series:
        Fitted values of the regression line (in log space, since the
        regression target is the log of the input)

    Raises
    ------
    ValueError
        If ``values`` holds fewer than ``window`` entries.
    """
    # pylint: disable=import-outside-toplevel
    from numpy import arange, exp, log
    from pandas import Series
    from sklearn.linear_model import LinearRegression

    if len(values) < window:
        raise ValueError(f"Calculation asks for at least last {window} days of data")

    log_values = log(values[-window:])
    # Single regressor: the position of each observation in the window.
    positions = arange(len(log_values)).reshape(-1, 1)

    regression = LinearRegression()
    regression.fit(positions, log_values)

    r_squared = regression.score(positions, log_values)
    slope = regression.coef_[0]
    annualized_slope = (exp(slope) ** 252) - 1
    fitted = Series(regression.predict(positions))

    return r_squared, annualized_slope, fitted
|
|
|
|
|
|
def calculate_fib_levels(
    data: "DataFrame",
    close_col: str,
    limit: int = 120,
    start_date: Optional[Any] = None,
    end_date: Optional[Any] = None,
) -> Tuple["DataFrame", "Timestamp", "Timestamp", float, float, str]:
    """Calculate Fibonacci levels.

    When both ``start_date`` and ``end_date`` are given, the retracement is
    anchored on the prices at those two dates. A date missing from the index
    is replaced by the nearest index label, with a warning. Otherwise the
    minimum and maximum of the last ``limit`` rows are used.

    Parameters
    ----------
    data : DataFrame
        Dataframe of prices
    close_col : str
        Column name of close prices
    limit : int
        Days to look back for retracement
    start_date : Any
        Custom start date for retracement
    end_date : Any
        Custom end date for retracement

    Returns
    -------
    df : DataFrame
        Dataframe of fib levels with the columns "Level" and "Price"
    min_date: Timestamp
        Date of min point
    max_date: Timestamp
        Date of max point
    min_pr: float
        Price at min point
    max_pr: float
        Price at max point
    lvl_text: str
        "left" when the min point precedes the max point, otherwise "right"

    Notes
    -----
    When the min point comes after the max point, the two dates and the two
    prices are swapped before being returned, so the returned ``min_date`` is
    always the earlier date and ``min_pr`` the price at that date.

    Raises
    ------
    ValueError
        If ``close_col`` is not a column of ``data``.
    """
    # pylint: disable=import-outside-toplevel
    from pandas import DataFrame

    if close_col not in data.columns:
        raise ValueError(f"Column {close_col} not in data")

    def _snap_to_index(date: Any, label: str) -> Any:
        """Return `date` if it is an index label, else the nearest index label."""
        if date in data.index:
            return date
        nearest = data.index[data.index.get_indexer([date], method="nearest")[0]]
        warn(f"{label} date not in data. Using nearest: {nearest}")
        return nearest

    if start_date and end_date:
        # Each date is snapped on its own value. Previously a missing start
        # date was resolved against end_date, collapsing both anchors.
        date0 = _snap_to_index(start_date, "Start")
        date1 = _snap_to_index(end_date, "End")

        data0 = data.loc[date0, close_col]
        data1 = data.loc[date1, close_col]

        min_pr = min(data0, data1)
        max_pr = max(data0, data1)

        if min_pr == data0:
            min_date, max_date = date0, date1
        else:
            min_date, max_date = date1, date0
    else:
        data_to_use = data.iloc[-limit:, :][close_col]

        min_pr = data_to_use.min()
        min_date = data_to_use.idxmin()
        max_pr = data_to_use.max()
        max_date = data_to_use.idxmax()

    # NOTE(review): the commonly quoted ratios are 0.236 and 0.786; the values
    # below are kept exactly as found so existing outputs do not change.
    fib_levels = [0, 0.235, 0.382, 0.5, 0.618, 0.65, 1]

    lvl_text: str = "left" if min_date < max_date else "right"
    if min_date > max_date:
        # Down move: order the anchors chronologically. price_dif below then
        # becomes negative, so the levels run from the later price back up
        # to the earlier one.
        min_date, max_date = max_date, min_date
        min_pr, max_pr = max_pr, min_pr

    price_dif = max_pr - min_pr

    # No level exceeds 1, so every price is rounded to 4 decimals.
    levels = [round(max_pr - price_dif * f_lev, 4) for f_lev in fib_levels]

    df = DataFrame()
    df["Level"] = fib_levels
    df["Level"] = df["Level"].apply(lambda x: str(x * 100) + "%")
    df["Price"] = levels

    return df, min_date, max_date, min_pr, max_pr, lvl_text
|