I have a collection of JSON files containing Twitter data that I want to use as a data source for Structured Streaming in Databricks/Spark. The JSON files have the following structure:
[{...tweet data...},{...tweet data...},{...tweet data...},...]
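For context, Spark's default JSON reader expects line-delimited JSON (one complete object per line), not a single top-level array like the one above. A minimal plain-Python illustration of the two layouts (the field names here are made up for illustration):

```python
import json

# One top-level JSON array (the layout described above): the whole
# file is a single JSON value, possibly spanning many lines.
array_style = '[{"id": 1, "user": "a"}, {"id": 2, "user": "b"}]'

# Line-delimited JSON (what spark.read.json / readStream.json expect
# by default): one complete object per line.
jsonl_style = '{"id": 1, "user": "a"}\n{"id": 2, "user": "b"}'

# The array parses as one value holding two records...
records_from_array = json.loads(array_style)

# ...while JSON Lines is parsed one line at a time.
records_from_jsonl = [json.loads(line) for line in jsonl_style.splitlines()]

print(records_from_array == records_from_jsonl)  # both yield the same records
```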
My PySpark code:
# Stream from the /tmp/tweets folder
tweetstore = "/tmp/tweets/"

# Set up the folder as a streaming source
streamingInputDF = (
    spark
    .readStream
    .schema(json_schema)
    .json(tweetstore)
)
# Check
streamingInputDF.isStreaming
# Access the DF using SQL
streamingQuery = (
    streamingInputDF
    .select("run_stamp", "user", "id", "source", "favorite_count", "retweet_count")
    .writeStream
    .format("memory")
    .queryName("tweetstream")
    .outputMode("append")
    .start()
)
streamingDF = spark.sql("select * from tweetstream order by 1 desc")
My output looks like this:
Number of entries in dataframe: 3875046
+---------+----+----+------+--------------+-------------+
|run_stamp|user|id |source|favorite_count|retweet_count|
+---------+----+----+------+--------------+-------------+
|null |null|null|null |null |null |
|null |null|null|null |null |null |
|null |null|null|null |null |null |
As far as I can tell, I probably need a UDF or explode() to parse the JSON array properly, but I haven't figured it out so far.
It works fine for my sample data:
val data = """[{"id":1,"name":"abc1"},{"id":2,"name":"abc2"},{"id":3,"name":"abc3"}]"""
val df = spark.read.json(Seq(data).toDS())
df.show(false)
df.printSchema()
/**
* +---+----+
* |id |name|
* +---+----+
* |1 |abc1|
* |2 |abc2|
* |3 |abc3|
* +---+----+
*
* root
* |-- id: long (nullable = true)
* |-- name: string (nullable = true)
*/
Documenting the answer for anyone else who stumbles on this: I realized the JSON files did not have one object per line, as Spark expects. The key was then to add .option("multiline", True), i.e.:
streamingInputDF = (
    spark
    .readStream
    .option("multiline", True)
    .schema(json_schema)
    .json(tweetstore)
)
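One caveat with multiline JSON: Spark has to read each file as a single record and cannot split it, which can hurt parallelism on large archives. An alternative is to pre-convert each array file to line-delimited JSON once, so the default reader works. A plain-Python sketch (the function name and paths are illustrative):

```python
import json
from pathlib import Path

def array_file_to_jsonl(src: Path, dst: Path) -> int:
    """Rewrite a file holding one top-level JSON array as JSON Lines
    (one object per line) and return the number of records written."""
    tweets = json.loads(src.read_text(encoding="utf-8"))
    with dst.open("w", encoding="utf-8") as out:
        for tweet in tweets:
            out.write(json.dumps(tweet) + "\n")
    return len(tweets)
```

The resulting .jsonl files can then be streamed with the original readStream code, without the multiline option.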