我使用beam python库设计了一个beam / dataflow管道。管道大致如下:
通常,代码执行它应该执行的操作。但是,当从API收集大数据集(大约500.000个JSON文件)时,bigquery插入作业在启动后会在没有特定错误消息的情况下正确停止(在一秒内),而不是使用DataflowRunner(它正在我的DirectRunner上执行)电脑)。使用较小的数据集时,一切正常。
数据流日志如下:
2019-04-22 (00:41:29) Executing BigQuery import job "dataflow_job_14675275193414385105". You can check its status with the...
Executing BigQuery import job "dataflow_job_14675275193414385105". You can check its status with the bq tool: "bq show -j --project_id=X dataflow_job_14675275193414385105".
2019-04-22 (00:41:29) Workflow failed. Causes: S01:Create Dummy Element/Read+Call API+Transform JSON+Write to Bigquery /Wr...
Workflow failed. Causes: S01:Create Dummy Element/Read+Call API+Transform JSON+Write to Bigquery /WriteToBigQuery/NativeWrite failed., A work item was attempted 4 times without success. Each time the worker eventually lost contact with the service. The work item was attempted on:
beamapp-X-04212005-04211305-sf4k-harness-lqjg,
beamapp-X-04212005-04211305-sf4k-harness-lgg2,
beamapp-X-04212005-04211305-sf4k-harness-qn55,
beamapp-X-04212005-04211305-sf4k-harness-hcsn
使用建议的bq cli工具来获取有关BQ加载作业的更多信息不起作用。无法找到工作(我怀疑它是由于即时故障而创建的)。
我想我遇到某种配额/ bq限制甚至是内存不足问题(参见:https://beam.apache.org/documentation/io/built-in/google-bigquery/)
限制BigQueryIO目前有以下限制。
您无法使用>管道的其他步骤对BigQuery写入的完成进行排序。
如果您使用Beam SDK for Python,如果您编写一个非常大的数据集,则可能会出现导入大小配额>问题。作为一种变通方法,您可以对数据集进行分区(例如,使用Beam的分区变换)并写入>多个BigQuery表。 Beam SDK for Java没有此限制,因为它为您分配数据集。
我很欣赏任何关于如何缩小这个问题的根本原因的暗示。
我还想尝试一个Partition Fn但是没有找到任何python源代码示例如何将分区的pcollection写入BigQuery Tables。
可能有助于调试的一件事是查看Stackdriver日志。
如果您在Google console中提取数据流作业并单击图形面板右上角的LOGS
,则应打开底部的日志面板。 LOGS面板的右上角有一个指向Stackdriver的链接。这将为您提供有关您的worker / shuffle /等的大量日志信息。对于这个特殊的工作。
其中有很多内容,很难过滤掉相关内容,但希望你能找到比A work item was attempted 4 times without success
更有用的东西。例如,每个工作人员偶尔会记录它正在使用多少内存,这可以与每个工作者具有的内存量(基于机器类型)进行比较,以查看它们是否确实内存不足,或者您的错误是否正在发生别处。
祝好运!