有一个问题让我困惑了很长时间。那是当我使用netty循环writeAndFlush将DatagramPacket发送到我的udp服务器时,大多数消息都丢失了。但是,如果我暂停线程一段时间,所有消息都将被传递。像这样:
public static void main(String[] args) throws InterruptedException {
int count = 3000;
AtomicInteger integer = new AtomicInteger(count);
AtomicInteger countInteger = new AtomicInteger();
Bootstrap bootstrap = new Bootstrap();
EventLoopGroup group = new NioEventLoopGroup();
ChannelFuture channelFuture = bootstrap.group(group)
.channel(NioDatagramChannel.class)
.option(ChannelOption.SO_BROADCAST, true)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_RCVBUF, 1024 * 1024)
.option(ChannelOption.SO_SNDBUF, 1024 * 1024)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new WakeupMessageEncoder())
.addLast(new WakeupMessageReplyDecoder())
.addLast(new SimpleChannelInboundHandler<WakeupMessageReply>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, WakeupMessageReply reply) throws Exception {
countInteger.getAndIncrement();
System.out.println(reply);
}
})
;
}
}).bind(0).sync();
Channel channel = channelFuture.channel();
long s = System.currentTimeMillis();
for (int i = 0; i < count; i++) {
//TimeUnit.NANOSECONDS.sleep(1);
WakeupMessage message = new WakeupMessage(UUID.randomUUID().toString(), "192.168.0.3:12000", "89860918700328360182", "test message" + i);
channel.writeAndFlush(message).addListener(f -> integer.decrementAndGet());
}
while (true) {
if (integer.get() <= 0) {
break;
}
}
try {
channel.closeFuture();
System.out.println("done:" + (System.currentTimeMillis() - s) + "ms");
System.out.println(countInteger);
}finally {
group.shutdownGracefully();
}
}
我发送了3000条消息,但只收到了一点......像这样:1744 messages但是如果我像这样睡觉线程:
for (int i = 0; i < count; i++) {
TimeUnit.NANOSECONDS.sleep(1);
WakeupMessage message = new WakeupMessage(UUID.randomUUID().toString(), "192.168.0.3:12000", "89860918700328360182", "test message" + i);
channel.writeAndFlush(message).addListener(f -> integer.decrementAndGet());
}
我将从udp服务器收到所有重播。
那么为什么我必须睡觉线程???
数据报包(UDP)没有保证传送,如果发送速度太快,可能会丢弃。有很多可能的原因,在您的情况下,您最有可能填满发送缓冲区或接收缓冲区或两者。 This文章详细介绍。
另一个选择是当你写和冲洗时sync()
听众,即
channel.writeAndFlush(message).addListener(f -> integer.decrementAndGet()).sync();
在我的测试中,这得到了所有3000个响应,比睡眠时间更短。对于笔记本电脑上的本地UDP echo服务器:
但是,正如@ewramner所指出的那样,两种选择都不能保证你得到所有3000个回复。