如何在 Karafka 服务器中实现 Rails 日志的跟踪 ID?

问题描述 投票:0回答:1

我正在开发一个 Rails 应用程序,其中我将 Karafka gem 与 Rails 服务器一起使用。为了增强可跟踪性,我通过将以下代码片段添加到每个 config/environment/{env}.rb 文件中,成功将 Rails 配置为包含传入 HTTP 请求的跟踪 ID:

config.log_tags = [ :request_id ]

但是,我在为 Karafka 服务器内的日志实现类似的跟踪 ID 机制时遇到了挑战。是否有中间件解决方案或其他方法允许我生成 UUID 并将其用作 Karafka 服务器中所有日志的跟踪 ID?我在 Kafka 消费者中以及处理传入的 HTTP 请求时重用了相同的代码和日志。任何见解或代码示例将不胜感激。谢谢!

以下是我的

karafka.rb
文件:

class KarafkaApp < Karafka::App
  kafka_config = Settings.kafka
  max_payload_size = 7_000_000 # Setting max payload size as 7MB

  setup do |config|
    config.kafka = {
      'bootstrap.servers': ENV['KAFKA_BROKERS_SCRAM'],
      'max.poll.interval.ms': 1200000
    }
    config.client_id = kafka_config['client_id']
    config.concurrency = 4
    # Recreate consumers with each batch. This will allow Rails code reload to work in the
    # development mode. Otherwise Karafka process would not be aware of code changes
    config.consumer_persistence = !Rails.env.development?

    config.producer = ::WaterDrop::Producer.new do |producer_config|
      # Use all the settings already defined for consumer by default
      producer_config.kafka = ::Karafka::Setup::AttributesMap.producer(config.kafka.dup)

      # Alter things you want to alter
      producer_config.max_payload_size = max_payload_size
      producer_config.kafka[:'message.max.bytes'] = max_payload_size
    end
  end

  routes.draw do
    topic some_topic_1.to_sym do
      consumer SomeTopic1Consumer
    end

    ...
  end
end

我正在使用 Rails v7.0.5 和 Karafka v2.1.11

ruby-on-rails rubygems ruby-on-rails-5 karafka
1个回答
0
投票

这里是 Karafka 作者。感谢您使用我的 OSS。 Karafka 和 Puma/Rails 本质上是不同的。使用 Rails,您可以轻松地向原子请求分配单个跟踪 ID 并在以后使用它。

这与卡拉夫卡不一样。 Karafka 是一个事件流/批处理框架,在不同的假设下运行。每条消息没有单一的“入口点”。您可以实现类似于社区支持的 DataDog 集成中已有的跟踪:

https://karafka.io/docs/Monitoring-and-Logging/#tracing-consumers-using-datadog-logger-listener

它从“工作”层面的角度运作,向 DD 报告大量信息。

是否有中间件解决方案或其他方法可以让我生成 UUID 并将其用作 Karafka 服务器中所有日志的跟踪 ID?

准确地说,答案是“永远不会有”,因为 Karafka 日志的一部分与多个后台线程相关,并且它们以任何方式都与传入数据无关或(直接)影响。还有一些用于调度、内务管理等的线程

说到追踪,我想你的目标也是追踪调度,对吧?如果是这样,WaterDrop 有一个用于跟踪调度的标签 API:https://karafka.io/docs/WaterDrop-Usage/#labeling

Kaafka 拥有为消息的消费和生产提供数据源跟踪所需的所有 API,但操作范围必须精确且缩小。

最好的开始是研究 Karafka 监控 API https://karafka.io/docs/Monitoring-and-Logging/ 并编写自己的侦听器,该侦听器可以在处理之前注入所需的上下文,以便您的记录器可以重新 -使用此数据。

附注我记得,有一些用户正在 Karafka Slack 上进行此类工作:https://slack.karafka.io/。那里可能值得检查。

© www.soinside.com 2019 - 2024. All rights reserved.