Spark Java Structured Streaming dataset filter


The code is:

Dataset<Row> mainData=df.select( "data.*").filter("data.eventdesc='logout'");
Dataset<Row> groupByData = mainData.groupBy("ipaddress1").count().filter("count > 1");
mainData.filter(mainData.col("ipaddress1").contains(groupByData.col("ipaddress1")));

The output of mainData is:

+-------+----------+------------+---------+---------------------+-----------+
|id     |resourceid|resourcename|eventdesc|eventdate            |ipaddress1 |
+-------+----------+------------+---------+---------------------+-----------+
|2010001|119       |Netopia     |logout   |+56975-05-07 23:01:37|25:34:21:44|
|2010001|119       |Netopia     |logout   |+56975-05-07 23:01:37|25:34:21:44|
|2010001|119       |Netopia     |logout   |+56975-05-07 23:01:37|25:34:21:45|
|2010001|119       |Netopia     |logout   |+56975-05-07 23:01:37|25:34:21:45|
|2010001|119       |Netopia     |logout   |+56975-05-07 23:01:37|25:34:21:44|
|2010001|119       |Netopia     |logout   |+56975-05-07 23:01:37|25:34:21:46|
+-------+----------+------------+---------+---------------------+-----------+

The output of groupByData is:

+-----------+-----+
|ipaddress1 |count|
+-----------+-----+
|25:34:21:45|2    |
|25:34:21:44|3    |
+-----------+-----+

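I need to filter mainData down to the rows whose ipaddress1 appears in groupByData above, but the code does not work as expected. Can anyone suggest a possible solution?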

java apache-spark spark-structured-streaming
1 Answer

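You are trying to filter mainData based on the result of groupByData. However, Spark cannot filter one Dataset directly with a column that belongs to a different Dataset; a lookup across two Datasets has to be expressed as a join, just as it would be in SQL.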

To get the result you want, join mainData with groupByData on the ipaddress1 column.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class FilterMainData {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("Filter Main Data")
                .getOrCreate();

        // Example of loading the data
        Dataset<Row> df = ...;  // Load your DataFrame

        // Extract and filter the main data
        Dataset<Row> mainData = df.selectExpr("data.*")
                                  .filter("eventdesc = 'logout'");

        // Group by ipaddress1 and count, then filter for count > 1
        Dataset<Row> groupByData = mainData.groupBy("ipaddress1")
                                           .count()
                                           .filter("count > 1");

        // Join mainData with groupByData on ipaddress1
        Dataset<Row> filteredData = mainData.join(
                groupByData,
                mainData.col("ipaddress1").equalTo(groupByData.col("ipaddress1")),
                "inner"
        ).select(mainData.col("*")); // Select only columns from mainData

        // Show the filtered data
        filteredData.show();
    }
}
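
To make the expected result concrete, here is a minimal plain-Java sketch (no Spark involved) of what the join above computes on the sample data from the question: count the rows per ipaddress1, keep the addresses that occur more than once, then keep only the rows whose address is in that set. The class name SemiJoinSketch and the helpers sampleIps and keepRepeated are hypothetical names made up for this illustration, and each row is reduced to its ipaddress1 value for brevity.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration class; not part of Spark or of the original code.
public class SemiJoinSketch {

    // The ipaddress1 values of the six sample rows shown in the question.
    public static List<String> sampleIps() {
        return Arrays.asList(
                "25:34:21:44", "25:34:21:44", "25:34:21:45",
                "25:34:21:45", "25:34:21:44", "25:34:21:46");
    }

    // Keeps only the rows whose address occurs more than once in the input.
    public static List<String> keepRepeated(List<String> ips) {
        // Equivalent of groupBy("ipaddress1").count()
        Map<String, Integer> counts = new HashMap<>();
        for (String ip : ips) {
            counts.merge(ip, 1, Integer::sum);
        }

        // Equivalent of filter("count > 1")
        Set<String> repeated = new HashSet<>();
        for (Map.Entry<String, Integer> entry : counts.entrySet()) {
            if (entry.getValue() > 1) {
                repeated.add(entry.getKey());
            }
        }

        // Equivalent of joining the rows back on ipaddress1:
        // a row survives only if its key is in the repeated set.
        List<String> kept = new ArrayList<>();
        for (String ip : ips) {
            if (repeated.contains(ip)) {
                kept.add(ip);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // Five of the six sample rows remain; the 25:34:21:46 row is dropped.
        System.out.println(keepRepeated(sampleIps()));
    }
}
```

Because the join is only used to test whether a key exists, the same result can also be requested in Spark by passing "left_semi" instead of "inner" as the join type, which returns only the columns of mainData and makes the trailing select unnecessary. If df is a streaming Dataset, joining an aggregation back to its own source may be subject to additional restrictions depending on the Spark version and output mode, so it is worth checking this on a batch Dataset first.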