如何关联分布式Vertx系统中的日志事件

问题描述 投票:0回答:7

在vertx的多个模块中进行日志处理时,一个基本要求是我们应该能够关联单个请求的所有日志。

由于 vertx 是异步的,所以保存 logid、conversationid、eventid 的最佳位置是什么。

我们可以实施任何解决方案或模式吗?

logging distributed vert.x
7个回答
7
投票

在基于线程的系统中,当前上下文由当前线程保存,因此 MDC 或任何 ThreadLocal 都可以。

在 Vertx 等基于 Actor 的系统中,您的上下文就是消息,因此您必须为发送的每条消息添加相关 ID。

对于任何处理程序/回调,您必须将其作为方法参数传递或引用最终方法变量。

要通过事件总线发送消息,您可以将有效负载包装在 JsonObject 中,并将相关 ID 添加到包装器对象中

vertx.eventBus().send("someAddr", 
  new JsonObject().put("correlationId", "someId")
                  .put("payload", yourPayload));

或者您可以使用

DeliveryOption

将相关 ID 添加为标题
//send
vertx.eventBus().send("someAddr", "someMsg", 
            new DeliveryOptions().addHeader("correlationId", "someId"));

//receive    
vertx.eventBus().consumer("someAddr", msg -> {
        String correlationId = msg.headers().get("correlationId");
        ...
    });

还有更复杂的选项,例如在 eventbus 上使用拦截器,Emanuel Idi 用它来实现对 Vert.x 的 Zipkin 支持,https://github.com/emmanuelidi/vertx-zipkin,但我'我不确定此集成的当前状态。


6
投票

关于这个问题,令人惊讶的是缺乏好的答案,这很奇怪,因为它是如此简单。

假设您在收到请求或消息时在 MDC 上下文中设置correlationId,我发现传播它的最简单方法是使用拦截器在上下文之间传递值:

vertx.eventBus()
        .addInboundInterceptor(deliveryContext -> {
            MultiMap headers = deliveryContext.message().headers();
            if (headers.contains("correlationId")) {
                MDC.put("correlationId", headers.get("correlationId"));
                deliveryContext.next();
            }
        })
        .addOutboundInterceptor(deliveryContext -> {
            deliveryContext.message().headers().add("correlationId", MDC.get("correlationId"));
            deliveryContext.next();
        });

1
投票

如果多个模块是指在同一个 Vertx 实例上运行的多个 verticle,您应该能够使用普通的日志库,例如 SLF4J、Log4J、JUL 等。然后您可以将日志保存在您选择的目录中,例如

/var/logs/appName

但是,如果您的意思是如何关联 Vertx 的多个实例之间的日志,那么我建议您研究 GrayLog 或类似的分布式/集中式日志记录应用程序。如果您对每个请求使用唯一的 ID,则可以传递该 ID 并在日志中使用它。或者,根据您的授权系统,如果您每个请求使用唯一的令牌,您可以记录这些令牌。集中式日志系统可用于根据该信息聚合和过滤日志。


1
投票

令人惊讶的是没有人提到这个 Reactiveverse 项目Eclipse Vert.x 的上下文日志记录

来自他们的页面:

在传统的 Java 开发模型(例如 Spring 或 Java EE)中, 服务器实现每个请求一个线程的设计。作为结果, 可以将上下文数据存储在 ThreadLocal 变量中 登录时使用它。 logback 和 log4j2 都将此命名为 Mapped 诊断上下文 (MDC)。

Vert.x 实现了反应器模式。在实践中,这意味着许多 并发请求可以由同一个线程处理,从而防止 使用 ThreadLocals 来存储上下文数据。

该项目使用另一种存储上下文数据的方法 并使得在 Vert.x 应用程序中进行 MDC 日志记录成为可能。


0
投票

Clive Evans 提出的拦截器示例效果很好。我添加了一个更详细的示例,展示了它是如何工作的:

import io.vertx.core.AbstractVerticle; import io.vertx.core.DeploymentOptions; import io.vertx.core.MultiMap; import io.vertx.core.Promise; import io.vertx.core.Vertx; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; import java.time.Duration; import java.util.UUID; public class PublisherSubscriberInterceptor { private static final Logger LOG = LoggerFactory.getLogger(PublisherSubscriberInterceptor.class); public static final String ADRESS = "sender.address"; public static void main(String[] args) { Vertx vertx = Vertx.vertx(); createInterceptors(vertx); vertx.deployVerticle(new Publisher()); vertx.deployVerticle(new Subscriber1()); //For our example lets deploy subscriber2 2 times. vertx.deployVerticle(Subscriber2.class.getName(), new DeploymentOptions().setInstances(2)); } private static void createInterceptors(Vertx vertx) { vertx.eventBus() .addInboundInterceptor(deliveryContext -> { MultiMap headers = deliveryContext.message().headers(); if (headers.contains("myId")) { MDC.put("myId", headers.get("myId")); deliveryContext.next(); } }) .addOutboundInterceptor(deliveryContext -> { deliveryContext.message().headers().add("myId", MDC.get("myId")); deliveryContext.next(); }); } public static class Publisher extends AbstractVerticle { @Override public void start(Promise<Void> startPromise) throws Exception { startPromise.complete(); vertx.setPeriodic(Duration.ofSeconds(5).toMillis(), id -> { MDC.put("myId", UUID.randomUUID().toString()); vertx.eventBus().publish(Publish.class.getName(), "A message for all"); }); } } public static class Subscriber1 extends AbstractVerticle { private static final Logger LOG = LoggerFactory.getLogger(Subscriber1.class); @Override public void start(Promise<Void> startPromise) throws Exception { startPromise.complete(); vertx.eventBus().consumer(Publish.class.getName(), message-> { LOG.debug("Subscriber1 Received: {}", message.body()); }); } } public static class Subscriber2 extends AbstractVerticle { private static final Logger LOG = LoggerFactory.getLogger(Subscriber2.class); @Override public void start(Promise<Void> startPromise) throws Exception { startPromise.complete(); vertx.eventBus().consumer(Publish.class.getName(), message-> { LOG.debug("Subscriber2 Received: {}", message.body()); }); } } }
您可以看到发布2条消息的日志示例:

13:37:14.315 [vert.x-eventloop-thread-3][myId=a2f0584c-9d4e-48a8-a724-a24ea12f7d80] DEBUG o.s.v.l.PublishSubscribeInterceptor$Subscriber2 - Subscriber2 Received: A message for all 13:37:14.315 [vert.x-eventloop-thread-1][myId=a2f0584c-9d4e-48a8-a724-a24ea12f7d80] DEBUG o.s.v.l.PublishSubscribeInterceptor$Subscriber1 - Subscriber1 Received: A message for all 13:37:14.315 [vert.x-eventloop-thread-4][myId=a2f0584c-9d4e-48a8-a724-a24ea12f7d80] DEBUG o.s.v.l.PublishSubscribeInterceptor$Subscriber2 - Subscriber2 Received: A message for all 13:37:19.295 [vert.x-eventloop-thread-1][myId=63b5839e-3b0b-43a5-b379-92bd1466b870] DEBUG o.s.v.l.PublishSubscribeInterceptor$Subscriber1 - Subscriber1 Received: A message for all 13:37:19.295 [vert.x-eventloop-thread-3][myId=63b5839e-3b0b-43a5-b379-92bd1466b870] DEBUG o.s.v.l.PublishSubscribeInterceptor$Subscriber2 - Subscriber2 Received: A message for all 13:37:19.295 [vert.x-eventloop-thread-4][myId=63b5839e-3b0b-43a5-b379-92bd1466b870] DEBUG o.s.v.l.PublishSubscribeInterceptor$Subscriber2 - Subscriber2 Received: A message for all
    

0
投票
如果您使用 Kotlin 和 Corountines 以及

kotlinx-coroutines

 和 vertx,您可以将 MDC 上下文附加到 vertx:

MDC.put("key", value) GlobalScope.launch(routingContext.vertx().orCreateContext.dispatcher().plus(MDCContext()) {...}
    

-1
投票
使用

vertx-sync

ThreadLocal
 作为相关 ID。 (即“FiberLocal”)。对我来说效果很好。

最新问题
© www.soinside.com 2019 - 2025. All rights reserved.