I have an HBase MapReduce bulk-load application. It contains a custom MyMapper class with a static field parser that is used while the job runs. When I configure the job, I call the static config method to initialize parser. But when the job runs, the commented line below throws a NullPointerException, as if the static field parser became null once the job was submitted to YARN.

Here is the Mapper code; the Hadoop version is 2.7.7.
public class MyMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

    private static StringParser parser;

    public static void config(StringParser parser) {
        MyMapper.parser = parser;
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String lineValue = value.toString();
        String output;
        try {
            // NullPointerException is thrown on this line.
            output = parser.parse(lineValue);
            context.write(new ImmutableBytesWritable(...), ...);
        } catch (ParseException e) {
            e.printStackTrace();
        }
    }
}
Here is the job-submission code:
Job job = Job.getInstance(conf, "Batch Import HBase Table:" + tableName);
job.setJarByClass(TextBulkLoadDriver.class);
FileInputFormat.setInputPaths(job, inPath);
// Configure the Mapper; here I set the static field in the MyMapper class.
MyMapper.config(parser);
Class<MyMapper> cls = MyMapper.class;
job.setMapperClass(cls);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(Put.class);
job.setNumReduceTasks(1);
job.setReducerClass(PutSortReducer.class);

RegionLocator locator = instance.getConnection().getRegionLocator(TableName.valueOf(tableName));
try (Admin admin = instance.getAdmin(); Table table = instance.getTable(tableName)) {
    HFileOutputFormat2.configureIncrementalLoad(job, table, locator);
    HFileOutputFormat2.setOutputPath(job, outPath);
    // Run the job.
    job.waitForCompletion(true);
    logger.info("HFileOutputFormat2 file ready on {}", outPath);
    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
    loader.doBulkLoad(outPath, admin, table, locator);
} catch (Exception e) {
    throw new RuntimeException(e);
}
Thanks in advance for any suggestions!
Static variables are not shipped to the distributed tasks in MapReduce. They live only in the memory of the JVM that set them, which here is the client (driver) JVM that submits the job, not the worker nodes that execute the tasks.

YARN distributes work by launching each task in a fresh JVM on a processing node, where the MyMapper class is loaded again from the job jar. Your static config method is never called in those JVMs, so parser keeps its default value, null.

If you want every mapper to see an initialized parser, you have to ship the object (or the information needed to rebuild it) along with the job and deserialize it in each task, typically in the mapper's setup() method.
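As a minimal sketch of that approach (assuming your StringParser implements java.io.Serializable; the key name "mymapper.parser" and the ParserConfig helper below are made up for illustration), you can round-trip the parser through the job Configuration:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Base64;

import org.apache.hadoop.conf.Configuration;

// Hypothetical helper: round-trips a Serializable StringParser through the
// job Configuration as a Base64 string so it reaches every task JVM.
public final class ParserConfig {
    private static final String KEY = "mymapper.parser";

    // Driver side: serialize the parser and store it in the configuration.
    public static void store(Configuration conf, StringParser parser) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(parser);
        }
        conf.set(KEY, Base64.getEncoder().encodeToString(bytes.toByteArray()));
    }

    // Task side: rebuild the parser from the configuration shipped with the job.
    public static StringParser load(Configuration conf) throws IOException {
        byte[] raw = Base64.getDecoder().decode(conf.get(KEY));
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(raw))) {
            return (StringParser) in.readObject();
        } catch (ClassNotFoundException e) {
            throw new IOException(e);
        }
    }
}

In MyMapper, drop the static config() call and rebuild the field once per task instead:

@Override
protected void setup(Context context) throws IOException {
    // Runs once per task JVM, before any map() calls.
    parser = ParserConfig.load(context.getConfiguration());
}

Note that Job.getInstance(conf, ...) copies the Configuration, so ParserConfig.store(conf, parser) must run before the Job is created. And if StringParser can be reconstructed from a plain string (a pattern, say), it is simpler to put just that string in the Configuration with conf.set(...) and build the parser in setup(), avoiding Java serialization entirely.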