我正在使用 Watermill 开发发送消息的软件,它会经过 service1、service2 和最后一个 service。我使用切片来控制消息的顺序(FIFO,因为 GoChannel 应该尊重 FIFO)。几次运行后,我遇到了 Watermill 交换消息的问题。例如,我发送消息A和消息B,但在最后一个服务中,消息B先到达,然后消息A。附件是一个小脚本,其中反映了这个问题。这似乎是一个竞争条件,因为它并不总是发生,但是当运行脚本并出现问题时,它会显示如下消息: Slice value 68ad9d74-c0eb-476f-9cc0-5da98d947b61 value in message f01fff7d-1fb6 -45a4-bda6-07e021511d3f.
/*
This application is a test of Watermill, a Go library for working efficiently with message streams.
Sending and recieving menssages from a channel.
*/
package main
import (
"context"
"fmt"
"log"
"os"
"sync"
"time"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)
var pubSub1 *gochannel.GoChannel
var safeSlice *SafeSlice
// Safe Slice struct just for control of the messages
type SafeSlice struct {
mu sync.Mutex
slice []string
}
func (s *SafeSlice) Append(value string) {
s.mu.Lock()
defer s.mu.Unlock()
s.slice = append(s.slice, value)
}
func (s *SafeSlice) Get(index int) (string, bool) {
s.mu.Lock()
defer s.mu.Unlock()
if index < 0 || index >= len(s.slice) {
return "Index out of scope", false
}
return s.slice[index], true
}
func (s *SafeSlice) Remove(index int) bool {
s.mu.Lock()
defer s.mu.Unlock()
if index < 0 || index >= len(s.slice) {
return false
}
s.slice = append(s.slice[:index], s.slice[index+1:]...)
return true
}
// service1 function is a handler for the "service-1" service. It appends the message UUID to the
// safe slice and publishes the message to the "service-2-input" channel.
func service1(msg *message.Message) error {
safeSlice.Append(msg.UUID)
err := pubSub1.Publish("service-2-input", msg)
if err != nil {
panic(err)
}
return nil
}
// service2 function is a handler for the "service-2" service. It receives a message, performs
// some logic, and returns a slice of messages.
func service2(msg *message.Message) ([]*message.Message, error) {
fmt.Printf("Message in service 2 %v\n", msg)
// Add some logic
return message.Messages{msg}, nil
}
// service_last function is a handler for the "service_last" service. It compares the message
// UUID with the first UUID in the safe slice and removes the first UUID if they match.
func service_last(msg *message.Message) error {
uuid, _ := safeSlice.Get(0)
fmt.Printf("service_last %v\n", msg)
if msg.UUID == uuid {
fmt.Println("OK")
safeSlice.Remove(0)
} else {
fmt.Printf("Slice value %s value in message %s\n", uuid, msg.UUID)
os.Exit(0)
}
return nil
}
func main() {
logger := watermill.NewStdLogger(true, true)
safeSlice = &SafeSlice{}
pubSub1 = gochannel.NewGoChannel(gochannel.Config{}, logger)
router, err := message.NewRouter(message.RouterConfig{}, logger)
if err != nil {
log.Fatalf("could not create router: %v", err)
}
// Create handlers for each service
router.AddNoPublisherHandler("service-1", "service-1-input", pubSub1, service1)
router.AddHandler("service-2", "service-2-input", pubSub1, "service_last-input", pubSub1, service2)
router.AddNoPublisherHandler("service_last", "service_last-input", pubSub1, service_last)
// Start the router
go func() {
if err := router.Run(context.Background()); err != nil {
log.Fatalf("could not run router: %v", err)
}
}()
time.Sleep(1 * time.Second)
for {
// Publish a message to start the pipeline
msg := message.NewMessage(watermill.NewUUID(), []byte{})
if err := pubSub1.Publish("service-1-input", msg); err != nil {
log.Fatalf("could not publish message: %v", err)
}
//time.Sleep(1000 * time.Millisecond)
}
// Allow some time for the message to be processed
select {}
}
BlockPublishUntilSubscriberAck
= false
。
pubSub1 = gochannel.NewGoChannel(gochannel.Config{}, logger)
现在您正在快速连续发送几条消息。
for {
msg := message.NewMessage(watermill.NewUUID(), []byte{})
if err := pubSub1.Publish("service-1-input", msg); err != nil {
log.Fatalf("could not publish message: %v", err)
}
}
订阅启动多个运行
service1
的 goroutine,每个 goroutine 在重新发布消息之前将一条消息附加到全局变量 safeSlice
。
func service1(msg *message.Message) error {
safeSlice.Append(msg.UUID)
return pubSub1.Publish("service-2-input", msg)
}
绝对有可能一个 Goroutine 将第一条消息附加到
safeSlice
,然后另一个 Goroutine 将第二条消息附加到 safeSlice
,并在第一条消息重新发布之前重新发布它。由于 safeSlice.Append
和 pubSub1.Publish
都使用锁,因此您不必等待很长时间。
依赖事件基础系统中的顺序确实很困难,并且在 99.9% 的用例中完全没有必要。此外,在异步系统中使用可变全局变量确实很棘手。