我在 Symony 应用程序上有一个仪表板,其中有两个 ChartJS 图表,一个用于温度数据,另一个用于压力数据。我需要实时更新;为此,我尝试将
MercureBundle
与两个主题一起使用:['realtime-notif/temperature/{sensorId}', 'realtime-notif/pressure/{sensorId}']
。虽然主题听起来相似,但符合数据的逻辑不同,因为两个 ChartJS 不同,为此我有两个带有 AMQP 队列的 Messenger 消息处理程序,一个在主题 'realtime-notif/temperature/{sensorId}'
中发布 Mercure 更新,另一个消息处理程序类发布在'realtime-notif/pressure/{sensorId}'
。我会尽量将代码总结得简洁。
#mercure.yaml:
mercure:
hubs:
default:
url: '%env(MERCURE_URL)%'
public_url: '%env(MERCURE_PUBLIC_URL)%'
jwt:
secret: '%env(MERCURE_JWT_SECRET)%'
publish: ['realtime-notif/temperature/{sensorId}', 'realtime-notif/pressure/{sensorId}']
subscribe: ['realtime-notif/temperature/{sensorId}', 'realtime-notif/pressure/{sensorId}']
#The TemperatureMessageHandler class:
class TemperatureMessageHandler implements MessageHandlerInterface
{
private $mercureHub;
private $managerRegistry;
public function __construct(HubInterface $mercureHub, ManagerRegistry $managerRegistry)
{
$this->mercureHub = $mercureHub;
$this->managerRegistry = managerRegistry;
}
public function __invoke(TemperatureMessage $message)
{
try {
$graphId=$message->getGraphId();
$lastElapsedTime=$message->getLastElapsedTime();
$em=$this->managerRegistry->getManager();
$storedData = $em->getRepository(Temperature::class)->findLastRecordsForGraph($graphId, $lastElapsedTime);
/**
Set the data source for the temperature graph to a specific format from $toredData
**/
$formatedChartData = [];
**....**
$update = new Update(
sprintf('realtime-notif/temperature/%s', $graphId),
\json_encode($$formatedChartData),
true
);
$this->mercureHub->publish($update);
} catch (\Exception $exc) {
}
}
}
还有,
#The PressureMessageHandler class:
class PressureMessageHandler implements MessageHandlerInterface
{
private $mercureHub;
private $managerRegistry;
public function __construct(HubInterface $mercureHub, ManagerRegistry $managerRegistry)
{
$this->mercureHub = $mercureHub;
$this->managerRegistry = managerRegistry;
}
public function __invoke(PressureMessage $message)
{
try {
$graphId = $message->getGraphId();
$lastElapsedTime = $message->getLastElapsedTime();
$em = $this->managerRegistry->getManager();
$storedData = $em->getRepository(Pressure::class)->findLastRecordsForGraph($graphId, $lastElapsedTime);
/**
Set the data source for the pressure graph to a specific format from $toredData
**/
$formatedChartData = [];
**....**
$update = new Update(
sprintf('realtime-notif/pressure/%s', $graphId),
\json_encode($$formatedChartData),
true
);
$this->mercureHub->publish($update);
} catch (\Exception $exc) {
}
}
}
我的问题是,我不知道如何在客户端区分从 Mercure hub 收到的数据是来自
EventSource
对象的消息事件中的温度主题还是压力主题。
<script type="text/javascript">
$(document).ready(function () {
/**Create two graph on page ready **/
let temperatureGraphObject = createTemperatureGraph(canvasTemperaturaGraph);
let pressureGRaphObject = createPressureGraph(canvasPressureGraph);
/**
I have two function updateTemperatureGraph(temperatureGraphObject, newTemperaturaData) and updatePressureGraph(pressureGraphObject, newPresureData)
**/
/**Subscribe client to topics for data updates **/
{% set topics = ['realtime-notif/temperature/'~temperatureSensorId, 'realtime-notif/pressure/'~pressureSensorId] %}
const eventSource = new EventSource("{{ mercure(topics, { subscribe:topics})|escape('js')}}", {withCredentials: true});
eventSource.onopen = function () {
console.log('New socket connection!');
};
eventSource.onmessage = function (e) {
console.log('New data received');
var data = JSON.parse(e.data);
/**
The problem is here, how differentiate the topics data to call updateTemperaturaGraph(temperatureGraphObject, data) or updatePressureGraph(pressureGraphObject, data)
**/
};
eventSource.onerror = function () {
console.log('Socket connection lost!');
};
});
</script>
那么,如何区分主题数据来调用 updateTemperaturaGraph(TemperatureGraphObject, data) 或 updatePressureGraph(PressureGraphObject, data) 到
onmessage
事件中呢?
如果我只向客户端订阅一个主题,则收到的所有数据都将是主题图的一种,当然该图会正确更新。
解决方案是为要从 MessageHandler 相关类在 Mercure 中心发布的更新类构造函数设置 type 属性。因此,对于与温度通知相关的消息,我们将设置
type
属性 = '温度更新',对于与压力通知相关的消息,我们将设置 type
属性 = '压力更新'。
__invoke
上的TemperatureMessageHandler
功能:
public function __invoke(TemperatureMessage $message)
{
try {
$graphId = $message->getGraphId();
// Collect the data for the update
$data = [/* ... */];
$update = new Update(
sprintf('realtime-notif/temperature/%s', $graphId),
\json_encode($data),
true,
null,
'temperatureUpdate'
);
$this->mercureHub->publish($update);
} catch (\Exception $exc) {
}
}
__invoke
上的PressureMessageHandler
功能:
public function __invoke(PressureMessage $message)
{
try {
$graphId = $message->getGraphId();
// Collect the data for the update
$data = [/* ... */];
$update = new Update(
sprintf('realtime-notif/pressure/%s', $graphId),
\json_encode($data),
true,
null,
'pressureUpdate'
);
$this->mercureHub->publish($update);
} catch (\Exception $exc) {
}
}
在客户端,必须为
EventSource
对象创建两个新的EventListener,其名称等于创建的新类型。每一位新监听者都会处理 Mercure 中心发布的相关消息类型:
<script type="text/javascript">
$(document).ready(function () {
/**Create two graph on page ready **/
let temperatureGraphObject = createTemperatureGraph(canvasTemperaturaGraph);
let pressureGRaphObject = createPressureGraph(canvasPressureGraph);
/**
I have two function updateTemperatureGraph(temperatureGraphObject, newTemperaturaData) and updatePressureGraph(pressureGraphObject, newPresureData)
**/
/**Subscribe client to topics for data updates **/
{% set topics = ['realtime-notif/temperature/'~temperatureSensorId, 'realtime-notif/pressure/'~pressureSensorId] %}
const eventSource = new EventSource("{{ mercure(topics, { subscribe:topics})|escape('js')}}", {withCredentials: true});
eventSource.onopen = function () {
console.log('New socket connection!');
};
eventSource.addEventListener("temperaturaUpdate", function (e) {
let parsedData = null;
try {
parsedData = JSON.parse(e.data);
} catch (error) {
console.log(error);
}
if (parsedData) {
updateTemperatureGraph(temperatureGraphObject, parsedData);
}
}, false);
eventSource.addEventListener("pressureUpdate", function (e) {
let parsedData = null;
try {
parsedData = JSON.parse(e.data);
} catch (error) {
console.log(error);
}
if (parsedData) {
updatePressureGraph(pressureGraphObject, parsedData);
}
}, false);
eventSource.onerror = function () {
console.log('Socket connection lost!');
};
});
</script>
这样,Mercure hub 发布分类消息,每个 EventListener 都会按照消息到达订阅客户端的顺序负责处理相应的消息,而不管它订阅的主题是什么。