我有下面的代码,它对 CSV 文件执行一些操作。
class SplitRow(beam.DoFn):
def process(self, element):
return [element.split(',')]
class FilterCardioPatients(beam.DoFn):
def process(self, element):
if element[3] == 'cardio':
return [element]
class PairPatients(beam.DoFn):
def process(self, element):
return [(element[1], 1)]
class Counting(beam.DoFn):
def process(self, element):
(key, values) = element
return [(key, sum(values))]
p1 = beam.Pipeline()
visit_count = (
p1
|beam.io.ReadFromText(inputs_pattern)
|beam.ParDo(SplitRow())
|beam.Map(print)
# |beam.ParDo(FilterCardioPatients())
# |beam.ParDo(PairPatients())
# |beam.GroupByKey()
# | beam.ParDo(Counting())
# |beam.io.WriteToText('parddo_output.txt')
)
p1.run()
输出为:
['2984641', 'Emily', '35', 'cardio', '1/9/21']
['9454384', 'Riikka', '86', 'ortho', '21-07-2021']
但是,当我将 SplitRow 的定义更改为:
class SplitRow(beam.DoFn):
def process(self, element):
return [element.split(',')]
输出为:
2984641
艾米丽
35
有氧运动
1/9/21
9454384
Riikka
86
正交
21-07-2021
9266396
为什么我们需要显式返回拆分元素的列表。 python split函数本身返回一个列表。
这两个函数都是有效且有用的,它们只是做不同的事情。
和
[[1,2], [3,4,5], [6,7]]
和[1,2,3,4,5,6,7]
的区别是一样的。一个给你一个允许分组的列表列表,而第二个是没有子组的扁平化列表。
如果您希望每行中的值作为一个单元保持在一起,您可以使用
[element.split(',')]
,就像 CSV 的典型情况一样。如果你通过另一个 DoFn 运行它,那个 Fn 将首先看到 [1,2]
,然后是 [3,4,5]
,等等。
如果您不关心原始行结构并且只想要每个标记,则可以使用
element.split(',')
。这对于例如更常见计算文本中的单词数,因为您不关心单词最初出现在哪一行。如果你通过另一个 DoFn 运行它,那个 Fn 将首先看到1
,然后是2
,然后是3
,等等