我遇到了这个异常:
org.zeromq.ZMQException: Errno 4
at org.zeromq.ZMQ$Socket.mayRaise(ZMQ.java:3732) ~[jeromq-0.5.3.jar:na]
at org.zeromq.ZMQ$Socket.recv(ZMQ.java:3530) ~[jeromq-0.5.3.jar:na]
at com.forexassistant.service.zeromq.CurrencyStrengthZeroMQ.sendCurrencyStrengthRequest(CurrencyStrengthZeroMQ.java:30) ~[classes/:na]
at com.forexassistant.service.algorithmlogic.AlgorithmLogic.getCurrencyStrength(AlgorithmLogic.java:209) ~[classes/:na]
at sun.reflect.GeneratedMethodAccessor111.invoke(Unknown Source) ~[na:na]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_241]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_241]
at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84) ~[spring-context-5.3.22.jar:5.3.22]
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) ~[spring-context-5.3.22.jar:5.3.22]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_241]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_241]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_241]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_241]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_241]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_241]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_241]
Suppressed: org.zeromq.ZMQException: Errno 4
at zmq.Ctx.terminate(Ctx.java:304) ~[jeromq-0.5.3.jar:na]
at org.zeromq.ZMQ$Context.term(ZMQ.java:671) ~[jeromq-0.5.3.jar:na]
at org.zeromq.ZContext.destroy(ZContext.java:136) ~[jeromq-0.5.3.jar:na]
at org.zeromq.ZContext.close(ZContext.java:463) ~[jeromq-0.5.3.jar:na]
at com.forexassistant.service.zeromq.CurrencyStrengthZeroMQ.sendCurrencyStrengthRequest(CurrencyStrengthZeroMQ.java:37) ~[classes/:na]
... 13 common frames omitted
我不知道为什么会发生这种情况,有问题的逻辑是与套接字通信:
public String sendCurrencyStrengthRequest() {
try (ZContext context = new ZContext()) {
ZMQ.Socket socket = context.createSocket(SocketType.PUSH);
socket.connect("tcp://localhost:32868");
ZMQ.Socket socket2 = context.createSocket(SocketType.PULL);
socket2.connect("tcp://localhost:32869");
String msg = "GET_CURRENCY_STRENGTHS";
socket.send(msg,1);
while (!Thread.currentThread().isInterrupted()) {
byte[] reply = socket2.recv(0);
if(reply!=null) {
currencyStrengths= new String(reply, ZMQ.CHARSET);
}
context.close();
break;
}
}
return currencyStrengths;
}
我这样做错了吗? sendCurrencyStrengthRequest() 在 Spring 中计划每 5 秒调用一次,还有另一个函数每 30 分钟调用一次,该函数在不同的上下文中使用不同的拉和推套接字,所有这些都会工作一段时间,然后抛出此错误,有什么想法吗?
我假设,没有显示的是与此配对的另一个程序,作为您创建和连接的 PUSH 和 PULL 套接字的其他端点。
我认为问题出在我们没有看到的其他程序中。是接收
GET_CURRENCY_STRENGTHS
消息,回复,然后立即终止,还是立即清理其上下文,方式与此代码片段大致相同?
如果是这样,问题就出在立即终止/清理上。使用ZMQ发送消息的行为是非阻塞的;您只需将消息推送到由 ZMQ 在后台启动和管理的线程管理的队列中。如果在 ZMQ send() 之后您立即终止程序或清理,这很可能意味着管理线程实际上根本没有任何时间做任何事情 - 也许它们此时甚至还没有被操作系统调度点。
程序终止的结果是操作系统清理了程序本身尚未清理的所有资源 - 套接字、分配的内存、线程等等。
这一切都在 localhost 上的事实很有趣,因为操作系统具有底层 tcp 套接字的完整可见性,并且可以告诉连接端(此程序)比其他端点打开时更多有关 tcp 套接字状态的信息另一台电脑。
如果是这种情况,在连接的另一端(在您给我们的代码片段中),发生的情况是 ZMQ 正在阻塞的系统调用中耐心等待通过 tcp 套接字传入的内容。除此之外,该套接字正在被操作系统拆除(或另一端清理)。因此,套接字读取被终止(因为套接字不再存在),并且您会得到一个丑陋的异常,如图所示。
至少,这是我的猜测。如果这是正确的,请尝试在 zmq send() 和程序终止之间的其他程序中添加一些延迟(我们没有看到),允许事情在后台发生。
代码是否成功的可变性取决于 zmq send() 后是否调度线程的随机性。应用程序和 zme 管理线程之间的通信是通过 IPC 管道(或信号量等其他东西)完成的,所有这些都为操作系统提供了重新调度线程的机会。有时它可能会说一个线程被调度,有时可能不会,这取决于主应用程序线程有多少时间片(以及一堆其他神秘的、神秘的因素)。
一般
特别是在 ZMQ 中,以及一般的其他 Actor 模型系统中,终止是必须达成一致的事情。因为它在传输中缓冲消息,所以您不知道“最后一条消息”是否已到达接收端并且可以安全终止。您需要的是在关闭之前延迟一段时间以使一切安静下来,或者一些明确的消息确认最终消息已传播到各处。