How does Apache Beam handle intermediate panes?


I have this simple piece of code:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.trigger import AfterCount


def print_windows(element, window=beam.DoFn.WindowParam, timestamp=beam.DoFn.TimestampParam):
    # Print the window, the element's event timestamp and the element itself.
    print(window)
    print(timestamp)
    print(element)
    print('-----------------')


options = PipelineOptions()
with beam.Pipeline(options=options) as p:
    keyed_elements = [
        ('USA', {'user_id': 1, 'timestamp': 2}),
        ('USA', {'user_id': 1, 'timestamp': 14}),
        ('USA', {'user_id': 1, 'timestamp': 17}),
    ]

    sliding_windows = (
        p
        | beam.Create(keyed_elements)
        # Use the 'timestamp' field (in seconds) as the element's event time.
        | 'ConvertIntoUserEvents' >> beam.Map(lambda e: beam.window.TimestampedValue(e, e[1]['timestamp']))
        # 60-second windows that start every 10 seconds.
        | beam.WindowInto(
            beam.window.SlidingWindows(60, 10),
            trigger=beam.transforms.trigger.AfterWatermark(early=AfterCount(1)),
            accumulation_mode=beam.transforms.trigger.AccumulationMode.DISCARDING
        )
        | beam.ParDo(print_windows)
    )

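Basically, it takes some data and creates sliding windows over it. However, given how the trigger is defined, I expected each window to have multiple panes. For example, in the [0.0, 60.0) window I expected one pane per element, plus a final pane containing all of the elements.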

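The actual output is below. It seems I do get the early firing for each element, as described, but I never get a pane for the whole window. I tried changing the AccumulationMode to ACCUMULATING, but I still don't get the output I want.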
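To make that expectation concrete, here is a toy, plain-Python model (not Beam code) of what AfterWatermark(early=AfterCount(1)) would emit for one key and one window, assuming the elements have already been grouped and arrive one at a time, as they might in a streaming job. The function name fire_panes is hypothetical, and the model skips empty panes, which is a simplification rather than a description of any runner.

```python
from typing import List


def fire_panes(arrivals: List[str], accumulating: bool) -> List[List[str]]:
    """Toy model of AfterWatermark(early=AfterCount(1)) for ONE key and ONE window.

    Every arrival fires an early pane; one on-time pane fires when the watermark
    passes the end of the window. Empty panes are skipped (a simplification).
    """
    panes: List[List[str]] = []
    seen: List[str] = []     # every element of this key and window so far
    pending: List[str] = []  # elements not yet emitted in any pane
    for element in arrivals:
        seen.append(element)
        pending.append(element)
        # Early firing after each element (AfterCount(1), repeated).
        panes.append(list(seen) if accumulating else list(pending))
        pending.clear()
    # On-time firing once the watermark passes the end of the window.
    on_time = list(seen) if accumulating else list(pending)
    if on_time:
        panes.append(on_time)
    return panes


print(fire_panes(['t2', 't14', 't17'], accumulating=True))
print(fire_panes(['t2', 't14', 't17'], accumulating=False))
```

In this model, accumulating mode ends with a pane that holds every element (the "final pane" the question expects), while discarding mode only ever emits the elements that arrived since the previous firing.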

[0.0, 60.0)
Timestamp(2)
('USA', {'user_id': 1, 'timestamp': 2})
-----------------
[-10.0, 50.0)
Timestamp(2)
('USA', {'user_id': 1, 'timestamp': 2})
-----------------
[-20.0, 40.0)
Timestamp(2)
('USA', {'user_id': 1, 'timestamp': 2})
-----------------
[-30.0, 30.0)
Timestamp(2)
('USA', {'user_id': 1, 'timestamp': 2})
-----------------
[-40.0, 20.0)
Timestamp(2)
('USA', {'user_id': 1, 'timestamp': 2})
-----------------
[-50.0, 10.0)
Timestamp(2)
('USA', {'user_id': 1, 'timestamp': 2})
-----------------
[10.0, 70.0)
Timestamp(14)
('USA', {'user_id': 1, 'timestamp': 14})
-----------------
[0.0, 60.0)
Timestamp(14)
('USA', {'user_id': 1, 'timestamp': 14})
-----------------
[-10.0, 50.0)
Timestamp(14)
('USA', {'user_id': 1, 'timestamp': 14})
-----------------
[-20.0, 40.0)
Timestamp(14)
('USA', {'user_id': 1, 'timestamp': 14})
-----------------
[-30.0, 30.0)
Timestamp(14)
('USA', {'user_id': 1, 'timestamp': 14})
-----------------
[-40.0, 20.0)
Timestamp(14)
('USA', {'user_id': 1, 'timestamp': 14})
-----------------
[10.0, 70.0)
Timestamp(17)
('USA', {'user_id': 1, 'timestamp': 17})
-----------------
[0.0, 60.0)
Timestamp(17)
('USA', {'user_id': 1, 'timestamp': 17})
-----------------
[-10.0, 50.0)
Timestamp(17)
('USA', {'user_id': 1, 'timestamp': 17})
-----------------
[-20.0, 40.0)
Timestamp(17)
('USA', {'user_id': 1, 'timestamp': 17})
-----------------
[-30.0, 30.0)
Timestamp(17)
('USA', {'user_id': 1, 'timestamp': 17})
-----------------
[-40.0, 20.0)
Timestamp(17)
('USA', {'user_id': 1, 'timestamp': 17})
-----------------
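Each element shows up six times because SlidingWindows(60, 10) assigns every timestamp to size / period = 60 / 10 = 6 overlapping windows. The sketch below reproduces that assignment in plain Python; sliding_windows is a hypothetical helper, and its arithmetic (latest start first, stepping back one period at a time) is assumed to mirror how the SDK picks window starts.

```python
from typing import List, Tuple


def sliding_windows(ts: int, size: int = 60, period: int = 10,
                    offset: int = 0) -> List[Tuple[int, int]]:
    """Return the [start, end) sliding windows containing event time `ts`,
    latest start first."""
    last_start = ts - (ts - offset) % period  # latest window start <= ts
    # Step back one period at a time while the window still covers ts.
    return [(start, start + size)
            for start in range(last_start, ts - size, -period)]


for ts in (2, 14, 17):
    print(ts, sliding_windows(ts))
```

For timestamp 2 this yields the same six windows, from [0, 60) down to [-50, 10), that appear in the output above; for 14 and 17 it yields [10, 70) down to [-40, 20).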
Tags: python, google-cloud-dataflow, apache-beam

1 Answer

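The code snippet shared above has no combine (grouping) step such as beam.CombinePerKey. Triggers only control when the aggregated result of a grouping operation (GroupByKey, or a Combine built on top of it) is emitted for each key and window; each such emission is a pane. Without that step, WindowInto merely assigns each element to its windows, the trigger never fires, and every element reaches the DoFn on its own with its pane marked as UNKNOWN. In the Python SDK this is documented as follows: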

PaneInfo: When triggers are used, Beam provides a DoFn.PaneInfoParam object that contains information about the current firing. Using DoFn.PaneInfoParam you can determine whether this is an early or a late firing, and how many times this window has already fired for this key. This feature implementation in python sdk is not fully completed, see more at BEAM-3759.

Details of the JIRA issue can be found here: BEAM-3759.
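As a rough illustration of the PaneInfo fields mentioned in the quote, the sketch below numbers one window's firings. ToyPaneInfo and pane_infos are hypothetical stand-ins, not Beam's PaneInfo class. The numbering rules (index counts every firing; nonspeculative_index stays -1 for early panes and counts on-time and late panes from 0) are assumed from the values printed in this answer's final log, and "last" is simply taken to be the final firing in the list.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class ToyPaneInfo:
    """Hypothetical stand-in mirroring the fields printed in the log (not Beam's PaneInfo)."""
    first: bool
    last: bool
    timing: str
    index: int
    nonspeculative_index: int


def pane_infos(timings: List[str]) -> List[ToyPaneInfo]:
    """Number the firings ('EARLY', 'ON_TIME', 'LATE') of one key and window, in order."""
    infos = []
    nonspeculative = -1  # stays -1 until the first non-early (on-time or late) firing
    for i, timing in enumerate(timings):
        if timing != 'EARLY':
            nonspeculative += 1
        infos.append(ToyPaneInfo(
            first=(i == 0),
            last=(i == len(timings) - 1),  # simplification: the final firing in the list
            timing=timing,
            index=i,  # counts every firing, early ones included
            nonspeculative_index=-1 if timing == 'EARLY' else nonspeculative,
        ))
    return infos


for info in pane_infos(['EARLY', 'ON_TIME']):
    print(info)
```

An EARLY firing followed by an ON_TIME firing gives the same first/last/index/nonspeculative_index values as the pair of panes logged for each window at the end of this answer.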

Because PaneInfo is set to UNKNOWN, there are no repeated firings; each element is emitted exactly once, as the following log shows:

INFO:root:[1590314700.0, 1590314760.0)
INFO:root:PaneInfo(first: True, last: True, timing: UNKNOWN, index: 0, nonspeculative_index: 0)
INFO:root:Timestamp(1590314727.557941)
INFO:root:('USA', {'score': 3, 'ts': 15})
INFO:root:-----------------
INFO:root:[1590314700.0, 1590314760.0)
INFO:root:PaneInfo(first: True, last: True, timing: UNKNOWN, index: 0, nonspeculative_index: 0)
INFO:root:Timestamp(1590314717.558444)
INFO:root:('USA', {'score': 1, 'ts': 5})
INFO:root:-----------------
INFO:root:[1590314700.0, 1590314760.0)
INFO:root:PaneInfo(first: True, last: True, timing: UNKNOWN, index: 0, nonspeculative_index: 0)
INFO:root:Timestamp(1590314727.558758)
INFO:root:('USA', {'score': 4, 'ts': 15})
INFO:root:-----------------
INFO:root:[1590314700.0, 1590314760.0)
INFO:root:PaneInfo(first: True, last: True, timing: UNKNOWN, index: 0, nonspeculative_index: 0)
INFO:root:Timestamp(1590314757.559044)
INFO:root:('USA', {'score': 6, 'ts': 45})
INFO:root:-----------------
INFO:root:[1590314700.0, 1590314760.0)
INFO:root:PaneInfo(first: True, last: True, timing: UNKNOWN, index: 0, nonspeculative_index: 0)
INFO:root:Timestamp(1590314717.559365)
INFO:root:('USA', {'score': 2, 'ts': 5})
INFO:root:-----------------
INFO:root:[1590314700.0, 1590314760.0)
INFO:root:PaneInfo(first: True, last: True, timing: UNKNOWN, index: 0, nonspeculative_index: 0)
INFO:root:Timestamp(1590314757.559638)
INFO:root:('USA', {'score': 5, 'ts': 45})
INFO:root:-----------------
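The grouping step this answer calls for can be pictured with a plain-Python model (not Beam's implementation): after WindowInto, a GroupByKey or CombinePerKey gathers every value that shares both a key and a window, and it is this per-key, per-window result that a trigger emits as panes. The helper group_by_key_and_window is hypothetical and hard-codes the question's SlidingWindows(60, 10) arithmetic.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


def group_by_key_and_window(elements, size: int = 60, period: int = 10):
    """Toy model: collect values that share a key AND a sliding window."""
    groups: Dict[Tuple[str, Tuple[int, int]], List[dict]] = defaultdict(list)
    for key, value in elements:
        ts = value['timestamp']
        last_start = ts - ts % period
        # Each element lands in every window that covers its timestamp.
        for start in range(last_start, ts - size, -period):
            groups[(key, (start, start + size))].append(value)
    return dict(groups)


keyed_elements = [
    ('USA', {'user_id': 1, 'timestamp': 2}),
    ('USA', {'user_id': 1, 'timestamp': 14}),
    ('USA', {'user_id': 1, 'timestamp': 17}),
]
groups = group_by_key_and_window(keyed_elements)
print(groups[('USA', (0, 60))])
```

For the question's three elements, ('USA', [0, 60)) is a group that holds all of them, so a pane containing every element only becomes possible once such a grouping step exists.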

If you modify the code as follows, adding a CombinePerKey after WindowInto, you will see multiple firings:

import logging
import time

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.trigger import AccumulationMode, AfterCount, AfterWatermark


def print_output(element, window=beam.DoFn.WindowParam,
                 pane_info=beam.DoFn.PaneInfoParam,
                 timestamp=beam.DoFn.TimestampParam):
    # Log the window bounds, the pane (firing) info, the timestamp and the element.
    logging.info(window.start.to_utc_datetime())
    logging.info(window.end.to_utc_datetime())
    logging.info(pane_info)
    logging.info(timestamp)
    logging.info(element)
    logging.info('-----------------')


def run(argv=None):
    keyed_elements = [
        ('USA', {'score': 1, 'ts': 5}),
        ('USA', {'score': 2, 'ts': 5}),
        ('USA', {'score': 3, 'ts': 60}),
        ('USA', {'score': 4, 'ts': 60}),
        ('USA', {'score': 5, 'ts': 105}),
        ('USA', {'score': 6, 'ts': 105}),
    ]
    with beam.Pipeline(options=PipelineOptions(argv)) as p:
        # In a streaming job this could be beam.io.ReadFromPubSub(...) plus json.loads.
        data = (
            p
            | 'read' >> beam.Create(keyed_elements)
            # Event time = now + the element's 'ts' offset in seconds.
            | 'ConvertIntoUserEvents' >> beam.Map(
                lambda e: beam.window.TimestampedValue(e, time.time() + e[1]['ts']))
        )

        results = (
            data
            | 'Window' >> beam.WindowInto(
                beam.window.FixedWindows(120),
                trigger=AfterWatermark(early=AfterCount(1)),
                accumulation_mode=AccumulationMode.ACCUMULATING)
            # The grouping step is where the trigger actually fires panes.
            | beam.CombinePerKey(beam.combiners.ToListCombineFn())
        )
        results | beam.ParDo(print_output)


if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    run()

Running the sample above produces output like the following:

INFO:apache_beam.runners.portability.fn_api_runner:Running (CombinePerKey(ToListCombineFn)/GroupByKey/Read)+((CombinePerKey(ToListCombineFn)/Combine)+(ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_26))
INFO:root:2020-05-24 11:02:00
INFO:root:2020-05-24 11:04:00
INFO:root:PaneInfo(first: True, last: False, timing: EARLY, index: 0, nonspeculative_index: -1)
INFO:root:Timestamp(1590318239.999000)
INFO:root:('USA', [{'score': 5, 'ts': 105}, {'score': 6, 'ts': 105}])
INFO:root:-----------------
INFO:root:2020-05-24 11:00:00
INFO:root:2020-05-24 11:02:00
INFO:root:PaneInfo(first: True, last: False, timing: EARLY, index: 0, nonspeculative_index: -1)
INFO:root:Timestamp(1590318119.999000)
INFO:root:('USA', [{'score': 3, 'ts': 60}, {'score': 1, 'ts': 5}, {'score': 2, 'ts': 5}, {'score': 4, 'ts': 60}])
INFO:root:-----------------
INFO:root:2020-05-24 11:02:00
INFO:root:2020-05-24 11:04:00
INFO:root:PaneInfo(first: False, last: True, timing: ON_TIME, index: 1, nonspeculative_index: 0)
INFO:root:Timestamp(1590318239.999000)
INFO:root:('USA', [{'score': 5, 'ts': 105}, {'score': 6, 'ts': 105}])
INFO:root:-----------------
INFO:root:2020-05-24 11:00:00
INFO:root:2020-05-24 11:02:00
INFO:root:PaneInfo(first: False, last: True, timing: ON_TIME, index: 1, nonspeculative_index: 0)
INFO:root:Timestamp(1590318119.999000)
INFO:root:('USA', [{'score': 3, 'ts': 60}, {'score': 1, 'ts': 5}, {'score': 2, 'ts': 5}, {'score': 4, 'ts': 60}])
INFO:root:-----------------
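The window bounds in this log follow from FixedWindows(120): a timestamp belongs to the window that starts at ts - (ts mod 120). The sketch below checks that arithmetic against the logged pane timestamps, which in this run sit 1 ms before each window's end. fixed_window and utc are hypothetical helpers, and the arithmetic is done in whole milliseconds to avoid floating-point noise.

```python
from datetime import datetime, timezone
from typing import Tuple


def fixed_window(ts: float, size: int = 120, offset: int = 0) -> Tuple[float, float]:
    """Return the [start, end) fixed window (in seconds) containing event time `ts`."""
    ms = round(ts * 1000)  # whole milliseconds avoid floating-point remainders
    size_ms, offset_ms = size * 1000, offset * 1000
    start_ms = ms - (ms - offset_ms) % size_ms
    return start_ms / 1000, (start_ms + size_ms) / 1000


def utc(seconds: float) -> str:
    """Format epoch seconds the way the log prints window bounds."""
    return datetime.fromtimestamp(seconds, tz=timezone.utc).strftime('%Y-%m-%d %H:%M:%S')


for pane_ts in (1590318119.999, 1590318239.999):  # pane timestamps from the log
    start, end = fixed_window(pane_ts)
    print(utc(start), utc(end))
```

This recovers the [11:00, 11:02) and [11:02, 11:04) UTC windows printed above. In each window the log shows an EARLY pane (index 0) followed by an ON_TIME pane (index 1), and because the mode is ACCUMULATING the on-time pane repeats the full list.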
© www.soinside.com 2019 - 2024. All rights reserved.