Spark Streaming是核心Apache Spark API的扩展,可实现实时数据流的高吞吐量,容错流处理。从版本1.3.0开始,即使遇到故障,它也支持一次性处理语义。
在 Delta Lake House 中读取文件时出现问题 - 数据流
我正在独自用数据块建造我的第一个三角洲湖房子。我需要读取 AWS 中存储桶中 csv 格式的文件,我可以通过执行 display(dbutils.fs.ls.Howe...
如何通过 pyspark 在 s3 存储桶上写入数据帧但不使用 Hadoop
我想通过 pyspark 直接在 s3 存储桶上写入数据帧,但不想使用 Hadoop。 python 或 pyspark 代码中不需要任何 Hadoop 单词。 从 pyspark.sql 导入 SparkSession
我有一个流应用程序(用spark/storm/任何无关紧要的东西编写)。 Kafka 用作流事件的来源。现在有些事件需要占用更大的资源(ti...
我面临这个问题,我的 Spark Streaming 作业在运行几天后不断失败,并出现以下错误: appattempt_1610108774021_0354_000001 的 AM 容器已退出,exitCode:-104 失败...
对于 Delta 或 Iceberg 使用哪种基于时间的分区策略
我正在使用 Spark-streaming 每 5 分钟摄取实时事件流并附加到 delta 或 apache 冰山表中。该表可以由下游数据管道摄取和处理,也可以...
pyspark .writeStream.format("memory") 未检测到现有数据
我正在测试 pyspark 流,仅使用从文件夹读取的 csv 文件。 当我使用 writeStream.format("console") 时,查询会立即检测到我的 csv 文件并将其输出到
我正在使用 Spark-streaming 每 5 分钟摄取一次实时事件流并附加到增量实时表中。该表可以由下游数据管道摄取和处理,也可以直接...
所以假设我有巨大的数据帧,并且我使用 pyspark 中的“左”连接来加入它们。现在加入他们后,我发现一些列名称由于相同的列被重复而被重复...
我在azure datalake中加载了多个表(每个表的csv文件),并且想使用自动加载器加载Databricks Delta表中的每个表。 我有一个 python 代码,我使用 for 循环来
我正在尝试将结构化流结果输出到控制台: .writeStream \ .outputMode("追加") \ .format(“控制台”) \ 。开始() 输出表如下所示: +-----------------...
如果 Spark 应用程序代码有任何更改,Spark Streaming 检查点将无法工作...所以我想将状态信息显式保存到像 cassandra 这样的外部数据库中。 如何冲洗火花
使用 Abris 从 Kafka 读取 Spark 流,最新的架构注册表不会同步
我有一个 Spark 流,它从 kafka avro 消息中读取并根据最新版本的架构生成数据帧。我正在使用 abris 来做到这一点,它看起来像这样, 导入 za.co.absa.abris.
我可以在结构化流中创建多个批次,而其中只有一个包含连续数据的文件吗? 我已经使用多个输入文件创建了多个批次。但是,现在我想生成多个
我正在独立集群上运行我的 Spark 连续结构化流应用程序。但是我注意到平均输入/秒或平均进程/秒等指标没有在结构上显示(作为 NaN)...
Spark Streaming 不执行 foreach 中的代码行
关于 Spark Streaming 的快速问题。 我正在将 KafkaUtils 中的 createDirectStream 初始化为流,并将其保存为 Spark-streaming 中的 InputDStream,如下所示。 val 流:InputDStream[ConsumerRecord[
Databricks Delta Live 表只是在 CDC 和 SCD 之后覆盖吗?
您好 Databricks 社区, 目前我面临以下问题,我正在尝试为此找到一个好的解决方案。我使用 DLT 开发具有多跳架构的管道。 用于摄取
从 Spark 数据集中检索字符串类型列作为字符串变量,以将其作为 Redis 缓存的“键”传递
我正在尝试使用 Spark Streaming 从 kafka 主题读取数据。 来自 kafka 的消息是一个 JSON,我将其作为字符串存储在下面的数据集的值列中。 示例消息:只是一个
我在Databricks中编写了一个spark结构化流。第一段代码是检查我的实体是否存在增量表。如果没有,则创建增量表。在这里,我想我们...
我有一个 Spark 流应用程序。它需要一批记录并对记录执行多个映射函数。 当少数记录在 .map 阶段失败时,我希望能够知道原始 id/re...
我正在尝试使用 Spark Stream 将聚合数据流式传输到 Azure Cosmos DB。示例 Spark Stream 应用程序从 n/w 控制台获取输入,然后对其应用聚合并尝试编写...