使用 Testcontainers 的 Pub/Sub Emulator 测试 GCP Pub/Sub

问题描述 投票:0回答:1

/**
 * Integration tests that publish to and pull from Pub/Sub using a
 * {@link PubSubEmulatorContainer} instead of a real GCP project.
 */
@SpringBootTest
@Testcontainers
@ActiveProfiles("test")
public class PubSubIntegrationTests {

  private static final String PROJECT_ID = "test-project";

  @Container
  private static final PubSubEmulatorContainer pubsubEmulator =
      new PubSubEmulatorContainer(
          DockerImageName.parse("gcr.io/google.com/cloudsdktool/cloud-sdk:367.0.0-emulators"));

  /** Registers the emulator endpoint as the {@code spring.cloud.gcp.pubsub.emulator-host} property. */
  @DynamicPropertySource
  static void emulatorProperties(DynamicPropertyRegistry registry) {
    registry.add("spring.cloud.gcp.pubsub.emulator-host", pubsubEmulator::getEmulatorEndpoint);
  }

  /**
   * Creates {@code test-topic} and {@code test-subscription} under {@code PROJECT_ID} in the
   * emulator, using a plaintext channel and no credentials, then releases the admin resources.
   */
  @BeforeAll
  static void setup() throws Exception {
    String emulatorTarget = "dns:///" + pubsubEmulator.getEmulatorEndpoint();
    ManagedChannel emulatorChannel =
        ManagedChannelBuilder.forTarget(emulatorTarget).usePlaintext().build();
    TransportChannelProvider transport =
        FixedTransportChannelProvider.create(GrpcTransportChannel.create(emulatorChannel));

    TopicAdminSettings topicSettings =
        TopicAdminSettings.newBuilder()
            .setCredentialsProvider(NoCredentialsProvider.create())
            .setTransportChannelProvider(transport)
            .build();
    TopicAdminClient topicAdminClient = TopicAdminClient.create(topicSettings);

    SubscriptionAdminSettings subscriptionSettings =
        SubscriptionAdminSettings.newBuilder()
            .setTransportChannelProvider(transport)
            .setCredentialsProvider(NoCredentialsProvider.create())
            .build();
    SubscriptionAdminClient subscriptionAdminClient =
        SubscriptionAdminClient.create(subscriptionSettings);

    PubSubAdmin pubSubAdmin =
        new PubSubAdmin(() -> PROJECT_ID, topicAdminClient, subscriptionAdminClient);
    pubSubAdmin.createTopic("test-topic");
    pubSubAdmin.createSubscription("test-subscription", "test-topic");

    pubSubAdmin.close();
    emulatorChannel.shutdown();
  }

  // Autoconfiguration would otherwise initialize application default credentials.
  // The tests run without credentials, so a NoCredentialsProvider bean is supplied instead.
  @TestConfiguration
  static class PubSubEmulatorConfiguration {
    @Bean
    CredentialsProvider googleCredentials() {
      return NoCredentialsProvider.create();
    }
  }

  @Autowired PubSubSender sender;

  @Autowired PubSubSubscriberTemplate subscriberTemplate;

  @Autowired PubSubPublisherTemplate publisherTemplate;

  /** Sends "hello!" through {@code sender} and expects exactly that message on the subscription. */
  @Test
  void testSend() throws ExecutionException, InterruptedException {
    ListenableFuture<String> sendResult = sender.send("hello!");

    List<AcknowledgeablePubsubMessage> pulled =
        await().until(() -> subscriberTemplate.pull("test-subscription", 10, true), not(empty()));

    assertEquals(1, pulled.size());
    assertEquals(sendResult.get(), pulled.get(0).getPubsubMessage().getMessageId());
    assertEquals("hello!", pulled.get(0).getPubsubMessage().getData().toStringUtf8());

    pulled.forEach(AcknowledgeablePubsubMessage::ack);
  }

  /** Publishes "hi!" to the topic and expects a started {@code PubSubWorker} to receive it. */
  @Test
  void testWorker() throws ExecutionException, InterruptedException {
    ListenableFuture<String> publishResult = publisherTemplate.publish("test-topic", "hi!");

    // The list is written by the worker callback and read by the test, hence the synchronized wrapper.
    List<PubsubMessage> received = Collections.synchronizedList(new LinkedList<>());
    PubSubWorker worker =
        new PubSubWorker(
            "test-subscription",
            subscriberTemplate,
            (incoming) -> {
              received.add(incoming);
            });
    worker.start();

    await().until(() -> received, not(empty()));

    assertEquals(1, received.size());
    assertEquals(publishResult.get(), received.get(0).getMessageId());
    assertEquals("hi!", received.get(0).getData().toStringUtf8());
    worker.stop();
  }

  @AfterEach
  void teardown() {
    // Pull and ack until the subscription is empty so leftover messages cannot leak into
    // the next test.
    await().until(() -> subscriberTemplate.pullAndAck("test-subscription", 1000, true), hasSize(0));
  }
}

所有对于上述示例都可以正常工作,但是当我想测试我的实现时,如下

@Autowired
private FunctionCatalog catalog;

/**
 * Looks up the function under test from the catalog, feeds it a Base64-encoded message and then
 * pulls from {@code test-subscription} to check what was published.
 */
@Test
void testSendB() throws ExecutionException, InterruptedException {
  Consumer<PubSubMessage> function = catalog.lookup(MyFunction.class, FUNCTION_DEFINITION);

  String encodedPayload = Base64.getEncoder().encodeToString(EMPTY_MESSAGE.getBytes());
  var inputMessage = new PubSubMessage();
  inputMessage.setData(encodedPayload);
  function.accept(inputMessage);

  List<AcknowledgeablePubsubMessage> pulled =
      await().until(() -> subscriberTemplate.pull("test-subscription", 10, true), not(empty()));

  assertEquals(1, pulled.size());
  // NOTE(review): `future` is not declared anywhere in this visible snippet - confirm where it
  // is expected to come from.
  assertEquals(future.get(), pulled.get(0).getPubsubMessage().getMessageId());
  assertEquals("hello!", pulled.get(0).getPubsubMessage().getData().toStringUtf8());

  pulled.forEach(AcknowledgeablePubsubMessage::ack);
}

会抛出以下错误:

java.lang.RuntimeException: java.util.concurrent.ExecutionException: com.google.api.gax.rpc.NotFoundException: io.grpc.StatusRuntimeException: NOT_FOUND: Resource not found (resource=test-topic).

我的服务实现使用的是 Publisher,而不是示例中的 PubSubPublisherTemplate:

    private final Publisher publisher;

    /**
     * Encodes {@code message} as UTF-8, wraps it in a Pub/Sub message and hands it to
     * {@code publish(...)}.
     *
     * @param message text payload to publish
     * @throws RuntimeException wrapping whatever exception the publish call raised
     */
    public void publishMessage(String message) {
        var byteStr = ByteString.copyFrom(message, StandardCharsets.UTF_8);
        var pubsubApiMessage = getPubsubApiMessage(byteStr);

        try {
            publish(pubsubApiMessage);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe the
            // interruption; wrapping the exception alone would silently clear it.
            Thread.currentThread().interrupt();
            log.error("Error during event publishing: " + e.getMessage(), e);
            throw new RuntimeException(e);
        } catch (Exception e) {
            log.error("Error during event publishing: " + e.getMessage(), e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Submits the message to the publisher and blocks until the returned future completes.
     *
     * @param pubsubApiMessage message to publish
     * @throws Exception if waiting on the publish future fails
     */
    private void publish(PubsubMessage pubsubApiMessage) throws Exception {
        var pendingPublish = publisher.publish(pubsubApiMessage);
        pendingPublish.get();
    }

    /**
     * Builds a {@code PubsubMessage} whose data is the given bytes; no other field is set.
     *
     * @param byteStr payload bytes
     * @return the built message
     */
    private PubsubMessage getPubsubApiMessage(ByteString byteStr) {
        var messageBuilder = PubsubMessage.newBuilder();
        messageBuilder.setData(byteStr);
        return messageBuilder.build();
    }
部署到 GCP 时一切正常,但在使用 Pub/Sub Emulator 的情况下会出现上述错误。
    

事实证明,Pub/Sub 模拟器需要自己的测试 Publisher(发布者),可以在配置类中将其创建为 bean。示例:

@Configuration public class PubSubConfig { @Value("${gcp.pubsub.topic.name}") private String topicName; @Value("${gcp.project.id}") private String projectId; //projectId @Value("${spring.cloud.gcp.pubsub.emulator-host}") private String host; private static final CredentialsProvider CREDENTIALS_PROVIDER = NoCredentialsProvider.create(); @Bean public SubscriberStub testSubscriber( FixedTransportChannelProvider fixedTransportChannelProvider) throws IOException { return GrpcSubscriberStub.create(SubscriberStubSettings.newBuilder() .setTransportChannelProvider(fixedTransportChannelProvider) .setCredentialsProvider(CREDENTIALS_PROVIDER) .build()); } @Primary @Bean public Publisher testPublisher(FixedTransportChannelProvider fixedTransportChannelProvider) throws IOException { return Publisher.newBuilder(ProjectTopicName.of(projectId, topicName)) .setChannelProvider(fixedTransportChannelProvider) .setCredentialsProvider(NoCredentialsProvider.create()) .build(); } @Bean public TopicAdminClient getTopicAdminClient( FixedTransportChannelProvider fixedTransportChannelProvider) throws IOException { return TopicAdminClient.create(TopicAdminSettings.newBuilder() .setTransportChannelProvider(fixedTransportChannelProvider) .setCredentialsProvider(CREDENTIALS_PROVIDER) .build()); } @Primary @Bean public FixedTransportChannelProvider getChannelProvider() { var channel = ManagedChannelBuilder.forTarget(host) .usePlaintext() .build(); return FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel)); } @Bean public SubscriptionAdminClient createSubscriptionAdmin(FixedTransportChannelProvider fixedTransportChannelProvider) throws IOException { return SubscriptionAdminClient.create(SubscriptionAdminSettings.newBuilder() .setCredentialsProvider( NoCredentialsProvider.create()) .setTransportChannelProvider( fixedTransportChannelProvider) .build()); } }

java spring-boot google-cloud-pubsub testcontainers google-cloud-pubsub-emulator
1个回答
1
投票

最新问题
© www.soinside.com 2019 - 2025. All rights reserved.