Mocking the Kafka API for unit testing

Question (votes: 0, answers: 2)

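I want to mock the Confluent Kafka API for consumers and producers in Go for unit testing. Is there any approach (process, steps, or library) that can mock them successfully?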

unit-testing go apache-kafka confluent-platform
2 answers
0
votes

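If you are just getting started with unit testing and mocking, I suggest doing this without any mocking library. First, abstract Kafka behind your own implementation: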

// producer.go
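package producer

import "github.com/confluentinc/confluent-kafka-go/kafka"

// Producer mirrors the methods of *kafka.Producer so that application code
// depends on this interface rather than on the concrete client.
type Producer interface {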
    Close()
    Events() chan kafka.Event
    Flush(timeoutMs int) int
    GetFatalError() error
    GetMetadata(topic *string, allTopics bool, timeoutMs int) (*kafka.Metadata, error)
    Len() int
    OffsetsForTimes(times []kafka.TopicPartition, timeoutMs int) (offsets []kafka.TopicPartition, err error)
    Produce(msg *kafka.Message, deliveryChan chan kafka.Event) error
    ProduceChannel() chan *kafka.Message
    QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error)
    SetOAuthBearerToken(oauthBearerToken kafka.OAuthBearerToken) error
    SetOAuthBearerTokenFailure(errstr string) error
    String() string
    TestFatalError(code kafka.ErrorCode, str string) kafka.ErrorCode
}

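// ProducerMessage wraps a produced message so that tests can attach assertions to it.
type ProducerMessage struct {
    Message *kafka.Message
}

// KProducer is the production implementation; the method bodies below are stubs.
type KProducer struct{}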

func (k *KProducer) Produce(msg *kafka.Message, deliveryChan chan kafka.Event) error {
    return nil
}

func (k *KProducer) LastProduced() *ProducerMessage {
    return nil
}

func (k *KProducer) ProducedAt(_ int) *ProducerMessage {
    return nil
}

Then create a ProducerMock like this:

// producer_mock.go
package producer

import (
    "github.com/confluentinc/confluent-kafka-go/kafka"
    "testing"
)

// ProducerMock records every message passed to Produce.
// Embedding Producer lets the mock satisfy the full interface while
// overriding only Produce; calling any other interface method panics
// because the embedded value is nil.
type ProducerMock struct {
    Producer
    ProducedMessages []*ProducerMessage
    ProducedCount    int
}

func (p *ProducerMock) Produce(msg *kafka.Message, deliveryChan chan kafka.Event) error {
    p.ProducedMessages = append(p.ProducedMessages, &ProducerMessage{
        Message: msg,
    })
    p.ProducedCount++
    return nil
}

// LastProduced returns the most recent message, or nil if nothing was produced.
func (p *ProducerMock) LastProduced() *ProducerMessage {
    if len(p.ProducedMessages) == 0 {
        return nil
    }
    return p.ProducedMessages[len(p.ProducedMessages)-1]
}

// ProducedAt returns the message at idx, or nil if idx is out of range.
func (p *ProducerMock) ProducedAt(idx int) *ProducerMessage {
    if idx < 0 || idx >= len(p.ProducedMessages) {
        return nil
    }
    return p.ProducedMessages[idx]
}

func (p *ProducerMock) AssertProducedCount(t *testing.T, n int) {
    t.Helper()
    if p.ProducedCount != n {
        t.Errorf("ProducedCount = %d, want %d", p.ProducedCount, n)
    }
}
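
The Producer interface above mirrors most of the methods of *kafka.Producer. If the application only ever calls Produce, a narrower interface keeps the mock small, and a compile-time assertion reports a mock that no longer satisfies it. The following is a minimal sketch under that assumption, not part of the original answer: Message is a local stand-in for kafka.Message so the example runs without librdkafka, and MessageProducer, recordingProducer and publishGreeting are hypothetical names.

```go
package main

import "fmt"

// Message is a local stand-in for kafka.Message (assumption), so this
// sketch compiles without the confluent-kafka-go dependency.
type Message struct {
	Topic string
	Value []byte
}

// MessageProducer is a hypothetical narrow interface: it lists only the
// method the application actually calls.
type MessageProducer interface {
	Produce(msg *Message) error
}

// recordingProducer is a hand-written mock that stores what it receives.
type recordingProducer struct {
	produced []*Message
}

func (r *recordingProducer) Produce(msg *Message) error {
	r.produced = append(r.produced, msg)
	return nil
}

// Compile-time check: the build fails if *recordingProducer ever stops
// implementing MessageProducer.
var _ MessageProducer = (*recordingProducer)(nil)

// publishGreeting is example application code that depends on the interface.
func publishGreeting(p MessageProducer) error {
	return p.Produce(&Message{Topic: "topic1", Value: []byte("hello")})
}

func main() {
	mock := &recordingProducer{}
	if err := publishGreeting(mock); err != nil {
		fmt.Println("produce failed:", err)
		return
	}
	fmt.Println(len(mock.produced), mock.produced[0].Topic)
}
```

With the real client, an adapter type would implement the same narrow interface by delegating to *kafka.Producer, so only the adapter imports the Kafka package.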

In your tests, use ProducerMock together with the assertion methods you created:

// producer_test.go
package producer

import (
    "encoding/json"
    "github.com/confluentinc/confluent-kafka-go/kafka"
    "testing"
)

type AppEvent struct {
    Foo string `json:"foo"`
}

type MyApp struct {
    producer Producer
}

func (a *MyApp) sendMessage() {
    event := AppEvent{
        Foo: "Bar",
    }
    value, _ := json.Marshal(event)
    topic := "topic1"
    _ = a.producer.Produce(&kafka.Message{
        TopicPartition: kafka.TopicPartition{
            Topic:     &topic,
            Partition: 1,
            Offset:    4,
        },
        Value: value,
    }, nil)
}

func TestProduce(t *testing.T) {
    p := ProducerMock{}
    app := MyApp{&p}
    p.AssertProducedCount(t, 0)
    app.sendMessage()
    p.AssertProducedCount(t, 1)
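    // The Assert* methods on *ProducerMessage are helpers you write yourself;
    // the answer does not show them.
    p.LastProduced().AssertTopic(t, "topic1")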
    p.LastProduced().AssertPartition(t, 1)
    p.LastProduced().AssertOffset(t, 4)
    p.LastProduced().AssertValueEquals(t, AppEvent{
        Foo: "Bar",
    })
}
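
TestProduce calls AssertTopic, AssertPartition, AssertOffset and AssertValueEquals on *ProducerMessage, but the answer never defines them. Below is one possible sketch of such helpers, not the author's code. To stay runnable without librdkafka it uses local stand-ins for kafka.Message and kafka.TopicPartition (with a plain int64 offset), and reporter is a hypothetical interface that *testing.T satisfies, which also makes the helpers themselves easy to exercise.

```go
package main

import (
	"encoding/json"
	"fmt"
	"reflect"
)

// reporter is the part of *testing.T these helpers need (hypothetical name).
type reporter interface {
	Errorf(format string, args ...interface{})
}

// TopicPartition and Message are local stand-ins (assumption) for the
// kafka types of the same names, reduced to the fields the test uses.
type TopicPartition struct {
	Topic     *string
	Partition int32
	Offset    int64
}

type Message struct {
	TopicPartition TopicPartition
	Value          []byte
}

type ProducerMessage struct {
	Message *Message
}

// message reports an error and returns nil when nothing was produced.
func (m *ProducerMessage) message(t reporter) *Message {
	if m == nil || m.Message == nil {
		t.Errorf("no message was produced")
		return nil
	}
	return m.Message
}

func (m *ProducerMessage) AssertTopic(t reporter, want string) {
	msg := m.message(t)
	if msg != nil && (msg.TopicPartition.Topic == nil || *msg.TopicPartition.Topic != want) {
		t.Errorf("topic does not match %q", want)
	}
}

func (m *ProducerMessage) AssertPartition(t reporter, want int32) {
	if msg := m.message(t); msg != nil && msg.TopicPartition.Partition != want {
		t.Errorf("partition = %d, want %d", msg.TopicPartition.Partition, want)
	}
}

func (m *ProducerMessage) AssertOffset(t reporter, want int64) {
	if msg := m.message(t); msg != nil && msg.TopicPartition.Offset != want {
		t.Errorf("offset = %d, want %d", msg.TopicPartition.Offset, want)
	}
}

// AssertValueEquals decodes the JSON payload into a fresh value of want's
// type and compares the two; want must be a non-nil, non-pointer value.
func (m *ProducerMessage) AssertValueEquals(t reporter, want interface{}) {
	msg := m.message(t)
	if msg == nil {
		return
	}
	got := reflect.New(reflect.TypeOf(want)) // pointer to a zero value
	if err := json.Unmarshal(msg.Value, got.Interface()); err != nil {
		t.Errorf("value is not valid JSON for %T: %v", want, err)
		return
	}
	if !reflect.DeepEqual(got.Elem().Interface(), want) {
		t.Errorf("value = %+v, want %+v", got.Elem().Interface(), want)
	}
}

type AppEvent struct {
	Foo string `json:"foo"`
}

// countingReporter stands in for *testing.T in this demo.
type countingReporter struct{ failures int }

func (c *countingReporter) Errorf(format string, args ...interface{}) {
	c.failures++
	fmt.Printf(format+"\n", args...)
}

func main() {
	topic := "topic1"
	value, _ := json.Marshal(AppEvent{Foo: "Bar"})
	pm := &ProducerMessage{Message: &Message{
		TopicPartition: TopicPartition{Topic: &topic, Partition: 1, Offset: 4},
		Value:          value,
	}}
	r := &countingReporter{}
	pm.AssertTopic(r, "topic1")
	pm.AssertPartition(r, 1)
	pm.AssertOffset(r, 4)
	pm.AssertValueEquals(r, AppEvent{Foo: "Bar"})
	fmt.Println("failures:", r.failures)
}
```

Because every helper goes through message, calling an assertion on the nil result of LastProduced reports a failure instead of panicking.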

Finally, do the same for your consumer; you can keep extending it to cover your specific needs...
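
As a rough illustration of what "the same for your consumer" could look like, here is a minimal sketch, not part of the original answer. It assumes the application only needs a ReadMessage(timeout) method, which *kafka.Consumer provides; Message is again a local stand-in for kafka.Message, and ConsumerMock, errNoMessage and collectValues are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Message is a local stand-in for kafka.Message (assumption).
type Message struct {
	Topic string
	Value []byte
}

// Consumer lists only the method the application calls.
type Consumer interface {
	ReadMessage(timeout time.Duration) (*Message, error)
}

// errNoMessage is what the mock returns once its queue is empty
// (the real client would return a timeout error instead).
var errNoMessage = errors.New("consumer mock: no message queued")

// ConsumerMock hands out pre-queued messages in order and counts reads.
type ConsumerMock struct {
	Queued    []*Message
	ReadCount int
}

func (c *ConsumerMock) ReadMessage(timeout time.Duration) (*Message, error) {
	c.ReadCount++
	if len(c.Queued) == 0 {
		return nil, errNoMessage
	}
	msg := c.Queued[0]
	c.Queued = c.Queued[1:]
	return msg, nil
}

// collectValues is example application code: it reads until the consumer
// returns an error and gathers the payloads as strings.
func collectValues(c Consumer) []string {
	var values []string
	for {
		msg, err := c.ReadMessage(10 * time.Millisecond)
		if err != nil {
			return values
		}
		values = append(values, string(msg.Value))
	}
}

func main() {
	mock := &ConsumerMock{Queued: []*Message{
		{Topic: "topic1", Value: []byte("a")},
		{Topic: "topic1", Value: []byte("b")},
	}}
	fmt.Println(collectValues(mock), mock.ReadCount)
}
```

The mock ignores the timeout argument, which keeps tests fast and deterministic; a test can preload Queued and then assert on both the collected values and ReadCount.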


-2
votes

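The kafka module has a useful test producer in producer_test.go (the snippet sits inside package kafka, so NewProducer and ConfigMap appear unqualified):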

p, err := NewProducer(&ConfigMap{
    "socket.timeout.ms":  10,
    "message.timeout.ms": 10})

https://github.com/confluentinc/confluent-kafka-go/blob/master/kafka/producer_test.go
