问题:从本质上讲,它不是为每个流记录运行C *表的联接,反而是有针对火花流中的每个记录的微批处理(微批处理)运行联接吗?] >
我们几乎最终确定要使用spark-sql 2.4.x版本,对于Cassandra-3.x版本使用datastax-spark-cassandra-connector。
但是在以下情况下,有一个关于效率的基本问题。
对于流数据记录(即streamingDataSet),我需要从Cassandra(C *)表中查找现有记录(即cassandraDataset)。
即
Dataset<Row> streamingDataSet = //kafka read dataset Dataset<Row> cassandraDataset= //loaded from C* table those records loaded earlier from above.
要查找我需要加入以上数据集的数据
即
Dataset<Row> joinDataSet = cassandraDataset.join(cassandraDataset).where(//somelogic)
进一步处理joinDataSet以实现业务逻辑...
在上述情况下,我的理解是,对于收到的每条记录从kafka流中,它将查询C *表,即数据库调用。
如果C *表包含以下内容,则不会花费大量时间和网络带宽数十亿条记录?方法/程序应该是什么然后改进查找C *表?
在这种情况下最好的解决方案是什么?我无法从加载一次C *表并在数据不断添加到C *表中时进行查找...即新的查找可能需要新保存的数据。
如何处理这种情况?任何建议请..
问题:从本质上讲,这意味着,不是为每个流记录运行C *表的联接,而是为火花中的每个记录的微批处理(微批处理)运行联接...
[如果您使用的是Apache Cassandra,则只有一种可能有效地通过RDD API's joinWithCassandraTable
连接Cassandra中的数据。 Spark Cassandra连接器(SCC)的开源版本仅支持它,而在DSE版本中,有一个代码可以对Cassandra执行有效的连接,也适用于Spark SQL-称为joinWithCassandraTable
。如果您在Spark SQL中针对Cassandra表使用DSE Direct Join,Spark将需要从Cassandra中读取所有数据,然后执行连接-这非常慢。