我正在为下面的课程编写单元测试用例。我正在尝试模拟管理客户端,以便我可以调用下面的方法创建主题。但出现空指针异常。
@Service
public class TopicService {
private static final Logger LOG = LoggerFactory.getLogger(TopicService.class);
@Autowired
private AdminClient adminClient;
public void createTopic(Topic topic) throws ExecutionException, InterruptedException {
adminClient
.createTopics(Collections.singletonList(ServiceHelper.fromTopic(topic)))
.values()
.get(topic.getName())
.get();
}
}
单元测试用例如下
package org.kafka.service;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.kafka.model.Topic;
import org.mockito.Mockito;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.*;
import java.util.concurrent.ExecutionException;
import static org.apache.kafka.clients.admin.AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.clients.admin.AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG;
import static org.apache.kafka.common.internals.Topic.GROUP_METADATA_TOPIC_NAME;
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {TopicService.class})
public class TopicServiceTest {
@Autowired
TopicService topicService;
@MockBean
AdminClient adminClient;
ListTopicsResult listTopicsResult;
KafkaFuture<Set<String>> future;
NewTopic newTopic;
Topic topic;
Collection<NewTopic> topicList;
CreateTopicsResult createTopicsResult;
Void t;
Map<String,KafkaFuture<Void>> futureMap;
private static final String TARGET_CONSUMER_GROUP_ID = "target-group-id";
private static final Map<String, Object> CONF = new HashMap<>();
@BeforeClass
public static void createAdminClient() {
try {
CONF.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
CONF.put(REQUEST_TIMEOUT_MS_CONFIG, 120000);
CONF.put("zookeeper.connect", "localhost:21891");
AdminClient adminClient = AdminClient.create(CONF);
} catch (Exception e) {
throw new RuntimeException("create kafka admin client error", e);
}
}
@Before
public void setUp(){
topicList = new ArrayList<>();
newTopic = new NewTopic("topic-7",1, (short) 1);
topicList.add(newTopic);
futureMap = new HashMap<>();
topic = new Topic();
topic.setName("topic-1");
}
@Test
public void createTopic() throws ExecutionException, InterruptedException {
Properties consumerProperties = new Properties();
Mockito.when(adminClient.createTopics(topicList))
.thenReturn(Mockito.mock(CreateTopicsResult.class));
Mockito.when(adminClient.createTopics(topicList).values())
.thenReturn(Mockito.mock(Map.class));
Mockito.when(adminClient.createTopics(topicList)
.values()
.get(GROUP_METADATA_TOPIC_NAME)).thenReturn(Mockito.mock(KafkaFutureImpl.class));
Mockito.when(adminClient.createTopics(topicList)
.values()
.get(GROUP_METADATA_TOPIC_NAME)
.get()).thenReturn(t);
topicService.createTopic(topic);
}
}
package org.kafka.config;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.kafka.reader.Kafka;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
@Component
public class AdminConfigurer {
@Autowired
private Kafka kafkaConfig;
@Bean
public Map<String, Object> kafkaAdminProperties() {
final Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.getBootstrapServers());
if(kafkaConfig.getProperties().getSasl().getEnabled() && kafkaConfig.getSsl().getEnabled()) {
configs.put("sasl.mechanism", kafkaConfig.getProperties().getSasl().getMechanism());
configs.put("security.protocol", kafkaConfig.getProperties().getSasl().getSecurity().getProtocol());
configs.put("ssl.keystore.location", kafkaConfig.getSsl().getKeystoreLocation());
configs.put("ssl.keystore.password", kafkaConfig.getSsl().getKeystorePassword());
configs.put("ssl.truststore.location", kafkaConfig.getSsl().getTruststoreLocation());
configs.put("ssl.truststore.password", kafkaConfig.getSsl().getTruststorePassword());
configs.put("sasl.jaas.config", String.format(kafkaConfig.getJaasTemplate(),
kafkaConfig.getProperties().getSasl().getJaas().getConfig().getUsername(),
kafkaConfig.getProperties().getSasl().getJaas().getConfig().getPassword()));
configs.put("ssl.endpoint.identification.algorithm", "");
}
return configs;
}
@Bean
public AdminClient getClient() {
return AdminClient.create(kafkaAdminProperties());
}
}
我预计下面的测试用例能够成功运行。但我遇到了以下错误。
java.lang.NullPointerException
at org.kafka.service.TopicService.createTopic(TopicService.java:57)
at org.kafka.service.TopicServiceTest.createTopic(TopicServiceTest.java:100)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
我正在使用 2.7.1 spring 版本,具有以下客户端依赖项。
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.1.1</version>
<scope>test</scope>
<classifier>test</classifier>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
应该是
@Configuration
而不是@Component
,这样Spring才能捡到豆子吗?