How to tell which topic data from a Mercure hub belongs to when the client is subscribed to two topics


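I have a dashboard in a Symfony application with two ChartJS graphs, one for temperature data and one for pressure data. I need to update them in real time; for that I am trying to use MercureBundle with two topics: ['realtime-notif/temperature/{sensorId}', 'realtime-notif/pressure/{sensorId}']. Although the topics look similar, the logic that shapes the data differs because the two ChartJS graphs differ. So I have two Messenger message handlers backed by AMQP queues: one publishes a Mercure update to the topic 'realtime-notif/temperature/{sensorId}', and the other message handler class publishes to 'realtime-notif/pressure/{sensorId}'. I will try to keep the code summary short.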

#mercure.yaml:
mercure:
    hubs:
        default:
            url: '%env(MERCURE_URL)%'
            public_url: '%env(MERCURE_PUBLIC_URL)%'
            jwt:
                secret: '%env(MERCURE_JWT_SECRET)%'
                publish: ['realtime-notif/temperature/{sensorId}', 'realtime-notif/pressure/{sensorId}']
                subscribe: ['realtime-notif/temperature/{sensorId}', 'realtime-notif/pressure/{sensorId}']

#The TemperatureMessageHandler class: 
class TemperatureMessageHandler implements MessageHandlerInterface
{

    private $mercureHub;
    private $managerRegistry;

    public function __construct(HubInterface $mercureHub, ManagerRegistry $managerRegistry)
    {
        $this->mercureHub = $mercureHub;
        $this->managerRegistry = $managerRegistry;
    }

    public function __invoke(TemperatureMessage $message)
    {
        try {
            $graphId=$message->getGraphId();
            $lastElapsedTime=$message->getLastElapsedTime();
            
            $em=$this->managerRegistry->getManager();

            $storedData = $em->getRepository(Temperature::class)->findLastRecordsForGraph($graphId, $lastElapsedTime);
            
            /**
             Set the data source for the temperature graph to a specific format from $storedData
            **/
            $formatedChartData = [];
                // ....

            $update = new Update(
                    sprintf('realtime-notif/temperature/%s', $graphId),
                    \json_encode($formatedChartData),
                    true
            );

            $this->mercureHub->publish($update);
        } catch (\Exception $exc) {
            
        }
    }
}

And:

#The PressureMessageHandler class:
class PressureMessageHandler implements MessageHandlerInterface
{

    private $mercureHub;
    private $managerRegistry;

    public function __construct(HubInterface $mercureHub, ManagerRegistry $managerRegistry)
    {
        $this->mercureHub = $mercureHub;
        $this->managerRegistry = $managerRegistry;
    }

    public function __invoke(PressureMessage $message)
    {
        try {
            $graphId = $message->getGraphId();
            $lastElapsedTime = $message->getLastElapsedTime();
            
            $em = $this->managerRegistry->getManager();

            $storedData = $em->getRepository(Pressure::class)->findLastRecordsForGraph($graphId, $lastElapsedTime);
            
            /**
             Set the data source for the pressure graph to a specific format from $storedData
            **/
            $formatedChartData = [];
                // ....
            

            $update = new Update(
                    sprintf('realtime-notif/pressure/%s', $graphId),
                    \json_encode($formatedChartData),
                    true
            );

            $this->mercureHub->publish($update);
        } catch (\Exception $exc) {
            
        }
    }
}

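My problem is that I don't know how to tell, on the client side, whether the data received from the Mercure hub in the message event of the EventSource object comes from the temperature topic or the pressure topic.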

<script type="text/javascript">
                $(document).ready(function () {
                    /**Create two graph on page ready **/
                    let temperatureGraphObject = createTemperatureGraph(canvasTemperaturaGraph);
                    let pressureGraphObject = createPressureGraph(canvasPressureGraph);
                    
                    /** 
                    I have two functions: updateTemperatureGraph(temperatureGraphObject, newTemperatureData) and updatePressureGraph(pressureGraphObject, newPressureData)
                    **/
                    
                    /**Subscribe client to topics for data updates **/
                    {% set topics = ['realtime-notif/temperature/'~temperatureSensorId, 'realtime-notif/pressure/'~pressureSensorId] %}

                    const eventSource = new EventSource("{{ mercure(topics, { subscribe:topics})|escape('js')}}", {withCredentials: true});

                    eventSource.onopen = function () {
                        console.log('New socket connection!');
                    };

                    eventSource.onmessage = function (e) {
                        console.log('New data received');
                        var data = JSON.parse(e.data);
                        
                        /** 
                        The problem is here: how to differentiate the topics' data in order to call updateTemperatureGraph(temperatureGraphObject, data) or updatePressureGraph(pressureGraphObject, data)
                        **/
                    };

                    eventSource.onerror = function () {
                        console.log('Socket connection lost!');
                    };
                });
            </script>

So, inside the onmessage event, how can I differentiate the topic data in order to call updateTemperatureGraph(temperatureGraphObject, data) or updatePressureGraph(pressureGraphObject, data)?

If I subscribe the client to only one topic, all received data belongs to that topic's graph, and of course the graph updates correctly.

symfony chart.js mercure
1 Answer

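The solution is to set the type attribute in the constructor of the Update class for the updates that the MessageHandler classes publish to the Mercure hub. For messages related to temperature notifications we set the type attribute to 'temperatureUpdate', and for messages related to pressure notifications we set the type attribute to 'pressureUpdate'.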
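
As background on why this works: Mercure delivers updates as Server-Sent Events, and the type set on an Update is sent as the SSE event field. A browser's EventSource only calls onmessage for events whose type is the default "message", so typed updates have to be received with addEventListener. The snippet below is a simplified sketch of how one SSE frame maps to an event type. It is not the browser's real parser; parseSseFrame is a hypothetical helper, and it assumes lines are separated by "\n".

```javascript
// Simplified illustration: turn one SSE frame (the text between two blank
// lines) into the event type and data a listener would receive.
// parseSseFrame is a hypothetical name, not part of any library.
function parseSseFrame(frame) {
  let type = "";
  const dataLines = [];

  for (const line of frame.split("\n")) {
    // Skip empty lines and comment lines (those starting with ":").
    if (line === "" || line.startsWith(":")) continue;

    const idx = line.indexOf(":");
    const field = idx === -1 ? line : line.slice(0, idx);
    let value = idx === -1 ? "" : line.slice(idx + 1);
    // A single leading space after the colon is not part of the value.
    if (value.startsWith(" ")) value = value.slice(1);

    if (field === "event") type = value;
    else if (field === "data") dataLines.push(value);
  }

  // Without an "event:" field the type falls back to "message",
  // which is the only case where onmessage is invoked.
  return { type: type || "message", data: dataLines.join("\n") };
}
```

A frame carrying `event: temperatureUpdate` therefore ends up with the type "temperatureUpdate" and is routed to listeners registered under that name, while a frame with no event field keeps the type "message".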

The __invoke method of TemperatureMessageHandler:

public function __invoke(TemperatureMessage $message)
{
    try {
        $graphId = $message->getGraphId();
        
        // Collect the data for the update
        $data = [/* ... */];

        $update = new Update(
            sprintf('realtime-notif/temperature/%s', $graphId),
            \json_encode($data),
            true,
            null,
            'temperatureUpdate'
        );

        $this->mercureHub->publish($update);
    } catch (\Exception $exc) {
        
    }
}

The __invoke method of PressureMessageHandler:

public function __invoke(PressureMessage $message)
{
    try {
        $graphId = $message->getGraphId();

        // Collect the data for the update
        $data = [/* ... */];

        $update = new Update(
            sprintf('realtime-notif/pressure/%s', $graphId),
            \json_encode($data),
            true,
            null,
            'pressureUpdate'
        );

        $this->mercureHub->publish($update);
    } catch (\Exception $exc) {
        
    }
}

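On the client side, you must register two new event listeners on the EventSource object, each named after one of the new types. Each listener then handles the corresponding message type published by the Mercure hub: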

<script type="text/javascript">
    $(document).ready(function () {
        /**Create two graph on page ready **/
        let temperatureGraphObject = createTemperatureGraph(canvasTemperaturaGraph);
        let pressureGraphObject = createPressureGraph(canvasPressureGraph);
        
        /** 
        I have two functions: updateTemperatureGraph(temperatureGraphObject, newTemperatureData) and updatePressureGraph(pressureGraphObject, newPressureData)
        **/
        
        /**Subscribe client to topics for data updates **/
        {% set topics = ['realtime-notif/temperature/'~temperatureSensorId, 'realtime-notif/pressure/'~pressureSensorId] %}

        const eventSource = new EventSource("{{ mercure(topics, { subscribe:topics})|escape('js')}}", {withCredentials: true});

        eventSource.onopen = function () {
            console.log('New socket connection!');
        };

        eventSource.addEventListener("temperatureUpdate", function (e) {
            let parsedData = null;
            try {
                parsedData = JSON.parse(e.data);
            } catch (error) {
                console.log(error);
            }
            
            if (parsedData) {
                updateTemperatureGraph(temperatureGraphObject, parsedData);
            }
        }, false);
        
        eventSource.addEventListener("pressureUpdate", function (e) {
            let parsedData = null;
            try {
                parsedData = JSON.parse(e.data);
            } catch (error) {
                console.log(error);
            }
            
            if (parsedData) {
                updatePressureGraph(pressureGraphObject, parsedData);
            }
        }, false);

        eventSource.onerror = function () {
            console.log('Socket connection lost!');
        };
    });
</script>

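This way, the Mercure hub publishes typed messages, and each event listener handles its own message type in the order the messages reach the subscribed client, regardless of which topics the client is subscribed to.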
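
Since the two listeners differ only in the event type and the update function, the registration could be folded into a small lookup table. This is a sketch under assumptions, not part of the original answer: registerTypedHandlers is a hypothetical helper, and it only assumes that the source object exposes addEventListener the way EventSource does.

```javascript
// Hypothetical helper: register one listener per event type from a
// { typeName: handler } map. Each handler receives the parsed JSON payload.
function registerTypedHandlers(source, handlers) {
  for (const [type, handler] of Object.entries(handlers)) {
    source.addEventListener(type, function (e) {
      let parsedData;
      try {
        parsedData = JSON.parse(e.data);
      } catch (error) {
        // Malformed payload: log it and skip the update.
        console.log(error);
        return;
      }
      handler(parsedData);
    });
  }
}
```

With the objects from the script above, the call would look like `registerTypedHandlers(eventSource, { temperatureUpdate: (d) => updateTemperatureGraph(temperatureGraphObject, d), pressureUpdate: (d) => updatePressureGraph(pressureGraphObject, d) })`. The keys must match the type strings passed to the Update constructor exactly.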
