An example made up for this question. I have a PySpark DataFrame; for convenience I have hard-coded the data here, but in practice I read a bunch of CSV files into the DataFrame. I can save the DataFrame as JSON, or at least convert the PySpark DataFrame to pandas and then write that out as a JSON file. But is it possible to write each row of the DataFrame to the file as its own JSON object followed by a newline, essentially producing a JSONL file?
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder.getOrCreate()
# Create a sample DataFrame
data = [("Alice", 25), ("Bob", 30), ("Charlie", 35)]
df = spark.createDataFrame(data, ["name", "age"])
# Save the DataFrame as JSON
df.write.json("path/to/output.json")
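The pandas route mentioned above can also produce newline-delimited output directly. A minimal sketch, assuming a small DataFrame like the one in the question (in the Spark case you would first call `df.toPandas()`; here plain pandas stands in so the snippet is self-contained):

```python
import pandas as pd

# Stand-in for df.toPandas() on the Spark DataFrame above.
pdf = pd.DataFrame(
    [("Alice", 25), ("Bob", 30), ("Charlie", 35)],
    columns=["name", "age"],
)

# lines=True with orient="records" writes one JSON object per line (JSONL).
jsonl = pdf.to_json(orient="records", lines=True)
print(jsonl)
```

Passing a path instead of no argument (e.g. `pdf.to_json("out.jsonl", orient="records", lines=True)`) writes the same content to a single file, which avoids Spark's part-file output directory.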
When you write a PySpark DataFrame as JSON, you can control the record separator with the lineSep option (Spark's JSON writer already emits one object per line by default, so the output is newline-delimited JSON).
Example:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DateType, FloatType
from datetime import datetime
spark = SparkSession.builder.appName("ExampleDataFrame").getOrCreate()
schema = StructType([
    StructField("account", StringType(), True),
    StructField("date", DateType(), True),
    StructField("amount", FloatType(), True)
])
data = [
    ("account1", datetime(2024, 10, 1), 150.75),
    ("account1", datetime(2024, 10, 2), 200.50),
    ("account2", datetime(2024, 10, 1), 305.20),
    ("account2", datetime(2024, 10, 3), 120.80),
    ("account3", datetime(2024, 11, 1), 510.30),
    ("account3", datetime(2024, 11, 2), 75.00),
    ("account1", datetime(2024, 12, 1), 45.60),
    ("account2", datetime(2024, 12, 3), 300.45)
]
df = spark.createDataFrame(data, schema)
df.coalesce(1).write.mode("overwrite").json("path", lineSep="\n")
Output:
{"account":"account1","date":"2024-10-01","amount":150.75}
{"account":"account1","date":"2024-10-02","amount":200.5}
{"account":"account2","date":"2024-10-01","amount":305.2}
{"account":"account2","date":"2024-10-03","amount":120.8}
{"account":"account3","date":"2024-11-01","amount":510.3}
{"account":"account3","date":"2024-11-02","amount":75.0}
{"account":"account1","date":"2024-12-01","amount":45.6}
{"account":"account2","date":"2024-12-03","amount":300.45}
For reference: JSON Lines lets applications read objects one line at a time, with each line fully describing a single JSON object (https://jsonlines.org/examples/).
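This is what makes the format convenient to consume: each line parses on its own. A minimal sketch, using an in-memory buffer in place of an actual file handle and two records mirroring the output above:

```python
import io
import json

# Sample JSONL content, matching the first two lines of the output above.
jsonl_content = (
    '{"account":"account1","date":"2024-10-01","amount":150.75}\n'
    '{"account":"account1","date":"2024-10-02","amount":200.5}\n'
)

# Each line is a complete JSON object, so the file can be read record
# by record without loading or parsing the whole document at once.
records = [json.loads(line) for line in io.StringIO(jsonl_content) if line.strip()]
print(records[0]["amount"])  # → 150.75
```

The same loop works unchanged over `open("part-0000....json")` on one of the part files Spark writes.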