我需要在 PHP 中与托管在节点上的 websocket 服务器建立通信。第一部分是建立连接以侦听 ws 服务器的响应。第二个是能够发送我自己的命令并跟踪第一部分中侦听器中的响应。我正在使用 symfony 项目和ratchet/pawl 包。
我创建了这样一个客户端来建立连接并发送消息:
<?php
namespace App\WebSocket;
use Ratchet\Client\WebSocket;
use Ratchet\Client\Connector;
use React\EventLoop\Loop;
use Psr\Log\LoggerInterface;
class WebsocketClient
{
private $connection;
private $loop;
private $logger;
public function __construct(LoggerInterface $logger)
{
$this->loop = Loop::get();
$this->logger = $logger;
}
public function connect(string $url)
{
$connector = new Connector($this->loop);
$connector($url)->then(function (WebSocket $conn) {
$this->connection = $conn;
dump('Connected to WebSocket');
$conn->on('message', function ($msg) {
dump("Received message: {$msg}");
$this->onMessage($msg);
});
$conn->on('close', function ($code = null, $reason = null) {
dump('Connection closed ({$code} - {$reason})');
});
}, function (\Exception $e) {
dump("Could not connect: {$e->getMessage()}");
});
}
public function send(string $message)
{
if ($this->connection) {
$this->connection->send($message);
} else {
echo "No connection established.\n";
}
}
private function onMessage($message)
{
$this->logger->info("Received message: {$message}");
}
public function run()
{
$this->loop->run();
}
public function close()
{
if ($this->connection) {
$this->connection->close();
}
}
}
我还创建了一个 symfony 命令来设置 ws 服务器监听:
<?php
namespace App\Command;
use App\WebSocket\WebsocketClient;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Style\SymfonyStyle;
class WebsocketListenCommand extends Command
{
protected static $defaultName = 'app:listen-websocket';
private $webSocketClient;
public function __construct(WebsocketClient $webSocketClient)
{
parent::__construct();
$this->webSocketClient = $webSocketClient;
}
protected function configure(): void
{
$this
->setDescription('Listens for and processes messages from WebSocket');
}
protected function execute(InputInterface $input, OutputInterface $output): int
{
$io = new SymfonyStyle($input, $output);
$io->success('Listening for WebSocket messages...');
$this->webSocketClient->connect('ws://x.x.x.x:y');
$this->webSocketClient->run();
return Command::SUCCESS;
}
}
运行上述命令后,一切正常。它实时接收回复。
然后我想向 websocket 服务器发送消息并跟踪响应(同时保持我之前设置的监听)。它在代码中的任何位置注入服务并尝试发送消息:
$this->webSocketClient->send('{"some json, doesnt matter"}');
尝试使用
send
方法时,它看不到任何连接,无法发送消息。我尝试在发送过程中自己设置连接,但随后我的侦听器停止响应。问题出在哪里?也许这里需要使用 ZeroMQ 或其他一些解决方案?我将感谢您的帮助。
它将服务注入代码中的任何位置
您将为每个实例化获得一个不同的 PHP 对象。因此,除非您使用新对象显式调用
connect()
,否则根本无法建立连接,并且 send()
不可能工作。
如果您在新对象中显式调用
connect()
,那么您将获得不同的连接。这可能适合也可能不适合您的要求,这取决于您想要做什么。
因此,如果您需要接收和发送,最好的选择可能是让一个 PHP 客户端脚本充当长期监听器。它只会坐下来等待接收消息。然后,当您需要发送消息时,您将创建一个新的客户端对象,建立一个新连接,发送消息,然后立即断开连接。
或者,您也可以简单地使用 HTTP POST 来发送消息。
此外,您也可以通过 Mercure 使用 SSE。
如果这些都不可能实现,那么您就遇到了一个不小的问题需要解决。构建要发送的消息所需的任何数据都需要以某种方式传递到长期监听器脚本中,以便它可以使用已打开的同一连接执行发送。这可以通过轻量级消息队列来完成,因此您的无限侦听器循环将侦听 websocket 消息,然后检查队列中是否有新的挂起发送,并发送在那里找到的任何内容。但这是相当迂回的,您应该能够使用其中一种更简单的方法。