Search
Write a publication
Pull to refresh

Comments 27

А почему не shared_mutex вместо счетчик потоков, которые занимаются рассылкой ?

https://learn.microsoft.com/ru-ru/cpp/standard-library/shared-mutex?view=msvc-170

Эти улучшения - в следующих статьях. Это первая статья в цикле статей. Если интересно, по ссылке на Github - в соседних файлах есть shared_mutex.

А почему просто не с клонировать вектор подписчиков и итерироваться по нему? Тогда лок вывешивается только на время клонирования и проблема решена, все подписчики на момент вызова получат уведомление. Нет ничего страшного в клонировании вектора

Проблема:

struct Data { int val; };
struct User { std::unique_ptr<Data> data; };

User u;
//...
//подписка
auto subscriptionId = s.subscribe([&](){LOG() << u.data->val;});
//...
//отписка
s.unsubscribe(subscriptionId);
u.data = nullptr;

Если мы в одном потоке скопировали вектор и пошли по нему, а за это время другой поток успел отписаться и занулить указатель, то произойдёт упс.

произойдёт упс.

У вас тут наличествует откровенный мисдизайн.

Ваш эвент должен содержать только данные которыми владеет только сам эвент и которые он освободит в момент своего удаления - то есть конструктор эвента должен клонировать данные, и деструктор эвента должен высвобождать данные, либо ссылки на ресурсы которые не будут удалены за всё время существования эвента.

А если вы используете shared resource - то необходимо использовать защиту ресурса (семафором, менеджером ресурсов, умным указателем - любым удобным вам инструментом) от удаления - а может и от модификации - на все то время, пока на ресурс есть ссылки.

Пихать же указатель в асинхронную обработку и освобождать его, не проверив пока таски не закончились - это безумие и прямой путь к сегфолту.

Как-то делали свою систему реактивности на c++. И подробная задача встала в чуть более обобщённом варианте. Нужна была возможность отдать из объекта callback, который имеет доступ к полям объекта. Не только для рассылок, в объем случае. Сам объект мог начать уничтожаться в произвольный момент из произвольного места. Завернуть весь объект в шаред и удерживать его не всегда было возможно. В итоге сделали под такие callback отдельный класс с блокирующим деструктором. И помещали их в конец определения класса. Пока шла обработка вызова callback, его деструктор просто не давал родительскому объекту уничтожитьия. Бонусом убрали необходимость обязательной рассылки. По типу weak_ptt. Если такой callback умер, тот вызова и не будет. Само потом почиститься. В итоге и ручных отписок в очень большом production приложении практически нет. За исключением какой-то сложной логики

либо ссылки на ресурсы которые не будут удалены за всё время существования эвента.

Ну так в этом и проблема: с точки зрения клиента после возвращения из unsubscribe() эвент больше не существует. А в вашем решении получается, что каких-либо гарантий о том, когда именно он прекращает существовать, у клиента нет вообще (при обходе вектора предыдущий обработчик взял и вызвал sleep(20)).

С практической стороны, если хочется создать обработчик что-то делающий с объектом, то что прикажете делать в деструкторе этого объекта? Никакие умные указатели в полях объекта не помогут, объект должен умереть. Гарантировать что деструктор не будет вызван во время вызова notify()?

Так у вас эта проблема точно так же есть.

Цепочка l.unlock(); s.m_callback() неатомарна. Я вам больше скажу, сам вызов s.m_callback() описывается в несколько инструкций, после любой из которых тред может быть приостановлен и при этом уже разблокирован, но код внутри m_callback еще не начал выполняться.

Поэтому unsubscribe() может просунуться между этими двумя вызовами и вы ВНЕЗАПНО! (нет) получите в точности ту же проблему.

Так что самым правильным будет сделать две реализации - одна синхронная, под глобальным локом, в которой unsubscribe / subscribe не завершатся пока не завершатся все notify() - с которой вы и начали (что и правильно), и вторая более оптимистичная, которая реализует модель "после вызова unsubscribe новые вызовы notify уже не попадут в обработчик, но уже сделанные незавершенные вызовы могут привести к его вызову" (ситуация когда функтор, как вы его назвали, не удаляется).

А половинчатые решения со скрытым race condition как в вашем примере... Не надо их притаскивать. Их очень трудно отдебажить но на них легко подскользнуться.

(На всякий случай: я не автор статьи и не автор текста статьи, я комментатор.)

В случае вызова unsubscribe() во втором варианте он же заблокируется в it->m_waiter.wait(l, [&it](){ return ! it->m_refs; }); . Или я чего-то не вижу?

И это даже хуже. В notify лок взяли, начали цикл, сняли лок внутри цикла, ушли в колобок. И тут кто то вызывает unsubscribe параллельно, unsubscribe весь под локом, лок взят. Unsubscribe ачинает ожидать завершения использования элемента через счётчик ссылок. Под локом, да. В notify завершается обработка этой подписки и пытается взять лок внутри цикла перед уменьшением счётчика ссылок. Но этот лок удерживается unsubscribe. Всё, повисли.

И во время снятого в ожидании лока вызывается второй unsubscribe с теми же параметрами к нам приходит double-free?

Выглядит как проблема. @Corosan , мы чего-то не понимаем, или это действительно failure mode, и предполагается что двойной вызов unsubscribe() есть ошибка?

Двойной вызов unsubscribe для отсоединения одного и того же подписчика - ошибка. Такой задачи я себе не ставил при дизайне кода.

Выглядит как будто структуре не нужен конструктор вовсе - ничто не мешает её инициализировать обычным {} , а в свежих плюсах можно ещё и как в си красиво сделать

{.id=next_id(), .callback=std::move(callback),...}

Попробуйте ;) С учётом наличия там std::condition_variable.

https://godbolt.org/z/rdfja3nro
умвр. остальной код остаётся таким же, исключая строчку про конструирование. Аналогично можно и неинициализированное конструирование сделать, если у вас 11 стандарт. Кондишка имеет дефолтный конструктор и примерно никак не мешает инициализации вашей структуры, бо вы все равно её позднее делаете, также как и обновление счётчика.

Вы мой код из гитхаба возьмите и попробуйте переписать. Что толку структуру на стеке создавать, как в Вашем примере, её же положить в контейнер надо. А переместить Вы её не сможете из-за std::condition_variable.

Перемещаем m_id первым членом и конструктор оказывается не нужен. Если волнует размер структуры - то перешаманить порядок вроде не сложно чтобы оставить тот же размер.

Ок, теперь вижу, спасибо. Подобные конструкции стали корректно компилироваться только в 20-м стандарте языка. Я компилировал с 17-м.

В 20-м стандарте добавили интересный пункт в разделе 9.4 "Initializers". Пункт 17.6.2.2: "Otherwise, if no constructor is viable, the destination type is an aggregate class, and the
initializer is a parenthesized expression-list, the object is initialized as follows...". Похоже на инициализацию агрегата через list-initialization (то, что из 'C' пришло), но в данном случае можно использовать и круглые скобки. Уравняли в правах {} и (). Правда, использовать именованные поля типа .m_ref = 1 - нельзя для (). В emplace используется именно такой вариант.

Вот за стандарт не уточнял. designated initiializers вроде с 21 первого только появились, но с агрегатной инициализацией по идее они достаточно равноправны. Мысль с emplace_back и его ограничениями понял.

Спасибо, классная статья. Понравилась, как иллюстрация "современного" cpp

std::funtion видел только на cpp reference 😅😅 на проде старый добрый указатель на функцию

Вы сами обозначили один из серьёзных недостатков простой реализации - невозможность подписываться/отписываться во время нотифая. Ещё бывает, что хочется управлять порядком вызова (приоритетами), потом не давать вызываться другим, то есть обрывать распространение события. И в конце концов получается монстр не меньше boost::signals, только хуже.

Я сам делал похожую вещь (а кто не делал?). Одно только удаление во время нотифая потребовало дополнительной структуры данных, которая не инвалидирует итераторы. А потом захотелось отписываться не по id, который вечно не ясно где хранить, а по самой функции, и приехали к реализации своего function.

И в конце концов получается монстр не меньше boost::signals, только хуже.

Не, не согласен. См. следующую статью - https://habr.com/ru/articles/843490/, отписка во время нотификации реализована. Что до приоритетов подписчиков - единственное, что для этого нужно, это хранить приоритет для каждого подписчика (+ одна строчка в subscription) и вставлять подписчика в subscribe() не в конец, а в правильное место с линейным поиском по приоритету (список выйдет всегда отсортированным). Плюс две строчки. Чтобы прервать обход подписчиков, если предыдущий вернул false, можно поменять сигнатуру оповещения с void(...) на bool(...). И добавить в notify один 'if'. Плюс одна строчка. Итого 135 строчек. Всё. Keep it simple.

Видел в проде такое решение: если подписчик удаляется во время оповещения, взводим у него флаг "deleted"; "удаленные" подписчики не вызываем, а после окончания оповещений лочим список подписчиков и сносим всё, что "удалилось". Если подписчик добавляется во время оповещения, добавляем его во временный список, который после окончания оповещений мержим с основным (опять же, под локом).
Правда, там подписчики идентифицировались не по уникальному id, а по по фиксированному id получателя ("один получатель -- одна подписка"), так что один и тот же подписчик можно было несколько раз добавить и удалить во время одного оповещения -- и потом страдать при отладке :)

Sign up to leave a comment.

Articles