Spark-submit fails in cluster mode when passing files with --files


I have a Java Spark job that reads some properties files. The files are passed along with spark-submit, like this:

spark-submit \
--master yarn \
--deploy-mode cluster \
--files /home/aiman/SalesforceConn.properties,/home/aiman/columnMapping.prop,/home/aiman/sourceTableColumns.prop \
--class com.sfdc.SaleforceReader \
--verbose \
--jars /home/ebdpbuss/aiman/Salesforce/ojdbc-7.jar \
/home/aiman/spark-salesforce-0.0.1-SNAPSHOT-jar-with-dependencies.jar SalesforceConn.properties columnMapping.prop sourceTableColumns.prop

The code I have written is:

import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkFiles;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder().master("yarn").config("spark.submit.deployMode", "cluster").getOrCreate();
JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
Configuration config = jsc.hadoopConfiguration();
FileSystem fs = FileSystem.get(config);

// args[] holds the file names passed as program arguments.
String connDetailsFile = args[0];
String mapFile = args[1];
String sourceColumnsFile = args[2];

// SparkFiles.get() resolves a name distributed via --files to a local path.
String connFile = SparkFiles.get(connDetailsFile);
String mappingFile = SparkFiles.get(mapFile);
String srcColsFile = SparkFiles.get(sourceColumnsFile);

Properties prop = loadProperties(fs, connFile);
Properties mappings = loadProperties(fs, mappingFile);
Properties srcColProp = loadProperties(fs, srcColsFile);

The loadProperties() method I used above:

private static Properties loadProperties(FileSystem fs, String path)
{
    Properties prop = new Properties();
    // try-with-resources ensures the stream is closed even if load() fails.
    try (FSDataInputStream is = fs.open(new Path(path))) {
        prop.load(is);
    } catch (Exception e) {
        e.printStackTrace();
        System.exit(1);
    }

    return prop;
}

It gives me the exception:

Exception in thread "main" org.apache.spark.SparkException: Application application_1550650871366_125913 finished with failed status
        at org.apache.spark.deploy.yarn.Client.run(Client.scala:1187)
        at org.apache.spark.deploy.yarn.Client$.main(Client.scala:1233)
        at org.apache.spark.deploy.yarn.Client.main(Client.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:782)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
19/03/01 14:34:00 INFO ShutdownHookManager: Shutdown hook called
apache-spark spark-submit
1 Answer

When you pass file paths with --files, the files are copied to a local (temporary) directory on each executor. So as long as the file names do not change, you can reference them by name alone, as shown below, instead of using the full paths supplied as arguments.

String connDetailsFile = "SalesforceConn.properties";
String mapFile = "columnMapping.prop";
String sourceColumnsFile = "sourceTableColumns.prop";
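
As a usage illustration, here is a minimal sketch of reading one of the localized copies directly (this is an assumption, not from the original answer: it presumes the --files invocation shown in the question and reads the file with plain java.io instead of the question's loadProperties()):

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

// Sketch: --files localizes each file into the YARN container's
// working directory, so the bare name resolves on the local disk.
Properties conn = new Properties();
try (InputStream in = new FileInputStream("SalesforceConn.properties")) {
    conn.load(in);
} catch (IOException e) {
    e.printStackTrace();
}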

If the file names change on every run, then you have to strip off the directory part of each path and use only the file name. This is because Spark does not interpret the argument as a path; it treats the whole string as a file name. For example, /home/aiman/SalesforceConn.properties would be treated as a file name, and Spark would throw an exception saying it cannot find a file named /home/aiman/SalesforceConn.properties.

So your code should look something like this:

String connDetailsFile = args[0].substring(args[0].lastIndexOf('/') + 1);
String mapFile = args[1].substring(args[1].lastIndexOf('/') + 1);
String sourceColumnsFile = args[2].substring(args[2].lastIndexOf('/') + 1);
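
Equivalently, the stripping can be done with the JDK's java.nio.file API; a minimal sketch (standard library calls, not part of the original answer):

import java.nio.file.Paths;

// getFileName() returns the last path segment, i.e. the bare file name.
String connDetailsFile = Paths.get(args[0]).getFileName().toString();
String mapFile = Paths.get(args[1]).getFileName().toString();
String sourceColumnsFile = Paths.get(args[2]).getFileName().toString();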