Spark-Scala 与 Pyspark Dag 不同吗?

问题描述 投票:0回答:1

我正在将 pyspark 作业转换为 Scala,作业在 emr 中执行。参数、数据和代码相同。但是我发现运行时间不同,因此创建的 dag 也不同。这里我添加了从 UI 读取数据的部分。 如果您在 UI 中看到输出行任务的数量。

**Spark-submit command**
spark-submit --conf spark.sql.files.maxPartitionBytes=268435456 \
--master yarn --deploy-mode cluster  --conf spark.yarn.maxAppAttempts=1 \
--conf spark.sql.adaptive.enabled=true --conf spark.dynamicAllocation.enabled=false \
--conf spark.sql.parquet.filterPushdown=true \
--conf spark.sql.adaptive.coalescePartitions.enabled=true \
--conf spark.sql.adaptive.advisoryPartitionSizeInBytes=268435456 \
--conf spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled=true \
--conf spark.sql.adaptive.rebalancePartitionsSmallPartitionFactor=.5 \
--conf spark.sql.adaptive.coalescePartitions.parallelismFirst=false \
--conf spark.sql.adaptive.coalescePartitions.initialPartitionNum=12000 \
--conf spark.sql.adaptive.localShuffleReader.enabled=true \
--conf spark.shuffle.io.connectionTimeout=8000 \
--conf spark.network.timeout=50000s  --conf spark.files.fetchTimeout=600s \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.kryoserializer.buffer.max=1g \
--conf spark.memory.storageFraction=0.05 --conf spark.memory.fraction=.8 \
--conf spark.shuffle.compress=true --conf spark.shuffle.spill.compress=true \
--conf spark.hadoop.fs.s3.multipart.th.fraction.parts.completed=0.99 \
--conf spark.sql.objectHashAggregate.sortBased.fallbackThreshold=4000000 \
--conf spark.reducer.maxReqsInFlight=12 \
--conf spark.shuffle.io.retryWait=60s \
--conf spark.shuffle.io.maxRetries=10 \
--conf spark.reducer.maxSizeInFlight=256m \
--conf spark.reducer.maxBlocksInFlightPerAddress=100 \
--conf spark.io.compression.codec=zstd \
--conf spark.shuffle.service.enabled=true \
--conf spark.io.compression.zstd.level=3 \
--conf spark.executor.cores=5 \
--conf spark.executor.instances=1200 \
--conf spark.executor.memory=34g --conf spark.driver.memory=60g --conf spark.executor.memoryOverhead=5g --conf spark.driver.memoryOverhead=4g \
--conf spark.hadoop.fs.s3a.fast.output.enabled=true \
--conf spark.executor.extraJavaOptions="-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p' -Djavax.net.ssl.trustStore=/home/hadoop/.config/certs/InternalAndExternalTrustStore.jks" --conf spark.driver.extraJavaOptions="-XX:+UseG1GC " \
test.py

第一个 Dag 来自 pyspark 执行,第二个是 Scala。

Pyspark code

Scala Jar

scala apache-spark pyspark amazon-emr
1个回答
0
投票

这个答案,尽管是一个稍微不同的问题,解释了 ColumnarToRow 可能在你的 python 计划中做什么,但简而言之,它是 Spark 执行中使用的 InternalRows 的 jvm 表示的转换层。

在使用 Scala 的典型用例中您不会看到它。

© www.soinside.com 2019 - 2024. All rights reserved.