为什么 Hazelcast Near Cache 与 EntryUpdatedListener 不同步,即使它们位于同一进程中?

问题描述 投票:0回答:2

我了解当值在其他节点上的其他位置更新时,不能保证近端缓存实时同步。

但是我确实希望它与同一节点上的 EntryUpdatedListener 同步,因此是相同的进程 - 或者我错过了什么?

事件顺序:

1 个节点的集群修改相同的键/值,每隔 X 秒将值从 X 翻转到 Y,然后返回到 X。

客户端连接到该集群节点,并添加一个EntryUpdatedListener来观察翻转值。

客户端接收 EntryUpdatedEvent 并打印给定的值 - 正如预期的那样,它给出了最近设置的值。

客户端立即对同一个键执行map.get(应该命中附近的缓存),并打印一个STALE值。

我觉得这很奇怪 - 这意味着同一客户端进程中的两个“通道”显示的数据版本不一致。我只期望在不同的进程之间出现这种情况。

以下是我的重现器代码:

public class ClusterTest {
    private static final int OLD_VALUE = 10000;
    private static final int NEW_VALUE = 88888;
    private static final int KEY = 5;
    private static final int NUMBER_OF_ENTRIES = 10;

    public static void main(String[] args) throws Exception {
        HazelcastInstance instance = Hazelcast.newHazelcastInstance();

        IMap map = instance.getMap("test");
        for (int i = 0; i < NUMBER_OF_ENTRIES; i++) {
            map.put(i, 0);
        }

        System.out.println("Size of map = " + map.size());

        boolean flag = false;

        while(true) {
            int value = flag ? OLD_VALUE : NEW_VALUE;
            flag = !flag;

            map.put(KEY, value);

            System.out.println("Set a value of [" + value + "]: ");

            Thread.sleep(1000);
        }
    }
}

public class ClientTest {
    public static void main(String[] args) throws InterruptedException {
        HazelcastInstance instance = HazelcastClient.newHazelcastClient(new ClientConfig().addNearCacheConfig(new NearCacheConfig("test")));
        IMap map = instance.getMap("test");
        System.out.println("Size of map = " + map.size());

        map.addEntryListener(new MyEntryListener(instance), true);

        new CountDownLatch(1).await();
    }

    static class MyEntryListener
            implements EntryAddedListener,
            EntryUpdatedListener,
            EntryRemovedListener {
        private HazelcastInstance instance;

        public MyEntryListener(HazelcastInstance instance) {
            this.instance = instance;
        }

        @Override
        public void entryAdded(EntryEvent event) {
            System.out.println("Entry Added:" + event);
        }

        @Override
        public void entryRemoved(EntryEvent event) {
            System.out.println("Entry Removed:" + event);
        }

        @Override
        public void entryUpdated(EntryEvent event) {
            Object o = instance.getMap("test").get(event.getKey());
            boolean equals = o.equals(event.getValue());
            String s = "Event matches what has been fetched = " + equals;

            if (!equals) {
                s += ", EntryEvent value has delivered: " + (event.getValue()) + ", and an explicit GET has delivered:" + o;
            }

            System.out.println(s);
        }
    }
}

客户端的输出:

INFO: hz.client_0 [dev] [3.11.1] HazelcastClient 3.11.1 (20181218 - d294f31) is CLIENT_CONNECTED
Jun 20, 2019 4:58:15 PM com.hazelcast.internal.diagnostics.Diagnostics
INFO: hz.client_0 [dev] [3.11.1] Diagnostics disabled. To enable add -Dhazelcast.diagnostics.enabled=true to the JVM arguments.
Size of map = 10
Event matches what has been fetched = true
Event matches what has been fetched = false, EntryEvent value has delivered: 88888, and an explicit GET has delivered:10000
Event matches what has been fetched = true
Event matches what has been fetched = true
Event matches what has been fetched = false, EntryEvent value has delivered: 10000, and an explicit GET has delivered:88888
hazelcast hazelcast-imap
2个回答
0
投票

Near Cache 具有最终一致性保证,而 Listeners 以“即发即忘”的方式工作。这就是为什么两者有两种不同的机制。此外,对近缓存事件进行批处理可以减少网络流量并保持事件系统不那么繁忙(这在存在太多失效或客户端时有所帮助),作为权衡,它可能会增加单个失效的延迟。如果您确信您的系统可以处理每个失效事件,您可以禁用批处理。

您需要在成员端配置该属性,因为事件是在集群成员上生成并发送到客户端的。


0
投票

来自 Hazelcast 文档;

https://docs.hazelcast.com/hazelcast/5.1/cluster-performance/best-practices

失效可以从成员发送到客户端附近缓存或成员附近缓存,可以单独或批量发送。默认行为是批量发送。如果数据结构上存在大量变异操作(例如放置/删除),建议您配置批量失效。这减少了网络流量并使事件系统不那么繁忙,但可能会增加个别失效的延迟。

您可以使用以下系统属性来配置近端缓存失效:

hazelcast.map.invalidation.batch.enabled: Enable or disable batching. Its default value is true. When it is set to false, all invalidations are sent immediately.

hazelcast.map.invalidation.batch.size: Maximum number of invalidations in a batch. Its default value is 100.

hazelcast.map.invalidation.batchfrequency.seconds: If the collected invalidations do not reach the configured batch size, a background process sends them periodically. Its default value is 10 seconds.

如果有很多客户端或很多变异操作,则应保持启用批处理,并且应使用 hazelcast.map.invalidation.batch.size 系统属性将批处理大小配置为合适的值。

© www.soinside.com 2019 - 2024. All rights reserved.