将 open62541 集成到现有的 libev 事件循环中

问题描述 投票:0回答:2

我有一个现有的应用程序,它广泛使用 libev 来实现其事件循环。我现在想添加 OPC UA 服务器功能,但不确定如何最好地将 open62541 事件循环集成到 libev 中。

我想到以下可能性:

  1. 从 libev 事件循环中调用
    UA_Server_run_iterate
    waitInternal
    时间为 0。这意味着服务器永远无法休眠(在
    ev_idle
    中轮询 open62541),或者来自 OPC UA 客户端的请求将遇到最多 50ms 的额外延迟(open62541 的默认最大等待时间)。
  2. 修补 open62541 以允许服务器网络层检索当前使用的文件描述符(服务器套接字和连接)。这将允许为这些文件描述符添加 libev 事件,从而仅在必要时轮询
    UA_Server_run_iterate
  3. 实现使用 libev 的自定义服务器网络层。这似乎意味着相当多的代码重复...是否有任何用于实现自定义网络层的示例/教程?
  4. 在单独的线程中运行 open62541 事件循环。我真的真的很想避免这种情况,因为像 libev 这样的事件系统的全部目的就是避免与异步操作相关的问题。例如,来自 open62541 的所有回调都必须与主 libev 线程同步。

就复杂性和性能而言,您认为上述哪个选项是“最佳”?

您能想到上面未列出的其他选项吗?


也发布在 open62541 邮件列表上。

libev open62541
2个回答
2
投票

我推荐选项 1 或选项 2。(免责声明:我是 open62541 的核心开发人员之一)

  1. 从 libev 事件循环内调用 UA_Server_run_iterate,waitInternal 时间为 0。这要么意味着服务器永远无法休眠(在 ev_idle 中轮询 open62541),要么来自 OPC UA 客户端的请求将经历高达50ms(open62541默认最大等待时间)。

目前这可能是您可以选择的最佳选择。您可以按照固定间隔(例如每 10 毫秒)调用 UA_Server_run_iterate,具体取决于应用程序的要求。所有其他选项都需要修补 open62541,并且目前内部 API 正在进行很多工作,因为当前正在添加许多功能。也请看我最后的笔记!

  1. 修补 open62541 以允许服务器网络层检索当前使用的文件描述符(服务器套接字和连接)。这将允许为这些文件描述符添加 libev 事件,从而仅在必要时轮询 UA_Server_run_iterate。

您可能不需要修补 open62541,因为如果网络层是 TCP 层,您可以通过服务器配置的网络层获取套接字的文件描述符:

server->config.networkLayers[i].serverSockets[j]
。 由于可能存在不同类型的网络层,这也可能引入大量的维护工作。例如。 pubsub 使用 UDP,其中套接字存储在
config->pubsubTransportLayers

内部
  1. 实现使用 libev 的自定义服务器网络层。这似乎意味着相当多的代码重复...是否有任何用于实现自定义网络层的示例/教程?

您可以使用插件接口实现自己的网络层,即编写自己的网络层(https://github.com/open62541/open62541/blob/master/arch/ua_network_tcp.c)。由于这是使用内部 API,因此您需要进行大量的维护工作和补丁修复。 -> 工作太多

  1. 在单独的线程中运行 open62541 事件循环。我真的真的很想避免这种情况,因为像 libev 这样的事件系统的全部目的就是避免与异步操作相关的问题。例如,来自 open62541 的所有回调都必须与主 libev 线程同步。

我想说这不是一个好的选择,因为你引入了异步回调。


一般注意事项:

我们目前有一份内部草案和草图来重新设计网络接口,特别是为所有套接字 FD 提供一个选择。目前我们对多个 FD 有多个选择。

另请查看以下 PR,我们已经开始进行返工: https://github.com/open62541/open62541/pull/2271


0
投票

对于单线程Linux应用程序,使用open62541 epoll。使用 arch/eventloop_posix_epoll.c 中的 UA_EventLoopPOSIX_registerFD 函数添加您自己的文件描述符。 epoll_open62541.h:

#ifndef EPOLL_OPEN62541_H_INCLUDED
#define EPOLL_OPEN62541_H_INCLUDED

#ifdef MY_DEBUG
#include </home/user/Raduga/src/OPC_UA/open62541-1.4.8_2files/open62541.h>
#else
#include <open62541/server.h>
#include <open62541/server_config_default.h>
#endif // MY_DEBUG

// from open62541
#define UA_SOCKET int
#define UA_FD UA_SOCKET

#define ZIP_ENTRY(type)                         \
struct {                                        \
    struct type *left;                          \
    struct type *right;                         \
}

#define UA_FDEVENT_IN 1

struct UA_EventSource;
struct UA_RegisteredFD;

typedef void (*UA_FDCallback)(UA_EventSource *es, UA_RegisteredFD *rfd, short event);

struct UA_RegisteredFD
{
    UA_DelayedCallback dc; /* Used for async closing. Must be the first member
                            * because the rfd is freed by the delayed callback
                            * mechanism. */

    ZIP_ENTRY(UA_RegisteredFD) zipPointers; /* Register FD in the EventSource */
    UA_FD fd;
    short listenEvents; /* UA_FDEVENT_IN | UA_FDEVENT_OUT*/

    UA_EventSource *es; /* Backpointer to the EventSource */
    UA_FDCallback eventSourceCB;
};
struct System; 
int epoll_open62541_add_fd(UA_RegisteredFD* rfd,UA_FD fd,UA_FDCallback ua_cb,System*sys);

#endif // EPOLL_OPEN62541_H_INCLUDED

epoll_open62541.cpp:

#include <iostream>
#include <unistd.h>

#include "epoll_open62541.h"

using namespace std;

extern UA_ServerConfig *config;

struct UA_EventLoopPOSIX;

extern "C" UA_StatusCode
UA_EventLoopPOSIX_registerFD(UA_EventLoopPOSIX *el, UA_RegisteredFD *rfd);

int epoll_open62541_add_fd(UA_RegisteredFD *rfd, UA_FD fd, UA_FDCallback ua_cb, System *sys)
{
    UA_EventLoop *el = config->eventLoop;
    rfd->dc.callback = NULL;
    rfd->fd = fd;
    rfd->listenEvents = UA_FDEVENT_IN;
    rfd->es = (UA_EventSource *)sys; // Data to callback
    rfd->eventSourceCB = ua_cb;

    /* Register in the EventLoop */
    UA_StatusCode res = UA_EventLoopPOSIX_registerFD((UA_EventLoopPOSIX *)el, rfd);
    if (res != UA_STATUSCODE_GOOD)
    {
        printf("TCP %u\t| Error registering the socket", (unsigned)fd);
        return res;
    }
    return 0;
}

使用示例:

UA_RegisteredFD rfd;

struct DataCB
{
    int a;
} dataCB;

void ua_epoll_callback(UA_EventSource *es, UA_RegisteredFD *rfd, short event)
{
    // Сообщение таймера надо прочитать
    uint64_t exp;
    int s = read(rfd->fd, &exp, sizeof(uint64_t));
    if (s != sizeof(uint64_t))
    {
        printf("timer_1sec_fd error");
        //exit_my(1);
    }
    cout<<"ua_epoll_callback"<<endl;
}

int test_epoll_open62541_222(UA_Server *server,UA_ServerConfig *config)
{
    timer_1sec_fd = timerfd_create(CLOCK_REALTIME, 0);
    timer_start_fd(timer_1sec_fd, 1 * 1000, 2 * 1000);

    UA_StatusCode res = epoll_open62541_add_fd(&rfd,timer_1sec_fd,ua_epoll_callback,(System*)&dataCB);
    if(res != UA_STATUSCODE_GOOD)
    {
        printf("TCP %u\t| Error epoll_open62541_add_fd",
               (unsigned)timer_1sec_fd);
        return res;
    }
    return 0;
}
© www.soinside.com 2019 - 2024. All rights reserved.