Spark Streaming是核心Apache Spark API的扩展,可实现实时数据流的高吞吐量,容错流处理。从版本1.3.0开始,即使遇到故障,它也支持一次性处理语义。
Spark 2.1 + Kafka 0.10 + Spark流。批处理持续时间为30秒。我有13个节点,2个代理,并且每个主题/分区的每个执行者使用1个核心。 LocationStrategy为PreferConsistent。消费1 ...
Kafka出现火花流问题:无法从具有现有数据的主题中读取数据
我正在尝试通过流式传输向Kafka经纪人阅读,但是我遇到了一些问题。 def spark_streaming_from_STABLE_kafka_topic():conf = SparkConf()。setMaster(“ spark:// antonis-dell:7077”)....
我正在尝试通过流式传输向Kafka经纪人阅读,但是我遇到了一些问题。 def spark_streaming_from_STABLE_kafka_topic():conf = SparkConf()。setMaster(“ spark:// antonis-dell:7077”)....
Spark Streaming Reuse Physical Plan
我们有一个Spark Streaming应用程序,它对传入的数据流执行一些繁重的状态计算。这里的状态保存在某些存储中(HDFS / Hive / Hbase / Cassandra),并且...
Jar文件具有该类,但仍然得到java.lang.ClassNotFoundException:org.apache.kafka.clients.consumer.ConsumerRecord
我正在运行火花流作业,以使用直接方法从kafka消费(对于kafka 0.1.0或更高版本)。使用maven-assembly-plugin构建POM文件,并使用jar tf&...
我有一个这样的类:public class Test {private static String name;公共静态字符串getName(){返回名称; } public static void setName(String name){...
带有火花流的Kafka引发错误:从pyspark.streaming.kafka导入KafkaUtils ImportError:没有名为kafka的模块,我已经使用...]设置了kafka代理和可工作的spark环境,] ...] >> [[
package com.scala.sparkStreaming import org.apache.spark._ import org.apache.spark.streaming._ object Demo1 {def main(assdf:Array [String]){val sc = new SparkContext(“ local”, “ Stream”)val ...
我删除了我的spark流的检查点目录。现在,没有错误,但是流不拾取任何文件。我该如何解决我的愚蠢错误? :)我试图创建一个新的...
我有两个笔记本。第一个笔记本正在使用tweepy从Twitter阅读推文并将其写入套接字。其他笔记本正在使用火花结构化流(Python)从该套接字读取推文...
使用Azure Databricks中的火花流将数据加载到天蓝色的blob中
我正在Azure Databricks中尝试此代码:jsonSchema = StructType([StructField(“ time”,...
我怎么知道谁在Spark Streaming程序中调用System.gc()?
我的火花流程序中GC时间太长。在GC日志中,我发现有人在程序中调用了System.gc()。我没有在代码中调用System.gc()。因此,调用方应为api ...
我想根据以下条件在数据框中添加新列。我的数据帧是这样的:my_string 2020 test 2020 prod 2020 dev我的条件:value1 =从...
[我已经开始了一个火花流工作,该工作流从kafka传输数据。我仅分配了两个带有15gb磁盘的工作节点进行测试。在2个小时内磁盘已满,并且这些节点的状态为...
forEachPartition有效,但mapPartition无效
我有一个Spark Streaming应用程序,该应用程序读取Kafka流并将数据插入数据库。这是代码段eventDStream.foreachRDD {(rdd,time)=> val offsetRanges = rdd ....
当您接收每日包含所有数据(旧数据和新数据)的XML或CSV文件时,如何仅处理新数据
我每天收到一个XML或CSV文件,其中包含所有数据(旧数据和新数据)。例如,如果Yesterday.xml包含3条记录,则Today.xml包含4条记录(3条旧记录和1条新记录)。我只担心...
要使用spark连接到smb服务器,并在spark中加载该服务器中的文件。可以说
让我们说:我有一个类似于smb:// cluster / something /的位置,我想连接到该位置,并希望将文件从该文件夹加载到spark中。有什么可能的办法,我可以做或Spark ...
需要重新分区数据。如何确定Spark中的分区大小。分区概念是否适用于Spark流和结构化流。 DF.repartition(num)
带有Schema的Kafka JSON数据在PySpark结构化流中为空。新架构上的输入不匹配
我正在尝试在Spark结构化流媒体中读取JSON中的Kafka消息。卡夫卡中消息的示例如下:{“ _id”:{“ $ oid”:“ 5e58f86d5afd84019c13540c”},“ Id”:8,“ ...
我想在Spark中编写ETL管道来处理不同的输入源,但要使用尽可能少的计算资源,并且在使用“传统” Spark ETL方法时遇到问题。我有很多...