Feeding the result of one query into another query within the same Spark Structured Streaming application

Problem description

I have just started exploring Spark Structured Streaming and have run into an implementation question.

I am streaming data with Apache Pulsar, and I would like to know whether it is possible to run different queries in the same program and either join their results or feed the result of one query into another, without writing the intermediate results to another topic or sink.

As an example, given the schema:

root
 |-- amount: long (nullable = true)
 |-- cardNum: string (nullable = true)
 |-- category: string (nullable = true)
 |-- merchant: string (nullable = true)
 |-- ts: long (nullable = true)
 |-- userId: string (nullable = true)
 |-- __key: binary (nullable = true)
 |-- __topic: string (nullable = true)
 |-- __messageId: binary (nullable = true)
 |-- __publishTime: timestamp (nullable = true)
 |-- __eventTime: timestamp (nullable = true)
 |-- __messageProperties: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

Processor code:

import java.util.concurrent.TimeoutException;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.apache.spark.sql.streaming.StreamingQueryException;

import static org.apache.spark.sql.functions.sum;

public class CardTransactionStreamProcessor {

    public static final String PULSAR_SERVICE_URL = "pulsar://127.0.0.1:6650";
    public static final String TOPIC = "spark/tutorial/card-txn";
    public static final String SUB = "my-sub";

    public static void main(String[] args) throws TimeoutException, StreamingQueryException, InterruptedException {

        SparkSession sparkSession = SparkAppUtil.getSparkSession("card-txn-stream");

        sparkSession.sparkContext().setLogLevel("error");

        Dataset<Row> lines = sparkSession.readStream()
                .format("pulsar")
                .option("service.url", PULSAR_SERVICE_URL)
                .option("topic", TOPIC)
                .option("startingOffsets", "earliest")
                .load();

        lines.printSchema();

        Dataset<CardTransactionDTO> cardTransactionDTODataset = lines.as(ExpressionEncoder.javaBean(CardTransactionDTO.class));

        cardTransactionDTODataset.printSchema();

        // Top spends, merchant-wise
        cardTransactionDTODataset.groupBy("merchant")
                .agg(sum("amount").alias("amount"))
                .sort("merchant")
                .writeStream().outputMode("complete")
                .format("console")
                .start();

        // Top spends, category-wise
        cardTransactionDTODataset.groupBy("category")
                .agg(sum("amount").alias("amount"))
                .sort("category")
                .writeStream().outputMode("complete")
                .format("console").start();

        sparkSession.streams().awaitAnyTermination();
    }
}

In the example above, I would like to understand how to take the output of the first query and feed it into the second query, or how to join the results of both queries into a single DataFrame that I could then sink, say, to a Pulsar topic itself.

java apache-spark apache-spark-sql spark-structured-streaming apache-pulsar
1 Answer

Yes, it is possible. If you want to merge the output of the two queries into a single Dataset<T>, you can use the union method on Dataset.

Try this updated code:

package org.example;

import java.util.concurrent.TimeoutException;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQueryException;

import static org.apache.spark.sql.functions.sum;

public class CardTransactionStreamProcessor {

    public static final String PULSAR_SERVICE_URL = "pulsar://127.0.0.1:6650";
    public static final String TOPIC = "spark/tutorial/card-txn";
    public static final String SUB = "my-sub";

    public static void main(String[] args) throws TimeoutException, StreamingQueryException, InterruptedException {

        SparkSession sparkSession = SparkAppUtil.getSparkSession("card-txn-stream");

        sparkSession.sparkContext().setLogLevel("error");

        Dataset<Row> lines = sparkSession.readStream().format("pulsar").option("service.url", PULSAR_SERVICE_URL)
                .option("topic", TOPIC).option("startingOffsets", "earliest").load();

        lines.printSchema();

        // Top spends, merchant-wise: schema (merchant, amount)
        Dataset<Row> spendsByMerchantWise = lines.groupBy("merchant").agg(sum("amount").alias("amount"))
                .withColumnRenamed("merchant", "name");

        // Top spends, category-wise: schema (category, amount)
        Dataset<Row> spendsByCategoryWise = lines.groupBy("category").agg(sum("amount").alias("amount"))
                .withColumnRenamed("category", "name");

        // Union the rows of both aggregates. union() matches columns by position,
        // so both sides are renamed to a common (name, amount) schema first.
        // Note: the aggregated rows no longer match CardTransactionDTO's schema,
        // so they stay as generic Rows instead of being re-encoded as the DTO.
        Dataset<Row> union = spendsByMerchantWise.union(spendsByCategoryWise).sort("name");

        union.writeStream().outputMode("complete").format("console").start();

        sparkSession.streams().awaitAnyTermination();
    }
}
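
As for feeding the output of one streaming query into another: within a single application this is normally expressed by chaining further transformations on the same Dataset, as above. If you need the materialized result of each micro-batch as the input to a second-stage computation, foreachBatch hands you the aggregate as a regular batch Dataset. A minimal sketch under that assumption (the top-10 computation is only an illustration, not part of the original code):

import org.apache.spark.api.java.function.VoidFunction2;

// ...

union.writeStream()
        .outputMode("complete")
        .foreachBatch((VoidFunction2<Dataset<Row>, Long>) (batch, batchId) -> {
            // 'batch' is an ordinary (non-streaming) Dataset here, so any
            // second-stage query can run on it before writing to a sink.
            Dataset<Row> topSpends = batch.orderBy(batch.col("amount").desc()).limit(10);
            topSpends.show();
        })
        .start();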
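
To sink the unioned result to a Pulsar topic instead of the console, the same pulsar format can be used on the write side. A minimal sketch, assuming the StreamNative pulsar-spark connector is on the classpath; the output topic name and checkpoint path are illustrative, and the rows are serialized to a JSON string here because the exact payload mapping depends on the connector version:

// Serialize each aggregated row to a JSON string; a single 'value' column
// is a common payload convention, but check your connector version's docs.
Dataset<Row> payload = union.selectExpr("to_json(struct(*)) AS value");

payload.writeStream()
        .outputMode("complete")
        .format("pulsar")
        .option("service.url", PULSAR_SERVICE_URL)
        .option("topic", "spark/tutorial/card-txn-aggregates") // hypothetical output topic
        .option("checkpointLocation", "/tmp/card-txn-checkpoint") // example path; required for non-console sinks
        .start();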