KafkaStreams 和 MockProcessorContext - 获取传感器为空

问题描述 投票:0回答:1

我正在使用

MockProcessorContext
和此代码进行单元测试

val props = Properties()
props[StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG] = Serdes.String().javaClass
props[StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG] = Serdes.String().javaClass

val processorUnderTest = MyProcessor("test-store-name", keySerde, valueSerde)
val context = MockProcessorContext(props)     

context.addStateStore(
      Stores.keyValueStoreBuilder(
         Stores.inMemoryKeyValueStore("test-store-name"),
                Serdes.String(),
                Serdes.String()
      ).build()
)

但是当

MyProcessor
存储我得到的值时

这个错误

Cannot invoke "org.apache.kafka.common.metrics.Sensor.shouldRecord()" because "sensor" is null
java.lang.NullPointerException: Cannot invoke "org.apache.kafka.common.metrics.Sensor.shouldRecord()" because "sensor" is null
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:864)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:330)

我正在使用“org.apache.kafka:kafka-streams-test-utils:3.7.0”

java kotlin apache-kafka-streams
1个回答
0
投票

我发现商店没有正确初始化。如果您最终到达这里,以下是使用

org.apache.kafka:kafka-streams-test-utils:3.7.0

进行的单元测试
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.api.MockProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.time.Instant;

class MyProcessorTest {

  /**
   * <a href="https://github.com/apache/kafka/blob/trunk/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java"> an example</a>
   */
  @Test
  void test() {

    MyProcessor processor = new MyProcessor();

    final MockProcessorContext<String,String> context = new MockProcessorContext<>();

    final StoreBuilder<KeyValueStore<String, String>> storeBuilder = Stores.keyValueStoreBuilder(
        Stores.inMemoryKeyValueStore("my-state"),
        Serdes.String(),
        Serdes.String())
      .withLoggingDisabled();

    final KeyValueStore<String, String> store = storeBuilder.build();

    store.init(context.getStateStoreContext(), store);

    context.addStateStore(store);

    processor.init(context);
    processor.process(new Record<>("key", "value", Instant.now().toEpochMilli()));

    final MockProcessorContext.CapturedPunctuator capturedPunctuator = context.scheduledPunctuators().get(0);
    final Punctuator punctuator = capturedPunctuator.getPunctuator();
    punctuator.punctuate(0L);

    var forwarded = context.forwarded().iterator();
    Assertions.assertTrue(forwarded.hasNext());
  }
}

这是处理器

import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;

import java.time.Duration;
import java.time.Instant;

public class MyProcessor implements Processor<String, String, String, String> {

  private KeyValueStore<String, String> store;
  private ProcessorContext<String,String> context;
  @Override
  public void init(org.apache.kafka.streams.processor.api.ProcessorContext<String, String> context) {
    Processor.super.init(context);
    store = context.getStateStore("my-state");
    this.context = context;
    this.context.schedule(Duration.ZERO, PunctuationType.WALL_CLOCK_TIME, timestamp -> {
      store.all().forEachRemaining( kv -> {
        context.forward(new Record<>(kv.key, kv.value, Instant.now().toEpochMilli()));
      });
    });
  }

  @Override
  public void process(Record<String, String> record) {
    System.out.println(context.metrics().metrics());
    store.put(record.key(), record.value());
  }
}
最新问题
© www.soinside.com 2019 - 2024. All rights reserved.