Spark Structured Streaming允许使用unifrom DataFrame和Dataset API处理实时数据流。
我们有 Delta Lake 表,其中使用结构化流填充数据。我试图了解 Delta Time Traveling 在结构化流检查点方面的工作原理。 让我们...
摄取 avro 文件时的 Schema Evolution 是由 Event Hub Capture 编写的吗?
我有一个双跳摄取管道,如下所示: 数据生产者 - 事件 - > Azure 事件中心 - 事件中心捕获 - > Avro 格式的 ADLS - 自动加载器 - > Databricks UC 托管增量表 当使用...
使用 RocksDBStateStoreProvider 时 Spark 结构化流 StateStore 异常
我正在开发一个具有流-流连接逻辑的 Spark 结构化流应用程序,利用 RocksDB 作为存储状态的骨干。该设置包括 Kafka 作为我的压力源...
如何转换包含 json 对象数组的输入 json 字符串中的数据
第一次发帖,用Scala学习spark编码: 我有一个输入 json 字符串,其中包含 json 字符串数组。例如 { “交货证明”: { "POD_TYPE_CD": "...
我想知道为什么我的 Spark 流应用程序的每第四批都会出现巨大的峰值。 一些细节 这是使用rocksdb状态存储的安全处理 从 Kafka 读取 180 个分区 写...
inferSchema=true 不适用于 Spark 结构化流读取 csv 文件
我收到错误消息 java.lang.IllegalArgumentException:创建流源 DataFrame 时必须指定架构。如果目录中已经存在某些文件,则取决于...
在结构化流 API 中跨多个集群使用共享 Kafka 主题执行 Spark 作业
我正在开发一个 Spark 项目,我需要在两个不同的集群上运行作业,两个集群都使用相同的 Kafka 主题。我希望这些作业能够有效地共享负载并平衡
Apache Spark Structured Streaming 中 Spark UI 上的查询和阶段卡住了
我在 EMR 集群 (6.14) 上使用 Apache Spark Structured Streaming (3.1.2)。 Spark 结构化流将数据从 Apache Kafka 流式传输到 Delta Lake 表。当我打开 Spark UI 时,我看到以下内容
写入 cassandra 时从 Spark 结构化流数据帧中过滤错误记录
我知道我的 Spark Scala 数据帧的第 n 行存在一些问题(假设数据类型不正确)。当我尝试使用 Spark 结构化流在 cassandra 中写入此数据帧时,它失败了......
是否可以使用Kinesis流作为Spark结构化流的数据源?我找不到任何可用的连接器。
将可变数量的参数传递给 pyspark.sql.functions.udf
我正在使用 pyspark 创建一个 Spark 结构化流应用程序,并希望将每一行的数据输出为 json 数据包。我正在使用 udf 进行此操作,如下所示。 来自 pyspark.sql.functi...
Apache Spark 结构化流 - 检查点和预写日志所需的简单解释
我是数据工程新手。谁能简单解释一下 Apache Spark 结构化流文档中所谓的“检查点”和“预写日志”是什么? 我经历了...
PySpark 中的 Union 静态数据帧与 Spark 结构化流数据帧?
有没有其他方法可以在 PySpark 中应用静态数据帧和结构化流数据帧之间的并集?
如何使用pyspark filestream读取最近一小时内上传的新文件?
我正在尝试读取目录中可用的最新文件(例如过去一小时内的新文件)并加载该数据。我正在尝试使用 pyspark 结构化流。我尝试过 Spark 的 maxFileAge 选项
Apache Spark 3.5 批处理模式结构化流中的 Kafka 偏移量问题
我正在根据 Kafka 集成指南编写一个使用 Kafka 作为源的批量查询,并希望定期提交该批处理,比如每天一次,以处理已添加的记录
我有 Spark 结构化流应用程序,可以愉快地与 Spark 3.0.1 一起使用。现在我尝试升级到 Spark 3.3.2 并收到以下异常: > 23/12/15 00:14:45 错误
我想在 Databricks 中设置集群使用警报。 例如:如果驱动程序节点的 CPU 利用率 > 80%,则发送警报。 可以为工作设置电子邮件通知,但我很感兴趣...
首先是一些背景知识,在一些业务需求将所有管道合并到一个中心位置之后,我最近不得不将管道从标准读取和写入转移到结构化流。 ...
Spark Structured Streaming dropDuplicates 在重新启动应用程序后无法按预期工作
我正在连接到 NATS Jetstream 以使用 Spark Java 代码使用消息和处理。下面是代码片段 私有静态无效sparkNatsTester(){ SparkSession Spark = SparkSe...
java.lang.IllegalStateException:读取增量文件时出错,使用 kafka 进行 Spark 结构化流处理
我在我们的项目中使用结构化流+ Kafka 进行实时数据分析。我使用的是 Spark 2.2,kafka 0.10.2。 我在从位于