注意到 ConcurrentHashMap 迭代的奇怪行为。 场景:
许多订阅者似乎丢失了更早添加的键。例如,当订阅者读取到大小为 100 的 map 时,预期它至少能看到从 0 到 99 的所有键,但实际返回的结果似乎并不包含全部键。以下是重现该问题的示例代码。换句话说,即使示例键 100 已经被添加到 map 中,在某些随机的读取中,键 100 似乎仍然缺失。
就原因而言,我的猜测是:当键因哈希冲突而以链表方式存储、同时又发起全量读取时,与冲突相关的键不会被返回。不过,我想知道是否有其他人也遇到过这种情况,并能对下述行为给出更好的解释。
下面的代码有 3 个变体:
test()
testWithNoCollisions()
testWithLocks()
import java.util.HashSet;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
/**
 * Demonstrates that bulk reads of a {@link ConcurrentHashMap} are only weakly consistent.
 *
 * <p>A single publisher inserts the keys {@code "key-0"}, {@code "key-1"}, ... in order while
 * many subscribers copy {@code map.keySet()} into a {@link HashSet}. Each subscriber then checks
 * whether its copy of size {@code n} contains every key from {@code key-0} to {@code key-(n-1)}.
 * Because the key-set iterator is not a point-in-time snapshot, a copy can contain a recently
 * added key while missing an earlier one; a subscriber that detects this prints a stack trace
 * and terminates the JVM with exit status {@code -1}.
 *
 * <p>Three variants are provided: {@link #test()}, {@link #testWithNoCollisions()} and
 * {@link #testWithLocks()}.
 */
public class ConcurrentHashMapWeakConsistency {

    /** Prefix of every published key; the key for index {@code i} is {@code "key-" + i}. */
    private static final String KEY_PREFIX = "key-";

    /** Number of subscriber tasks submitted per scenario (also the latch count). */
    private static final int SUBSCRIBER_COUNT = 100;

    /** Size of the thread pool that runs the subscriber tasks. */
    private static final int SUBSCRIBER_THREADS = 10;

    /** The publisher logs progress every this many messages. */
    private static final int PROGRESS_INTERVAL = 10_000;

    /** Orders keys by their numeric index without the overflow risk of int subtraction. */
    private static final Comparator<String> BY_KEY_INDEX =
            Comparator.comparingInt(ConcurrentHashMapWeakConsistency::keyIndex);

    /** Runs an action, optionally under mutual exclusion with other guarded actions. */
    @FunctionalInterface
    private interface Guard {
        void run(Runnable criticalSection);
    }

    public static void main(String[] args) throws InterruptedException {
        test();
        // testWithNoCollisions(); // Sometimes it passes when there is no collision
        // testWithLocks();        // This one works but it has an external synchronization mechanism.
    }

    /**
     * Publishes 1,000,000 keys into a default-sized map with no external synchronization.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting for the
     *     subscribers to finish
     */
    public static void test() throws InterruptedException {
        runScenario(new ConcurrentHashMap<>(), 1_000_000, Runnable::run);
    }

    /**
     * Publishes 10,000 keys into a map created with initial capacity 1,000,000 and load factor
     * 0.25 (intended to make hash collisions unlikely), with no external synchronization.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting for the
     *     subscribers to finish
     */
    public static void testWithNoCollisions() throws InterruptedException {
        runScenario(new ConcurrentHashMap<>(1_000_000, .25f), 10_000, Runnable::run);
    }

    /**
     * Publishes 1,000,000 keys into a default-sized map, with every {@code put} and every
     * key-set copy performed while holding one shared spin lock, so reads never overlap writes.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting for the
     *     subscribers to finish
     */
    public static void testWithLocks() throws InterruptedException {
        runScenario(new ConcurrentHashMap<>(), 1_000_000, spinLock());
    }

    /**
     * Returns the indices in {@code [0, existingKeys.size())} whose key is absent from the set.
     *
     * @param existingKeys keys observed by a subscriber
     * @return the missing indices in ascending order; empty if none are missing
     */
    static List<Integer> findMissingKeys(Set<String> existingKeys) {
        return IntStream.range(0, existingKeys.size())
                .filter(index -> !existingKeys.contains(KEY_PREFIX + index))
                .boxed()
                .toList();
    }

    /**
     * Builds the failure message for a subscriber that observed gaps in the key sequence.
     *
     * @param subscriberId id of the reporting subscriber
     * @param existingKeys keys the subscriber observed; must be non-empty
     * @param missingKeys ascending missing indices; must be non-empty
     * @return the formatted message
     */
    static String describeMissing(int subscriberId, Set<String> existingKeys, List<Integer> missingKeys) {
        // Only the extremes are reported, so a full sort of the key set is unnecessary.
        var start = existingKeys.stream().min(BY_KEY_INDEX).orElseThrow();
        var end = existingKeys.stream().max(BY_KEY_INDEX).orElseThrow();
        var missingStart = KEY_PREFIX + missingKeys.get(0);
        var missingEnd = KEY_PREFIX + missingKeys.get(missingKeys.size() - 1);
        return String.format("Subscriber %d missing %d keys! Result key range is [ %s, %s]. However, many numbers in range [ %s, %s ] are missing", subscriberId, missingKeys.size(), start, end, missingStart, missingEnd);
    }

    /** Extracts the numeric index from a key of the form {@code "key-<index>"}. */
    private static int keyIndex(String key) {
        return Integer.parseInt(key.substring(key.indexOf('-') + 1));
    }

    /** Creates a guard backed by a busy-waiting lock shared by all actions run through it. */
    private static Guard spinLock() {
        var held = new AtomicBoolean(false);
        return criticalSection -> {
            while (!held.compareAndSet(false, true)) {
                Thread.onSpinWait();
            }
            try {
                criticalSection.run();
            } finally {
                // Release even if the critical section throws, so other threads cannot spin forever.
                held.set(false);
            }
        };
    }

    /** Starts one publisher and all subscribers against {@code map}, then waits for the subscribers. */
    private static void runScenario(ConcurrentHashMap<String, Integer> map, int totalMessages, Guard guard)
            throws InterruptedException {
        var publisher = Executors.newSingleThreadExecutor();
        var subscribers = Executors.newFixedThreadPool(SUBSCRIBER_THREADS);
        try {
            // execute() rather than submit(): an unexpected failure is reported, not hidden in a Future.
            publisher.execute(() -> publish(map, totalMessages, guard));
            var subsLatch = new CountDownLatch(SUBSCRIBER_COUNT);
            for (int id = 0; id < SUBSCRIBER_COUNT; id++) {
                final int subscriberId = id;
                subscribers.execute(() -> subscribe(subscriberId, map, guard, subsLatch));
            }
            subsLatch.await();
        } finally {
            publisher.shutdownNow();
            subscribers.shutdownNow();
        }
    }

    /** Inserts {@code key-0 .. key-(totalMessages-1)} in order, stopping early if interrupted. */
    private static void publish(ConcurrentHashMap<String, Integer> map, int totalMessages, Guard guard) {
        for (int i = 0; i < totalMessages; i++) {
            // shutdownNow() signals cancellation via interruption; honour it so the JVM can exit.
            if (Thread.currentThread().isInterrupted()) {
                System.out.printf("Publisher interrupted after %d messages.\n", i);
                return;
            }
            var key = KEY_PREFIX + i;
            final int value = i;
            guard.run(() -> map.put(key, value));
            if (i % PROGRESS_INTERVAL == 0) {
                System.out.printf("Published %d messages.\n", i);
            }
        }
        System.out.printf("Published all %d messages\n", totalMessages);
    }

    /** Copies the key set once and exits the JVM with status -1 if the copy has gaps. */
    private static void subscribe(
            int subscriberId, ConcurrentHashMap<String, Integer> map, Guard guard, CountDownLatch done) {
        try {
            var existingKeys = new HashSet<String>();
            guard.run(() -> existingKeys.addAll(map.keySet()));
            // Note the keys are inserted by the publisher in sequential order.
            // Hence, a copy of size n is expected to hold every key from 0 to n-1.
            // This is where weak consistency shows up.
            var missingKeys = findMissingKeys(existingKeys);
            if (!missingKeys.isEmpty()) {
                throw new IllegalStateException(describeMissing(subscriberId, existingKeys, missingKeys));
            }
        } catch (Exception ex) {
            ex.printStackTrace();
            System.exit(-1);
        } finally {
            done.countDown();
            System.out.printf("Subscriber %d finished reading from map.\n", subscriberId);
        }
    }
}
在 ConcurrentHashMap 中,当对 map 同时进行并发写入和全量读取时,读取返回的结果经常会缺少较早添加的键。我原本期望它能提供某个时间点的 map 快照。然而,根据上面的测试代码,结果似乎并非如此。
ConcurrentHashMap 不是事务性数据库。根据官方文档(documentation):
检索操作(包括 get)通常不会阻塞,因此可能与更新操作(包括 put 和 remove)重叠。检索反映的是在其开始时已经完成的最近一次更新操作的结果。(更正式地说,对给定键的更新操作,与任何报告了该更新值的对该键的(非空)检索之间具有 happens-before 关系。)对于 putAll 和 clear 等聚合操作,并发检索可能只反映出部分条目的插入或删除。同样,Iterator、Spliterator 和 Enumeration 返回的元素反映的是哈希表在迭代器/枚举创建之时或创建之后某个时刻的状态。它们不会抛出 ConcurrentModificationException。然而,迭代器被设计为一次只能由一个线程使用。请记住,包括 size、isEmpty 和 containsValue 在内的聚合状态方法,其结果通常仅在 map 没有被其他线程并发更新时才有用。否则,这些方法的结果反映的是瞬时状态,可能足以用于监控或估算目的,但不适用于程序控制。