我有一个 Spring 应用程序,它启动本地 kafka 服务器,我想测试它。一项测试检查服务器是否正在运行,另一项测试检查 kafka 主题中是否有消息。
但是,当我一起运行(使用 mvn 测试)时,它们失败并出现以下错误
java.net.BindException: Address already in use
java.net.BindException: Address already in use
at java.base/sun.nio.ch.Net.bind(Net.java:565)
at java.base/sun.nio.ch.ServerSocketChannelImpl.netBind(ServerSocketChannelImpl.java:344)
at java.base/sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:301)
at java.base/sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:89)
at java.base/sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:81)
at org.apache.zookeeper.server.NIOServerCnxnFactory.configure(NIOServerCnxnFactory.java:662)
at org.apache.zookeeper.server.ZooKeeperServerMain.runFromConfig(ZooKeeperServerMain.java:160)
at com.ericsson.gnp.sim.runners.ZooKeeperStarter.lambda$startZooKeeper$0(ZooKeeperStarter.java:24)
使用以下测试设置
@SpringBootTest
@EmbeddedKafka(partitions = 1, topics = {"crud-id.event"})
class KafkaInMemoryTest {
@Autowired
private EmbeddedKafkaBroker embeddedKafkaBroker;
private KafkaConsumer<String, String> consumer;
private KafkaMessageController controller;
private String topic = "crud-id.event";
@BeforeEach
void init() {
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("my-group", "true", embeddedKafkaBroker);
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
DefaultKafkaConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProps);
consumer = (KafkaConsumer<String, String>) consumerFactory.createConsumer();
consumer.subscribe(Collections.singleton(topic));
controller = new KafkaMessageController(consumer);
}
@Test
public void shouldReturnReceivedKafkaMessageViaRestAPI() {
//given
Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafkaBroker);
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
org.springframework.kafka.core.DefaultKafkaProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(producerProps);
var producer = producerFactory.createProducer();
String publishedMessage = "";
producer.send(new ProducerRecord<>(topic, publishedMessage));
producer.flush();
//when
String receivedMessage = controller.getMessageFromQueue(topic);
//then
assertThat(receivedMessage).isEqualTo(publishedMessage);
}
...
}
public class KafkaServerTest {
private KafkaRunner kafkaRunner = new KafkaRunner();
@Test
void shouldStartKafkaServer() throws Exception {
//given
String host = "localhost";
int kafkaPort = 19092;
long timeoutMs = Duration.ofSeconds(2).toMillis();
//when
kafkaRunner.run();
//then
boolean isKafkaRunning = isKafkaReachable(host, kafkaPort, timeoutMs);
Assertions.assertTrue(isKafkaRunning, "Kafka server should be running on " + host + ":" + kafkaPort);
}
...
}
以及kafka服务器的配置
public void startKafka(String zookeeperConnect) {
Properties props = new Properties();
props.put("zookeeper.connect", zookeeperConnect);
props.put("broker.id", "0");
props.put("log.dirs", System.getProperty("java.io.tmpdir") + "/kafka-logs");
props.put("replication.factor", "1");
props.put("listeners", "PLAINTEXT://localhost:19092");
props.put("offsets.topic.replication.factor", "1");
KafkaConfig kafkaConfig = new KafkaConfig(props);
kafkaServer = new KafkaServer(kafkaConfig, Time.SYSTEM, Option.empty(), false);
kafkaServer.startup();
}
public void run(String... args) throws Exception {
ZooKeeperStarter zooKeeper = new ZooKeeperStarter();
KafkaStarter kafka = new KafkaStarter();
zooKeeper.startZooKeeper(12181);
kafka.startKafka("localhost:12181");
}
private ZooKeeperServerMain zooKeeperServer;
public void startZooKeeper(int port) throws Exception {
File dataDir = new File(System.getProperty("java.io.tmpdir"), "zookeeper");
org.apache.zookeeper.server.ZooKeeperServer server = new org.apache.zookeeper.server.ZooKeeperServer();
server.setTxnLogFactory(new FileTxnSnapLog(dataDir, dataDir));
server.setTickTime(2000);
zooKeeperServer = new ZooKeeperServerMain();
ServerConfig config = new ServerConfig();
config.parse(new String[] { String.valueOf(port), dataDir.getAbsolutePath() });
new Thread(() -> {
try {
zooKeeperServer.runFromConfig(config);
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
抛出异常的测试是KafkaServerTest。我尝试在运行时分配不同的端口,但没有成功。
@EmbeddedKafka
的巨大好处之一是它可以使用随机端口启动代理。
另一个优化工具是启动全局嵌入式Kafka。
请参阅文档中的更多信息。我们还讨论了如何从代码中仅启动一次 Kafka 代理:https://docs.spring.io/spring-kafka/reference/testing.html#same-broker-multiple-tests