Spark Streaming是核心Apache Spark API的扩展,可实现实时数据流的高吞吐量,容错流处理。从版本1.3.0开始,即使遇到故障,它也支持一次性处理语义。
java.lang.NoSuchMethodError: org.apache.hadoop.hive.common.FileUtils.mkdir 试图将表保存到 Hive
我正在尝试读取 kafka 流并将其作为表保存到 Hive。 消费者代码是: 导入 org.apache.spark.sql.{DataFrame, Dataset, SaveMode, SparkSession} 导入 org.apache.spark.sql.functi...
我有一个连续从 Kafka 读取的 spark readStream 函数。我对数据执行了一些操作,并想使用 Spark writeStream 将其批量写入 Cassandra DB。虽然一直
我有一个“通用”spark 结构化流作业,它监视顶级文件夹(伞)并遍历所有子文件夹(kafka 主题数据),然后写入每个 Kafka ...
想要创建持续运行的 Spark 流式查询,该查询从 MemoryStream[String] 读取并输出到控制台
我问的问题——一旦回答——帮助我写一些测试 Spark 结构化流管道,从流源读取和 写入 s3/parquet,但我简化了 ...
在 ETL 之后写入数据库或 myApp(接收器)之前,Spark 只有窗口结果
输入kafka => ETL spark =>输出到kafka。 假设您要计算每个用户的总观看次数,但这些观看次数可能以百万为单位。如果你只是在数据中使用 KAFKA 接收器......
如何在数据框中的每一行上调用外部服务并将每个结果保存到 jdbc。 df2 = df.map(row -> externalservice.call(row)).cache() df2.save().format('...') ... - 在 ALL 'd...之后执行
我在用 卡夫卡 0.8.2.1 火花 2.1.2 我试图运行一个代码,它将数据从 kafka 流式传输到 spark bu 我收到这个错误 文件“c:/Users/anish/OneDrive/Desktop/major project/
How to convert this streaming dataframe in pyspark, +--------------------+------+-------------------- --------------------------+ |时间戳|偏移量|字符串解码(值,UTF-8)...
我有一个连接查询,它有另一个连接查询作为子查询,但该查询没有输出。我单独运行子查询来找出问题所在,并且它按预期工作。 我正在尝试...
我正在使用 Spark Streaming 2.1。我想定期刷新一些缓存表(由 spark 提供的 DataSource 加载,如 parquet、MySQL 或用户定义的数据源)。 如何刷新表格?
在使用 PySpark 消费来自 Kafka 的消息时处理架构演变
我是卡夫卡的新手。目前我正在处理一个要求 - 用例: 我正在使用来自 Kafka 的消息(消息由上游团队在 Kafka 中生成)。上游团队不维护...
我想为历史数据运行一个脚本(2022 data_ 以日期目录格式 yyyy/mm/dd 存储在 azure data lake gen 2 中) 步骤:1 对于 2022 年的每个日期,我都想为......提取数据
将 nlp 应用于抓取的文本和流数据的 Python 应用程序
我正在尝试为我正在构建的应用程序选择合适的工具。 我想抓取多个平台并将抓取的数据保存在某个地方,然后我想清理并应用 nlp (nltk) 和
在 Spark Streaming 中使用 UDF 读取大量 XML 到 Delta 表非常慢
我们有一个输入文件的存储库,如 �3 \*\*Events*.xml => 这表示需要在 Spark Structured Streaming 中读取的输入 XML 文件的路径,以便...
使用 https://github.com/sutugin/spark-streaming-jdbc-source 中的示例 我试图连接到 Postgres 数据库作为 AWS Databricks 中的流媒体源。 我有一个正在运行的集群: 11....
我正在阅读有 5 个分区的 kafka 主题。由于 5 个内核不足以处理负载,我正在将输入重新分区为 30 个。我用 6
使用 toTable 在 Databricks 中写入流不会执行 foreachBatch
下面的代码正常工作,即将数据写入输出表,并可在 10 秒内从表中选择。问题是 foreachBatch 没有被执行。 当我有...
您好,我有超过 1200 多个 SQL 查询,我想并行提交多个 SQL 查询并将每个查询存储到 CSV 文件中, 由于python有GIL限制,如何并行提交, 我看过其他
在spark结构化流作业中,我如何从每个微批中的相同起始偏移量读取?
我使用的是spark结构化流。能否在每个批次执行后重置Kafka偏移量,使每个批次从相同的起始偏移量读取,而不是只读取新发现的事件?...
我是Apache Spark Streaming的新手。我正在开发一个spark流媒体应用程序,以找到最短的路径,并再次发送路径回到客户端。我已经写了代码来获取数据和...