I'm having trouble modifying a parameter that is passed to my Apache Beam Dataflow pipeline as a `RuntimeValueProvider`. Here is a simplified version of my code:
```python
import apache_beam as beam
from apache_beam.io.textio import WriteToText
from apache_beam.options.pipeline_options import (GoogleCloudOptions,
                                                  PipelineOptions)


class DataflowFlags(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
            "--output_path",
            dest="output_path",
            type=str,
            default="./python_extract_output",
        )
        parser.add_argument(
            "--project_id", dest="project_id", default="test"
        )


class ExtractPipelineRunner:
    def __init__(
        self,
        output_path: str,
    ):
        self.output_path = output_path

    def run(self, p: beam.Pipeline) -> None:
        _ = (
            p
            | "Create" >> beam.Create(["hello", "world"])
            | "WriteToText" >> WriteToText(self.output_path.get() + "test/")
        )


def main() -> None:
    pipeline_options = PipelineOptions()
    known_args = pipeline_options.view_as(DataflowFlags)
    pipeline_options.view_as(GoogleCloudOptions).project = known_args.project_id
    with beam.Pipeline(options=pipeline_options) as p:
        extract_runner = ExtractPipelineRunner(known_args.output_path)
        extract_runner.run(p)


if __name__ == "__main__":
    main()
```
I'm trying to modify `output_path` by appending an extra value to it before passing it to `WriteToText`, but I get the following error:

```
apache_beam.error.RuntimeValueProviderError: RuntimeValueProvider(option: output_path, type: str, default_value: './python_extract_output').get() not called from a runtime context
```
How can I fix this? I'd like to be able to modify a `RuntimeValueProvider` argument properly. Thanks for your help!
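What I actually want is to trigger a Dataflow template dynamically once a month and append the current month as a string to the output path (i.e., in place of "test/"). I tried to achieve this with `StaticValueProvider`, but the value does not update from month to month.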
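Have you considered using `NestedValueProvider`? ([documentation][1], [Python implementation][2]).

It should let you take the original `ValueProvider` (`output_path`) and provide a translator function that modifies it to include the month.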
[2]: https://github.com/apache/beam/blob/37609ba70fab2216edc338121bf2f3a056a1e490/sdks/python/apache_beam/options/value_provider.py#L141