Join two DataFrames and, during the join, slice a field of the second df based on fields of the first df?


I have two DataFrames.

The first one holds start and end positions, like:

id  start  end
1   4      8
2   2      6
2   5      7

The second df holds id and string:

id  string
1   my beautiful data
2   lorem ipsum

Joining them "as is" and then slicing the strings to the required positions fails with out-of-memory. The first df has roughly 1 million entries, the second only about 10, but each string is around 100 MB.

So I'm wondering whether the string could be sliced during the join itself; each row would then carry only a few characters, and the size would be acceptable.

The result should look like this:

id | start | end | sequence
1  | 4     | 8   | beaut
2  | 2     | 6   | orem i
2  | 5     | 7   | m i


Thanks a lot!

Update:

At the moment I simply do a one-to-one join and then take the substring, something like:

df1 = df1.join(df2, "id")
# Note: Column.substr(startPos, length) takes a length as its second argument,
# not an end position, so df1.end is effectively being used as a length here.
df1.withColumn("substring", df1['string'].substr(df1.start, df1.end)).show()
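
If a positional start..end slice is what is intended (as in the expected output above), the second argument has to be converted into a length. A minimal sketch, not from the original post, assuming df1 and df2 as originally defined (before the join above):

from pyspark.sql import functions as F

# Hedged sketch: slice `string` from `start` to `end` (inclusive) by passing a
# length of end - start + 1, then drop the huge `string` column straight away.
result = (
    df1.join(df2, "id")
       .withColumn("sequence",
                   F.col("string").substr(F.col("start"),
                                          F.col("end") - F.col("start") + 1))
       .drop("string")
)
result.show()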

But the join fails with OOM. Here are the optimized logical and physical plans (I tried broadcast and repartitioning), but either way an intermediate result of this size is not acceptable. Showing and processing these dfs separately works without errors:

exonsDF = exons_raw.join(dnaDF, "seq_region_id")
exonsDF.explain("cost")

   == Optimized Logical Plan ==
Project [seq_region_id#63L, exon_id#62L, seq_region_start#64L, seq_region_end#65L, seq_region_strand#66, phase#67, end_phase#68, is_current#69, is_constitutive#70, stable_id#71, version#72, created_date#73, modified_date#74, transcript_id#75L, rank#76, sequence#148], Statistics(sizeInBytes=12.3 PiB)
+- Join Inner, (seq_region_id#63L = cast(seq_region_id#147 as bigint)), Statistics(sizeInBytes=14.4 PiB)
   :- Repartition 10, true, Statistics(sizeInBytes=13.6 MiB)
   :  +- Filter isnotnull(seq_region_id#63L), Statistics(sizeInBytes=13.6 MiB)
   :     +- Relation [exon_id#62L,seq_region_id#63L,seq_region_start#64L,seq_region_end#65L,seq_region_strand#66,phase#67,end_phase#68,is_current#69,is_constitutive#70,stable_id#71,version#72,created_date#73,modified_date#74,transcript_id#75L,rank#76] orc, Statistics(sizeInBytes=13.6 MiB)
   +- Repartition 10000, true, Statistics(sizeInBytes=1083.8 MiB)
      +- Filter isnotnull(seq_region_id#147), Statistics(sizeInBytes=1083.8 MiB)
         +- Relation [seq_region_id#147,sequence#148] orc, Statistics(sizeInBytes=1083.8 MiB)

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [seq_region_id#63L, exon_id#62L, seq_region_start#64L, seq_region_end#65L, seq_region_strand#66, phase#67, end_phase#68, is_current#69, is_constitutive#70, stable_id#71, version#72, created_date#73, modified_date#74, transcript_id#75L, rank#76, sequence#148]
   +- SortMergeJoin [seq_region_id#63L], [cast(seq_region_id#147 as bigint)], Inner
      :- Sort [seq_region_id#63L ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(seq_region_id#63L, 200), ENSURE_REQUIREMENTS, [plan_id=335]
      :     +- Exchange RoundRobinPartitioning(10), REPARTITION_BY_NUM, [plan_id=330]
      :        +- Filter isnotnull(seq_region_id#63L)
      :           +- FileScan orc [exon_id#62L,seq_region_id#63L,seq_region_start#64L,seq_region_end#65L,seq_region_strand#66,phase#67,end_phase#68,is_current#69,is_constitutive#70,stable_id#71,version#72,created_date#73,modified_date#74,transcript_id#75L,rank#76] Batched: true, DataFilters: [isnotnull(seq_region_id#63L)], Format: ORC, Location: InMemoryFileIndex(1 paths)[file:/nfs/production/flicek/ensembl/infrastructure/mira/19tmp/exons], PartitionFilters: [], PushedFilters: [IsNotNull(seq_region_id)], ReadSchema: struct<exon_id:bigint,seq_region_id:bigint,seq_region_start:bigint,seq_region_end:bigint,seq_regi...
      +- Sort [cast(seq_region_id#147 as bigint) ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(cast(seq_region_id#147 as bigint), 200), ENSURE_REQUIREMENTS, [plan_id=336]
            +- Exchange RoundRobinPartitioning(10000), REPARTITION_BY_NUM, [plan_id=331]
               +- Filter isnotnull(seq_region_id#147)
                  +- FileScan orc [seq_region_id#147,sequence#148] Batched: true, DataFilters: [isnotnull(seq_region_id#147)], Format: ORC, Location: InMemoryFileIndex(1 paths)[file:/nfs/production/flicek/ensembl/infrastructure/mira/6tmp/sequence], PartitionFilters: [], PushedFilters: [IsNotNull(seq_region_id)], ReadSchema: struct<seq_region_id:string,sequence:string>


apache-spark join substring
1 Answer

Try repartitioning by the seq_region_id field rather than by a number:

# Repartition both sides by the join key so matching rows end up in the same
# partitions; the cast mirrors the cast Spark inserts in the join condition.
exon = exon.repartition(exon['seq_region_id'])
sequence = sequence.repartition(sequence['seq_region_id'].cast("bigint"))
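
Not part of the original answer, but a quick sanity check under the same assumptions: after repartitioning by the key, the extra round-robin exchanges from the plan above should disappear.

# Hedged check: re-run the cost-mode explain and compare against the plan above;
# the RoundRobinPartitioning(10) / RoundRobinPartitioning(10000) exchanges
# should no longer be there.
exon.join(sequence, "seq_region_id").explain("cost")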

The cast

cast(seq_region_id#147 as bigint)

is needed because sequence has seq_region_id stored as a string on disk while exons has a bigint. That may be intentional, but given the names and this example it looks like a mistake; correcting it on disk would also speed things up.
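
If fixing it on disk is not an option, a minimal sketch (my assumption, not part of the original answer) is to align the types right after reading, so the implicit cast disappears from the join condition:

# Hypothetical: cast seq_region_id to bigint once, up front, so both join keys
# have the same type.
sequence = sequence.withColumn("seq_region_id", sequence["seq_region_id"].cast("bigint"))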

Given that you're on 3.5, a sort-merge join is definitely the fastest/most efficient way to handle this, and Spark will pick it for you.

Finally, make sure you drop "sequence" after the substring operation. That lets Spark know it isn't needed in the final output, which also gives it a chance to lower memory usage.
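
Putting the suggestions together, a hedged end-to-end sketch (column names are taken from the plan above; the substring window and the exon_seq name are my assumptions, not part of the original answer):

from pyspark.sql import functions as F

# Sketch only: repartition both sides by the join key, slice the sequence to the
# exon's start..end window, and drop the huge `sequence` column immediately.
exon = exon.repartition(exon["seq_region_id"])
sequence = sequence.withColumn("seq_region_id", F.col("seq_region_id").cast("bigint"))
sequence = sequence.repartition(sequence["seq_region_id"])

exonsDF = (
    exon.join(sequence, "seq_region_id")
        .withColumn("exon_seq",
                    F.col("sequence").substr(
                        F.col("seq_region_start").cast("int"),
                        (F.col("seq_region_end") - F.col("seq_region_start") + 1).cast("int")))
        .drop("sequence")
)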
