我有3列的cassandra表。
id text,
value text,
mappings map<text,text>
可以说样本数据如下:
id | value | mappings
-----------------------------------------------
1ABC | xyz | {"a":"abc","b":"bcd"}
在一个火花作业中,我为id 1ABC
的b
映射计算了一个新值,作为HashMap Ex: "b":"xyz"
(可以将映射转换为JavaRDD)
如何使用cassandra java spark连接器将此值附加(覆盖)到表中?我一直在看this如何处理CQL集合追加的示例,但似乎无法弄清楚如何在Java中执行此操作。任何指针将不胜感激。
通过传递新参数或使用Spark会话中的参数来创建cassandra连接器。
import com.datastax.spark.connector.cql.CassandraConnector;
CassandraConnector connector = CassandraConnector.apply(spark.sparkContext().conf()); // or pass different values for spark.cassandra.connection.host, username and password
rdd.foreach(new VoidFunction<TestBean>() {
@Override
public void call(TestBean t) throws Exception {
final String id = t.getId();
final Map<String, String> mappings = t.getMappings();
boolean isUpdated = connector.withSessionDo(new AbstractFunction1<Session, Boolean>() {
@Override
public Boolean apply(Session v1) {
ResultSet updateResultSet = v1.execute(v1.prepare("update test set mappings = mappings + ? where id = ?")
.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM)
.bind(mappings, id));
return updateResultSet.wasApplied();
}
});
}
});