我们正在使用 Flink 版本 1.13.5 并尝试从 AWS S3 位置读取 ORC 文件。而且,我们正在将应用程序部署在自我管理的 Flink 集群中。请找到下面的代码来读取ORC文件,
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.source.FileProcessingMode
import org.apache.hadoop.conf.Configuration
import java.util.concurrent.TimeUnit
import java.time.LocalDateTime
import org.apache.flink.core.fs.Path
import org.apache.flink.orc.OrcRowInputFormat
val en = StreamExecutionEnvironment.getExecutionEnvironment
val config = new Configuration
config.setInt("fs.s3.connection.maximum", 15000)
config.setString("fs.s3.readahead.range", "1M")
val Schema = "struct<empId:int,name:string,salary:double>"
val orcRowInput = new OrcRowInputFormat("s3a://NotApplicable", Schema, config)
val fileRead = en.readFile(orcRowInput, "s3a://bucket/folder/", FileProcessingMode.PROCESS_CONTINUOUSLY, TimeUnit.HOURS.toMillis(24)).name("s3a://bucket/folder/").uid("s3a://bucket/folder/")
env.execute("Flink APP")
当我们在 Flink 1.13.5 版本集群中以 jar 形式执行上述应用程序时,我们面临以下异常,
运行失败,失败原因:java.lang.NoClassDefFoundError:org/apache/hadoop/fs/FileSystem 在 java.lang.ClassLoader.defineClass(本机方法)
我们的 fat jar 本身中有以下依赖项 jar。
此外,我们在下面的文件夹结构中的 flink 集群的所有节点中都有 presto 和 flink-s3 hadoop jar,
我们的 Flink 集群中的 /etc/hadoop/conf 文件夹下没有安装 hadoop 包。并且,我们的 Flink 集群中也没有设置 HADOOP_HOME。我们是否需要在 Flink 集群中安装 hadoop 才能使用上述代码从 S3 读取 ORC 文件?或者我们错过了什么吗?请帮助我们纠正这个问题。预先感谢。
您不能将 Hadoop 包含在 fat JAR 中;需要在启动 Flink 集群之前加载它。如果您想将 Flink 与 Hadoop 结合使用,则需要包含 Hadoop 依赖项的 Flink 设置,而不是将 Hadoop 添加为应用程序依赖项。 Flink 将使用 HADOOP_CLASSPATH 环境变量指定的 Hadoop 依赖项,可以通过以下方式设置:
export HADOOP_CLASSPATH=`hadoop classpath`