Spark Streaming是核心Apache Spark API的扩展,可实现实时数据流的高吞吐量,容错流处理。从版本1.3.0开始,即使遇到故障,它也支持一次性处理语义。
Spark Streaming - 从Kafka读取json并将json写入其他Kafka主题
我正在尝试准备Spark流媒体应用程序(Spark 2.1,Kafka 0.10)我需要从Kafka主题“输入”读取数据,找到正确的数据并将结果写入主题“输出”我可以从...读取数据
我目前正在做的事情如下:val topic =“mytopic”val zkhosts =“localhost”val zkports =“2181”在我的代码中设置然后将其发送到kafkastream函数有效,但我想......
我有Spark Scala的问题,我想在Spark流中加倍元组元素,我从kafka到dstream获取数据,我的RDD数据是这样的,(2,[2,3,4,6,5])(4 ,[2,3,4,6,5])(7,[2,3,4,6,5])......
我有一个要求,我需要从kafka主题中读取消息,对数据集进行查找,然后根据查找数据的结果发送消息。以下示例...
我有一个在EMR上运行的火花流工作,从Kafka读取消息并输出到S3。我使用emr-5.17.0,即hadoop 2.8.4,spark 2.3.1问题是shuffle文件正在积累:/ ...
Spark SCALA - 连接两个数据帧,其中一个数据帧中的连接值位于第二个数据帧中的两个字段之间
我有两个数据帧(删除与问题无关的字段):df1:org.apache.spark.sql.DataFrame = [rawValue:bigint] df2:org.apache.spark.sql.DataFrame = [startLong:bigint ,...
我试图从kafka获得一些我希望在另一个系统中向下游传播的更改事件。但是,变更指令很重要。因此我想知道这样做的恰当方法是什么......
我正在使用spark从主题kafka获取数据。我必须使用KafkaAvroDeserialaizer对avro数据进行deserialaizer。我配置kafka使用者:kafkaParams.put(“bootstrap.servers”,“10.0.4.215:9092”); ...
应用程序监听2 kafka主题userevent paymentevent Payload for userevent {“userId”:“Id_223”,“firstname”:“fname_223”,“lastname”:“lname_223”,“phonenumber”:“P98202384_223”,“usertimestamp”:“.. 。
我正在尝试使用带有spark和kafka的结构化流媒体窗口。我在非基于时间的数据上使用窗口,因此我收到此错误:'流式DataFrames /不支持非基于时间的窗口...
如何在spark结构化流媒体应用程序中优化执行程序实例的数量?
运行时YARN集群模式应用程序Spark结构化流从Kafka主题读取数据关于Kafka主题1主题,包含4个分区 - 现在。 (分区数可以更改)添加2000 ...
Spark 2.4.0是否支持具有连续处理模式的Python UDF?在我的简单代码中,我正在使用kafka主题,每行进行一些简单的处理(基本上为...添加一个虚拟字段)