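I want to remove the duplicate values in the `value` column of my dataset, producing one row per distinct value.
Before: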
+----------+---------------+--------------------+--------------------+---------+----+-------------+
|     e_key|f_timestamp_day|                 key|               value|f_country|f_os|received_date|
+----------+---------------+--------------------+--------------------+---------+----+-------------+
|    Tryout|     2020-04-01|      item_guid_list|             a^a^a^b|       FR| iOS|   2020-04-01|
|    Tryout|     2020-04-01|            sku_list|         c^c^d^e^f^f|       FR| iOS|   2020-04-01|
+----------+---------------+--------------------+--------------------+---------+----+-------------+
After:
+----------+---------------+--------------------+--------------------+---------+----+-------------+
|     e_key|f_timestamp_day|                 key|               value|f_country|f_os|received_date|
+----------+---------------+--------------------+--------------------+---------+----+-------------+
|    Tryout|     2020-04-01|      item_guid_list|                   a|       FR| iOS|   2020-04-01|
|    Tryout|     2020-04-01|      item_guid_list|                   b|       FR| iOS|   2020-04-01|
|    Tryout|     2020-04-01|            sku_list|                   c|       FR| iOS|   2020-04-01|
|    Tryout|     2020-04-01|            sku_list|                   d|       FR| iOS|   2020-04-01|
|    Tryout|     2020-04-01|            sku_list|                   e|       FR| iOS|   2020-04-01|
|    Tryout|     2020-04-01|            sku_list|                   f|       FR| iOS|   2020-04-01|
+----------+---------------+--------------------+--------------------+---------+----+-------------+
However, when I use flatMap, the result is an empty table with no columns:
++
||
++
||
||
||
||
||
My code is:
StructType structType = new StructType();
structType.add("e_key", DataTypes.StringType);
structType.add("f_timestamp_day", DataTypes.StringType);
structType.add("key", DataTypes.StringType);
structType.add("value", DataTypes.StringType);
structType.add("f_country", DataTypes.StringType);
structType.add("f_os", DataTypes.StringType);
structType.add("received_date", DataTypes.StringType);

Dataset<Row> drop_duplicate_feature =
    explode_events.flatMap(
        (FlatMapFunction<Row, Row>) row -> {
            List<Row> list = new ArrayList<Row>();
            String value = row.getString(3);
            String[] array_of_value = value.split("\\^");
            array_of_value = new HashSet<String>(Arrays.asList(array_of_value)).toArray(new String[0]);
            for (int index = 0; index < array_of_value.length; index++) {
                list.add(
                    RowFactory.create(row.get(0), row.get(1), row.get(2), array_of_value[index], row.get(4), row.get(5), row.get(6))
                );
            }
            return list.iterator();
        },
        RowEncoder.apply(structType)
    );
I use flatMap to generate one row per distinct value and add each row to a list.
Why doesn't RowEncoder.apply() work?
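
For reference, the split-and-deduplicate step inside the lambda can be checked on its own, without Spark. This is only a minimal sketch in plain Java: the class name `DistinctTokens` and the helper `distinctTokens` are hypothetical names introduced for illustration, and a `LinkedHashSet` is swapped in for the `HashSet` used above so that the order of the output is deterministic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;

public class DistinctTokens {
    // Hypothetical helper (not part of the code above): splits a caret-delimited
    // string and keeps only the first occurrence of each token.
    static List<String> distinctTokens(String value) {
        // "^" is a regex metacharacter, so it has to be escaped for String.split.
        String[] parts = value.split("\\^");
        // LinkedHashSet drops duplicates while preserving insertion order.
        return new ArrayList<>(new LinkedHashSet<>(Arrays.asList(parts)));
    }

    public static void main(String[] args) {
        System.out.println(distinctTokens("a^a^a^b"));     // prints [a, b]
        System.out.println(distinctTokens("c^c^d^e^f^f")); // prints [c, d, e, f]
    }
}
```

With the two sample values from the "Before" table this yields the distinct values shown in the "After" table, which suggests the splitting logic itself is probably not what produces the empty result.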