我正在使用 Redis 在 Java 中制作一个 Minecraft spigot 插件,以便在连接到同一 Redis 实例的多个不同服务器上广播自定义消息。我使用 https://github.com/InvisRaidinq/redstone 库来简化 jedis 函数的处理和编码。但是,如果在很长一段时间内没有通过通道发送消息,那么所有 jedis 订阅/onRecieve 逻辑都会停止工作并且不执行任何操作,这是有效的。我仍然能够向频道发布消息,并且它们确实会发送到 Redis 实例,但所有订阅逻辑什么也不做。
这就是我在代码中使用库来执行插件命令之一的方式:
AlertPacket.java
public class AlertPacket implements RedstonePacket {
private final String alert;
public AlertPacket(String alert) {
this.alert = alert;
}
@Override
public void onReceive() {
Bukkit.getScheduler().runTask(Main.getInstance(), () -> {
for (Player player : Bukkit.getOnlinePlayers()) {
player.sendMessage(alert);
}
});
}
}
AlertCommand.java
redstone.sendPacket(new AlertPacket(input));
Main.java
redstone = Redstone.builder()
.address(getConfig().getString("redis.address"))
.port(getConfig().getInt("redis.port"))
.withCredentials(AuthCredentials.of(getConfig().getString("redis.username"), getConfig().getString("redis.password")))
.channel("main-channel")
.build();
我还尝试使用原始库的分叉和更新的代码库:https://github.com/PedroMPagani/redstone但是我仍然遇到与没有分叉之前相同的问题。
当我尝试使用插件但订阅逻辑不起作用时,我在控制台中收到此错误:
[08:38:50 WARN]: Exception in thread "Thread-7" redis.clients.jedis.exceptions.JedisConnectionException: Unexpected end of stream.
[08:38:50 WARN]: at Main.jar//redis.clients.jedis.util.RedisInputStream.ensureFill(RedisInputStream.java:248)
[08:38:50 WARN]: at Main.jar//redis.clients.jedis.util.RedisInputStream.readByte(RedisInputStream.java:47)
[08:38:50 WARN]: at Main.jar//redis.clients.jedis.Protocol.process(Protocol.java:136)
[08:38:50 WARN]: at Main.jar//redis.clients.jedis.Protocol.read(Protocol.java:222)
[08:38:50 WARN]: at Main.jar//redis.clients.jedis.Connection.readProtocolWithCheckingBroken(Connection.java:350)
[08:38:50 WARN]: at Main.jar//redis.clients.jedis.Connection.getUnflushedObject(Connection.java:316)
[08:38:50 WARN]: at Main.jar//redis.clients.jedis.JedisPubSubBase.process(JedisPubSubBase.java:115)
[08:38:50 WARN]: at Main.jar//redis.clients.jedis.JedisPubSubBase.proceed(JedisPubSubBase.java:92)
[08:38:50 WARN]: at Main.jar//redis.clients.jedis.Jedis.subscribe(Jedis.java:7941)
[08:38:50 WARN]: at Main.jar//xyz.invisraidinq.redstone.manager.connection.ConnectionManager.lambda$initSubscription$2(ConnectionManager.java:78)
[08:38:50 WARN]: at java.base/java.lang.Thread.run(Thread.java:1583)
我希望它能够工作,只需保持订阅打开并执行逻辑,直到我手动禁用插件和/或停止服务器。
我猜您的插件和 Redis 服务器之间的连接意外关闭,可能是由于空闲超时或网络问题。可能是由于空闲超时和网络不稳定而发生,或者 Jedis 在使用 Pub/Sub 模型时可能无法自动重新连接或正确处理断开连接,从而导致订阅逻辑静默停止。
我们可能可以有连接丢失时自动重新连接的逻辑,如下所示
public class SubscriptionHandler extends JedisPubSub {
private final String redisAddress;
private final int redisPort;
private final String channel;
private volatile boolean running = true;
public SubscriptionHandler(String redisAddress, int redisPort, String channel) {
this.redisAddress = redisAddress;
this.redisPort = redisPort;
this.channel = channel;
}
@Override
public void onMessage(String channel, String message) {
Bukkit.getScheduler().runTask(Main.getInstance(), () -> {
for (Player player : Bukkit.getOnlinePlayers()) {
player.sendMessage(message);
}
});
}
public void start() {
new Thread(() -> {
while (running) {
try (Jedis jedis = new Jedis(redisAddress, redisPort)) {
jedis.subscribe(this, channel); // Blocking call
} catch (Exception e) {
// Log the error and retry after a short delay
Bukkit.getLogger().warning("Redis subscription lost, retrying in 5 seconds: " + e.getMessage());
try {
Thread.sleep(5000); // Wait before retrying
} catch (InterruptedException ignored) {
}
}
}
}).start();
}
public void stop() {
this.running = false;
this.unsubscribe();
}
}
public class Main extends JavaPlugin {
private SubscriptionHandler subscriptionHandler;
@Override
public void onEnable() {
// Initialize the SubscriptionHandler with your Redis details
subscriptionHandler = new SubscriptionHandler(
getConfig().getString("redis.address"),
getConfig().getInt("redis.port"),
"main-channel"
);
subscriptionHandler.start();
}
@Override
public void onDisable() {
// Stop the subscription when the plugin is disabled
if (subscriptionHandler != null) {
subscriptionHandler.stop();
}
}
}
这里的主要逻辑是
SubscriptionHandler
不断尝试订阅Redis通道,即使发生断开连接。如果抛出异常,它会在短暂延迟后重试连接。
如果问题仍然存在,我们可以添加 PING 消息或某种形式的保持活动机制来防止连接空闲。