我有一个简单的连接池,看起来工作正常,但只要其中任何一个连接断开,它就会阻塞所有调用该方法的调用方:
// RabbitMQConnectionPool holds a slice of AMQP connections that
// GetConnection hands out in round-robin order.
type RabbitMQConnectionPool struct {
// mu is held by GetConnection for the duration of each call.
mu sync.Mutex
// counter is the index of the slot most recently selected by GetConnection.
counter int
// connections holds one connection per slot; GetConnection re-dials a
// slot whose entry is nil or closed.
connections []*amqp.Connection
// connString is the argument GetConnection passes to amqp.Dial.
connString string
}
// GetConnection returns an open connection from the pool together with the
// index of the slot it came from.
//
// Slots are visited round-robin. A slot holding an open connection is
// returned as is; a nil or closed slot is re-dialed with p.connString. When
// the dial fails the error is logged and the next slot is tried, so the
// method only returns once it has a connection in hand.
//
// p.mu is held for the whole call, including dialing and retries, so
// concurrent callers wait on each other. Indexing an empty pool panics.
func (p *RabbitMQConnectionPool) GetConnection() (*amqp.Connection, int) {
	p.mu.Lock()
	defer p.mu.Unlock()

	for {
		// Step to the next slot, wrapping to the first one past the end.
		next := p.counter + 1
		if next >= len(p.connections) {
			next = 0
		}
		p.counter = next

		// Reuse the slot's connection while it is still open.
		if existing := p.connections[next]; existing != nil && !existing.IsClosed() {
			return existing, next
		}

		// The slot is empty or its connection is closed: dial a replacement.
		fresh, err := amqp.Dial(p.connString)
		if err != nil {
			vibelog.Stdout.Error("67eea0ee-ae1a-436b-8490-c6f232e6e033", err)
			continue
		}
		p.connections[next] = fresh
		return fresh, next
	}
}
有没有办法改写它,使它只对单个连接分别加锁阻塞,而不是阻塞所有调用方?
为了减少阻塞,请为池中的每个元素使用各自独立的互斥锁:
// RabbitMQConnectionPool holds a slice of connection slots that
// GetConnection hands out in round-robin order. Each slot carries its own
// mutex (see muconn).
type RabbitMQConnectionPool struct {
// mu is locked by GetConnection while it advances counter.
mu sync.Mutex
// counter is the index of the slot most recently selected by GetConnection.
counter int
// connections holds the slots; each element pairs a connection with the
// mutex that guards it.
connections []muconn
// connString is the argument GetConnection passes to amqp.Dial.
connString string
}
// muconn is one pool slot: a connection together with a mutex of its own.
type muconn struct {
// mu is locked by GetConnection while it checks or replaces connection.
mu sync.Mutex
// connection is the slot's current connection; it may be nil, in which
// case GetConnection dials a new one.
connection *amqp.Connection
}
// GetConnection returns an open connection from the pool together with the
// index of the slot it came from.
//
// Slots are visited round-robin. p.mu is held only while the counter is
// advanced; the connection itself is checked and, if necessary, re-dialed
// under the slot's own mutex, so a slow dial blocks only callers that land
// on the same slot. When a dial fails the error is logged and the next slot
// is tried, so the method only returns once it has a connection in hand.
//
// The modulo on an empty pool panics (integer divide by zero).
func (p *RabbitMQConnectionPool) GetConnection() (*amqp.Connection, int) {
	for {
		p.mu.Lock()
		p.counter = (p.counter + 1) % len(p.connections)
		counter := p.counter
		p.mu.Unlock()

		if connection, ok := p.connectionAt(counter); ok {
			return connection, counter
		}
	}
}

// connectionAt returns the open connection held in slot i, dialing a
// replacement when the slot is empty or its connection is closed. It reports
// false when the dial fails.
func (p *RabbitMQConnectionPool) connectionAt(i int) (*amqp.Connection, bool) {
	c := &p.connections[i]
	c.mu.Lock()
	// The defer lives in this helper rather than in the caller's retry loop
	// so the slot is unlocked after every attempt. A defer inside the loop
	// would keep each failed slot locked until GetConnection returned and
	// would deadlock once the counter wrapped around to a locked slot.
	defer c.mu.Unlock()

	if c.connection != nil && !c.connection.IsClosed() {
		return c.connection, true
	}

	// Otherwise, create a new connection for this slot.
	connection, err := amqp.Dial(p.connString)
	if err != nil {
		vibelog.Stdout.Error("67eea0ee-ae1a-436b-8490-c6f232e6e033", err)
		return nil, false
	}
	c.connection = connection
	return connection, true
}