Kafka 消费者在需要时不会暂停

问题描述 投票:0回答:1

我正在使用 https://github.com/confluentinc/confluent-kafka-go 库的 v2 版本,从一个包含多个分区的主题中消费消息。

当我启动消费者时,一切看起来正常;但出于业务原因,我需要在某个特定时间段内暂停消费,而我调用的 Pause 方法似乎不起作用。

我怀疑这是因为 PartitionAny,但不确定。

这是我的适配器:

package adapter

import (
    "context"
    "fmt"
    "log"
    "strings"

    "project/internal/core/port"
    "project//pkg/configuration/item"
    kafka "github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

const (
    // saslMechanismSha512 is the SASL mechanism name written into the
    // consumer configuration when credentials are provided.
    saslMechanismSha512 = "SCRAM-SHA-512"

    // sessionTimeoutInMs is the value used for the consumer's
    // "session.timeout.ms" setting, expressed in milliseconds.
    sessionTimeoutInMs = 180000 // => 3 minutes
)

// KafkaAdapter wraps a confluent-kafka-go consumer and exposes the consume,
// commit and unsubscribe operations implemented by its methods.
type KafkaAdapter struct {
    // Consumer is the underlying client every method delegates to.
    Consumer *kafka.Consumer
}

// NewKafkaAdapter subscribes consumer to topic and returns a KafkaAdapter
// wrapping it.
//
// ctx is currently unused. On failure the subscription error is wrapped with
// %w so callers can still inspect it with errors.Is / errors.As.
func NewKafkaAdapter(ctx context.Context, consumer *kafka.Consumer, topic string) (*KafkaAdapter, error) {
    if err := consumer.Subscribe(topic, nil); err != nil {
        return nil, fmt.Errorf("error subscribing to topic %s: %w", topic, err)
    }

    return &KafkaAdapter{Consumer: consumer}, nil
}

// Consume blocks until the next message is available, the context ends, or
// the consumer reports an error.
//
// The read is performed in short polls rather than one unbounded wait, so a
// cancelled or expired ctx is noticed between polls instead of only before
// the read starts. When ctx ends, ctx.Err() is returned (context.Canceled or
// context.DeadlineExceeded). Any consumer error other than a poll timeout is
// returned unchanged.
func (k *KafkaAdapter) Consume(ctx context.Context) (*port.Message, error) {
    // Untyped constant in nanoseconds (500ms); it converts implicitly to the
    // time.Duration that ReadMessage expects.
    const pollInterval = 500 * 1000 * 1000

    for {
        if err := ctx.Err(); err != nil {
            return nil, err
        }

        message, err := k.Consumer.ReadMessage(pollInterval)
        if err != nil {
            // A timed-out poll only means nothing arrived yet: loop to
            // re-check ctx and poll again.
            if kafkaErr, ok := err.(kafka.Error); ok && kafkaErr.Code() == kafka.ErrTimedOut {
                continue
            }

            return nil, err
        }

        headers := getMessageHeaders(message.Headers)

        return &port.Message{
            Value:     message.Value,
            Key:       message.Key,
            Headers:   headers,
            Stream:    getStreamName(headers),
            Timestamp: message.Timestamp,
            Offset:    int64(message.TopicPartition.Offset),
        }, nil
    }
}

// CommitMessage calls Commit on the underlying consumer, discards the
// returned partition list and reports only the error. ctx is not used.
func (k *KafkaAdapter) CommitMessage(ctx context.Context) error {
    if _, commitErr := k.Consumer.Commit(); commitErr != nil {
        return commitErr
    }

    return nil
}

// Unsubscribe removes the consumer's current topic subscription.
//
// The method has no error return, so a failure is logged rather than
// silently dropped. ctx is not used.
func (k *KafkaAdapter) Unsubscribe(ctx context.Context) {
    if err := k.Consumer.Unsubscribe(); err != nil {
        log.Printf("error unsubscribing Kafka consumer: %v", err)
    }
}

// SetupKafkaConsumer builds a Kafka consumer from the given topic settings.
//
// The brokers come from topic.Endpoints, the consumer group id is the topic
// name, and auto-commit is disabled. SASL_SSL with SCRAM-SHA-512 is enabled
// only when both topic.User and topic.Password are non-empty. ctx is not
// used.
//
// Failures are returned to the caller instead of terminating the process, so
// the caller decides how to react and its deferred cleanup still runs.
func SetupKafkaConsumer(ctx context.Context, topic item.Topic) (*kafka.Consumer, error) {
    consumerConfig := &kafka.ConfigMap{
        "bootstrap.servers":  strings.Join(topic.Endpoints, ","),
        "group.id":           topic.Name,
        "session.timeout.ms": sessionTimeoutInMs,
        "enable.auto.commit": false,
    }

    if topic.User != "" && topic.Password != "" {
        // An ordered slice (not a map) keeps the application order stable.
        saslSettings := []struct {
            key   string
            value kafka.ConfigValue
        }{
            {"sasl.username", topic.User},
            {"sasl.password", topic.Password},
            {"security.protocol", "SASL_SSL"},
            {"sasl.mechanism", saslMechanismSha512},
            {"sasl.mechanisms", saslMechanismSha512},
        }

        for _, setting := range saslSettings {
            if err := consumerConfig.SetKey(setting.key, setting.value); err != nil {
                return nil, fmt.Errorf("setting Kafka option %q: %w", setting.key, err)
            }
        }
    }

    consumer, err := kafka.NewConsumer(consumerConfig)
    if err != nil {
        // Log without exiting: log.Fatalf here made the return below
        // unreachable and took the decision away from the caller.
        log.Printf("error creating Kafka consumer: %v", err)

        return nil, fmt.Errorf("creating Kafka consumer: %w", err)
    }

    return consumer, nil
}

// getMessageHeaders converts Kafka headers into port.MessageHeader values,
// preserving their order. It returns a nil slice when there are no headers.
func getMessageHeaders(messageHeaders []kafka.Header) []port.MessageHeader {
    var converted []port.MessageHeader

    for i := range messageHeaders {
        converted = append(converted, port.MessageHeader{
            Key:   string(messageHeaders[i].Key),
            Value: messageHeaders[i].Value,
        })
    }

    return converted
}

// getStreamName returns the value of the first header whose key is "sn",
// converted to a string, or the empty string when no such header exists.
func getStreamName(headers []port.MessageHeader) string {
    for i := range headers {
        if headers[i].Key != "sn" {
            continue
        }

        return string(headers[i].Value)
    }

    return ""
}

这是我的 main.go 文件:

package main

import (
    "context"
    "fmt"
    "log"
    "os"
    "os/signal"
    "syscall"

    "project/config"
    adapter "project/internal/adapter/messaging"
    "project/internal/core/service"
    "project//mdmcore"
    "project//pkg/application/bootstrap"
    "project//pkg/configuration/item"
    "project//pkg/repositories/models"

    "github.com/confluentinc/confluent-kafka-go/v2/kafka"
    "github.com/robfig/cron/v3"
)

const (
    saslMechanismSha512 = "SCRAM-SHA-512"
)

var (
    topicExample = "topic-example"
)

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    config.SetVladConfigLocal()

    appConfig, err := mdmcore.GetConfigurations("./config/config.toml")
    if err != nil {
        log.Fatalf("Error getting configurations: %v", err)
    }

    topicConfiguration := item.ReadTopic(appConfig, topicExample, false) // false = will get reader configuration

    consumer, err := adapter.SetupKafkaConsumer(ctx, topicConfiguration)
    if err != nil {
        log.Fatalf("error creating Kafka consumer: %v", err)
    }
    defer consumer.Close()

    topicPartition := kafka.TopicPartition{
        Topic:     &topicExample,
        Offset:    kafka.OffsetStored,
        Partition: kafka.PartitionAny,
    }

    err = consumer.Assign([]kafka.TopicPartition{topicPartition})
    if err != nil {
        panic(fmt.Sprintf("error assigning topic/partitions: %v", err))
    }

    kafkaReader, err := adapter.NewKafkaAdapter(ctx, consumer, topicExample)
    if err != nil {
        log.Fatalf("error creating Kafka adapter: %v", err)
    }

    repo, err := bootstrap.NewRepositories(appConfig)
    if err != nil {
        log.Fatalf("error creating a repository: %v", err)


    }

    dataManager := models.NewGormDataManager(repo.Db)

    messageService := service.NewMessageService(kafkaReader, dataManager)

    signalChannel := make(chan os.Signal, 1)
    signal.Notify(signalChannel, os.Interrupt, syscall.SIGTERM)

    go func() {
        <-signalChannel
        cancel()
    }()

    c := cron.New()
    c.AddFunc("22 18 * * *", func() {
        // I need to pause during a specific time
        consumer.Pause([]kafka.TopicPartition{topicPartition})
    })

    c.AddFunc("28 18 * * *", func() {
        // And then, resume it when needed
        consumer.Resume([]kafka.TopicPartition{topicPartition})
    })
    c.Start()

    messageService.StartConsuming(ctx, topicExample)

    <-ctx.Done()
}

我正在使用

consumer.Pause([]kafka.TopicPartition{topicPartition})
,但没有效果。

而且我不确定我的消费者是连接到所有分区还是仅连接到一个分区。

go apache-kafka cron consumer confluent-kafka-go
1个回答
0
投票

我找到了让它发挥作用的方法。

我把

Assign
方法换成了
Subscribe
方法,这样我就不用担心 Kafka 分区再平衡的问题了。

另外,我没有使用方法

.Pause
.Resume

而是通过定时任务调用 Subscribe 和 Unsubscribe 来实现,类似这样:

// main loads the configuration, creates the Kafka consumer and message
// service, schedules a daily subscribe/unsubscribe window and then consumes
// until the process receives an interrupt or SIGTERM.
func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    appConfig, err := mdmcore.GetConfigurations("./config/config.toml")
    if err != nil {
        log.Fatalf("Error getting configurations: %v", err)
    }

    topicConfiguration := item.ReadTopic(appConfig, someTopic, false) // false = will get reader configuration

    consumer, err := adapter.SetupKafkaConsumer(ctx, topicConfiguration)
    if err != nil {
        log.Fatalf("error creating Kafka consumer: %v", err)
    }
    defer consumer.Close()

    kafkaReader, err := adapter.NewKafkaAdapter(ctx, consumer, someTopic)
    if err != nil {
        log.Fatalf("error creating Kafka adapter: %v", err)
    }

    repo, err := bootstrap.NewRepositories(appConfig)
    if err != nil {
        log.Fatalf("error creating a repository for my-topic flow: %v", err)
    }

    dataManager := models.NewGormDataManager(repo.Db)

    messageService := service.NewMessageService(kafkaReader, dataManager)

    signalChannel := make(chan os.Signal, 1)
    signal.Notify(signalChannel, os.Interrupt, syscall.SIGTERM)

    go func() {
        <-signalChannel
        cancel()
    }()

    c := cron.New()

    // A rejected schedule spec would otherwise silently never run, so treat
    // registration failures as fatal start-up errors.
    if _, err := c.AddFunc("59 23 * * *", func() {
        messageService.Subscribe(ctx, someTopic)
    }); err != nil {
        log.Fatalf("error scheduling subscribe job: %v", err)
    }

    if _, err := c.AddFunc("30 2 * * *", func() {
        messageService.Unsubscribe(ctx)
    }); err != nil {
        log.Fatalf("error scheduling unsubscribe job: %v", err)
    }

    c.Start()
    // Stop the scheduler on shutdown so no job fires against a closing consumer.
    defer c.Stop()

    messageService.StartConsuming(ctx, someTopic)

    <-ctx.Done()
}
© www.soinside.com 2019 - 2024. All rights reserved.