spark-structured-streaming 相关问题

Spark Structured Streaming允许使用unifrom DataFrame和Dataset API处理实时数据流。

访问Spark流数据管道。什么方案最有效?

我正在寻找从Spark数据管道访问数据的最佳方案。场景如下。我正在从Kafka主题中读取数据,创建一个流式数据框架,然后对其进行清理和... ...

回答 1 投票 -1

当结构化流中使用的Spark DataFrame的底层数据在源端更新时,会发生什么?

我有一个用例,在这个用例中,我将一个流式DataFrame和一个静态DataFrame连接在一起。静态DataFrame是从一个parquet表(一个包含parquet文件的目录)中读取的。这个parquet数据是...

回答 1 投票 0

从Spark Streaming中获取异常 "没有注册输出操作,所以没有执行"。

package com.scala.sparkStreaming import org.apache.spark._ import org.apache.spark.streaming._ object Demo1 { def main(assdf:Array[String]){ val sc=new SparkContext("local", "Stream") ....

回答 1 投票 0

Spark *Structured* Streaming中的RecordTooLargeException异常

我一直收到这个错误信息。当序列化时,消息是1169350字节,这比你在max.request.size配置中配置的最大请求大小要大。由于...

回答 1 投票 0

为什么Spark结构化流作业在引发异常后仍未终止?

我在我的结构化流作业中引发了一个自定义异常来测试失败,如下所示。我看到查询被终止,但不能理解为什么驱动脚本没有以非零的方式失败......。

回答 1 投票 0

如何在大型数据框架中过滤没有键的行?

假设我有一个流数据框A和一个大的静态数据框B,假设A的大小为< 10000条记录。然而,B是一个大得多的数据帧,大小在数百万条范围内。...

回答 2 投票 0

在Spark结构化流中对createOrReplaceTempView进行查询时出现错误。

下面是我在spark结构化流中的代码,in foreachBatch df.writeStream.trigger(Trigger.ProcessingTime("10秒")).foreachBatch((batchDF: DataFrame, batchId: Long) => { ...

回答 1 投票 0

错误。在kafka中使用Spark结构化流来读写数据到另一个主题。

我正在做一个小任务,使用kafka主题读取access_logs文件,然后我计算状态,并将状态的计数发送到另一个kafka主题。但是我一直收到错误信息,比如,当我使用no ...

回答 1 投票 0

在spark结构化流媒体中,Json字符串应该作为Kafka主题消耗,而不是模式。

我需要消耗Kafka主题,它为每一行产生动态Json字符串,我不能解析没有schema的Json字符串。在我的案例中,Schema可以是动态的。有什么办法可以将Kafka topic(value)转换为......?

回答 1 投票 0

为什么我的AWS Glue开发端点在尝试从Confluent Kafka Topic (RBACLDAP)读取时,会抛出NoClassDefFoundError错误?

当我试图通过AWS Glue Endpoint运行spark流作业时,我收到了一个java.lang.NoClassDefFoundError:orgapachekafkacommonsecurityauthAuthenticateCallbackHandler错误,该错误读取......

回答 1 投票 0

Scala与Python的Spark结构化流媒体性能比较

你好~我准备用Kafka+Spark结构化流开发一个小批量的程序,但我很困惑,到底是用Python还是Scala,哪个更快?但是我很困惑,到底是用python还是scala,哪个更快。如果有什么好 ...

回答 1 投票 0

Spark结构化流使用spark-acid writeStream(带检查点)抛出org.apache.hadoop.fs.FileAlreadyExistsException。

在我们的Spark应用中,我们使用Spark结构化流。它使用Kafka作为输入流,& HiveAcid作为Hive表的writeStream。对于HiveAcid,它是开源的库,叫做spark acid,来自qubole:... ...

回答 1 投票 0

在Spark结构化流中,在createOrReplaceTempView上写查询时遇到问题。

下面是我的代码在spark结构化流内foreachBatch df.writeStream.trigger(Trigger.ProcessingTime("10秒")).foreachBatch((batchDF: DataFrame, batchId: Long)=> { batchDF......。

回答 1 投票 0


GCP: Spark结构化流+自定义PubSub源码

目前在Spark Structured Streaming中还没有对PubSub源的支持。有人在Spark Structured Streaming中写了一个自定义的源码来从PubSub中读取吗?这是个可行的方法吗?

回答 1 投票 1

使用Spark结构化流从多个Kafka主题读取并写入不同的汇的最佳方式是什么?

我正试图编写一个Spark结构化流作业,从多个Kafka主题(可能是100个)中读取,并根据主题名称将结果写入S3上的不同位置。我已经...

回答 1 投票 0

将Spark结构化的流数据帧传递给函数

我从源卡夫卡读取了火花结构化的流数据帧。我想将此数据帧传递给函数,并将该函数的结果写入某个目标。案例类JsonSchema(...

回答 1 投票 0

PySpark:连续流相同文件

我已经编写了下面的代码,它可以完美地工作。将新文件添加到目录后,将拾取数据并将其加载到数据框中。现在尝试流式传输同一文件,例如:Job.csv ...

回答 1 投票 1

[导入Pyspark Delta Lake模块时找不到模块错误

我正在用三角洲湖泊运行Pyspark,但是当我尝试导入三角洲模块时,出现ModuleNotFoundError:没有名为'delta'的模块。这是在没有互联网连接的机器上,因此我不得不...

回答 1 投票 0

如何在运行时更改流查询的输入目录?

我正在使用spark.readStream()。text( )运行流作业。是否可以根据当前日期动态更改输入目录?

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.