我想在时间序列中找出这样的最长时间段:从起点到终点,数值至少上升某个阈值,并且期间没有出现幅度达到该阈值的下跌。可以把时间序列想象成一张股票走势图,假设阈值是 50 美元:找出并标记股价上涨至少 50 美元、且期间没有下跌 50 美元的最长区间,这些区间称为“正向状态”。然后对下跌重复同样的过程:找出下跌至少 50 美元、且期间没有上涨 50 美元的区间,并将它们标记为“负向状态”。
由于正向状态从波谷开始、在波峰结束,而负向状态从波峰开始、在波谷结束,因此各个状态之间永远不应该重叠。
下面是阈值为 2 时正向状态和负向状态的示例图。请注意,第一个波峰和第一个波谷处于中性状态,因为在第一个正向状态开始之前,没有出现幅度达到阈值 2 的下跌。不过,最后一个负向状态应当延伸到 -0.80:因为 -0.80 低于 -0.73(当前的终止位置),并且在当前终点与正确终点之间没有出现至少为 2 的上涨。
您会如何在 `python` 中实现这一点?我尝试过使用 `scipy.signal.find_peaks`,但没能让它正常工作,我觉得应该有一种我没有想到的更简单的方法。
#test.ipynb
import numpy as np
import pandas as pd
from regimes import plot_regimes, process_time_series
# Build a reproducible sample series: a sine wave (five periods over 100
# samples) with unit-variance Gaussian noise added on top.
np.random.seed(0)
time = np.arange(0, 100, 1)
clean_signal = np.sin(np.linspace(0, 10 * np.pi, 100))
noise = np.random.normal(0, 1, 100)
value = clean_signal + noise
df = pd.DataFrame({'Time': time, 'Value': value})

# Detect extrema and label the regimes using a height threshold of 2.
df_extrema, regimes_df, final_regimes_df = process_time_series(
    df,
    time_column='Time',
    value_column='Value',
    threshold=2,
)

# Draw the series, its peaks/troughs and the shaded final regimes.
plot_regimes(
    df,
    df_extrema,
    final_regimes_df,
    value_column='Value',
    time_column='Time',
)
# regimes.py
import pandas as pd
import numpy as np
from scipy.signal import find_peaks
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
def find_extrema(df, time_column='Time', value_column='Value'):
    """
    Identifies peaks and troughs in the given time series data.

    Interior extrema come from ``scipy.signal.find_peaks`` applied to the
    values (peaks) and to the negated values (troughs). The first and last
    rows are always included as extrema; each is labelled 'Peak' when it is
    strictly greater than its only neighbour and 'Trough' otherwise.

    Parameters:
    - df (pd.DataFrame): DataFrame containing time and value columns.
    - time_column (str): Name of the column containing the time data. It is
      not read here; it is kept so the signature matches the other helpers.
    - value_column (str): Name of the column containing the time series values.

    Returns:
    - df_extrema (pd.DataFrame): The rows of ``df`` that are extrema, in
      positional order with a fresh 0..n-1 index and an extra 'Type' column
      holding 'Peak' or 'Trough'.
    """
    n_rows = len(df)
    if n_rows < 2:
        # No neighbour exists to compare against. Return the (at most one)
        # row labelled 'Trough', the same label a tie receives below,
        # instead of failing on an out-of-range positional lookup.
        df_extrema = df.copy().reset_index(drop=True)
        df_extrema['Type'] = ['Trough'] * n_rows
        return df_extrema

    values = df[value_column].to_numpy(dtype=float)
    peaks, _ = find_peaks(values)
    troughs, _ = find_peaks(-values)

    # Map row position -> extremum type so the type of each sorted position
    # is a dictionary lookup rather than a repeated list search.
    type_by_position = {int(pos): 'Peak' for pos in peaks}
    type_by_position.update((int(pos), 'Trough') for pos in troughs)

    # Include first and last points as extrema
    last_idx = n_rows - 1
    if 0 not in type_by_position:
        type_by_position[0] = 'Peak' if values[0] > values[1] else 'Trough'
    if last_idx not in type_by_position:
        type_by_position[last_idx] = (
            'Peak' if values[last_idx] > values[last_idx - 1] else 'Trough'
        )

    # Create the extrema DataFrame
    ordered_positions = sorted(type_by_position)
    df_extrema = df.iloc[ordered_positions].copy().reset_index(drop=True)
    df_extrema['Type'] = [type_by_position[pos] for pos in ordered_positions]
    return df_extrema
def label_regimes(df_extrema, threshold=0.25, value_column='Value', time_column='Time'):
    """
    Labels regimes based on transitions between extrema points.

    Every pair of consecutive extrema forms a leg. A leg whose absolute
    height change is at least ``threshold`` becomes a regime: 'Positive'
    when the value rises and 'Negative' otherwise. The direction is taken
    from the sign of the change, not from the 'Type' labels, so a leg is
    still classified correctly when two neighbouring extrema carry the
    same label (which can happen when an endpoint ties with its neighbour).

    Parameters:
    - df_extrema (pd.DataFrame): DataFrame containing extrema points and types.
    - threshold (float): Minimum height change to consider as a regime.
    - value_column (str): Column name for time series values.
    - time_column (str): Column name for time values.

    Returns:
    - regimes_df (pd.DataFrame): One row per regime with columns
      'Start_Time', 'End_Time', 'Start_Type', 'End_Type', 'Height_Change'
      (the magnitude of the move) and 'Regime_Type'. The columns are present
      even when no leg reaches the threshold.
    """
    columns = [
        'Start_Time', 'End_Time', 'Start_Type', 'End_Type',
        'Height_Change', 'Regime_Type',
    ]
    if len(df_extrema) < 2:
        # Fewer than two extrema means there is no leg to evaluate.
        return pd.DataFrame(columns=columns)

    times = df_extrema[time_column].to_numpy()
    values = df_extrema[value_column].to_numpy()
    types = df_extrema['Type'].to_numpy()

    regimes = []
    legs = zip(times[:-1], times[1:], values[:-1], values[1:], types[:-1], types[1:])
    for start_time, end_time, start_value, end_value, start_type, end_type in legs:
        height_change = end_value - start_value
        if not abs(height_change) >= threshold:
            continue
        regimes.append({
            'Start_Time': start_time,
            'End_Time': end_time,
            'Start_Type': start_type,
            'End_Type': end_type,
            'Height_Change': abs(height_change),
            'Regime_Type': 'Positive' if height_change > 0 else 'Negative',
        })
    return pd.DataFrame(regimes, columns=columns)
def merge_consecutive_regimes(regimes_df, df, time_column='Time', value_column='Value'):
    """
    Merges consecutive regimes of the same type.

    Rows of ``regimes_df`` are walked in order. While the next row has the
    same 'Regime_Type' as the regime being built, that regime's end is
    moved to the next row's end and its 'Height_Change' is recomputed from
    the values found in ``df`` at the regime's start and end times.

    Parameters:
    - regimes_df (pd.DataFrame): Regimes with 'Start_Time', 'End_Time',
      'End_Type', 'Height_Change' and 'Regime_Type' columns.
    - df (pd.DataFrame): Original time series used to look up values by time.
    - time_column (str): Column name for time values in ``df``.
    - value_column (str): Column name for time series values in ``df``.

    Returns:
    - pd.DataFrame: The merged regimes with a fresh 0..n-1 index; a copy of
      the input when it is empty.
    """
    if regimes_df.empty:
        return regimes_df.copy()

    def value_at(moment):
        # First value in df whose time equals the requested moment.
        matches = df.loc[df[time_column] == moment, value_column]
        return matches.values[0]

    rows = [regimes_df.iloc[pos] for pos in range(len(regimes_df))]
    active = rows[0].copy()
    finished = []
    for candidate in rows[1:]:
        if candidate['Regime_Type'] != active['Regime_Type']:
            # Type changed: close the regime being built and start a new one.
            finished.append(active)
            active = candidate.copy()
            continue

        # Same type: stretch the active regime over the candidate.
        active['End_Time'] = candidate['End_Time']
        active['End_Type'] = candidate['End_Type']
        first_value = value_at(active['Start_Time'])
        last_value = value_at(active['End_Time'])
        if active['Regime_Type'] == 'Positive':
            active['Height_Change'] = last_value - first_value
        else:
            active['Height_Change'] = first_value - last_value

    finished.append(active)
    return pd.DataFrame(finished).reset_index(drop=True)
def calculate_whitespace(merged_regimes_df, df, time_column='Time', value_column='Value'):
    """
    Identifies gaps between merged regimes as whitespace.

    For each pair of neighbouring regimes, a gap exists when the later
    regime starts after the earlier one ends. The gap is typed 'Positive'
    when the series value at the gap's end is strictly higher than at its
    start, and 'Negative' otherwise.

    Parameters:
    - merged_regimes_df (pd.DataFrame): Regimes with 'Start_Time' and
      'End_Time' columns, read in row order.
    - df (pd.DataFrame): Original time series used to look up values by time.
    - time_column (str): Column name for time values in ``df``.
    - value_column (str): Column name for time series values in ``df``.

    Returns:
    - pd.DataFrame: One row per gap with 'Start_Time', 'End_Time',
      'Start_Height', 'End_Height' and 'Whitespace_Type'.
    """
    def value_at(moment):
        # First value in df whose time equals the requested moment.
        return df.loc[df[time_column] == moment, value_column].values[0]

    gaps = []
    for pos in range(1, len(merged_regimes_df)):
        gap_start = merged_regimes_df.iloc[pos - 1]['End_Time']
        gap_end = merged_regimes_df.iloc[pos]['Start_Time']
        if not gap_end > gap_start:
            # The two regimes touch (or overlap): nothing to fill.
            continue

        height_before = value_at(gap_start)
        height_after = value_at(gap_end)
        if height_before < height_after:
            gap_kind = 'Positive'
        else:
            gap_kind = 'Negative'

        gaps.append({
            'Start_Time': gap_start,
            'End_Time': gap_end,
            'Start_Height': height_before,
            'End_Height': height_after,
            'Whitespace_Type': gap_kind,
        })
    return pd.DataFrame(gaps)
def add_whitespace_as_regimes_and_merge(merged_regimes_df, whitespace_df, df, time_column='Time', value_column='Value'):
    """
    Adds whitespace as new regimes and merges consecutive regimes of the same type.

    Each whitespace row is converted into a regime row of the same type
    ('Positive' runs Trough -> Peak, 'Negative' runs Peak -> Trough). The
    converted rows are combined with the existing regimes, ordered by
    'Start_Time' and passed to ``merge_consecutive_regimes``.

    Parameters:
    - merged_regimes_df (pd.DataFrame): Already merged regimes.
    - whitespace_df (pd.DataFrame): Gaps with 'Start_Time', 'End_Time',
      'Start_Height', 'End_Height' and 'Whitespace_Type' columns.
    - df (pd.DataFrame): Original time series, forwarded to the merge step.
    - time_column (str): Column name for time values in ``df``.
    - value_column (str): Column name for time series values in ``df``.

    Returns:
    - pd.DataFrame: The merged regimes; a copy of ``merged_regimes_df`` when
      there is no whitespace.
    """
    if whitespace_df.empty:
        return merged_regimes_df.copy()

    def as_regime(gap):
        # Translate one whitespace row into the regime-row layout.
        rising = gap['Whitespace_Type'] == 'Positive'
        if rising:
            magnitude = gap['End_Height'] - gap['Start_Height']
        else:
            magnitude = gap['Start_Height'] - gap['End_Height']
        return {
            'Start_Time': gap['Start_Time'],
            'End_Time': gap['End_Time'],
            'Start_Type': 'Trough' if rising else 'Peak',
            'End_Type': 'Peak' if rising else 'Trough',
            'Height_Change': magnitude,
            'Regime_Type': gap['Whitespace_Type'],
        }

    filler_df = pd.DataFrame([as_regime(gap) for _, gap in whitespace_df.iterrows()])
    timeline = pd.concat([merged_regimes_df, filler_df], ignore_index=True)
    timeline = timeline.sort_values(by='Start_Time').reset_index(drop=True)
    return merge_consecutive_regimes(timeline, df, time_column, value_column)
def process_time_series(df, time_column='Time', value_column='Value', threshold=0.25):
    """
    Processes the time series data to identify, merge, and label regimes.

    The pipeline sorts the data by time, finds the extrema, labels the legs
    between extrema that reach ``threshold``, merges neighbouring regimes of
    the same type, and finally fills the gaps between regimes and merges again.

    Parameters:
    - df (pd.DataFrame): DataFrame containing time and value data.
    - time_column (str): Column name for time data.
    - value_column (str): Column name for time series values.
    - threshold (float): Height change threshold to identify significant regimes.

    Returns:
    - df_extrema (pd.DataFrame): DataFrame of identified peaks and troughs.
    - regimes_df (pd.DataFrame): DataFrame of initially labeled regimes.
    - final_regimes_df (pd.DataFrame): DataFrame of final labeled regimes.

    Raises:
    - ValueError: If ``time_column`` or ``value_column`` is not a column of ``df``.
    """
    required_columns = (time_column, value_column)
    if any(name not in df.columns for name in required_columns):
        raise ValueError(f"DataFrame must contain '{time_column}' and '{value_column}' columns.")

    # Work on a time-ordered copy with a clean positional index.
    ordered = df.sort_values(by=time_column).reset_index(drop=True)

    df_extrema = find_extrema(ordered, time_column, value_column)
    regimes_df = label_regimes(df_extrema, threshold, value_column, time_column)

    first_pass = merge_consecutive_regimes(regimes_df, ordered, time_column, value_column)
    gaps = calculate_whitespace(first_pass, ordered, time_column, value_column)
    final_regimes_df = add_whitespace_as_regimes_and_merge(
        first_pass, gaps, ordered, time_column, value_column
    )
    return df_extrema, regimes_df, final_regimes_df
def plot_regimes(df, df_extrema, final_regimes_df, value_column='Value', time_column='Time'):
    """
    Plots the time series with shaded regimes, highlighting peaks and troughs and labeling their heights.

    Parameters:
    - df (pd.DataFrame): Original time series data.
    - df_extrema (pd.DataFrame): DataFrame with identified peaks and troughs
      (rows are selected by their 'Type' column).
    - final_regimes_df (pd.DataFrame): DataFrame containing the final regimes
      ('Start_Time', 'End_Time' and 'Regime_Type' are read).
    - value_column (str): Column name for time series values.
    - time_column (str): Column name for time.

    Returns:
    - None: Displays the plot.
    """
    plt.figure(figsize=(14, 7))
    plt.plot(df[time_column], df[value_column], label=value_column, color='blue')

    # (Type value, marker format, legend label, annotation colour, text anchor)
    marker_specs = [
        ('Peak', 'r^', 'Peaks', 'red', 'bottom'),
        ('Trough', 'gv', 'Troughs', 'green', 'top'),
    ]
    subsets = {
        spec[0]: df_extrema[df_extrema['Type'] == spec[0]]
        for spec in marker_specs
    }

    # Markers first, for peaks and then troughs.
    for kind, fmt, legend_label, _color, _anchor in marker_specs:
        points = subsets[kind]
        plt.plot(points[time_column], points[value_column], fmt,
                 markersize=10, label=legend_label)

    # Then the height annotations: above peaks, below troughs.
    for kind, _fmt, _label, text_color, anchor in marker_specs:
        for _, point in subsets[kind].iterrows():
            plt.text(point[time_column], point[value_column],
                     f"{point[value_column]:.2f}",
                     fontsize=9, color=text_color, ha='center', va=anchor)

    # Shade every final regime in the colour of its type.
    shade_colors = {'Positive': 'green', 'Negative': 'orange'}
    for _, regime in final_regimes_df.iterrows():
        plt.axvspan(regime['Start_Time'], regime['End_Time'],
                    color=shade_colors[regime['Regime_Type']], alpha=0.3)

    # The legend is assembled by hand so the shaded regimes get an entry too.
    legend_entries = [
        plt.Line2D([], [], color='blue', label=value_column),
        plt.Line2D([], [], marker='^', color='r', linestyle='None', markersize=10, label='Peaks'),
        plt.Line2D([], [], marker='v', color='g', linestyle='None', markersize=10, label='Troughs'),
        mpatches.Patch(color='green', alpha=0.3, label='Positive Regime'),
        mpatches.Patch(color='orange', alpha=0.3, label='Negative Regime'),
    ]
    plt.legend(handles=legend_entries, loc='upper right')

    # Titles, axis labels and layout.
    plt.title('Time Series with Shaded Positive and Negative Regimes')
    plt.xlabel(time_column)
    plt.ylabel(value_column)
    plt.grid(True)
    plt.tight_layout()
    plt.show()
这样能得到你预期的结果吗?代码写得不算优雅,但希望还算容易理解!
我觉得这个思路和你的很相似。首先,它计算相邻极值之间的差值(`deltas`),然后判断差值在哪些位置小于等于负阈值(`negative_regime_change`)或大于等于阈值(`positive_regime_change`)。最后,将结果与当前状态(`current_regime`)进行比较。
import numpy as np
from scipy.signal import find_peaks
# Const
# Minimum move between two consecutive extrema that switches the regime.
TRESHOLD = 2

# Sample Data Generation (seeded so every run produces the same series)
np.random.seed(0)
time = np.arange(0, 100, 1)
value = np.sin(np.linspace(0, 10 * np.pi, 100)) + np.random.normal(0, 1, 100)

# Find peaks and troughs
peaks, _ = find_peaks(value)
troughs, _ = find_peaks(-value)
extremas = np.sort(np.concatenate([peaks, troughs]))

# Add first and last values if necessary
if 0 not in extremas:
    extremas = np.concatenate([[0], extremas])
if (last := (len(time) - 1)) not in extremas:
    extremas = np.concatenate([extremas, [last]])

# Diff between extremas: deltas[j] is the move from extremas[j] to extremas[j + 1]
deltas = np.diff(value[extremas])

# Check when a change of regime happen
positive_regime_change = deltas >= TRESHOLD
negative_regime_change = deltas <= -TRESHOLD

# Start regime
if positive_regime_change[0]:
    current_regime = "positive"
elif negative_regime_change[0]:
    current_regime = "negative"
else:
    current_regime = "neutral"

# Each entry is [start sample index, end sample index, regime name].
regimes = []
i = 0  # position in `extremas` where the current regime started
for j in range(len(deltas)):
    if positive_regime_change[j]:
        new_regime = "positive"
    elif negative_regime_change[j]:
        new_regime = "negative"
    else:
        # A sub-threshold move never changes the regime.
        continue
    if new_regime != current_regime:
        # Close the running regime at the extremum where the large move begins.
        regimes.append([extremas[i], extremas[j], current_regime])
        current_regime = new_regime
        i = j

# The last regime runs through the final delta, i.e. up to the last extremum.
# (`extremas[j] + 1` would be a sample index plus one, which is only correct
# when the last two extrema are adjacent samples.)
regimes.append([extremas[i], extremas[-1], current_regime])
print(regimes)