spark-streaming 相关问题

Spark Streaming是核心Apache Spark API的扩展,可实现实时数据流的高吞吐量,容错流处理。从版本1.3.0开始,即使遇到故障,它也支持一次性处理语义。

Spark Kafka Producer抛出太多打开的文件 Exception

我正在尝试运行一个用Java编写的Spark Kafka Job,以产生大约10K记录,每批到一个Kafka Topic。这是一个Spark批处理作业,它读取100个(共100万条记录)hdfs部分文件... ...

回答 1 投票 0

什么时候Kafka连接器比Spark流媒体解决方案更受欢迎?

通过Spark流,我可以读取Kafka消息,并将数据写入不同类型的表,例如HBase、Hive和Kudu。但这也可以通过在这些表上使用Kafka连接器来实现。我的...

回答 1 投票 0


在构建路径中找到的scala库的版本与scala IDE提供的版本不兼容。

我正在写一个spark流媒体应用程序,我使用spark汇编jar.但我得到以下错误。在构建路径中找到的scala库的版本不兼容。

回答 1 投票 0

等价于withWatermark火花流的SQL查询。

我有一个方法,它需要spark sql查询作为参数,运行在流数据集上,我必须处理窗口函数和withWatermark。 窗口函数似乎是可能的,但我无法找到...

回答 0 投票 0

在SparkStreaming中暂停和恢复KafkaConsumer的工作。

:) 我已经结束了自己在一个(奇怪的)情况下,简单地说,我不想从Kafka消费任何新的记录,所以暂停sparkStreaming消费(InputDStream[ConsumerRecord])的所有......

回答 1 投票 0

我们如何在Spark结构化流中管理偏移量?

背景:我编写了一个简单的Spark结构化蒸汽应用程序,用于将数据从Kafka移至S3。发现为了支持一次保证,spark创建了_spark_metadata文件夹,该文件夹以...

回答 1 投票 0

将Spark结构化的流数据帧传递给函数

我从源卡夫卡读取了火花结构化的流数据帧。我想将此数据帧传递给函数,并将该函数的结果写入某个目标。案例类JsonSchema(...

回答 1 投票 0

[sbt项目使用kafka进行流式传输

我陷入了下面的问题,我能够从Kafka主题中提取数据以引发数据流,但是当我将RDD流连接到Dataset [String]并转储结果数据(经过一些处理之后……)>

回答 1 投票 1

使用Apache Spark流的实时日志处理

我想创建一个可以实时读取日志并使用apache spark处理它的系统。我是否应该使用类似kafka或水槽的东西将日志传递到火花流,还是应该...

回答 3 投票 9

将流XML转换为Spark中的JSON

我是Spark的新手,正在研究一个简单的应用程序,将从Kafka接收的XML流转换为JSON格式,使用:Spark 2.4.5 Scala 2.11.12在我的用例中,kafka流是xml格式)。 ...

回答 1 投票 0

PySpark:将Spark数据帧写入Kafka主题

正在尝试将数据帧加载到Kafka主题。选择键和值时出错。任何建议都会有所帮助。下面是我的代码,data = spark.sql('select * from job')kafka = data ....

回答 1 投票 0

找不到参考'Kafka'导入KafkaUtils

[正在将Kafka主题流式传输到Spark中,但是在从pyspark导入KafkaUtils导入sys时遇到问题,从pyspark导入SparkContext从pyspark.streaming导入...导入SparkConf ...

回答 1 投票 0

我该如何进行转换?

env:spark2.4.5 source.json:{“ a_key”:“ 1”,“ a_pro”:“ 2”,“ a_con”:“ 3”,“ b_key”:“ 4”,“ b_pro”:“ 5 “,” b_con“:” 6“,” c_key“:” 7“,” c_pro“:” 8“,” c_con“:” 9“,.....

回答 1 投票 0

使用Spark SQL流时缺少Avro自定义标题

在将Avro GenericRecord发送到Kafka之前,像这样插入标头。 ProducerRecord record = new ProducerRecord <>(topicName,key,message); record.headers()。add(“ ...

回答 1 投票 1

无法使用spark将结果写入kafka主题

我的最终目标是在处理的批次中写出汇总数据并将其读取到新的Kafka主题。我遵循了官方文档和其他几篇文章,但是没有运气。我会...

回答 1 投票 0

如何重命名现有列在数组中添加新列?

env:saprk-2.4.5 source.json {“ group”:“ 1”,“ name”:“ badboi”,“ rank”:“ 3”,“ fellows”:[{“ name”:“ David” ,“年龄”:“ 25”,“爱好”:“代码” ...

回答 1 投票 0

如何解决java.lang.NoSuchMethodError:org.apache.kafka.clients.producer.KafkaProducer.flush()V错误在pyspark中

我从Kafka主题中读取了一些消息,并且对于每个rdd,都会执行proccess_rdds函数。 def spark_streaming_online():conf = SparkConf()。setMaster(“ spark:// antonis:7077”).setAppName(“ ...

回答 1 投票 0

如何将Spark Stream数据保存到文件中

我是Spark的新手,目前正在解决与在上下文时间过后将Spark Stream的结果保存到文件有关的问题。所以问题是:我希望查询运行60秒并保存所有输入...

回答 1 投票 0

将writeStream放电到kafka-awaitTermination()与awaitAnyTermination()之间的差异

根据官方文档,我使用下面的代码段来编写kafka主题,但未将其写入kafka。 finalStream = final \ .writeStream \ .format(“ kafka”)\ .option(“ ...

回答 1 投票 0

最新问题
© www.soinside.com 2019 - 2024. All rights reserved.