Google Cloud Dataflow是一种完全托管的云服务,可用于大规模创建和评估数据处理管道。数据流管道基于Apache Beam编程模型,可以在批处理和流模式下运行。 Cloud Dataflow是Google云端平台的一部分。
在 GCS 中读取 Avro 文件作为 PCollection<GenericRecord>
我们的 Dataflow 作业是用 Python 编写的,从 Pubsub 订阅中侦听。这些消息是 Avro 文件的 GCS 文件路径字符串 (gs://bucket/file-timestamp.avro)。 avro 文件不统一
我正在尝试使用数据流运行下面的代码,其中我在类中定义了 2-3 个函数,其中 2 个函数正在工作,但 send_email() 正在工作,也没有抛出任何错误。 请...
我已经设置了一个数据流管道,它从 Pub/Sub 获取消息,转换为字典并打印消息。 这是我写的脚本: 导入 apache_beam 作为光束 导入日志记录 导入
google dataflow runner 支持 Statefull doFn 吗?
我无法使批处理 fn 在数据流上工作。我在 Go 中使用了示例中的一个:https://beam.apache.org/documentation/programming-guide/#state-timers-examples dofn 接收元件和触发器...
错误“PDone”对象没有属性“窗口”数据流 WriteToPubSub
我有一个梁管道,它从两个 Postgres CloudSQL DB 读取记录,进行一些数据转换,并通过 WriteToPubSub 模块将数据推送到 Google PubSub。 \ 我能够运行这个管道...
如何将 Spanner 更改数据捕获列名称映射到 BigQuery 中的不同列名称
我创建了Spanner数据库表的变更数据捕获(CDC),但问题是在BigQuery表中,列名不同,导致数据流管道失败。怎么...
在流管道中获取 EOFExceptions,以使用数据流管道将数据插入到启用了 TLS 的 memoystore Redis 实例中
我正在尝试从 pubsub 读取并将其写入内存存储 redis 实例。我使用 jedispool 因为该进程是多线程的。我能够将来自 pubsub 的数据写入实例...
MongoDB 到 BigQuery Dataflow 作业失败
当我运行数据流作业时,读取文档部分并开始记录超时错误需要花费大量时间。我遇到了 4 个超时错误,最后我收到了一条很长的错误消息,这部分 c...
Beam ReadFromKafka `with_metadata=True` 编码错误
使用Python SDK 2.49.0(调用Javaharness)中的ReadFromKafka在使用with_metadata=True时会引发编码错误: java.lang.IllegalArgumentException:无法编码元素'org.apache.beam ....
我需要实现一个具有并行管道的作业数据流(一个用于文件configuration.json中找到的每个实体)。 第一步是从 pub/sub 读取一个事件,通知文件到达
我正在处理一个 xml 文件来压平并加载到舞台区域中。 xml 层次结构数组元素具有不同的列数。例如: 作者元素包含:一个文件中的 id、姓名、书籍以及...
Dataflow 目前支持自定义容器,如下页所示, https://cloud.google.com/dataflow/docs/guides/using-custom-containers 我想知道我们是否可以使用我们自己的 VM 映像进行 sta...
我正在将 IoT 数据从 OPC UA 服务器发送到发布/订阅主题。该主题的每条消息都包含大约 100 个传感器的 15 分钟的每分钟数据。数据流从这个发布/订阅中读取...
如何在 GCP 数据流中使用 python 管道代码读取 BigQuery 表
有人可以分享语法以在用 python 为 GCP 数据流编写的管道中读/写 bigquery 表
嗨,我试图找到允许在 flex 模式下运行的数据流的最大大小以及检查大小的方法。 我知道经典模板每个模板有 10MB 的限制。 但是在谷歌云上搜索
此管道代码作为 Direct runner 运行良好,但在 Dataflow runner 上运行时出错 'name 'read_inputpath_date' is not defined'
从 apache_beam.options.pipeline_options 导入 PipelineOptions 导入操作系统 导入日志 将 apache_beam 导入为光束 导入 gzip 导入json,io 从 google.cloud 导入存储 os.environ["
Dataflow/ApacheBeam 将输入限制为第一个 X 数量?
我有一个有限的 PCollection,但我只想获得第一个 X 数量的输入并丢弃其余的。有没有办法使用 Dataflow 2.X/ApacheBeam 来做到这一点?
我正在尝试扩展 Google 的 Dataflow 模板以将数据从 BQ 移动到 Cloud Storage 上的 parquet 文件,但我在尝试控制 parquet 文件大小时受阻。 https://cloud.google.com/dataflow/docs/...
Go 中的 Apache Beam IO:sql:未知驱动程序“cloudsql-postgres”(忘记导入?)
按照此处的指南,我正在尝试将数据流作业连接到 Go 中 GCP 上的 Cloud SQL Postgres 实例。 我可以在我的机器上本地运行该作业。我已经确认我的权限是...
GCP Dataflow ReadFromKafka 创建大量连接
我们正在使用 Python 创建数据流作业以从 Kafka(Amazon MSK,6 个代理,5 个分区主题)读取数据。数据流作业部署在具有 Cloud NAT(单个公共 IP)的 VPC 中,此 IP 是