I have this simple piece of code:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.trigger import AfterCount


def print_windows(element, window=beam.DoFn.WindowParam, timestamp=beam.DoFn.TimestampParam):
    print(window)
    print(timestamp)
    print(element)
    print('-----------------')


options = PipelineOptions()
with beam.Pipeline(options=options) as p:
    keyed_elements = [
        ('USA', {'user_id': 1, 'timestamp': 2}),
        ('USA', {'user_id': 1, 'timestamp': 14}),
        ('USA', {'user_id': 1, 'timestamp': 17}),
    ]
    sliding_windows = (
        p
        | beam.Create(keyed_elements)
        # Use the 'timestamp' field of each element as its event time.
        | 'ConvertIntoUserEvents' >> beam.Map(lambda e: beam.window.TimestampedValue(e, e[1]['timestamp']))
        | beam.WindowInto(
            beam.window.SlidingWindows(60, 10),
            trigger=beam.transforms.trigger.AfterWatermark(early=AfterCount(1)),
            accumulation_mode=beam.transforms.trigger.AccumulationMode.DISCARDING
        )
        | beam.ParDo(print_windows)
    )
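Basically, it takes some data and creates sliding windows over it. However, given how the trigger is defined, I expected multiple panes per window. For example, in the [0.0, 60.0) window I expected one pane per element, plus a final pane containing all the elements.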
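The actual output is below. I seem to get the early firing for each element, as described, but I never get a pane for the whole window. I tried changing the AccumulationMode to ACCUMULATING, but I still don't get the output I want.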
[0.0, 60.0)
Timestamp(2)
('USA', {'user_id': 1, 'timestamp': 2})
-----------------
[-10.0, 50.0)
Timestamp(2)
('USA', {'user_id': 1, 'timestamp': 2})
-----------------
[-20.0, 40.0)
Timestamp(2)
('USA', {'user_id': 1, 'timestamp': 2})
-----------------
[-30.0, 30.0)
Timestamp(2)
('USA', {'user_id': 1, 'timestamp': 2})
-----------------
[-40.0, 20.0)
Timestamp(2)
('USA', {'user_id': 1, 'timestamp': 2})
-----------------
[-50.0, 10.0)
Timestamp(2)
('USA', {'user_id': 1, 'timestamp': 2})
-----------------
[10.0, 70.0)
Timestamp(14)
('USA', {'user_id': 1, 'timestamp': 14})
-----------------
[0.0, 60.0)
Timestamp(14)
('USA', {'user_id': 1, 'timestamp': 14})
-----------------
[-10.0, 50.0)
Timestamp(14)
('USA', {'user_id': 1, 'timestamp': 14})
-----------------
[-20.0, 40.0)
Timestamp(14)
('USA', {'user_id': 1, 'timestamp': 14})
-----------------
[-30.0, 30.0)
Timestamp(14)
('USA', {'user_id': 1, 'timestamp': 14})
-----------------
[-40.0, 20.0)
Timestamp(14)
('USA', {'user_id': 1, 'timestamp': 14})
-----------------
[10.0, 70.0)
Timestamp(17)
('USA', {'user_id': 1, 'timestamp': 17})
-----------------
[0.0, 60.0)
Timestamp(17)
('USA', {'user_id': 1, 'timestamp': 17})
-----------------
[-10.0, 50.0)
Timestamp(17)
('USA', {'user_id': 1, 'timestamp': 17})
-----------------
[-20.0, 40.0)
Timestamp(17)
('USA', {'user_id': 1, 'timestamp': 17})
-----------------
[-30.0, 30.0)
Timestamp(17)
('USA', {'user_id': 1, 'timestamp': 17})
-----------------
[-40.0, 20.0)
Timestamp(17)
('USA', {'user_id': 1, 'timestamp': 17})
-----------------
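Each element appears six times because, with SlidingWindows(60, 10), every timestamp falls into size / period = 60 / 10 = 6 overlapping windows. The pure-Python sketch below reproduces the window list printed above. The function name `sliding_windows` is hypothetical, and its arithmetic is an assumed mirror of how Beam's SlidingWindows aligns window starts to the period (offset 0), using plain seconds instead of Beam Timestamp objects.

```python
def sliding_windows(ts, size, period, offset=0):
    """Return every [start, end) window of length `size` that contains `ts`.

    Hypothetical helper: a toy model of sliding-window assignment in plain seconds.
    """
    # Latest period-aligned window start at or before ts.
    start = ts - ((ts - offset) % period)
    windows = []
    # Walk backwards one period at a time while the window still covers ts.
    while start > ts - size:
        windows.append((start, start + size))
        start -= period
    return windows


if __name__ == "__main__":
    for ts in (2, 14, 17):
        print(ts, sliding_windows(ts, size=60, period=10))
```

For timestamp 2 this yields [0, 60) down to [-50, 10), and for 14 and 17 it yields [10, 70) down to [-40, 20), in the same order as the printed windows.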
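The code snippet above does not perform any Combine operation, such as beam.CombinePerKey. This step is required: a trigger only controls when an aggregation (a GroupByKey or a Combine) emits its results, so without one the elements just pass through with their assigned windows and the trigger never fires. In the Python SDK, every pane is then marked as UNKNOWN. The documentation says: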
PaneInfo: When triggers are used, Beam provides a DoFn.PaneInfoParam object that contains information about the current firing. Using DoFn.PaneInfoParam you can determine whether this is an early or a late firing, and how many times this window has already fired for this key. This feature implementation in python sdk is not fully completed, see more at BEAM-3759.
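The details are tracked in JIRA issue BEAM-3759.
Because PaneInfo is set to UNKNOWN, there are no repeated firings: each element is emitted exactly once, with first: True and last: True, as the output below shows.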
INFO:root:[1590314700.0, 1590314760.0)
INFO:root:PaneInfo(first: True, last: True, timing: UNKNOWN, index: 0, nonspeculative_index: 0)
INFO:root:Timestamp(1590314727.557941)
INFO:root:('USA', {'score': 3, 'ts': 15})
INFO:root:-----------------
INFO:root:[1590314700.0, 1590314760.0)
INFO:root:PaneInfo(first: True, last: True, timing: UNKNOWN, index: 0, nonspeculative_index: 0)
INFO:root:Timestamp(1590314717.558444)
INFO:root:('USA', {'score': 1, 'ts': 5})
INFO:root:-----------------
INFO:root:[1590314700.0, 1590314760.0)
INFO:root:PaneInfo(first: True, last: True, timing: UNKNOWN, index: 0, nonspeculative_index: 0)
INFO:root:Timestamp(1590314727.558758)
INFO:root:('USA', {'score': 4, 'ts': 15})
INFO:root:-----------------
INFO:root:[1590314700.0, 1590314760.0)
INFO:root:PaneInfo(first: True, last: True, timing: UNKNOWN, index: 0, nonspeculative_index: 0)
INFO:root:Timestamp(1590314757.559044)
INFO:root:('USA', {'score': 6, 'ts': 45})
INFO:root:-----------------
INFO:root:[1590314700.0, 1590314760.0)
INFO:root:PaneInfo(first: True, last: True, timing: UNKNOWN, index: 0, nonspeculative_index: 0)
INFO:root:Timestamp(1590314717.559365)
INFO:root:('USA', {'score': 2, 'ts': 5})
INFO:root:-----------------
INFO:root:[1590314700.0, 1590314760.0)
INFO:root:PaneInfo(first: True, last: True, timing: UNKNOWN, index: 0, nonspeculative_index: 0)
INFO:root:Timestamp(1590314757.559638)
INFO:root:('USA', {'score': 5, 'ts': 45})
INFO:root:-----------------
If you modify the code as follows, you will see multiple firings per window:
import logging
import time

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.trigger import AfterCount


def print_output(element, window=beam.DoFn.WindowParam, pane_info=beam.DoFn.PaneInfoParam, timestamp=beam.DoFn.TimestampParam):
    logging.info(window.start.to_utc_datetime())
    logging.info(window.end.to_utc_datetime())
    logging.info(pane_info)
    logging.info(timestamp)
    logging.info(element)
    logging.info('-----------------')


def run(argv=None):
    keyed_elements = [
        ('USA', {'score': 1, 'ts': 5}),
        ('USA', {'score': 2, 'ts': 5}),
        ('USA', {'score': 3, 'ts': 60}),
        ('USA', {'score': 4, 'ts': 60}),
        ('USA', {'score': 5, 'ts': 105}),
        ('USA', {'score': 6, 'ts': 105}),
    ]
    with beam.Pipeline(options=PipelineOptions(argv)) as p:
        # beam.io.ReadFromPubSub(subscription=subscription)
        data = (p
                | "read" >> beam.Create(keyed_elements)
                # | "JsonConvert" >> beam.Map(json.loads)
                | "ConvertIntoUserEvents" >> beam.Map(
                    lambda e: beam.window.TimestampedValue(e, time.time() + e[1]['ts'])))
        results = (
            data
            | "Window" >> beam.WindowInto(
                beam.window.FixedWindows(120),
                trigger=beam.transforms.trigger.AfterWatermark(early=AfterCount(1)),
                accumulation_mode=beam.transforms.trigger.AccumulationMode.ACCUMULATING
            )
            # The aggregation is where the trigger takes effect and panes are produced.
            | beam.CombinePerKey(beam.combiners.ToListCombineFn())
        )
        results | beam.ParDo(print_output)


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
When you run the sample above, you will see output like this:
INFO:apache_beam.runners.portability.fn_api_runner:Running (CombinePerKey(ToListCombineFn)/GroupByKey/Read)+((CombinePerKey(ToListCombineFn)/Combine)+(ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_26))
INFO:root:2020-05-24 11:02:00
INFO:root:2020-05-24 11:04:00
INFO:root:PaneInfo(first: True, last: False, timing: EARLY, index: 0, nonspeculative_index: -1)
INFO:root:Timestamp(1590318239.999000)
INFO:root:('USA', [{'score': 5, 'ts': 105}, {'score': 6, 'ts': 105}])
INFO:root:-----------------
INFO:root:2020-05-24 11:00:00
INFO:root:2020-05-24 11:02:00
INFO:root:PaneInfo(first: True, last: False, timing: EARLY, index: 0, nonspeculative_index: -1)
INFO:root:Timestamp(1590318119.999000)
INFO:root:('USA', [{'score': 3, 'ts': 60}, {'score': 1, 'ts': 5}, {'score': 2, 'ts': 5}, {'score': 4, 'ts': 60}])
INFO:root:-----------------
INFO:root:2020-05-24 11:02:00
INFO:root:2020-05-24 11:04:00
INFO:root:PaneInfo(first: False, last: True, timing: ON_TIME, index: 1, nonspeculative_index: 0)
INFO:root:Timestamp(1590318239.999000)
INFO:root:('USA', [{'score': 5, 'ts': 105}, {'score': 6, 'ts': 105}])
INFO:root:-----------------
INFO:root:2020-05-24 11:00:00
INFO:root:2020-05-24 11:02:00
INFO:root:PaneInfo(first: False, last: True, timing: ON_TIME, index: 1, nonspeculative_index: 0)
INFO:root:Timestamp(1590318119.999000)
INFO:root:('USA', [{'score': 3, 'ts': 60}, {'score': 1, 'ts': 5}, {'score': 2, 'ts': 5}, {'score': 4, 'ts': 60}])
INFO:root:-----------------
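To reason about the EARLY/ON_TIME pattern, here is a deliberately simplified, pure-Python model of AfterWatermark(early=AfterCount(1)) for a single key and window. It is not Beam's implementation: `fire_panes` is a hypothetical helper, late data and processing time are ignored, and an "arrival" is assumed to be a list of elements that reach the trigger together. In the batch case, where all elements arrive in one bundle, it produces one EARLY and one ON_TIME pane with the same contents, which is the shape of the output above.

```python
def fire_panes(arrivals, accumulating):
    """Toy model of AfterWatermark(early=AfterCount(1)) for one key and window.

    arrivals: list of bundles; each bundle is a list of elements that arrive
    together before the watermark passes the end of the window.
    Returns a list of (timing, contents) panes.
    """
    panes = []
    accumulated = []  # every element seen so far in this window
    pending = []      # elements buffered since the last firing
    for bundle in arrivals:
        accumulated.extend(bundle)
        pending.extend(bundle)
        # early=AfterCount(1): fire once at least one new element is buffered.
        if len(pending) >= 1:
            panes.append(("EARLY", list(accumulated if accumulating else pending)))
            pending = []
    # The watermark passes the end of the window: emit the on-time pane.
    # This toy model always emits it, even when it is empty.
    panes.append(("ON_TIME", list(accumulated if accumulating else pending)))
    return panes


if __name__ == "__main__":
    # Batch-like case: all elements of the window arrive in one bundle.
    print(fire_panes([[1, 2, 3, 4]], accumulating=True))
    # Streaming-like case: one element per bundle, in both accumulation modes.
    print(fire_panes([[1], [2], [3]], accumulating=True))
    print(fire_panes([[1], [2], [3]], accumulating=False))
```

With one element per bundle, ACCUMULATING repeats everything seen so far in each pane, while DISCARDING emits only the elements that arrived since the previous pane.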