代码是
Dataset<Row> mainData=df.select( "data.*").filter("data.eventdesc='logout'");
Dataset<Row> groupByData = mainData.groupBy("ipaddress1").count().filter("count > 1");
mainData.filter(mainData.col("ipaddress1").contains(groupByData.col("ipaddress1")));
主要数据输出是
+-------+----------+------------+---------+---------------------+-----------+
|id |resource id|resource name|event-desc|event-date |ipaddress1 |
+-------+----------+------------+---------+---------------------+-----------+
|2010001|119 |Netopia |logout |+56975-05-07 23:01:37|25:34:21:44|
|2010001|119 |Netopia |logout |+56975-05-07 23:01:37|25:34:21:44|
|2010001|119 |Netopia |logout |+56975-05-07 23:01:37|25:34:21:45|
|2010001|119 |Netopia |logout |+56975-05-07 23:01:37|25:34:21:45|
|2010001|119 |Netopia |logout |+56975-05-07 23:01:37|25:34:21:44|
|2010001|119 |Netopia |logout |+56975-05-07 23:01:37|25:34:21:46|
+-------+----------+------------+---------+---------------------+-----------+
按数据分组是
+-----------+-----+
|ipaddress1 |count|
+-----------+-----+
|25:34:21:45|2 |
|25:34:21:44|3 |
+-----------+-----+
我需要过滤上面的组数据代码中存在的主要数据,但它没有按预期工作,任何人都可以建议任何可能的解决方案吗?
您正在尝试根据 groupByData 的结果过滤 mainData。但是,Spark 不支持像 SQL 连接那样使用另一个数据集的列直接过滤数据集。
为了实现你的目标,你需要在 ipaddress1 列上将 mainData 与 groupByData 连接起来。
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class FilterMainData {
public static void main(String[] args) {
SparkSession spark = SparkSession.builder()
.appName("Filter Main Data")
.getOrCreate();
// Example of loading the data
Dataset<Row> df = ...; // Load your DataFrame
// Extract and filter the main data
Dataset<Row> mainData = df.selectExpr("data.*")
.filter("eventdesc = 'logout'");
// Group by ipaddress1 and count, then filter for count > 1
Dataset<Row> groupByData = mainData.groupBy("ipaddress1")
.count()
.filter("count > 1");
// Join mainData with groupByData on ipaddress1
Dataset<Row> filteredData = mainData.join(
groupByData,
mainData.col("ipaddress1").equalTo(groupByData.col("ipaddress1")),
"inner"
).select(mainData.col("*")); // Select only columns from mainData
// Show the filtered data
filteredData.show();
}
}