使用Pyspark的Jsons列表分解列

问题描述 投票:1回答:2

我试图爆炸一个名为phone的列,如下面的架构和内容:

(customer_external_id,StringType
phones,StringType)

customer_id    phones
x8x46x5        [{"phone" : "(xx) 35xx4x80"},{"phone" : "(xx) xxxx46605"}]
xx44xx5        [{"phone" : "(xx) xxx3x8443"}]
4xxxxx5        [{"phone" : "(xx) x6xx083x3"},{"areaCode" : "xx"},{"phone" : "(xx) 3xxx83x3"}]
xx6564x        [{"phone" : "(x3) x88xx344x"}]
xx8x4x0        [{"phone" : "(xx) x83x5x8xx"}]
xx0434x        [{"phone" : "(0x) 3x6x4080"},{"areaCode" : "xx"}]
x465x40        [{"phone" : "(6x) x6x445xx"}]
x0684x8        [{"phone" : "(xx) x8x88x4x4"},{"phone" : "(xx) x8x88x4x4"}]
x84x850        [{"phone" : "(xx) 55x56xx4"}]
x0604xx        [{"phone" : "(xx) x8x4xxx68"}]
4x6xxx0        [{"phone" : "(xx) x588x43xx"},{"phone" : "(xx) 5x6465xx"},{"phone" : "(xx) x835xxxx8"},{"phone" : "(xx) x5x6465xx"}]
x6x000x        [{"phone" : "(xx) xxx044xx4"}]
5x65533        [{"phone" : "(xx) xx686x0xx"}]
x3668xx        [{"phone" : "(5x) 33x8x3x4"},{"phone" : "(5x) 8040x8x6"}]

所以我尝试运行此代码并得到后续错误:

df.select('customer_external_id', explode(df.phones))

AnalysisException: u"cannot resolve 'explode(`phones`)' due to data type mismatch: input to function explode should be array or map type, not StringType;;
'Project [customer_external_id#293, explode(phones#296) AS List()]\n+- Relation[order_id#292,customer_external_id#293,name#294,email#295,phones#296,phones_version#297,classification#298,locale#299] parquet\n"

通过这个错误我发现我的列是一个StringType所以我运行此代码删除括号并转换为json:

phones = df.select('customer_external_id', 'phones').rdd\
    .map(lambda x: str(x).replace('[','')\
                       .replace(']','')\
                       .replace('},{', ','))\
    .map(lambda x: json.loads(x).get('phone')\
    .map(lambda x: Row(x))\
    .toDF(df.select('customer_external_id','phones').schema)
phones.show()

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 38.0 failed 4 times, most recent failure: Lost task 0.3 in stage 38.0 (TID 2740, 10.112.80.248, executor 8): org.apache.spark.api.python.PythonException: Traceback (most recent call last)

显然我无法投射到Json,我无法爆炸列。那么我怎么能正确处理这种数据来获得这个输出:

    +-----------+--------+--------------+
    |customer_id|areaCode|phone         |
    +-----------+--------+--------------+
    |x8x46x5    |null    |(xx) 35xx4x80 |
    |x8x46x5    |null    |(xx) xxxx46605|
    |xx44xx5    |null    |(xx) xxx3x8443|
    |4xxxxx5    |null    |(xx) x6xx083x3|
    |4xxxxx5    |xx      |null          |
    |4xxxxx5    |null    |(xx) 3xxx83x3 |
    |xx6564x    |null    |(x3) x88xx344x|
    |xx8x4x0    |null    |(xx) x83x5x8xx|
    |xx0434x    |null    |(0x) 3x6x4080 |
    |xx0434x    |xx      |null          |
    |x465x40    |null    |(6x) x6x445xx |
    |x0684x8    |null    |(xx) x8x88x4x4|
    |x0684x8    |null    |(xx) x8x88x4x4|
    |x84x850    |null    |(xx) 55x56xx4 |
    |x0604xx    |null    |(xx) x8x4xxx68|
    |4x6xxx0    |null    |(xx) x588x43xx|
    |4x6xxx0    |null    |(xx) 5x6465xx |
    |4x6xxx0    |null    |(xx) x835xxxx8|
    |4x6xxx0    |null    |(xx) x5x6465xx|
    |x6x000x    |null    |(xx) xxx044xx4|
    |5x65533    |null    |(xx) xx686x0xx|
    |x3668xx    |null    |(5x) 33x8x3x4 |
    |x3668xx    |null    |(5x) 8040x8x6 |
    +-----------+--------+--------------+
python apache-spark pyspark etl
2个回答
4
投票

你想要做的是使用from_json方法将字符串转换为数组,然后爆炸:

from pyspark.sql.functions import *
from pyspark.sql.types import *

phone_schema = ArrayType(StructType([StructField("phone", StringType())]))

converted = inputDF\
  .withColumn("areaCode", get_json_object("phones", "$[*].areaCode"))\
  .withColumn("phones", explode(from_json("phones", phone_schema)))\
  .withColumn("phone", col("phones.phone"))\
  .drop("phones")\
  .filter(~isnull("phone"))

converted.show()

1
投票

我认为你应该可以直接调用json.loads()而不使用你所展示的replace()

  1. 使用stringArrayType(MapType())映射到json.loads()
  2. 调用flatMap()为数组的每个元素创建一个新的Row()
  3. 将这些行映射到所需的输出。

看一下下面的例子:

from StringIO import StringIO
from pyspark.sql import Row
import json
import pandas as pd

# mock up some sample data
data = StringIO("""customer_id\tphones
x8x46x5\t[{"phone" : "(xx) 35xx4x80"},{"phone" : "(xx) xxxx46605"}]
xx44xx5\t[{"phone" : "(xx) xxx3x8443"}]
4xxxxx5\t[{"phone" : "(xx) x6xx083x3"},{"areaCode" : "xx"},{"phone" : "(xx) 3xxx83x3"}]
xx6564x\t[{"phone" : "(x3) x88xx344x"}]
xx8x4x0\t[{"phone" : "(xx) x83x5x8xx"}]
xx0434x\t[{"phone" : "(0x) 3x6x4080"},{"areaCode" : "xx"}]
x465x40\t[{"phone" : "(6x) x6x445xx"}]
x0684x8\t[{"phone" : "(xx) x8x88x4x4"},{"phone" : "(xx) x8x88x4x4"}]
x84x850\t[{"phone" : "(xx) 55x56xx4"}]
x0604xx\t[{"phone" : "(xx) x8x4xxx68"}]
4x6xxx0\t[{"phone" : "(xx) x588x43xx"},{"phone" : "(xx) 5x6465xx"},{"phone" : "(xx) x835xxxx8"},{"phone" : "(xx) x5x6465xx"}]
x6x000x\t[{"phone" : "(xx) xxx044xx4"}]
5x65533\t[{"phone" : "(xx) xx686x0xx"}]
x3668xx\t[{"phone" : "(5x) 33x8x3x4"},{"phone" : "(5x) 8040x8x6"}]""")

pandas_df =  pd.read_csv(data, sep="\t")
df = sqlCtx.createDataFrame(pandas_df)  # convert pandas to spark df

# run the steps outlined above
df.rdd\
    .map(lambda x: Row(customer_id=x['customer_id'], phones=json.loads(x['phones'])))\
    .flatMap(lambda x: [Row(customer_id=x['customer_id'], phone=phone) for phone in x['phones']])\
    .map(lambda x: Row(customer_id=x['customer_id'], phone=x['phone'].get('phone'), areaCode=x['phone'].get('areaCode')))\
    .toDF()\
    .select('customer_id', 'areaCode', 'phone')\
    .show(truncate=False, n=100)

输出:

+-----------+--------+--------------+
|customer_id|areaCode|phone         |
+-----------+--------+--------------+
|x8x46x5    |null    |(xx) 35xx4x80 |
|x8x46x5    |null    |(xx) xxxx46605|
|xx44xx5    |null    |(xx) xxx3x8443|
|4xxxxx5    |null    |(xx) x6xx083x3|
|4xxxxx5    |xx      |null          |
|4xxxxx5    |null    |(xx) 3xxx83x3 |
|xx6564x    |null    |(x3) x88xx344x|
|xx8x4x0    |null    |(xx) x83x5x8xx|
|xx0434x    |null    |(0x) 3x6x4080 |
|xx0434x    |xx      |null          |
|x465x40    |null    |(6x) x6x445xx |
|x0684x8    |null    |(xx) x8x88x4x4|
|x0684x8    |null    |(xx) x8x88x4x4|
|x84x850    |null    |(xx) 55x56xx4 |
|x0604xx    |null    |(xx) x8x4xxx68|
|4x6xxx0    |null    |(xx) x588x43xx|
|4x6xxx0    |null    |(xx) 5x6465xx |
|4x6xxx0    |null    |(xx) x835xxxx8|
|4x6xxx0    |null    |(xx) x5x6465xx|
|x6x000x    |null    |(xx) xxx044xx4|
|5x65533    |null    |(xx) xx686x0xx|
|x3668xx    |null    |(5x) 33x8x3x4 |
|x3668xx    |null    |(5x) 8040x8x6 |
+-----------+--------+--------------+

我不确定这是否是您希望的输出,但这应该可以帮助您实现目标。

最新问题
© www.soinside.com 2019 - 2024. All rights reserved.