我正在使用 Python 开发一个简单的 Apache Beam 管道来处理文本文件并输出 CSV。下面是我的代码:
python
Copy code
import apache_beam as beam
p1 = beam.Pipeline()
attendance_count = (
p1
| beam.io.ReadFromText("dept_data.txt", validate=True)
| beam.Map(lambda x: x.split(","))
| beam.Filter(lambda x: x[3] == "Accounts")
| beam.Map(lambda x: (x[1], 1))
| beam.CombinePerKey(sum)
| beam.Map(lambda x: f"{x[0]},{x[1]}")
| beam.io.WriteToCsv("output/dept_op_data.csv", num_shards=1)
)
p1.run()
当我尝试运行此管道时,出现以下错误:
TypeError: Could not determine schema for type hint Any. Did you mean to create a schema-aware PCollection? See https://s.apache.org/beam-python-schemas
Full Traceback:
Traceback (most recent call last):
File "/path/to/your/script.py", line 7, in <module>
p1
...
File "/opt/anaconda3/envs/beam/lib/python3.12/site-packages/apache_beam/typehints/schemas.py", line 610, in schema_from_element_type
raise TypeError(
TypeError: Could not determine schema for type hint Any. Did you mean to create a schema-aware PCollection? See https://s.apache.org/beam-python-schemas
我尝试过的:
问题: 我该如何解决这个问题?我是否需要使管道模式感知,或者是否有更简单的方法来解决此错误?任何指导将不胜感激。
你应该能够改变:
| beam.Map(lambda x: f"{x[0]},{x[1]}")
到
| beam.Map(lambda x: f"{x[0]},{x[1]}").with_output_types(str)
出现此错误是因为 Beam 无法自动推断地图阶段的输出类型,因此无法将其转换为架构元素。