spark-structured-streaming 相关问题

Spark Structured Streaming允许使用unifrom DataFrame和Dataset API处理实时数据流。





不会使用Pyspark读取Kafka的数据 我的Kafka主题中有一个流数据。我需要使用Pyspark的Pyspark DataFrame形式从主题中读取此数据。但是当我调用ReadStream功能时,我会不断收到错误...

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2 pyspark-shell' if __name__ == '__main__': sc = SparkSession.builder.appName('PythonStreamingDirectKafkaWordCount').getOrCreate() ssc = StreamingContext(sc, 60) df = sc \ .readStream \ .format("kafka") \ .option("kafka.bootstrap.servers", "localhost:9092") \ .option("subscribe", "near_line") \ .load() \ .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)","CAST(value AS STRING)") ssc.start() ssc.awaitTermination()

回答 3 投票 0

链接三角洲流编程提高分析感受

Situation:我正在生产一个带有以前流媒体查询A的数据的Delta文件夹,并从另一个DF中读取,如下所示 df_out.writestream.format(“ delta”)。(...)。开始(“ path”) (.....

回答 4 投票 0

理解databricks结构化流溢出到磁盘行为

I是使用PySpark(128GB内存群集,带有DBR 14.3,Spark 3.5.0)上的数据链键赛上运行的流媒体管道。该流正在处理ZSON文件并将其合并到Delta表中。 fo ...

回答 1 投票 0

为什么Stream to Stream内连接不需要强制水印

根据 Spark Structured Streaming 的 Spark 文档,如果 2 个表是流类型,则内连接将在没有任何水印的情况下工作,但是,左外连接强制需要水印....

回答 1 投票 0

Spark Streaming 检查点中sources 目录中的文件夹“0”有何意义?

我看到spark结构化流检查点目录有一个sources文件夹,用于跟踪处理数据的文件名和batch_id。但它会创建一个名为“0”的父文件夹,然后

回答 1 投票 0

是否可以将dataFrame注册为spark结构化流数据帧上的SQL临时视图?

我正在使用spark结构化流从kafka主题读取数据,我想对此流数据运行sql查询。 以下是代码:- 从 pyspark.sql 导入 SparkSession、SQLContext 定义

回答 2 投票 0

Spark Java 结构化流数据集过滤器

代码是 数据集 mainData=df.select( "data.*").filter("data.eventdesc='logout'"); 数据集 groupByData = mainData.groupBy("ipaddress1").count().fil...

回答 1 投票 0

使用自动加载器数据块过滤目录

我有一个具有这种结构的数据湖。不幸的是,正如您在第二张图片中看到的那样,数据中存在错误,因此我的未来和过去的岁月毫无意义,而且他们有虚拟......

回答 1 投票 0

Kafka和Spark的Structured Streaming结果应该如何根据特定列插入到Iceberg Table中?没有分区

我已经成功设置了 Spark 的 Session 并从 Kafka 的 Topic 流式传输消息。 kafka_stream_df = 火花 \ .readStream \ .format('卡夫卡') \ .option('kafka.bootstrap.ser...

回答 1 投票 0

处理 Databricks 上 Delta Live 表中联接表的增量数据加载和 SCD 类型 2

我正在开发一个利用 Databricks 上的 Delta Live Tables 的项目,其中我需要创建一个具有缓慢变化的维度类型 2 的维度(Kimball 样式)。该维度是连接 b 的结果...

回答 1 投票 0

捕获 foreachBatch 函数中引发的异常

我正在使用 Pyspark 结构化流处理 Databricks,并且希望捕获我自己在作为“.foreachBatch”函数传递到流的函数中引发的异常。 这是我的前任...

回答 1 投票 0

Spark 中的 StreamQueryListener 不执行 onQueryProgress() 中的代码

我正在从 Databricks 增量表作为流读取数据并将其写入另一个增量表(使用屏幕截图中的控制台以便于调试),我想使用 StreamingQueryListener(...

回答 3 投票 0

Spark 结构化流可用,失败时触发结束偏移?

我想知道使用AvailableNow Trigger并且查询期间出现查询失败时spark结构化流应该有什么行为?更具体地说,会发生什么......

回答 1 投票 0

Autoloader 未在流模式下拾取 .text 文件

我正在使用 Databricks Autoloader 以流(微批量)模式处理文件。源文件采用.text 格式。虽然创建了检查点并且流没有失败,但 Delta ta...

回答 1 投票 0

从 SQS 驱动的 Pyspark 结构化流检索路径

我有一个 DMS 生成的 s3 数据湖,并设置 SQS 来跟踪生成的文件。现在我想将其流式传输到我的 EMR 集群中,为此我在此处找到了 Spark Streaming s3 连接器 https://git...

回答 1 投票 0

Spark 结构化流:流-流连接与不写入的聚合

平台:Databricks Notebooks|语言:PySpark 上下文:我正在尝试在流数据管道中构建一个节点,该节点评估所有存在的行数(例如 count(*) == Expected_co...

回答 1 投票 0

最新问题
© www.soinside.com 2019 - 2025. All rights reserved.