I have two dfs.
The first contains start and end positions, like:
id start end
1 4 8
2 2 6
2 5 7
The second df holds an id and a string:
id string
1 my beautiful data
2 lorem ipsum
Joining them "as is" and then cutting the strings to the required positions fails with out-of-memory. The first df has about 1kk (roughly a million) entries, the second only about 10, but each string is roughly 100 MB.
So I am wondering whether the string can be cut during the join itself, so that each row ends up with only a few characters and the result has an acceptable size.
The result should look like this:
id | start | end | sequence
1 | 4 | 8 | beaut
2 | 2 | 6 | orem i
2 | 5 | 7 | m i
Thanks a lot!
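As a minimal sketch of the "cut during the join" idea (assuming the df1/df2 names and columns from the example above, and that the ~10 large strings fit in driver and executor memory), the small df could be collected into a broadcast dict and each string sliced in a UDF, so the full strings never appear in the joined rows:

# Minimal sketch, not the exact code: df1(id, start, end), df2(id, string) as above
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StringType

spark = SparkSession.builder.getOrCreate()

# df2 only has ~10 rows, so its id -> string mapping can be collected and broadcast
id_to_string = {row["id"]: row["string"] for row in df2.collect()}
bc = spark.sparkContext.broadcast(id_to_string)

@F.udf(returnType=StringType())
def cut(id_, start, end):
    s = bc.value.get(id_)
    # 1-based, end-inclusive slice, to match the desired output above
    return None if s is None else s[start - 1:end]

df1.withColumn("sequence", cut("id", "start", "end")).show()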
UPDATE:
At the moment I simply do a one-to-one join and then take the substring, something like:
df1 = df1.join(df2, "id")
df1.withColumn("substring", df1['string'].substr(df1.start, df1.end)).show()
But the join fails with OOM. Here is the optimized physical plan (I tried broadcasting and repartitioning), but an intermediate result of this size is unacceptable no matter what. Shown and processed separately, these dfs produce no errors:
exonsDF = exons_raw.join(dnaDF, "seq_region_id")
exonsDF.explain("cost")
== Optimized Logical Plan ==
Project [seq_region_id#63L, exon_id#62L, seq_region_start#64L, seq_region_end#65L, seq_region_strand#66, phase#67, end_phase#68, is_current#69, is_constitutive#70, stable_id#71, version#72, created_date#73, modified_date#74, transcript_id#75L, rank#76, sequence#148], Statistics(sizeInBytes=12.3 PiB)
+- Join Inner, (seq_region_id#63L = cast(seq_region_id#147 as bigint)), Statistics(sizeInBytes=14.4 PiB)
:- Repartition 10, true, Statistics(sizeInBytes=13.6 MiB)
: +- Filter isnotnull(seq_region_id#63L), Statistics(sizeInBytes=13.6 MiB)
: +- Relation [exon_id#62L,seq_region_id#63L,seq_region_start#64L,seq_region_end#65L,seq_region_strand#66,phase#67,end_phase#68,is_current#69,is_constitutive#70,stable_id#71,version#72,created_date#73,modified_date#74,transcript_id#75L,rank#76] orc, Statistics(sizeInBytes=13.6 MiB)
+- Repartition 10000, true, Statistics(sizeInBytes=1083.8 MiB)
+- Filter isnotnull(seq_region_id#147), Statistics(sizeInBytes=1083.8 MiB)
+- Relation [seq_region_id#147,sequence#148] orc, Statistics(sizeInBytes=1083.8 MiB)
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [seq_region_id#63L, exon_id#62L, seq_region_start#64L, seq_region_end#65L, seq_region_strand#66, phase#67, end_phase#68, is_current#69, is_constitutive#70, stable_id#71, version#72, created_date#73, modified_date#74, transcript_id#75L, rank#76, sequence#148]
+- SortMergeJoin [seq_region_id#63L], [cast(seq_region_id#147 as bigint)], Inner
:- Sort [seq_region_id#63L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(seq_region_id#63L, 200), ENSURE_REQUIREMENTS, [plan_id=335]
: +- Exchange RoundRobinPartitioning(10), REPARTITION_BY_NUM, [plan_id=330]
: +- Filter isnotnull(seq_region_id#63L)
: +- FileScan orc [exon_id#62L,seq_region_id#63L,seq_region_start#64L,seq_region_end#65L,seq_region_strand#66,phase#67,end_phase#68,is_current#69,is_constitutive#70,stable_id#71,version#72,created_date#73,modified_date#74,transcript_id#75L,rank#76] Batched: true, DataFilters: [isnotnull(seq_region_id#63L)], Format: ORC, Location: InMemoryFileIndex(1 paths)[file:/nfs/production/flicek/ensembl/infrastructure/mira/19tmp/exons], PartitionFilters: [], PushedFilters: [IsNotNull(seq_region_id)], ReadSchema: struct<exon_id:bigint,seq_region_id:bigint,seq_region_start:bigint,seq_region_end:bigint,seq_regi...
+- Sort [cast(seq_region_id#147 as bigint) ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(cast(seq_region_id#147 as bigint), 200), ENSURE_REQUIREMENTS, [plan_id=336]
+- Exchange RoundRobinPartitioning(10000), REPARTITION_BY_NUM, [plan_id=331]
+- Filter isnotnull(seq_region_id#147)
+- FileScan orc [seq_region_id#147,sequence#148] Batched: true, DataFilters: [isnotnull(seq_region_id#147)], Format: ORC, Location: InMemoryFileIndex(1 paths)[file:/nfs/production/flicek/ensembl/infrastructure/mira/6tmp/sequence], PartitionFilters: [], PushedFilters: [IsNotNull(seq_region_id)], ReadSchema: struct<seq_region_id:string,sequence:string>
Try repartitioning by the seq_region_id column rather than by a number of partitions:
exon = exon.repartition(exon['seq_region_id'])
sequence = sequence.repartition(sequence['seq_region_id'].cast("bigint"))
The cast
cast(seq_region_id#147 as bigint)
is needed because sequence stores seq_region_id as a string on disk while exons stores it as a bigint. That may be intentional, but given the name and this example it looks like a mistake, and correcting it on disk would also speed things up.
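A hedged sketch of fixing that up front, reusing the exon / sequence names from the snippet above (it assumes every string seq_region_id really is numeric): cast once before the join so both keys are bigint and the cast disappears from the join condition.

from pyspark.sql import functions as F

# assumption: every string seq_region_id parses cleanly as an integer
sequence = sequence.withColumn("seq_region_id", F.col("seq_region_id").cast("bigint"))
exonsDF = exon.join(sequence, "seq_region_id")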
Given that you are on 3.5, a sort-merge join is definitely the fastest / most efficient way to handle this, and Spark will pick it for you.
Finally, make sure to drop "sequence" after the substring operation. That lets Spark know it is not needed in the final output, which also has a chance of reducing memory usage.
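Putting those last points together, one possible sketch (column names are taken from the plan above; the "+ 1" assumes both bounds are inclusive, as in the example output):

from pyspark.sql import functions as F

result = (
    exon.join(sequence, "seq_region_id")
        .withColumn(
            "sub_sequence",
            F.expr("substring(sequence, seq_region_start, seq_region_end - seq_region_start + 1)"),
        )
        .drop("sequence")  # the full ~100 MB string is not needed downstream
)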