我有 2 个 api,有 2 个主题。 API 1 生成一个主题,另一个 api 使用,该 API 生成对另一个主题的响应,API 1 使用。我想在 api 1 上的消息生成和消费之间设置一个超时,这样在从 api 1 向主题生成消息 15 分钟后,它应该在这段时间内消费它。
我尝试使用不同的主题,但我不想这样做。
您可以通过利用 Kafka 的消息头或状态管理方法与超时机制相结合来实现这一点。我给出了完整的步骤和我拥有的代码作为示例。
第 1 步:当 API 1 向 Kafka 主题生成消息时,请在消息标头或负载中包含时间戳,以指示消息的创建时间。
(生产者/api1) 进口 ( “时间” “github.com/confluenceinc/confluence-kafka-go/kafka” )
producer, _ := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
defer producer.Close()
message := &kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topicName, Partition: kafka.PartitionAny},
Value: []byte("your-message-payload"),
Headers: []kafka.Header{{Key: "timestamp", Value: []byte(time.Now().Format(time.RFC3339))}},
}
_ = producer.Produce(message, nil)
第 2 步:当 API 1 使用响应时,使用消息标头中的时间戳验证经过的时间。如果消息时间超过 15 分钟,则丢弃或按超时处理。
import (
"time"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
consumer, _ := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": "localhost:9092",
"group.id": "api1-group",
"auto.offset.reset": "earliest",
})
defer consumer.Close()
_ = consumer.SubscribeTopics([]string{responseTopicName}, nil)
for {
msg, err := consumer.ReadMessage(-1)
if err != nil {
// Handle consumer error
continue
}
var msgTimestamp time.Time
for _, header := range msg.Headers {
if header.Key == "timestamp" {
msgTimestamp, _ = time.Parse(time.RFC3339, string(header.Value))
break
}
}
if time.Since(msgTimestamp) > 15*time.Minute {
// Handle timeout logic
continue
}
processMessage(msg.Value)
}
第 3 步:为确保 API 2 不会生成过时的响应,请在生成响应之前验证传入消息的时间戳。
import (
"time"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
func produceResponse(message *kafka.Message) {
var msgTimestamp time.Time
for _, header := range message.Headers {
if header.Key == "timestamp" {
msgTimestamp, _ = time.Parse(time.RFC3339, string(header.Value))
break
}
}
if time.Since(msgTimestamp) > 15*time.Minute {
// Skip processing and respond with an error or discard
return
}
// Produce valid response
producer, _ := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
defer producer.Close()
response := &kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &responseTopicName, Partition: kafka.PartitionAny},
Value: []byte("response-payload"),
Headers: message.Headers,
}
_ = producer.Produce(response, nil)
}