I'm running into some problems when running this:
import com.google.api.services.bigquery.model.TableRow
import org.apache.beam.runners.dataflow.DataflowRunner
import org.apache.beam.sdk.Pipeline
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO
import org.apache.beam.sdk.io.kafka.KafkaIO
import org.apache.beam.sdk.io.kafka.KafkaRecord
import org.apache.beam.sdk.options.Description
import org.apache.beam.sdk.options.PipelineOptions
import org.apache.beam.sdk.options.PipelineOptionsFactory
import org.apache.beam.sdk.options.Validation
import org.apache.beam.sdk.transforms.MapElements
import org.apache.beam.sdk.transforms.SerializableFunction
import org.apache.beam.sdk.values.TypeDescriptor
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.joda.time.Duration

interface KafkaToBigQueryOptions : PipelineOptions {
    @get:Description("Kafka topic to read from, e.g. mytopic")
    @get:Validation.Required
    var inputTopic: String

    @get:Description("BigQuery table to write to, e.g. mytable")
    @get:Validation.Required
    var outputTable: String

    @get:Description("BigQuery dataset, e.g. mydataset")
    @get:Validation.Required
    var outputDataset: String

    @get:Description("GCP project that owns the table, e.g. my-project")
    @get:Validation.Required
    var outputProject: String
}

fun main(args: Array<String>) {
    val options = PipelineOptionsFactory.fromArgs(*args)
        .withValidation()
        .`as`(KafkaToBigQueryOptions::class.java)
    options.runner = DataflowRunner::class.java

    val pipeline = Pipeline.create(options)

    val kafkaReadOptions = KafkaIO.read<ByteArray, ByteArray>()
        .withBootstrapServers("mykafka-server.local:9092")
        .withTopic(options.inputTopic)
        .withKeyDeserializer(ByteArrayDeserializer::class.java)
        .withValueDeserializer(ByteArrayDeserializer::class.java)
        .withDynamicRead(Duration.standardSeconds(5))

    val bigQueryTable = BigQueryHelpers.parseTableSpec(
        "${options.outputProject}:${options.outputDataset}.${options.outputTable}"
    )

    pipeline.apply("Read from Kafka", kafkaReadOptions)
        .apply("Transform to TableRow", MapElements.into(TypeDescriptor.of(TableRow::class.java))
            .via(SerializableFunction { record: KafkaRecord<ByteArray, ByteArray> ->
                // Decode the record's value bytes rather than calling toString()
                // on the KafkaRecord object itself.
                TableRow().set("message", record.kv.value?.let { String(it) })
            }))
        .apply("Write to BigQuery", BigQueryIO.writeTableRows()
            .to(bigQueryTable)
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND))

    pipeline.run().waitUntilFinish()
}
When I run it from IntelliJ, I get the error below.
I get the same error even if I assign the input topic in code with options.inputTopic = "mytopic":
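For reference, the programmatic assignment I tried looks roughly like this (a sketch; the parsing code is the same as in the full listing above):

```kotlin
// Parse the CLI args, then assign the option programmatically.
val options = PipelineOptionsFactory.fromArgs(*args)
    .withValidation()
    .`as`(KafkaToBigQueryOptions::class.java)

// Assigning it here still produces the same
// "Missing required value for [--inputTopic, ...]" error.
options.inputTopic = "mytopic"
```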
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Exception in thread "main" java.lang.IllegalArgumentException: Missing required value for [--inputTopic, "mytopic"].
	at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument(Preconditions.java:440)
	at org.apache.beam.sdk.options.PipelineOptionsValidator.validate(PipelineOptionsValidator.java:93)
	at org.apache.beam.sdk.options.PipelineOptionsValidator.validateCli(PipelineOptionsValidator.java:65)
	at org.apache.beam.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:346)
	at MainKt.main(Main.kt:39)
I rebuilt the .jar and ran it with this command:
java -jar jarfile.jar --experiments=beam_fn_api --runner=DataflowRunner --project=myproject --region=us-central1 --inputTopic=mytopic --outputTable=mytable --outputDataset=mydataset --outputProject=myotherproject
Error:
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Exception in thread "main" java.lang.IllegalArgumentException: Unknown 'runner' specified 'DataflowRunner', supported pipeline runners []