我正在使用
MockProcessorContext
和此代码进行单元测试
// Configuration handed to the mock context: String serdes as the default key/value serdes.
val props = Properties()
props[StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG] = Serdes.String().javaClass
props[StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG] = Serdes.String().javaClass
// keySerde and valueSerde are not defined in this snippet; they come from the surrounding test.
val processorUnderTest = MyProcessor("test-store-name", keySerde, valueSerde)
val context = MockProcessorContext(props)
// Build an in-memory String/String key-value store and register it with the mock context
// under the same name that is passed to the processor above.
// NOTE(review): the built store is registered without init() ever being called on it;
// this looks like the cause of the NullPointerException on put() — confirm.
context.addStateStore(
Stores.keyValueStoreBuilder(
Stores.inMemoryKeyValueStore("test-store-name"),
Serdes.String(),
Serdes.String()
).build()
)
但是当
MyProcessor
存储值时，我得到了
以下错误：
Cannot invoke "org.apache.kafka.common.metrics.Sensor.shouldRecord()" because "sensor" is null
java.lang.NullPointerException: Cannot invoke "org.apache.kafka.common.metrics.Sensor.shouldRecord()" because "sensor" is null
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:864)
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:330)
我正在使用“org.apache.kafka:kafka-streams-test-utils:3.7.0”
我发现状态存储（state store）没有被正确初始化。如果您也因为同样的问题来到这里，以下是使用
org.apache.kafka:kafka-streams-test-utils:3.7.0
进行的单元测试
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.api.MockProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.time.Instant;
/**
 * Unit test for {@code MyProcessor} driven by the test-utils {@code MockProcessorContext}.
 */
class MyProcessorTest {

    /** Name of the state store that the processor looks up during {@code init}. */
    private static final String STORE_NAME = "my-state";

    /**
     * Verifies that a record handed to {@code process} is written to the state store and is
     * forwarded, with the same key and value, once the scheduled punctuator fires.
     *
     * @see <a href="https://github.com/apache/kafka/blob/trunk/streams/test-utils/src/test/java/org/apache/kafka/streams/MockProcessorContextTest.java">MockProcessorContextTest (an example)</a>
     */
    @Test
    void test() {
        final MyProcessor processor = new MyProcessor();
        final MockProcessorContext<String, String> context = new MockProcessorContext<>();

        // Change-logging is disabled on the builder; the mock context has no changelog topic.
        final StoreBuilder<KeyValueStore<String, String>> storeBuilder = Stores.keyValueStoreBuilder(
                Stores.inMemoryKeyValueStore(STORE_NAME),
                Serdes.String(),
                Serdes.String())
            .withLoggingDisabled();
        final KeyValueStore<String, String> store = storeBuilder.build();

        // The store has to be initialised against the mock's state store context before it is
        // registered; an uninitialised store failed on put() with a NullPointerException
        // ("sensor" is null) in the metrics layer.
        store.init(context.getStateStoreContext(), store);
        context.addStateStore(store);

        processor.init(context);
        processor.process(new Record<>("key", "value", Instant.now().toEpochMilli()));

        // process() only writes to the store; forwarding happens in the punctuator.
        Assertions.assertTrue(context.forwarded().isEmpty());

        // init() is expected to have scheduled exactly one punctuator; fire it by hand.
        Assertions.assertEquals(1, context.scheduledPunctuators().size());
        final MockProcessorContext.CapturedPunctuator capturedPunctuator =
            context.scheduledPunctuators().get(0);
        final Punctuator punctuator = capturedPunctuator.getPunctuator();
        punctuator.punctuate(0L);

        // Exactly the stored entry must have been forwarded, not merely "something".
        final var forwarded = context.forwarded();
        Assertions.assertEquals(1, forwarded.size());
        final var forwardedRecord = forwarded.get(0).record();
        Assertions.assertEquals("key", forwardedRecord.key());
        Assertions.assertEquals("value", forwardedRecord.value());
    }
}
这是处理器
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
import java.time.Duration;
import java.time.Instant;
/**
 * Processor that writes every incoming record into the {@code "my-state"} key-value store and,
 * on each wall-clock punctuation, forwards every entry currently held in that store.
 */
public class MyProcessor implements Processor<String, String, String, String> {

    /** Name of the state store fetched from the context in {@link #init}. */
    private static final String STORE_NAME = "my-state";

    /**
     * Interval between wall-clock punctuations.
     *
     * <p>Kafka Streams documents 1 millisecond as the supported minimum for
     * {@code ProcessorContext.schedule}; a zero interval is accepted by the mock context used in
     * tests but is below that minimum, so the smallest supported value is used instead.
     * Increase it if forwarding the whole store this often is too expensive.
     */
    private static final Duration PUNCTUATION_INTERVAL = Duration.ofMillis(1);

    private KeyValueStore<String, String> store;
    private ProcessorContext<String, String> context;

    /**
     * Looks up the state store and schedules the wall-clock punctuator.
     *
     * @param context the processor context supplying the state store and the scheduler
     */
    @Override
    public void init(ProcessorContext<String, String> context) {
        this.context = context;
        this.store = context.getStateStore(STORE_NAME);
        context.schedule(PUNCTUATION_INTERVAL, PunctuationType.WALL_CLOCK_TIME, this::forwardAllEntries);
    }

    /**
     * Stores the record's value under the record's key.
     *
     * @param record the incoming record
     */
    @Override
    public void process(Record<String, String> record) {
        // Diagnostic output: prints the metrics currently registered on the context.
        System.out.println(context.metrics().metrics());
        store.put(record.key(), record.value());
    }

    /**
     * Punctuation callback: forwards each store entry as a new record stamped with the current
     * wall-clock time.
     *
     * @param timestamp punctuation timestamp supplied by the scheduler (not used)
     */
    private void forwardAllEntries(long timestamp) {
        // The iterator returned by all() must be closed to release its resources.
        try (var entries = store.all()) {
            entries.forEachRemaining(entry ->
                context.forward(new Record<>(entry.key, entry.value, Instant.now().toEpochMilli())));
        }
    }
}