spark-streaming 相关问题

Spark Streaming是核心Apache Spark API的扩展,可实现实时数据流的高吞吐量,容错流处理。从版本1.3.0开始,即使遇到故障,它也支持一次性处理语义。

Twitter spark streaming:登录尝试次数过多

我正在运行这个由apache提供的示例来传输推文。但是,我无法获取任何数据,因为似乎流API不断尝试连接到Twitter,从而导致此跟踪:...

回答 1 投票 0

_spark_metadata为什么所有镶木地板分区文件都在0内,但集群有2个工人?

我有一个小火花簇,一个主人和两个工人。我有一个Kafka流媒体应用程序,它从Kafka传输数据并以镶木地板格式和附加模式写入目录。到目前为止我...

回答 1 投票 0

计算我的RDD在大型Dstream中的记录

我正在尝试使用文件DStream读取的大型RDD。代码如下所示:val creatingFunc = {()=> val conf = new SparkConf()。setMaster(“local [10]”)...

回答 2 投票 2

Kafkaconsumer对多线程访问不安全

我使用下面的代码来读取Kafka主题,并处理数据。 JavaDStream transformedMessages = messages.flatMap(record - > processData(record))。transnsform(new ...

回答 4 投票 11

如何在驱动程序中提取累加器值?

以下是我的代码排序方式,//累加器初始化val count = new LongAccumulator sparksession.sparkContext.register(count,“count accumulator”)// Streaming Transformation val DF = ...

回答 1 投票 0

默认情况下,spark的Cache内存限制是多少?

spark中缓存的最大限制是多少。它可以同时保存多少数据?

回答 3 投票 1

Spark 2.3.1结构化流输入速率

我想知道是否有办法在Spark Structured流中指定小批量的大小。那不仅仅是说明小批量间隔(触发器),我想说明有多少行......

回答 1 投票 0

如何使用直接流在Kafka Spark Streaming中指定使用者组

如何使用直接流API为kafka spark流指定使用者组ID。 HashMap中 kafkaParams =新的HashMap (); kafkaParams.put(“metadata.broker ....

回答 2 投票 3

找到csv:readStream的多个源

我试图运行下面的代码将文件作为数据帧读取到Kafka主题(用于Spark Streaming),通过Eclipse IDE开发,使用Scala,通过运行瘦jar来适当地定义模式...

回答 1 投票 0

如何热切地执行火花变换?

由于火花的变换被懒惰地评估,我们有什么方法可以急切地执行变换?是否需要进行任何配置更改?例如,我有......

回答 1 投票 1

在Spark Structured Streaming中处理二进制数据

我正在使用Kafka和Spark Structured Streaming。我收到以下格式的kafka消息。 {“deviceId”:“001”,“sNo”:1,“data”:“aaaaa”} {“deviceId”:“002”,“sNo”:1,“data”:“bbbbb”} {“deviceId” :” ...

回答 1 投票 1

如何在spark-submit命令中指定要使用的java版本?

我想在远程服务器上的纱线群集上运行火花流应用程序。默认的java版本是1.7,但我想使用1.8作为我的应用程序,它也在服务器中,但不是...

回答 4 投票 8

如何在Hive DataBase中插入Kafka Spark流式Json数据

我的Streaming JsonData就在这里:{“visibility”:“public”,“response”:“yes”,“guests”:0,“member”:{“member_id”:145170662,“photo”:“http:\ / \ /photos2.meetupstatic ....

回答 2 投票 -1

spark 2.1.1:解析的JSON值与类构造函数不匹配

我对spark 2.1.1和json4s.jackson有一个奇怪的问题。我从spark 1.5.1升级了我的流媒体项目。现在当我在IDE中执行代码时,一切正常。但是在组装和代码之后......

回答 2 投票 7

为什么即使没有新数据,Spark Streaming也会执行foreachRDD?

我有以下Spark流示例:val conf = new SparkConf()。setAppName(“Name”)。setMaster(“local”)val sc = new SparkContext(conf)val ssc = new StreamingContext(sc,Seconds(2)) val ...

回答 1 投票 1

在kafka ofset mgt期间面临问题的“无法访问包kafka010中的对象分配”

环境:Kafka 10,Spark 2.1我试图存储Kafka偏移外部存储。通过Apache Spark网站和一些在线研究后,我能够编写以下代码。现在......

回答 1 投票 0

什么时候广播变量会发生变化?

我被告知广播变量应该是不可变的。然而,我看到了一个代码片段,其中广播变量用作标志。公共类TestBroadcast {private static ...

回答 1 投票 3

Java Spark:com.mongodb.spark.config.writeconfig问题

我试图通过java spark连接器与MongoDB连接,当我提交jar并在spark shell中运行jar时,我收到错误“com.mongodb.spark.config.writeconfig”。这里错误......

回答 1 投票 1

当使用spark-streaming时,如何通过自己保存多个分区的Kafka偏移量

我使用spark-streaming来读取kafka数据,并处理我在下面使用的每一行来创建一个流:lines = KafkaUtils.createDirectStream(jssc,LocationStrategies ....

回答 1 投票 3

如何将每个DStream保存/插入永久表

我一直面临着关于将输出Dstream插入永久SQL表的“Spark Streaming”的问题。我想插入每个输出DStream(来自单个批处理,引发...

回答 2 投票 4

© www.soinside.com 2019 - 2024. All rights reserved.