我正在使用 https://github.com/confluentinc/confluent-kafka-go 库的 v2 版本,从一个包含多个分区的主题中消费消息。
消费者启动后运行正常,但出于业务原因,我需要在特定时间段内暂停消费,而我调用的 Pause 方法似乎不起作用。
我怀疑这是因为 PartitionAny,但不确定。
这是我的适配器:
package adapter
import (
	"context"
	"errors"
	"fmt"
	"log"
	"strings"
	"time"

	"project/internal/core/port"
	"project//pkg/configuration/item"

	kafka "github.com/confluentinc/confluent-kafka-go/v2/kafka"
)
// Values written into the consumer configuration by SetupKafkaConsumer.
const (
// saslMechanismSha512 is the value used for both the "sasl.mechanism" and
// "sasl.mechanisms" consumer settings.
saslMechanismSha512 = "SCRAM-SHA-512"
// sessionTimeoutInMs is the value used for the "session.timeout.ms" consumer
// setting, expressed in milliseconds.
sessionTimeoutInMs = 180000 // => 3 minutes
)
// KafkaAdapter wraps a confluent-kafka-go consumer. Its methods read messages,
// commit offsets and unsubscribe through the Consumer field.
type KafkaAdapter struct {
// Consumer is the underlying Kafka consumer used by every adapter method.
Consumer *kafka.Consumer
}
// NewKafkaAdapter subscribes consumer to topic and returns an adapter that
// wraps it.
//
// ctx is accepted for interface symmetry and is not used here. A nil consumer
// is rejected with an error instead of panicking inside Subscribe. A failed
// subscription is returned wrapped with %w so callers can inspect the
// underlying Kafka error with errors.Is / errors.As.
func NewKafkaAdapter(ctx context.Context, consumer *kafka.Consumer, topic string) (*KafkaAdapter, error) {
	if consumer == nil {
		return nil, fmt.Errorf("error subscribing to topic %s: kafka consumer is nil", topic)
	}

	if err := consumer.Subscribe(topic, nil); err != nil {
		return nil, fmt.Errorf("error subscribing to topic %s: %w", topic, err)
	}

	return &KafkaAdapter{Consumer: consumer}, nil
}
// Consume waits for the next Kafka message and converts it into a
// port.Message.
//
// It returns context.Canceled once ctx is done. Any read error other than a
// poll timeout is returned unchanged.
//
// The read is performed as a series of bounded polls rather than a single
// indefinite ReadMessage(-1): an indefinite read never returns while the topic
// is idle (or its partitions are paused), so a cancelled ctx would go
// unnoticed until the next message arrived.
func (k *KafkaAdapter) Consume(ctx context.Context) (*port.Message, error) {
	// pollInterval is the longest time a cancellation can go unnoticed.
	const pollInterval = 500 * time.Millisecond

	for {
		select {
		case <-ctx.Done():
			return nil, context.Canceled
		default:
		}

		message, err := k.Consumer.ReadMessage(pollInterval)
		if err != nil {
			// A timeout only means nothing arrived within pollInterval;
			// re-check ctx and poll again.
			var kafkaErr kafka.Error
			if errors.As(err, &kafkaErr) && kafkaErr.Code() == kafka.ErrTimedOut {
				continue
			}
			return nil, err
		}

		headers := getMessageHeaders(message.Headers)
		return &port.Message{
			Value:     message.Value,
			Key:       message.Key,
			Headers:   headers,
			Stream:    getStreamName(headers),
			Timestamp: message.Timestamp,
			Offset:    int64(message.TopicPartition.Offset),
		}, nil
	}
}
// CommitMessage calls Commit on the wrapped consumer and reports only whether
// it failed; the offsets returned by Commit are discarded. ctx is not used.
func (k *KafkaAdapter) CommitMessage(ctx context.Context) error {
	if _, commitErr := k.Consumer.Commit(); commitErr != nil {
		return commitErr
	}
	return nil
}
// Unsubscribe removes the wrapped consumer's current subscription.
//
// The method has no error return, so a failure reported by the consumer is
// logged instead of being silently dropped. ctx is not used.
func (k *KafkaAdapter) Unsubscribe(ctx context.Context) {
	if err := k.Consumer.Unsubscribe(); err != nil {
		log.Printf("error unsubscribing Kafka consumer: %v", err)
	}
}
// SetupKafkaConsumer builds a Kafka consumer from the given topic
// configuration.
//
// The consumer uses topic.Endpoints as bootstrap servers, topic.Name as the
// consumer group id, the package session timeout, and has auto-commit
// disabled. When both topic.User and topic.Password are non-empty, SASL_SSL
// with SCRAM-SHA-512 is configured as well. ctx is not used.
//
// Failures are returned to the caller rather than terminating the process, so
// the caller decides how to react.
func SetupKafkaConsumer(ctx context.Context, topic item.Topic) (*kafka.Consumer, error) {
	consumerConfig := &kafka.ConfigMap{
		"bootstrap.servers":  strings.Join(topic.Endpoints, ","),
		"group.id":           topic.Name,
		"session.timeout.ms": sessionTimeoutInMs,
		"enable.auto.commit": false,
	}

	if topic.User != "" && topic.Password != "" {
		saslSettings := []struct {
			key   string
			value kafka.ConfigValue
		}{
			{"sasl.username", topic.User},
			{"sasl.password", topic.Password},
			{"security.protocol", "SASL_SSL"},
			{"sasl.mechanism", saslMechanismSha512},
			{"sasl.mechanisms", saslMechanismSha512},
		}
		for _, setting := range saslSettings {
			if err := consumerConfig.SetKey(setting.key, setting.value); err != nil {
				return nil, fmt.Errorf("error setting Kafka config %s: %w", setting.key, err)
			}
		}
	}

	consumer, err := kafka.NewConsumer(consumerConfig)
	if err != nil {
		// The log line is kept so existing diagnostics are preserved; the
		// process exit that used to follow it has been removed.
		log.Printf("error creating Kafka consumer: %v", err)
		return nil, err
	}
	return consumer, nil
}
// getMessageHeaders converts Kafka headers into port.MessageHeader values,
// preserving their order. The result is nil when there are no headers.
func getMessageHeaders(messageHeaders []kafka.Header) []port.MessageHeader {
	var converted []port.MessageHeader
	for i := range messageHeaders {
		converted = append(converted, port.MessageHeader{
			Key:   string(messageHeaders[i].Key),
			Value: messageHeaders[i].Value,
		})
	}
	return converted
}
// getStreamName returns the value of the first header whose key is "sn",
// or the empty string when no such header exists.
func getStreamName(headers []port.MessageHeader) string {
	for i := range headers {
		if headers[i].Key != "sn" {
			continue
		}
		return string(headers[i].Value)
	}
	return ""
}
这是我的 main.go 文件:
package main
import (
"context"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"project/config"
adapter "project/internal/adapter/messaging"
"project/internal/core/service"
"project//mdmcore"
"project//pkg/application/bootstrap"
"project//pkg/configuration/item"
"project//pkg/repositories/models"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/robfig/cron/v3"
)
const (
// saslMechanismSha512 holds the SASL mechanism name "SCRAM-SHA-512".
// NOTE(review): the main function in this file does not reference it —
// confirm whether it is still needed here.
saslMechanismSha512 = "SCRAM-SHA-512"
)
var (
// topicExample is the name of the Kafka topic this program reads its
// configuration for and consumes from.
topicExample = "topic-example"
)
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
config.SetVladConfigLocal()
appConfig, err := mdmcore.GetConfigurations("./config/config.toml")
if err != nil {
log.Fatalf("Error getting configurations: %v", err)
}
topicConfiguration := item.ReadTopic(appConfig, topicExample, false) // false = will get reader configuration
consumer, err := adapter.SetupKafkaConsumer(ctx, topicConfiguration)
if err != nil {
log.Fatalf("error creating Kafka consumer: %v", err)
}
defer consumer.Close()
topicPartition := kafka.TopicPartition{
Topic: &topicExample,
Offset: kafka.OffsetStored,
Partition: kafka.PartitionAny,
}
err = consumer.Assign([]kafka.TopicPartition{topicPartition})
if err != nil {
panic(fmt.Sprintf("error assigning topic/partitions: %v", err))
}
kafkaReader, err := adapter.NewKafkaAdapter(ctx, consumer, topicExample)
if err != nil {
log.Fatalf("error creating Kafka adapter: %v", err)
}
repo, err := bootstrap.NewRepositories(appConfig)
if err != nil {
log.Fatalf("error creating a repository: %v", err)
}
dataManager := models.NewGormDataManager(repo.Db)
messageService := service.NewMessageService(kafkaReader, dataManager)
signalChannel := make(chan os.Signal, 1)
signal.Notify(signalChannel, os.Interrupt, syscall.SIGTERM)
go func() {
<-signalChannel
cancel()
}()
c := cron.New()
c.AddFunc("22 18 * * *", func() {
// I need to pause during a specific time
consumer.Pause([]kafka.TopicPartition{topicPartition})
})
c.AddFunc("28 18 * * *", func() {
// And then, resume it when needed
consumer.Resume([]kafka.TopicPartition{topicPartition})
})
c.Start()
messageService.StartConsuming(ctx, topicExample)
<-ctx.Done()
}
我正在使用
consumer.Pause([]kafka.TopicPartition{topicPartition})
,但没有效果。
而且我不确定我的消费者是连接到所有分区还是仅连接到一个分区。
我找到了让它发挥作用的方法。
我把
Assign
方法换成了 Subscribe
方法,这样就不用自己处理 Kafka 的分区再均衡了。
另外,我不再使用
.Pause
和 .Resume
方法,而是通过定时调用 Subscribe 和 Unsubscribe 来控制消费。
类似这样的:
// main loads the configuration, creates the Kafka consumer and message
// service, schedules a daily subscribe/unsubscribe window, and consumes until
// the process receives SIGINT or SIGTERM.
//
// Scheduling errors are checked: an invalid cron spec would otherwise be
// dropped silently and the corresponding job would never run. The scheduler
// is also stopped when main returns.
func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	appConfig, err := mdmcore.GetConfigurations("./config/config.toml")
	if err != nil {
		log.Fatalf("Error getting configurations: %v", err)
	}

	topicConfiguration := item.ReadTopic(appConfig, someTopic, false) // false = will get reader configuration
	consumer, err := adapter.SetupKafkaConsumer(ctx, topicConfiguration)
	if err != nil {
		log.Fatalf("error creating Kafka consumer: %v", err)
	}
	defer consumer.Close()

	kafkaReader, err := adapter.NewKafkaAdapter(ctx, consumer, someTopic)
	if err != nil {
		log.Fatalf("error creating Kafka adapter: %v", err)
	}

	repo, err := bootstrap.NewRepositories(appConfig)
	if err != nil {
		log.Fatalf("error creating a repository for my-topic flow: %v", err)
	}
	dataManager := models.NewGormDataManager(repo.Db)
	messageService := service.NewMessageService(kafkaReader, dataManager)

	// Cancel ctx on SIGINT/SIGTERM so consumption can stop.
	signalChannel := make(chan os.Signal, 1)
	signal.Notify(signalChannel, os.Interrupt, syscall.SIGTERM)
	go func() {
		<-signalChannel
		cancel()
	}()

	c := cron.New()
	if _, err := c.AddFunc("59 23 * * *", func() {
		messageService.Subscribe(ctx, someTopic)
	}); err != nil {
		log.Fatalf("error scheduling subscribe job: %v", err)
	}
	if _, err := c.AddFunc("30 2 * * *", func() {
		messageService.Unsubscribe(ctx)
	}); err != nil {
		log.Fatalf("error scheduling unsubscribe job: %v", err)
	}
	c.Start()
	defer c.Stop()

	messageService.StartConsuming(ctx, someTopic)
	<-ctx.Done()
}