ConcurrentHashMap weak consistency misses previously added keys during iteration - multithreaded publish/subscribe


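I noticed strange behavior when iterating over a ConcurrentHashMap. The scenario:

  • A single publisher thread keeps adding new keys to a ConcurrentHashMap. Note that the keys are added in sequential order.
  • Multiple subscriber threads try to read all current values from the ConcurrentHashMap.

Many subscribers appear to miss keys that were added earlier. For example, when a subscriber reads a map of size 100, I would expect it to see at least the keys 0 through 99. However, it does not seem to return all of them. Below is sample code that reproduces the problem. In other words, even after a given key, say key 100, has been added to the map, that key is sometimes missing from a read.

As for an explanation, my guess is that when keys are resolved through chaining and a read of all entries is requested, the keys involved in collisions are not returned. However, I would like to know whether anyone else has seen this and can offer a better explanation for the behavior below.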

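The code below has 3 variants:

  1. Where the problem occurs (test()).
  2. If the map is kept large enough that collisions (almost) "never happen", a complete view is sometimes received (testWithNoCollisions).
  3. Using a lock to ensure the writer is blocked while a subscriber reads the whole map (testWithLocks).
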
import java.util.HashSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ConcurrentHashMapWeakConsistency {

    public static void main(String[] args) throws InterruptedException {
        test();
//        testWithNoCollisions();// Sometimes it passes when there is no collision
//        testWithLocks();// This one works but it has external synchronize mechanism.
    }

    public static void test() throws InterruptedException {
        var map = new ConcurrentHashMap<String, Integer>();

        var publisher = Executors.newSingleThreadExecutor();
        var totalMessages = 1_000_000;
        publisher.submit(() -> {
            for (int i = 0; i < totalMessages; i++) {
                var key = "key-" + i;
                map.put(key, i);
                if (i % 10000 == 0) {
                    System.out.printf("Published %d messages.\n", i);
                }
            }
            System.out.printf("Published all %d messages\n", totalMessages);
        });

        var subscriberCount = 100;
        var subscribers = Executors.newFixedThreadPool(10);
        var subsLatch = new CountDownLatch(subscriberCount);

        IntStream.range(0, 100).parallel().forEach(subscriberId -> subscribers.submit(() -> {
            try {
                var existingKeys = new HashSet<>(map.keySet());
                var size = existingKeys.size();

                // The publisher inserts keys in sequential order, so a consistent
                // snapshot of this size would contain every key from 0 to size - 1.
                // Any gap found below is the weak consistency showing up.
                var missingKeys = IntStream.range(0, size).filter(num -> !existingKeys.contains("key-" + num))
                        .sorted()
                        .boxed()
                        .toList();

                if (!missingKeys.isEmpty()) {
                    var sortedExistingKeys = existingKeys.stream()
                            .sorted((k1, k2) -> Integer.parseInt(k1.split("-")[1]) - Integer.parseInt(k2.split("-")[1]))
                            .toList();
                    var start = sortedExistingKeys.get(0);
                    var end = sortedExistingKeys.get(sortedExistingKeys.size() - 1);
                    var missingStart = "key-" + missingKeys.get(0);
                    var missingEnd = "key-" + missingKeys.get(missingKeys.size() - 1);
                    throw new RuntimeException(String.format("Subscriber %d missing %d keys! Result key range is [ %s, %s]. However, many numbers in range [ %s, %s ] are missing", subscriberId, missingKeys.size(), start, end, missingStart, missingEnd));
                }
            } catch (Exception ex) {
                ex.printStackTrace();
                System.exit(-1);
            } finally {
                subsLatch.countDown();
                System.out.printf("Subscriber %d finished reading from map.\n", subscriberId);
            }
        }));


        subsLatch.await();
        publisher.shutdownNow();
        subscribers.shutdownNow();
    }

    public static void testWithNoCollisions() throws InterruptedException {
        var map = new ConcurrentHashMap<String, Integer>(1_000_000, .25f);

        var publisher = Executors.newSingleThreadExecutor();
        var totalMessages = 10_000;
        publisher.submit(() -> {
            for (int i = 0; i < totalMessages; i++) {
                var key = "key-" + i;
                map.put(key, i);
                if (i % 10000 == 0) {
                    System.out.printf("Published %d messages.\n", i);
                }
            }
            System.out.printf("Published all %d messages\n", totalMessages);
        });

        var subscriberCount = 100;
        var subscribers = Executors.newFixedThreadPool(10);
        var subsLatch = new CountDownLatch(subscriberCount);

        IntStream.range(0, 100).parallel().forEach(subscriberId -> subscribers.submit(() -> {
            try {
                var existingKeys = new HashSet<>(map.keySet());
                var size = existingKeys.size();

                // The publisher inserts keys in sequential order, so a consistent
                // snapshot of this size would contain every key from 0 to size - 1.
                // Any gap found below is the weak consistency showing up.
                var missingKeys = IntStream.range(0, size).filter(num -> !existingKeys.contains("key-" + num))
                        .sorted()
                        .boxed()
                        .toList();

                if (!missingKeys.isEmpty()) {
                    var sortedExistingKeys = existingKeys.stream()
                            .sorted((k1, k2) -> Integer.parseInt(k1.split("-")[1]) - Integer.parseInt(k2.split("-")[1]))
                            .toList();
                    var start = sortedExistingKeys.get(0);
                    var end = sortedExistingKeys.get(sortedExistingKeys.size() - 1);
                    var missingStart = missingKeys.get(0);
                    var missingEnd = missingKeys.get(missingKeys.size() - 1);
                    throw new RuntimeException(String.format("Subscriber %d missing %d keys! Result key range is [ %s, %s]. However, many numbers in range [ %s, %s ] are missing", subscriberId, missingKeys.size(), start, end, missingStart, missingEnd));
                }
            } catch (Exception ex) {
                ex.printStackTrace();
                System.exit(-1);
            } finally {
                subsLatch.countDown();
                System.out.printf("Subscriber %d finished reading from map.\n", subscriberId);
            }
        }));


        subsLatch.await();
        publisher.shutdownNow();
        subscribers.shutdownNow();
    }

    public static void testWithLocks() throws InterruptedException {
        var map = new ConcurrentHashMap<String, Integer>();

        var publisher = Executors.newSingleThreadExecutor();
        var totalMessages = 1_000_000;
        var subscriberActive = new AtomicBoolean(false);
        publisher.submit(() -> {
            for (int i = 0; i < totalMessages; i++) {
                var key = "key-" + i;
                // get the subscriber lock
                while (!subscriberActive.compareAndSet(false, true)) ;
                map.put(key, i);
                subscriberActive.compareAndSet(true, false);
                if (i % 10000 == 0) {
                    System.out.printf("Published %d messages.\n", i);
                }
            }
            System.out.printf("Published all %d messages\n", totalMessages);
        });

        var subscriberCount = 100;
        var subscribers = Executors.newFixedThreadPool(10);
        var subsLatch = new CountDownLatch(subscriberCount);

        IntStream.range(0, 100).parallel().forEach(subscriberId -> subscribers.submit(() -> {
            try {
                while (!subscriberActive.compareAndSet(false, true)) ;
                var existingKeys = new HashSet<>(map.keySet());
                subscriberActive.compareAndSet(true, false);

                var size = existingKeys.size();

                // The publisher inserts keys in sequential order, and the copy above was
                // taken while the publisher was locked out, so every key from 0 to
                // size - 1 is expected to be present here.
                var missingKeys = IntStream.range(0, size).filter(num -> !existingKeys.contains("key-" + num))
                        .sorted()
                        .mapToObj(i -> Integer.valueOf(i))
                        .collect(Collectors.toList());

                if (!missingKeys.isEmpty()) {
                    var sortedExistingKeys = existingKeys.stream()
                            .sorted((k1, k2) -> Integer.parseInt(k1.split("-")[1]) - Integer.parseInt(k2.split("-")[1]))
                            .toList();
                    var start = sortedExistingKeys.get(0);
                    var end = sortedExistingKeys.get(sortedExistingKeys.size() - 1);
                    var missingStart = missingKeys.get(0);
                    var missingEnd = missingKeys.get(missingKeys.size() - 1);
                    throw new RuntimeException(String.format("Subscriber %d missing %d keys! Result key range is [ %s, %s]. However, many numbers in range [ %s, %s ] are missing", subscriberId, missingKeys.size(), start, end, missingStart, missingEnd));
                }
            } catch (Exception ex) {
                ex.printStackTrace();
                System.exit(-1);
            } finally {
                subsLatch.countDown();
                System.out.printf("Subscriber %d finished reading from map.\n", subscriberId);
            }
        }));


        subsLatch.await();
        publisher.shutdownNow();
        subscribers.shutdownNow();
    }
}

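With a ConcurrentHashMap, when writes and full reads of the map happen concurrently, the result of a read often misses keys that were added in the past. I expected it to provide a snapshot of the map at some point in time. However, according to the test code above, that does not seem to be the case.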
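
For comparison, a point-in-time copy needs the writer and the copying reader to exclude each other, which is what testWithLocks does with a spinning AtomicBoolean. A minimal sketch of the same idea, assuming a ReentrantReadWriteLock instead of the spin loop. The names SnapshotMap, snapshotKeys and countGappySnapshots are hypothetical and do not appear in the code above:

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical wrapper: writes take the write lock, full copies take the read lock.
class SnapshotMap {
    private final ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    void put(String key, int value) {
        lock.writeLock().lock();
        try {
            map.put(key, value);
        } finally {
            lock.writeLock().unlock();
        }
    }

    // No put can run while the read lock is held, so the copy is a point-in-time view.
    Set<String> snapshotKeys() {
        lock.readLock().lock();
        try {
            return new HashSet<>(map.keySet());
        } finally {
            lock.readLock().unlock();
        }
    }

    // Runs one publisher that inserts key-0 .. key-(totalKeys-1) in order and
    // returns how many snapshots of size s lacked some key in [0, s).
    static int countGappySnapshots(int totalKeys, int snapshots) {
        var shared = new SnapshotMap();
        var publisher = new Thread(() -> {
            for (int i = 0; i < totalKeys; i++) {
                shared.put("key-" + i, i);
            }
        });
        publisher.start();
        int gappy = 0;
        for (int s = 0; s < snapshots; s++) {
            var keys = shared.snapshotKeys();
            for (int i = 0; i < keys.size(); i++) {
                if (!keys.contains("key-" + i)) {
                    gappy++;
                    break;
                }
            }
        }
        try {
            publisher.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return gappy;
    }

    public static void main(String[] args) {
        System.out.println("snapshots with gaps: " + countGappySnapshots(20_000, 50));
    }
}
```

The trade-off in this sketch is that every put now waits for any snapshot in progress, so the map no longer offers lock-free writes. Several subscribers can still copy at the same time because they share the read lock.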

java publish-subscribe concurrenthashmap
1 Answer

ConcurrentHashMap is not a transactional database. According to the documentation:

The update state is per key:

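Retrieval operations (including get) generally do not block, so may overlap with update operations (including put and remove). Retrievals reflect the results of the most recently completed update operations holding upon their onset. (More formally, an update operation for a given key bears a happens-before relation with any (non-null) retrieval for that key reporting the updated value.) For aggregate operations such as putAll and clear, concurrent retrievals may reflect insertion or removal of only some entries. Similarly, Iterators, Spliterators and Enumerations return elements reflecting the state of the hash table at some point at or since the creation of the iterator/enumeration. They do not throw ConcurrentModificationException. However, iterators are designed to be used by only one thread at a time. Bear in mind that the results of aggregate status methods including size, isEmpty, and containsValue are typically useful only when a map is not undergoing concurrent updates in other threads. Otherwise the results of these methods reflect transient states that may be adequate for monitoring or estimation purposes, but not for program control.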
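
To make the iterator wording concrete, here is a small single-threaded sketch in which the "concurrent" inserts are simply done in the middle of an iteration. It relies on assumptions about the OpenJDK implementation that the documentation does not promise: a default table of 16 bins, small Integer keys landing in the bin equal to their value, and iterators visiting bins in index order. The class and method names are hypothetical:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

class WeaklyConsistentIterationDemo {

    // Returns the keys a single iterator reports when the map is modified mid-iteration.
    static List<Integer> iterateWhileInserting(ConcurrentHashMap<Integer, Boolean> map) {
        map.put(5, true);
        map.put(10, true);

        Iterator<Integer> it = map.keySet().iterator();
        List<Integer> seen = new ArrayList<>();
        seen.add(it.next()); // the iterator is now partway through the table

        // Insertion order: 1 first, then 12.
        // Assumption (OpenJDK detail): key 1 goes to a bin the iterator has
        // already passed, key 12 to a bin it has not reached yet.
        map.put(1, true);
        map.put(12, true);

        while (it.hasNext()) {
            seen.add(it.next());
        }
        return seen;
    }

    public static void main(String[] args) {
        var map = new ConcurrentHashMap<Integer, Boolean>();
        var seen = iterateWhileInserting(map);
        System.out.println("keys in the map afterwards: " + map.keySet());
        System.out.println("keys seen by the iterator:  " + seen);
        System.out.println("saw 12 but missed 1:        "
                + (seen.contains(12) && !seen.contains(1)));
    }
}
```

Under those assumptions the iterator reports the later key 12 and misses the earlier key 1, which is the same shape of gap the subscribers in the question observe. No collision is involved here: traversal order follows the table layout rather than insertion order, so a key that lands behind the iterator's current position is skipped while a later key that lands ahead of it is picked up.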
