In Spark, writing a dataset to the database takes some unexplained upfront time before the save operation starts


I ran the spark-submit command shown below; it loads datasets from a database, processes them, and in the final stage pushes multiple datasets to an Oracle DB.

./spark-submit --class com.sample.Transformation --conf spark.sql.shuffle.partitions=5001 
    --num-executors=40 --executor-cores=1 --executor-memory=5G 
    --jars /scratch/rmbbuild/spark_ormb/drools-jars/ojdbc6.jar,
        /scratch/rmbbuild/spark_ormb/drools-jars/kie-api-7.7.0.Final.jar,
        /scratch/rmbbuild/spark_ormb/drools-jars/drools-core-7.7.0.Final.jar,
        /scratch/rmbbuild/spark_ormb/drools-jars/drools-compiler-7.7.0.Final.jar,
        /scratch/rmbbuild/spark_ormb/drools-jars/kie-soup-maven-support-7.7.0.Final.jar,
        /scratch/rmbbuild/spark_ormb/drools-jars/kie-internal-7.7.0.Final.jar,
        /scratch/rmbbuild/spark_ormb/drools-jars/xstream-1.4.10.jar,
        /scratch/rmbbuild/spark_ormb/drools-jars/kie-soup-commons-7.7.0.Final.jar,
        /scratch/rmbbuild/spark_ormb/drools-jars/ecj-4.4.2.jar,
        /scratch/rmbbuild/spark_ormb/drools-jars/mvel2-2.4.0.Final.jar,
        /scratch/rmbbuild/spark_ormb/drools-jars/kie-soup-project-datamodel-commons-7.7.0.Final.jar,
        /scratch/rmbbuild/spark_ormb/drools-jars/kie-soup-project-datamodel-api-7.7.0.Final.jar 
    --driver-class-path /scratch/rmbbuild/spark_ormb/drools-jars/ojdbc6.jar 
    --master spark://10.180.181.41:7077 "/scratch/rmbbuild/spark_ormb/POC-jar/Transformation-0.0.1-SNAPSHOT.jar" 
        > /scratch/rmbbuild/spark_ormb/POC-jar/logs/logs12.txt

However, writing the datasets to the database takes some upfront time, and I don't know why it spends so long before the write actually starts. I have attached a screenshot that clearly highlights the problem I am facing; please go through it before commenting with a solution. Spark Dashboard Stages screenshot:

If you look at the screenshot, I have highlighted roughly 10 minutes that are consumed before each dataset is written to the database, even though I changed the batch size to 100000 as shown below:

outputDataSetforsummary.write().mode("append").format("jdbc").option("url", connection)
    .option("batchSize", "100000").option("dbtable", CI_TXN_DTL).save();

So, can anyone explain why this pre-write time is consumed on every write, and how it can be avoided?

Edited[1]

I am attaching the code to give a better description of the program.

   public static void main(String[] args) {

        // SparkConf conf = new SparkConf().setAppName("Transformation").setMaster("local");
        SparkConf conf = new SparkConf().setAppName("Transformation").setMaster("spark://xx.xx.xx.xx:7077");
        String connection = "jdbc:oracle:thin:ABC/abc@//xx.x.x.x:1521/ABC";

        // Create Spark Context
        SparkContext context = new SparkContext(conf);
        // Create Spark Session
        SparkSession sparkSession = new SparkSession(context);
        Dataset<Row> txnDf  = sparkSession.read().format("jdbc").option("url", connection).option("dbtable", CI_TXN_DETAIL_STG).load();
        //Dataset<Row> txnDf  = sparkSession.read().format("jdbc").option("url", connection).option("dbtable", "CI_TXN_DETAIL_STG").load();
        Dataset<Row> newTxnDf  = txnDf.drop(ACCT_ID);

        Dataset<Row> accountDf = sparkSession.read().format("jdbc").option("url", connection).option("dbtable", CI_ACCT_NBR).load();
        //  Dataset<Row> accountDf = sparkSession.read().format("jdbc").option("url", connection).option("dbtable", "CI_ACCT_NBR").load();

        Dataset<Row> joined = newTxnDf.join(accountDf, newTxnDf.col(ACCT_NBR).equalTo(accountDf.col(ACCT_NBR))
                .and(newTxnDf.col(ACCT_NBR_TYPE_CD).equalTo(accountDf.col(ACCT_NBR_TYPE_CD))), "inner");
        Dataset<Row> finalJoined = joined.drop(accountDf.col(ACCT_NBR_TYPE_CD)).drop(accountDf.col(ACCT_NBR))
                .drop(accountDf.col(VERSION)).drop(accountDf.col(PRIM_SW));


        initializeProductDerivationCache(sparkSession,connection);


        ClassTag<List<String>> evidenceForDivision = scala.reflect.ClassTag$.MODULE$.apply(List.class);
        Broadcast<List<String>> broadcastVarForDiv = context.broadcast(divisionList, evidenceForDivision);

        ClassTag<List<String>> evidenceForCurrency = scala.reflect.ClassTag$.MODULE$.apply(List.class);
        Broadcast<List<String>> broadcastVarForCurrency = context.broadcast(currencySet, evidenceForCurrency);

        ClassTag<List<String>> evidenceForUserID = scala.reflect.ClassTag$.MODULE$.apply(List.class);
        Broadcast<List<String>> broadcastVarForUserID = context.broadcast(userIdList, evidenceForUserID);



        Encoder<RuleParamsBean> encoder = Encoders.bean(RuleParamsBean.class);
        Dataset<RuleParamsBean> ds = new Dataset<RuleParamsBean>(sparkSession, finalJoined.logicalPlan(), encoder);
        Dataset<RuleParamsBean> validateDataset = ds.map(ruleParamsBean -> validateTransaction(ruleParamsBean,broadcastVarForDiv.value(),broadcastVarForCurrency.value(),
                                                            broadcastVarForUserID.value()),encoder);


        Dataset<RuleParamsBean> filteredDS = validateDataset.filter(validateDataset.col(BO_STATUS_CD).notEqual(TFMAppConstants.TXN_INVALID));
        //For formatting the data to be inserted in table -->   Dataset<Row>finalvalidateDataset = validateDataset.select("ACCT_ID");



        Encoder<TxnDetailOutput> txndetailencoder = Encoders.bean(TxnDetailOutput.class);
        Dataset<TxnDetailOutput> txndetailDS = validateDataset.map(ruleParamsBean -> outputfortxndetail(ruleParamsBean), txndetailencoder);




        KieServices ks = KieServices.Factory.get();
        KieContainer kContainer = ks.getKieClasspathContainer();
        ClassTag<KieBase> classTagTest = scala.reflect.ClassTag$.MODULE$.apply(KieBase.class);
        Broadcast<KieBase> broadcastRules = context.broadcast(kContainer.getKieBase(KIE_BASE), classTagTest);

        Encoder<PritmRuleOutput> outputEncoder = Encoders.bean(PritmRuleOutput.class);
        Dataset<PritmRuleOutput> outputDataSet = filteredDS.flatMap(rulesParamBean -> droolprocesMap(broadcastRules.value(), rulesParamBean), outputEncoder);

        Dataset<Row> piParamDS1 = outputDataSet.select(PRICEITEM_PARM_GRP_VAL);
        Dataset<Row> piParamDS = piParamDS1.withColumnRenamed(PRICEITEM_PARM_GRP_VAL, PARM_STR);

        priceItemParamGrpValueCache.createOrReplaceTempView("temp1");
        Dataset<Row> piParamDSS = piParamDS.where(queryToFiltertheDuplicateParamVal);
        Dataset<Row> priceItemParamsGrpDS = piParamDSS.select(PARM_STR).distinct().withColumn(PRICEITEM_PARM_GRP_ID, functions.monotonically_increasing_id());
        Dataset<Row> finalpriceItemParamsGrpDS = priceItemParamsGrpDS.withColumn(PARM_COUNT, functions.size(functions.split(priceItemParamsGrpDS.col(PARM_STR),TOKENIZER)));
        finalpriceItemParamsGrpDS.persist(StorageLevel.MEMORY_ONLY());
        finalpriceItemParamsGrpDS.distinct().write().mode("append").format("jdbc").option("url", connection).option("dbtable", CI_PRICEITEM_PARM_GRP_K).option("batchSize", "1000").save();



        Dataset<Row> PritmOutput = outputDataSet.join(priceItemParamsGrpDS,outputDataSet.col(PRICEITEM_PARM_GRP_VAL).equalTo(priceItemParamsGrpDS.col(PARM_STR)),"inner");
        Dataset<Row> samplePritmOutput = PritmOutput.drop(outputDataSet.col(PRICEITEM_PARM_GRP_ID))
                .drop(priceItemParamsGrpDS.col(PARM_STR));

        priceItemParamsGrpDS.createOrReplaceTempView(PARM_STR);
        Dataset<Row> priceItemParamsGroupTable = sparkSession.sql(FETCH_QUERY_TO_SPLIT);
        Dataset<Row> finalpriceItemParamsGroupTable = priceItemParamsGroupTable.selectExpr("PRICEITEM_PARM_GRP_ID","split(col, '=')[0] as PRICEITEM_PARM_CD ","split(col, '=')[1] as PRICEITEM_PARM_VAL");
        finalpriceItemParamsGroupTable.persist(StorageLevel.MEMORY_ONLY());
        finalpriceItemParamsGroupTable.distinct().write().mode("append").format("jdbc").option("url", connection).option("dbtable", CI_PRICEITEM_PARM_GRP).option("batchSize", "1000").save();
}

Help me out if you can find any solution to reduce the time taken to write into the database.

scala apache-spark pyspark spark-java
1 Answer

It is reloading the whole data and re-doing the joins of the dataframes again and again on every write-to-DB action.

Please add validateDataset.persist(StorageLevel.MEMORY_ONLY()) (you should consider memory only, disk only, or memory-and-disk depending on the size of your dataframe and whether it fits in memory).

For example:

 Dataset<RuleParamsBean> validateDataset = ds
     .map(ruleParamsBean -> validateTransaction(ruleParamsBean, broadcastVarForDiv.value(),
             broadcastVarForCurrency.value(), broadcastVarForUserID.value()), encoder)
     .persist(StorageLevel.MEMORY_ONLY());
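
Following the same idea, outputDataSet in the question's code feeds both of the final writes (to CI_PRICEITEM_PARM_GRP_K and CI_PRICEITEM_PARM_GRP), so persisting it as well should avoid recomputing the whole read-join-map-flatMap lineage for the second write. A rough sketch reusing the variable names from the question; whether MEMORY_ONLY is appropriate depends on the data size:

 // Hedged sketch: persist the shared upstream dataset once so that both
 // JDBC writes reuse the cached partitions instead of recomputing the
 // full lineage (read -> join -> map -> flatMap) a second time.
 Dataset<PritmRuleOutput> outputDataSet = filteredDS
     .flatMap(rulesParamBean -> droolprocesMap(broadcastRules.value(), rulesParamBean), outputEncoder)
     .persist(StorageLevel.MEMORY_ONLY());   // or MEMORY_AND_DISK if it may not fit in memory
 outputDataSet.count();                      // optional: force materialization up front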