How do you parse an array of JSON objects into a Spark DataFrame?

I have a collection of JSON files containing Twitter data that I want to use as the source for Structured Streaming in Databricks / Spark. The JSON files have the following structure:

[{...tweet data...},{...tweet data...},{...tweet data...},...]

My PySpark code:
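The stream needs an explicit schema, so json_schema is defined first. The version below is a trimmed-down sketch (the real schema describes the whole tweet payload; the field types shown are illustrative):

from pyspark.sql.types import LongType, StringType, StructField, StructType

# Abbreviated schema; the real one covers the full tweet payload
json_schema = StructType([
    StructField("run_stamp", StringType()),
    StructField("user", StringType()),
    StructField("id", LongType()),
    StructField("source", StringType()),
    StructField("favorite_count", LongType()),
    StructField("retweet_count", LongType()),
])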

# Stream from the /tmp/tweets folder
tweetstore = "/tmp/tweets/"

# Set up the folder as a streaming source
streamingInputDF = (
  spark
    .readStream
    .schema(json_schema)
    .json(tweetstore)
)

# Check
streamingInputDF.isStreaming

# Access the DF using SQL
streamingQuery = streamingInputDF \
  .select("run_stamp", "user", "id", "source", "favorite_count", "retweet_count")\
  .writeStream \
  .format("memory") \
  .queryName("tweetstream") \
  .outputMode("append")\
  .start()

streamingDF = spark.sql("select * from tweetstream order by 1 desc")
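(For reference, the entry count and the table below were produced with roughly the following:)

print(f"Number of entries in dataframe: {streamingDF.count()}")
streamingDF.show(truncate=False)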

My output looks like this:

Number of entries in dataframe: 3875046
+---------+----+----+------+--------------+-------------+
|run_stamp|user|id  |source|favorite_count|retweet_count|
+---------+----+----+------+--------------+-------------+
|null     |null|null|null  |null          |null         |
|null     |null|null|null  |null          |null         |
|null     |null|null|null  |null          |null         |

As far as I can tell, I probably need a UDF or explode() to parse the JSON array properly, but so far I haven't figured it out. The sketch below shows the direction I've been trying.
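For illustration, this is the kind of approach I've been experimenting with (a sketch only; the two-field tweet_schema is a stand-in for my real schema): read each file as one string, parse it as an array of structs, then explode the array into rows.

from pyspark.sql.functions import col, explode, from_json
from pyspark.sql.types import ArrayType, LongType, StringType, StructField, StructType

# Stand-in per-tweet schema; the real tweet payload has many more fields
tweet_schema = StructType([
    StructField("id", LongType()),
    StructField("user", StringType()),
])

# Read each JSON file as a single string, parse it as an array of
# tweet structs, then explode into one row per tweet
raw = spark.read.text(tweetstore, wholetext=True)
tweets = (
    raw
    .select(explode(from_json(col("value"), ArrayType(tweet_schema))).alias("tweet"))
    .select("tweet.*")
)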

json apache-spark twitter pyspark databricks
2 Answers
0 votes

It works fine with my sample data:

import spark.implicits._

val data = """[{"id":1,"name":"abc1"},{"id":2,"name":"abc2"},{"id":3,"name":"abc3"}]"""
val df = spark.read.json(Seq(data).toDS())
df.show(false)
df.printSchema()

/**
  * +---+----+
  * |id |name|
  * +---+----+
  * |1  |abc1|
  * |2  |abc2|
  * |3  |abc3|
  * +---+----+
  *
  * root
  * |-- id: long (nullable = true)
  * |-- name: string (nullable = true)
  */
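A rough PySpark equivalent of the same check, since the question uses Python (spark.read.json also accepts an RDD of JSON strings):

data = '[{"id":1,"name":"abc1"},{"id":2,"name":"abc2"},{"id":3,"name":"abc3"}]'
df = spark.read.json(spark.sparkContext.parallelize([data]))
df.show(truncate=False)
df.printSchema()

This prints the same table and schema as above.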


0 votes

Documenting the answer for anyone else who stumbles on this question: I realized the JSON doesn't have one object per line, which is what Spark expects by default. The key, then, was to add .option("multiline", True), i.e.:

streamingInputDF = (
  spark
    .readStream
    .option("multiline", True)
    .schema(json_schema)
    .json(tweetstore)
)
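One related note: file streams require an explicit schema, so if json_schema is not already defined, one way to obtain it (a sketch, assuming the files already under /tmp/tweets are representative) is a one-off batch read over the same folder:

# One-off batch read to capture the schema, then reuse it for the stream
static_df = spark.read.option("multiline", True).json("/tmp/tweets/")
json_schema = static_df.schema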