spark-streaming 相关问题

Spark Streaming是核心Apache Spark API的扩展,可实现实时数据流的高吞吐量,容错流处理。从版本1.3.0开始,即使遇到故障,它也支持一次性处理语义。

检查HDFS路径[Spark Scala]中是否存在文件

如何在给定基本路径的情况下检查文件是否存在。我正在为该方法提供文件列表,例如:file1.snappy,file2,snappy,...我需要检查文件是否存在于给定的任何一个中...

回答 2 投票 0

火花流-过滤大型数据帧中不存在键的行

假设我有一个流数据帧A和一个较大的静态数据帧B。假定A的大小通常<10000个记录。但是,B是一个更大的数据帧,大小在数百万范围内。 ...

回答 1 投票 0

计算流数据集中事件之间的时间差

在我的应用程序中,将为用户执行的每个操作生成事件,并使用以下格式的数据生成事件-user_id | step_num | event_timestamp这些命令的顺序...

回答 1 投票 0

如何在一个微批量的Spark结构化流中设置批量大小

我正在从Kafka源中读取流数据,但是来自kafka的所有数据都是在一个微型批次中读取的。 spark.readStream.format(“ kafka”)。option(“ kafka.bootstrap.servers”,bootstrap_servers).option(...

回答 1 投票 0


Spark流,Kafka和多个主题的性能不佳

Spark 2.1 + Kafka 0.10 + Spark流。批处理持续时间为30秒。我有13个节点,2个代理,并且每个主题/分区的每个执行者使用1个核心。 LocationStrategy为PreferConsistent。消费1 ...

回答 1 投票 8

Kafka出现火花流问题:无法从具有现有数据的主题中读取数据

我正在尝试通过流式传输向Kafka经纪人阅读,但是我遇到了一些问题。 def spark_streaming_from_STABLE_kafka_topic():conf = SparkConf()。setMaster(“ spark:// antonis-dell:7077”)....

回答 1 投票 0

Kafka出现火花流问题

我正在尝试通过流式传输向Kafka经纪人阅读,但是我遇到了一些问题。 def spark_streaming_from_STABLE_kafka_topic():conf = SparkConf()。setMaster(“ spark:// antonis-dell:7077”)....

回答 1 投票 0

Spark Streaming Reuse Physical Plan

我们有一个Spark Streaming应用程序,它对传入的数据流执行一些繁重的状态计算。这里的状态保存在某些存储中(HDFS / Hive / Hbase / Cassandra),并且...

回答 1 投票 0

Jar文件具有该类,但仍然得到java.lang.ClassNotFoundException:org.apache.kafka.clients.consumer.ConsumerRecord

我正在运行火花流作业,以使用直接方法从kafka消费(对于kafka 0.1.0或更高版本)。使用maven-assembly-plugin构建POM文件,并使用jar tf&...

回答 1 投票 0

如何从spark设置和获取静态变量?

我有一个这样的类:public class Test {private static String name;公共静态字符串getName(){返回名称; } public static void setName(String name){...

回答 3 投票 8

具有火花流集成功能的Kafka

带有火花流的Kafka引发错误:从pyspark.streaming.kafka导入KafkaUtils ImportError:没有名为kafka的模块,我已经使用...]设置了kafka代理和可工作的spark环境,] ...] >> [[

回答 1 投票 0

从Spark Streaming获取异常

package com.scala.sparkStreaming import org.apache.spark._ import org.apache.spark.streaming._ object Demo1 {def main(assdf:Array [String]){val sc = new SparkContext(“ local”, “ Stream”)val ...

回答 1 投票 0

已删除火花检查点目录,现在不起作用

我删除了我的spark流的检查点目录。现在,没有错误,但是流不拾取任何文件。我该如何解决我的愚蠢错误? :)我试图创建一个新的...

回答 1 投票 0

Jupyter Notebook上未显示结构化流输出

我有两个笔记本。第一个笔记本正在使用tweepy从Twitter阅读推文并将其写入套接字。其他笔记本正在使用火花结构化流(Python)从该套接字读取推文...

回答 1 投票 1

使用Azure Databricks中的火花流将数据加载到天蓝色的blob中

我正在Azure Databricks中尝试此代码:jsonSchema = StructType([StructField(“ time”,...

回答 1 投票 0

我怎么知道谁在Spark Streaming程序中调用System.gc()?

我的火花流程序中GC时间太长。在GC日志中,我发现有人在程序中调用了System.gc()。我没有在代码中调用System.gc()。因此,调用方应为api ...

回答 1 投票 0

基于具有concat值Sprak数据帧的现有列添加新列

我想根据以下条件在数据框中添加新列。我的数据帧是这样的:my_string 2020 test 2020 prod 2020 dev我的条件:value1 =从...

回答 1 投票 0

如何释放Dataproc中块池使用的空间

[我已经开始了一个火花流工作,该工作流从kafka传输数据。我仅分配了两个带有15gb磁盘的工作节点进行测试。在2个小时内磁盘已满,并且这些节点的状态为...

回答 1 投票 1

forEachPartition有效,但mapPartition无效

我有一个Spark Streaming应用程序,该应用程序读取Kafka流并将数据插入数据库。这是代码段eventDStream.foreachRDD {(rdd,time)=> val offsetRanges = rdd ....

回答 1 投票 0

最新问题
© www.soinside.com 2019 - 2024. All rights reserved.