Spark Streaming是核心Apache Spark API的扩展,可实现实时数据流的高吞吐量,容错流处理。从版本1.3.0开始,即使遇到故障,它也支持一次性处理语义。
Spark Kafka Producer抛出太多打开的文件 Exception
我正在尝试运行一个用Java编写的Spark Kafka Job,以产生大约10K记录,每批到一个Kafka Topic。这是一个Spark批处理作业,它读取100个(共100万条记录)hdfs部分文件... ...
什么时候Kafka连接器比Spark流媒体解决方案更受欢迎?
通过Spark流,我可以读取Kafka消息,并将数据写入不同类型的表,例如HBase、Hive和Kudu。但这也可以通过在这些表上使用Kafka连接器来实现。我的...
我使用spark.readStream().text(
在构建路径中找到的scala库的版本与scala IDE提供的版本不兼容。
我正在写一个spark流媒体应用程序,我使用spark汇编jar.但我得到以下错误。在构建路径中找到的scala库的版本不兼容。
我有一个方法,它需要spark sql查询作为参数,运行在流数据集上,我必须处理窗口函数和withWatermark。 窗口函数似乎是可能的,但我无法找到...
在SparkStreaming中暂停和恢复KafkaConsumer的工作。
:) 我已经结束了自己在一个(奇怪的)情况下,简单地说,我不想从Kafka消费任何新的记录,所以暂停sparkStreaming消费(InputDStream[ConsumerRecord])的所有......
背景:我编写了一个简单的Spark结构化蒸汽应用程序,用于将数据从Kafka移至S3。发现为了支持一次保证,spark创建了_spark_metadata文件夹,该文件夹以...
我从源卡夫卡读取了火花结构化的流数据帧。我想将此数据帧传递给函数,并将该函数的结果写入某个目标。案例类JsonSchema(...
我陷入了下面的问题,我能够从Kafka主题中提取数据以引发数据流,但是当我将RDD流连接到Dataset [String]并转储结果数据(经过一些处理之后……)>
我想创建一个可以实时读取日志并使用apache spark处理它的系统。我是否应该使用类似kafka或水槽的东西将日志传递到火花流,还是应该...
我是Spark的新手,正在研究一个简单的应用程序,将从Kafka接收的XML流转换为JSON格式,使用:Spark 2.4.5 Scala 2.11.12在我的用例中,kafka流是xml格式)。 ...
正在尝试将数据帧加载到Kafka主题。选择键和值时出错。任何建议都会有所帮助。下面是我的代码,data = spark.sql('select * from job')kafka = data ....
[正在将Kafka主题流式传输到Spark中,但是在从pyspark导入KafkaUtils导入sys时遇到问题,从pyspark导入SparkContext从pyspark.streaming导入...导入SparkConf ...
env:spark2.4.5 source.json:{“ a_key”:“ 1”,“ a_pro”:“ 2”,“ a_con”:“ 3”,“ b_key”:“ 4”,“ b_pro”:“ 5 “,” b_con“:” 6“,” c_key“:” 7“,” c_pro“:” 8“,” c_con“:” 9“,.....
在将Avro GenericRecord发送到Kafka之前,像这样插入标头。 ProducerRecord record = new ProducerRecord <>(topicName,key,message); record.headers()。add(“ ...
我的最终目标是在处理的批次中写出汇总数据并将其读取到新的Kafka主题。我遵循了官方文档和其他几篇文章,但是没有运气。我会...
env:saprk-2.4.5 source.json {“ group”:“ 1”,“ name”:“ badboi”,“ rank”:“ 3”,“ fellows”:[{“ name”:“ David” ,“年龄”:“ 25”,“爱好”:“代码” ...
如何解决java.lang.NoSuchMethodError:org.apache.kafka.clients.producer.KafkaProducer.flush()V错误在pyspark中
我从Kafka主题中读取了一些消息,并且对于每个rdd,都会执行proccess_rdds函数。 def spark_streaming_online():conf = SparkConf()。setMaster(“ spark:// antonis:7077”).setAppName(“ ...
我是Spark的新手,目前正在解决与在上下文时间过后将Spark Stream的结果保存到文件有关的问题。所以问题是:我希望查询运行60秒并保存所有输入...
将writeStream放电到kafka-awaitTermination()与awaitAnyTermination()之间的差异
根据官方文档,我使用下面的代码段来编写kafka主题,但未将其写入kafka。 finalStream = final \ .writeStream \ .format(“ kafka”)\ .option(“ ...