Jedis 酒吧/订阅频道在很长一段时间没有被交谈后中断

问题描述 投票:0回答:1

我正在使用 Redis 在 Java 中制作一个 Minecraft spigot 插件,以便在连接到同一 Redis 实例的多个不同服务器上广播自定义消息。我使用 https://github.com/InvisRaidinq/redstone 库来简化 jedis 函数的处理和编码。但是,如果在很长一段时间内没有通过通道发送消息,那么所有 jedis 订阅/onRecieve 逻辑都会停止工作并且不执行任何操作,这是有效的。我仍然能够向频道发布消息,并且它们确实会发送到 Redis 实例,但所有订阅逻辑什么也不做。

这就是我在代码中使用库来执行插件命令之一的方式:

AlertPacket.java

public class AlertPacket implements RedstonePacket {
    private final String alert;

    public AlertPacket(String alert) {
        this.alert = alert;
    }

    @Override
    public void onReceive() {
            Bukkit.getScheduler().runTask(Main.getInstance(), () -> {
                for (Player player : Bukkit.getOnlinePlayers()) {
                    player.sendMessage(alert);
                }
            });
    }
}

AlertCommand.java

redstone.sendPacket(new AlertPacket(input));

Main.java

        redstone = Redstone.builder()
                .address(getConfig().getString("redis.address"))
                .port(getConfig().getInt("redis.port"))
                .withCredentials(AuthCredentials.of(getConfig().getString("redis.username"), getConfig().getString("redis.password")))
                .channel("main-channel")
                .build();                  

我还尝试使用原始库的分叉和更新的代码库:https://github.com/PedroMPagani/redstone但是我仍然遇到与没有分叉之前相同的问题。

当我尝试使用插件但订阅逻辑不起作用时,我在控制台中收到此错误:

[08:38:50 WARN]: Exception in thread "Thread-7" redis.clients.jedis.exceptions.JedisConnectionException: Unexpected end of stream.
[08:38:50 WARN]:        at Main.jar//redis.clients.jedis.util.RedisInputStream.ensureFill(RedisInputStream.java:248)
[08:38:50 WARN]:        at Main.jar//redis.clients.jedis.util.RedisInputStream.readByte(RedisInputStream.java:47)
[08:38:50 WARN]:        at Main.jar//redis.clients.jedis.Protocol.process(Protocol.java:136)
[08:38:50 WARN]:        at Main.jar//redis.clients.jedis.Protocol.read(Protocol.java:222)
[08:38:50 WARN]:        at Main.jar//redis.clients.jedis.Connection.readProtocolWithCheckingBroken(Connection.java:350)
[08:38:50 WARN]:        at Main.jar//redis.clients.jedis.Connection.getUnflushedObject(Connection.java:316)
[08:38:50 WARN]:        at Main.jar//redis.clients.jedis.JedisPubSubBase.process(JedisPubSubBase.java:115)
[08:38:50 WARN]:        at Main.jar//redis.clients.jedis.JedisPubSubBase.proceed(JedisPubSubBase.java:92)
[08:38:50 WARN]:        at Main.jar//redis.clients.jedis.Jedis.subscribe(Jedis.java:7941)
[08:38:50 WARN]:        at Main.jar//xyz.invisraidinq.redstone.manager.connection.ConnectionManager.lambda$initSubscription$2(ConnectionManager.java:78)
[08:38:50 WARN]:        at java.base/java.lang.Thread.run(Thread.java:1583)

我希望它能够工作,只需保持订阅打开并执行逻辑,直到我手动禁用插件和/或停止服务器。

java redis jedis
1个回答
0
投票

我猜您的插件和 Redis 服务器之间的连接意外关闭,可能是由于空闲超时或网络问题。可能是由于空闲超时和网络不稳定而发生,或者 Jedis 在使用 Pub/Sub 模型时可能无法自动重新连接或正确处理断开连接,从而导致订阅逻辑静默停止。

我们可能可以有连接丢失时自动重新连接的逻辑,如下所示

public class SubscriptionHandler extends JedisPubSub {
    private final String redisAddress;
    private final int redisPort;
    private final String channel;
    private volatile boolean running = true;

    public SubscriptionHandler(String redisAddress, int redisPort, String channel) {
        this.redisAddress = redisAddress;
        this.redisPort = redisPort;
        this.channel = channel;
    }

    @Override
    public void onMessage(String channel, String message) {
        Bukkit.getScheduler().runTask(Main.getInstance(), () -> {
            for (Player player : Bukkit.getOnlinePlayers()) {
                player.sendMessage(message);
            }
        });
    }

    public void start() {
        new Thread(() -> {
            while (running) {
                try (Jedis jedis = new Jedis(redisAddress, redisPort)) {
                    jedis.subscribe(this, channel);  // Blocking call
                } catch (Exception e) {
                    // Log the error and retry after a short delay
                    Bukkit.getLogger().warning("Redis subscription lost, retrying in 5 seconds: " + e.getMessage());
                    try {
                        Thread.sleep(5000);  // Wait before retrying
                    } catch (InterruptedException ignored) {
                    }
                }
            }
        }).start();
    }

    public void stop() {
        this.running = false;
        this.unsubscribe();
    }
}
public class Main extends JavaPlugin {
    private SubscriptionHandler subscriptionHandler;

    @Override
    public void onEnable() {
        // Initialize the SubscriptionHandler with your Redis details
        subscriptionHandler = new SubscriptionHandler(
                getConfig().getString("redis.address"),
                getConfig().getInt("redis.port"),
                "main-channel"
        );
        subscriptionHandler.start();
    }

    @Override
    public void onDisable() {
        // Stop the subscription when the plugin is disabled
        if (subscriptionHandler != null) {
            subscriptionHandler.stop();
        }
    }
}

这里的主要逻辑是

SubscriptionHandler
不断尝试订阅Redis通道,即使发生断开连接。如果抛出异常,它会在短暂延迟后重试连接。

如果问题仍然存在,我们可以添加 PING 消息或某种形式的保持活动机制来防止连接空闲。

© www.soinside.com 2019 - 2024. All rights reserved.