Pull to refresh

Comments 16

Метод Publish в цикле отправляет сообщения в каналы подписчиков. Если подписчик по каким либо причинам не вычитывает сообщение из канала, то цикл блокируется??

for _, ch := range chnls {

// может быть здесь хотя бы select залепить от блокировки

ch <- message

}

Подписчик не может не вычитывать - это часть Notifiyer. Но вот если хэндлер подписчика будет долго тупить - заблокируется. В конце статьи есть об этом. В боевом коде конечно так лучше не делать. И в библиотеке я предусмотрел несколько стратегий на этот счет

Главная проблема такой реализации, мне кажется, в том, что при любом перезапуске сервиса потеряются все недополученные сообщения. Фикс этой проблемы приведет к изобретению своей версии редиски, кролика и им подобным :)

Это не проблема, ИМХО. Это же внутрисервисные сообщения - между компонентами одного приложения. Завершилась работа сервиса, завершились и внутренние сообщения и компоненты уничтожились. Это не замена Кролику )

так он же может завершиться нештатно? сервис перезапустится, publisher останется доволен, что всё отправил (до перезапуска), а ни один subscriber ничего не получит и потеряется какая-нибудь важная запись в бд например)

Ну если нештатно, тогда всё что угодно можно предполагать ). Тут и редиски с кроликами не помогут ))). Безусловно, в описанной тобой ситуации нужно не просто пулять сообщения, а что-то придумывать. Я когда писал либу и статью, ориентировался на System.Messaging в Delphi https://docwiki.embarcadero.com/CodeExamples/Athens/en/System.Messaging_(Delphi). По опыту знаю - очень удобная штука для сложных проектов. Кардинально уменьшает связанность и хаос в коде.

Жаль только не потокобезопасная из коробки.

Я сделал простейшие обертки для System.Messaging для работы в потоках. И если слать record-ы а не объекты - вообще проблем нет.

  TEvent = (eMenuUpdated, eResetSession, eReadDiscCard, eSettingsChanged,
    eStartScreenSaver, eStopScreenSaver);

  TCommonMessage = record
    Event: TEvent;
    Value: Variant;
    constructor Create(AEvent: TEvent; AValue: Variant);
  end;

  TCommonMessageListenerMethod = procedure(M: TCommonMessage) of object;

procedure SendCommonMessage(AEvent: TEvent; AValue: Variant); overload;
procedure SendCommonMessage(AEvent: TEvent); overload;
procedure QueueCommonMessageUIThread(AEvent: TEvent; AValue: Variant);
procedure SendMessage(AMessage: TMessage);
procedure RunUiThread(AThreadProc: TThreadProcedure);
procedure QueueUiThread(AThreadProc: TThreadProcedure);
procedure SendMessageUIThread(AMessage: TMessage);
procedure QueueMessageUIThread(AMessage: TMessage);
procedure SubscribeToCommonMessage(AListenerMethod
  : TCommonMessageListenerMethod);

implementation

procedure SendCommonMessage(AEvent: TEvent; AValue: Variant); overload;
begin
  SendMessage(TMessage<TCommonMessage>.Create(TCommonMessage.Create(AEvent,
    AValue)));
end;

procedure SendCommonMessage(AEvent: TEvent); overload;
begin
  SendCommonMessage(AEvent, 0);
end;

procedure QueueCommonMessageUIThread(AEvent: TEvent; AValue: Variant);
begin
  QueueUiThread(
    procedure
    begin
      SendMessage(TMessage<TCommonMessage>.Create(TCommonMessage.Create(AEvent,
        AValue)));
    end);
end;

procedure SendMessage(AMessage: TMessage);
begin
  TMessageManager.DefaultManager.SendMessage(nil, AMessage, True)
end;

procedure RunUiThread(AThreadProc: TThreadProcedure);
begin
  TThread.Synchronize(nil, AThreadProc);
end;

procedure QueueUiThread(AThreadProc: TThreadProcedure);
begin
  TThread.Queue(nil, AThreadProc);
end;

procedure SendMessageUIThread(AMessage: TMessage);
begin
  RunUiThread(
    procedure
    begin
      SendMessage(AMessage)
    end);
end;

procedure QueueMessageUIThread(AMessage: TMessage);
begin
  QueueUiThread(
    procedure
    begin
      SendMessage(AMessage)
    end);
end;

А ещё то, что поздноподписавшиеся могут пропустить что-то интересное, что случилось до их прихода

В итоге, такая реализация накладывает определенные ограничения на порядок инициализации, вводя неочевидные зависимости.

В системе с большим количеством компонент это может больно стукнуть в какой-то момент

Предполагается, что все подписки инициализируются при старте приложения. Как любые другие зависимости. Вы же не говорите, что, допустим, накладываются ограничения на HTTP сервер из-за того, что БД не инициализирована до его старта? Это не полноценный брокер сообщений для микросервисов и хранить сообщения он не должен, ИМХО. Впрочем, если нужен именно такой сценарий - нужно сочинять что-то другое, не спорю.

Это понятно, да. Но возникают сложности, если, к примеру, подписка меняется динамически.

Делать очередь - тоже такое себе. Или она должна быть конечной, и тогда в какой-то момент она всё равно должна начать терять. Или бесконечной, но тогда она может выжрать всю память. Или блокировать отправителя, но есть риск навсегда его заблокировать.

Есть интересный паттерн, я применял его несколько раз в разных проектах.

Представим себе, что есть некоторое состояние, и смысл потока событий уведомить заинтересованных получателей в изменении этого состояния.

Если допустить, что нам достаточно, чтобы состояние с точки зрения отправителя и с точки зрения получателей достаточно, чтобы было eventually consistent, то на стороне отправителя можно хранить его представление о состоянии получателей и присылать только обновления.

Сложность тут растет пропорционально числу подписчиков и нет очереди, которая может переполниться.

Как частный случай, отправитель может просто уведомлять заинтересованные лица, что состояние изменилось, а они могут перечитывать его целиком. Очередь сообщений тут тоже не нужна, нет разницы, сколько таких уведомлений получит получатель, лишь бы последнее получил.

Ну вообще, я использовал раньше в проектах на Delphi подобный Notifiyer для сложных проектов, где один компонент должен уведомлять о чем-то в процессе работы другой компонент. Раньше для этого в Delphi использовали TNotifyEvent и подписывались, передавая хэндлер (коллбэк) - OnEvent. И всё это делало код очень связанным и запутанным. Так написан весь VCL в Delphi. Начиная с Delphi XE5 в нем появилась либа System.Messaging подобная тому, что я описал в статье. Использовать её после TNotifyEvent - просто кайф )). Собственно, глядя на неё и статью писал. И эти библиотеки не подразумевают хранение и передачу истории сообщений, динамическое подключение подписчиков. Это уже, на мой взгляд - другое. Это уже - брокер сообщений полноценный. Это гораздо больше. Как-то так )

TNotifyEvent - это часть системы событий - база ООП. Система сообщений полноценная для VCL - это использование Dispatch. И.е. события, которые работают через message модификатор (такие используется для работы хендлов и контролов winapi). Собственные сообщения тоже можно создавать. Но работа, конечно, намного менее удобная, чем System.Messaging. Однако, потокобезопасная.

С чего она потокобезопасная вдруг стала? Там просто всё в основном потоке, это ж VCL )). Потокобезопасность обеспечивает Винда. Цикл обработки сообщений крутится в главном потоке. Нет там нигде никакой синхронизации в коде Delphi. И я не про windowsMessage вообще, не про оконные сообщения и их обработку. Я про коллбеки обычные говорил. Типа - объект имеет поле “FOnCreate: TNotifyEvent” и другой объект в это поле кладет свой коллбэк: Obj.OnCreate := … И вот тут возникает связанность, от которой спасает паттерн Pub/Sub как раз.

Прочитайте внимательно ещё раз. Потокобезопасная система сообщений, для VCL - это Dispatch. Т.е. SendMessage/PostMessage. Ключевое слово было Dispatch.

Вы о них не говорили, зато говорил я. Это хорошая система, которая тоже обеспечивает асинхронность и множественные подписи на событие.

Sign up to leave a comment.

Articles