google-cloud-dataflow 相关问题

Google Cloud Dataflow是一种完全托管的云服务,可用于大规模创建和评估数据处理管道。数据流管道基于Apache Beam编程模型,可以在批处理和流模式下运行。 Cloud Dataflow是Google云端平台的一部分。

在python中bigquery sink后是否可以进行其他处理?

我正在编写一个具有以下过程的管道: 1.读取带有属性“uid”的pubsub消息,这是此消息的唯一ID 2.将消息存储在Bigquery中,数据格式为 uid |

回答 1 投票 0

Google数据流API调用失败-

我正在尝试使用数据流运算符从 composer airflow 调用数据流作业,但在调用它时出现以下错误: googleapiclient.errors.HttpError: 我正在尝试使用数据流运算符从 composer airflow 调用数据流作业,但在调用它时出现以下错误: googleapiclient.errors.HttpError: <HttpError 400 when requesting https://test returned "Invalid value at 'launch_parameters.parameters' (type.googleapis.com/google.dataflow.v1beta3.LaunchTemplateParameters.ParametersEntry), "{'test1': 'SELECT distinct data\nFROM project.dataset.table1\nWHERE ace_date="2022-05-12"', 'test2': 'SELECT distinct data\nFROM project.dataset.table2\nWHERE ace_date="2022-05-12"', 'priority_data': 'SELECT distinct data\nFROM project.dataset.table3\nWHERE ace_date="2022-05-12"', 'test3': 'SELECT distinct data\nFROM project.dataset.table4\nWHERE ace_date="2022-05-12"', 'test4': 'SELECT distinct data\nFROM project.dataset.table5\nWHERE ace_date="2022-05-12"', 'test5': 'SELECT distinct data\nFROM project.dataset.tabl6 \nWHERE ace_date="2022-05-12"', 'pack_rules': 'SELECT distinct data\nFROM project.dataset.table7\nWHERE ace_date="2022-05-12"', 'test6': 'SELECT distinct row_key_data as data\nFROM peoject.dataset.table7\nWHERE date_of_run="2022-05-16"'}"" 下面是从 Airflow 调用它时的相同代码: def dataflow_trigger( task, ): """ Dynamic task for calling dataflow job """ return DataflowTemplatedJobStartOperator( task_id=task, project_id="{{task_instance.xcom_pull(key='dataflow_settings', task_ids='get_settings')['project']}}", job_name="{{task_instance.xcom_pull(key='dataflow_settings', task_ids='get_settings')['job_name']}}", template="{{task_instance.xcom_pull(key='dataflow_settings', task_ids='get_settings')['template_path']}}", parameters="{{task_instance.xcom_pull(key='parameters', task_ids='get_settings')}}", location='europe-west2', ) Airflow xcom pull 只返回字符串 这有助于解决问题,因为 xcom push 将其存储为字符串;在 DAG 构造函数中使用 render_template_as_native_obj=True 解决了这个问题。

回答 1 投票 0

Apache Beam IOElasticsearchIO.read() 方法 (Java),它需要一个 PBegin 输入和一种处理查询集合的方法

我在使用 ElasticsearchIO.read() 处理多个查询实例时遇到了问题。我的查询正在根据传入的一组值动态构建为 PCollection....

回答 0 投票 0

来自 Python 中 Bigquery 的动态侧输入

我正在编写一个光束管道,它读取具有名为“uid”的属性的 pubsub 消息,这是当前消息的唯一 id。然后我想使用这个'uid'来查询bigquery以获得额外的

回答 1 投票 0

Azure 数据工厂管道失败:spark.rpc.message.maxSize

"StatusCode":"DFExecutorUserError","Message":"Job failed due to reason: at Source 'source1': Job aborted due to stage failure: Serialized task 10:0 was 135562862 ...

回答 1 投票 0

从列的子集中获取新的 GenericRecord

我们正在处理 Avro 文件,我们使用 GenericRecord 类型(在 Beam/Dataflow 中)。我想基于选定的列子集创建一个新的 GenericRecord。没有

回答 0 投票 0

数据流 BigQuery 到 BigQuery

我正在尝试创建一个从 BigQuery 返回到 BigQuery 的数据流脚本。我们的主表很大,破坏了提取功能。我想创建一个简单的表(由于

回答 3 投票 0

Google Dataflow Key Distribution On Reshuffle After Autoscaling Event

当我的数据流作业从 45 个工作节点扩展到 100 个节点时,我遇到了一些带有键控状态的奇怪行为。 我的代码是键入输入数据,然后使用 Reshuffle 函数重新分配...

回答 1 投票 0

我们如何在 python 中读取 apache beam 中的字典列表?

我有一个包含字典列表的文本文件。 现在,我想使用 apache beam 阅读它,并从列表中返回单个词典。 我该怎么做。 我的文本文件是这样的。 [{"id&q...

回答 1 投票 0

从 BigQuery 中提取数据并加载到 SQL Server 中的最佳方法是什么?

我想创建一些通用管道,我可以在其中传递表名或自定义 SQL 作为输入,并将所需数据从 BigQuery 加载到 SQL Server。该管道应处理每日增量负载...

回答 1 投票 0

是否可以在 Dataflow 中的 BigQuery 上执行脚本?

我有两个表:一个包含要处理的数据的表和一个跟踪已处理数据的跟踪表。 因此,例如,下面的跟踪表表明我们已经

回答 3 投票 0

使用 Dataflow Apache Beam 下沉到 BigQuery 的正确格式

bcdate好像出错了,我改成正确的格式还是报错,我的代码如下: def transform_pandas(数据): 将熊猫导入为 pd 导入 json 我...

回答 2 投票 0

在 python 中使用 apache beam 从 pubsub 输出中提取列值

我正在尝试从 PubSub 订阅中提取数据,最后,一旦数据被提取,我想做一些转换。目前,它是字节格式。我尝试了多种方法来提取...

回答 1 投票 0

如何通过防火墙连接数据流作业?如何使用可预测的 IP 地址连接到 Kafka 或 RDBMS?

我正在尝试运行需要连接到 Kafka 集群的数据流作业。出于安全原因,Kafka 集群需要 IP 地址白名单,因此我需要确保 Dataflow 作业始终...

回答 1 投票 0

如何读取文件并保留 python beam 中的行顺序

我正在尝试从 GCS 存储桶中读取 python beam 中的文件并对其进行处理,在处理时我想添加一个新字段作为 line_number ,这是来自 fi 的原始行号...

回答 0 投票 0

Google 数据流转换需要 jks

在 GCP 数据流中,作为转换的一部分,我需要从 URL 下载内容。为此,我需要使用包含安全证书的 .jks 文件。我总是找不到证书,除了...

回答 0 投票 0

我想使用云数据流提交数据到bigquery

T,WeekCodeOption,TimeWidthSEQ,StationCode,RealBroadcastMinutes,StandardTargetFlag,TargetInfoCode,Rate,ExcludeDates "{'TFrom': '20230213', 'TTo': '20230213'}",11,1,1,1440,"['1', '1', ...

回答 0 投票 0

Google Cloud:获取“可运行的工作流没有指定的步骤。”在 Dataflow 中运行自定义模板时

我只是在探索 Dataflow 并决定创建一个从我的 S3 读取数据的管道,对其进行格式化(因为文件是 gz 文件),然后将其存储在 bigtable 中。 一切似乎都在工作......

回答 0 投票 0

在 Python 中使用 Apache Beam 读取多行 JSON

我无法正确从 Google Cloud Storage 读取 JSON 文件。 输入的格式在他的基本结构中看起来像这样: [ { "id": "CANT14", “实体……

回答 0 投票 0

将 Apache Beam Python 与 GCP Dataflow 结合使用时,是否具体化 GroupByKey 的结果是否重要?

将 Apache Beam Python 与 GCP Dataflow 结合使用时,具体化 GroupByKey 的结果是否存在缺点,例如,计算元素的数量。例如: def consume_group_by_key(元素)...

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.