我在 Laravel 中一起配置 Kafka 和 Redis 时遇到问题。
我能够运行Redis来使用内存数据库。所以 Redis 工作正常。
$redis = app()->make('redis');
return $redis->get('name1'); // it runs fine returning value of "name1"
我能够在 Windows 系统中配置 Kafka,以便在终端中生成和消费消息。
成功将Rdkafka配置为php客户端库和扩展。
我在 Laravel for Kafka 中使用的包是
"superbalist/laravel-pubsub": "^3.0", "superbalist/php-pubsub-kafka": "^2.0"
LINK
下面提到的代码是订阅和消费消息
$pubsub = app('pubsub');
$pubsub->subscribe('test1', function ($message) {
var_dump($message); // the code just stuck here
});
浏览器只是继续加载并且不会停止。我试图查看供应商内部的代码,但响应是无法理解的。
我的ENV按照包裹的要求
REDIS_HOST=localhost
REDIS_PASSWORD=null
REDIS_PORT=6379
PUBSUB_CONNECTION=redis
KAFKA_BROKERS=localhost
GOOGLE_CLOUD_PROJECT_ID=your-project-id-here
GOOGLE_CLOUD_KEY_FILE=path/to/your/gcloud-key.json
HTTP_PUBSUB_URI=null
HTTP_PUBSUB_SUBSCRIBE_CONNECTION=redis
如果 Redis 本地服务器和客户端关闭,我会收到错误
从服务器读取行时出错 [tcp://localhost:9092]
如果有人能够在 laravel 中配置它们,请告诉我。
对 subscribe() 方法的调用是阻塞的,这意味着脚本永远不会完成,这就是为什么你的浏览器永远不会停止加载的原因。
调用 subscribe() 的 PHP 脚本需要从 CLI 而不是浏览器运行,因为该代码消耗 Kafka 消息并且需要始终处于活动状态。如果你想向 Kafka 发布消息,你需要使用publish()方法。
来自文档:
// consume messages
// note: this is a blocking call
$adapter->subscribe('my_channel', function ($message) {
var_dump($message);
});
// publish messages
$adapter->publish('my_channel', 'HELLO WORLD');
$adapter->publish('my_channel', ['hello' => 'world']);
$adapter->publish('my_channel', 1);
$adapter->publish('my_channel', false);
您从此 GitHub link 引用的脚本使用 while(true) 循环,该循环旨在无限期运行。对于旨在持续侦听传入消息的 Kafka 消费者来说,这种行为是预期的且正常的。
直接在浏览器中运行此脚本并不理想,因为它永远不会停止,从而导致潜在的超时或资源问题。更好的方法是创建自定义 Laravel 命令来处理 Kafka 消费流程。具体方法如下:
namespace App\Console\Commands;
use Illuminate\Console\Command;
class KafkaConsume extends Command
{
protected $signature = 'kafka:consume';
protected $description = 'Consume Kafka messages';
public function handle()
{
$pubsub = app('pubsub');
$pubsub->subscribe('test1', function ($message) {
var_dump($message); // The code processes each message here
});
}
}
php artisan kafka:consume
这将开始使用消息,您将直接在终端中看到输出。
通过使用命令,您可以更好地管理使用者,处理自定义逻辑,例如调度事件
(YourCustomEvent::dispatch($message))
、日志记录或收到消息时需要执行的任何其他任务。这种方法可以使您的 Kafka 消费流程井井有条,适合长时间运行的任务,而不会干扰您的 Web 应用程序的正常运行。