使用 Spark 分解大型 JDBC 写入

问题描述 投票:0回答:1

我们想要将大型 Spark 数据帧复制到 Oracle 中,但我发现调整选项有点有限。查看 Spark 文档,我能为 JDBC write 找到的唯一相关调整属性是

numPartitions

但是,我们要写入的数据帧是700,000,000条记录,而Oracle只有32个核心,所以我不想用太多线程使数据库过载。我的理解是,如果我将

numPartitions
设置为 32,它将有效地对大型数据集执行
.repartition(32)
,然后将每个分区写入 Oracle。 Spark 中 32 个分区不够,会导致内存问题。

有没有办法将工作分解成更多的部分,这样它就不会尝试一次做所有事情,而是一次做 50,000,000(或其他)?

我正在考虑这样的事情,但我希望有更有效的方法:

// Imagine "df" is the incoming dataframe we want to write.

val threads = 32
val recordsPerThread = 500000
val chunkSize = threads * recordsPerThread
val total = df.count
val chunks = (total/chunkSize).ceil.toInt

val chunkDf = df.withColumn("CHUNK_NUM", rand.multiply(chunks).cast(IntegerType))

for (chunkNum <- 0 to chunks) {
  chunkDf.filter(s"CHUNK_NUM = ${chunkNum}")
    .drop("CHUNK_NUM")
    .write
    .format("jdbc")
    .options(...) // DB info + numPartitions = 32
    .save
}

基本上,我将数据集分成“块”,这些块可以用 32 个线程(numPartitions)一次写入。我觉得应该有一种更有效的方法来做到这一点,但我似乎无法在文档中找到它。

我还使用

batchSize
设置为 10000 来减少往返次数,但我仍然受限于我想要与 Oracle 进行往返的线程数量以及 Spark 中的分区可以有多大。

scala apache-spark jdbc
1个回答
0
投票

是我想多了。我们可以通过简单地限制我们为 Spark 提供的资源来限制 Spark 一次写入的数量。如果我将

numPartitions
设置为 500,但只给 Spark 一个 32 核工作线程,它一次只会写入 32 个分区,从而限制了我们对 Oracle 的攻击程度。因此,有效地将工作“分块”。

© www.soinside.com 2019 - 2024. All rights reserved.