如何在kafka中实现2个api之间的会话超时

问题描述 投票:0回答:1

我有 2 个 api,有 2 个主题。 API 1 生成一个主题,另一个 api 使用,该 API 生成对另一个主题的响应,API 1 使用。我想在 api 1 上的消息生成和消费之间设置一个超时,这样在从 api 1 向主题生成消息 15 分钟后,它应该在这段时间内消费它。

我尝试使用不同的主题,但我不想这样做。

go apache-kafka
1个回答
0
投票

您可以通过利用 Kafka 的消息头或状态管理方法与超时机制相结合来实现这一点。我给出了完整的步骤和我拥有的代码作为示例。

第 1 步:当 API 1 向 Kafka 主题生成消息时,请在消息标头或负载中包含时间戳,以指示消息的创建时间。

(生产者/api1) 进口 ( “时间” “github.com/confluenceinc/confluence-kafka-go/kafka” )

producer, _ := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
defer producer.Close()

message := &kafka.Message{
    TopicPartition: kafka.TopicPartition{Topic: &topicName, Partition: kafka.PartitionAny},
    Value:          []byte("your-message-payload"),
    Headers:        []kafka.Header{{Key: "timestamp", Value: []byte(time.Now().Format(time.RFC3339))}},
}

_ = producer.Produce(message, nil)

第 2 步:当 API 1 使用响应时,使用消息标头中的时间戳验证经过的时间。如果消息时间超过 15 分钟,则丢弃或按超时处理。

import (
    "time"
    "github.com/confluentinc/confluent-kafka-go/kafka"
)

consumer, _ := kafka.NewConsumer(&kafka.ConfigMap{
    "bootstrap.servers": "localhost:9092",
    "group.id":          "api1-group",
    "auto.offset.reset": "earliest",
})
defer consumer.Close()

_ = consumer.SubscribeTopics([]string{responseTopicName}, nil)

for {
    msg, err := consumer.ReadMessage(-1)
    if err != nil {
        // Handle consumer error
        continue
    }

    
    var msgTimestamp time.Time
    for _, header := range msg.Headers {
        if header.Key == "timestamp" {
            msgTimestamp, _ = time.Parse(time.RFC3339, string(header.Value))
            break
        }
    }

    if time.Since(msgTimestamp) > 15*time.Minute {
        // Handle timeout logic 
        continue
    }

    
    processMessage(msg.Value)
}

第 3 步:为确保 API 2 不会生成过时的响应,请在生成响应之前验证传入消息的时间戳。

import (
    "time"
    "github.com/confluentinc/confluent-kafka-go/kafka"
)

func produceResponse(message *kafka.Message) {
    var msgTimestamp time.Time
    for _, header := range message.Headers {
        if header.Key == "timestamp" {
            msgTimestamp, _ = time.Parse(time.RFC3339, string(header.Value))
            break
        }
    }

    if time.Since(msgTimestamp) > 15*time.Minute {
        // Skip processing and respond with an error or discard
        return
    }

    // Produce valid response
    producer, _ := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
    defer producer.Close()

    response := &kafka.Message{
        TopicPartition: kafka.TopicPartition{Topic: &responseTopicName, Partition: kafka.PartitionAny},
        Value:          []byte("response-payload"),
        Headers:        message.Headers, 
    }

    _ = producer.Produce(response, nil)
}
© www.soinside.com 2019 - 2024. All rights reserved.