Comments 16
Метод Publish в цикле отправляет сообщения в каналы подписчиков. Если подписчик по каким либо причинам не вычитывает сообщение из канала, то цикл блокируется??
for _, ch := range chnls {
// может быть здесь хотя бы select залепить от блокировки
ch <- message
}
Вы правы. Всё есть тут: https://github.com/istovpets/messaging/blob/master/messaging.go#L237 А в данной статье - нарочито упрощенный учебный пример.
Подписчик не может не вычитывать - это часть Notifiyer. Но вот если хэндлер подписчика будет долго тупить - заблокируется. В конце статьи есть об этом. В боевом коде конечно так лучше не делать. И в библиотеке я предусмотрел несколько стратегий на этот счет
Главная проблема такой реализации, мне кажется, в том, что при любом перезапуске сервиса потеряются все недополученные сообщения. Фикс этой проблемы приведет к изобретению своей версии редиски, кролика и им подобным :)
Это не проблема, ИМХО. Это же внутрисервисные сообщения - между компонентами одного приложения. Завершилась работа сервиса, завершились и внутренние сообщения и компоненты уничтожились. Это не замена Кролику )
так он же может завершиться нештатно? сервис перезапустится, publisher останется доволен, что всё отправил (до перезапуска), а ни один subscriber ничего не получит и потеряется какая-нибудь важная запись в бд например)
Ну если нештатно, тогда всё что угодно можно предполагать ). Тут и редиски с кроликами не помогут ))). Безусловно, в описанной тобой ситуации нужно не просто пулять сообщения, а что-то придумывать. Я когда писал либу и статью, ориентировался на System.Messaging в Delphi https://docwiki.embarcadero.com/CodeExamples/Athens/en/System.Messaging_(Delphi). По опыту знаю - очень удобная штука для сложных проектов. Кардинально уменьшает связанность и хаос в коде.
Жаль только не потокобезопасная из коробки.
Я сделал простейшие обертки для System.Messaging для работы в потоках. И если слать record-ы а не объекты - вообще проблем нет.
TEvent = (eMenuUpdated, eResetSession, eReadDiscCard, eSettingsChanged,
eStartScreenSaver, eStopScreenSaver);
TCommonMessage = record
Event: TEvent;
Value: Variant;
constructor Create(AEvent: TEvent; AValue: Variant);
end;
TCommonMessageListenerMethod = procedure(M: TCommonMessage) of object;
procedure SendCommonMessage(AEvent: TEvent; AValue: Variant); overload;
procedure SendCommonMessage(AEvent: TEvent); overload;
procedure QueueCommonMessageUIThread(AEvent: TEvent; AValue: Variant);
procedure SendMessage(AMessage: TMessage);
procedure RunUiThread(AThreadProc: TThreadProcedure);
procedure QueueUiThread(AThreadProc: TThreadProcedure);
procedure SendMessageUIThread(AMessage: TMessage);
procedure QueueMessageUIThread(AMessage: TMessage);
procedure SubscribeToCommonMessage(AListenerMethod
: TCommonMessageListenerMethod);
implementation
procedure SendCommonMessage(AEvent: TEvent; AValue: Variant); overload;
begin
SendMessage(TMessage<TCommonMessage>.Create(TCommonMessage.Create(AEvent,
AValue)));
end;
procedure SendCommonMessage(AEvent: TEvent); overload;
begin
SendCommonMessage(AEvent, 0);
end;
procedure QueueCommonMessageUIThread(AEvent: TEvent; AValue: Variant);
begin
QueueUiThread(
procedure
begin
SendMessage(TMessage<TCommonMessage>.Create(TCommonMessage.Create(AEvent,
AValue)));
end);
end;
procedure SendMessage(AMessage: TMessage);
begin
TMessageManager.DefaultManager.SendMessage(nil, AMessage, True)
end;
procedure RunUiThread(AThreadProc: TThreadProcedure);
begin
TThread.Synchronize(nil, AThreadProc);
end;
procedure QueueUiThread(AThreadProc: TThreadProcedure);
begin
TThread.Queue(nil, AThreadProc);
end;
procedure SendMessageUIThread(AMessage: TMessage);
begin
RunUiThread(
procedure
begin
SendMessage(AMessage)
end);
end;
procedure QueueMessageUIThread(AMessage: TMessage);
begin
QueueUiThread(
procedure
begin
SendMessage(AMessage)
end);
end;
А ещё то, что поздноподписавшиеся могут пропустить что-то интересное, что случилось до их прихода
В итоге, такая реализация накладывает определенные ограничения на порядок инициализации, вводя неочевидные зависимости.
В системе с большим количеством компонент это может больно стукнуть в какой-то момент
Предполагается, что все подписки инициализируются при старте приложения. Как любые другие зависимости. Вы же не говорите, что, допустим, накладываются ограничения на HTTP сервер из-за того, что БД не инициализирована до его старта? Это не полноценный брокер сообщений для микросервисов и хранить сообщения он не должен, ИМХО. Впрочем, если нужен именно такой сценарий - нужно сочинять что-то другое, не спорю.
Это понятно, да. Но возникают сложности, если, к примеру, подписка меняется динамически.
Делать очередь - тоже такое себе. Или она должна быть конечной, и тогда в какой-то момент она всё равно должна начать терять. Или бесконечной, но тогда она может выжрать всю память. Или блокировать отправителя, но есть риск навсегда его заблокировать.
Есть интересный паттерн, я применял его несколько раз в разных проектах.
Представим себе, что есть некоторое состояние, и смысл потока событий уведомить заинтересованных получателей в изменении этого состояния.
Если допустить, что нам достаточно, чтобы состояние с точки зрения отправителя и с точки зрения получателей достаточно, чтобы было eventually consistent, то на стороне отправителя можно хранить его представление о состоянии получателей и присылать только обновления.
Сложность тут растет пропорционально числу подписчиков и нет очереди, которая может переполниться.
Как частный случай, отправитель может просто уведомлять заинтересованные лица, что состояние изменилось, а они могут перечитывать его целиком. Очередь сообщений тут тоже не нужна, нет разницы, сколько таких уведомлений получит получатель, лишь бы последнее получил.
Ну вообще, я использовал раньше в проектах на Delphi подобный Notifiyer для сложных проектов, где один компонент должен уведомлять о чем-то в процессе работы другой компонент. Раньше для этого в Delphi использовали TNotifyEvent и подписывались, передавая хэндлер (коллбэк) - OnEvent. И всё это делало код очень связанным и запутанным. Так написан весь VCL в Delphi. Начиная с Delphi XE5 в нем появилась либа System.Messaging подобная тому, что я описал в статье. Использовать её после TNotifyEvent - просто кайф )). Собственно, глядя на неё и статью писал. И эти библиотеки не подразумевают хранение и передачу истории сообщений, динамическое подключение подписчиков. Это уже, на мой взгляд - другое. Это уже - брокер сообщений полноценный. Это гораздо больше. Как-то так )
TNotifyEvent - это часть системы событий - база ООП. Система сообщений полноценная для VCL - это использование Dispatch. И.е. события, которые работают через message модификатор (такие используется для работы хендлов и контролов winapi). Собственные сообщения тоже можно создавать. Но работа, конечно, намного менее удобная, чем System.Messaging. Однако, потокобезопасная.
С чего она потокобезопасная вдруг стала? Там просто всё в основном потоке, это ж VCL )). Потокобезопасность обеспечивает Винда. Цикл обработки сообщений крутится в главном потоке. Нет там нигде никакой синхронизации в коде Delphi. И я не про windowsMessage вообще, не про оконные сообщения и их обработку. Я про коллбеки обычные говорил. Типа - объект имеет поле “FOnCreate: TNotifyEvent” и другой объект в это поле кладет свой коллбэк: Obj.OnCreate := … И вот тут возникает связанность, от которой спасает паттерн Pub/Sub как раз.
Паттерны конкурентности в Go. Подробный разбор. Часть 3. Pub/Sub