Spark Streaming是核心Apache Spark API的扩展,可实现实时数据流的高吞吐量,容错流处理。从版本1.3.0开始,即使遇到故障,它也支持一次性处理语义。
Twitter spark streaming:登录尝试次数过多
我正在运行这个由apache提供的示例来传输推文。但是,我无法获取任何数据,因为似乎流API不断尝试连接到Twitter,从而导致此跟踪:...
_spark_metadata为什么所有镶木地板分区文件都在0内,但集群有2个工人?
我有一个小火花簇,一个主人和两个工人。我有一个Kafka流媒体应用程序,它从Kafka传输数据并以镶木地板格式和附加模式写入目录。到目前为止我...
我正在尝试使用文件DStream读取的大型RDD。代码如下所示:val creatingFunc = {()=> val conf = new SparkConf()。setMaster(“local [10]”)...
我使用下面的代码来读取Kafka主题,并处理数据。 JavaDStream transformedMessages = messages.flatMap(record - > processData(record))。transnsform(new ...
以下是我的代码排序方式,//累加器初始化val count = new LongAccumulator sparksession.sparkContext.register(count,“count accumulator”)// Streaming Transformation val DF = ...
spark中缓存的最大限制是多少。它可以同时保存多少数据?
我想知道是否有办法在Spark Structured流中指定小批量的大小。那不仅仅是说明小批量间隔(触发器),我想说明有多少行......
如何使用直接流在Kafka Spark Streaming中指定使用者组
如何使用直接流API为kafka spark流指定使用者组ID。 HashMap中 kafkaParams =新的HashMap (); kafkaParams.put(“metadata.broker ....
我试图运行下面的代码将文件作为数据帧读取到Kafka主题(用于Spark Streaming),通过Eclipse IDE开发,使用Scala,通过运行瘦jar来适当地定义模式...
由于火花的变换被懒惰地评估,我们有什么方法可以急切地执行变换?是否需要进行任何配置更改?例如,我有......
在Spark Structured Streaming中处理二进制数据
我正在使用Kafka和Spark Structured Streaming。我收到以下格式的kafka消息。 {“deviceId”:“001”,“sNo”:1,“data”:“aaaaa”} {“deviceId”:“002”,“sNo”:1,“data”:“bbbbb”} {“deviceId” :” ...
如何在spark-submit命令中指定要使用的java版本?
我想在远程服务器上的纱线群集上运行火花流应用程序。默认的java版本是1.7,但我想使用1.8作为我的应用程序,它也在服务器中,但不是...
如何在Hive DataBase中插入Kafka Spark流式Json数据
我的Streaming JsonData就在这里:{“visibility”:“public”,“response”:“yes”,“guests”:0,“member”:{“member_id”:145170662,“photo”:“http:\ / \ /photos2.meetupstatic ....
我对spark 2.1.1和json4s.jackson有一个奇怪的问题。我从spark 1.5.1升级了我的流媒体项目。现在当我在IDE中执行代码时,一切正常。但是在组装和代码之后......
为什么即使没有新数据,Spark Streaming也会执行foreachRDD?
我有以下Spark流示例:val conf = new SparkConf()。setAppName(“Name”)。setMaster(“local”)val sc = new SparkContext(conf)val ssc = new StreamingContext(sc,Seconds(2)) val ...
在kafka ofset mgt期间面临问题的“无法访问包kafka010中的对象分配”
环境:Kafka 10,Spark 2.1我试图存储Kafka偏移外部存储。通过Apache Spark网站和一些在线研究后,我能够编写以下代码。现在......
我被告知广播变量应该是不可变的。然而,我看到了一个代码片段,其中广播变量用作标志。公共类TestBroadcast {private static ...
Java Spark:com.mongodb.spark.config.writeconfig问题
我试图通过java spark连接器与MongoDB连接,当我提交jar并在spark shell中运行jar时,我收到错误“com.mongodb.spark.config.writeconfig”。这里错误......
当使用spark-streaming时,如何通过自己保存多个分区的Kafka偏移量
我使用spark-streaming来读取kafka数据,并处理我在下面使用的每一行来创建一个流:lines = KafkaUtils.createDirectStream(jssc,LocationStrategies ....
我一直面临着关于将输出Dstream插入永久SQL表的“Spark Streaming”的问题。我想插入每个输出DStream(来自单个批处理,引发...