如何输入提示 Apache Beam 默认 DoFn.TimestampParam

问题描述 投票:0回答:1

我正在努力注释 DoFn

process
方法的额外参数,特别是时间戳参数

最小示例:

import apache_beam as beam
from apache_beam.transforms.window import TimestampedValue
from apache_beam.utils.timestamp import TimestampTypes

class Do(beam.DoFn):
    def process(
        self,
        element: int,
        timestamp: TimestampTypes = beam.DoFn.TimestampParam,
    ) -> Iterable[TimestampedValue[int]]:
        yield TimestampedValue(element, timestamp)

注意:

TimestampTypes
的类型为
Union[int, float, Timestamp]

这会导致 mypy 指出参数类型不正确:

参数“timestamp”的默认值不兼容(默认值类型为“_DoFnParam”,参数类型为“Union[int, float, Timestamp]”)

但是,如果我按照指示注释参数,则生成的

timestamp
类型不正确:

import apache_beam as beam
from apache_beam.transforms.core import _DoFnParam
from apache_beam.transforms.window import TimestampedValue

class Do(beam.DoFn):
    def process(
        self,
        element: int,
        timestamp: _DoFnParam = beam.DoFn.TimestampParam,
    ) -> Iterable[TimestampedValue[int]]:
        yield TimestampedValue(element, timestamp)

“TimestampedValue”的参数 2 具有不兼容的类型“_DoFnParam”;预期“Union[int, float, Timestamp]”

有没有人成功解决了这个差异,或者这是 Beam 中类型暗示的限制,我现在应该忽略检查?

python apache-beam python-typing mypy
1个回答
0
投票

正如 @chepner 和 @user3412205 在评论中提到的,涉及联合类型和默认参数值的一些微妙之处,我们可以尝试使用

TimestampTypes | _DoFnParam
来启动,并在方法
if not isinstance(timestamp, (int, float)): raise
中,这有效。

将答案发布为社区 wiki,以造福于将来可能遇到此用例的社区。请随意编辑此答案以获取更多信息。

© www.soinside.com 2019 - 2024. All rights reserved.