在Spark Streaming应用程序中联接数据的最佳方法是什么?

问题描述 投票:1回答:1

问题:从本质上讲,它不是为每个流记录运行C *表的联接,反而是有针对火花流中的每个记录的微批处理(微批处理)运行联接吗?] >

我们几乎最终确定要使用spark-sql 2.4.x版本,对于Cassandra-3.x版本使用datastax-spark-cassandra-connector。

但是在以下情况下,有一个关于效率的基本问题。

对于流数据记录(即streamingDataSet),我需要从Cassandra(C *)表中查找现有记录(即cassandraDataset)。

Dataset<Row> streamingDataSet = //kafka read dataset

Dataset<Row> cassandraDataset= //loaded from C* table those records loaded earlier from above.

要查找我需要加入以上数据集的数据

Dataset<Row> joinDataSet = cassandraDataset.join(cassandraDataset).where(//somelogic)

进一步处理joinDataSet以实现业务逻辑...

在上述情况下,我的理解是,对于收到的每条记录从kafka流中,它将查询C *表,即数据库调用。

如果C *表包含以下内容,则不会花费大量时间和网络带宽数十亿条记录?方法/程序应该是什么然后改进查找C *表?

在这种情况下最好的解决方案是什么?我无法从加载一次C *表并在数据不断添加到C *表中时进行查找...即新的查找可能需要新保存的数据。

如何处理这种情况?任何建议请..

问题:从本质上讲,这意味着,不是为每个流记录运行C *表的联接,而是为火花中的每个记录的微批处理(微批处理)运行联接...

apache-spark cassandra apache-spark-sql spark-streaming datastax-enterprise
1个回答
2
投票

[如果您使用的是Apache Cassandra,则只有一种可能有效地通过RDD API's joinWithCassandraTable连接Cassandra中的数据。 Spark Cassandra连接器(SCC)的开源版本仅支持它,而在DSE版本中,有一个代码可以对Cassandra执行有效的连接,也适用于Spark SQL-称为joinWithCassandraTable。如果您在Spark SQL中针对Cassandra表使用DSE Direct Join,Spark将需要从Cassandra中读取所有数据,然后执行连接-这非常慢。

© www.soinside.com 2019 - 2024. All rights reserved.