我用的是NetMQ,ZeroMQ的c#库,我实现如下。
这里的问题是,在PublishAsync的情况下,多个线程从外部生成和调用数据。
但是,由于Publish是在一个叫_pubThread的线程上进行的,所以好像有延迟
请告诉我如何解决这种情况以及为什么我不能在 ZeroMQ 的一个套接字中使用多个线程。
public class NetMqManager
{
private NetMQQueue<(string, string)> _queue = new NetMQQueue<(string, string)>();
private readonly Thread _subThread;
private readonly Thread _pubThread;
private readonly SubscriberSocket _subscriber;
private readonly PublisherSocket _publisher;
private readonly ZeroMqEndPoint _endPoint;
public NetMqManager(ZeroMqEndPoint endPoint)
{
_endPoint = endPoint;
_publisher = new PublisherSocket();
_publisher.Options.SendHighWatermark = 1500;
_publisher.SendReady += Publisher_SendReady;
_pubThread = new Thread(() =>
{
var poller = new NetMQPoller { _publisher };
poller.Run();
});
_subscriber = new SubscriberSocket();
_subscriber.Options.SendHighWatermark = 1500;
_subscriber.ReceiveReady += Subscriber_ReceiveReady;
_subThread = new Thread(() =>
{
var poller = new NetMQPoller { _subscriber };
poller.Run();
});
}
public async Task RunAsync()
{
await Task.Run(() =>
{
_publisher.Bind($"tcp://*:{_endPoint.PubPort}");
_subscriber.Bind($"tcp://*:{_endPoint.SubPort}");
_pubThread.Start();
_subThread.Start();
});
}
public async void PublishAsync(string topic, string payload)
{
await Task.Run(() => _queue.Enqueue((topic, payload)));
}
public async void SubscribeAsync(string topic)
{
_subscriber.Subscribe(topic);
}
private void Publisher_SendReady(object? sender, NetMQSocketEventArgs e)
{
var (topic, payload) = _queue.Dequeue();
_publisher.SendMoreFrame(topic).SendFrame(payload);
}
private void Subscriber_ReceiveReady(object? sender, NetMQSocketEventArgs e)
{
var topic = e.Socket.ReceiveFrameString();
var payload = e.Socket.ReceiveFrameString();
Console.WriteLine($"Topic: {topic}, Payload: {payload}");
}
}
Q: “请让我知道如何解决这种情况,以及为什么我不能在 ZeroMQ 的一个套接字中使用多个线程。”
A 部分:“... ZeroMQ 中一个套接字中的多个线程”
ZeroMQ 本机 API 可以指定在
Context()
-实例(核心引擎元素)内使用多少个 I/O 线程,此外还可以指定每个 Socket
-实例的最终级别的详细信息及其亲和力朝着如此准备Context()
-实例的I / O线程池。
如果您的 NetMQ 包装器将此传递给用户级代码,请相应地使用它来提高 I/O 性能。
B 部分:要避免的性能损失
如果您的用户级代码线程间性能需要提高,请避免使用与使用
tcp://
-传输类相关的非常昂贵的设置/解码,一旦数据仅在同一进程的线程之间流动-inproc://
-Transport Class 应该是任何附加开销中负载最少的。
C部分:脆弱还是自力更生?
如果您的代码依赖于多帧 ZeroMQ 消息组合,您将浪费本机
PUB/SUB
-Archetype 中存在的所有开销处理,其中对所有激活的订阅执行纯左对齐的字节方式字符串匹配,所以依赖多帧组合是资源浪费和另一个主要低效率的另一个来源(虽然它可能是 NetMQ-wrapper 设计妥协的一些不需要的副作用 - IIRC 这是我遇到的一个案例 6-8 年之前)。
如果您的代码也努力成为一个健壮的、自我恢复的代码,那么永远不要使用盲目假设进行编码,即房间里只有公平和诚实的消息传递参与者。在
inproc://
案例中不是那么多,但是如果使用任何“网络开放”传输类像 { tcp:// | udp:// | pgm:// | epgm:// | tipc:// | norm:// | vmci:// }
,当不合规的消息到达时,应该始终处理案例。在这里,依赖于在每条消息中(始终且仅)有两个帧,在第一种情况下,空(零帧)、单帧或 3+ 帧消息到达时,会使您的代码陷入死锁或异常。您的.poll()/.recv()
-方法必须处理接收环路中实际存在的帧数量的任何情况,而不是将自己射入我们自己的腿。
最后但同样重要的是,零之禅:
Martin SUSTRIK 和 Pieter HINTJENS 的宣传自始至终都是清晰而健全的——从不分享,从不阻止。
虽然在框架的所谓线程安全现代化上花费了许多努力,但设计格言(恕我直言)在用户级代码上仍然是首选,并且“共享”套接字实例对做。最好保持正确,使用线程间链接(如
inproc://
-s )将 manySocketLessThreads-to-oneSocketOwner 移动并专注于这些智能信号/消息元平面互连两侧的主要工作逻辑。
更好的性能,
更好的关注点分离,
更好的调试
我很确定有多个线程需要发布的最佳答案是
这是 ZeroMQ 套接字经常被遗忘的方面;它们可以绑定和连接不止一次。请参阅手册。人们经常将 ZMQ 套接字视为 BSD 套接字,但它们并不相同!您永远不会梦想 BSD 套接字通过 TCP、inproc、IPC 等多种传输方式连接到多个服务器。但它是如此有用的功能!