Spark Structured Streaming允许使用unifrom DataFrame和Dataset API处理实时数据流。
如何在pyspark的DataStreamReader中解析json字符串列并创建数据框
我正在读取来自kafka主题消息的消息DFRaw = spark.readStream \ .format(“kafka”)\ .option(“kafka.bootstrap.servers”,“localhost:9092”)\ ...
我有一个DSTREAM,我使用窗口方法。然后我做其他操作,如reduceByKey。是否可以将窗口开始时间和结束时间添加到DSTREAM数据并将其用作...
[Structured Streaming]:将流数据帧写入Postgres
我有一个流数据帧,我试图写入数据库。有关于将rdd或df写入Postgres的文档。但是,我无法找到有关它如何的示例或文档......
问题:如何将JSON字符串转换为DataFrame并仅选择我想要的键?我上周刚开始使用Spark,我还在学习,所以请耐心等待。我正在使用Spark(2 ....
Spark结构化流:GroupStateTimeout的java.lang.NoClassDefFoundError [复制]
我正在尝试在https://spark.apache.org/docs/2.2.0/api/scala/index.html#org.apache.spark.sql.streaming.GroupState中定义的spark结构化流中使用mapGroupsWithState但是我明白了......
为什么Spark应用程序失败并出现“ClassNotFoundException:找不到数据源:kafka”作为带有sbt程序集的uber-jar?
我正在尝试运行像https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredKafkaWordCount.scala这样的示例。我从Spark开始......
我有一个用例,我正在编写批处理作业,我需要阅读Kafka主题并将数据记录到HDFS。我的代码如下所示val df:DataFrame = spark.read .format(“kafka”)。option(“...
我有这个Spark应用程序,它接收一个Twitter流。我添加了一个时间列:timestamp = datetime.datetime.fromtimestamp(time.time())。strftime('%Y-%m-%d%H:%M:%S')timestamp_df = tmp_df2.withColumn(.. 。
登录spark结构化流/ SparkException:任务不可序列化
我正在尝试将Apache Flink应用程序(scala)移植到Spark结构化流媒体。该应用程序的基本工作是:从kafka读取消息做一些转换/处理输出零或更多...
如何使用Spark Structured Streaming从目录读取读取时实现一次性处理?
我想使用流处理的概念从本地目录中读取文件,然后发布到Apache Kafka。我想过使用Spark Structured Streaming。怎么检查点......
使用ForeachWriter在Spark流中实现Cassandra接收器
显然,Spark内置的Cassandra接收器没有内置支持。我在网上找到了这个例子,它基于...实现了Spark结构流的自定义Cassandra接收器。
尝试从python 3.5中的pyspark.sql.functions导入col时未解析的引用
请参考这里的帖子:使用python进行Spark结构化流媒体我想从pyspark.sql.functions导入col中导入python 3.5中的'col'但是我得到一个错误,说明未解析的引用...
我使用from_json()方法从kafka接收JSON数据。它期待我的架构。我的JSON结构是这样的; {“Items”:{“key1”:[{“id”:“”,...
我有一个火花结构的流媒体工作,它消耗来自天蓝色事件中心服务的事件。在某些情况下,它会发生,流式传输作业不会处理某些批次。在这......
我在Apache Spark 2.2中使用最新的结构化流,并得到以下异常:org.apache.spark.sql.AnalysisException:当没有时,不支持完整的输出模式...
将Spark Structure Streaming DataFrame转换为Pandas DataFrame
我有一个使用Kafka主题的Spark Streaming App设置,我需要使用一些接受Pandas Dataframe的API,但是当我尝试转换它时,我得到了这个:org.apache.spark.sql ....
我似乎在Stream上遗漏了一些东西 - Spark 2.2中的静态加入。手册说明这种连接是可能的,但是我无法正确理解语法。奇。没有使用水印。 val ...
我有一个pyspark结构流式python应用程序设置如下来自pyspark.sql导入SparkSession spark = SparkSession \ .builder \ .appName(“data streaming app”)\。getOrCreate()...
Spark Structured Stream在输出数据集中为null
我运行一个scala代码,它将数据和打印输出聚合到控制台。不幸的是,我在小组操作后得到了一个空值。当前输出:| Id | Date | ...
我正在尝试使用CSV设置Kafka流,以便我可以将其流式传输到Spark。但是,我一直在线程“main”java.lang.ClassNotFoundException中获取Exception:无法找到数据源:...