我一直在关注Timely (and Stateful) Processing with Apache Beam文章,虽然全面而且写得很好,但没有说明如何用python实现相同的目标。更具体地说,它指出:
Beam的Python SDK尚不支持状态和定时器。
它没有说明这一点的原因......是否有一个天生的原因,为什么这是不可能的?
我正在寻求为我希望实现的信号处理系统实现重放缓冲/窗口系统。因此,使用最新窗口不断更新长度为W的特征的滑动窗口/历史帧缓冲器。
在Java中,它的实现如下所示:
static class FeatureFrameBuffer扩展DoFn,FeatureFrame> {Integer bufferSize;
public FeatureFrameBuffer(Integer bufferSize) {
this.bufferSize = bufferSize;
}
@StateId("buffer")
private final StateSpec<BagState<KV<String, Double>>> bufferedFeatures = StateSpecs.bag();
@StateId("count")
private final StateSpec<ValueState<Integer>> countState = StateSpecs.value();
@ProcessElement
public void process(
ProcessContext context,
@StateId("buffer") BagState<KV<String, Double>> bufferState,
@StateId("count") ValueState<Integer> countState
) {
int count = firstNonNull(countState.read(), 0);
count = count + 1;
countState.write(count);
bufferState.add(context.element());
// Only output buffer if count is greater than bufferSize
// Remove last element from buffer if count
// greater than or equals buferSize
if (count >= bufferSize) {
bufferState.read();
createFeatureFrame();
context.output(featureFrame);
bufferState.clear();
countState.clear();
}
}
}
我想知道在开始开发自定义实现之前是否可以使用python sdk实现相同的功能。关于此事的一些建议会很棒。
截至今天,Python SDK对状态处理的支持仍然是一个悬而未决的问题。请参阅https://issues.apache.org/jira/browse/BEAM-2687,它被this票据阻止:“实施Beam Python用户状态和计时器API”,虽然正在积极进行中。
从Beam版本2.9.0开始提供用户状态和定时器。该文档尚未更新。