我正在努力注释 DoFn
process
方法的额外参数,特别是时间戳参数。
最小示例:
import apache_beam as beam
from apache_beam.transforms.window import TimestampedValue
from apache_beam.utils.timestamp import TimestampTypes
class Do(beam.DoFn):
def process(
self,
element: int,
timestamp: TimestampTypes = beam.DoFn.TimestampParam,
) -> Iterable[TimestampedValue[int]]:
yield TimestampedValue(element, timestamp)
注意:
TimestampTypes
的类型为 Union[int, float, Timestamp]
这会导致 mypy 指出参数类型不正确:
参数“timestamp”的默认值不兼容(默认值类型为“_DoFnParam”,参数类型为“Union[int, float, Timestamp]”)
但是,如果我按照指示注释参数,则生成的
timestamp
类型不正确:
import apache_beam as beam
from apache_beam.transforms.core import _DoFnParam
from apache_beam.transforms.window import TimestampedValue
class Do(beam.DoFn):
def process(
self,
element: int,
timestamp: _DoFnParam = beam.DoFn.TimestampParam,
) -> Iterable[TimestampedValue[int]]:
yield TimestampedValue(element, timestamp)
“TimestampedValue”的参数 2 具有不兼容的类型“_DoFnParam”;预期“Union[int, float, Timestamp]”
有没有人成功解决了这个差异,或者这是 Beam 中类型暗示的限制,我现在应该忽略检查?
正如 @chepner 和 @user3412205 在评论中提到的,涉及联合类型和默认参数值的一些微妙之处,我们可以尝试使用
TimestampTypes | _DoFnParam
来启动,并在方法 if not isinstance(timestamp, (int, float)): raise
中,这有效。
将答案发布为社区 wiki,以造福于将来可能遇到此用例的社区。请随意编辑此答案以获取更多信息。