我需要扫描包含生产中1亿条记录的表。搜索将在第一个聚类关键字上进行。要求是找到第一个聚类键与条件匹配的唯一分区键。该表如下所示-
员工姓名,公司名称,lastdateloggedin,现场发言,swipetimestamp
分区键-员工编号群集密钥-公司名称,lastdateloggedin
我想选择公司名称='XYZ'的不同(employeeid),company,swipetimestamp。这是我想从表中获取的内容的SQL表示形式。
SparkConf conf = new SparkConf().set("spark.cassandra.connection.enabled", "true")
.set("spark.cassandra.auth.username", "XXXXXXXXXX")
.set("spark.cassandra.auth.password", "XXXXXXXXX")
.set("spark.cassandra.connection.host", "hostname")
.set("spark.cassandra.connection.port", "29042")
.set("spark.cassandra.connection.factory", ConnectionFactory.class)
.set("spark.cassandra.connection.cluster_name", "ZZZZ")
.set("spark.cassandra.connection.application_name", "ABC")
.set("spark.cassandra.connection.local_dc", "DC1")
.set("spark.cassandra.connection.cachedClusterFile", "/tmp/xyz/test.json")
.set("spark.cassandra.connection.ssl.enabled", "true")
.set("spark.cassandra.input.fetch.size_in_rows","10000") //
.set("spark.driver.allowMultipleContexts","true")
.set("spark.cassandra.connection.ssl.trustStore.path", "sampleabc-spark-util/src/main/resources/x.jks")
.set("spark.cassandra.connection.ssl.trustStore.password", "cassandrasam");
CassandraJavaRDD<CassandraRow> ctable = javaFunctions(jsc).cassandraTable("keyspacename", "employeedetails").
select("employeeid", "companyname","swipetimestamp").where("companyname= ?","XYZ");
List<CassandraRow> cassandraRows = ctable.distinct().collect();
此代码在非生产环境中运行,具有近500万个数据。由于这是生产环境,因此我想谨慎处理此查询。问题-
我建议使用Dataframe API代替RDD-从理论上讲,SCC可以对该API进行更多优化。如果在第一个聚类列上有条件,则此条件应由SCC向下推至Cassandra,然后将在此处进行过滤。您可以通过在数据帧上使用.expalin
并在*
部分中检查是否有标有PushedFilters
的规则来进行检查。
关于配置-使用spark.cassandra.input.fetch.size_in_rows
的默认版本-如果您的值太高,则超时的机会更大。您仍然可以关闭具有默认值的节点,因为SCC正在读取LOCAL_ONE
,并且该节点使单个节点过载。有时,使用LOCAL_QUORUM
读取会更快,因为它不会使单个节点过载过多,也不会重新启动正在读取数据的任务。
并且我建议确保您使用的是最新的Spark Cassandra Connector-2.5.0-it has a lot of new optimizations and new functionality ...