Reading with partitions via PySpark JDBC

I'm reading data into PySpark from Postgres over a JDBC connection. The table being read is large, about 240 million rows, and I'm trying to read it into 16 partitions. The read is executing like ...

df.rdd.foreachPartition(write_partition)

where write_partition just iterates over the rows and does batched inserts with psycopg2. My problem is that I'm seeing each partition's query doubled on the database:
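For reference, a minimal sketch of what such a write_partition could look like (assumption: psycopg2's execute_values for batching; the DSN, target table, and column names here are hypothetical, not taken from the question):

```python
def chunked(rows, size=1000):
    """Yield lists of at most `size` rows from an iterable."""
    batch = []
    for row in rows:
        batch.append(row)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:
        yield batch

def write_partition(rows):
    # Imports live inside the function so Spark can pickle it and ship
    # it to the executors without serializing module-level state.
    import psycopg2
    from psycopg2.extras import execute_values

    conn = psycopg2.connect("dbname=target")  # hypothetical DSN
    try:
        with conn, conn.cursor() as cur:
            for batch in chunked(rows):
                # Hypothetical target table/columns, for illustration only.
                execute_values(
                    cur,
                    "INSERT INTO items (receiptid, itemindex, barcode) VALUES %s",
                    batch,
                )
    finally:
        conn.close()
```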
SELECT "receiptid","itemindex","barcode","productnumberraw","itemdescription","itemdescriptionraw","itemextendedprice","itemdiscountedextendedprice","itemquantity","barcodemanufacturer","barcodebrand","barcodecategory1","barcodecategory2","barcodecategory3","isfetchpromo","ispartnerbrand","subscribeandsave","soldby","yyyymm","retailerid","partition_id" FROM (select receiptid, itemindex, barcode, productnumberraw, itemdescription, itemdescriptionraw, itemextendedprice, itemdiscountedextendedprice, itemquantity, barcodemanufacturer, barcodebrand, barcodecategory1, barcodecategory2, barcodecategory3, isfetchpromo, ispartnerbrand, subscribeandsave, soldby, yyyymm, retailerid,
  MOD(ABS(CAST('x' || md5(receiptid) AS bit(32))::int), 16) AS partition_id from mytable) as subquery  WHERE "partition_id" >= 10 AND "partition_id" < 11  
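As an aside, the partition_id expression in that query maps each receiptid to one of 16 buckets by taking the leftmost 32 bits of its MD5 hash as a signed integer. A Python re-implementation of that arithmetic (my reading of the Postgres expression, shown only to make the bucketing concrete):

```python
import hashlib

def partition_id(receiptid: str, num_partitions: int = 16) -> int:
    # MOD(ABS(CAST('x' || md5(receiptid) AS bit(32))::int), 16):
    # the first 8 hex digits of the MD5 are the leftmost 32 bits,
    # which Postgres then reinterprets as a *signed* 32-bit integer.
    h = int(hashlib.md5(receiptid.encode()).hexdigest()[:8], 16)
    if h >= 2**31:          # emulate the signed int4 reinterpretation
        h -= 2**32
    return abs(h) % num_partitions
```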

What is causing this double read of the data?
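Spark issues one such query per partition. A sketch of how those 16 range predicates might have been supplied (assuming the predicates argument of DataFrameReader.jdbc was used; this is a guess at the setup, not code from the question):

```python
def partition_predicates(column: str = "partition_id", n: int = 16) -> list:
    """One non-overlapping WHERE fragment per Spark partition."""
    return [f'"{column}" >= {i} AND "{column}" < {i + 1}' for i in range(n)]

preds = partition_predicates()
# preds[10] matches the filter seen in the logged query:
#   "partition_id" >= 10 AND "partition_id" < 11
```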

python postgresql pyspark amazon-emr

1 Answer

If you see the queries duplicated in pg_stat_activity, some of those entries will show a leader_pid pointing at another entry's pid. That means the query is being handled by multiple parallel worker processes: a query against a partitioned table gets spread across several workers, even though you specifically narrowed it down to a single partition:

WHERE "partition_id" >= 10 AND "partition_id" < 11
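To tell real client queries apart from parallel workers in pg_stat_activity, look at leader_pid: workers carry their leader's pid, while the leader backend itself has leader_pid NULL. An illustration over made-up sample rows (not real pg_stat_activity output):

```python
# (pid, leader_pid, query) tuples, as you would get from:
#   SELECT pid, leader_pid, query FROM pg_stat_activity;
sample = [
    (100, None, "SELECT ... WHERE partition_id >= 10 AND partition_id < 11"),
    (101, 100,  "SELECT ... WHERE partition_id >= 10 AND partition_id < 11"),
    (102, 100,  "SELECT ... WHERE partition_id >= 10 AND partition_id < 11"),
]

def client_backends(rows):
    """PIDs of actual client sessions; the rest are parallel workers."""
    return [pid for pid, leader_pid, _ in rows if leader_pid is None]
```

Here only pid 100 was sent by Spark; 101 and 102 are Postgres-side workers executing the same query text.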

Demo setup:

create table test (partition_id int, payload text) partition by list(partition_id);
create table test1 partition of test for values in (1);
create table test2 partition of test for values in (2);
create table test_default partition of test default;
select setseed(.42);
insert into test select 1, md5(random()::text) from generate_series(1,5e5);

Query plan:

Gather  (cost=1000.00..8766.49 rows=2652 width=36) (actual time=0.329..188.652 rows=500000 loops=1)
  Output: test.partition_id, test.payload
  Workers Planned: 2
  Workers Launched: 2
  ->  Parallel Append  (cost=0.00..7501.29 rows=1105 width=36) (actual time=0.016..41.435 rows=166667 loops=3)
        Worker 0:  actual time=0.015..3.932 rows=17400 loops=1
        Worker 1:  actual time=0.021..3.897 rows=17400 loops=1
        ->  Parallel Seq Scan on public.test1 test_1  (cost=0.00..7474.56 rows=1102 width=36) (actual time=0.014..27.594 rows=166667 loops=3)
              Output: test_1.partition_id, test_1.payload
              Filter: ((test_1.partition_id >= 1) AND (test_1.partition_id < 2))
              Worker 0:  actual time=0.014..2.648 rows=17400 loops=1
              Worker 1:  actual time=0.020..2.648 rows=17400 loops=1
        ->  Parallel Seq Scan on public.test_default test_2  (cost=0.00..21.21 rows=4 width=36) (actual time=0.001..0.002 rows=0 loops=1)
              Output: test_2.partition_id, test_2.payload
              Filter: ((test_2.partition_id >= 1) AND (test_2.partition_id < 2))
Planning Time: 0.333 ms
Execution Time: 207.413 ms

If you change the condition to point at a specific partition and disqualify the default one, you still get multiple workers:

Query plan:

Gather  (cost=1000.00..8187.90 rows=2646 width=36) (actual time=0.219..154.021 rows=500000 loops=1)
  Output: test.partition_id, test.payload
  Workers Planned: 2
  Workers Launched: 2
  ->  Parallel Seq Scan on public.test1 test  (cost=0.00..6923.30 rows=1102 width=36) (actual time=0.023..26.260 rows=166667 loops=3)
        Output: test.partition_id, test.payload
        Filter: (test.partition_id = 1)
        Worker 0:  actual time=0.023..2.136 rows=13440 loops=1
        Worker 1:  actual time=0.036..2.131 rows=13440 loops=1
Planning Time: 0.212 ms
Execution Time: 173.165 ms
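One note on reading these plans: under a parallel node, rows is the average per process and loops is the number of processes that ran it (the leader plus two workers here), so the per-node figures reconcile with what Gather actually returns:

```python
# From the Parallel Append node: rows=166667 loops=3
rows_per_loop, loops = 166667, 3
total = rows_per_loop * loops  # ~= the rows=500000 that Gather emitted
# (the off-by-one is just rounding of the per-loop average)
```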
