如何在 ZeroMQ 中发布一个多线程的套接字

问题描述 投票:0回答:2

我用的是NetMQ,ZeroMQ的c#库,我实现如下。

这里的问题是,在PublishAsync的情况下,多个线程从外部生成和调用数据。

但是,由于Publish是在一个叫_pubThread的线程上进行的,所以好像有延迟

请告诉我如何解决这种情况以及为什么我不能在 ZeroMQ 的一个套接字中使用多个线程。

public class NetMqManager
{
    private NetMQQueue<(string, string)> _queue = new NetMQQueue<(string, string)>();

    private readonly Thread _subThread;

    private readonly Thread _pubThread;

    private readonly SubscriberSocket _subscriber;

    private readonly PublisherSocket _publisher;

    private readonly ZeroMqEndPoint _endPoint;

    public NetMqManager(ZeroMqEndPoint endPoint)
    {
        _endPoint = endPoint;

        _publisher = new PublisherSocket();
        _publisher.Options.SendHighWatermark = 1500;
        _publisher.SendReady += Publisher_SendReady;
        _pubThread = new Thread(() =>
        {
            var poller = new NetMQPoller { _publisher };
            poller.Run();
        });

        _subscriber = new SubscriberSocket();
        _subscriber.Options.SendHighWatermark = 1500;
        _subscriber.ReceiveReady += Subscriber_ReceiveReady;
        _subThread = new Thread(() =>
        {
            var poller = new NetMQPoller { _subscriber };
            poller.Run();
        });
    }

    public async Task RunAsync()
    {
        await Task.Run(() =>
        {
            _publisher.Bind($"tcp://*:{_endPoint.PubPort}");
            _subscriber.Bind($"tcp://*:{_endPoint.SubPort}");

            _pubThread.Start();
            _subThread.Start();
        });
    }

    public async void PublishAsync(string topic, string payload)
    {
        await Task.Run(() => _queue.Enqueue((topic, payload)));
    }

    public async void SubscribeAsync(string topic)
    {
        _subscriber.Subscribe(topic);
    }

    private void Publisher_SendReady(object? sender, NetMQSocketEventArgs e)
    {
        var (topic, payload) = _queue.Dequeue();
        _publisher.SendMoreFrame(topic).SendFrame(payload);
    }

    private void Subscriber_ReceiveReady(object? sender, NetMQSocketEventArgs e)
    {
        var topic = e.Socket.ReceiveFrameString();
        var payload = e.Socket.ReceiveFrameString();

        Console.WriteLine($"Topic: {topic}, Payload: {payload}");
    }
}
c# .net zeromq netmq
2个回答
0
投票

Q: “请让我知道如何解决这种情况,以及为什么我不能在 ZeroMQ 的一个套接字中使用多个线程。”

A 部分:“... ZeroMQ 中一个套接字中的多个线程”

ZeroMQ 本机 API 可以指定在

Context()
-实例(核心引擎元素)内使用多少个 I/O 线程,此外还可以指定每个
Socket
-实例的最终级别的详细信息及其亲和力朝着如此准备
Context()
-实例的I / O线程池。

如果您的 NetMQ 包装器将此传递给用户级代码,请相应地使用它来提高 I/O 性能。

B 部分:要避免的性能损失

如果您的用户级代码线程间性能需要提高,请避免使用与使用

tcp://
-传输类相关的非常昂贵的设置/解码,一旦数据仅在同一进程的线程之间流动-
inproc://
-Transport Class 应该是任何附加开销中负载最少的。

C部分:脆弱还是自力更生?

如果您的代码依赖于多帧 ZeroMQ 消息组合,您将浪费本机

PUB/SUB
-Archetype 中存在的所有开销处理,其中对所有激活的订阅执行纯左对齐的字节方式字符串匹配,所以依赖多帧组合是资源浪费和另一个主要低效率的另一个来源(虽然它可能是 NetMQ-wrapper 设计妥协的一些不需要的副作用 - IIRC 这是我遇到的一个案例 6-8 年之前)。

如果您的代码也努力成为一个健壮的、自我恢复的代码,那么永远不要使用盲目假设进行编码,即房间里只有公平和诚实的消息传递参与者。在

inproc://
案例中不是那么多,但是如果使用任何“网络开放”传输类
 { tcp:// | udp:// | pgm:// | epgm:// | tipc:// | norm:// | vmci:// }
,当不合规的消息到达时,应该始终处理案例。在这里,依赖于在每条消息中(始终且仅)有两个帧,在第一种情况下,空(零帧)、单帧或 3+ 帧消息到达时,会使您的代码陷入死锁或异常。您的
.poll()/.recv()
-方法必须处理接收环路中实际存在的帧数量的任何情况,而不是将自己射入我们自己的腿。

最后但同样重要的是,零之禅:

Martin SUSTRIK 和 Pieter HINTJENS 的宣传自始至终都是清晰而健全的——从不分享,从不阻止。

虽然在框架的所谓线程安全现代化上花费了许多努力,但设计格言(恕我直言)在用户级代码上仍然是首选,并且“共享”套接字实例对做。最好保持正确,使用线程间链接(如

inproc://
-s )将 manySocketLessThreads-to-oneSocketOwner 移动并专注于这些智能信号/消息元平面互连两侧的主要工作逻辑。

更好的性能,
更好的关注点分离,
更好的调试


0
投票

我很确定有多个线程需要发布的最佳答案是

  • 为每个 PUB 线程提供自己的套接字,绑定到它们自己唯一的连接字符串。
  • 在每个 SUB 客户端中,将 SUB 套接字多次连接到您的所有发布者连接字符串。
  • 这样,每个 SUBsciber 都通过一个套接字监听所有 PUBlisher 线程。每个发布者线程都有自己的专用套接字,因此无需与任何其他发布者线程协调即可使用它。

这是 ZeroMQ 套接字经常被遗忘的方面;它们可以绑定和连接不止一次。请参阅手册。人们经常将 ZMQ 套接字视为 BSD 套接字,但它们并不相同!您永远不会梦想 BSD 套接字通过 TCP、inproc、IPC 等多种传输方式连接到多个服务器。但它是如此有用的功能!

© www.soinside.com 2019 - 2024. All rights reserved.