I'm new to working with Parquet files, and I want to develop a MapReduce job that reads many input Parquet files with the following schema:
{
optional int96 dropoff_datetime;
optional float dropoff_latitude;
optional float dropoff_longitude;
optional int32 dropoff_taxizone_id;
optional float ehail_fee;
optional float extra;
optional float fare_amount;
optional float improvement_surcharge;
optional float mta_tax;
optional int32 passenger_count;
optional binary payment_type (UTF8);
optional int96 pickup_datetime;
optional float pickup_latitude;
optional float pickup_longitude;
optional int32 pickup_taxizone_id;
optional int32 rate_code_id;
optional binary store_and_fwd_flag (UTF8);
optional float tip_amount;
optional float tolls_amount;
optional float total_amount;
optional float trip_distance;
optional binary trip_type (UTF8);
optional binary vendor_id (UTF8);
required int64 trip_id;
}
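The goal of my job is to compute the average trip speed for each hour of the day, so I need to extract the trip distance and the pickup and dropoff times for every trip, compute the duration, and then compute the speed. However, I get an error when the field
trip_distance
has no value. Here is part of the stack trace: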
18/02/28 03:19:01 INFO mapreduce.Job: map 2% reduce 0%
18/02/28 03:19:10 INFO mapreduce.Job: Task Id : attempt_1519722054260_0016_m_000011_2, Status : FAILED
Error: java.lang.RuntimeException: not found 20(trip_distance) element number 0 in group:
dropoff_datetime: Int96Value{Binary{12 constant bytes, [0, 0, 0, 0, 0, 0, 0, 0, -116, 61, 37, 0]}}
payment_type: ""
pickup_datetime: Int96Value{Binary{12 constant bytes, [0, 120, 66, 9, 78, 72, 0, 0, 3, 125, 37, 0]}}
pickup_latitude: 40.7565
pickup_longitude: -73.9781
pickup_taxizone_id: 161
store_and_fwd_flag: ""
trip_type: "uber"
vendor_id: ""
trip_id: 4776003633207
at org.apache.parquet.example.data.simple.SimpleGroup.getValue(SimpleGroup.java:97)
at org.apache.parquet.example.data.simple.SimpleGroup.getValueToString(SimpleGroup.java:119)
at ParquetAssignmentSpeedAverageHours$ParquetMap.map(ParquetAssignmentSpeedAverageHours.java:48)
at ParquetAssignmentSpeedAverageHours$ParquetMap.map(ParquetAssignmentSpeedAverageHours.java:37)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:146)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:170)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:164)
Here is my mapper class:
public static class ParquetMap extends Mapper<Text, Group, IntWritable, DoubleWritable> {
    private DoubleWritable one = new DoubleWritable(1);
    private IntWritable time = new IntWritable();
    private DoubleWritable result = new DoubleWritable();

    @Override
    public void map(Text key, Group value, Context context) throws IOException, InterruptedException {
        double duration;
        double distance;
        double speed;
        Binary pickupTimestamp = value.getInt96("pickup_datetime", 0);
        Binary dropoffTimestamp = value.getInt96("dropoff_datetime", 0);
        if (value.getValueToString(20, 0) != null) { //the trip_distance field
            distance = value.getFloat("trip_distance", 0);
        } else {
            distance = 0;
        }
        try {
            if (!pickupTimestamp.equals(dropoffTimestamp)) {
                duration = ((double)(getTimestampMillis(dropoffTimestamp) - getTimestampMillis(pickupTimestamp))/3600000);
                speed = (float) (distance / duration);
                result.set((speed));
                Calendar cal = Calendar.getInstance();
                cal.setTimeInMillis(getTimestampMillis(pickupTimestamp));
                time.set(cal.get(Calendar.HOUR_OF_DAY));
                context.write(time, result);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
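The getTimestampMillis helper called in the mapper is not shown in the question. For readers following along, here is a minimal sketch of what such a helper might look like, assuming the commonly used INT96 timestamp layout (8 bytes of nanoseconds within the day followed by a 4-byte Julian day number, both little-endian). The class name Int96Timestamps and the method toEpochMillis are hypothetical and do not come from the question or from the Parquet library.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.time.Instant;

// Hypothetical helper; the question's own getTimestampMillis is not shown.
final class Int96Timestamps {
    // Julian day number of 1970-01-01, the Unix epoch.
    private static final long JULIAN_DAY_OF_EPOCH = 2_440_588L;
    private static final long MILLIS_PER_DAY = 86_400_000L;
    private static final long NANOS_PER_MILLI = 1_000_000L;

    // Assumed layout: bytes 0-7 = nanoseconds of day, bytes 8-11 = Julian day,
    // both little-endian.
    static long toEpochMillis(byte[] int96) {
        if (int96.length != 12) {
            throw new IllegalArgumentException("expected 12 bytes, got " + int96.length);
        }
        ByteBuffer buf = ByteBuffer.wrap(int96).order(ByteOrder.LITTLE_ENDIAN);
        long nanosOfDay = buf.getLong();
        long julianDay = buf.getInt();
        return (julianDay - JULIAN_DAY_OF_EPOCH) * MILLIS_PER_DAY
                + nanosOfDay / NANOS_PER_MILLI;
    }

    public static void main(String[] args) {
        // The pickup_datetime bytes printed in the stack trace above.
        byte[] pickup = {0, 120, 66, 9, 78, 72, 0, 0, 3, 125, 37, 0};
        System.out.println(Instant.ofEpochMilli(toEpochMillis(pickup)));
        // prints 2014-06-26T22:05:00Z
    }
}
```

In the mapper this would be called on the raw bytes, for example toEpochMillis(pickupTimestamp.getBytes()). Under the same assumed layout, the dropoff_datetime bytes in the stack trace decode to epoch millisecond 0, so that record appears to have no real dropoff time. Note also that Calendar.getInstance() uses the JVM's default time zone, so the hour extracted from the pickup time depends on where the job runs.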
Can anyone help? Thanks.
This is a runtime exception (java.lang.RuntimeException), which basically indicates a problem in the code.
The method public String getValueToString(int fieldIndex, int index) internally calls getValue(int fieldIndex, int index).
getValue(...) is implemented as follows:
private Object getValue(int fieldIndex, int index) {
    List<Object> list;
    try {
        list = data[fieldIndex];
    } catch (IndexOutOfBoundsException e) {
        throw new RuntimeException("not found " + fieldIndex + "(" + schema.getFieldName(fieldIndex) + ") in group:\n" + this);
    }
    try {
        return list.get(index);
    } catch (IndexOutOfBoundsException e) {
        throw new RuntimeException("not found " + fieldIndex + "(" + schema.getFieldName(fieldIndex) + ") element number " + index + " in group:\n" + this);
    }
}
Here, if fieldIndex or index is out of range, an IndexOutOfBoundsException is thrown, and it is rethrown as a RuntimeException.
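The message in the stack trace contains "element number 0", which matches the second catch block: field index 20 is valid, but the value list for that field is empty in this record. The following is a minimal, self-contained sketch of that behavior. MiniGroup is a hypothetical stand-in written for illustration, not the Parquet SimpleGroup class, and it assumes only what the getValue code above shows (one value list per schema field).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the storage shown in getValue(...) above:
// one list of values per schema field, empty when an optional field is unset.
final class MiniGroup {
    private final String[] fieldNames;
    private final List<List<Object>> data = new ArrayList<>();

    MiniGroup(String... fieldNames) {
        this.fieldNames = fieldNames;
        for (int i = 0; i < fieldNames.length; i++) {
            data.add(new ArrayList<>());
        }
    }

    void add(int fieldIndex, Object value) {
        data.get(fieldIndex).add(value);
    }

    // Number of values stored for a field; 0 means the field is unset.
    int valueCount(int fieldIndex) {
        return data.get(fieldIndex).size();
    }

    Object getValue(int fieldIndex, int index) {
        List<Object> list = data.get(fieldIndex);
        try {
            return list.get(index);
        } catch (IndexOutOfBoundsException e) {
            // Same shape as the message in the stack trace.
            throw new RuntimeException("not found " + fieldIndex + "("
                    + fieldNames[fieldIndex] + ") element number " + index);
        }
    }

    public static void main(String[] args) {
        MiniGroup group = new MiniGroup("trip_distance", "trip_id");
        group.add(1, 4776003633207L); // only trip_id has a value

        System.out.println(group.valueCount(0)); // prints 0
        try {
            group.getValue(0, 0);
        } catch (RuntimeException e) {
            System.out.println(e.getMessage());
            // prints not found 0(trip_distance) element number 0
        }
    }
}
```

The point of the sketch is that a null check on the returned value never runs for an unset optional field, because the lookup throws before anything is returned.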
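I would suggest not calling getValueToString(...) directly; you need to check that the field has a value first.
Since nearly every field in your dataset is optional (only trip_id is required), looking a value up by a fixed fieldIndex is unreliable. In this case, assume the value exists, let the call fail, and use a try-catch block to detect the missing value and fall back to a default: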
try {
    distance = value.getFloat("trip_distance", 0);
} catch (RuntimeException e) {
    distance = 0;
}
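You should be able to cast the value object to SimpleGroup and use getFieldRepetitionCount to get the number of values stored for the field; if it is 0, the field has no value.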
import org.apache.parquet.example.data.simple.SimpleGroup;
...
SimpleGroup sgValue = (SimpleGroup) value;
if (sgValue.getFieldRepetitionCount(20) != 0) { //the trip_distance field
    distance = value.getFloat("trip_distance", 0);
} else {
    distance = 0;
}