我正在构建一个微服务,它在消费者模式下使用 Netty4 组件 (http://camel.apache.org/netty4.html) 具有 Apache Camel 路由。因此,在我的微服务中,我正在构建的这条路由将通过 TCP 连接接收消息。为此,我这样做了:
@Override
public void configure() throws Exception {
this.from("netty4:tcp://localhost:7000?textline=true&encoding=utf8")
.process(new Processor() {
@Override
public void process(final Exchange exchange) throws Exception {
log.info("[Processor] - Incoming Message -> {}", exchange.getIn().getBody(String.class));
}
}).to("bean:messageService");
}
嗯,我正常收到消息。为了测试,我使用 telnet:
$ telnet localhost 7000
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
TheMessage
问题是当我想将消息发送回该路由中建立的同一 TCP 通道时。在同步模式下,我可以使用 Exchange 对象轻松地做到这一点。 但是,在异步模式下,我不知道如何向生产者发送回消息。
接收并发送消息的 Spring 服务是这样的:
@Service
public class MessageService {
private static final Logger log = LoggerFactory.getLogger(MessageService.class);
private List<String> messageStore = new LinkedList<>();
public void sendToTCP(final String message) {
log.info("[Service] - Sending Message over TCP Channel --> {}", message);
}
@Handler
public void receiveFromTCP(final Exchange exchange) {
final String messageFromTcp = exchange.getIn().getBody(String.class);
log.info("[Service] - Message Received from TCP Channel --> {}", messageFromTcp);
this.messageStore.add(messageFromTcp);
}
public List<String> getReceivedMessages() {
return messageStore;
}
}
在简历中,我需要在这个方法中添加一些代码,类似这样:
public void sendToTCP(final String message) {
log.info("[Service] - Sending Message over TCP Channel --> {}", message);
// Send message to producer here
camelContext.createProducerTemplate.send....
}
我无法创建另一条通往生产者的路由,因为我不知道生产者 IP。我确实需要使用生产者和我的应用程序之间已经建立的 TCP 通道。通信需要通过 TCP,其他工具(例如队列)不是一个选择。
我在 GitHub 上上传了一个示例项目:https://github.com/rgiaviti/so-camel-netty4-tcp
我正在使用:
根据@vikram-palakurthi 的评论找到了解决方案。
我使用了Netty4属性reuseChannel。解决方案代码与此接近:
private static final Logger log = LoggerFactory.getLogger(MessageService.class);
private List<String> messageStore = new LinkedList<>();
private Channel openedChannel;
public void sendToTCP(final String message) {
log.info("[Service] - Sending Message over TCP Channel --> {}", message);
log.info("[Service] - Channel is Active? {}", this.openedChannel.isActive());
log.info("[Service] - Channel is Open? {}", this.openedChannel.isOpen());
log.info("[Service] - Channel is Writeble? {}", this.openedChannel.isWritable());
this.openedChannel.write(message);
this.openedChannel.flush();
}
@Handler
public void receiveFromTCP(final Exchange exchange) {
this.openedChannel = exchange.getProperty(NettyConstants.NETTY_CHANNEL, Channel.class);
final String messageFromTcp = exchange.getIn().getBody(String.class);
log.info("[Service] - Message Received from TCP Channel --> {}", messageFromTcp);
this.messageStore.add(messageFromTcp);
}
Camel Routes、处理器...的完整解决方案可以在这里找到:https://github.com/rgiaviti/so-camel-netty4-tcp/tree/solution
但是,我们需要知道这是一个简单的解决方案。开发者仍然需要处理多个通道、生产者关闭通道、检查通道......
我也开发了 Vertx 解决方案。 https://github.com/rgiaviti/so-camel-netty4-tcp/tree/vertx-solution
我用骆驼方式实现得更近。 可以向就绪端口 7000 发送消息。
package com.github.so.services;
import java.util.LinkedList;
import java.util.List;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.Exchange;
import org.apache.camel.Handler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import org.springframework.beans.factory.annotation.Autowired;
@Service
public class MessageService {
private static final Logger log = LoggerFactory.getLogger(MessageService.class);
private List<String> messageStore = new LinkedList<>();
@Autowired
private ProducerTemplate producerTemplate;
public void sendToTCP(final String message) {
log.info("[Service] - Sending Message over TCP Channel --> {}", message);
producerTemplate.sendBody("netty4:tcp://localhost:7000?textline=true&encoding=utf8", message);
}
@Handler
public void receiveFromTCP(final Exchange exchange) {
final String messageFromTcp = exchange.getIn().getBody(String.class);
log.info("[Service] - Message Received from TCP Channel --> {}", messageFromTcp);
this.messageStore.add(messageFromTcp);
}
public List<String> getReceivedMessages() {
return messageStore;
}
}