当一起运行测试时,KafkaServer 测试会抛出地址已在使用中

问题描述 投票:0回答:1

我有一个 Spring 应用程序,它启动本地 kafka 服务器,我想测试它。一项测试检查服务器是否正在运行,另一项测试检查 kafka 主题中是否有消息。

但是,当我一起运行(使用 mvn 测试)时,它们失败并出现以下错误

java.net.BindException: Address already in use
java.net.BindException: Address already in use
    at java.base/sun.nio.ch.Net.bind(Net.java:565)
    at java.base/sun.nio.ch.ServerSocketChannelImpl.netBind(ServerSocketChannelImpl.java:344)
    at java.base/sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:301)
    at java.base/sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:89)
    at java.base/sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:81)
    at org.apache.zookeeper.server.NIOServerCnxnFactory.configure(NIOServerCnxnFactory.java:662)
    at org.apache.zookeeper.server.ZooKeeperServerMain.runFromConfig(ZooKeeperServerMain.java:160)
    at com.ericsson.gnp.sim.runners.ZooKeeperStarter.lambda$startZooKeeper$0(ZooKeeperStarter.java:24)

使用以下测试设置

@SpringBootTest
@EmbeddedKafka(partitions = 1, topics = {"crud-id.event"})
class KafkaInMemoryTest {

    @Autowired
    private EmbeddedKafkaBroker embeddedKafkaBroker;

    private KafkaConsumer<String, String> consumer;
    private KafkaMessageController controller;
    private String topic = "crud-id.event";

    @BeforeEach
    void init() {
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("my-group", "true", embeddedKafkaBroker);
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        DefaultKafkaConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProps);
        consumer = (KafkaConsumer<String, String>) consumerFactory.createConsumer();
        consumer.subscribe(Collections.singleton(topic));

        controller = new KafkaMessageController(consumer);
    }

    @Test
    public void shouldReturnReceivedKafkaMessageViaRestAPI() {
        //given
        Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafkaBroker);
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        org.springframework.kafka.core.DefaultKafkaProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(producerProps);
        var producer = producerFactory.createProducer();

        String publishedMessage = "";
        producer.send(new ProducerRecord<>(topic, publishedMessage));
        producer.flush();

        //when
        String receivedMessage = controller.getMessageFromQueue(topic);

        //then
        assertThat(receivedMessage).isEqualTo(publishedMessage);
    }
  ...
}
public class KafkaServerTest {

    private KafkaRunner kafkaRunner = new KafkaRunner();

    @Test
    void shouldStartKafkaServer() throws Exception {
        //given
        String host = "localhost";
        int kafkaPort = 19092;
        long timeoutMs = Duration.ofSeconds(2).toMillis();

        //when
        kafkaRunner.run();

        //then
        boolean isKafkaRunning = isKafkaReachable(host, kafkaPort, timeoutMs);
        Assertions.assertTrue(isKafkaRunning, "Kafka server should be running on " + host + ":" + kafkaPort);
    }
    ...

}

以及kafka服务器的配置

public void startKafka(String zookeeperConnect) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeperConnect);
        props.put("broker.id", "0");
        props.put("log.dirs", System.getProperty("java.io.tmpdir") + "/kafka-logs");
        props.put("replication.factor", "1");
        props.put("listeners", "PLAINTEXT://localhost:19092");
        props.put("offsets.topic.replication.factor", "1");

        KafkaConfig kafkaConfig = new KafkaConfig(props);
        kafkaServer = new KafkaServer(kafkaConfig, Time.SYSTEM, Option.empty(), false);
        kafkaServer.startup();
    }
public void run(String... args) throws Exception {
        ZooKeeperStarter zooKeeper = new ZooKeeperStarter();
        KafkaStarter kafka = new KafkaStarter();

        zooKeeper.startZooKeeper(12181);
        kafka.startKafka("localhost:12181");
    }
private ZooKeeperServerMain zooKeeperServer;

    public void startZooKeeper(int port) throws Exception {
        File dataDir = new File(System.getProperty("java.io.tmpdir"), "zookeeper");
        org.apache.zookeeper.server.ZooKeeperServer server = new org.apache.zookeeper.server.ZooKeeperServer();
        server.setTxnLogFactory(new FileTxnSnapLog(dataDir, dataDir));
        server.setTickTime(2000);

        zooKeeperServer = new ZooKeeperServerMain();
        ServerConfig config = new ServerConfig();
        config.parse(new String[] { String.valueOf(port), dataDir.getAbsolutePath() });

        new Thread(() -> {
            try {
                zooKeeperServer.runFromConfig(config);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();
    }

抛出异常的测试是KafkaServerTest。我尝试在运行时分配不同的端口,但没有成功。

java spring spring-kafka
1个回答
0
投票

@EmbeddedKafka
的巨大好处之一是它可以使用随机端口启动代理。

另一个优化工具是启动全局嵌入式Kafka。

请参阅文档中的更多信息。我们还讨论了如何从代码中仅启动一次 Kafka 代理:https://docs.spring.io/spring-kafka/reference/testing.html#same-broker-multiple-tests

© www.soinside.com 2019 - 2024. All rights reserved.