我有一段使用 udf 发出 HTTP 请求的代码:
def insert(name, insertURL):
url = insertURL
try:
payload = json.dumps({
"records": [
{
"fields": {
"name": name
}
}
]
})
headers = {
'Content-Type': 'application/json',
'Accept': 'application/json'
}
response = requests.request(
"POST", url, headers=headers, data=payload)
dataRes = response.json()
return dataRes
except Exception as e
print("Error Occurred:", e)
def insertUDF(insertURL):
return udf(lambda z: insert(z, insertURL), StringType())
def main():
match args.subparser:
case 'Insert':
fake = Faker()
columns = ["serial", "name"]
data = []
for i in range(10):
data.append((i,fake.name()))
df = spark.createDataFrame(data=data, schema=columns)
df.show(truncate=False)
df.withColumn("Name", insertUDF(args.insertURL)(col("name"))).show()
if __name__ == "__main__":
main()
我想更改
payload
,使其需要多个字段来提高性能,从而使有效负载变为:
payload = json.dumps({
"records": [
{
"fields": {
"name": name
},
"fields": {
"name": name
},
"fields": {
"name": name
},
"fields": {
"name": name
}
}
]
})
其中
name
中的 fields
采用数据帧中存在的第一个名称,然后是同一负载中的第二个、第三个,依此类推,例如,如果数据帧是:
serial|name |
+-----+-------------------+
|0 |Christina Bishop |
|1 |Vincent Smith |
|2 |Deanna Brown |
|3 |James Medina MD |
|4 |Ashley Harper |
|5 |Tina Schultz |
|6 |Patrick Archer |
|7 |Mark Campbell |
|8 |Anthony Roach |
|9 |Justin Jackson |
然后在第一个有效负载名称中,直到包含
James
。
我该怎么做?
没有办法将多行传递到UDF中,只能通过groupby和collect_list或collect_set来实现类似。也就是说,考虑到您的用例,使用 foreachPartition 并自行分组会更好。