I have a StreamExecutionEnvironment job that takes a simple CQL select query from Kafka. I tried to process this query asynchronously with the following code:
public class GenericCassandraReader extends RichAsyncFunction&lt;UserDefinedType, ResultSet&gt; {

    private static final Logger logger = LoggerFactory.getLogger(GenericCassandraReader.class);

    private ExecutorService executorService;
    private final Properties props;
    private Session client;

    public ExecutorService getExecutorService() {
        return executorService;
    }

    public GenericCassandraReader(Properties props, ExecutorService executorService) {
        super();
        this.props = props;
        this.executorService = executorService;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        client = Cluster.builder().addContactPoint(props.getProperty("cqlHost"))
                .withPort(Integer.parseInt(props.getProperty("cqlPort"))).build()
                .connect(props.getProperty("keyspace"));
    }

    @Override
    public void close() throws Exception {
        client.close();
        synchronized (GenericCassandraReader.class) {
            try {
                if (!getExecutorService().awaitTermination(1000, TimeUnit.MILLISECONDS)) {
                    getExecutorService().shutdownNow();
                }
            } catch (InterruptedException e) {
                getExecutorService().shutdownNow();
            }
        }
    }

    @Override
    public void asyncInvoke(final UserDefinedType input, final AsyncCollector&lt;ResultSet&gt; asyncCollector) throws Exception {
        getExecutorService().submit(new Runnable() {
            @Override
            public void run() {
                ListenableFuture&lt;ResultSet&gt; resultSetFuture = client.executeAsync(input.query);
                Futures.addCallback(resultSetFuture, new FutureCallback&lt;ResultSet&gt;() {
                    public void onSuccess(ResultSet resultSet) {
                        asyncCollector.collect(Collections.singleton(resultSet));
                    }
                    public void onFailure(Throwable t) {
                        asyncCollector.collect(t);
                    }
                });
            }
        });
    }
}
With this code, each response yields a Cassandra ResultSet with a different number of fields.
Any ideas on handling a Cassandra ResultSet in Flink, or should I use some other technique to reach my goal?
Thanks in advance for any help!
Cassandra ResultSet is not thread-safe. It would be better to use the Flink Cassandra connector, or at least to write your implementation in a similar way.
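If you keep your own RichAsyncFunction instead of the connector, one way to write it "in a similar way" is to drain the ResultSet inside the driver's callback, i.e. in the thread that owns it, and only hand already-materialized values to the AsyncCollector, so the ResultSet itself never crosses a thread boundary. Below is a minimal sketch under that assumption, replacing asyncInvoke in your class with a hypothetical List&lt;String&gt; output type; it reuses the Datastax and Flink classes already present in your code, and the extra ExecutorService is dropped because executeAsync is already non-blocking (imports for java.util.List, java.util.ArrayList and com.datastax.driver.core.Row are assumed):

@Override
public void asyncInvoke(final UserDefinedType input, final AsyncCollector&lt;List&lt;String&gt;&gt; asyncCollector) throws Exception {
    // executeAsync returns immediately, so no extra thread pool is needed here
    ListenableFuture&lt;ResultSet&gt; resultSetFuture = client.executeAsync(input.query);
    Futures.addCallback(resultSetFuture, new FutureCallback&lt;ResultSet&gt;() {
        @Override
        public void onSuccess(ResultSet resultSet) {
            // Materialize the rows in the callback thread; the ResultSet never leaves it
            List&lt;String&gt; rows = new ArrayList&lt;&gt;();
            for (Row row : resultSet) {
                rows.add(row.toString()); // map each Row to your own POJO in real code
            }
            asyncCollector.collect(Collections.singleton(rows));
        }
        @Override
        public void onFailure(Throwable t) {
            asyncCollector.collect(t);
        }
    });
}

The class would then extend RichAsyncFunction&lt;UserDefinedType, List&lt;String&gt;&gt;, and downstream operators receive plain, immutable lists instead of a live ResultSet. Note that iterating past the first fetched page fetches further pages synchronously in the callback thread, so keep the result size or fetch size reasonable for this pattern.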