我有一个数据帧,包括发送方(id,int),接收方(id,int),通信时间(int)。
A B C
1 5 10
1 6 20
1 7 20
1 8 11
我的目标是找到最大通信时间并返回1 6,20(格式为A B,C)由于A1,B6和A1,B7的最大通讯时间均为20,因此我只需要保持最小的B id编号即可。
在映射步骤中,我已经将A分隔为键,将(B,C)分隔为值。
到目前为止,我可以返回A和最大C的输出,但是我很难返回B的值。我下面的代码无法更改min_Receiver,如何解决此问题?
public static class IntSumReducer
extends Reducer<Text,Text,Text,Text> {
//private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<Text> values,
Context context
) throws IOException, InterruptedException {
int max_val = 0;
int val_str = 0;
int val_str_1 = 0;
int min_Receiver = Integer.MAX_VALUE;
int tempReceiver = 0;
for (Text val : values) {
String[] compositeString = val.toString().split(",");
val_str = Integer.parseInt(compositeString[1]);
//tempReceiver = Integer.parseInt(compositeString[0]);
if( val_str>max_val) {
max_val = val_str;
}
}
for (Text val_1 : values){
String[] compositeString = val_1.toString().split(",");
tempReceiver = Integer.parseInt(compositeString[0]);
val_str_1 = Integer.parseInt(compositeString[1]);
if (val_str_1 == max_val && tempReceiver < min_Receiver)
{
min_Receiver =tempReceiver;
}
}
//result.set(max_val);
context.write(key, new Text(min_Receiver + "," + max_val));}}
预期输出为
1 6,20
实际输出是
1 2147483647,20
在地图上,我已经将A作为键,将B,C作为值。因此,compositeString包含两个变量。值的格式为B,C。
取决于您的分度数
使用这样的行以得到最长时间的Text
Text answer = StreamSupport.stream(values.spliterator(),false) //all this does is get you a stream of Text
.max(Comparator.comparingInt(s->getComTime(s))) // return the object that evaluates to the max value
.orElse(""); // the stream was empty
以及创建从这样的字符串/文本获取通信时间的方法:
private static int getComTime(Text line){
String[] vals = line.toString().split(",");
return Integer.parseInt(vals[2]);
}
。stream()的布尔选项是,如果您要顺序= false
或并行= true
...。如果分隔符不同或对象稍有不同,则可能需要调整getComTime
但这应该非常接近正确。
answer
将保存生成最大值的特定Text
。如果流为空,则返回一个空字符串。或删除.orElse()并获得一个Optional,如果您想这样处理的话。
那么你就可以做
if(!answer.isEmpty()){/* got an answer do something with it */}