spark-streaming 相关问题

Spark Streaming是核心Apache Spark API的扩展,可实现实时数据流的高吞吐量,容错流处理。从版本1.3.0开始,即使遇到故障,它也支持一次性处理语义。

在火花数据框架中找不到值&&。当比较空值时

Hi I have 2 Dataframes df1 and df2 I'm joining these 2 dataframe based on id column then create one new column as result and check below test conditions. 1. 如果名字在两个案例中是相同的......

回答 1 投票 0

在现有列的基础上增加新的列,并在Spark数据框架中加入协整值。

我想根据下面的条件在我的数据框中新建一列。我的数据框是这样的: my_string 2020 test 2020 prod 2020 dev 我的条件: value1=从......减去空格后的字符串。

回答 1 投票 0

如何使用RDD的persist和cache?

请告诉我如何使用RDD方法Persist()和Cache(),似乎对于我通常用java写的传统程序来说,比如说sparkStreaming,这是一个继续执行的DAG,其中......

回答 2 投票 2

如何将Spark Streaming检查点保存在谷歌云存储上?

我试图运行Spark结构化流作业,并将检查点保存到谷歌存储,我有一对夫妇的工作,一个和聚合工作完美,但第二个与聚合抛出异常。I ...

回答 1 投票 4

Spark Streaming找到了文件,但却声称找不到文件。

我有下面的东西--它可以监控一个目录& 每X秒拉入一次日志。我的问题是这样的。我设置脚本运行,然后在目录中创建一个文件(比如说testfile... ...

回答 1 投票 -1

根据给定的操作列创建一个新的数据集。

我使用spark-sql-2.3.1v有如下场景。给定数据集 val ds = Seq( (1, "x1", "y1", "0.1992019"), (2, null, "y2", "2.2500000"), (3, "x3", null, "15.34567"), (4, null, "y4", ...

回答 1 投票 0

根据给定的操作列创建一个新的数据集。

我使用的是spark-sql-2.3.1v,有如下方案。给定一个数据集: val ds = Seq( (1,"x1","y1","0.1992019"),(2,null,"y2","2.2500000"),(3,"x3",null,"15.34567"),(4,null,"y4"...。

回答 1 投票 1

在Spark中运行现有的生产型Java应用

我一直在阅读Spark,并且对在可扩展计算集群上分配计算的能力非常感兴趣。我们有生产流处理代码(5K行,用Java 9写的) ...

回答 1 投票 0

访问Spark流数据管道。什么方案最有效?

我正在寻找从Spark数据管道访问数据的最佳方案。场景如下。我正在从Kafka主题中读取数据,创建一个流式数据框架,然后对其进行清理和... ...

回答 1 投票 -1

从Spark Streaming中获取异常 "没有注册输出操作,所以没有执行"。

package com.scala.sparkStreaming import org.apache.spark._ import org.apache.spark.streaming._ object Demo1 { def main(assdf:Array[String]){ val sc=new SparkContext("local", "Stream") ....

回答 1 投票 0

Spark *Structured* Streaming中的RecordTooLargeException异常

我一直收到这个错误信息。当序列化时,消息是1169350字节,这比你在max.request.size配置中配置的最大请求大小要大。由于...

回答 1 投票 0

在pyspark中从本地文本文件流

conf = SparkConf().setMaster("spark:/antonis-dell:7077").setAppName("Kafka_Spark") sc = SparkContext(conf=conf) # .getOrCreate() sc.setLogLevel("WARN") ....

回答 2 投票 0

为什么Spark结构化流作业在引发异常后仍未终止?

我在我的结构化流作业中引发了一个自定义异常来测试失败,如下所示。我看到查询被终止,但不能理解为什么驱动脚本没有以非零的方式失败......。

回答 1 投票 0

使用SparkScala用JSON字段过滤RDD的csv。

我正在研究sparkscala,我需要通过一列的特定字段来过滤一个RDD,在这种情况下,用户。我想返回一个包含用户["Joe", "Plank", "Willy"]的RDD,但似乎想不通......。

回答 1 投票 0

附表火花结构化流媒体

它是以某种方式可能安排一个火花流作业只运行在特定的时间吗说从8AM到8PM?集群是在夜间运行,造成不必要的成本。我怎么能重新初始化......。

回答 1 投票 0

spark writeStream into kafka - awaitTermination()与 awaitAnyTermination()之间的区别。

根据官方文档,我使用下面的代码段写入kafka主题,但它没有写入kafka。 finalStream = final.writeStream \ .format("kafka") \ .option("kafka......")。

回答 1 投票 0

如何在控制台中看到数据框架(相当于结构化流的.show())?

我试图看到什么是作为我的数据框架... 这里是火花代码从pyspark.sql导入SparkSession导入pyspark.sql.functions作为psf导入logging导入时间火花=SparkSession ....

回答 1 投票 0

如何在Pyspark中计算或管理流数据?

我想从流媒体数据中提取数据,然后发送到网页上。例如:我想从流式数据中计算出总销售栏的总和,然后发送到网页上。我将计算流数据中TotalSales列的总和。但是在summary = dataStream.select('TotalSales')时出错......。

回答 1 投票 0

错误。在kafka中使用Spark结构化流来读写数据到另一个主题。

我正在做一个小任务,使用kafka主题读取access_logs文件,然后我计算状态,并将状态的计数发送到另一个kafka主题。但是我一直收到错误信息,比如,当我使用no ...

回答 1 投票 0

spark - 方法 错误。匿名函数的参数类型必须是完全已知的。

我知道有很多问题,但我创建了一个简单的例子,我认为应该可以工作,但仍然不行,我不确定我是否理解为什么 def main(args: Array[String]) ...

回答 1 投票 0

最新问题
© www.soinside.com 2019 - 2024. All rights reserved.