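When I wanted to load unbounded input from Kafka into BigQuery, I came across the .withMethod()
option. When using Method.FILE_LOADS, I also have to specify a triggering frequency and a non-zero numFileShards.
My question is: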
/**
 * Control how many file shards are written when using BigQuery load jobs. Applicable only when
 * also setting {@link #withTriggeringFrequency}. The default value is 1000.
 */
@Experimental
public Write<T> withNumFileShards(int numFileShards) {
  checkArgument(numFileShards > 0, "numFileShards must be > 0, but was: %s", numFileShards);
  return toBuilder().setNumFileShards(numFileShards).build();
}
The exception I get when I do not set numFileShards:
Exception in thread "main" java.lang.IllegalArgumentException
at com.google.common.base.Preconditions.checkArgument(Preconditions.java:108)
at org.apache.beam.sdk.io.gcp.bigquery.BatchLoads.expandTriggered(BatchLoads.java:212)
at org.apache.beam.sdk.io.gcp.bigquery.BatchLoads.expand(BatchLoads.java:557)
at org.apache.beam.sdk.io.gcp.bigquery.BatchLoads.expand(BatchLoads.java:79)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:325)
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expandTyped(BigQueryIO.java:1656)
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expand(BigQueryIO.java:1602)
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Write.expand(BigQueryIO.java:1068)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:488)
at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:338)
at come.geotab.bigdata.streaming.mapenrichedgps.MainApplication.main(MainApplication.java:119)
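Writing data to BigQuery can work in different ways. FILE_LOADS
means that Beam writes your windowed PCollection
to Avro files and then triggers a BigQuery load job to import the contents of those files.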
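As a rough sketch of what such a write could look like in the Java SDK: the class and method names, the table spec, the five-minute frequency and the shard count of 100 are placeholders I made up for illustration, so pick values that suit your pipeline. It needs the Beam GCP IO module and Joda-Time on the classpath.

```java
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.joda.time.Duration;

class FileLoadsWriteSketch {
  // Hypothetical helper: builds a FILE_LOADS write intended for unbounded input.
  static BigQueryIO.Write<TableRow> fileLoadsWrite(TableSchema schema) {
    return BigQueryIO.writeTableRows()
        .to("my-project:my_dataset.my_table") // placeholder table spec
        .withSchema(schema)
        .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
        // How often a load job is started for unbounded input (assumed value).
        .withTriggeringFrequency(Duration.standardMinutes(5))
        // Must be > 0 when a triggering frequency is set (assumed value).
        .withNumFileShards(100)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND);
  }
}
```

You would then apply it to your PCollection of TableRow, for example rows.apply("WriteToBQ", FileLoadsWriteSketch.fileLoadsWrite(schema)).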
The number of file shards controls how many files your PCollection
is written to, and therefore the parallelism of the BQ import job.
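To make the idea of a fixed shard count concrete, here is a toy model in plain Java. It is not Beam's implementation: ShardSketch, assignToShards and the round-robin assignment are my own simplifications. Each non-empty bucket stands for one file produced for a single trigger firing.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of file sharding; not Beam's actual implementation. */
final class ShardSketch {

    /**
     * Distributes the records of one trigger firing over numFileShards buckets,
     * round-robin. Each non-empty bucket stands for one file handed to a load job.
     */
    static List<List<String>> assignToShards(List<String> records, int numFileShards) {
        if (numFileShards <= 0) {
            throw new IllegalArgumentException(
                "numFileShards must be > 0, but was: " + numFileShards);
        }
        List<List<String>> shards = new ArrayList<>();
        for (int i = 0; i < numFileShards; i++) {
            shards.add(new ArrayList<>());
        }
        for (int i = 0; i < records.size(); i++) {
            shards.get(i % numFileShards).add(records.get(i));
        }
        // In this toy model, a bucket that received nothing produces no file.
        shards.removeIf(List::isEmpty);
        return shards;
    }

    public static void main(String[] args) {
        List<String> pane = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            pane.add("row-" + i);
        }
        for (int n : new int[] {1, 3, 20}) {
            System.out.println(n + " shard(s) requested -> "
                + assignToShards(pane, n).size() + " file(s)");
        }
    }
}
```

In this model the shard count is an upper bound on the number of files per firing: more shards means more, smaller files that can be written in parallel, and fewer shards means fewer, larger files.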
Hope that helps!