在vertx的多个模块中进行日志处理时,一个基本要求是我们应该能够关联单个请求的所有日志。
由于 vertx 是异步的,所以保存 logid、conversationid、eventid 的最佳位置是什么。
我们可以实施任何解决方案或模式吗?
在基于线程的系统中,当前上下文由当前线程保存,因此 MDC 或任何 ThreadLocal 都可以。
在 Vertx 等基于 Actor 的系统中,您的上下文就是消息,因此您必须为发送的每条消息添加相关 ID。
对于任何处理程序/回调,您必须将其作为方法参数传递或引用最终方法变量。
要通过事件总线发送消息,您可以将有效负载包装在 JsonObject 中,并将相关 ID 添加到包装器对象中
vertx.eventBus().send("someAddr",
new JsonObject().put("correlationId", "someId")
.put("payload", yourPayload));
或者您可以使用
DeliveryOption
将相关 ID 添加为标题
//send
vertx.eventBus().send("someAddr", "someMsg",
new DeliveryOptions().addHeader("correlationId", "someId"));
//receive
vertx.eventBus().consumer("someAddr", msg -> {
String correlationId = msg.headers().get("correlationId");
...
});
还有更复杂的选项,例如在 eventbus 上使用拦截器,Emanuel Idi 用它来实现对 Vert.x 的 Zipkin 支持,https://github.com/emmanuelidi/vertx-zipkin,但我'我不确定此集成的当前状态。
关于这个问题,令人惊讶的是缺乏好的答案,这很奇怪,因为它是如此简单。
假设您在收到请求或消息时在 MDC 上下文中设置correlationId,我发现传播它的最简单方法是使用拦截器在上下文之间传递值:
vertx.eventBus()
.addInboundInterceptor(deliveryContext -> {
MultiMap headers = deliveryContext.message().headers();
if (headers.contains("correlationId")) {
MDC.put("correlationId", headers.get("correlationId"));
deliveryContext.next();
}
})
.addOutboundInterceptor(deliveryContext -> {
deliveryContext.message().headers().add("correlationId", MDC.get("correlationId"));
deliveryContext.next();
});
如果多个模块是指在同一个 Vertx 实例上运行的多个 verticle,您应该能够使用普通的日志库,例如 SLF4J、Log4J、JUL 等。然后您可以将日志保存在您选择的目录中,例如
/var/logs/appName
。
但是,如果您的意思是如何关联 Vertx 的多个实例之间的日志,那么我建议您研究 GrayLog 或类似的分布式/集中式日志记录应用程序。如果您对每个请求使用唯一的 ID,则可以传递该 ID 并在日志中使用它。或者,根据您的授权系统,如果您每个请求使用唯一的令牌,您可以记录这些令牌。集中式日志系统可用于根据该信息聚合和过滤日志。
令人惊讶的是没有人提到这个 Reactiveverse 项目Eclipse Vert.x 的上下文日志记录
来自他们的页面:
在传统的 Java 开发模型(例如 Spring 或 Java EE)中, 服务器实现每个请求一个线程的设计。作为结果, 可以将上下文数据存储在 ThreadLocal 变量中 登录时使用它。 logback 和 log4j2 都将此命名为 Mapped 诊断上下文 (MDC)。
Vert.x 实现了反应器模式。在实践中,这意味着许多 并发请求可以由同一个线程处理,从而防止 使用 ThreadLocals 来存储上下文数据。
该项目使用另一种存储上下文数据的方法 并使得在 Vert.x 应用程序中进行 MDC 日志记录成为可能。
Clive Evans 提出的拦截器示例效果很好。我添加了一个更详细的示例,展示了它是如何工作的:
import io.vertx.core.AbstractVerticle;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.MultiMap;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import java.time.Duration;
import java.util.UUID;
public class PublisherSubscriberInterceptor {
private static final Logger LOG = LoggerFactory.getLogger(PublisherSubscriberInterceptor.class);
public static final String ADRESS = "sender.address";
public static void main(String[] args) {
Vertx vertx = Vertx.vertx();
createInterceptors(vertx);
vertx.deployVerticle(new Publisher());
vertx.deployVerticle(new Subscriber1());
//For our example lets deploy subscriber2 2 times.
vertx.deployVerticle(Subscriber2.class.getName(), new DeploymentOptions().setInstances(2));
}
private static void createInterceptors(Vertx vertx) {
vertx.eventBus()
.addInboundInterceptor(deliveryContext -> {
MultiMap headers = deliveryContext.message().headers();
if (headers.contains("myId")) {
MDC.put("myId", headers.get("myId"));
deliveryContext.next();
}
})
.addOutboundInterceptor(deliveryContext -> {
deliveryContext.message().headers().add("myId", MDC.get("myId"));
deliveryContext.next();
});
}
public static class Publisher extends AbstractVerticle {
@Override
public void start(Promise<Void> startPromise) throws Exception {
startPromise.complete();
vertx.setPeriodic(Duration.ofSeconds(5).toMillis(), id -> {
MDC.put("myId", UUID.randomUUID().toString());
vertx.eventBus().publish(Publish.class.getName(), "A message for all");
});
}
}
public static class Subscriber1 extends AbstractVerticle {
private static final Logger LOG = LoggerFactory.getLogger(Subscriber1.class);
@Override
public void start(Promise<Void> startPromise) throws Exception {
startPromise.complete();
vertx.eventBus().consumer(Publish.class.getName(), message-> {
LOG.debug("Subscriber1 Received: {}", message.body());
});
}
}
public static class Subscriber2 extends AbstractVerticle {
private static final Logger LOG = LoggerFactory.getLogger(Subscriber2.class);
@Override
public void start(Promise<Void> startPromise) throws Exception {
startPromise.complete();
vertx.eventBus().consumer(Publish.class.getName(), message-> {
LOG.debug("Subscriber2 Received: {}", message.body());
});
}
}
}
您可以看到发布2条消息的日志示例:
13:37:14.315 [vert.x-eventloop-thread-3][myId=a2f0584c-9d4e-48a8-a724-a24ea12f7d80] DEBUG o.s.v.l.PublishSubscribeInterceptor$Subscriber2 - Subscriber2 Received: A message for all
13:37:14.315 [vert.x-eventloop-thread-1][myId=a2f0584c-9d4e-48a8-a724-a24ea12f7d80] DEBUG o.s.v.l.PublishSubscribeInterceptor$Subscriber1 - Subscriber1 Received: A message for all
13:37:14.315 [vert.x-eventloop-thread-4][myId=a2f0584c-9d4e-48a8-a724-a24ea12f7d80] DEBUG o.s.v.l.PublishSubscribeInterceptor$Subscriber2 - Subscriber2 Received: A message for all
13:37:19.295 [vert.x-eventloop-thread-1][myId=63b5839e-3b0b-43a5-b379-92bd1466b870] DEBUG o.s.v.l.PublishSubscribeInterceptor$Subscriber1 - Subscriber1 Received: A message for all
13:37:19.295 [vert.x-eventloop-thread-3][myId=63b5839e-3b0b-43a5-b379-92bd1466b870] DEBUG o.s.v.l.PublishSubscribeInterceptor$Subscriber2 - Subscriber2 Received: A message for all
13:37:19.295 [vert.x-eventloop-thread-4][myId=63b5839e-3b0b-43a5-b379-92bd1466b870] DEBUG o.s.v.l.PublishSubscribeInterceptor$Subscriber2 - Subscriber2 Received: A message for all
kotlinx-coroutines
和 vertx,您可以将 MDC 上下文附加到 vertx:
MDC.put("key", value)
GlobalScope.launch(routingContext.vertx().orCreateContext.dispatcher().plus(MDCContext()) {...}
vertx-sync
和
ThreadLocal
作为相关 ID。 (即“FiberLocal”)。对我来说效果很好。