OHLC 数据采用以下格式:
open_time open high low close
2023-07-02 0.12800000 0.12800000 0.12090000 0.12390000
2023-07-03 0.12360000 0.13050000 0.12220000 0.12830000
2023-07-04 0.12830000 0.12830000 0.12320000 0.12410000
2023-07-05 0.12410000 0.12530000 0.11800000 0.11980000
2023-07-06 0.11990000 0.12270000 0.11470000 0.11500000
date1 val1 date2 val2 <---up
date2 val2 date3 val3 <---down
date3 val3 date4 val4 <---up
date4 val4 date5 val5 <---down
2023-07-02 0.1280 2023-07-02 0.1209 -5.55%
2023-07-02 0.1209 2023-07-03 0.1305 7.94%
2023-07-03 0.1305 2023-07-06 0.1147 -12.11%
2022-02-25 38340.4200 39699.0000 38038.4600 39237.0600
2022-02-26 39237.0700 40300.0000 38600.4600 39138.1100
2022-02-27 39138.1100 39881.7700 37027.5500 37714.4300
2022-02-28 37714.4200 44200.0000 37468.2800 43181.2700
2022-03-01 43176.4100 44968.1300 42838.6800 44434.0900
2022-02-25 38038 2022-02-26 40300 5.95%
2022-02-26 40300 2022-02-26 38600 -4.22%
2022-02-26 38600 2022-02-27 39881 3.32%
2022-02-27 39881 2022-02-27 37027 -7.16%
2022-02-27 37027 2022-02-28 44200 19.37%
2022-02-28 44200 2022-03-01 42838 -3.08%
这是一个简单的解决方案:将每条每日 OHLC 记录拆成四个(日期、值)条目,然后依次处理每个条目(顺序取决于当日的涨跌方向),同时记录局部最小值/最大值(“峰值”),合并同方向的连续走势,并跳过幅度低于阈值的波动。
有两个 NamedTuple:Entry(对应(日期、值)对)和 Movement(对应结果的每一行)。我本可以使用普通元组,但 NamedTuple 为每个字段提供了清晰的名称。
它不依赖 numpy、pandas 或任何其他库;如果配合 mypy 等静态检查器使用,类型提示有助于在运行之前发现错误。作为纯 Python 解决方案,它也应该相当快,因为只需遍历一次数据即可计算出所有走势。
from typing import Iterator, NamedTuple
# One (value, date) observation. `value` comes first so that plain tuple
# comparison between two entries orders them by price (date only breaks ties).
Entry = NamedTuple('Entry', [('value', float), ('date', str)])
# One row of the result: a move from `start` to `end` and its relative change.
Movement = NamedTuple('Movement', [('start', Entry), ('end', Entry), ('percentage', float)])


def get_change(a: Entry, b: Entry) -> float:
    """Return the relative change from ``a.value`` to ``b.value`` (0.05 means +5%)."""
    return (b.value - a.value) / a.value


def get_movements(data_str: str, min_change_percent: float = 0.05) -> Iterator[Movement]:
    """Yield all movements whose relative change reaches a threshold.

    Args:
        data_str: Whitespace-separated ``date open high low close`` records,
            one per line. Blank lines are ignored.
        min_change_percent: Minimum absolute relative change (as a fraction)
            a reversal needs in order to start a new movement.

    Yields:
        One ``Movement`` per pair of consecutive peaks, in input order.
    """
    peaks: list[Entry] = []
    for line in data_str.splitlines():
        if not line.strip():
            # Tolerate leading/trailing/empty lines (e.g. triple-quoted input).
            continue
        date, open_str, high_str, low_str, close_str = line.split()
        # Order the intraday values by the day's direction: on an up day the
        # low is assumed to precede the high, on a down day the reverse.
        # Compare as numbers; comparing the raw strings is lexicographic and
        # gives the wrong answer for e.g. "10.2" vs "9.5".
        if float(close_str) > float(open_str):
            ordered = [open_str, low_str, high_str, close_str]
        else:
            ordered = [open_str, high_str, low_str, close_str]
        for value_str in ordered:
            entry = Entry(float(value_str), date)
            if len(peaks) >= 2 and (entry > peaks[-1]) == (peaks[-1] > peaks[-2]):
                # Continue movement of same direction by replacing last peak.
                peaks[-1] = entry
            elif not peaks or abs(get_change(peaks[-1], entry)) >= min_change_percent:
                # New peak is above minimum threshold.
                peaks.append(entry)

    # Convert every pair of remaining peaks to a `Movement`.
    for start, end in zip(peaks, peaks[1:]):
        yield Movement(start, end, percentage=get_change(start, end))
# Demo 1: five daily candles, 5% threshold.
data_str = """
2023-07-02 0.12800000 0.12800000 0.12090000 0.12390000
2023-07-03 0.12360000 0.13050000 0.12220000 0.12830000
2023-07-04 0.12830000 0.12830000 0.12320000 0.12410000
2023-07-05 0.12410000 0.12530000 0.11800000 0.11980000
2023-07-06 0.11990000 0.12270000 0.11470000 0.11500000
"""

for mov in get_movements(data_str, 0.05):
    print(f'{mov.start.date} {mov.start.value:.4f} {mov.end.date} {mov.end.value:.4f} {mov.percentage:.2%}')
# Expected output:
# 2023-07-02 0.1280 2023-07-02 0.1209 -5.55%
# 2023-07-02 0.1209 2023-07-03 0.1305 7.94%
# 2023-07-03 0.1305 2023-07-06 0.1147 -12.11%
# Demo 2: larger prices, 3% threshold; values are printed truncated to ints.
data_str = """
2022-02-25 38340.4200 39699.0000 38038.4600 39237.0600
2022-02-26 39237.0700 40300.0000 38600.4600 39138.1100
2022-02-27 39138.1100 39881.7700 37027.5500 37714.4300
2022-02-28 37714.4200 44200.0000 37468.2800 43181.2700
2022-03-01 43176.4100 44968.1300 42838.6800 44434.0900
"""

for mov in get_movements(data_str, 0.03):
    print(f'{mov.start.date} {int(mov.start.value)} {mov.end.date} {int(mov.end.value)} {mov.percentage:.2%}')
# Output as recorded by the author:
# 2022-02-25 38340 2022-02-26 40300 5.11%
# 2022-02-26 40300 2022-02-26 38600 -4.22%
# 2022-02-26 38600 2022-02-27 39881 3.32%
# 2022-02-27 39881 2022-02-27 37027 -7.16%
# 2022-02-27 37027 2022-02-28 44200 19.37%
# 2022-02-28 44200 2022-03-01 42838 -3.08%
# 2022-03-01 42838 2022-03-01 44968 4.97%
注意:对于第二组数据,第一段走势应从 38038(当日最低价)开始,而不是 38340(开盘价)。我想不出比 @BoppreH 更好的方法来实际实现峰值确定的业务逻辑。
我创建一个可配置的过滤器,将其应用于 DataFrame
def min_percent_change_filter(min_change_percent=0.05):
    """Build a stateful callback that accumulates significant peaks.

    Args:
        min_change_percent: Minimum absolute relative change (as a fraction)
            between the last kept peak and a new value for the new value to
            be recorded as a new peak.

    Returns:
        ``add_entry(row)``: a function that takes any mapping-like ``row``
        supporting ``row["data"]`` and ``row.copy()``, updates the shared
        peak list, and returns that same list object on every call.
    """
    peaks = []

    def get_change(a, b):
        # Relative change from a to b.
        return (b - a) / a

    def add_entry(row):
        """By @BoppreH, with slight modifications.

        Update list of peaks with one new entry.
        """
        value = row["data"]
        if len(peaks) >= 2 and (value > peaks[-1]["data"]) == (
            peaks[-1]["data"] > peaks[-2]["data"]
        ):
            # Continue movement of same direction by replacing last peak.
            peaks[-1] = row.copy()
        elif not peaks or abs(get_change(peaks[-1]["data"], value)) >= min_change_percent:
            # New peak is above minimum threshold.
            peaks.append(row.copy())
        # Rows that neither extend nor reverse by enough are ignored.
        return peaks

    return add_entry
pandas 部分需要相当多的操作才能使数据达到正确的形状。形状正确后,我们逐行应用过滤器。最后我们将 DataFrame 重塑、重命名并重新排序,以符合所需的输出格式。
import pandas as pd
def pandas_approach(data, min_pct_change):
    """Compute significant movements from OHLC data using pandas.

    Args:
        data: Anything ``pd.DataFrame`` accepts that yields the columns
            ``open_time``, ``open``, ``high``, ``low`` and ``close``.
        min_pct_change: Threshold forwarded to ``min_percent_change_filter``.

    Returns:
        A pandas ``Styler`` over a frame with columns ``begin_date``,
        ``begin``, ``end_date``, ``end`` and ``pct_change``.
    """
    df = pd.DataFrame(data)
    df["open_time"] = pd.to_datetime(df["open_time"])

    # Respect the temporal aspect: create new columns `first` and `second`
    # and set them to low/high or high/low depending on whether the day
    # moved up or down.
    is_up_day = df["open"] <= df["close"]
    df["first"] = df["low"].where(is_up_day, df["high"])
    df["second"] = df["high"].where(is_up_day, df["low"])

    # Create a long representation of the data by stacking `first` and
    # `second` on the index, then sorting by 'open_time' and by whether the
    # value came first or second (this relies on 'first' < 'second' as strings).
    stacked_representation = (
        df.set_index("open_time")[["first", "second"]]
        .stack()
        .reset_index()
        .sort_values(["open_time", "level_1"])[["open_time", 0]]
    )
    stacked_representation.columns = ["open_time", "data"]

    # Feed the rows through the stateful filter. The filter returns the same
    # accumulated peak list on every call, so only the last return matters.
    add_entry = min_percent_change_filter(min_pct_change)
    peaks = []
    for _, row in stacked_representation.iterrows():
        peaks = add_entry(row)
    results = pd.DataFrame(peaks)

    # Reshape / rename / reorder the data to fit the desired output format:
    # each peak becomes the end of a movement that begins at the previous peak.
    results["begin"] = results["data"].shift()
    results["begin_date"] = results["open_time"].shift()
    results = results.dropna()[["begin_date", "begin", "open_time", "data"]]
    results.columns = ["begin_date", "begin", "end_date", "end"]

    # Lastly add the pct change
    results["pct_change"] = (results.end - results.begin) / results.begin

    # The Styler is returned for output formatting purposes only; return
    # `results` directly if the plain DataFrame is wanted instead.
    def format_datetime(dt):
        return pd.to_datetime(dt).strftime("%Y-%m-%d")

    def price_formatter(value):
        return "{:.4f}".format(value) if abs(value) < 10000 else "{:.0f}".format(value)

    return results.style.format(
        {
            "pct_change": "{:,.2%}".format,
            "begin_date": format_datetime,
            "end_date": format_datetime,
            "begin": price_formatter,
            "end": price_formatter,
        }
    )
import pandas as pd
# First sample data set (five daily candles), column-oriented for pd.DataFrame.
data = {
    "open_time": ["2023-07-02", "2023-07-03", "2023-07-04", "2023-07-05", "2023-07-06"],
    "open": [0.12800000, 0.12360000, 0.12830000, 0.12410000, 0.11990000],
    "high": [0.12800000, 0.13050000, 0.12830000, 0.12530000, 0.12270000],
    "low": [0.12090000, 0.12220000, 0.12320000, 0.11800000, 0.11470000],
    "close": [0.12390000, 0.12830000, 0.12410000, 0.11980000, 0.11500000],
}
begin_date begin end_date end pct_change
1 2023-07-02 0.1280 2023-07-02 0.1209 -5.55%
3 2023-07-02 0.1209 2023-07-03 0.1305 7.94%
9 2023-07-03 0.1305 2023-07-06 0.1147 -12.11%
# Second sample data set (five daily candles), run with a 3% threshold.
data_2 = {
    "open_time": ["2022-02-25", "2022-02-26", "2022-02-27", "2022-02-28", "2022-03-01"],
    "open": [38340.4200, 39237.0700, 39138.1100, 37714.4200, 43176.4100],
    "high": [39699.0000, 40300.0000, 39881.7700, 44200.0000, 44968.1300],
    "low": [38038.4600, 38600.4600, 37027.5500, 37468.2800, 42838.6800],
    "close": [39237.0600, 39138.1100, 37714.4300, 43181.2700, 44434.0900],
}
pandas_approach(data_2, 0.03)
begin_date begin end_date end pct_change
2 2022-02-25 38038 2022-02-26 40300 5.95%
3 2022-02-26 40300 2022-02-26 38600 -4.22%
4 2022-02-26 38600 2022-02-27 39882 3.32%
5 2022-02-27 39882 2022-02-27 37028 -7.16%
7 2022-02-27 37028 2022-02-28 44200 19.37%
8 2022-02-28 44200 2022-03-01 42839 -3.08%
9 2022-03-01 42839 2022-03-01 44968 4.97%
输入:Pandas DataFrame,包含列:Open、High、Low、Close。
输出:同一个 DataFrame 的副本,并增加布尔列 is_min 和 is_max。
不显著的高点和低点不是通过百分比过滤掉的,而是基于 ATR(平均真实范围)乘以某个系数。
import pandas as pd
from utils.import_data import import_ohlc_daily
def add_atr_col_to_df(
    df: pd.DataFrame, n: int = 5, exponential: bool = False
) -> pd.DataFrame:
    """Add ATR (Average True Range) column to DataFrame.

    Average True Range is a volatility estimate.

    Args:
        df: Frame with ``High``, ``Low`` and ``Close`` columns. It is not
            modified; a deep copy is returned.
        n: Number of periods.
        exponential: If true, use ``ewm`` (exponentially weighted values) to
            give more weight to the recent data points. Otherwise, calculate
            a simple moving average.

    Returns:
        A copy of ``df`` with two extra columns: ``tr`` (the true range) and
        ``atr_{n}`` (its average; NaN until ``n`` observations are available).
    """
    data = df.copy(deep=True)
    high = data["High"]
    low = data["Low"]
    prev_close = data["Close"].shift()

    # True range is the largest of the three candidate ranges; the first row
    # has no previous close, so only high - low contributes there.
    data["tr0"] = abs(high - low)
    data["tr1"] = abs(high - prev_close)
    data["tr2"] = abs(low - prev_close)
    data["tr"] = data[["tr0", "tr1", "tr2"]].max(axis=1)

    if exponential:
        data[f"atr_{n}"] = (
            data["tr"].ewm(alpha=2 / (n + 1), min_periods=n, adjust=False).mean()
        )
    else:
        data[f"atr_{n}"] = data["tr"].rolling(window=n, min_periods=n).mean()

    # Drop the intermediate columns; "tr" itself is kept in the result.
    del data["tr0"]
    del data["tr1"]
    del data["tr2"]
    return data
# Walks the "Close" column of a DataFrame row by row, alternating between
# searching for a maximum and a minimum, and flags rows in boolean
# "is_min" / "is_max" columns.
# NOTE(review): this excerpt has lost its indentation and several lines
# (docstring delimiters, the closing brace of `current_candidate`, the
# right-hand side of both threshold comparisons and the opening of both
# `... ] = True` assignments). The code below is left exactly as found; the
# missing pieces must be restored from the original source before it can run.
class MinMaxProcessor:
def __init__(self, df: pd.DataFrame):
# Work on a copy so the caller's frame is not modified.
self.df = df.copy()
# Create the flag columns only when the frame does not already carry them.
if not "is_min" in self.df.columns:
self.df["is_min"] = False
if not "is_max" in self.df.columns:
self.df["is_max"] = False
def fill_is_min_max(self):
# NOTE(review): the next line is docstring text whose triple-quote
# delimiters are missing from this excerpt.
Process all rows and fill is_min and is_max columns in self.df
# The running candidate extremum: its index label, its close price and
# which kind of extremum is currently being searched for. It starts at the
# first row of the frame.
current_candidate = {
"date": self.df.index[0],
"close_price_val": self.df["Close"].iloc[0],
"extremum_type": "max", # this choice is arbitrary, could be min, doesn't matter
# NOTE(review): the closing "}" of the dict literal is missing here.
# Progress counter, used only for the per-row progress message below.
counter = 0
total_count = self.df.shape[0]
for i, row in self.df.iterrows():
counter = counter + 1
print(f"Iterating row {counter} of {total_count}...")
# looking for max
if current_candidate["extremum_type"] == "max":
# A close at or above the candidate becomes the new candidate maximum.
if row["Close"] >= current_candidate["close_price_val"]:
current_candidate["close_price_val"] = row["Close"]
current_candidate["date"] = i
# Otherwise the drop from the candidate is compared with a threshold.
# NOTE(review): the threshold expression is missing; the surrounding text
# suggests ATR times a coefficient — confirm against the original.
elif (current_candidate["close_price_val"] - row["Close"]) > (
# NOTE(review): presumably the tail of a `self.df.loc[...] = True`
# assignment that flags the candidate row as a maximum — confirm.
self.df.index == current_candidate["date"], "is_max"
] = True
# Switch to searching for a minimum, starting from the current row.
current_candidate["extremum_type"] = "min"
current_candidate["date"] = i
current_candidate["close_price_val"] = row["Close"]
else: # looking for min, current_candidate['extremum_type'] == 'min'
# A close at or below the candidate becomes the new candidate minimum.
if row["Close"] <= current_candidate["close_price_val"]:
current_candidate["close_price_val"] = row["Close"]
current_candidate["date"] = i
# Otherwise the rise from the candidate is compared with a threshold.
# NOTE(review): the threshold expression is missing here as well.
elif (row["Close"] - current_candidate["close_price_val"]) > (
# NOTE(review): presumably the tail of a `self.df.loc[...] = True`
# assignment that flags the candidate row as a minimum — confirm.
self.df.index == current_candidate["date"], "is_min"
] = True
# Switch back to searching for a maximum, starting from the current row.
current_candidate["extremum_type"] = "max"
current_candidate["date"] = i
current_candidate["close_price_val"] = row["Close"]
if __name__ == "__main__":
ticker = "AAPL"
# import daily OHLC data from somewhere
res = import_ohlc_daily(ticker=ticker)
res = add_atr_col_to_df(df=res, n=ATR_SMOOTHING_N, exponential=False)
aapl_processor = MinMaxProcessor(df=res)