Apache Beam:类型错误:无法确定类型提示 Any 的架构

问题描述 投票:0回答:1

我正在使用 Python 开发一个简单的 Apache Beam 管道来处理文本文件并输出 CSV。下面是我的代码:

python
Copy code
import apache_beam as beam

p1 = beam.Pipeline()

attendance_count = (
    p1
    | beam.io.ReadFromText("dept_data.txt", validate=True)
    | beam.Map(lambda x: x.split(","))
    | beam.Filter(lambda x: x[3] == "Accounts")
    | beam.Map(lambda x: (x[1], 1))
    | beam.CombinePerKey(sum)
    | beam.Map(lambda x: f"{x[0]},{x[1]}")
    | beam.io.WriteToCsv("output/dept_op_data.csv", num_shards=1)
)

p1.run()

当我尝试运行此管道时,出现以下错误:

TypeError: Could not determine schema for type hint Any. Did you mean to create a schema-aware PCollection? See https://s.apache.org/beam-python-schemas
Full Traceback:
Traceback (most recent call last):
  File "/path/to/your/script.py", line 7, in <module>
    p1
  ...
  File "/opt/anaconda3/envs/beam/lib/python3.12/site-packages/apache_beam/typehints/schemas.py", line 610, in schema_from_element_type
    raise TypeError(
TypeError: Could not determine schema for type hint Any. Did you mean to create a schema-aware PCollection? See https://s.apache.org/beam-python-schemas

我尝试过的:

  1. 检查ReadFromText中的validate=True。
  2. 重新审视了我对 Map 和 JointPerKey 的使用。
  3. 研究了 Beam 的模式感知转换,但不知道如何将它们集成到我的管道中。

问题: 我该如何解决这个问题?我是否需要使管道模式感知,或者是否有更简单的方法来解决此错误?任何指导将不胜感激。

python apache-beam
1个回答
0
投票

你应该能够改变:

| beam.Map(lambda x: f"{x[0]},{x[1]}")

| beam.Map(lambda x: f"{x[0]},{x[1]}").with_output_types(str)

出现此错误是因为 Beam 无法自动推断地图阶段的输出类型,因此无法将其转换为架构元素。

© www.soinside.com 2019 - 2024. All rights reserved.