我正在使用 Jer0mq 服务器套接字模型,特别是路由器经销商模式,因为我想验证客户端的身份。我的问题是,当我使用服务器绑定套接字并且客户端尝试连接的循环情况时,我注意到 500 毫秒的随机激增。从可忽略的延迟到高达 500 毫秒的延迟,为什么会发生这种情况?我怎样才能避免这种延迟?这可能吗?我究竟做错了什么?这是我的简单代码来测试它。
package sockets;
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import java.nio.charset.StandardCharsets;
import static sockets.rtdealer.NOFLAGS;
public class ZmqStack {
public static void main(String[] args) throws InterruptedException {
Thread brokerThread = new Thread(() -> {
while (true) {
try (ZContext context = new ZContext()) {
ZMQ.Socket broker = context.createSocket(SocketType.ROUTER);
broker.bind("tcp://*:5555");
String identity = new String(broker.recv());
String data1 = new String(broker.recv());
String identity2 = new String(broker.recv());
String data2 = new String(broker.recv());
System.out.println("Identity: " + identity + " Data: " + data1);
System.out.println("Identity: " + identity2 + " Data: " + data2);
broker.sendMore(identity.getBytes(ZMQ.CHARSET));
broker.send("xxx1".getBytes(StandardCharsets.UTF_8));
broker.sendMore(identity2.getBytes(ZMQ.CHARSET));
broker.send("xxx12");
broker.close();
context.destroy();
}
}
});
brokerThread.setName("broker");
Thread workerThread = new Thread(() -> {
while (true) {
try (ZContext context = new ZContext()) {
ZMQ.Socket worker = context.createSocket(SocketType.DEALER);
String identity = "identity1";
worker.setIdentity(identity.getBytes(ZMQ.CHARSET));
worker.connect("tcp://localhost:5555");
worker.send("Hello1".getBytes(StandardCharsets.UTF_8));
String workload = new String(worker.recv(NOFLAGS));
System.out.println(Thread.currentThread().getName() + " - Received " + workload);
}
}
});
workerThread.setName("worker");
Thread workerThread1 = new Thread(() -> {
while (true) {
try (ZContext context = new ZContext()) {
ZMQ.Socket worker = context.createSocket(SocketType.DEALER);
worker.setIdentity("Identity2".getBytes(ZMQ.CHARSET));
worker.connect("tcp://localhost:5555");
long start = System.currentTimeMillis();
worker.send("Hello2 " + Thread.currentThread().getName());
String workload = new String(worker.recv(NOFLAGS));
long finish = System.currentTimeMillis();
long timeElapsed = finish - start;
System.out.println(Thread.currentThread().getName() + " - Received " + workload);
System.out.println("Elapsed Time: " + timeElapsed);
}
}
});
workerThread1.setName("worker1");
workerThread1.start();
workerThread.start();
brokerThread.start();
}
}
Q1 :“(...)为什么会发生这种情况?”
有关实践经验范围的简介,最好事先阅读:
自 2.1 起就使用 ZeroMQ
重新设计了一个包装器,以连接装备不良、闭源且在幕后爬行的、已编译的编程语言到本机ZeroMQ API
基于ZeroMQ信令和消息元平面设计和维护低延迟(读取微秒)分布式计算系统
这里应该有一些根深蒂固的知识
这主要是由于重复支付组装成本并立即处置(根本不提重复引发的、昂贵且缓慢的垃圾收集)整个基于
Context
的线程本地引擎。有关较小规模延迟影响的更多详细信息,如下。
每一个实例化都是昂贵的(无论是在 TimeDOMAIN ~ CPU 方面还是 SpaceDOMAIN ~ MEM-I/O 方面,以及缓存延迟屏蔽方面的能力丧失),因此,如果没有因某些未提及的原因而禁止,所有ZeroMQ 基础设施实例化最好只在启动时发生一次,并在处理的整个生命周期中保持分配、配置、使用、重用和操作,无论是一个、十个还是一百个 Context() 引擎和它们的子实例化 Socket() 原型及其各自的 TransportClass 特定的 AccessPoint 引擎。
因此,最大和最便宜的改进将来自于消除无限循环之外的这些重复的 ZeroMQ 基础设施重新实例化。通过尽可能避免 ALAP 操作的一次性 ASAP 创建和性能提升配置 - 例如将 ZeroMQ 帧有效负载重新定义为单帧(也避免了阻塞盲多帧舞蹈带来的所有延迟,您的系统将不会被任何第一个编写不当的消息阻塞到自己造成的死锁 - 谁会想要这个,我们会吗?)字节映射字段。
这是低延迟、相当稳健的分布式设计的起点。
我的购物清单将继续(在上面的提示已经重构之后):Q2 :“(...)我怎样才能避免这种延迟?”
-- 使用更便宜的 TransportClasses - 避免 TCP/IP 组件/打包/缓冲/解码/disass 的无意义成本(所有这些都由 O/S 堆栈完成,因此完全超出了 ZeroMQ 零拷贝设计 Zen 的本机能力) of-Zero 哲学)对于所有本地主机线程间传输都有意义(最好使用
inproc:
或
ipc:
或
tipc:
发送和交付花费更少的 TransportClasses)-- 使用任何与性能相关的
Context()
-实例参数来配置、增强和重新平衡您的应用程序对高性能低延迟信令和消息传递的需求。我们可能会增加内部
Context
-I/O 线程的数量,我们甚至可以将它们绑定到硬件上,并保留一些 CPU 核心(仅)用于我们的信令和消息传递元平面,从而确实减少甚至本地主机操作系统的“协作调度”噪音也来自我们预期的高性能低延迟生产力范围。--类似地使用任何
.setsockopt()
配置(在所有分布式系统节点上可用的情况下,请注意,某些包装器确实抄袭了原始本机 API,因此测试两个版本差异,通常是为了主题过滤的成本,退化的主题过滤仅在使用多帧有效负载的私有假设下工作,本地主机或远程节点中的其他语言包装器不需要事先或永远知道......以及这些可能的发送方和接收方 ZeroMQ 包装器副作用(在配备 jeromq 包装器的代码中起作用的代码不需要与其他包装器以相同的方式工作 - 许多这样失败的例子)。这些经验法则值得开始。
祝你好运并随时通知我们