I am running the spark-submit command shown below. It loads datasets from a database, processes them, and in the final stage pushes multiple datasets to an Oracle DB.
./spark-submit --class com.sample.Transformation --conf spark.sql.shuffle.partitions=5001 \
--num-executors=40 --executor-cores=1 --executor-memory=5G \
--jars /scratch/rmbbuild/spark_ormb/drools-jars/ojdbc6.jar,\
/scratch/rmbbuild/spark_ormb/drools-jars/kie-api-7.7.0.Final.jar,\
/scratch/rmbbuild/spark_ormb/drools-jars/drools-core-7.7.0.Final.jar,\
/scratch/rmbbuild/spark_ormb/drools-jars/drools-compiler-7.7.0.Final.jar,\
/scratch/rmbbuild/spark_ormb/drools-jars/kie-soup-maven-support-7.7.0.Final.jar,\
/scratch/rmbbuild/spark_ormb/drools-jars/kie-internal-7.7.0.Final.jar,\
/scratch/rmbbuild/spark_ormb/drools-jars/xstream-1.4.10.jar,\
/scratch/rmbbuild/spark_ormb/drools-jars/kie-soup-commons-7.7.0.Final.jar,\
/scratch/rmbbuild/spark_ormb/drools-jars/ecj-4.4.2.jar,\
/scratch/rmbbuild/spark_ormb/drools-jars/mvel2-2.4.0.Final.jar,\
/scratch/rmbbuild/spark_ormb/drools-jars/kie-soup-project-datamodel-commons-7.7.0.Final.jar,\
/scratch/rmbbuild/spark_ormb/drools-jars/kie-soup-project-datamodel-api-7.7.0.Final.jar \
--driver-class-path /scratch/rmbbuild/spark_ormb/drools-jars/ojdbc6.jar \
--master spark://10.180.181.41:7077 "/scratch/rmbbuild/spark_ormb/POC-jar/Transformation-0.0.1-SNAPSHOT.jar" \
> /scratch/rmbbuild/spark_ormb/POC-jar/logs/logs12.txt
However, a considerable amount of time is spent before each dataset is written to the database, and I don't know why it takes so long before the write process even starts. I have attached a screenshot that clearly highlights the problem I am facing; please go through the screenshot before commenting a solution. Spark Dashboard Stages screenshot:
If you look at the screenshot, I have highlighted the roughly 10 minutes that are consumed before each dataset is written to the database. This happens even though I changed the batchSize to 100000, as shown below:
outputDataSetforsummary.write().mode("append").format("jdbc").option("url", connection)
.option("batchSize", "100000").option("dbtable", CI_TXN_DTL).save();
So, can anyone explain why this pre-write time is consumed every time, and how it can be avoided?
I am attaching the code below to give a better description of the program.
public static void main(String[] args) {
// SparkConf conf = new SparkConf().setAppName("Transformation").setMaster("local");
SparkConf conf = new SparkConf().setAppName("Transformation").setMaster("spark://xx.xx.xx.xx:7077");
String connection = "jdbc:oracle:thin:ABC/abc@//xx.x.x.x:1521/ABC";
// Create Spark Context
SparkContext context = new SparkContext(conf);
// Create Spark Session
SparkSession sparkSession = new SparkSession(context);
Dataset<Row> txnDf = sparkSession.read().format("jdbc").option("url", connection).option("dbtable", CI_TXN_DETAIL_STG).load();
//Dataset<Row> txnDf = sparkSession.read().format("jdbc").option("url", connection).option("dbtable", "CI_TXN_DETAIL_STG").load();
Dataset<Row> newTxnDf = txnDf.drop(ACCT_ID);
Dataset<Row> accountDf = sparkSession.read().format("jdbc").option("url", connection).option("dbtable", CI_ACCT_NBR).load();
// Dataset<Row> accountDf = sparkSession.read().format("jdbc").option("url", connection).option("dbtable", "CI_ACCT_NBR").load();
Dataset<Row> joined = newTxnDf.join(accountDf, newTxnDf.col(ACCT_NBR).equalTo(accountDf.col(ACCT_NBR))
.and(newTxnDf.col(ACCT_NBR_TYPE_CD).equalTo(accountDf.col(ACCT_NBR_TYPE_CD))), "inner");
Dataset<Row> finalJoined = joined.drop(accountDf.col(ACCT_NBR_TYPE_CD)).drop(accountDf.col(ACCT_NBR))
.drop(accountDf.col(VERSION)).drop(accountDf.col(PRIM_SW));
initializeProductDerivationCache(sparkSession,connection);
ClassTag<List<String>> evidenceForDivision = scala.reflect.ClassTag$.MODULE$.apply(List.class);
Broadcast<List<String>> broadcastVarForDiv = context.broadcast(divisionList, evidenceForDivision);
ClassTag<List<String>> evidenceForCurrency = scala.reflect.ClassTag$.MODULE$.apply(List.class);
Broadcast<List<String>> broadcastVarForCurrency = context.broadcast(currencySet, evidenceForCurrency);
ClassTag<List<String>> evidenceForUserID = scala.reflect.ClassTag$.MODULE$.apply(List.class);
Broadcast<List<String>> broadcastVarForUserID = context.broadcast(userIdList, evidenceForUserID);
Encoder<RuleParamsBean> encoder = Encoders.bean(RuleParamsBean.class);
Dataset<RuleParamsBean> ds = new Dataset<RuleParamsBean>(sparkSession, finalJoined.logicalPlan(), encoder);
Dataset<RuleParamsBean> validateDataset = ds.map(ruleParamsBean -> validateTransaction(ruleParamsBean,broadcastVarForDiv.value(),broadcastVarForCurrency.value(),
broadcastVarForUserID.value()),encoder);
Dataset<RuleParamsBean> filteredDS = validateDataset.filter(validateDataset.col(BO_STATUS_CD).notEqual(TFMAppConstants.TXN_INVALID));
//For formatting the data to be inserted in table --> Dataset<Row>finalvalidateDataset = validateDataset.select("ACCT_ID");
Encoder<TxnDetailOutput> txndetailencoder = Encoders.bean(TxnDetailOutput.class);
Dataset<TxnDetailOutput> txndetailDS = validateDataset.map(ruleParamsBean -> outputfortxndetail(ruleParamsBean), txndetailencoder);
KieServices ks = KieServices.Factory.get();
KieContainer kContainer = ks.getKieClasspathContainer();
ClassTag<KieBase> classTagTest = scala.reflect.ClassTag$.MODULE$.apply(KieBase.class);
Broadcast<KieBase> broadcastRules = context.broadcast(kContainer.getKieBase(KIE_BASE), classTagTest);
Encoder<PritmRuleOutput> outputEncoder = Encoders.bean(PritmRuleOutput.class);
Dataset<PritmRuleOutput> outputDataSet = filteredDS.flatMap(rulesParamBean -> droolprocesMap(broadcastRules.value(), rulesParamBean), outputEncoder);
Dataset<Row> piParamDS1 = outputDataSet.select(PRICEITEM_PARM_GRP_VAL);
Dataset<Row> piParamDS = piParamDS1.withColumnRenamed(PRICEITEM_PARM_GRP_VAL, PARM_STR);
priceItemParamGrpValueCache.createOrReplaceTempView("temp1");
Dataset<Row> piParamDSS = piParamDS.where(queryToFiltertheDuplicateParamVal);
Dataset<Row> priceItemParamsGrpDS = piParamDSS.select(PARM_STR).distinct().withColumn(PRICEITEM_PARM_GRP_ID, functions.monotonically_increasing_id());
Dataset<Row> finalpriceItemParamsGrpDS = priceItemParamsGrpDS.withColumn(PARM_COUNT, functions.size(functions.split(priceItemParamsGrpDS.col(PARM_STR),TOKENIZER)));
finalpriceItemParamsGrpDS.persist(StorageLevel.MEMORY_ONLY());
finalpriceItemParamsGrpDS.distinct().write().mode("append").format("jdbc").option("url", connection).option("dbtable", CI_PRICEITEM_PARM_GRP_K).option("batchSize", "1000").save();
Dataset<Row> PritmOutput = outputDataSet.join(priceItemParamsGrpDS,outputDataSet.col(PRICEITEM_PARM_GRP_VAL).equalTo(priceItemParamsGrpDS.col(PARM_STR)),"inner");
Dataset<Row> samplePritmOutput = PritmOutput.drop(outputDataSet.col(PRICEITEM_PARM_GRP_ID))
.drop(priceItemParamsGrpDS.col(PARM_STR));
priceItemParamsGrpDS.createOrReplaceTempView(PARM_STR);
Dataset<Row> priceItemParamsGroupTable = sparkSession.sql(FETCH_QUERY_TO_SPLIT);
Dataset<Row> finalpriceItemParamsGroupTable = priceItemParamsGroupTable.selectExpr("PRICEITEM_PARM_GRP_ID","split(col, '=')[0] as PRICEITEM_PARM_CD ","split(col, '=')[1] as PRICEITEM_PARM_VAL");
finalpriceItemParamsGroupTable.persist(StorageLevel.MEMORY_ONLY());
finalpriceItemParamsGroupTable.distinct().write().mode("append").format("jdbc").option("url", connection).option("dbtable", CI_PRICEITEM_PARM_GRP).option("batchSize", "1000").save();
}
It reloads the entire data and joins the dataframes again and again on every write-to-DB action.
Please add validateDataset.persist(StorageLevel.MEMORY_ONLY())
- (you should consider MEMORY_ONLY, DISK_ONLY, or MEMORY_AND_DISK depending on your dataframe size and whether it fits in memory)
For example:
Dataset<RuleParamsBean> validateDataset = ds.map(ruleParamsBean -> validateTransaction(ruleParamsBean,broadcastVarForDiv.value(),broadcastVarForCurrency.value(),broadcastVarForUserID.value()),encoder)
.persist(StorageLevel.MEMORY_ONLY());
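The same idea applies to the other datasets in the question's code that feed more than one action, for instance outputDataSet (reused in the join and the flatMap output) and priceItemParamsGrpDS (reused in the join and the temp view). A minimal sketch under that assumption, with variable names taken from the code above and MEMORY_AND_DISK chosen only as an illustration:
// Cache datasets that are consumed by more than one action, so each write
// does not recompute the JDBC reads, the join, and the Drools flatMap.
Dataset<PritmRuleOutput> outputDataSet = filteredDS
    .flatMap(rulesParamBean -> droolprocesMap(broadcastRules.value(), rulesParamBean), outputEncoder)
    .persist(StorageLevel.MEMORY_AND_DISK());

Dataset<Row> priceItemParamsGrpDS = piParamDSS.select(PARM_STR).distinct()
    .withColumn(PRICEITEM_PARM_GRP_ID, functions.monotonically_increasing_id())
    .persist(StorageLevel.MEMORY_AND_DISK());

// ... joins and JDBC writes as in the original code ...

// Release the cached data once the last write has finished.
outputDataSet.unpersist();
priceItemParamsGrpDS.unpersist();
validateDataset.unpersist();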