Auto optimization causing data discrepancies

Problem description

I have a Delta table in Azure Databricks that gets MERGEd into every 10 minutes.
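For context, the MERGE job is shaped roughly like the sketch below; the paths, view name, and key column are placeholders, not the real job's names.

# Rough shape of the 10-minute MERGE job (source path, view name, and key_id are placeholders)
incoming = spark.read.format("json").load("/FileStore/tables/delta/incoming_batch")
incoming.createOrReplaceTempView("updates")
spark.sql("""
    MERGE INTO delta.`/FileStore/tables/delta/sample1` AS target
    USING updates AS source
    ON target.key_id = source.key_id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
""")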

[Screenshot: version history of the table]

In the version history of this table, I see a MERGE operation every 10 minutes, which is expected. Besides that, after every 3-4 runs I also see an OPTIMIZE operation; this article suggests that Databricks performs predictive optimization on the table, which would explain these OPTIMIZE operations.

My question: when I run a simple SELECT statement with a basic WHERE clause on this table, I randomly see a data discrepancy (fewer records than expected). To dig into this, I noted the time at which I ran the query and got fewer records, and then queried the table's history for that time. It seems that while the OPTIMIZE is combining the smaller files into one larger file, my query lands in the middle of it, and the resulting discrepancy breaks the ACID properties Databricks provides.
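For reference, the history lookup was done roughly like this; the path and the time window below are placeholders.

from pyspark.sql import functions as F

# List the commits around the time the query returned fewer records than expected
# (placeholder path and timestamps)
history = spark.sql("DESCRIBE HISTORY delta.`/FileStore/tables/delta/sample1`")
(history
 .select("version", "timestamp", "operation", "operationMetrics")
 .filter(F.col("timestamp").between("2024-01-01 10:00:00", "2024-01-01 10:20:00"))
 .orderBy("timestamp")
 .show(truncate=False))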

These are the operation metrics of the OPTIMIZE operation:

numRemovedFiles: "2"
numRemovedBytes: "5644151"
p25FileSize: "3234772"
numDeletionVectorsRemoved: "1"
minFileSize: "3234772"
numAddedFiles: "1"
maxFileSize: "3234772"
p75FileSize: "3234772"
p50FileSize: "3234772"
numAddedBytes: "3234772"

These are the operation metrics of the MERGE operation that ran 10 seconds earlier:

numTargetRowsCopied: "0"
numTargetRowsDeleted: "0"
numTargetFilesAdded: "1"
numTargetBytesAdded: "2409613"
numTargetBytesRemoved: "0"
numTargetDeletionVectorsAdded: "1"
numTargetRowsMatchedUpdated: "20380"
executionTimeMs: "22332"
materializeSourceTimeMs: "5886"
numTargetRowsInserted: "6"
numTargetRowsMatchedDeleted: "0"
numTargetDeletionVectorsUpdated: "0"
scanTimeMs: "1781"
numTargetRowsUpdated: "20380"
numOutputRows: "20386"
numTargetDeletionVectorsRemoved: "0"
numTargetRowsNotMatchedBySourceUpdated: "0"
numTargetChangeFilesAdded: "1"
numSourceRows: "20386"
numTargetFilesRemoved: "0"
numTargetRowsNotMatchedBySourceDeleted: "0"
rewriteTimeMs: "14510"
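(Side note on the metrics above: the MERGE adds a deletion vector and the OPTIMIZE removes one, so deletion vectors are clearly in play. A quick way to confirm the table property, with a placeholder path:)

# Inspect the table's properties, e.g. delta.enableDeletionVectors (placeholder path)
detail = spark.sql("DESCRIBE DETAIL delta.`/FileStore/tables/delta/sample1`")
detail.select("properties").show(truncate=False)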

What could be causing this data discrepancy? Am I missing some setting in these operations?

azure azure-databricks
1 Answer

As you mentioned, Databricks performs predictive optimization on Delta tables, which involves combining smaller files into larger files to improve query performance.
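If you want to rule predictive optimization out as the trigger, it can also be disabled per table. This is only a sketch, assuming a Unity Catalog managed table; the three-part name below is a placeholder.

# Stop predictive optimization from scheduling OPTIMIZE on this table (placeholder name)
spark.sql("ALTER TABLE main.default.sample1 DISABLE PREDICTIVE OPTIMIZATION")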

In addition, an OPTIMIZE statement may be running while your SELECT operation is in progress, which could cause the issue.
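One way to keep the read consistent regardless of a concurrent OPTIMIZE (not part of the suggestion below, just a sketch with a placeholder path and a made-up filter) is to pin the SELECT to a specific committed version with Delta time travel:

# Resolve the latest committed version, then read exactly that snapshot
# (placeholder path; the filter column/value are made up)
latest_version = (
    spark.sql("DESCRIBE HISTORY delta.`/FileStore/tables/delta/sample1`")
    .agg({"version": "max"})
    .collect()[0][0]
)
df = (
    spark.read.format("delta")
    .option("versionAsOf", latest_version)
    .load("/FileStore/tables/delta/sample1")
)
print(df.where("some_column = 'some_value'").count())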

You can try the following:

# Disable auto-optimized writes so files are not rewritten automatically
spark.conf.set("spark.databricks.delta.autoOptimize.optimizeWrite", "false")
# Trigger compaction manually at a controlled time instead
spark.sql("OPTIMIZE delta.`/FileStore/tables/delta/sample1`")

Result:

DataFrame[path: string, metrics: struct<numFilesAdded:bigint,numFilesRemoved:bigint,filesAdded:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,filesRemoved:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,partitionsOptimized:bigint,zOrderStats:struct<strategyName:string,inputCubeFiles:struct<num:bigint,size:bigint>,inputOtherFiles:struct<num:bigint,size:bigint>,inputNumCubes:bigint,mergedFiles:struct<num:bigint,size:bigint>,numOutputCubes:bigint,mergedNumCubes:bigint>,clusteringStats:struct<inputZCubeFiles:struct<numFiles:bigint,size:bigint>,inputOtherFiles:struct<numFiles:bigint,size:bigint>,inputNumZCubes:bigint,mergedFiles:struct<numFiles:bigint,size:bigint>,numOutputZCubes:bigint>,numBins:bigint,numBatches:bigint,totalConsideredFiles:bigint,totalFilesSkipped:bigint,preserveInsertionOrder:boolean,numFilesSkippedToReduceWriteAmplification:bigint,numBytesSkippedToReduceWriteAmplification:bigint,startTimeMs:bigint,endTimeMs:bigint,totalClusterParallelism:bigint,totalScheduledTasks:bigint,autoCompactParallelismStats:struct<maxClusterActiveParallelism:bigint,minClusterActiveParallelism:bigint,maxSessionActiveParallelism:bigint,minSessionActiveParallelism:bigint>,deletionVectorStats:struct<numDeletionVectorsRemoved:bigint,numDeletionVectorRowsRemoved:bigint>,numTableColumns:bigint,numTableColumnsWithStats:bigint,totalTaskExecutionTimeMs:bigint,skippedArchivedFiles:bigint,clusteringMetrics:struct<sizeOfTableInBytesBeforeLazyClustering:bigint,isNewMetadataCreated:boolean,isPOTriggered:boolean,numFilesSkippedWithoutStats:bigint,numFilesClassifiedToIntermediateNodes:bigint,sizeOfFilesClassifiedToIntermediateNodesInBytes:bigint,logicalSizeOfFilesClassifiedToIntermediateNodesInBytes:bigint,numFilesClassifiedToLeafNodes:bigint,sizeOfFilesClassifiedToLeafNodesInBytes:bigint,logicalSizeOfFilesClassifiedToLeafNodesInBytes:bigint,numThreadsForClassifier:int,clusterThresholdStrategy:string,minFileSize:bigint,maxFileSize:bigint,nodeMinNumFilesToCompact:bigint,numIdealFiles:bigint,numClusteringTasksPlanned:int,numCompactionTasksPlanned:int,numOptimizeBatchesPlanned:int,numLeafNodesExpanded:bigint,numLeafNodesClustered:bigint,numGetFilesForNodeCalls:bigint,numSamplingJobs:bigint,numLeafNodesCompacted:bigint,numIntermediateNodesCompacted:bigint,totalSizeOfDataToCompactInBytes:bigint,totalLogicalSizeOfDataToCompactInBytes:bigint,numIntermediateNodesClustered:bigint,numFilesSkippedAfterExpansion:bigint,totalSizeOfFilesSkippedAfterExpansionInBytes:bigint,totalLogicalSizeOfFilesSkippedAfterExpansionInBytes:bigint,totalSizeOfDataToRewriteInBytes:bigint,totalLogicalSizeOfDataToRewriteInBytes:bigint,timeMetrics:struct<classifierTimeMs:bigint,optimizerTimeMs:bigint,metadataLoadTimeMs:bigint,totalGetFilesForNodeCallsTimeMs:bigint,totalSamplingTimeMs:bigint,metadataCreationTimeMs:bigint>,maxOptimizeBatchesInParallel:bigint,currentIteration:int,maxIterations:int,clusteringStrategy:string>>]

Setting the delta.autoOptimize.optimizeWrite configuration parameter to false disables auto optimize; you can then trigger optimization manually with the OPTIMIZE command.
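Alternatively, the same setting can be pinned on the table itself rather than per session, so every writer respects it; a sketch using the same placeholder path:

# Persist the auto-optimize settings as table properties (placeholder path)
spark.sql("""
    ALTER TABLE delta.`/FileStore/tables/delta/sample1`
    SET TBLPROPERTIES (
        'delta.autoOptimize.optimizeWrite' = 'false',
        'delta.autoOptimize.autoCompact' = 'false'
    )
""")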

In addition, you can set the maximum file size for the optimized files to 1 GB and optimize the Delta table:

# Cap the target file size produced by OPTIMIZE at ~1 GB
spark.conf.set("spark.databricks.delta.optimizeWrite.maxFileSize", "1000000000")
# Compact and Z-order the table by the id column
spark.sql("OPTIMIZE delta.`/FileStore/tables/delta/sample1` ZORDER BY (id)")