Брокер сообщений RabbitMQ уже давно активно используется в микросервисах. Он используется, когда требуется асинхронная обработка сообщений от клиентов или при аналогичном межсервисном взаимодействии.
Практически нет языка, под который не была бы сделана соответствующая клиентская библиотека. Для Go такой библиотекой является github.com/streadway/amqp (далее по тексту библиотека amqp). Она имеет широкий функционал, можно подключаться к RabbitMQ, создавать каналы, настраивать очереди и exchange. Не хватает только самой малости – реконнектов. А именно автоматических реконнектов при потери связи.
Поиск в Google показывает, что есть много различных решений на базе библиотеки amqp. На проекте, где я работаю, мы создали ещё парочку. Но ни найденные в сети, ни уже созданные не устраивали по ряду причин:
раздельное обслуживание консьюмера и продюсера – под каждого свой коннект, а документация на RabbitMQ настойчиво не рекомендует плодить подключения и вместо этого использовать каналы (каналы в amqp это легковесные соединения поверх TCP-подключения) поверх одного подключения;
сложные конструкции пула каналов, а то и вовсе их отсутствие – с точки зрения потокобезопасности, как минимум для консьюмера и продюсера, нужно разделять каналы;
отсутствие поддержки backoffPolicy;
отсутствие graceful shutdown.
Сформулируем требования к желаемому решению:
возможность создать общее подключение для консьюмера и продюсера;
простой и прозрачный пул каналов;
поддержка backoffPolicy;
автоматический реконнект при потере соединения;
поддержка graceful shutdown.
Требования появились, можно приступать к реализации. Сразу оговорюсь, статья ориентирована на тех, кто уже хорошо знаком с библиотекой amqp и не ставит своей целью перевести документацию.
"Новый велосипед с треугольными колёсами"
Первый пункт из озвученных потребностей самый простой, просто создаём одно подключение и используем его для всех последующих манипуляций.
С пулом каналов тоже было решено пойти по простому пути и создать map
с ключом в виде следующего объекта:
type ChannelPoolItemKey struct {
Queue string
Consumer string
Exchange string
Key string
}
Такой ключ можно использовать сразу и для консьюмеров и паблишеров. Как было сказано выше, каналы между ними не должны пересекаться для повышения потокобезопасности.
Реализовать backoffPolicy тоже не сложно:
for _, timeout := range c.backoffPolicy {
if connErr := c.connect(ctx); connErr != nil {
logger.Err(connErr).Msg("connection failed, trying to reconnect to rabbitMQ")
time.Sleep(timeout)
continue
}
break
}
где backoffPolicy это массив типа time.Duration
.
Остаётся самое интересное, реконнект и graceful shutdown. Здесь нам поможет пакет golang.org/x/sync/errgroup. Он специально предназначен для управления группами рутин.
При подключении создаётся TCP-подключение и служебный канал. Последний нужен для создания exchange, очередей и биндинга очереди к exchange. Больше он ни для чего не используется, но такая логика упрощает построение пула каналов. Например, при создании exchange не известен ключ роутинга, а при декларировании очереди не известно, с каким exchange она будет связана.
Публичный метод Connect
будет по совместительству контролировать подключение. А приватный метод connect
будет создавать само подключение и пул каналов. Ниже приведён код подключения.
func (c *Connection) connect(_ context.Context) error {
var err error
if c.conn, err = amqp.Dial(c.dsn); err != nil {
return errors.Wrap(err, "connect to rabbitMQ")
}
if c.serviceChannel, err = c.conn.Channel(); err != nil {
return errors.Wrap(err, "create service rabbitMQ channel")
}
c.channelPool = make(map[ChannelPoolItemKey]*amqp.Channel)
return nil
}
// Connect auto reconnect to rabbitmq when we lost connection.
func (c *Connection) Connect(ctx context.Context, errorGroup *errgroup.Group) error {
if !c.isClosed {
if err := c.connect(ctx); err != nil {
return errors.Wrap(err, "connect")
}
}
c.errorGroup = errorGroup
c.chanCtx = ctx
c.errorGroup.Go(func() error {
logger := zerolog.Ctx(ctx)
logger.Info().Msg("starting connection watcher")
for {
select {
case <-ctx.Done():
logger.Info().Msg("connection watcher stopped")
return ctx.Err()
default:
reason, ok := <-c.conn.NotifyClose(make(chan *amqp.Error))
if !ok {
if c.isClosed {
return nil
}
logger.Err(reason).Msg("rabbitMQ connection unexpected closed")
c.mu.Lock()
for _, timeout := range c.backoffPolicy {
if connErr := c.connect(ctx); connErr != nil {
logger.Err(connErr).Msg("connection failed, trying to reconnect to rabbitMQ")
time.Sleep(timeout)
continue
}
break
}
c.mu.Unlock()
}
}
}
})
return nil
}
Как видно, основная идея была связана с мьютексом mu
, который будет блокировать возможность получить доступ к оригинальному коннекту (из библиотеки amqp). Т.е. если происходит какая-то ошибка, консьюмер и продюсер должны попытаться переподключиться, они наткнуться на блокировку и будут ждать восстановления подключения. Как только блокировка будет снята, они смогут заново полностью инициализироваться.
Не забываем, что на стороне сервера может закрыться не только подключение, но и каналы. Для этого по аналогии с подключением используется метод NotifyClose, который регистрирует слушателя для событий о закрытии канала или подключения. Если канал закрывается, то он удаляется из пула и соотвественно ошибка, которая долетит до продюсера/консьюмера вызовет повторное создание канала.
func (c *Connection) GetChannelFromPool(exchange, key, queue, consumer string) (*amqp.Channel, error) {
c.channelPoolMu.Lock()
defer c.channelPoolMu.Unlock()
var err error
poolKey := ChannelPoolItemKey{
Exchange: exchange,
Key: key,
Queue: queue,
Consumer: consumer,
}
ch, ok := c.channelPool[poolKey]
if !ok {
ch, err = c.conn.Channel()
if err != nil {
return nil, errors.Wrap(err, "create channel")
}
c.channelPool[poolKey] = ch
c.chanWatcher(poolKey)
}
return ch, nil
}
func (c *Connection) chanWatcher(poolKey ChannelPoolItemKey) {
ch := c.channelPool[poolKey]
c.errorGroup.Go(func() error {
logger := zerolog.Ctx(c.chanCtx)
logger.Info().Msg("starting channel watcher")
for {
select {
case <-c.chanCtx.Done():
logger.Info().Msg("channel watcher stopped")
return c.chanCtx.Err()
default:
reason, ok := <-ch.NotifyClose(make(chan *amqp.Error))
if !ok {
if c.isClosed {
return nil
}
logger.Err(reason).Msg("rabbitMQ channel unexpected closed")
c.channelPoolMu.Lock()
delete(c.channelPool, poolKey)
c.channelPoolMu.Unlock()
return nil
}
}
}
})
}
После создания подключения переходим к его закрытию и отображению состояния:
func (c *Connection) Close(_ context.Context) error {
c.mu.Lock()
defer c.mu.Unlock()
c.isClosed = true
for _, ch := range c.channelPool {
if err := ch.Close(); err != nil {
return errors.Wrap(err, "close rabbitMQ channel")
}
}
if err := c.conn.Close(); err != nil {
return errors.Wrap(err, "close rabbitMQ connection")
}
return nil
}
func (c *Connection) IsClosed() bool {
return c.isClosed
}
Сам Connection
, который реализует всё вышеописанное, представлен ниже.
type Connection struct {
dsn string
backoffPolicy []time.Duration
conn *amqp.Connection
serviceChannel *amqp.Channel
mu sync.RWMutex
channelPool map[ChannelPoolItemKey]*amqp.Channel
channelPoolMu sync.RWMutex
isClosed bool
errorGroup *errgroup.Group
chanCtx context.Context
}
Конечно не хорошо передавать контекст в структуру. Но это было сделано сознательно, чтобы обёртки над стандартными методами библиотеки amqp были взаимозаменяемы с ними.
Ниже код обёрток над стандартными методами библиотеки amqp:
func (c *Connection) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args amqp.Table) error {
c.mu.RLock()
defer c.mu.RUnlock()
return c.serviceChannel.ExchangeDeclare(name, kind, durable, autoDelete, internal, noWait, args)
}
func (c *Connection) QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error) {
c.mu.RLock()
defer c.mu.RUnlock()
return c.serviceChannel.QueueDeclare(name, durable, autoDelete, exclusive, noWait, args)
}
func (c *Connection) QueueBind(name, key, exchange string, noWait bool, args amqp.Table) error {
c.mu.RLock()
defer c.mu.RUnlock()
return c.serviceChannel.QueueBind(name, key, exchange, noWait, args)
}
func (c *Connection) Consume(
queue, consumer string,
autoAck, exclusive, noLocal, noWait bool,
args amqp.Table) (<-chan amqp.Delivery, error) {
c.mu.RLock()
defer c.mu.RUnlock()
ch, err := c.GetChannelFromPool("", "", queue, consumer)
if err != nil {
return nil, errors.Wrap(err, "get channel from pool")
}
return ch.Consume(queue, consumer, autoAck, exclusive, noLocal, noWait, args)
}
// nolint:gocritic // pass msg without pointer as in original func in amqp
func (c *Connection) Publish(exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error {
c.mu.RLock()
defer c.mu.RUnlock()
ch, err := c.GetChannelFromPool(exchange, key, "", "")
if err != nil {
return errors.Wrap(err, "get channel from pool")
}
return ch.Publish(exchange, key, mandatory, immediate, msg)
}
Consumer
В конструктор консьюмера передаётся созданное подключение, а далее запускается подписка на события из очереди. Подписка запускается в отдельной рутине, если происходит ошибка, текущая рутина закрывается и создаётся новая.
func (c *Consumer) subscribe(ctx context.Context, errorGroup *errgroup.Group, subscriber Subscriber) error {
logger := zerolog.Ctx(ctx)
var msg <-chan amqp.Delivery
var err error
for {
if msg, err = c.connect(ctx); err != nil {
logger.Err(err).Msg("connect consumer to rabbitMQ")
time.Sleep(10 * time.Second)
continue
}
break
}
logger.Info().Msg("consumer connected")
for {
select {
case <-ctx.Done():
logger.Info().Msg("connection watcher stopped")
if err := subscriber.Shutdown(ctx); err != nil {
logger.Err(err).Msg("shutdown handler")
}
return ctx.Err()
case d, ok := <-msg:
if ok {
logger.Debug().Msgf("got new event %+v", string(d.Body))
if errConsume := subscriber.Consume(ctx, d.Body); errConsume != nil {
logger.Err(errConsume).Msg("consume message")
}
if err := d.Ack(true); err != nil {
logger.Err(err).Msg("ack")
}
} else {
if c.conn.IsClosed() {
return nil
}
logger.Info().Msg("try to reconnect consumer")
errorGroup.Go(func() error {
return c.subscribe(ctx, errorGroup, subscriber)
})
return nil
}
}
}
}
// Subscribe to channel for receiving message
func (c *Consumer) Subscribe(ctx context.Context, errorGroup *errgroup.Group, subscriber Subscriber) error {
errorGroup.Go(func() error {
return c.subscribe(ctx, errorGroup, subscriber)
})
return nil
}
Обработка полученного сообщения выполняется в методе Consume
переданного в консьюмер подписчика, реализующего интерфейс Subscriber
.
type Subscriber interface {
Consume(ctx context.Context, data []byte) error
Shutdown(ctx context.Context) error
}
В этом интерфейсе также есть метод Shutdown
для действий при штатной остановки консьюмера.
В приватном методе connect
выполняется создание exchange, очереди, биндиг очереди к exchange и создание канала на прослушивание событий.
func (c *Consumer) connect(_ context.Context) (<-chan amqp.Delivery, error) {
if err := c.conn.ExchangeDeclare(c.config.ExchangeName, "direct", true,
false, false,
false, nil); err != nil {
return nil, errors.Wrap(err, "declare a exchange")
}
if _, err := c.conn.QueueDeclare(
c.config.RabbitQueue, // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
); err != nil {
return nil, errors.Wrap(err, "declare a queue")
}
if err := c.conn.QueueBind(
c.config.RabbitQueue, // queue name
c.config.RoutingKey, // routing key
c.config.ExchangeName, // exchange
false,
nil,
); err != nil {
return nil, errors.Wrap(err, "bind to queue")
}
msg, err := c.conn.Consume(
c.config.RabbitQueue, // queue
c.config.RabbitConsume, // consume
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
return nil, errors.Wrap(err, "consume message")
}
return msg, nil
}
Publisher
Также как и при создании консьюмера в конструктор паблишера передаётся созданное подключение. При первой попытке опубликовать создаётся exchange для публикаций. Если при публикации возникает ошибка, то пытаемся ещё раз. Если вторая попытка не удалась, то тогда возвращаем ошибку вызвавшему методу.
func (p *Publisher) connect(_ context.Context) error {
p.muConn.Lock()
defer p.muConn.Unlock()
if p.isConnected {
return nil
}
if err := p.conn.ExchangeDeclare(p.config.ExchangeName, "direct", true,
false, false,
false, nil); err != nil {
return errors.Wrap(err, "declare a exchange")
}
p.isConnected = true
return nil
}
// SendMessage publish message to exchange
func (p *Publisher) SendMessage(ctx context.Context, message interface{}) error {
logger := zerolog.Ctx(ctx)
body, err := json.Marshal(message)
if err != nil {
return errors.Wrap(err, "marshal message")
}
ampqMsg := buildMessage(body)
logger.Debug().Msgf("send message: %s", string(body))
if !p.isConnected {
if err := p.connect(ctx); err != nil {
logger.Err(err).Msg("connect publisher to rabbitMQ")
}
}
// We try to send message twice. Between attempts we try to reconnect.
if err := p.sendMessage(ctx, ampqMsg); err != nil {
if errRetryPub := p.sendMessage(ctx, ampqMsg); err != nil {
if errBadMsg := p.badMessages(ctx); errBadMsg != nil {
return errors.Wrap(errBadMsg, "count bad messages")
}
return errors.Wrap(errRetryPub, "retry publish a message")
}
}
if err := p.okMessages(ctx); err != nil {
return errors.Wrap(err, "count ok messages")
}
return nil
}
func (p *Publisher) sendMessage(ctx context.Context, ampqMsg *amqp.Publishing) error {
logger := zerolog.Ctx(ctx)
if !p.isConnected {
if err := p.connect(ctx); err != nil {
logger.Err(err).Msg("connect publisher to rabbitMQ")
}
}
if err := p.conn.Publish(
p.config.ExchangeName,
p.config.RoutingKey,
false,
false,
*ampqMsg,
); err != nil {
p.muConn.Lock()
p.isConnected = false
p.muConn.Unlock()
return errors.Wrap(err, "publish a message")
}
return nil
}
Методы badMessages
и okMessages
используются для подсчёта статистки успеха отправки сообщений. buildMessage
небольшой хелпер для подготовки сообщения для отправки.
Заключение
Написанный код ещё плохо покрыт тестами. Планируется использовать тесты с использованием докера, чтобы проверять функционал на реальном RabbitMQ. Но есть тестовый микросервис, который использует данный функционал. При его запуске можно отправить в очередь консьюмера событие, которое будет обработано сервисом и приведёт к отправке сообщения паблишером. При перезапуске RabbitMQ микросервис автоматически переподключается. Остановка тестового микросервиса также выполняется штатно.