I want to mock the consumer and producer of the Confluent Kafka API in Go for unit testing. Is there a way (process, steps, or library) to mock them successfully?
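I suggest doing this without any mocking library, especially if you are just getting started with unit testing and mocking. First, abstract Kafka behind your own interface and implementation: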
// producer.go
package producer

import "github.com/confluentinc/confluent-kafka-go/kafka"

// Producer lists the kafka.Producer methods that application code may call,
// so callers depend on this interface rather than on the concrete client.
type Producer interface {
	Close()
	Events() chan kafka.Event
	Flush(timeoutMs int) int
	GetFatalError() error
	GetMetadata(topic *string, allTopics bool, timeoutMs int) (*kafka.Metadata, error)
	Len() int
	OffsetsForTimes(times []kafka.TopicPartition, timeoutMs int) (offsets []kafka.TopicPartition, err error)
	Produce(msg *kafka.Message, deliveryChan chan kafka.Event) error
	ProduceChannel() chan *kafka.Message
	QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error)
	SetOAuthBearerToken(oauthBearerToken kafka.OAuthBearerToken) error
	SetOAuthBearerTokenFailure(errstr string) error
	String() string
	TestFatalError(code kafka.ErrorCode, str string) kafka.ErrorCode
}

// KProducer is your own implementation of Producer; the bodies below are placeholders.
type KProducer struct{}

func (k *KProducer) Produce(msg *kafka.Message, deliveryChan chan kafka.Event) error {
	return nil
}

func (k *KProducer) LastProduced() *ProducerMessage {
	return nil
}

func (k *KProducer) ProducedAt(_ int) *ProducerMessage {
	return nil
}
Then create a ProducerMock like this:
// producer_mock.go
package producer

import (
	"testing"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

// ProducerMock records every message passed to Produce instead of sending it.
type ProducerMock struct {
	// Embedding the interface lets the mock satisfy Producer while overriding
	// only Produce; any other Producer method panics on the nil embedded value.
	Producer

	ProducedMessages []*ProducerMessage
	ProducedCount    int
}

func (p *ProducerMock) Produce(msg *kafka.Message, deliveryChan chan kafka.Event) error {
	p.ProducedMessages = append(p.ProducedMessages, &ProducerMessage{
		Message: msg,
	})
	p.ProducedCount++
	return nil
}

// LastProduced returns the most recent message, or nil if nothing was produced.
func (p *ProducerMock) LastProduced() *ProducerMessage {
	if len(p.ProducedMessages) == 0 {
		return nil
	}
	return p.ProducedMessages[len(p.ProducedMessages)-1]
}

// ProducedAt returns the message at idx, or nil if idx is out of range.
func (p *ProducerMock) ProducedAt(idx int) *ProducerMessage {
	if idx < 0 || idx >= len(p.ProducedMessages) {
		return nil
	}
	return p.ProducedMessages[idx]
}

func (p *ProducerMock) AssertProducedCount(t *testing.T, n int) {
	if p.ProducedCount != n {
		t.Errorf("ProducedCount should be equal %d but it's equal %d", n, p.ProducedCount)
	}
}
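The mock stores ProducerMessage values, and the test calls AssertTopic, AssertPartition, AssertOffset and AssertValueEquals on them, but the answer never defines that type. What follows is only a minimal sketch of what it might look like, not the author's code. To keep it buildable with the standard library alone, Message and TopicPartition are local stand-ins for kafka.Message and kafka.TopicPartition, TestingT is a hypothetical one-method interface that *testing.T satisfies, and failureRecorder is a hypothetical helper for running the assertions outside of go test. AssertValueEquals assumes the payload was produced with json.Marshal.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// TopicPartition and Message are stand-ins for kafka.TopicPartition and
// kafka.Message, reduced to the fields the assertions read.
type TopicPartition struct {
	Topic     *string
	Partition int32
	Offset    int64
}

type Message struct {
	TopicPartition TopicPartition
	Value          []byte
}

// TestingT is the one method of *testing.T that the helpers call.
type TestingT interface {
	Errorf(format string, args ...interface{})
}

// ProducerMessage wraps a recorded message (hypothetical definition).
type ProducerMessage struct {
	Message *Message
}

// AssertTopic fails when the topic is unset or differs from want.
func (m *ProducerMessage) AssertTopic(t TestingT, want string) {
	topic := m.Message.TopicPartition.Topic
	if topic == nil {
		t.Errorf("topic should be %q but it is unset", want)
		return
	}
	if *topic != want {
		t.Errorf("topic should be %q but it is %q", want, *topic)
	}
}

func (m *ProducerMessage) AssertPartition(t TestingT, want int32) {
	if got := m.Message.TopicPartition.Partition; got != want {
		t.Errorf("partition should be %d but it is %d", want, got)
	}
}

func (m *ProducerMessage) AssertOffset(t TestingT, want int64) {
	if got := m.Message.TopicPartition.Offset; got != want {
		t.Errorf("offset should be %d but it is %d", want, got)
	}
}

// AssertValueEquals compares the payload with the JSON encoding of want.
func (m *ProducerMessage) AssertValueEquals(t TestingT, want interface{}) {
	wantJSON, err := json.Marshal(want)
	if err != nil {
		t.Errorf("cannot marshal expected value: %v", err)
		return
	}
	if !bytes.Equal(m.Message.Value, wantJSON) {
		t.Errorf("value should be %s but it is %s", wantJSON, m.Message.Value)
	}
}

// failureRecorder is a hypothetical TestingT that collects failures, useful
// for exercising the helpers outside of "go test".
type failureRecorder struct {
	failures []string
}

func (r *failureRecorder) Errorf(format string, args ...interface{}) {
	r.failures = append(r.failures, fmt.Sprintf(format, args...))
}
```

In the real package you would drop the stand-ins, point ProducerMessage.Message at *kafka.Message, and pass *testing.T directly. The helpers do not guard against a nil receiver, so check the result of LastProduced before asserting on it if the mock may be empty.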
When testing, use the ProducerMock together with the assertion methods you created:
// producer_test.go
package producer

import (
	"encoding/json"
	"testing"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

type AppEvent struct {
	Foo string `json:"foo"`
}

// MyApp depends on the Producer interface, so a test can inject ProducerMock.
type MyApp struct {
	producer Producer
}

func (a *MyApp) sendMessage() {
	event := AppEvent{
		Foo: "Bar",
	}
	value, _ := json.Marshal(event)
	topic := "topic1"
	_ = a.producer.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{
			Topic:     &topic,
			Partition: 1,
			Offset:    4,
		},
		Value: value,
	}, nil)
}

func TestProduce(t *testing.T) {
	p := ProducerMock{}
	app := MyApp{&p}

	p.AssertProducedCount(t, 0)
	app.sendMessage()
	p.AssertProducedCount(t, 1)

	p.LastProduced().AssertTopic(t, "topic1")
	p.LastProduced().AssertPartition(t, 1)
	p.LastProduced().AssertOffset(t, 4)
	p.LastProduced().AssertValueEquals(t, AppEvent{
		Foo: "Bar",
	})
}
Finally, do the same for your consumer. You can keep extending it to cover your specific needs...
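As a rough illustration of the same pattern on the consumer side, here is a minimal sketch under stated assumptions. The Consumer interface is deliberately limited to ReadMessage and Close, named after the kafka.Consumer methods; Message is a local stand-in for kafka.Message so the sketch builds with the standard library only; ConsumerMock, ErrNoMessages and drain are hypothetical names that do not come from the answer.

```go
package main

import (
	"errors"
	"time"
)

// Message is a stand-in for kafka.Message, reduced to its payload.
type Message struct {
	Value []byte
}

// Consumer lists only the consumer methods the application calls.
type Consumer interface {
	ReadMessage(timeout time.Duration) (*Message, error)
	Close() error
}

// ErrNoMessages is a hypothetical sentinel returned once the queue is empty;
// a real consumer would report a timeout error instead.
var ErrNoMessages = errors.New("consumer mock: no queued messages")

// ConsumerMock hands out pre-queued messages instead of reading from Kafka.
type ConsumerMock struct {
	Queued    []*Message
	ReadCount int
	Closed    bool
}

// ReadMessage pops the next queued message and ignores the timeout.
func (c *ConsumerMock) ReadMessage(_ time.Duration) (*Message, error) {
	if len(c.Queued) == 0 {
		return nil, ErrNoMessages
	}
	msg := c.Queued[0]
	c.Queued = c.Queued[1:]
	c.ReadCount++
	return msg, nil
}

// Close records that the consumer was closed.
func (c *ConsumerMock) Close() error {
	c.Closed = true
	return nil
}

// drain is a hypothetical piece of application code: it reads until the
// first error and returns the payloads it saw.
func drain(c Consumer) [][]byte {
	var values [][]byte
	for {
		msg, err := c.ReadMessage(100 * time.Millisecond)
		if err != nil {
			return values
		}
		values = append(values, msg.Value)
	}
}
```

A test would queue a few messages on ConsumerMock, pass it to the code under test, and then check ReadCount, Closed, or whatever the code did with the payloads.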
The kafka module contains a useful test producer in producer_test.go:
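// Written from inside the kafka package; from another package, qualify the
// names as kafka.NewProducer and kafka.ConfigMap.
p, err := NewProducer(&ConfigMap{
	"socket.timeout.ms":  10,
	"message.timeout.ms": 10})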
https://github.com/confluentinc/confluent-kafka-go/blob/master/kafka/producer_test.go