spark-streaming 相关问题

Spark Streaming是核心Apache Spark API的扩展,可实现实时数据流的高吞吐量,容错流处理。从版本1.3.0开始,即使遇到故障,它也支持一次性处理语义。

Spark流,Kafka和多个主题的性能不佳

Spark 2.1 + Kafka 0.10 + Spark流。批处理持续时间为30秒。我有13个节点,2个代理,并且每个主题/分区的每个执行者使用1个核心。 LocationStrategy为PreferConsistent。消费1 ...

回答 1 投票 8

Kafka出现火花流问题:无法从具有现有数据的主题中读取数据

我正在尝试通过流式传输向Kafka经纪人阅读,但是我遇到了一些问题。 def spark_streaming_from_STABLE_kafka_topic():conf = SparkConf()。setMaster(“ spark:// antonis-dell:7077”)....

回答 1 投票 0

Kafka出现火花流问题

我正在尝试通过流式传输向Kafka经纪人阅读,但是我遇到了一些问题。 def spark_streaming_from_STABLE_kafka_topic():conf = SparkConf()。setMaster(“ spark:// antonis-dell:7077”)....

回答 1 投票 0

Spark Streaming Reuse Physical Plan

我们有一个Spark Streaming应用程序,它对传入的数据流执行一些繁重的状态计算。这里的状态保存在某些存储中(HDFS / Hive / Hbase / Cassandra),并且...

回答 1 投票 0

Jar文件具有该类,但仍然得到java.lang.ClassNotFoundException:org.apache.kafka.clients.consumer.ConsumerRecord

我正在运行火花流作业,以使用直接方法从kafka消费(对于kafka 0.1.0或更高版本)。使用maven-assembly-plugin构建POM文件,并使用jar tf&...

回答 1 投票 0

如何从spark设置和获取静态变量?

我有一个这样的类:public class Test {private static String name;公共静态字符串getName(){返回名称; } public static void setName(String name){...

回答 3 投票 8

具有火花流集成功能的Kafka

带有火花流的Kafka引发错误:从pyspark.streaming.kafka导入KafkaUtils ImportError:没有名为kafka的模块,我已经使用...]设置了kafka代理和可工作的spark环境,] ...] >> [[

回答 1 投票 0

从Spark Streaming获取异常

package com.scala.sparkStreaming import org.apache.spark._ import org.apache.spark.streaming._ object Demo1 {def main(assdf:Array [String]){val sc = new SparkContext(“ local”, “ Stream”)val ...

回答 1 投票 0

已删除火花检查点目录,现在不起作用

我删除了我的spark流的检查点目录。现在,没有错误,但是流不拾取任何文件。我该如何解决我的愚蠢错误? :)我试图创建一个新的...

回答 1 投票 0

Jupyter Notebook上未显示结构化流输出

我有两个笔记本。第一个笔记本正在使用tweepy从Twitter阅读推文并将其写入套接字。其他笔记本正在使用火花结构化流(Python)从该套接字读取推文...

回答 1 投票 1

使用Azure Databricks中的火花流将数据加载到天蓝色的blob中

我正在Azure Databricks中尝试此代码:jsonSchema = StructType([StructField(“ time”,...

回答 1 投票 0

我怎么知道谁在Spark Streaming程序中调用System.gc()?

我的火花流程序中GC时间太长。在GC日志中,我发现有人在程序中调用了System.gc()。我没有在代码中调用System.gc()。因此,调用方应为api ...

回答 1 投票 0

基于具有concat值Sprak数据帧的现有列添加新列

我想根据以下条件在数据框中添加新列。我的数据帧是这样的:my_string 2020 test 2020 prod 2020 dev我的条件:value1 =从...

回答 1 投票 0

如何释放Dataproc中块池使用的空间

[我已经开始了一个火花流工作,该工作流从kafka传输数据。我仅分配了两个带有15gb磁盘的工作节点进行测试。在2个小时内磁盘已满,并且这些节点的状态为...

回答 1 投票 1

forEachPartition有效,但mapPartition无效

我有一个Spark Streaming应用程序,该应用程序读取Kafka流并将数据插入数据库。这是代码段eventDStream.foreachRDD {(rdd,time)=> val offsetRanges = rdd ....

回答 1 投票 0

当您接收每日包含所有数据(旧数据和新数据)的XML或CSV文件时,如何仅处理新数据

我每天收到一个XML或CSV文件,其中包含所有数据(旧数据和新数据)。例如,如果Yesterday.xml包含3条记录,则Today.xml包含4条记录(3条旧记录和1条新记录)。我只担心...

回答 1 投票 0

要使用spark连接到smb服务器,并在spark中加载该服务器中的文件。可以说

让我们说:我有一个类似于smb:// cluster / something /的位置,我想连接到该位置,并希望将文件从该文件夹加载到spark中。有什么可能的办法,我可以做或Spark ...

回答 2 投票 0

Spark分区

需要重新分区数据。如何确定Spark中的分区大小。分区概念是否适用于Spark流和结构化流。 DF.repartition(num)

回答 1 投票 0

带有Schema的Kafka JSON数据在PySpark结构化流中为空。新架构上的输入不匹配

我正在尝试在Spark结构化流媒体中读取JSON中的Kafka消息。卡夫卡中消息的示例如下:{“ _id”:{“ $ oid”:“ 5e58f86d5afd84019c13540c”},“ Id”:8,“ ...

回答 1 投票 0

如何在单个Spark作业中提取不同的Spark数据帧

我想在Spark中编写ETL管道来处理不同的输入源,但要使用尽可能少的计算资源,并且在使用“传统” Spark ETL方法时遇到问题。我有很多...

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.