我在 Dart/Flutter 项目中使用 ZMQ 库进行网络通信。我有一对 PUB/SUB TCP 套接字,发布者每 100 毫秒推送一条消息。我使用此代码来监听接收方的消息:
m_frameDataSubscripiton = m_subscribeSocket.messages.listen((msg) async {
Iterator<ZFrame> it = msg.iterator;
if (it.moveNext()) {
while (it.current.hasMore) {
it.moveNext();
}
if (it.current.payload.isNotEmpty) {
print("Got the frame packet at " + DateTime.now().millisecondsSinceEpoch.toString() + " size " + it.current.payload.length.toString());
}
}
},
onError: (Object error) {
m_status = "subscription error";
notifyListeners();
},
onDone: () {
m_status = "done";
notifyListeners();
});
消息接收得很好,但我注意到它们的时间分布有些奇怪。似乎对 listen(msg) 的调用被“打包”为突发,调用之间间隔 1-2 毫秒,然后有 1000 毫秒的暂停。下面是接收到的时间戳表,可以更好地说明这种情况:
我尝试了不同的策略来使接收时间更加统一,包括设置接收套接字的较低高水位线,但这会引发所有低于默认值(1000)的值的异常。我还尝试使用 RxDart 包并在流中使用定时缓冲区(100 毫秒)轮询时间,但这导致订阅仍然每 1000 毫秒轮询一次,而其余消息则被简单地丢弃。 有没有办法强制订阅在每次收到一条消息时刷新其缓冲区?
事实证明,原因在于 dart ZMQ 库内的硬编码轮询持续时间:https://github.com/enwi/dartzmq/issues/4