I'm trying to create a custom command that consumes messages from an ActiveMQ queue. I'm using php-amqplib and have created a custom connector and an ActiveMQServiceProvider. I get this error when I run:
sail artisan horizon:consume-activemq
PhpAmqpLib\Exception\AMQPInvalidFrameException
Invalid frame type 65
...
7 app/Queue/Connectors/ActiveMQConnector.php:15
PhpAmqpLib\Connection\AMQPStreamConnection::__construct()
8 app/Console/Commands/ConsumeActiveMQMessages.php:29
App\Queue\Connectors\ActiveMQConnector::connect()
I checked the Docker logs, and it looks like php-amqplib doesn't support AMQP v1.0:
Connection attempt from non AMQP v1.0 client. AMQP,0,0,9,1
2024-02-22 13:24:44 WARN | Transport Connection to: tcp://192.168.65.1:32999 failed: org.apache.activemq.transport.amqp.AmqpProtocolException: Connection from the client using unsupported AMQP attempted
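The AMQP,0,0,9,1 in the broker log appears to be the 8-byte protocol header the client sent: the literal AMQP followed by four bytes for protocol id, major, minor, and revision. The sketch below decodes such a header. describeAmqpHeader is a hypothetical helper name, not part of php-amqplib. It may also hint at where "Invalid frame type 65" comes from: 65 is the byte value of the letter A, so the client was probably reading a protocol header sent back by the broker as if it were a frame.

```php
<?php
// Hypothetical helper (not part of php-amqplib): decode an 8-byte AMQP protocol header.
function describeAmqpHeader(string $bytes): string
{
    if (strlen($bytes) !== 8 || substr($bytes, 0, 4) !== 'AMQP') {
        return 'not an AMQP protocol header';
    }
    // Bytes 5 to 8 are: protocol id, major, minor, revision.
    [$id, $major, $minor, $revision] = array_values(unpack('C4', substr($bytes, 4)));

    return sprintf('AMQP %d.%d.%d (protocol id %d)', $major, $minor, $revision, $id);
}

// Header announced by an AMQP 0-9-1 client such as php-amqplib.
echo describeAmqpHeader("AMQP\x00\x00\x09\x01"), PHP_EOL; // AMQP 0.9.1 (protocol id 0)

// Header defined by AMQP 1.0.
echo describeAmqpHeader("AMQP\x00\x01\x00\x00"), PHP_EOL; // AMQP 1.0.0 (protocol id 0)

// The first byte of any such header is the letter "A".
echo ord('A'), PHP_EOL; // 65
```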
Am I misunderstanding something, or is this a configuration error?
These are my configs:
queue.php
'connections' => [
    ...
    'activemq' => [
        'driver' => 'activemq',
        'host' => env('ACTIVEMQ_HOST', 'localhost'),
        'port' => env('ACTIVEMQ_PORT', 61613),
        'username' => env('ACTIVEMQ_USERNAME', 'guest'),
        'password' => env('ACTIVEMQ_PASSWORD', 'guest'),
        'queue' => env('ACTIVEMQ_QUEUE', ''),
        'exchange_name' => env('ACTIVEMQ_EXCHANGE_NAME', ''),
    ],
mylocal.env
ACTIVEMQ_HOST=host.docker.internal
ACTIVEMQ_PORT=5672
ACTIVEMQ_USER=admin
ACTIVEMQ_PASSWORD=admin
ACTIVEMQ_QUEUE=activemqTest
ActiveMQServiceProvider.php
<?php

namespace App\Providers;

use App\Queue\Connectors\ActiveMQConnector;
use Illuminate\Queue\QueueManager;
use Illuminate\Support\ServiceProvider;

class ActiveMQServiceProvider extends ServiceProvider
{
    /**
     * Register services.
     */
    public function register(): void
    {
    }

    /**
     * Bootstrap services.
     */
    public function boot(): void
    {
        $this->app->make(QueueManager::class)->addConnector('activemq', function () {
            return new ActiveMQConnector();
        });
    }
}
ActiveMQConnector.php
<?php

namespace App\Queue\Connectors;

use Illuminate\Queue\Connectors\ConnectorInterface;
use PhpAmqpLib\Connection\AMQPStreamConnection;

class ActiveMQConnector implements ConnectorInterface
{
    /**
     * @throws \Exception
     */
    public function connect(array $config)
    {
        return new AMQPStreamConnection(
            $config['host'],
            $config['port'],
            $config['username'],
            $config['password'],
            $config['vhost']
        );
    }
}
ConsumeActiveMQMessages.php
<?php

namespace App\Console\Commands;

use App\Queue\Connectors\ActiveMQConnector;
use Illuminate\Console\Command;
use PhpAmqpLib\Message\AMQPMessage;

class ConsumeActiveMQMessages extends Command
{
    protected $signature = 'horizon:consume-activemq';

    protected $description = 'Consume messages from ActiveMQ and process them within Horizon';

    /**
     * @throws \Exception
     */
    public function handle()
    {
        $connector = new ActiveMQConnector();
        $config = [
            'host' => config('queue.connections.activemq.host'),
            'port' => config('queue.connections.activemq.port'),
            'username' => config('queue.connections.activemq.username'),
            'password' => config('queue.connections.activemq.password'),
            'vhost' => config('queue.connections.activemq.vhost') ?? '/',
        ];
        $connection = $connector->connect($config);
        $channel = $connection->channel();
        $callback = function (AMQPMessage $message) {
            $this->processMessage($message);
        };
        $channel->basic_consume(config('queue.connections.activemq.queue'), '', false, true, false, false, $callback);
        while ($channel->is_consuming()) {
            $channel->wait();
        }
        $channel->close();
        $connection->close();
    }

    protected function processMessage(AMQPMessage $message)
    {
        $this->info('Received message: ' . $message->getBody());
    }
}
Laravel v10.10, PHP v8.1
I tried changing the host and port, but nothing changed.
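ActiveMQ (both Classic and Artemis) implements AMQP 1.0, the ISO-standard version of the AMQP specification, whereas php-amqplib appears to support only the 0.9.1 draft specification. The two versions are different wire protocols, so you cannot use that client to connect to ActiveMQ, and changing the host or port will not help. If you need to stay on PHP, you will need to find a PHP client that supports AMQP 1.0.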
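One way to check which AMQP version a broker port speaks, without involving any client library, is to send a protocol header over a raw TCP socket and look at the first bytes that come back. The following is only a sketch under assumptions: buildAmqpHeader and probeAmqpVersion are hypothetical helper names, the host and port are taken from the question's .env, and what the broker replies (or whether it just closes the connection) depends on the broker and its security settings.

```php
<?php
// Hypothetical helpers for illustration; they are not part of any library.

/** Build an 8-byte AMQP protocol header: "AMQP", protocol id, major, minor, revision. */
function buildAmqpHeader(int $protocolId, int $major, int $minor, int $revision): string
{
    return 'AMQP' . pack('C4', $protocolId, $major, $minor, $revision);
}

/**
 * Send a protocol header and return up to the first 8 bytes of the broker's reply,
 * or null if the connection fails or nothing is received.
 */
function probeAmqpVersion(string $host, int $port, string $header, float $timeout = 3.0): ?string
{
    $socket = @stream_socket_client("tcp://{$host}:{$port}", $errno, $errstr, $timeout);
    if ($socket === false) {
        return null;
    }
    stream_set_timeout($socket, (int) ceil($timeout));
    fwrite($socket, $header);
    $reply = fread($socket, 8);
    fclose($socket);

    return ($reply === false || $reply === '') ? null : $reply;
}

// Example usage (assumes the broker from the question is reachable):
// $reply = probeAmqpVersion('host.docker.internal', 5672, buildAmqpHeader(0, 0, 9, 1));
// echo $reply === null ? 'no reply' : bin2hex($reply), PHP_EOL;
```

If the reply starts with the bytes for AMQP but carries a different version than the one sent, the broker is signalling which version it expects, which would be consistent with the "unsupported AMQP" warning in the broker log.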