我正在尝试使用 Golang 进行 PUB SUB ZMQ 通信。我试图从 PUB 向 SUB 发送大量消息(例如 10000 条),并检查接收 SUB 中所有消息所需的时间。但我从来没有收到 SUB 中的所有消息。
可能是什么原因以及如何解决这个问题?
发布代码 -
package main
import (
"fmt"
"log"
"os"
"strconv"
"time"
zmq "github.com/pebbe/zmq4"
)
const letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
func SampleStrOfSize(size int) string {
b := make([]byte, size)
idx := 0
for i := 0; i < size; i++ {
b[i] = letters[idx]
idx = (idx + 1) % len(letters)
}
// fmt.Println("message: ", string(b))
return string(b)
}
func main() {
publisher, err := zmq.NewSocket(zmq.PUB)
if err != nil {
os.Exit()
}
defer publisher.Close()
connectionStr := "tcp://127.0.0.1:5555"
err = publisher.Bind(connectionStr)
if err != nil {
log.Fatal(err)
}
fmt.Println("Bound to", connectionStr)
// Make some random message string
message := SampleStrOfSize(1024)
msgCount := 0
if len(os.Args) > 1 {
if msgCount, err = strconv.Atoi(os.Args[1]); err != nil {
return
}
} else {
fmt.Println("No msg count provided")
return
}
if msgCount <= 0 {
fmt.Printf("Invalid msg count (%v) provided", msgCount)
return
}
// Giving some time for SUB to connect
time.Sleep(time.Second * 5)
fmt.Println("Starting message sending")
start := time.Now()
for i := 0; i < msgCount; i++ {
// Send same message every time
_, err = publisher.Send(message, 0)
if err != nil {
fmt.Printf("Error occured while sending message. %v", err)
}
}
elapsed := time.Since(start)
fmt.Printf("Sending %d messages took %s\n", msgCount, elapsed)
// Do not close immediately
time.Sleep(time.Second * 30)
}
SUB代码-
package main
import (
"fmt"
"os"
"strconv"
"time"
zmq "github.com/pebbe/zmq4"
)
func main() {
subscriber, err := zmq.NewSocket(zmq.SUB)
if err != nil {
fmt.Println("Failed to open socket")
os.Exit(1)
}
defer subscriber.Close()
err = subscriber.Connect("tcp://127.0.0.1:5555")
if err != nil {
fmt.Println("Connect failed")
os.Exit(1)
}
msgCount := 0
count := 0
if len(os.Args) > 1 {
if msgCount, err = strconv.Atoi(os.Args[1]); err != nil {
return
}
} else {
fmt.Println("No msg count provided")
os.Exit(1)
}
fmt.Printf("Expecting %d messages\n", msgCount)
// Subscribe for all messages
err = subscriber.SetSubscribe("")
if err != nil {
fmt.Println("Failed to subscribe for all messages")
os.Exit(1)
}
var start time.Time
for {
_, err := subscriber.Recv(0)
if count == 0 {
start = time.Now()
}
if err != nil {
fmt.Println("Receive failed")
}
count++
if count == msgCount {
break
} else if 0 == count%1000 {
// Print time for every 1000 messages
elapsed := time.Since(start)
fmt.Printf("Received %d messages in %s\n", count, elapsed)
}
}
elapsed := time.Since(start)
fmt.Printf("Received %d messages in %s\n", msgCount, elapsed)
}
当我使用
msgCount
10000 运行这两个代码时,我在 SUB 中没有收到 10000 条消息。例如,有一次我在 SUB 中得到以下输出 -
Expecting 10000 messages
Received 1000 messages in 17.435321ms
Received 2000 messages in 25.530057ms
Received 3000 messages in 27.80558ms
Received 4000 messages in 1m40.583143061s
Received 5000 messages in 1m40.590513201s
Received 6000 messages in 1m40.597145666s
3000 条消息后可能会出现异常延迟?
ZMQ 已达到高水位线
高水位线是对最大数量的硬性限制 未完成的消息 ZeroMQ 正在内存中为任何单个对等点排队 指定的套接字正在与之通信。
如果达到此限制,套接字将进入异常状态 根据套接字类型,ZeroMQ 将采取适当的操作 例如阻止或删除已发送的消息。
https://zeromq.org/socket-api/#high-water-mark
默认值为 1000,您可以使用 pebble/zmq4 lib 设置它: https://pkg.go.dev/github.com/pebbe/zmq4#Socket.SetSndhwm
高水位线导致消息被丢弃
当PUB套接字由于达到高电平而进入静音状态时 订阅者的水印,然后是任何将发送到的消息 相关订户应被删除,直至静音 状态结束。
https://zeromq.org/socket-api/#pub-socket
您的订阅者将接收消息,但速度不如发布者生成消息的速度。这导致未偿还债务的数量增加。看起来收到大约 3000 条消息后 HWM 就被击中,然后触发静音,直到 1 分 40 秒。
我假设您没有收到全部 10,000 条消息?因为中间的 4000 个被出版商扔掉了。