我正在尝试使用 Spark 流处理来自 Kinesis 流的数据。 我使用 https://spark.apache.org/docs/latest/streaming-kinesis-integration.html 作为编写代码的参考
object TimeAndLanFixer {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[6]").setAppName("LangAndTimeZoneFixer")
val ssc = new StreamingContext(conf, Seconds(3))
val time = System.currentTimeMillis()
println(s"Current time in millis: $time")
val lang = System.getProperty("user.language")
println(s"Current language: $lang")
val initialPositionTimestamp = "2024-06-27 00:00:00"
val date: Date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(initialPositionTimestamp)
val kinesisStream = KinesisInputDStream.builder
.streamName(System.getenv("STREAM_NAME"))
.streamingContext(ssc)
.endpointUrl("https://kinesis.us-west-2.amazonaws.com")
.regionName("us-west-2")
.checkpointAppName("DebugApp").checkpointInterval(Seconds(10))
.storageLevel(StorageLevel.MEMORY_AND_DISK_2)
.initialPosition(new AtTimestamp(date))
.metricsEnabledDimensions(KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet)
.build()
kinesisStream.start()
kinesisStream.foreachRDD(rdd => {
rdd.foreach(println)
})
ssc.awaitTermination()
}
}
当我运行这个时,我在 Intellij 的控制台上看不到任何输出。我只看到我在程序中添加的两个 println 。我的输入运动流有 5 个分片,根据我在互联网上阅读的一些内容,如果在本地测试,我们需要提供多于分片数量 (local[6]) 的数量来在本地处理数据。计划还没有进展
pom.xml
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.5.1</version>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kinesis-asl_2.12</artifactId>
<version>3.5.1</version>
</dependency>
缺少链接是 ssc.start()
除了 kinesis.start() 之外,你还需要调用 ssc.start() ,这似乎很奇怪
面临很多年轻的 GEN GC 问题,但那是其他帖子的事了
问候 巴努
问候 巴努