spark-structured-streaming 相关问题

Spark Structured Streaming允许使用unifrom DataFrame和Dataset API处理实时数据流。

Spark Structured Streaming dropDuplicates 在重新启动应用程序后无法按预期工作

我正在连接到 NATS Jetstream 以使用 Spark Java 代码使用消息和处理。下面是代码片段 私有静态无效sparkNatsTester(){ SparkSession Spark = SparkSe...

回答 2 投票 0

java.lang.IllegalStateException:读取增量文件时出错,使用 kafka 进行 Spark 结构化流处理

我在我们的项目中使用结构化流+ Kafka 进行实时数据分析。我使用的是 Spark 2.2,kafka 0.10.2。 我在从位于

回答 2 投票 0

引起:java.lang.ClassNotFoundException:org.apache.kafka.common.serialization.ByteArraySerializer

我正在使用 kafka 和 cassandra 进行 Spark 结构化流处理,当我在命令下运行时出现错误 Spark-submit --class StreamHandler --master local[*] --packages "org.apache.spark...

回答 1 投票 0

消费MSK主题

我正在开发一个 AWS MSK 消费者项目,我希望在部署到 EMR 之前开发/测试我的应用程序。但是我不知道如何配置我的本地以从我的 MSK kafka 中使用

回答 1 投票 0

Spark 结构化流 dropDuplicates 未按预期工作

我正在连接到 NATS Jetstream 以使用 Spark Java 代码使用消息和处理。下面是代码片段 私有静态无效sparkNatsTester(){ SparkSession Spark = SparkSe...

回答 1 投票 0

如何使结构化流中的 dropDuplicates 状态过期以避免 OOM?

我想使用spark结构化流计算每天的唯一访问次数,所以我使用以下代码 .dropDuplicates("uuid") 第二天,今天维持的状态应该会消失...

回答 4 投票 0

具有 Spark 结构化流的动态过滤器

我正在开发 Spark Streaming 项目,目标是创建一个简单的应用程序,以便在数据流满足条件时通知用户(例如,当股票价格 > x 时发送通知)。 df =...

回答 1 投票 0

通过 Spark Streaming 高效读取 Kafka

我有一个应用程序,它从 Kafka 获取数据并将其保存到数据库中。我的代码如下所示: Spark.readStream .format("卡夫卡") .选项(选项) 。加载() .writeStream .触发(

回答 1 投票 0

我们可以在 PySpark 结构化流中使用 row_number() 吗?

Row_number() 函数上的 PySpark SQL 函数参考说 返回窗口分区内从 1 开始的序列号 暗示该功能仅适用于 Windows。试 df.

回答 2 投票 0

dropDuplicatesWithinWatermark 到底是如何工作的?

我有这个代码: 导入 org.apache.spark.sql.SparkSession 导入 org.apache.spark.sql.execution.streaming.MemoryStream 导入 org.apache.spark.sql.streaming.{OutputMode, StreamingQueryListener, Tr...

回答 1 投票 0

检查点后未使用新的spark.sql.shuffle.partitions值

我有一个 Spark 的结构化流应用程序,带有检查点以在 parquet 中写入输出并使用默认的 Spark.sql.shuffle.partitions = 200。我需要更改随机分区,但是...

回答 2 投票 0

Databricks Autoloader Schema Evolution 抛出 StateSchemaNotCompatible 异常

我正在尝试将 Databricks Autoloader 用于一个非常简单的用例: 从 S3 读取 JSON 并将其加载到增量表中,并进行模式推断和演化。 这是我的代码: 自我火花\ ...

回答 1 投票 0

当 Spark Streaming 查询进行数据摄取时,对增量表运行 VACUUM 和 DELETE 是否安全

我有一个 24/7 Spark 结构化流查询(Kafka 作为源),它将数据附加到增量表。 定期对不同的相同增量表运行 VACUUM 和 DELETE 是否安全

回答 1 投票 0

当 Spark Streaming 查询进行数据摄取时,对增量表运行 VACCUM 和 DELETE 是否安全

我有一个 24/7 Spark 结构化流查询(Kafka 作为源),它将数据附加到增量表。 定期对不同的相同增量表运行 VACUUM 和 DELETE 是否安全

回答 1 投票 0

使用托管身份访问 Spark Streaming 中的 Eventhub

我有一个要求,我需要访问 Spark Streaming(天蓝色数据砖)中的 eventhub 流,但使用托管身份验证。目前我在所有BL中只看到SAS密钥访问...

回答 1 投票 0

Azure databricks 自动加载器 Spark Streaming 无法读取输入文件

我已经使用自动加载器功能设置了流作业,输入位于 azure adls gen2 中,采用 parquet 格式。下面是代码。 df = Spark.readStream.format("cloudFiles")\ .选择...

回答 1 投票 0

delta mergeSchema 无法将 MemoryStream 与 Spark 检查点结合使用

我正在使用 Spark 的 MemoryStream 测试 DeltaWriter 类来创建流(而不是 readStream),并且我想使用选项“mergeSchema”将结果作为增量文件写入 s3 上:...

回答 1 投票 0

从结构流式json数据创建分区列

我是结构化流媒体新手,希望根据 json 消息中的日期列创建分区列。 这是示例消息: {“日期”:“2022-03-01”,“c...

回答 1 投票 0

将数据从增量表写入事件中心的流作业在数据块中失败 - 超时问题

这里是我的代码,用于将数据从增量表写入事件中心(消费者团队将从这里消费数据): 导入 org.apache.spark.eventhubs._ 导入 org.apache.spark.sql.streaming.Trigger._...

回答 1 投票 0

Trigger.AvailableNow 用于 PySpark (Databricks) 中的 Delta 源流查询

Databricks 文档中的所有示例均采用 Scala 语言。无法从 PySpark 找到如何使用此触发器类型。是否有等效的 API 或解决方法?

回答 2 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.