I have a five-node cluster; three of the nodes run DataNodes and TaskTrackers.
I imported roughly 10 million rows from Oracle via Sqoop and process them with MapReduce in an Oozie workflow.
The MapReduce job takes about 30 minutes and uses only a single reducer.
Edit - If I run the MapReduce code on its own, outside of Oozie, job.setNumReduceTasks(4) correctly establishes 4 reducers.
I tried the following approaches to manually set the number of reducers to 4, none of which worked:
In Oozie, setting the following property inside the <configuration> tag of the map-reduce action node:
<property><name>mapred.reduce.tasks</name><value>4</value></property>
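For reference, here is a minimal sketch of where that property sits inside the workflow.xml map-reduce action (the job-tracker/name-node values and the action name are placeholders, and the two new-api flags are an assumption on my part for new-API Mapper/Reducer classes):

<action name="process-rows">
    <map-reduce>
        <job-tracker>${jobTracker}</job-tracker>
        <name-node>${nameNode}</name-node>
        <configuration>
            <!-- assumption: needed when the Mapper/Reducer use the new org.apache.hadoop.mapreduce API -->
            <property><name>mapred.mapper.new-api</name><value>true</value></property>
            <property><name>mapred.reducer.new-api</name><value>true</value></property>
            <!-- the reducer count being requested -->
            <property><name>mapred.reduce.tasks</name><value>4</value></property>
        </configuration>
    </map-reduce>
    <ok to="end"/>
    <error to="fail"/>
</action>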
In the main method of the MapReduce Java code:
Configuration conf = new Configuration();
Job job = new Job(conf, "10 million rows");
...
job.setNumReduceTasks(4);
I also tried:
Configuration conf = new Configuration();
Job job = new Job(conf, "10 million rows");
...
conf.set("mapred.reduce.tasks", "4");
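One detail that may matter here: the Job constructor copies the Configuration it receives, so a conf.set(...) made after new Job(conf, ...) never reaches the job. A minimal sketch of the orderings that do take effect:

Configuration conf = new Configuration();
// either set the property before the Job copies the Configuration...
conf.set("mapred.reduce.tasks", "4");
Job job = new Job(conf, "10 million rows");
// ...or set it on the job's own copy afterwards (equivalent to job.setNumReduceTasks(4))
job.getConfiguration().set("mapred.reduce.tasks", "4");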
My map function looks similar to this:
public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
    CustomObj customObj = new CustomObj(key.toString());
    context.write(new Text(customObj.getId()), customObj);
}
I believe the ID has 80,000 distinct values.
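With no custom partitioner, the default HashPartitioner decides which reducer each key goes to, so 80,000 distinct IDs should spread roughly evenly across 4 reducers. Its logic is essentially:

// logic of the default org.apache.hadoop.mapreduce.lib.partition.HashPartitioner
public int getPartition(Text key, CustomObj value, int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}

When numReduceTasks is 1, every key lands in partition 0, which matches the single-reducer behavior I am seeing.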
My reduce function looks similar to this:
public void reduce(Text key, Iterable<CustomObj> vals, Context context) throws IOException, InterruptedException {
    OtherCustomObj otherCustomObj = new OtherCustomObj();
    ...
    context.write(null, otherCustomObj);
}
The custom object emitted by the Mapper implements WritableComparable, but the other custom object emitted by the Reducer does not.
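For completeness, here is a simplified sketch of what implementing WritableComparable looks like for CustomObj (the single id field and its serialization are placeholders; the real class carries more data):

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

public class CustomObj implements WritableComparable<CustomObj> {
    private Text id = new Text();

    public CustomObj() {}                        // required no-arg constructor for deserialization
    public CustomObj(String raw) { id.set(raw); }

    public String getId() { return id.toString(); }

    @Override
    public void write(DataOutput out) throws IOException { id.write(out); }

    @Override
    public void readFields(DataInput in) throws IOException { id.readFields(in); }

    @Override
    public int compareTo(CustomObj other) { return id.compareTo(other.id); }
}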
Here are the logs covering the file system counters, job counters, and the map-reduce framework, which show that only one reduce task was launched.
map 100% reduce 100%
Job complete: job_201401131546_0425
Counters: 32
File System Counters
FILE: Number of bytes read=1370377216
FILE: Number of bytes written=2057213222
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=556345690
HDFS: Number of bytes written=166938092
HDFS: Number of read operations=18
HDFS: Number of large read operations=0
HDFS: Number of write operations=1
Job Counters
Launched map tasks=11
Launched reduce tasks=1
Data-local map tasks=11
Total time spent by all maps in occupied slots (ms)=1268296
Total time spent by all reduces in occupied slots (ms)=709774
Total time spent by all maps waiting after reserving slots (ms)=0
Total time spent by all reduces waiting after reserving slots (ms)=0
Map-Reduce Framework
Map input records=9440000
Map output records=9440000
Map output bytes=666308476
Input split bytes=1422
Combine input records=0
Combine output records=0
Reduce input groups=80000
Reduce shuffle bytes=685188530
Reduce input records=9440000
Reduce output records=2612760
Spilled Records=28320000
CPU time spent (ms)=1849500
Physical memory (bytes) snapshot=3581157376
Virtual memory (bytes) snapshot=15008251904
Total committed heap usage (bytes)=2848063488
Edit: I modified the MapReduce job to introduce a custom partitioner, a sort comparator, and a grouping comparator. For some reason, the code now launches two reducers (scheduled through Oozie), but not four.
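Roughly, the three pieces are wired into the driver like this (the class names here are placeholders for my implementations):

// registering the custom partitioner, sort comparator, and grouping comparator
job.setPartitionerClass(IdPartitioner.class);
job.setSortComparatorClass(IdSortComparator.class);
job.setGroupingComparatorClass(IdGroupingComparator.class);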
I set the mapred.tasktracker.map.tasks.maximum property to 20 on each TaskTracker (and on the JobTracker) and restarted them, but with no result.
As a starting point, what is the value of the following property in mapred-site.xml?
<property>
<name>mapred.tasktracker.map.tasks.maximum</name>
<value>4</value>
</property>
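Note that this property only caps how many map tasks can run concurrently on a single TaskTracker; its reduce-side counterpart below caps concurrent reduce tasks per node. Neither of them sets how many reducers a particular job requests:

<property>
    <name>mapred.tasktracker.reduce.tasks.maximum</name>
    <value>4</value>
</property>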