根据这个 回答,
消息总线是一个消息基础设施,允许不同的系统通过一组共享的接口(消息总线)进行通信。
以下是 createHub()
功能&。Run()
的方法,由 main()
来创建消息中心,以沟通一个发布者与多个订阅者。
type PubHub struct {
subscribers map[*subscriptionmediator.HandlerSubscription]struct{}
Register chan *subscriptionmediator.HandlerSubscription
Unregister chan *subscriptionmediator.HandlerSubscription
Broadcast chan *events.Env
}
func createHub() *PubHub {
return &PubHub{
subscribers: map[*subscriptionmediator.HandlerSubscription]struct{}{},
Register: make(chan *subscriptionmediator.HandlerSubscription),
Unregister: make(chan *subscriptionmediator.HandlerSubscription),
Broadcast: make(chan *events.Envelope),
}
}
func (h *PubHub) Run() {
for {
select {
case subscriber := <-h.Register:
h.subscribers[subscriber] = struct{}{}
case subscriber := <-h.Unregister:
if _, ok := h.subscribers[subscriber]; ok {
delete(h.subscribers, subscriber)
}
case message := <-h.Broadcast:
for subscriber := range h.subscribers {
subscriber.DataChannel <- message
}
}
}
}
其中每个订户注册,如下图所示。
subscription := &subscriptionmediator.HandlerSubscription{
conn,
make(chan *events.Envelope),
}
hub.Register <- subscription
DataChannel
用于发布者之间的通讯&多个订户之间的通讯。
type HandlerSubscription struct {
ConnInstance *websocket.Conn
DataChannel chan *events.Envelope
}
1) 上述代码是否可以认为是基于消息总线的pub-sub模式?
2) 如何避免一个用户阻挡其余所有用户在一个通道上的信号?subscriber.DataChannel <- message
以上代码是否可以认为是遵循基于消息总线的pub-sub模式?
可以。