在 Python 中使用显式阈值,按峰谷之间的涨跌幅度标记时间序列的状态(regime)

问题描述 投票:0回答:1

我想在时间序列中找出最长的时间段:从起点到终点,数值至少上涨了某个阈值,且期间没有出现达到该阈值的回落。可以把时间序列想象成股票走势图,假设阈值是 50 美元:找出并标记股价上涨至少 50 美元、且期间回落不足 50 美元的最长区间,这些区间称为“正向状态”。对下跌重复同样的过程:找出下跌至少 50 美元、且期间反弹不足 50 美元的最长区间,并将其标记为“负向状态”。

由于正向状态从波谷开始、在波峰结束,而负向状态从波峰开始、在波谷结束,因此各状态之间不应该重叠。

下面是阈值为 2 时正向和负向状态的示例图。请注意,第一个波峰和波谷处于中性状态,因为在第一个正向状态开始之前没有出现幅度超过阈值 2 的下降。不过,最后一个负向状态应该延伸到 -0.80,因为 -0.80 小于 -0.73(当前的终点),并且当前终点与正确终点之间没有出现至少 2 的上涨。

您将如何在 `python` 中实现这一点?我尝试过使用 `scipy.signal.find_peaks`,但无法让它工作,我觉得应该有一种更简单的方法被我忽略了。

Example Regimes

#test.ipynb

import numpy as np
import pandas as pd
from regimes import plot_regimes, process_time_series

# Build a reproducible sample series: a sine wave plus Gaussian noise.
np.random.seed(0)
time = np.arange(0, 100, 1)
clean_signal = np.sin(np.linspace(0, 10 * np.pi, 100))
noise = np.random.normal(0, 1, 100)
value = clean_signal + noise
df = pd.DataFrame({'Time': time, 'Value': value})

# Detect extrema and label the regimes using a height threshold of 2.
column_names = {'value_column': 'Value', 'time_column': 'Time'}
df_extrema, regimes_df, final_regimes_df = process_time_series(df, threshold=2, **column_names)

# Draw the series with its extrema and the shaded final regimes.
plot_regimes(df, df_extrema, final_regimes_df, **column_names)
# regimes.py
import pandas as pd
import numpy as np
from scipy.signal import find_peaks
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches

def find_extrema(df, time_column='Time', value_column='Value'):
    """
    Identifies peaks and troughs in the given time series data.

    Interior extrema are located with scipy.signal.find_peaks, applied to the
    values (peaks) and to the negated values (troughs). The first and last
    rows are always included; each is labelled 'Peak' when it is strictly
    greater than its only neighbour and 'Trough' otherwise.

    Parameters:
    - df (pd.DataFrame): DataFrame containing time and value columns.
    - time_column (str): Name of the column containing the time data. Accepted
      for a uniform signature; this function does not read it.
    - value_column (str): Name of the column containing the time series values.

    Returns:
    - df_extrema (pd.DataFrame): The rows of df that are extrema, in their
      original order with a fresh 0..n-1 index, plus a 'Type' column holding
      'Peak' or 'Trough'. An empty df yields an empty frame with a 'Type'
      column; a single-row df yields that row labelled 'Trough'.
    """
    n_rows = len(df)
    if n_rows < 2:
        # There is no neighbour to compare against, so the endpoint rule below
        # cannot be applied. Follow its tie-breaking ("not strictly greater"
        # means 'Trough') instead of raising IndexError.
        df_extrema = df.copy().reset_index(drop=True)
        df_extrema['Type'] = ['Trough'] * n_rows
        return df_extrema

    # find_peaks works on a float array internally; convert once and reuse it.
    values = np.asarray(df[value_column], dtype=float)
    peaks, _ = find_peaks(values)
    troughs, _ = find_peaks(-values)

    # Map row position -> extremum type. Peaks are written last so they win
    # should a position ever appear in both arrays.
    type_by_position = {int(pos): 'Trough' for pos in troughs}
    type_by_position.update((int(pos), 'Peak') for pos in peaks)

    # Include first and last points as extrema
    if 0 not in type_by_position:
        type_by_position[0] = 'Peak' if values[0] > values[1] else 'Trough'

    last_idx = n_rows - 1
    if last_idx not in type_by_position:
        type_by_position[last_idx] = (
            'Peak' if values[last_idx] > values[last_idx - 1] else 'Trough'
        )

    # Create the extrema DataFrame in positional order
    positions = sorted(type_by_position)
    df_extrema = df.iloc[positions].copy().reset_index(drop=True)
    df_extrema['Type'] = [type_by_position[pos] for pos in positions]

    return df_extrema

def label_regimes(df_extrema, threshold=0.25, value_column='Value', time_column='Time'):
    """
    Labels regimes based on transitions between extrema points.

    Each pair of consecutive extrema whose absolute value difference is at
    least `threshold` becomes one regime. The regime is 'Positive' when the
    value rises and 'Negative' otherwise. The direction is taken from the sign
    of the change rather than from the 'Type' labels, so a rise between two
    endpoints that were both labelled by tie-breaking (e.g. Trough -> Trough)
    is still reported as 'Positive'.

    Parameters:
    - df_extrema (pd.DataFrame): DataFrame containing extrema points and types.
    - threshold (float): Minimum height change to consider as a regime.
    - value_column (str): Column name for time series values.
    - time_column (str): Column name for time values.

    Returns:
    - regimes_df (pd.DataFrame): DataFrame containing labeled regimes with the
      columns Start_Time, End_Time, Start_Type, End_Type, Height_Change
      (magnitude of the move) and Regime_Type. The columns are present even
      when no regime qualifies.
    """
    columns = ['Start_Time', 'End_Time', 'Start_Type', 'End_Type',
               'Height_Change', 'Regime_Type']
    regimes = []
    for i in range(len(df_extrema) - 1):
        current = df_extrema.iloc[i]
        next_ = df_extrema.iloc[i + 1]
        height_change = next_[value_column] - current[value_column]

        # Written as a positive test so a NaN difference is skipped.
        if abs(height_change) >= threshold:
            regimes.append({
                'Start_Time': current[time_column],
                'End_Time': next_[time_column],
                'Start_Type': current['Type'],
                'End_Type': next_['Type'],
                'Height_Change': abs(height_change),
                'Regime_Type': 'Positive' if height_change > 0 else 'Negative',
            })

    return pd.DataFrame(regimes, columns=columns)

def merge_consecutive_regimes(regimes_df, df, time_column='Time', value_column='Value'):
    """
    Merges consecutive regimes of the same type.

    Walks the rows of regimes_df in order. While a row has the same
    'Regime_Type' as the regime being accumulated, the accumulated regime's
    end is moved to that row's end and its 'Height_Change' is recomputed from
    the values in df at the accumulated start and end times. A row of a
    different type closes the accumulated regime and starts a new one.

    Parameters:
    - regimes_df (pd.DataFrame): Regimes to merge, in the order to be walked.
    - df (pd.DataFrame): Time series used to look up values by time.
    - time_column (str): Column name for time values in df.
    - value_column (str): Column name for time series values in df.

    Returns:
    - pd.DataFrame: Merged regimes with a fresh 0..n-1 index; a copy of
      regimes_df when it is empty.
    """
    if regimes_df.empty:
        return regimes_df.copy()

    def value_at(moment):
        # First value in df whose time equals `moment`.
        return df.loc[df[time_column] == moment, value_column].values[0]

    collapsed = []
    active = regimes_df.iloc[0].copy()

    for position in range(1, len(regimes_df)):
        candidate = regimes_df.iloc[position]

        if candidate['Regime_Type'] != active['Regime_Type']:
            # Type changed: close the accumulated regime and start afresh.
            collapsed.append(active)
            active = candidate.copy()
            continue

        # Same type: stretch the accumulated regime to this row's end.
        active['End_Time'] = candidate['End_Time']
        active['End_Type'] = candidate['End_Type']
        first_value = value_at(active['Start_Time'])
        final_value = value_at(active['End_Time'])

        if active['Regime_Type'] == 'Positive':
            active['Height_Change'] = final_value - first_value
        else:
            active['Height_Change'] = first_value - final_value

    collapsed.append(active)
    return pd.DataFrame(collapsed).reset_index(drop=True)

def calculate_whitespace(merged_regimes_df, df, time_column='Time', value_column='Value'):
    """
    Identifies gaps between merged regimes as whitespace.

    For every pair of neighbouring rows in merged_regimes_df, a gap is
    recorded when the later regime starts strictly after the earlier one ends.
    The gap is 'Positive' when the value in df at the gap's start is lower
    than the value at its end, and 'Negative' otherwise.

    Parameters:
    - merged_regimes_df (pd.DataFrame): Regimes with 'Start_Time'/'End_Time'.
    - df (pd.DataFrame): Time series used to look up values by time.
    - time_column (str): Column name for time values in df.
    - value_column (str): Column name for time series values in df.

    Returns:
    - pd.DataFrame: One row per gap with Start_Time, End_Time, Start_Height,
      End_Height and Whitespace_Type; empty when there are no gaps.
    """
    def height_at(moment):
        # First value in df whose time equals `moment`.
        return df.loc[df[time_column] == moment, value_column].values[0]

    gaps = []
    for position in range(1, len(merged_regimes_df)):
        earlier = merged_regimes_df.iloc[position - 1]
        later = merged_regimes_df.iloc[position]

        gap_start = earlier['End_Time']
        gap_end = later['Start_Time']

        # Neighbouring regimes that touch (or overlap) leave no whitespace.
        if not gap_end > gap_start:
            continue

        start_height = height_at(gap_start)
        end_height = height_at(gap_end)
        if start_height < end_height:
            direction = 'Positive'
        else:
            direction = 'Negative'

        gaps.append({
            'Start_Time': gap_start,
            'End_Time': gap_end,
            'Start_Height': start_height,
            'End_Height': end_height,
            'Whitespace_Type': direction,
        })

    return pd.DataFrame(gaps)

def add_whitespace_as_regimes_and_merge(merged_regimes_df, whitespace_df, df, time_column='Time', value_column='Value'):
    """
    Adds whitespace as new regimes and merges consecutive regimes of the same type.

    Every whitespace row is converted into a regime row of the same direction
    (a 'Positive' gap runs Trough -> Peak, a 'Negative' gap Peak -> Trough).
    The converted rows are combined with merged_regimes_df, ordered by
    'Start_Time', and passed to merge_consecutive_regimes.

    Parameters:
    - merged_regimes_df (pd.DataFrame): Already-merged regimes.
    - whitespace_df (pd.DataFrame): Gaps with Start_Time, End_Time,
      Start_Height, End_Height and Whitespace_Type.
    - df (pd.DataFrame): Time series, forwarded to merge_consecutive_regimes.
    - time_column (str): Column name for time values in df.
    - value_column (str): Column name for time series values in df.

    Returns:
    - pd.DataFrame: The merged result; a copy of merged_regimes_df when
      whitespace_df is empty.
    """
    if whitespace_df.empty:
        return merged_regimes_df.copy()

    def gap_to_regime(gap):
        # Express one whitespace row in the regime column layout.
        direction = gap['Whitespace_Type']
        rising = direction == 'Positive'
        if rising:
            span = gap['End_Height'] - gap['Start_Height']
        else:
            span = gap['Start_Height'] - gap['End_Height']
        return {
            'Start_Time': gap['Start_Time'],
            'End_Time': gap['End_Time'],
            'Start_Type': 'Trough' if rising else 'Peak',
            'End_Type': 'Peak' if rising else 'Trough',
            'Height_Change': span,
            'Regime_Type': direction,
        }

    gap_regimes_df = pd.DataFrame(
        [gap_to_regime(gap) for _, gap in whitespace_df.iterrows()]
    )
    everything = pd.concat([merged_regimes_df, gap_regimes_df], ignore_index=True)
    everything = everything.sort_values(by='Start_Time').reset_index(drop=True)

    return merge_consecutive_regimes(everything, df, time_column, value_column)

def process_time_series(df, time_column='Time', value_column='Value', threshold=0.25):
    """
    Processes the time series data to identify, merge, and label regimes.

    The input is sorted by time and then run through the pipeline:
    find_extrema -> label_regimes -> merge_consecutive_regimes ->
    calculate_whitespace -> add_whitespace_as_regimes_and_merge.

    Parameters:
    - df (pd.DataFrame): DataFrame containing time and value data.
    - time_column (str): Column name for time data.
    - value_column (str): Column name for time series values.
    - threshold (float): Height change threshold to identify significant regimes.

    Returns:
    - df_extrema (pd.DataFrame): DataFrame of identified peaks and troughs.
    - regimes_df (pd.DataFrame): DataFrame of initially labeled regimes.
    - final_regimes_df (pd.DataFrame): DataFrame of final labeled regimes.

    Raises:
    - ValueError: If df lacks time_column or value_column.
    """
    for required in (time_column, value_column):
        if required not in df.columns:
            raise ValueError(f"DataFrame must contain '{time_column}' and '{value_column}' columns.")

    # Work on a time-ordered copy with a clean positional index.
    ordered = df.sort_values(by=time_column).reset_index(drop=True)

    df_extrema = find_extrema(ordered, time_column, value_column)
    regimes_df = label_regimes(df_extrema, threshold, value_column, time_column)

    merged = merge_consecutive_regimes(regimes_df, ordered, time_column, value_column)
    gaps = calculate_whitespace(merged, ordered, time_column, value_column)
    final_regimes_df = add_whitespace_as_regimes_and_merge(
        merged, gaps, ordered, time_column, value_column
    )

    return df_extrema, regimes_df, final_regimes_df

def plot_regimes(df, df_extrema, final_regimes_df, value_column='Value', time_column='Time'):
    """
    Plots the time series with shaded regimes, highlighting peaks and troughs and labeling their heights.

    Parameters:
    - df (pd.DataFrame): Original time series data.
    - df_extrema (pd.DataFrame): DataFrame with identified peaks and troughs
      (a 'Type' column holding 'Peak' or 'Trough').
    - final_regimes_df (pd.DataFrame): DataFrame containing the final regimes
      ('Start_Time', 'End_Time' and 'Regime_Type' are read).
    - value_column (str): Column name for time series values.
    - time_column (str): Column name for time.

    Returns:
    - None: Displays the plot.
    """
    plt.figure(figsize=(14, 7))
    plt.plot(df[time_column], df[value_column], label=value_column, color='blue')

    # (extremum type, marker format, legend label, text colour, text anchor)
    marker_specs = [
        ('Peak', 'r^', 'Peaks', 'red', 'bottom'),
        ('Trough', 'gv', 'Troughs', 'green', 'top'),
    ]
    points_by_kind = {
        spec[0]: df_extrema[df_extrema['Type'] == spec[0]] for spec in marker_specs
    }

    # Plot peaks and troughs
    for kind, marker_format, legend_label, _colour, _anchor in marker_specs:
        points = points_by_kind[kind]
        plt.plot(points[time_column], points[value_column], marker_format,
                 markersize=10, label=legend_label)

    # Annotate peak and trough heights (peaks above the marker, troughs below)
    for kind, _format, _label, text_colour, anchor in marker_specs:
        for _, point in points_by_kind[kind].iterrows():
            height = point[value_column]
            plt.text(point[time_column], height, f"{height:.2f}",
                     fontsize=9, color=text_colour, ha='center', va=anchor)

    # Shade final regimes
    shade_colours = {'Positive': 'green', 'Negative': 'orange'}
    for _, regime in final_regimes_df.iterrows():
        shade = shade_colours[regime['Regime_Type']]
        plt.axvspan(regime['Start_Time'], regime['End_Time'], color=shade, alpha=0.3)

    # Build the legend by hand so the shaded regimes get entries too
    legend_entries = [
        plt.Line2D([], [], color='blue', label=value_column),
        plt.Line2D([], [], marker='^', color='r', linestyle='None', markersize=10, label='Peaks'),
        plt.Line2D([], [], marker='v', color='g', linestyle='None', markersize=10, label='Troughs'),
        mpatches.Patch(color='green', alpha=0.3, label='Positive Regime'),
        mpatches.Patch(color='orange', alpha=0.3, label='Negative Regime'),
    ]
    plt.legend(handles=legend_entries, loc='upper right')

    # Enhance plot
    plt.title('Time Series with Shaded Positive and Negative Regimes')
    plt.xlabel(time_column)
    plt.ylabel(value_column)
    plt.grid(True)
    plt.tight_layout()
    plt.show()
python scikit-learn scipy time-series
1个回答
0
投票

这会给你预期的结果吗?代码不是很优美,我希望这是可以理解的!

我觉得这个想法和你的很相似。首先,它计算相邻极值之间的差值(`deltas`),然后判断差值在何处不高于负阈值(`negative_regime_change`)或不低于正阈值(`positive_regime_change`),再与当前状态(`current_regime`)进行比较。

import numpy as np
from scipy.signal import find_peaks

# Const
# Minimum change between neighbouring extrema that counts as a regime move.
THRESHOLD = 2
TRESHOLD = THRESHOLD  # original (misspelled) name, kept as an alias

# Sample Data Generation
np.random.seed(0)
time = np.arange(0, 100, 1)
value = np.sin(np.linspace(0, 10 * np.pi, 100)) + np.random.normal(0, 1, 100)

# Find peaks and troughs
peaks, _ = find_peaks(value)
troughs, _ = find_peaks(-value)
extremas = np.sort(np.concatenate([peaks, troughs]))
# Add first and last values if necessary
if 0 not in extremas:
    extremas = np.concatenate([[0], extremas])
last = len(time) - 1
if last not in extremas:
    extremas = np.concatenate([extremas, [last]])

# Diff between extremas
deltas = np.diff(value[extremas])

# Check when a change of regime happen
positive_regime_change = deltas >= THRESHOLD
negative_regime_change = deltas <= -THRESHOLD

# Start regime: taken from the first delta, "neutral" if it is below threshold
if positive_regime_change[0]:
    current_regime = "positive"
elif negative_regime_change[0]:
    current_regime = "negative"
else:
    current_regime = "neutral"

# Each entry is [start sample index, end sample index, regime label]
regimes = []

# Position (within extremas) where the current regime started
i = 0

for j, (is_rise, is_fall) in enumerate(zip(positive_regime_change, negative_regime_change)):
    if is_rise:
        new_regime = "positive"
    elif is_fall:
        new_regime = "negative"
    else:
        # Sub-threshold move: the current regime simply continues
        continue

    if new_regime != current_regime:
        # Close the running regime at the extremum where the large move starts
        regimes.append([extremas[i], extremas[j], current_regime])
        current_regime = new_regime
        i = j

# The last regime runs to the final extremum. The original used
# extremas[j] + 1, which is a sample index plus one, not the last extremum.
regimes.append([extremas[i], extremas[-1], current_regime])

print(regimes)

© www.soinside.com 2019 - 2024. All rights reserved.