Google Cloud Dataflow是一种完全托管的云服务,可用于大规模创建和评估数据处理管道。数据流管道基于Apache Beam编程模型,可以在批处理和流模式下运行。 Cloud Dataflow是Google云端平台的一部分。
如何获取Dataflow上worker_status.py产生的数据?
我有一个使用 Python SDK 构建的管道,使用流引擎在 Dataflow 上运行。 我在光束源中注意到 apache_beam/runners/worker/worker_status.py 公开了大量的诊断...
在start_bundle中NameError:名称'bigquery'未定义[运行'ParDo(Error_handle)-ptransform-41'时]
我编写了下面的代码,它将 json 行加载到 bigquery。 它在 Direct runner 上工作,但是在 DataFlowRunneer 上它会抛出此错误。 在start_bundle中NameError:名称'bigquery'未定义[wh...
我正在尝试将文件名作为参数传递给突触管道中的数据流,但遇到问题。 我正在设置一个变量并将其传递给数据流,如下所示。 还在数据内创建了一个参数...
如何在 Apache Beam Java 中编写带有动态标头的 CSV 文件
我是 Apache Beam 的新手,我正在从事一项将在 GCP Dataflow 中运行的作业。我需要从 BigQuery 获取一些数据,对其进行转换并编写一个带有标题的 CSV 文件作为结果。但我找到了自己...
使用突触数据流,我想将下面的json格式转换为表格格式,其中一列作为日期,另一列作为总计 { “2023-12-11T00:00:00+01:00”:1272, “2023-12-...
是否支持或计划支持 TPU 作为数据流(经典或主要)工作线程中的加速器?从文档来看,目前似乎仅支持 GPU。
如何在 Airflow 的 on_failure_callback 中捕获 Java 应用程序抛出的异常?
我正在使用 Airflow 来运行运行 java 应用程序映像的 KubernetesPodOperator 任务。 java应用程序使用beam数据流。 我的自定义气流操作符继承了 KubernetesPodOperator。 我正在努力...
如何从 PubSub 主题读取数据并将其解析到梁管道中并打印它
我有一个程序,可以在 pubSub 中创建一个主题,并向该主题发布消息。我还有一个自动数据流作业(使用模板),它将这些消息保存到我的 BigQuery 表中......
如何处理 Apache Beam 中的数据偏斜?这是可以实现的吗?如果是的话,怎么办?
我是一名数据工程师。 我已经使用 PySpark 很长时间了,现在转向 Apache Beam/Dataflow 。 因此,由于这是托管服务,我们不必做太多事情。 但是,有一个问题,我想知道...
说明: 我正在开发一个项目,需要将超过 100 万条记录插入 Google Firestore。目前,我的方法效率还不够,而且过程极其缓慢。我是
我正在准备数据工程师考试,在练习过程中,我发现了这个问题: 您正在操作流式 Cloud Dataflow 管道。您的工程师拥有新版本的管道
我正在编写一个光束流管道,具有 1 分钟固定窗口。 我正在从 PubSub 读取数据,并使用 PubSub 消息的时间戳: beam.WindowInto(窗口.FixedWindows(60))) 因为,PubSub 消息已经有
我有一个数据流管道,可以从 kafka 读取消息,处理它们,然后将它们插入到 bigquery 中。 我希望处理/bigquery 插入将按时间批量进行,这样......
Azure Synapse 数据流 - 无法在 Pre SQL 脚本中使用参数
在我的接收器中,我正在运行一个 Pre SQL 脚本来删除特定日期后的数据,但我在使用脚本中的参数时遇到问题。 这是数据流中的日期参数变量,下面是我的
我有一个非常小的python数据流包,包的结构如下所示 。 ├── __pycache__ ├── pubsubtobigq.py ├── 需求.txt └── 维尼夫 requirements.txt的内容是 原型...
如果我们更新现有管道,Google Dataflow Apache Beam 版本升级将失败
我有一个在 Apache beam Java 2.50.0 上运行的 Google 数据流流管道。我希望通过更新管道选项升级到 2.56.0(当前最新版本)。然而,更新给出了错误......
我正在通过 Flex 模板创建 GCP 数据流作业,使用 Cloud Build 生成模板等。 这会导致每次都会创建全新的存储桶。例如我有一个 dataflow-staging-us-
GCP datflow 作业连接到运行 HTTP 服务器的 GCP 虚拟机实例
我有一个 GCP 数据流应用程序,我计划向在我的一台虚拟机主机上运行的 FAST API 服务器调用 REST 命令 我计划让我的虚拟机不公开外部 IP 地址(因为我不这样做...
使用 Cloud Build 从 Dockerfile 安装 ArtifactRegistry Python 包
我的 Artifact 注册表存储库中有一个 python 包。 我的 Dataflow Flex 模板使用以下命令打包在 Docker 映像中: gcloud 构建提交 --tag $CONTAINER_IMA...
通过 Azure 数据工厂中的数据流将嵌套 JSON 对象映射到 SQL 表
我有一个 JSON 对象,其中包含一个没有键的嵌套数组。我想从这个嵌套数组中提取所有元素,将每个元素分配给具有适当名称的单独列,然后接收数据...