spark-structured-streaming 相关问题

Spark Structured Streaming允许使用unifrom DataFrame和Dataset API处理实时数据流。

如何在pyspark的DataStreamReader中解析json字符串列并创建数据框

我正在读取来自kafka主题消息的消息DFRaw = spark.readStream \ .format(“kafka”)\ .option(“kafka.bootstrap.servers”,“localhost:9092”)\ ...

回答 1 投票 0

有没有办法在火花流窗口中提取窗口开始时间和窗口结束时间?

我有一个DSTREAM,我使用窗口方法。然后我做其他操作,如reduceByKey。是否可以将窗口开始时间和结束时间添加到DSTREAM数据并将其用作...

回答 1 投票 0

[Structured Streaming]:将流数据帧写入Postgres

我有一个流数据帧,我试图写入数据库。有关于将rdd或df写入Postgres的文档。但是,我无法找到有关它如何的示例或文档......

回答 1 投票 0

将流式JSON转换为DataFrame

问题:如何将JSON字符串转换为DataFrame并仅选择我想要的键?我上周刚开始使用Spark,我还在学习,所以请耐心等待。我正在使用Spark(2 ....

回答 1 投票 0

Spark结构化流:GroupStateTimeout的java.lang.NoClassDefFoundError [复制]

我正在尝试在https://spark.apache.org/docs/2.2.0/api/scala/index.html#org.apache.spark.sql.streaming.GroupState中定义的spark结构化流中使用mapGroupsWithState但是我明白了......

回答 1 投票 0

为什么Spark应用程序失败并出现“ClassNotFoundException:找不到数据源:kafka”作为带有sbt程序集的uber-jar?

我正在尝试运行像https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredKafkaWordCount.scala这样的示例。我从Spark开始......

回答 7 投票 19

使用Kafka通过Spark结构化批处理作业管理偏移

我有一个用例,我正在编写批处理作业,我需要阅读Kafka主题并将数据记录到HDFS。我的代码如下所示val df:DataFrame = spark.read .format(“kafka”)。option(“...

回答 1 投票 -2

结构化流 - 从同一个流源连接2个DataFrame

我有这个Spark应用程序,它接收一个Twitter流。我添加了一个时间列:timestamp = datetime.datetime.fromtimestamp(time.time())。strftime('%Y-%m-%d%H:%M:%S')timestamp_df = tmp_df2.withColumn(.. 。

回答 1 投票 0

登录spark结构化流/ SparkException:任务不可序列化

我正在尝试将Apache Flink应用程序(scala)移植到Spark结构化流媒体。该应用程序的基本工作是:从kafka读取消息做一些转换/处理输出零或更多...

回答 1 投票 0

如何使用Spark Structured Streaming从目录读取读取时实现一次性处理?

我想使用流处理的概念从本地目录中读取文件,然后发布到Apache Kafka。我想过使用Spark Structured Streaming。怎么检查点......

回答 1 投票 2

使用ForeachWriter在Spark流中实现Cassandra接收器

显然,Spark内置的Cassandra接收器没有内置支持。我在网上找到了这个例子,它基于...实现了Spark结构流的自定义Cassandra接收器。

回答 1 投票 0

尝试从python 3.5中的pyspark.sql.functions导入col时未解析的引用

请参考这里的帖子:使用python进行Spark结构化流媒体我想从pyspark.sql.functions导入col中导入python 3.5中的'col'但是我得到一个错误,说明未解析的引用...

回答 3 投票 2

用于动态关键字段的Spark JSON Schema?

我使用from_json()方法从kafka接收JSON数据。它期待我的架构。我的JSON结构是这样的; {“Items”:{“key1”:[{“id”:“”,...

回答 2 投票 0

跳过火花结构化流媒体流程中的批次

我有一个火花结构的流媒体工作,它消耗来自天蓝色事件中心服务的事件。在某些情况下,它会发生,流式传输作业不会处理某些批次。在这......

回答 1 投票 2

为什么完整输出模式需要聚合?

我在Apache Spark 2.2中使用最新的结构化流,并得到以下异常:org.apache.spark.sql.AnalysisException:当没有时,不支持完整的输出模式...

回答 2 投票 9

将Spark Structure Streaming DataFrame转换为Pandas DataFrame

我有一个使用Kafka主题的Spark Streaming App设置,我需要使用一些接受Pandas Dataframe的API,但是当我尝试转换它时,我得到了这个:org.apache.spark.sql ....

回答 1 投票 0

Spark 2.2结构化流媒体流 - 静态左外连接问题

我似乎在Stream上遗漏了一些东西 - Spark 2.2中的静态加入。手册说明这种连接是可能的,但是我无法正确理解语法。奇。没有使用水印。 val ...

回答 1 投票 0

如何加载已经从Kafka发布的所有记录?

我有一个pyspark结构流式python应用程序设置如下来自pyspark.sql导入SparkSession spark = SparkSession \ .builder \ .appName(“data streaming app”)\。getOrCreate()...

回答 1 投票 2

Spark Structured Stream在输出数据集中为null

我运行一个scala代码,它将数据和打印输出聚合到控制台。不幸的是,我在小组操作后得到了一个空值。当前输出:| Id | Date | ...

回答 1 投票 0

Spark 2.3.0无法找到数据源:kafka

我正在尝试使用CSV设置Kafka流,以便我可以将其流式传输到Spark。但是,我一直在线程“main”java.lang.ClassNotFoundException中获取Exception:无法找到数据源:...

回答 2 投票 1

© www.soinside.com 2019 - 2024. All rights reserved.