在水磨Golang中一段时间后交换消息

问题描述 投票:0回答:1

我正在使用 Watermill 开发发送消息的软件,它会经过 service1、service2 和最后一个 service。我使用切片来控制消息的顺序(FIFO,因为 GoChannel 应该尊重 FIFO)。几次运行后,我遇到了 Watermill 交换消息的问题。例如,我发送消息A和消息B,但在最后一个服务中,消息B先到达,然后消息A。附件是一个小脚本,其中反映了这个问题。这似乎是一个竞争条件,因为它并不总是发生,但是当运行脚本并出现问题时,它会显示如下消息: Slice value 68ad9d74-c0eb-476f-9cc0-5da98d947b61 value in message f01fff7d-1fb6 -45a4-bda6-07e021511d3f.

/*
This application is a test of Watermill, a Go library for working efficiently with message streams.
Sending and recieving menssages from a channel.
*/

package main

import (
    "context"
    "fmt"
    "log"
    "os"
    "sync"
    "time"

    "github.com/ThreeDotsLabs/watermill"
    "github.com/ThreeDotsLabs/watermill/message"
    "github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)

var pubSub1 *gochannel.GoChannel
var safeSlice *SafeSlice

// Safe Slice struct just for control of the messages
type SafeSlice struct {
    mu    sync.Mutex
    slice []string
}

func (s *SafeSlice) Append(value string) {
    s.mu.Lock()
    defer s.mu.Unlock()
    s.slice = append(s.slice, value)
}

func (s *SafeSlice) Get(index int) (string, bool) {
    s.mu.Lock()
    defer s.mu.Unlock()
    if index < 0 || index >= len(s.slice) {
        return "Index out of scope", false
    }
    return s.slice[index], true
}

func (s *SafeSlice) Remove(index int) bool {
    s.mu.Lock()
    defer s.mu.Unlock()
    if index < 0 || index >= len(s.slice) {
        return false
    }
    s.slice = append(s.slice[:index], s.slice[index+1:]...)
    return true
}

// service1 function is a handler for the "service-1" service. It appends the message UUID to the
// safe slice and publishes the message to the "service-2-input" channel.
func service1(msg *message.Message) error {
    safeSlice.Append(msg.UUID)
    err := pubSub1.Publish("service-2-input", msg)
    if err != nil {
        panic(err)
    }

    return nil
}

// service2 function is a handler for the "service-2" service. It receives a message, performs
// some logic, and returns a slice of messages.
func service2(msg *message.Message) ([]*message.Message, error) {
    fmt.Printf("Message in service 2 %v\n", msg)

    // Add some logic

    return message.Messages{msg}, nil
}

// service_last function is a handler for the "service_last" service. It compares the message
// UUID with the first UUID in the safe slice and removes the first UUID if they match.
func service_last(msg *message.Message) error {
    uuid, _ := safeSlice.Get(0)

    fmt.Printf("service_last %v\n", msg)

    if msg.UUID == uuid {
        fmt.Println("OK")
        safeSlice.Remove(0)
    } else {
        fmt.Printf("Slice value %s value in message %s\n", uuid, msg.UUID)
        os.Exit(0)
    }

    return nil
}

func main() {

    logger := watermill.NewStdLogger(true, true)
    safeSlice = &SafeSlice{}

    pubSub1 = gochannel.NewGoChannel(gochannel.Config{}, logger)

    router, err := message.NewRouter(message.RouterConfig{}, logger)
    if err != nil {
        log.Fatalf("could not create router: %v", err)
    }

    // Create handlers for each service
    router.AddNoPublisherHandler("service-1", "service-1-input", pubSub1, service1)
    router.AddHandler("service-2", "service-2-input", pubSub1, "service_last-input", pubSub1, service2)
    router.AddNoPublisherHandler("service_last", "service_last-input", pubSub1, service_last)

    // Start the router
    go func() {
        if err := router.Run(context.Background()); err != nil {
            log.Fatalf("could not run router: %v", err)
        }
    }()

    time.Sleep(1 * time.Second)

    for {
        // Publish a message to start the pipeline
        msg := message.NewMessage(watermill.NewUUID(), []byte{})
        if err := pubSub1.Publish("service-1-input", msg); err != nil {
            log.Fatalf("could not publish message: %v", err)
        }

        //time.Sleep(1000 * time.Millisecond)
    }

    // Allow some time for the message to be processed
    select {}
}
go messaging goroutine
1个回答
0
投票

您使用 默认配置创建一个 GoChannel,尤其是

BlockPublishUntilSubscriberAck
=
false

    pubSub1 = gochannel.NewGoChannel(gochannel.Config{}, logger)

现在您正在快速连续发送几条消息。

    for {
        msg := message.NewMessage(watermill.NewUUID(), []byte{})
        if err := pubSub1.Publish("service-1-input", msg); err != nil {
            log.Fatalf("could not publish message: %v", err)
        }
    }

订阅启动多个运行

service1
的 goroutine,每个 goroutine 在重新发布消息之前将一条消息附加到全局变量
safeSlice

func service1(msg *message.Message) error {
    safeSlice.Append(msg.UUID)
    return pubSub1.Publish("service-2-input", msg)
}

绝对有可能一个 Goroutine 将第一条消息附加到

safeSlice
,然后另一个 Goroutine 将第二条消息附加到
safeSlice
,并在第一条消息重新发布之前重新发布它。由于
safeSlice.Append
pubSub1.Publish
都使用锁,因此您不必等待很长时间。

依赖事件基础系统中的顺序确实很困难,并且在 99.9% 的用例中完全没有必要。此外,在异步系统中使用可变全局变量确实很棘手。

© www.soinside.com 2019 - 2024. All rights reserved.