zeromq 相关问题

ZeroMQ(ZMQ,0MQ,ØMQ)是一种高性能,异步,传输级的不可知消息库,旨在用于可伸缩,分布式和/或并发应用程序。它提供了一个消息队列,但与面向消息的中间件不同,ZeroMQ系统可以在没有专用消息代理的情况下运行。许可证:LGPL具有静态链接异常

ZeroMQ:推/拉时延迟非常高

总结,我发现使用zmq.PUSH / PULL套接字的接收器的延迟很高。详细信息我是ZeroMQ的新手,正在尝试从发送方向接收方发送几兆字节(我的最终目标是发送...

回答 1 投票 0

如何在MFC应用程序中启动ZeroMQ线程?

(Visual Studio 2017,C ++ 17,cppzmq 4.6.0)我是MFC编程的新手。我只是想在一个线程上创建一个ZeroMQ答复服务器,该服务器将随时准备接收用户请求并运行某些...

回答 1 投票 0

在这种情况下我应该使用哪个ZMQ模式?

我是ZMQ和多线程设计的新手,所以我希望对我的场景的设计模式有所了解:我有一个充当中间件的服务器,它将从...接收外部数据。

回答 1 投票 0


zeromq:发布到多个接收者,但其中一个除外

我有N个发布者订阅者。该消息是一个简单的布尔值。消息传递模式与常规PubSub略有不同:当一个订户收到一个true时,其他所有...

回答 1 投票 0

ZeroMQ多主题过滤用法中一些丢失的消息

我有一个使用JeroMQ 0.5.2用Java编写的ZeroMQ订户应用程序。在代码中,我订阅了多个这样的主题:套接字订阅者= context.createSocket(SocketType.SUB);订户...。

回答 1 投票 0

pyzmq:套接字因“ ZMQError:无法在当前状态下完成操作”而中断]

对于ZMQ而言,我还很陌生,并且试图构建非常基本的消息传递系统代码完全是基于此处的示例,出于某种原因,在最后一条消息到达前端之后,由于某种原因而有些曲折...

回答 1 投票 0

zmq为什么将多个消息打包到一个TCP帧?

我使用带有ZMQ_DONTWAIT标志的ZMQ_PUSH套接字发送消息。每个消息间隔300毫秒(因此,我严重怀疑我的线程调度是否如此)。但是不时有几个(最多7个...

回答 1 投票 1

ZeroMQ如何使用C 我是ZMQ的新手。我有多个发布商和一个客户。寻求建议以最佳方式实施它。当前,它利用单个客户端和服务器的回复-请求模式;必须将其扩展到多个发布者和一个订户。 此应用程序将在不支持C11的QNX系统上运行,因此zmq::mutlipart_t没有帮助。 void TransportLayer::Init() { socket.bind( "tcp://*:5555" ); } void TransportLayer::Receive() { while ( true ) { zmq::message_t request; string protoBuf; socket.recv( &request ); uint16_t id = *( (uint16_t*)request.data() ); protoBuf = std::string( static_cast<char*>( request.data() + sizeof( uint16_t ) ), request.size() - sizeof( uint16_t ) ); InterfaceLayer::getInstance()->ParseProtoBufTable( protoBuf ); } Send(); usleep( 1 ); } void TransportLayer::Send() { zmq::message_t reply( 1 ); memcpy( reply.data(), "#", 1 ); socket.send( reply ); } 这是我编写的代码,最初旨在仅侦听一个客户端,现在我必须扩展它以侦听多个客户端。 我尝试使用zmq::multipart_t,但这需要C11支持,但我们使用的QNX版本不支持C11。 我尝试实施建议的解决方案。我创建了2个发布商,它们连接到相同的静态位置。 观察: I]执行顺序:1.启动订户2.启动Publisher1(它仅发布一个数据值) 订户未收到此数据。 II]修改了Publisher1以在while循环中发送相同的数据执行顺序:1.启动订户2.启动Publisher1 3.启动Publsiher2。 现在我看到订户正在从两个发布者处接收数据。 这表明存在数据丢失的可能性。 我如何确保绝对不会丢失数据? 这是我的源代码: 发布者2: dummyFrontEnd::dummyFrontEnd():context(1),socket(context,ZMQ_PUB) { } void dummyFrontEnd::Init() { socket.connect("tcp://127.0.0.1:5555"); cout << "Connecting .... " << endl; } void dummyFrontEnd::SendData() { while ( std::getline(file, line_str) ) { std::stringstream ss(line_str); std::string direction; double tdiff; int i, _1939, pgn, priority, source, length, data[8]; char J, p, _0, dash, d; ss >> tdiff >> i >> J >> _1939 >> pgn >> p >> priority >> _0 >> source >> dash >> direction >> d >> length >> data[0] >> data[1] >> data[2] >> data[3] >> data[4] >> data[5] >> data[6] >> data[7]; timestamp += tdiff; while ( gcl_get_time_ms() - start_time < uint64_t(timestamp * 1000.0) - first_time ) { usleep(1); } if (arguments.verbose) { std::cout << timestamp << " " << i << " " << J << " " << _1939 << " " << pgn << " " << p << " " << priority << " " << _0 << " " << source << " " << dash << " " << direction << " " << d << " " << length << " " << data[0] << " " << data[1] << " " << data[2] << " " << data[3] << " " << data[4] << " " << data[5] << " " << data[6] << " " << data[7] << std::endl; } uint64_t timestamp_ms = (uint64_t)(timestamp * 1000.0); protoTable.add_columnvalues(uint64ToString(timestamp_ms)); /* timestamp */ protoTable.add_columnvalues(intToString(pgn)); /* PGN */ protoTable.add_columnvalues(intToString(priority)); /* Priority */ protoTable.add_columnvalues(intToString(source)); /* Source */ protoTable.add_columnvalues(direction); /* Direction */ protoTable.add_columnvalues(intToString(length)); /* Length */ protoTable.add_columnvalues(intToString(data[0])); /* data1 */ protoTable.add_columnvalues(intToString(data[1])); /* data2 */ protoTable.add_columnvalues(intToString(data[2])); /* data3 */ protoTable.add_columnvalues(intToString(data[3])); /* data4 */ protoTable.add_columnvalues(intToString(data[4])); /* data5 */ protoTable.add_columnvalues(intToString(data[5])); /* data6 */ protoTable.add_columnvalues(intToString(data[6])); /* data7 */ protoTable.add_columnvalues(intToString(data[7])); /* data8 */ zmq::message_t create_values(protoTable.ByteSizeLong()+sizeof(uint16_t)); *((uint16_t*)create_values.data()) = TABLEMSG_ID; // ID protoTable.SerializeToArray(create_values.data()+sizeof(uint16_t), protoTable.ByteSizeLong()); socket.send(create_values); protoTable.clear_columnvalues(); usleep(1); } } 发布者1: dummyFrontEnd::dummyFrontEnd():context(1),socket(context,ZMQ_PUB) { } void dummyFrontEnd::Init() { socket.connect("tcp://127.0.0.1:5555"); cout << "Connecting .... " << endl; } void dummyFrontEnd::SendData() { cout << "In SendData" << endl; while(1) { canlogreq canLogObj = canlogreq::default_instance(); canLogObj.set_fromhours(11); canLogObj.set_fromminutes(7); canLogObj.set_fromseconds(2); canLogObj.set_fromday(16); canLogObj.set_frommonth(5); canLogObj.set_fromyear(2020); canLogObj.set_tohours(12); canLogObj.set_tominutes(7); canLogObj.set_toseconds(4); canLogObj.set_today(17); canLogObj.set_tomonth(5); canLogObj.set_toyear(2020); zmq::message_t logsnippetmsg(canLogObj.ByteSizeLong() + sizeof(uint16_t)); *((uint16_t*)logsnippetmsg.data()) = 20; canLogObj.SerializeToArray(logsnippetmsg.data()+sizeof(uint16_t), canLogObj.ByteSizeLong()); socket.send(logsnippetmsg); usleep(1); canLogObj.clear_fromhours(); canLogObj.clear_fromminutes(); canLogObj.clear_fromseconds(); canLogObj.clear_fromday(); canLogObj.clear_frommonth(); canLogObj.clear_fromyear(); canLogObj.clear_tohours(); canLogObj.clear_tominutes(); canLogObj.clear_toseconds(); canLogObj.clear_today(); canLogObj.clear_tomonth(); canLogObj.clear_toyear(); } } 订户: TransportLayer::TransportLayer():context(1),socket(context,ZMQ_SUB){ } void TransportLayer::Init() { socket.bind("tcp://*:5555"); socket.setsockopt(ZMQ_SUBSCRIBE, "", 0); } void TransportLayer::Receive() { cout << "TransportLayer::Receive " << " I am in server " << endl; static int count = 1; // Producer thread. while ( true ){ zmq::message_t request; string protoBuf; socket.recv(&request); uint16_t id = *((uint16_t*)request.data()); cout << "TransportLayer : " << "request.data: " << request.data() << endl; cout << "TransportLayer : count " << count << endl; count = count + 1; cout << "TransportLayer : request.data.size " << request.size() << endl; protoBuf = std::string(static_cast<char*>(request.data() + sizeof(uint16_t)), request.size() - sizeof(uint16_t)); cout << "ProtoBuf : " << protoBuf << endl; InterfaceLayer *interfaceLayObj = InterfaceLayer::getInstance(); switch(id) { case TABLEMSG_ID: cout << "Canlyser" << endl; interfaceLayObj->ParseProtoBufTable(protoBuf); break; case LOGSNIPPET_ID: cout << "LogSnip" << endl; interfaceLayObj->ParseProtoBufLogSnippet(protoBuf); interfaceLayObj->logsnippetSignal(); // publish the signal break; default: break; } usleep(1); } } 我是ZMQ的新手。我有多个发布商和一个客户。寻求建议以最佳方式实施它。当前,它利用单个客户端和服务器的回复-请求模式; ...

我是ZMQ的新手。我有多个发布商和一个客户。寻求建议以最佳方式实施它。当前,它利用单个客户端和服务器的回复-请求模式; ...

回答 1 投票 1

为什么ZeroMQ服务器没有收到来自客户端的任何请求?

我有一个使用MQL4(类似于c ++的语言)设置的ZeroMQ服务器,以及一个使用Python设置的ZeroMQ客户端。由PUSH / PULL-sockets组成的连接效果很好,但是当我尝试使用REQ -...] >>

回答 1 投票 0


在Fargate中使用ZeroMQ Golang

我正在尝试在awsvpc模式下在Fargate上运行的ECS中使用ZeroMQ。我有2个不同的服务,每个服务都运行自己的任务并启用了服务发现功能。我在...

回答 1 投票 0

用户上的ZMQ接收不遵守设置的超时时间

我正在尝试将ZMQ通信集成到测试用例中,但是即使我为操作设置了超时500ms,zmq :: socket_t :: recv也存在无限期阻塞的问题。 ...

回答 1 投票 0

FFmpeg CLI-使用ZMQ交换RTMP源(zmqsend)

我的设置如下:带有RTMP模块的Nginx多个RTMP流对,每个对都有一个主RTMP端点和一个备份RTMP端点(因此,流传输到rtmp:// localhost / main / $ STREAM_NAME和rtmp:// ...] >

回答 1 投票 0

ZMQ(Jeromq)-套接字发送文档中缺少参数

我目前正在尝试修改简单的hwclient / hwserver示例,以将具有预定义大小的字节数组发送到服务器。我已经看到基本上在以下功能中提供了功能...

回答 1 投票 0

如何在ZeroMQ python中正确声明套接字类型?

我有一个奇怪的问题。我为不和谐的bot开发了一个Web界面,并使用ZeroMQ在bot的进程和fastAPI进程之间进行通信。我的程序是结构化的,因此fastAPI ...

回答 1 投票 0

如何以异步方式执行ZeroMQ PUSH / PULL原型?

根据我的要求,我想在端口中发起PULL,并希望从其他端口接收到我的PULL端口。对于PULL端口,它异步侦听,并且在收到消息时,它会...

回答 2 投票 0

zeromq python to c ++ Pub / Sub

我下面有可以正常运行的python代码-python pub / sub zeromq我可以通过c ++订阅此python服务器吗?我添加了c ++客户端代码,但该代码也无法在Python中运行:def ...

回答 1 投票 0

ZeroMQ Connector仍成功发送数据,但未能从MT4服务器获得响应

我是这个社区的新手。我正在关注有关如何将python与metatrader 4进行接口的7个视频教程,并且按照本视频中的指示在iPython内核上运行它们没有问题。No no ...

回答 1 投票 2

使用Python在Raspberry Pi上的代理

我正在使用Python在Raspberry Pi上编程代理,并且用JavaScript编程了sub.js和pub.js。 pub和sub都经过了测试,可以正常工作,但是代理代码没有反应。 ...

回答 1 投票 0

© www.soinside.com 2019 - 2024. All rights reserved.