我们正在 Google BigQuery 上新建数据仓库(Data Warehouse),而我们所有的数据源都位于本地(on-premises)数据库中。因此,我们使用 Dataflow 进行 ETL,借助 Maven 和 Apache Beam SDK 在 Google Cloud Dataflow 服务上运行 30 个管道。
package com.google.cloud.teleport.templates;
import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.teleport.io.DynamicJdbcIO;
import com.google.cloud.teleport.templates.common.JdbcConverters;
import com.google.cloud.teleport.util.KMSEncryptedNestedValueProvider;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
/**
 * Copies tables from a JDBC source database into BigQuery staging tables.
 *
 * <p>Each source table becomes one independent read-then-write branch of a single Beam pipeline,
 * so all branches are submitted together by the single {@code pipeline.run()} call.
 *
 * <p>NOTE(review): this file imports {@code DynamicJdbcIO} but uses {@code DynamicJdbcIOMiles};
 * confirm the latter is resolvable (e.g. declared in this package).
 */
public class MToBQ {

  /**
   * Wraps an option value together with the KMS key option in a {@link
   * KMSEncryptedNestedValueProvider}.
   *
   * <p>Presumably that provider decrypts the value when a KMS key is supplied and passes it
   * through otherwise — confirm against {@code KMSEncryptedNestedValueProvider}.
   *
   * @param unencryptedValue the option value (plain or KMS-encrypted)
   * @param kmsKey the KMS key option
   * @return the wrapping provider
   */
  private static ValueProvider<String> maybeDecrypt(
      ValueProvider<String> unencryptedValue, ValueProvider<String> kmsKey) {
    return new KMSEncryptedNestedValueProvider(unencryptedValue, kmsKey);
  }

  /**
   * Parses and validates the pipeline options from {@code args}, then builds and launches the
   * pipeline.
   *
   * @param args command-line pipeline options
   */
  public static void main(String[] args) {
    JdbcConverters.JdbcToBigQueryOptions options =
        PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(JdbcConverters.JdbcToBigQueryOptions.class);
    run(options);
  }

  /**
   * Builds one read/write branch per source table and runs the pipeline.
   *
   * @param options validated pipeline options
   * @return the result handle of the submitted pipeline
   */
  private static PipelineResult run(JdbcConverters.JdbcToBigQueryOptions options) {
    Pipeline pipeline = Pipeline.create(options);

    // Build the connection settings once and share them across all branches instead of
    // repeating (and risking drift in) the same configuration for every table.
    DynamicJdbcIOMiles.DynamicDataSourceConfiguration source = dataSource(options);

    // Step names are kept exactly as before so the job graph is unchanged; every step name in
    // a Beam pipeline must be unique.
    copyTable(
        pipeline, source, options,
        "source", "select * from abcc",
        "Target", "dev-27:staging.STG_ABC");
    copyTable(
        pipeline, source, options,
        "SOURCE", "SELECT * FROM XYZ",
        "TARGET", "dev-27:staging.STG_XYZ");

    return pipeline.run();
  }

  /**
   * Creates the JDBC data-source configuration from the pipeline options, wrapping the
   * connection URL, username and password with {@link #maybeDecrypt}.
   *
   * @param options validated pipeline options
   * @return the shared data-source configuration
   */
  private static DynamicJdbcIOMiles.DynamicDataSourceConfiguration dataSource(
      JdbcConverters.JdbcToBigQueryOptions options) {
    ValueProvider<String> kmsKey = options.getKMSEncryptionKey();
    return DynamicJdbcIOMiles.DynamicDataSourceConfiguration.create(
            options.getDriverClassName(), maybeDecrypt(options.getConnectionURL(), kmsKey))
        .withUsername(maybeDecrypt(options.getUsername(), kmsKey))
        .withPassword(maybeDecrypt(options.getPassword(), kmsKey))
        .withDriverJars(options.getDriverJars())
        .withConnectionProperties(options.getConnectionProperties());
  }

  /**
   * Adds one branch to {@code pipeline}: runs {@code query} against the JDBC source, converts
   * each row to a {@link TableRow}, and appends the rows to an existing BigQuery {@code table}.
   *
   * <p>NOTE(review): each branch issues a single JDBC query, which is likely read by a single
   * worker thread; for multi-million-row tables that one reader is a probable cause of the
   * {@code DEADLINE_EXCEEDED} timeouts. If the driver buffers the whole result set in memory
   * (MySQL Connector/J does by default), enable cursor-based fetching through the connection
   * properties (e.g. {@code useCursorFetch=true;defaultFetchSize=1000}), or split the query into
   * key ranges read in parallel — confirm which options {@code DynamicJdbcIOMiles} supports.
   *
   * @param pipeline the pipeline to extend
   * @param source shared JDBC data-source configuration
   * @param options pipeline options (supplies the BigQuery load temp directory)
   * @param readStepName unique step name for the JDBC read
   * @param query SQL query that selects the source rows
   * @param writeStepName unique step name for the BigQuery write
   * @param table target table spec ({@code project:dataset.table}); must already exist because
   *     the create disposition is {@code CREATE_NEVER}
   */
  private static void copyTable(
      Pipeline pipeline,
      DynamicJdbcIOMiles.DynamicDataSourceConfiguration source,
      JdbcConverters.JdbcToBigQueryOptions options,
      String readStepName,
      String query,
      String writeStepName,
      String table) {
    pipeline
        .apply(
            readStepName,
            DynamicJdbcIOMiles.<TableRow>read()
                .withDataSourceConfiguration(source)
                .withQuery(query)
                .withCoder(TableRowJsonCoder.of())
                .withRowMapper(JdbcConverters.getResultSetToTableRow()))
        .apply(
            writeStepName,
            BigQueryIO.writeTableRows()
                .withoutValidation()
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                .withCustomGcsTempLocation(options.getBigQueryLoadingTemporaryDirectory())
                .to(table));
  }
}
如果表中的数据量较少,管道可以成功运行;但当数据量达到数百万行时,就会抛出如下错误:
org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:107)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: DEADLINE_EXCEEDED: (g)RPC timed out
我使用以下命令编译该 Java 类,并带参数运行其 main 方法:
# Compile the project and run MToBQ.main with the pipeline options below.
# NOTE: because --templateLocation is set, DataflowRunner writes a (classic) template to that
# GCS path instead of launching a job; the job itself starts when the template is run later.
# Fixed: "--project= dev-27" had a stray space (passing an empty --project plus a bare
# "dev-27" argument), and a trailing markdown "**" was being passed to the shell.
mvn compile exec:java \
  -Dexec.mainClass=com.google.cloud.teleport.templates.MToBQ \
  -Dexec.cleanupDaemonThreads=false \
  -Dexec.args=" \
  --project=dev-27 \
  --region=australia-southeast1 \
  --workerMachineType=n1-highmem-8 \
  --workerDiskType=compute.googleapis.com/projects/dev-27/zones/australia-southeast1-c/diskTypes/pd-ssd \
  --diskSizeGb=50 \
  --stagingLocation=gs://dev-dataset/Data/stagingCustomDataFlow/MToBQ \
  --tempLocation=gs://dev-dataset/Data/temp \
  --templateLocation=gs://dev-dataset/Data/templatesCustomDataFlow/MToBQ/MToBQ.json \
  --experiments=upload_graph \
  --runner=DataflowRunner"
请问我的做法是否正确?正确的参数应该是什么?Dataflow 能否并行执行多个管道?
我认为抛出该错误的原因可能是:管道只用单个线程(单个数据库连接)从源读取数据,当数据集很大时,这个读取过程会长时间阻塞直至超时。我不确定您是如何配置源读取的,但建议将其并行化,例如按主键范围把一个大查询拆分成多个子查询。这样数据集就能在多个线程中并行读取,读取能力也能随之扩展。