我的应用程序需要从 PUSH 套接字提取数据。数据被处理并写入文件。我希望应用程序做最少的工作将消息拉入进程内存,以便a)服务器不会被阻止并且不会丢弃任何消息,b)我可以监视由于处理缓慢而导致的消息积压,然后进行处理异步消息。
我可以使用内部具有三个套接字的包装套接字来实现此目的:
inproc://
连接的套接字,我可以在发送和接收消息时记录消息数量。所以数据流是:主题 1
rx.recv()
inproc_tx.send()
queued_messages++
主题 2
inproc_rx.recv()
queued_messages--
do_processing()
这似乎是缓冲消息的明智方法,因为它已经具有来自
zmq::message_t
的 recv()
,而且我确信它会比滚动我自己的排队逻辑更有效地实现。它可以工作,但是当底层代码已经知道有多少消息排队来决定何时删除新消息时,似乎有很多开销和额外的代码。
是否有 API 可以查询 ZeroMQ PULL 套接字上的队列使用情况?或者有更好的方法来实现上述内容?
您可能低估了 ZeroMQ 在幕后免费为您做的事情! PUSH/PULL 模式用于将消息从 PUSHer 发送到可用的 PULLer。如果您有超过 1 个 PULLer,结果是如果其中一个稍微落后于需求,ZeroMQ 将自动向其发送更少的消息。
我还认为您在缓冲收到的消息时会犯一个错误。如果您在线程 1 中处理它的速度落后,那么您在线程 2 中处理它的速度仍然会落后;您只是用一些额外的延迟来掩盖性能不佳。如果您只是将线程 1 和线程 2 作为拉取器(而不是通过管道连接在一起),那么异步处理消息就不会利用 PUSH/PULL 为您所做的事情。请查看指南这部分中的图 5。
就积压而言,您可以将 PUSH 套接字上的高水位线设置为某个可接受的值,并在 zmq_send() 上设置超时。如果您检测到 zmq_send() 超时,那么一切都不好;拉手已经落后了。事实上,这就是您需要知道的全部。我不知道有什么方法可以发现有多少消息在排队等待,但是当人们直接了解它时,它要么太多(超时),要么根本没有(拉动者正在跟上)。如果 PULLers 开始落后并且您想尽早检测到这一点,请将 PUSH 高水位线设置为某个较低值。