I have a Java Spark program that reads several properties files. The files are passed along with spark-submit, like this:
spark-submit \
--master yarn \
--deploy-mode cluster \
--files /home/aiman/SalesforceConn.properties,/home/aiman/columnMapping.prop,/home/aiman/sourceTableColumns.prop \
--class com.sfdc.SaleforceReader \
--verbose \
--jars /home/ebdpbuss/aiman/Salesforce/ojdbc-7.jar /home/aiman/spark-salesforce-0.0.1-SNAPSHOT-jar-with-dependencies.jar SalesforceConn.properties columnMapping.prop sourceTableColumns.prop
The code I wrote is:
SparkSession spark = SparkSession.builder().master("yarn").config("spark.submit.deployMode","cluster").getOrCreate();
JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
Configuration config = jsc.hadoopConfiguration();
FileSystem fs = FileSystem.get(config);
// args[] holds the file names passed as arguments.
String connDetailsFile = args[0];
String mapFile = args[1];
String sourceColumnsFile = args[2];
String connFile = SparkFiles.get(connDetailsFile);
String mappingFile = SparkFiles.get(mapFile);
String srcColsFile = SparkFiles.get(sourceColumnsFile);
Properties prop = loadProperties(fs,connFile);
Properties mappings = loadProperties(fs,mappingFile);
Properties srcColProp = loadProperties(fs,srcColsFile);
The loadProperties() method used above:
private static Properties loadProperties(FileSystem fs, String path)
{
Properties prop = new Properties();
FSDataInputStream is = null;
try{
is = fs.open(new Path(path));
prop.load(is);
} catch(Exception e){
e.printStackTrace();
System.exit(1);
}
return prop;
}
It gives me this exception:
Exception in thread "main" org.apache.spark.SparkException: Application application_1550650871366_125913 finished with failed status
at org.apache.spark.deploy.yarn.Client.run(Client.scala:1187)
at org.apache.spark.deploy.yarn.Client$.main(Client.scala:1233)
at org.apache.spark.deploy.yarn.Client.main(Client.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:782)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
19/03/01 14:34:00 INFO ShutdownHookManager: Shutdown hook called
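When you pass file paths with --files, the files are stored in a local (temporary) directory on each executor. So if the file names do not change, you can refer to them by name as follows, instead of using the full paths supplied in the arguments.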
String connDetailsFile = "SalesforceConn.properties";
String mapFile = "columnMapping.prop";
String sourceColumnsFile = "sourceTableColumns.prop";
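A side note on reading the shipped files: SparkFiles.get returns an absolute path on the local file system, so a reader built on plain java.nio may be a better match than one that goes through the Hadoop FileSystem, which typically resolves paths against the cluster's default file system (often HDFS). The following is only a minimal sketch under that assumption; LocalPropertiesLoader and loadLocalProperties are hypothetical names, not part of the original code.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;

public class LocalPropertiesLoader {

    // Hypothetical helper: loads a properties file from the LOCAL file system.
    // The stream is closed automatically by try-with-resources.
    public static Properties loadLocalProperties(String localPath) throws IOException {
        Properties prop = new Properties();
        try (InputStream in = Files.newInputStream(Paths.get(localPath))) {
            prop.load(in);
        }
        return prop;
    }

    public static void main(String[] args) throws IOException {
        if (args.length == 0) {
            System.out.println("usage: LocalPropertiesLoader <local-properties-file>");
            return;
        }
        // In the Spark job, the argument would be the result of SparkFiles.get(fileName).
        Properties prop = loadLocalProperties(args[0]);
        System.out.println(prop.stringPropertyNames());
    }
}
```

In the job itself the call would then look like loadLocalProperties(SparkFiles.get(connDetailsFile)), assuming the file was shipped with --files under that name.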
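If the file names change every time, you have to strip the path and use only the file name. This is because Spark does not recognize the argument as a path; it treats the whole string as the file name. For example, /home/aiman/SalesforceConn.properties would be treated as a file name, and Spark would throw an exception saying it cannot find a file named /home/aiman/SalesforceConn.properties.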
So your code should look like this:
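// Keep only the part after the last '/'; a bare file name is returned unchanged.
String connDetailsFile = args[0].substring(args[0].lastIndexOf('/') + 1);
String mapFile = args[1].substring(args[1].lastIndexOf('/') + 1);
String sourceColumnsFile = args[2].substring(args[2].lastIndexOf('/') + 1);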
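The path-stripping step can also be wrapped in a small helper so that it is written once and behaves the same for all three arguments. This is just an illustrative sketch using java.nio.file; the class name FileNameOnly and the method baseName are made up for the example.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

public class FileNameOnly {

    // Hypothetical helper: returns the last path element of the given string.
    // A bare file name is returned unchanged; if the path has no file name
    // element (for example "/"), the input is returned as-is.
    public static String baseName(String pathOrName) {
        Path fileName = Paths.get(pathOrName).getFileName();
        return fileName == null ? pathOrName : fileName.toString();
    }

    public static void main(String[] args) {
        System.out.println(baseName("/home/aiman/SalesforceConn.properties"));
        System.out.println(baseName("columnMapping.prop"));
    }
}
```

With this in place, the three assignments would become baseName(args[0]), baseName(args[1]) and baseName(args[2]), whether the arguments are full paths or plain names.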