Pull to refresh

RabbitMQ tutorial 5 — Тематики

Reading time4 min
Views43K
Продолжаю серию перевода уроков с официального сайта. Примеры будут на php, но их можно реализовать на большинстве популярных ЯП.

В предыдущей статье мы усовершенствовали системы логирования. Вместо точки доступа fanout (которая подходит только для элементарной трансляции сообщений), мы использовали direct — и получили возможность получать сообщения через определенные выборки.

Хоть direct и усовершенствовал нашу систему, он всё-таки имеет недостаток — он не может составить маршрут(routing) по нескольким критериям.

Например, нам понадобилось разделять логи не только по его типу важности, но и по источнику лога. Вы наверное встречали такую концепцию в unix инструменте syslog, которые различает логи по его типу важности(info/warn/crit...) и по его объекту (auth/cron/kern...).

Мы получаем гибкость в запросе. Например, мы может получить все логи с типом error, пришедшие из 'cron'-а, и все логи пришедшие с 'kern'. Для того, чтобы реализовать это в нашей системе логирования, изучим точку доступа — topic.

Topic


Сообщения, отправляемые в точку доступа topic, не могут отсылаться с произвольным ключом routing_key — это должен быть список слов, разделенный точкой. Слова могут быть любыми, но обычно они ассоциируются с какими-либо свойствами сообщения. Вот примеры правильного routing_key: «stock.usd.nyse», «nyse.vmw», «quick.orange.rabbit». Длина ключа не должна превышать 255 байт.

Binding key составляется по такому же правилу. Логика работы topic такая же как и у direct — сообщения доходят до тех очередей, binding key которых совпадает с routing key сообщения. Но есть 2 специальные возможности для topic:
  • * (star) может быть заменено на ровно 1 слово;
  • # (hash) может быть заменено на 0 или более слов.

Понятнее будет показать на рисунке:



В этом примере мы отправляем письма всем заданным животным. Сообщения содержат routing key состоящий из 3-х слов(с двумя точками). Первое слово характеризует скорость, второе цвет и третье вид: «speed.colour.species».

Очередь Q1 будет связана с ключом "*.orange.*", а очередь Q2 будет связана ключами "*.*.rabbit" и «lazy.#».

Связи могут быть обобщены как:
  1. Очередь Q1 интересуется всеми оранжевыми животными;
  2. Очередь Q2 хочет все знать о кроликах и о ленивых животных.


Сообщение с ключом «quick.orange.rabbit» дойдет до обоих очередей. Сообщение с ключом «lazy.orange.elephant» также придет в обе очереди. Сообщение с типом «quick.orange.fox» дойдет только до 1 очереди, а с типом «lazy.brown.fox» только до второй. Сообщение с типом «lazy.pink.rabbit» дойдет до второй очереди только 1 раз, хоть и соответствует двум связям. Сообщение с типом «quick.brown.fox» не дойдет ни до одной очереди.

Что будет, если послать сообщение с ключом, несоответствующее ни одному правилу: c 1 или 4 словами (например, «quick.orange.male.rabbit»). Такое сообщение никуда не придет и удалиться.

А вот сообщение с ключом «lazy.orange.male.rabbit, хоть и состоит из 4 слов, соответствует последней связи и попадет во вторую очередь.

Topic очень мощная точка доступа. Она может работать как другие точки доступа. Если в binding key поместить „#“, то она поведет себя как fanout. А если не использовать символы „*“ и „#“, то будет себя вести как direct.

Итого получаем


Мы будем использовать точку доступа topic для нашей системы логирования. Сделаем допущение, что routing key лога содержит 2 слова: 'facility.severity'. Код почти такой же как и в прошлой статье.

Скрипта emit_log_topic.php (продюсер):
<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->exchange_declare('topic_logs', 'topic', false, false, false);

$routing_key = $argv[1];
if(empty($routing_key)) $routing_key = "anonymous.info";
$data = implode(' ', array_slice($argv, 2));
if(empty($data)) $data = "Hello World!";

$msg = new AMQPMessage($data);

$channel->basic_publish($msg, 'topic_logs', $routing_key);

echo " [x] Sent ",$routing_key,':',$data," \n";

$channel->close();
$connection->close();

?>

Скприпт receive_logs_topic.php (подписчик):

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPConnection;

$connection = new AMQPConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->exchange_declare('topic_logs', 'topic', false, false, false);

list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

$binding_keys = array_slice($argv, 1);
if( empty($binding_keys )) {
    file_put_contents('php://stderr', "Usage: $argv[0] [binding_key]\n");
    exit(1);
}

foreach($binding_keys as $binding_key) {
    $channel->queue_bind($queue_name, 'topic_logs', $binding_key);
}

echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";

$callback = function($msg){
  echo ' [x] ',$msg->delivery_info['routing_key'], ':', $msg->body, "\n";
};

$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

while(count($channel->callbacks)) {

    $channel->wait();
}

$channel->close();
$connection->close();

?>

Чтобы получить все:

$ php receive_logs_topic.php "#"

Чтобы получить все логи объекта kern:

$ phpreceive_logs_topic.php "kern.*"

Если хотите прослушать только логи с типом critical:

$ php receive_logs_topic.php "*.critical"

Создаем множественные связи:

$ php receive_logs_topic.php "kern.*" "*.critical"

Чтобы получить сообщения с ключом „kern.critical“:

$ php emit_log_topic.php "kern.critical" "A critical kernel error"

Заметьте, что код не создает никаких правил для routing key и binding key. Можно попробовать ввести в ключ более двух слов.

Полный код скриптов emit_log_topic.php и receive_logs_topic.php.
Tags:
Hubs:
Total votes 1: ↑1 and ↓0+1
Comments4

Articles