Reading a dictionary column in PySpark


I have a complex column (a dictionary) in a PySpark DataFrame. Each row has three keys: string_value, timestamp, and user_property. user_property holds the names of the corresponding values in the other arrays. Unfortunately, the order of user_property differs from row to row, so sometimes it starts with User_ID and sometimes with Country. The arrays also differ in length.

How can I extract each item of user_property from this dictionary individually, together with the corresponding values from the other arrays (string_value and timestamp)?

Two sample rows:

{"string_value":"[\"N/A\",\"GA1.1.1034467876.1923457682\"]","timestamp":"[1722173340000000,1722173340000000]","user_property":"[\"User_ID\",\"client_id\"]"}
{"string_value":"[\"N/A\",\"PH\",\"GA1.1.1694456741.1715432443\",\"d76d23423423543333456g6\"]","timestamp":"[1714100700000000,1714100700000000,1714100700000000,1723951500000000]","user_property":"[\"User_ID\",\"country\",\"client_id\",\"visitor_id\"]"}
1 Answer

If you want something like this:

+-------------+--------------------+----------------+
|user_property|        string_value|       timestamp|
+-------------+--------------------+----------------+
|      User_ID|               "N/A"|1722173340000000|
|    client_id|"GA1.1.1034467876...|1722173340000000|
|      User_ID|               "N/A"|1714100700000000|
|      country|                "PH"|1714100700000000|
|    client_id|"GA1.1.1694456741...|1714100700000000|
|   visitor_id|"d76d234234235433...|1723951500000000|
+-------------+--------------------+----------------+

I can offer you this solution:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr, from_json
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder.getOrCreate()

input_data = [
    ("""{"string_value":"[\\"N/A\\",\\"GA1.1.1034467876.1923457682\\"]","timestamp":"[1722173340000000,1722173340000000]","user_property":"[\\"User_ID\\",\\"client_id\\"]"}""",),
    ("""{"string_value":"[\\"N/A\\",\\"PH\\",\\"GA1.1.1694456741.1715432443\\",\\"d76d23423423543333456g6\\"]","timestamp":"[1714100700000000,1714100700000000,1714100700000000,1723951500000000]","user_property":"[\\"User_ID\\",\\"country\\",\\"client_id\\",\\"visitor_id\\"]"}""",)
]

schema = StructType([
    StructField("string_value", StringType(), True),
    StructField("timestamp", StringType(), True),
    StructField("user_property", StringType(), True)
])

# Wrap each raw JSON string in a single-column DataFrame
df = spark.createDataFrame(input_data, ["unformatted_column"])

# Parse the outer JSON into its three top-level string fields
df_parsed = df.select(from_json("unformatted_column", schema).alias("data")).select("data.*")

# Strip the surrounding brackets and split each JSON-array string into a Spark array
df_arrays = df_parsed \
    .withColumn("string_value", expr("split(substring(string_value, 2, length(string_value)-2), ',')")) \
    .withColumn("timestamp", expr("split(substring(timestamp, 2, length(timestamp)-2), ',')")) \
    .withColumn("user_property", expr("split(substring(user_property, 2, length(user_property)-2), ',')"))


# Explode user_property with its position, then use that position to pick the matching string_value and timestamp
final_df = df_arrays \
    .selectExpr("posexplode(user_property) as (pos, user_property)", "string_value", "timestamp") \
    .withColumn("user_property", expr("regexp_replace(user_property, '\"', '')")) \
    .withColumn("string_value", col("string_value")[col("pos")]) \
    .withColumn("timestamp", col("timestamp")[col("pos")]) \
    .drop("pos")

final_df.show()
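
As a side note: since each of the three fields is itself a JSON-encoded array, the bracket-stripping and splitting can also be replaced by parsing the inner strings with from_json into typed arrays, zipping them element-wise, and exploding once. The sketch below is untested, reuses df_parsed from the solution above, and (unlike the table shown earlier) returns string_value without the surrounding quotes because the inner JSON is actually parsed.

from pyspark.sql.functions import from_json, arrays_zip, explode
from pyspark.sql.types import ArrayType, StringType, LongType

# Parse each stringified JSON array into a real Spark array
df_typed = df_parsed \
    .withColumn("string_value", from_json("string_value", ArrayType(StringType()))) \
    .withColumn("timestamp", from_json("timestamp", ArrayType(LongType()))) \
    .withColumn("user_property", from_json("user_property", ArrayType(StringType())))

# Zip the three arrays element-wise and explode the zipped structs once
alt_df = df_typed \
    .select(explode(arrays_zip("user_property", "string_value", "timestamp")).alias("z")) \
    .select("z.user_property", "z.string_value", "z.timestamp")

alt_df.show()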