使用 Spark UDF 将多行传递给函数

问题描述 投票:0回答:1

我有一段使用 udf 发出 HTTP 请求的代码:

def insert(name, insertURL):
    
    url = insertURL

    try: 
        payload = json.dumps({
                    "records": [
                        {
                            "fields": {
                                "name": name
                            }
                        }
                    ]
                })

        headers = {
                    'Content-Type': 'application/json',
                    'Accept': 'application/json'
                }
        response = requests.request(
                    "POST", url, headers=headers, data=payload) 
        dataRes = response.json()
        return dataRes
    
    except Exception as e
        print("Error Occurred:", e)

def insertUDF(insertURL):
    return udf(lambda z: insert(z, insertURL), StringType()) 

def main():

    match args.subparser:

        case 'Insert':
            fake = Faker()

            columns = ["serial", "name"]
            data = []
            for i in range(10):
                data.append((i,fake.name()))

            df = spark.createDataFrame(data=data, schema=columns)
            df.show(truncate=False)
            
            df.withColumn("Name", insertUDF(args.insertURL)(col("name"))).show()

if __name__ == "__main__":

    main()

我想更改

payload
,使其需要多个字段来提高性能,从而使有效负载变为:

payload = json.dumps({
                    "records": [
                        {
                            "fields": {
                                "name": name
                            },
                             "fields": {
                                "name": name
                            },
                             "fields": {
                                "name": name
                            },
                             "fields": {
                                "name": name
                            }
                        }
                    ]
                })

其中

name
中的
fields
采用数据帧中存在的第一个名称,然后是同一负载中的第二个、第三个,依此类推,例如,如果数据帧是:

serial|name               |
+-----+-------------------+
|0    |Christina Bishop   |
|1    |Vincent Smith      |
|2    |Deanna Brown       |
|3    |James Medina MD    |
|4    |Ashley Harper      |
|5    |Tina Schultz       |
|6    |Patrick Archer     |
|7    |Mark Campbell      |
|8    |Anthony Roach      |
|9    |Justin Jackson     |

然后在第一个有效负载名称中,直到包含

James

我该怎么做?

python python-3.x apache-spark pyspark user-defined-functions
1个回答
0
投票

没有办法将多行传递到UDF中,只能通过groupby和collect_list或collect_set来实现类似。也就是说,考虑到您的用例,使用 foreachPartition 并自行分组会更好。

© www.soinside.com 2019 - 2024. All rights reserved.