数据流:在事件流中查找上一个事件

问题描述 投票:1回答:1

恢复我想要在Google Dataflow中使用Apache Beam做的事情就像Azure Stream Analytics中的LAG

使用我收到数据的X分钟窗口:

||||||  ||||||  ||||||  ||||||  ||||||  ||||||
|  1 |  |  2 |  |  3 |  |  4 |  |  5 |  |  6 | 
|id=x|  |id=x|  |id=x|  |id=x|  |id=x|  |id=x| 
|||||| ,|||||| ,|||||| ,|||||| ,|||||| ,|||||| , ...

我需要比较数据(n)和数据(n-1),例如,跟上面的例子,它将是这样的:

if data(6) inside and data(5)  outside then ... 
if data(5) inside and data(4)  outside then ... 
if data(4) inside and data(3)  outside then ... 
if data(3) inside and data(2)  outside then ... 
if data(2) inside and data(1)  outside then ... 

有没有“实用”的方法来做到这一点?

python google-cloud-platform google-cloud-dataflow apache-beam
1个回答
1
投票

使用Beam,如docs中所述,每个键和窗口都保持状态。因此,您无法访问以前窗口中的值。

要做你想做的事,你可能需要一个更复杂的管道设计。我的想法,这里作为一个例子,将在ParDo中复制您的消息:

  • 将它们未经修改地发送到主输出
  • 同时,将它们发送到具有单窗口滞后的侧输出

要执行第二个项目符号点,我们可以将窗口的持续时间(WINDOW_SECONDS)添加到元素时间戳:

class DuplicateWithLagDoFn(beam.DoFn):

  def process(self, element, timestamp=beam.DoFn.TimestampParam):
    # Main output gets unmodified element
    yield element
    # The same element is emitted to the side output with a 1-window lag added to timestamp
    yield beam.pvalue.TaggedOutput('lag_output', beam.window.TimestampedValue(element, timestamp + WINDOW_SECONDS))

我们调用函数指定正确的标记:

beam.ParDo(DuplicateWithLagDoFn()).with_outputs('lag_output', main='main_output')

然后将相同的窗口方案应用于两者,按键共同组合等。

windowed_main = results.main_output | 'Window main output' >> beam.WindowInto(window.FixedWindows(WINDOW_SECONDS))
windowed_lag = results.lag_output | 'Window lag output' >> beam.WindowInto(window.FixedWindows(WINDOW_SECONDS))

merged = (windowed_main, windowed_lag) | 'Join Pcollections' >> beam.CoGroupByKey()

最后,我们可以在同一个ParDo中包含两个值(旧的和新的):

class CompareDoFn(beam.DoFn):

  def process(self, element):
    logging.info("Combined with previous vale: {}".format(element))

    try:
      old_value = int(element[1][1][0].split(',')[1])
    except:
      old_value = 0

    try:
      new_value = int(element[1][0][0].split(',')[1])
    except:
      new_value = 0

    logging.info("New value: {}, Old value: {}, Difference: {}".format(new_value, old_value, new_value - old_value))
    return (element[0], new_value - old_value)

为了测试这个,我使用直接运行程序运行管道,并且在单独的shell上,我发布两个超过10秒的消息(在我的情况下WINDOW_SECONDS是10秒):

gcloud pubsub topics publish lag --message="test,120"
sleep 12
gcloud pubsub topics publish lag --message="test,40"

作业输出显示了预期的差异:

INFO:root:New message: (u'test', u'test,120')
INFO:root:Combined with previous vale: (u'test', ([u'test,120'], []))
INFO:root:New value: 120, Old value: 0, Difference: 120
INFO:root:New message: (u'test', u'test,40')
INFO:root:Combined with previous vale: (u'test', ([u'test,40'], [u'test,120']))
INFO:root:New value: 40, Old value: 120, Difference: -80
INFO:root:Combined with previous vale: (u'test', ([], [u'test,40']))
INFO:root:New value: 0, Old value: 40, Difference: -40

我的示例here的完整代码。在复制元素时考虑性能因素,但如果需要在两个窗口中提供值,则有意义。

© www.soinside.com 2019 - 2024. All rights reserved.