AdminClient 的 Kafka Spring 启动测试用例中的问题

问题描述 投票:0回答:1

我正在为下面的课程编写单元测试用例。我正在尝试模拟管理客户端,以便我可以调用下面的方法创建主题。但出现空指针异常。

@Service
public class TopicService {

  private static final Logger LOG = LoggerFactory.getLogger(TopicService.class);
@Autowired
  private AdminClient adminClient;

public void createTopic(Topic topic) throws ExecutionException, InterruptedException {
    adminClient
            .createTopics(Collections.singletonList(ServiceHelper.fromTopic(topic)))
            .values()
            .get(topic.getName())
            .get();
  }
}

单元测试用例如下

package org.kafka.service;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.kafka.model.Topic;
import org.mockito.Mockito;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.*;
import java.util.concurrent.ExecutionException;

import static org.apache.kafka.clients.admin.AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.clients.admin.AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG;
import static org.apache.kafka.common.internals.Topic.GROUP_METADATA_TOPIC_NAME;

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {TopicService.class})
public class TopicServiceTest {

    @Autowired
    TopicService topicService;

    @MockBean
    AdminClient adminClient;
    ListTopicsResult listTopicsResult;

    KafkaFuture<Set<String>> future;

    NewTopic newTopic;

    Topic topic;

    Collection<NewTopic> topicList;

    CreateTopicsResult createTopicsResult;
    Void t;

    Map<String,KafkaFuture<Void>> futureMap;

    private static final String TARGET_CONSUMER_GROUP_ID = "target-group-id";

    private static final Map<String, Object> CONF = new HashMap<>();

    @BeforeClass
    public static void createAdminClient() {
        try {
            CONF.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            CONF.put(REQUEST_TIMEOUT_MS_CONFIG, 120000);
            CONF.put("zookeeper.connect", "localhost:21891");

           AdminClient adminClient = AdminClient.create(CONF);
        } catch (Exception e) {
            throw new RuntimeException("create kafka admin client error", e);
        }
    }

    @Before
    public void setUp(){
        topicList = new ArrayList<>();
        newTopic = new NewTopic("topic-7",1, (short) 1);
        topicList.add(newTopic);
        futureMap = new HashMap<>();
        topic = new Topic();
        topic.setName("topic-1");
    }

    @Test
    public void createTopic() throws ExecutionException, InterruptedException {

        Properties consumerProperties = new Properties();

        Mockito.when(adminClient.createTopics(topicList))
                .thenReturn(Mockito.mock(CreateTopicsResult.class));

        Mockito.when(adminClient.createTopics(topicList).values())
                .thenReturn(Mockito.mock(Map.class));

        Mockito.when(adminClient.createTopics(topicList)
                .values()
                .get(GROUP_METADATA_TOPIC_NAME)).thenReturn(Mockito.mock(KafkaFutureImpl.class));

        Mockito.when(adminClient.createTopics(topicList)
                .values()
                .get(GROUP_METADATA_TOPIC_NAME)
                .get()).thenReturn(t);
         topicService.createTopic(topic);
    }
}

package org.kafka.config;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.kafka.reader.Kafka;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;

@Component
public class AdminConfigurer {


    @Autowired
    private Kafka kafkaConfig;



    @Bean
    public Map<String, Object> kafkaAdminProperties() {
        final Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.getBootstrapServers());
        if(kafkaConfig.getProperties().getSasl().getEnabled() && kafkaConfig.getSsl().getEnabled()) {
            configs.put("sasl.mechanism", kafkaConfig.getProperties().getSasl().getMechanism());
            configs.put("security.protocol", kafkaConfig.getProperties().getSasl().getSecurity().getProtocol());
            configs.put("ssl.keystore.location", kafkaConfig.getSsl().getKeystoreLocation());
            configs.put("ssl.keystore.password", kafkaConfig.getSsl().getKeystorePassword());
            configs.put("ssl.truststore.location", kafkaConfig.getSsl().getTruststoreLocation());
            configs.put("ssl.truststore.password", kafkaConfig.getSsl().getTruststorePassword());
            configs.put("sasl.jaas.config", String.format(kafkaConfig.getJaasTemplate(),
                    kafkaConfig.getProperties().getSasl().getJaas().getConfig().getUsername(),
                    kafkaConfig.getProperties().getSasl().getJaas().getConfig().getPassword()));
            configs.put("ssl.endpoint.identification.algorithm", "");
        }
        return configs;
    }


    @Bean
    public AdminClient getClient() {
        return AdminClient.create(kafkaAdminProperties());
    }

}

我预计下面的测试用例能够成功运行。但我遇到了以下错误。

java.lang.NullPointerException
    at org.kafka.service.TopicService.createTopic(TopicService.java:57)
    at org.kafka.service.TopicServiceTest.createTopic(TopicServiceTest.java:100)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)

我正在使用 2.7.1 spring 版本,具有以下客户端依赖项。

                <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.1.1</version>
            <scope>test</scope>
            <classifier>test</classifier>
        </dependency>

                 <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
spring spring-boot apache-kafka spring-kafka spring-test
1个回答
0
投票

应该是

@Configuration
而不是
@Component
,这样Spring才能捡到豆子吗?

© www.soinside.com 2019 - 2024. All rights reserved.