如何创建Cassandra ITrigger析构函数?

问题描述 投票:0回答:1

我正在使用实现ITrigger接口的触发器jar将数据从Cassandra发送到Kafka。

为了确保仅在某些服务器可用时才尝试向Kafka发送消息(以避免超时)-我产生了一个新线程,该线程尝试以设置的频率连接到Kafka,并在出现以下情况时更新布尔值:状态更改。

在我的Cassandra触发器的构造函数中,我为此Kafka检查器产生了一个新线程。

        // Start the background thread to monitor apache health
         b_array = cr.get_broker_array();
         bt = new background_thread(b_array, ms, lLog);
         bt.start();

我的问题是,没有任何事件可以让我知道何时重新加载主触发器。

如果触发器重新加载了

./nodetool reloadtriggers

旧线程一直在后台运行,并且不会被清除。我已经看到了一些参考资料,即使用finalize()是一个坏主意,不应该这样做。

当前刷新触发器的唯一方法实际上是重新启动Cassandra节点-这是错误的。

将对如何实现或实现这一目标的任何投入表示赞赏。

java multithreading cassandra apache-kafka triggers
1个回答
0
投票

这是我解决问题的方法。

  • 跟踪在后台线程上收到的最后一条消息的时间。
  • 如果新消息在15分钟内没有到达,我将在每个循环中将kafka运行状况检查速度降低30秒。
  • 如果有4个小时没有收到新消息。我将退出运行状况检查循环。每次重新加载触发器时,都会启动触发器的新实例并启动新线程。 较旧的触发器/线程实例不会收到新消息。因此超时等待将杀死后台僵尸线程(这是我最初的问题)。
  • [处理新消息时,我检查了后台线程的状态是否为TERMINATED,并在这种情况下启动了一个新的后台线程。 (使用写锁定来确保并发消息不会启动多个线程)。
© www.soinside.com 2019 - 2024. All rights reserved.