我有两个在 log10 时间戳上离散化的时间序列,我想使用 python 将它们相互卷积。对于线性间隔的时间序列,我使用
scipy.signal.convole
,效果很好。 log10离散化有类似的东西吗?截止频率根据高斯核 (sigma) 的标准偏差定义。目标是用相同的截止频率过滤日志。
import numpy as np
import matplotlib.pyplot as plt
import scipy.signal as sg
f = lambda t : np.sin(2 * t) + np.sin(0.5 * t)
sigma = 1
kernel = lambda x : np.exp(-x**2 / (2 * sigma**2)) / (np.sqrt(2 * np.pi) * sigma)
# LINEAR DISCRETIZATION
t_linear = np.arange(0, 100, 0.01)
signal_linearly_discretized = f(t_linear)
t_kernel_linear = np.arange(-10, 10, 0.01)
kernel_linearly_discretized = kernel(t_kernel_linear)
# Filter by convolution of kernel and signal
signal_filtered_linear = np.convolve(kernel_linearly_discretized, signal_linearly_discretized,
mode="same") / np.sum(kernel_linearly_discretized)
# LOG DISCRETIZATION
t_log = np.logspace(0, 100, 1000)
signal_log_discretized = f(t_log)
"""
Filtering signal_log_discretized is unclear
"""
# Filter by convolution of kernel and signal
signal_filtered_linear = np.convolve(kernel_linearly_discretized, signal_linearly_discretized,
mode="same") / np.sum(kernel_linearly_discretized)
# Plotting of linear signal and filtered signal
fig, axes = plt.subplots(1, 2)
ax = axes[0]
ax.plot(t_linear, signal_linearly_discretized)
ax.set_title("signal")
ax = axes[1]
ax.plot(t_linear, signal_filtered_linear)
ax.set_title("signal filtered")
fig.tight_layout()
在上面您可以看到我用来将高斯滤波器应用于线性离散信号的代码。我想对信号执行相同(或类似)的操作
signal_log_discretized
。
谢谢
如果我理解正确的话,这些函数不会被重新缩放——当支撑不是均匀间隔(均匀)的网格时,只是离散卷积。在此问题中,至少一个函数的离散支持类型为 {ce^a, ce^(2a), ce^(3a),...}。
我认为没有直接的 numpy 方法(?)。方法可能包括首先在均匀网格点上插值函数,然后像往常一样进行卷积。还有非均匀FFT方法(NUFFT等)。在我的一项应用中,由于网格的内存限制,插值并不实用。在这种情况下,我只是在每个卷积步骤中使用(对数)网格的常见细化。 (常见的改进与这篇文章combine two arrays and sort,以及其他相关。)
假设:
以下代码显示了线性采样函数和对数采样函数的卷积。它应该能够处理任何类型的网格配对(包括对数对数)。主要开销是常见的细化操作。它应该相当高效:对于具有组合 N 个点的两个网格,它以 O(N) 运算运行。为了获得最佳性能,可以将其放入 Cython 或 C/C++ 中(法学硕士如今的翻译工作非常好)。
准备网格和函数(我将使用高斯):
# linear mesh
lin_width = 0.1
lin_lo, lin_hi = 1.5*2.0, 1.5*(1+np.exp(1))
linear_mesh_centers = np.arange(lin_lo,lin_hi+lin_width/2,lin_width)
# make Gaussian function, linear mesh support
mu_lin = (lin_lo+lin_hi)/2.0
vr_lin = ((lin_hi-lin_lo)/8)**2
f_lin = (1/np.sqrt(2*np.pi*vr_lin))*np.exp(-(linear_mesh_centers-mu_lin)**2/(
2*vr_lin))
linear_mesh_bins = np.arange(lin_lo-lin_width/2,lin_hi+lin_width,lin_width) #
# mesh point linear_mesh_centers[ii] is centered in linear_mesh_bins[ii],
# linear_mesh_bins[ii+1]
# log mesh
base_width = 0.1
base_lo, base_hi = 0.0, 1.0
base_mesh = np.arange(base_lo-base_width,base_hi+base_width,base_width)
log_mesh_centers = np.exp(base_mesh)[1:-1]
# make Gaussian function, log mesh support
mu_log = (log_mesh_centers[0]+log_mesh_centers[-1])/2.0
vr_log = ((log_mesh_centers[-1]-log_mesh_centers[0])/8)**2
f_log = (1/np.sqrt(2*np.pi*vr_log))*np.exp(-(log_mesh_centers-mu_log)**2/(
2*vr_log))
cc = (np.exp(base_width)-1)/(np.exp(base_width)+1)
log_mesh_bins = np.concatenate([[log_mesh_centers[0]*(1-cc)],
log_mesh_centers*(1+cc)]) # this makes
# the function samples centered in each bin, in the log mesh
卷积循环:
# extend linear mesh just enough to accomodate log_mesh support too (total
# convolution support)
tmp_1 = int(np.floor((log_mesh_centers[-1]-log_mesh_centers[0])/lin_width))
extended_lin_centers = np.concatenate([linear_mesh_centers,
[linear_mesh_centers[-1]+ii*lin_width
for ii in range(1,tmp_1+1)]])
# convolution loop, based on linear mesh centers
out_fun = []
for ix_lin, pt in enumerate(extended_lin_centers): # go through each
# shift-step in the convolution (using linear mesh centers as dominant)
shift_bins = pt - (log_mesh_bins[-1::-1] - log_mesh_centers[0]) #
# partitions for f_log(pt-x); center of log_mesh_bin[0] aligns with pt,
# in the coordinates of pt (and log_mesh_bins are "flipped" +/- for
# the convolution)
# find all bin points in each mesh covering the common refinement
ix = bisect.bisect_right(shift_bins, linear_mesh_bins[0]) # ix is
# shift_bins bin index just above linear_mesh_bins[0]
if ix > 0:
ix -= 1
tmp_log_bins = shift_bins[ix:]
ix = bisect.bisect_left(linear_mesh_bins,shift_bins[-1])
if ix < len(linear_mesh_bins):
ix += 1
tmp_lin_bins = linear_mesh_bins[:ix]
# ivl_lst contains interval endpoints and bin mappings:
# ((start_of_bin, end_of_bin),(lin_bin_index,flipped_log_bin_index))
ivl_lst = common_refinement(tmp_lin_bins, tmp_log_bins)
ln_lg = len(tmp_log_bins)-1-1
cnv_val = sum([(edges[1]-edges[0])*f_lin[ix[0]]*f_log[ln_lg-ix[1]] for
edges,ix in ivl_lst]) # value of convolution at this point
out_fun.append((pt+log_mesh_centers[0],cnv_val)) # record (x,y) result
# from convolution
常用细化函数:
def adv_bins(bins_a, bins_b, ref_ix, new_ix, alt_ix):
# bins_a is "reference" bins; bins_b is the "alt" bins
ret_status = None
if bins_a[new_ix] <= bins_b[alt_ix]:
interval_info = ((bins_a[ref_ix], bins_a[new_ix]), (ref_ix, alt_ix - 1))
ref_ix = new_ix
if ref_ix >= len(bins_a) - 1:
ret_status = "stop"
elif bins_a[new_ix] == bins_b[alt_ix]:
alt_ix += 1
if alt_ix >= len(bins_b):
ret_status = "stop"
else:
interval_info = ((bins_a[ref_ix], bins_b[alt_ix]), (ref_ix, alt_ix - 1))
ref_ix = alt_ix
if ref_ix >= len(bins_b) - 1:
ret_status = "stop"
else:
alt_ix = new_ix
ret_status = "flip"
return interval_info, ref_ix, alt_ix, ret_status
def common_refinement(tmp_lin_bins, tmp_log_bins):
ivl_lst = []
# prep bins arrays:
if tmp_lin_bins[0] >= tmp_log_bins[0]:
ix = bisect.bisect_right(tmp_log_bins,
tmp_lin_bins[0]) # next mesh point
# on log mesh strictly abv tmp_lin_bins[0]
ref_ix, alt_ix = 0, ix
bins_a, bins_b = tmp_lin_bins, tmp_log_bins # reference function
# is over tmp_lin_bins
ref_mode = 0
else:
ix = bisect.bisect_right(tmp_lin_bins,
tmp_log_bins[0]) # next mesh point
# on lin mesh strictly abv tmp_log_bins[0]
ref_ix, alt_ix = 0, ix
bins_a, bins_b = tmp_log_bins, tmp_lin_bins # reference function
# is over tmp_log_bins
ref_mode = 1
# do the refinement:
ret_status = None
while True:
new_ix = ref_ix + 1 # advance reference function's bins
interval_info, ref_ix, alt_ix, ret_status = adv_bins(bins_a, bins_b,
ref_ix, new_ix, alt_ix)
tup_ixs = interval_info[1]
if ref_mode == 1: # interval_info always reports bin indices in order
# (lin bins, log bins)
tup_ixs = (tup_ixs[1], tup_ixs[0])
ivl_lst.append((interval_info[0], tup_ixs))
if ret_status == "flip": # flip reference and alt bins
ref_mode = 1 - ref_mode
if ref_mode == 0:
bins_a, bins_b = tmp_lin_bins, tmp_log_bins
else:
bins_a, bins_b = tmp_log_bins, tmp_lin_bins
elif ret_status == "stop":
break
return ivl_lst
情节:
plt.plot(*tuple(zip(*out_fun)),label="result")
plt.plot(log_mesh_centers,f_log,color='green',label="log")
plt.plot(linear_mesh_centers,f_lin,color='red',label="lin")
mu = mu_lin+mu_log
vr = vr_lin+vr_log
out_domain = extended_lin_centers+log_mesh_centers[0]
check_gauss = (1/np.sqrt(2*np.pi*vr))*np.exp(-(
out_domain-mu)**2/(2*vr))
plt.plot(out_domain,check_gauss,color='yellow',linestyle='dashed',
label="actual")
plt.legend(loc="upper right")
plt.show()