I've just started exploring Spark Structured Streaming and have an implementation question.
I'm using Apache Pulsar to stream data, and I'd like to know whether it's possible to run different queries in the same program and either join their results, or feed the result of one query into another, without first writing the intermediate result out to a separate topic or sink.
As an example, for the schema:
root
|-- amount: long (nullable = true)
|-- cardNum: string (nullable = true)
|-- category: string (nullable = true)
|-- merchant: string (nullable = true)
|-- ts: long (nullable = true)
|-- userId: string (nullable = true)
|-- __key: binary (nullable = true)
|-- __topic: string (nullable = true)
|-- __messageId: binary (nullable = true)
|-- __publishTime: timestamp (nullable = true)
|-- __eventTime: timestamp (nullable = true)
|-- __messageProperties: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
Processor code:
import java.util.concurrent.TimeoutException;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.apache.spark.sql.streaming.StreamingQueryException;

import static org.apache.spark.sql.functions.sum;

public class CardTransactionStreamProcessor {

    public static final String PULSAR_SERVICE_URL = "pulsar://127.0.0.1:6650";
    public static final String TOPIC = "spark/tutorial/card-txn";
    public static final String SUB = "my-sub";

    public static void main(String[] args) throws TimeoutException, StreamingQueryException, InterruptedException {
        SparkSession sparkSession = SparkAppUtil.getSparkSession("card-txn-stream");
        sparkSession.sparkContext().setLogLevel("error");

        Dataset<Row> lines = sparkSession.readStream()
                .format("pulsar")
                .option("service.url", PULSAR_SERVICE_URL)
                .option("topic", TOPIC)
                .option("startingOffsets", "earliest")
                .load();
        lines.printSchema();

        Dataset<CardTransactionDTO> cardTransactionDTODataset =
                lines.as(ExpressionEncoder.javaBean(CardTransactionDTO.class));
        cardTransactionDTODataset.printSchema();

        // Top spends, merchant-wise
        cardTransactionDTODataset.groupBy("merchant")
                .agg(sum("amount").alias("amount"))
                .sort("merchant")
                .writeStream().outputMode("complete")
                .format("console")
                .start();

        // Top spends, category-wise
        cardTransactionDTODataset.groupBy("category")
                .agg(sum("amount").alias("amount"))
                .sort("category")
                .writeStream().outputMode("complete")
                .format("console")
                .start();

        sparkSession.streams().awaitAnyTermination();
    }
}
In the example above, I'd like to understand how to take the output of the first query and feed it into the second query, or how to join the results of both queries into a single DataFrame that I can then sink to, say, a Pulsar topic itself.
Yes, this is possible using Dataset<T>.
If you want to combine the outputs of the two queries into one Dataset<T>, you can merge them with the Dataset's union method. Note that union resolves columns by position, not by name, so both sides must have the same number of columns in a compatible order.
Try this updated code:
package org.example;

import java.util.concurrent.TimeoutException;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQueryException;

import static org.apache.spark.sql.functions.sum;

public class CardTransactionStreamProcessor {

    public static final String PULSAR_SERVICE_URL = "pulsar://127.0.0.1:6650";
    public static final String TOPIC = "spark/tutorial/card-txn";
    public static final String SUB = "my-sub";

    public static void main(String[] args) throws TimeoutException, StreamingQueryException, InterruptedException {
        SparkSession sparkSession = SparkAppUtil.getSparkSession("card-txn-stream");
        sparkSession.sparkContext().setLogLevel("error");

        Dataset<Row> lines = sparkSession.readStream().format("pulsar")
                .option("service.url", PULSAR_SERVICE_URL)
                .option("topic", TOPIC)
                .option("startingOffsets", "earliest")
                .load();
        lines.printSchema();

        // Top spends, merchant-wise
        Dataset<Row> spendsByMerchantWise = lines.groupBy("merchant")
                .agg(sum("amount").alias("amount"))
                .sort("merchant");

        // Top spends, category-wise
        Dataset<Row> spendsByCategoryWise = lines.groupBy("category")
                .agg(sum("amount").alias("amount"))
                .sort("category");

        // Union of the rows from both datasets. Columns are matched by
        // position, so the result carries the first dataset's column
        // names: (merchant, amount).
        Dataset<Row> union = spendsByMerchantWise.union(spendsByCategoryWise);

        // The aggregated result has only two columns, so it no longer matches
        // the CardTransactionDTO bean and stays a Dataset<Row>.
        union.writeStream().outputMode("complete").format("console").start();

        sparkSession.streams().awaitAnyTermination();
    }
}
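If you would rather sink the combined result to a Pulsar topic itself, as the question asks, the same writeStream can target the pulsar format instead of the console. A minimal sketch, assuming the same pulsar-spark connector is on the classpath; the output topic name and checkpoint path below are hypothetical placeholders:

// Minimal sketch: sink the union to a Pulsar topic instead of the console.
// The output topic and checkpoint path are hypothetical placeholders.
union.writeStream()
        .format("pulsar")
        .outputMode("complete")
        .option("service.url", PULSAR_SERVICE_URL)
        .option("topic", "spark/tutorial/card-txn-agg")          // hypothetical output topic
        .option("checkpointLocation", "/tmp/card-txn-agg-ckpt")  // required for non-console streaming sinks
        .start();

A downstream query can then readStream from that topic just like the original one. Keep in mind that in complete output mode every trigger republishes the full result table, so the topic will accumulate repeated snapshots of the aggregates rather than only the changed rows.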