I'm working with some data that has a set of key-value headers and a payload. I've managed to parse the headers into the following array:
+-------------------------------------------------------+--------------+
|col(header)                                            | col(payload) |
+-------------------------------------------------------+--------------+
|[someheader, key1, value1, key2, value2, key3, value3] | payload      |
+-------------------------------------------------------+--------------+
someheader will be dropped, and the rest needs to be turned into a map like this:
{
key1: value1,
key2: value2,
key3: value3,
}
Then I turn these key-value pairs into columns, like this (the schema is known, although it would be nice to know how to do it with a dynamic schema):
+-------------+--------+--------+-----------+
|key1         |key2    | key3   |payload    |
+-------------+--------+--------+-----------+
|value1       |value2  | value3 |payload    |
+-------------+--------+--------+-----------+
How can we do this array-to-map conversion in PySpark without a Python UDF or any similar construct with suboptimal performance? I'd also rather not involve pandas.
Here is what I have so far. It works, but from reading the Spark docs, UDFs are known to hurt performance (and are something of an anti-pattern):
# imports
from pyspark.sql import SparkSession
from pyspark.sql.types import MapType, StringType
from pyspark.sql.functions import col, split, expr, udf, regexp_replace, lit

# UDF to convert the header array to a map (skips arr[0], the "someheader" element)
def to_map(arr):
    map_value = dict()
    for i in range(1, len(arr), 2):
        key = arr[i]
        value = arr[i + 1]
        map_value[key] = value
    return map_value
# spark session with AWS creds
spark = SparkSession\
    .builder\
    .appName("test-app")\
    .config("spark.hadoop.fs.s3a.access.key", "<aws access key>") \
    .config("spark.hadoop.fs.s3a.secret.key", "<aws secret key>") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .getOrCreate()
# read the file
df = spark.read.text('s3a://commoncrawl/crawl-data/CC-MAIN-2023-50/segments/1700679099281.67/wet/CC-MAIN-20231128083443-20231128113443-00003.warc.wet.gz', lineSep="\n\r\n\r\n")
# trim whitespaces, not just spaces
df = df.select(regexp_replace(df.value, r"^\s+|\s+$", "").alias("value"))
# split string into header and payload columns
df = df.select(split(df.value, '\r\n\r\n').alias('s')) \
    .withColumn('header', expr('s[0]')) \
    .withColumn('payload', expr('s[1]')) \
    .select(col('header'), col('payload'))
# split the header into key value pair list
# example: WARC/1.0\r\nWARC-Type: warcinfo\r\nWARC-Date: 2023-12-12T01:40:15Z\r\nWARC-Filename: CC-MAIN-20231128083443-20231128113443-00003.warc.wet.gz\r\nWARC-Record-ID: <urn:uuid:6082596d-524a-4e49-b1dd-86582dc01a2f>\r\nContent-Type: application/warc-fields\r\nContent-Length: 382
# becomes: [WARC/1.0, WARC-Type, warcinfo, WARC-Date, 2023-12-12T01:40:15Z, WARC-Filename, CC-MAIN-20231128083443-20231128113443-00003.warc.wet.gz, WARC-Record-ID, <urn:uuid:6082596d-524a-4e49-b1dd-86582dc01a2f>, Content-Type, application/warc-fields, Content-Length, 382]
df = df.select(split(df.header, ': |\r\n').alias('h'), col('payload'))
# create a UDF for the to_map function defined above
array_to_map = udf(to_map, MapType(StringType(),StringType()))
# convert array to map, and then map to columns using the known schema (how would one do it without a known schema?)
df = df.withColumn('header_map', array_to_map(df.h)) \
    .select(expr("header_map['WARC-Filename']").alias("warcFileName"),
            expr("header_map['Content-Length']").cast("integer").alias("contentLength"),
            expr("to_timestamp(header_map['WARC-Date'])").alias("warcDate"),
            expr("header_map['WARC-Record-ID']").alias("warcRecordId"),
            expr("header_map['Content-Type']").alias("contentType"),
            expr("header_map['WARC-Type']").alias("warcType"),
            expr("header_map['WARC-Refers-To']").alias("warcRefersTo"),
            expr("header_map['WARC-Identified-Content-Language']").alias("language"),
            expr("header_map['WARC-Block-Digest']").alias("blockDigest"),
            expr("header_map['WARC-Target-URI']").alias("url"),
            expr("header_map['WARC-Target-URI']").alias("docId"),
            expr("header_map['WARC-Target-URI']").alias("partitionKey"),
            col('payload').alias("docText")) \
    .where(col('warcType') == lit('conversion')) \
    .drop('warcFileName')
df.show(n=5, truncate=False)
df.printSchema()
+-------------+-------------------+-----------------------------------------------+-----------+----------+-----------------------------------------------+--------+-------------------------------------+---------------------------------------------------------------------+---------------------------------------------------------------------+---------------------------------------------------------------------+-----------------+
|contentLength|warcDate |warcRecordId |contentType|warcType |warcRefersTo |language|blockDigest |url |docId |partitionKey |docText |
+-------------+-------------------+-----------------------------------------------+-----------+----------+-----------------------------------------------+--------+-------------------------------------+---------------------------------------------------------------------+---------------------------------------------------------------------+---------------------------------------------------------------------+-----------------+
|27923 |2023-11-28 09:43:44|<urn:uuid:783397a2-6c03-41ac-ab5b-bd57685a0ed2>|text/plain |conversion|<urn:uuid:20764b8a-6ab0-4c23-ba54-40854a70b3af>|zho |sha1:4XCBUNJVBXKQ63MKM2KTMGUPGC74FTSD|http://027sunflower.com/list/%E5%90%88%E5%AE%B6%E4%B8%AD%E5%AE%9DA |http://027sunflower.com/list/%E5%90%88%E5%AE%B6%E4%B8%AD%E5%AE%9DA |http://027sunflower.com/list/%E5%90%88%E5%AE%B6%E4%B8%AD%E5%AE%9DA | ... <truncated> |
|23738 |2023-11-28 09:18:49|<urn:uuid:87276745-6924-46c4-bf1a-55b51cd5c9b0>|text/plain |conversion|<urn:uuid:ed65b632-8ba1-4f1a-b57c-f359cbaea9a2>|zho |sha1:SERSYY5GWIAD2CBRTFHRYJVDFC5QBJRZ|http://0455.hk/index.asp?areaid=53 |http://0455.hk/index.asp?areaid=53 |http://0455.hk/index.asp?areaid=53 | ... <truncated> |
|43827 |2023-11-28 10:30:30|<urn:uuid:306608d1-bb8d-4071-83aa-388a9333791c>|text/plain |conversion|<urn:uuid:7acff67a-fd7c-4747-a269-e542af700ce3>|eng,zho |sha1:RYKE6ANAB674262OQ2VBDINQM3IXQC45|http://0554c.com/index/Article/info.html?cate=5&id=11 |http://0554c.com/index/Article/info.html?cate=5&id=11 |http://0554c.com/index/Article/info.html?cate=5&id=11 | ... <truncated> |
|1137 |2023-11-28 10:58:30|<urn:uuid:0870b29d-fc06-4a44-97ef-6876bc0fbbee>|text/plain |conversion|<urn:uuid:17042b32-0223-4589-8ceb-644a887e2776>|zho,eng |sha1:J7XVE2KML4YTJYN33VEL4SZRJSAGBKRJ|http://0571ztq.com/jiage/class/?31.html |http://0571ztq.com/jiage/class/?31.html |http://0571ztq.com/jiage/class/?31.html | ... <truncated> |
|1921 |2023-11-28 10:41:49|<urn:uuid:f95cd1dd-3bf8-4272-9f3b-10f847d543e6>|text/plain |conversion|<urn:uuid:fb642dce-08c9-4b41-bc93-13907080b95d>|deu,eng |sha1:IWDUDKUJVRYILJZ4UIHGADV2FFB3JCPS|http://06.live-radsport.ch/index.php?src=email&id=225657&type=details|http://06.live-radsport.ch/index.php?src=email&id=225657&type=details|http://06.live-radsport.ch/index.php?src=email&id=225657&type=details| ... <truncated> |
+-------------+-------------------+-----------------------------------------------+-----------+----------+-----------------------------------------------+--------+-------------------------------------+---------------------------------------------------------------------+---------------------------------------------------------------------+---------------------------------------------------------------------+-----------------+
only showing top 5 rows
root
|-- contentLength: integer (nullable = true)
|-- warcDate: timestamp (nullable = true)
|-- warcRecordId: string (nullable = true)
|-- contentType: string (nullable = true)
|-- warcType: string (nullable = true)
|-- warcRefersTo: string (nullable = true)
|-- language: string (nullable = true)
|-- blockDigest: string (nullable = true)
|-- url: string (nullable = true)
|-- docId: string (nullable = true)
|-- partitionKey: string (nullable = true)
|-- docText: string (nullable = true)
You can indeed avoid a UDF by using a few functions from pyspark.sql.functions.
We start by creating the DataFrame:
import pyspark.sql.functions as F
from pyspark.sql.session import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [
        (
            ["someheader", "key1", "value1", "key2", "value2", "key3", "value3"],
            "payload",
        ),
    ],
    ["header", "payload"],
)
Then we compute the output:
# Calculating the array column's length to be able to extract the keys and
# values later on
array_length = (
    df.withColumn("array_length", F.size("header"))
    .agg(F.first("array_length"))
    .collect()[0][0]
)
output = (
    df.select(
        F.explode(
            F.map_from_arrays(
                F.array([F.col("header")[i] for i in range(1, array_length, 2)]),
                F.array([F.col("header")[i] for i in range(2, array_length, 2)]),
            )
        ),
        "payload",
    )
    .groupBy("payload")
    .pivot("key")
    .agg(F.first("value"))
)
>>> output.show()
+-------+------+------+------+
|payload| key1| key2| key3|
+-------+------+------+------+
|payload|value1|value2|value3|
+-------+------+------+------+
The idea is the following: extract the keys (the elements at odd indices) and the values (the elements at even indices, skipping someheader) from the array, combine them into a map column with map_from_arrays, and explode that map into key/value rows. Then you can simply group by and pivot to obtain the DataFrame you want.
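If the set of keys is known up front (as it is for the WARC headers in the question), you can also pass the pivot values explicitly. This is a small variation on the code above (output_known is just an illustrative name) and spares Spark the extra job it would otherwise run to discover the distinct keys:
# Same pipeline as above, but with explicit pivot values ("key1", "key2",
# "key3" are the keys from the toy data), so Spark skips the distinct-keys pass.
output_known = (
    df.select(
        F.explode(
            F.map_from_arrays(
                F.array([F.col("header")[i] for i in range(1, array_length, 2)]),
                F.array([F.col("header")[i] for i in range(2, array_length, 2)]),
            )
        ),
        "payload",
    )
    .groupBy("payload")
    .pivot("key", ["key1", "key2", "key3"])
    .agg(F.first("value"))
)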
Note: this handles any number of keys with any names dynamically, but it cannot handle the case where the original header column holds arrays of different lengths within the same DataFrame.
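If you do need to support headers of varying lengths per row, here is a rough sketch of one way around that limitation (my own variation, not part of the original answer, and df_dynamic is just an illustrative name; it assumes Spark >= 3.0, where the SQL filter higher-order function accepts an index argument). It builds the map per row instead of collecting a single array length, so each row may carry a different number of key-value pairs:
# Keys sit at odd indices, values at even indices > 0 (index 0 is someheader),
# so an index-aware filter can pair them up without knowing the array length.
df_dynamic = df.withColumn(
    "header_map",
    F.map_from_arrays(
        F.expr("filter(header, (x, i) -> i % 2 = 1)"),
        F.expr("filter(header, (x, i) -> i > 0 AND i % 2 = 0)"),
    ),
)
df_dynamic.select("payload", "header_map").show(truncate=False)
From there the same explode / groupBy / pivot steps apply, or you can pull individual keys straight out of header_map the way the question does with its known schema.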