Spark Streaming是核心Apache Spark API的扩展,可实现实时数据流的高吞吐量,容错流处理。从版本1.3.0开始,即使遇到故障,它也支持一次性处理语义。
如何在给定基本路径的情况下检查文件是否存在。我正在为该方法提供文件列表,例如:file1.snappy,file2,snappy,...我需要检查文件是否存在于给定的任何一个中...
假设我有一个流数据帧A和一个较大的静态数据帧B。假定A的大小通常<10000个记录。但是,B是一个更大的数据帧,大小在数百万范围内。 ...
在我的应用程序中,将为用户执行的每个操作生成事件,并使用以下格式的数据生成事件-user_id | step_num | event_timestamp这些命令的顺序...
我正在从Kafka源中读取流数据,但是来自kafka的所有数据都是在一个微型批次中读取的。 spark.readStream.format(“ kafka”)。option(“ kafka.bootstrap.servers”,bootstrap_servers).option(...
Spark 2.1 + Kafka 0.10 + Spark流。批处理持续时间为30秒。我有13个节点,2个代理,并且每个主题/分区的每个执行者使用1个核心。 LocationStrategy为PreferConsistent。消费1 ...
Kafka出现火花流问题:无法从具有现有数据的主题中读取数据
我正在尝试通过流式传输向Kafka经纪人阅读,但是我遇到了一些问题。 def spark_streaming_from_STABLE_kafka_topic():conf = SparkConf()。setMaster(“ spark:// antonis-dell:7077”)....
我正在尝试通过流式传输向Kafka经纪人阅读,但是我遇到了一些问题。 def spark_streaming_from_STABLE_kafka_topic():conf = SparkConf()。setMaster(“ spark:// antonis-dell:7077”)....
Spark Streaming Reuse Physical Plan
我们有一个Spark Streaming应用程序,它对传入的数据流执行一些繁重的状态计算。这里的状态保存在某些存储中(HDFS / Hive / Hbase / Cassandra),并且...
Jar文件具有该类,但仍然得到java.lang.ClassNotFoundException:org.apache.kafka.clients.consumer.ConsumerRecord
我正在运行火花流作业,以使用直接方法从kafka消费(对于kafka 0.1.0或更高版本)。使用maven-assembly-plugin构建POM文件,并使用jar tf&...
我有一个这样的类:public class Test {private static String name;公共静态字符串getName(){返回名称; } public static void setName(String name){...
带有火花流的Kafka引发错误:从pyspark.streaming.kafka导入KafkaUtils ImportError:没有名为kafka的模块,我已经使用...]设置了kafka代理和可工作的spark环境,] ...] >> [[
package com.scala.sparkStreaming import org.apache.spark._ import org.apache.spark.streaming._ object Demo1 {def main(assdf:Array [String]){val sc = new SparkContext(“ local”, “ Stream”)val ...
我删除了我的spark流的检查点目录。现在,没有错误,但是流不拾取任何文件。我该如何解决我的愚蠢错误? :)我试图创建一个新的...
我有两个笔记本。第一个笔记本正在使用tweepy从Twitter阅读推文并将其写入套接字。其他笔记本正在使用火花结构化流(Python)从该套接字读取推文...
使用Azure Databricks中的火花流将数据加载到天蓝色的blob中
我正在Azure Databricks中尝试此代码:jsonSchema = StructType([StructField(“ time”,...
我怎么知道谁在Spark Streaming程序中调用System.gc()?
我的火花流程序中GC时间太长。在GC日志中,我发现有人在程序中调用了System.gc()。我没有在代码中调用System.gc()。因此,调用方应为api ...
我想根据以下条件在数据框中添加新列。我的数据帧是这样的:my_string 2020 test 2020 prod 2020 dev我的条件:value1 =从...
[我已经开始了一个火花流工作,该工作流从kafka传输数据。我仅分配了两个带有15gb磁盘的工作节点进行测试。在2个小时内磁盘已满,并且这些节点的状态为...
forEachPartition有效,但mapPartition无效
我有一个Spark Streaming应用程序,该应用程序读取Kafka流并将数据插入数据库。这是代码段eventDStream.foreachRDD {(rdd,time)=> val offsetRanges = rdd ....