Доброго времени суток Хабр. Вдохновленный моделью синхронизации потоков в go и сигналов в QT появилась идея реализовать нечто подобное на c#.

Если интересно, прошу под кат.
В данный момент синхронизация потоков в c# вызывает некоторые затруднения, в частности передача примитивов синхронизации между объектами Вашего приложения и поддержка этого всего в дальнейшем.
Текущая модель с Task и IAsyncResult а так же TPL в целом решают все проблемы при должном проектировании но хотелось создать простой класс через который можно будет отправлять и принимать сигналы с блокировкой потока.
В общем в голове созрел некий интерфейс:
, где T — сущность которую необходимо передать получателю.
Пример вызова:
Для получения объекта сигнала создадим фабрику.
Signal — internal класс для синхронизации внутри одного процесса. Для синхронизации необходима ссылка на объект.
CrossProcessSignal — internal класс который может синхронизировать потоки в отдельных процессах(но об этом чуть позже).
Первое, что приходит на ум, в Receive блокировать выполнение потока с помощью Semaphore а в методе Send вызывать Release() этого семафора с количеством блокированных потоков.
После разблокировки потоков возвращать результат из поля класса T buffer. Но мы не знаем какое количество потоков будет висеть в Receive и нет гарантии что к вызову Release не подбежит еще пара потоков.
В качестве примитива синхронизации был выбран AutoResetEvent. Для каждого нового потока будет создаваться свой AutoResetEvent, хранить все это добро мы будем в словаре Dictionary<int,AutoResetEvent> где ключ это id потока.
Собственно поля класса выглядят так:
Объект sync будет нам необходим при вызове Send, дабы несколько потоков не начали перетирать буфер.
isDisposabled флаг указывающий был ли вызван Dispose(), если не вызван то вызываем его в деструкторе.
Теперь о методе Receive.
GetEvents() достает из словаря AutoResetEvent если есть, если нет то создает новый и кладет его в словарь.
waiter.WaitOne() блокировка потока до ожидания сигнала.
waiter.Reset() сброс текущего состояния AutoResetEvent. Следующий вызов WaitOne приведет к блокировке потока.
Осталось только вызвать метод Set для каждого AutoResetEvent.
Проверить данную модель можно тестом:
Данной реализации есть куда расти в плане надежности. В исходниках есть межпроцессорная реализация этой идеи с передачей сигнала через shared memory, если будет интересно могу написать об этом отдельную статью.
Исходники на Гитхабе

Если интересно, прошу под кат.
В данный момент синхронизация потоков в c# вызывает некоторые затруднения, в частности передача примитивов синхронизации между объектами Вашего приложения и поддержка этого всего в дальнейшем.
Текущая модель с Task и IAsyncResult а так же TPL в целом решают все проблемы при должном проектировании но хотелось создать простой класс через который можно будет отправлять и принимать сигналы с блокировкой потока.
В общем в голове созрел некий интерфейс:
public interface ISignal<T> : IDisposable { void Send(T signal); T Receive(); T Receive(int timeOut); }
, где T — сущность которую необходимо передать получателю.
Пример вызова:
[TestMethod] public void ExampleTest() { var signal = SignalFactory.GetInstanse<string>(); var task1 = Task.Factory.StartNew(() => // старт потока { Thread.Sleep(1000); signal.Send("Some message"); }); // блокировка текущего потока string message = signal.Receive(); Debug.WriteLine(message); }
Для получения объекта сигнала создадим фабрику.
public static class SignalFactory { public static ISignal<T> GetInstanse<T>() { return new Signal<T>(); } public static ISignal<T> GetInstanse<T>(string name) { return new CrossProcessSignal<T>(name); } }
Signal — internal класс для синхронизации внутри одного процесса. Для синхронизации необходима ссылка на объект.
CrossProcessSignal — internal класс который может синхронизировать потоки в отдельных процессах(но об этом чуть позже).
Теперь о реализации Signal
Первое, что приходит на ум, в Receive блокировать выполнение потока с помощью Semaphore а в методе Send вызывать Release() этого семафора с количеством блокированных потоков.
После разблокировки потоков возвращать результат из поля класса T buffer. Но мы не знаем какое количество потоков будет висеть в Receive и нет гарантии что к вызову Release не подбежит еще пара потоков.
В качестве примитива синхронизации был выбран AutoResetEvent. Для каждого нового потока будет создаваться свой AutoResetEvent, хранить все это добро мы будем в словаре Dictionary<int,AutoResetEvent> где ключ это id потока.
Собственно поля класса выглядят так:
private T buffer; Dictionary<int,AutoResetEvent> events = new Dictionary<int, AutoResetEvent>(); private volatile object sync = new object(); private bool isDisposabled = false;
Объект sync будет нам необходим при вызове Send, дабы несколько потоков не начали перетирать буфер.
isDisposabled флаг указывающий был ли вызван Dispose(), если не вызван то вызываем его в деструкторе.
public void Dispose() { foreach(var resetEvent in events.Values) { resetEvent.Dispose(); } isDisposabled = true; } ~Signal() { if (!isDisposabled) { Dispose(); } }
Теперь о методе Receive.
public T Receive() { var waiter = GetEvents(); waiter.WaitOne(); waiter.Reset(); return buffer; }
GetEvents() достает из словаря AutoResetEvent если есть, если нет то создает новый и кладет его в словарь.
waiter.WaitOne() блокировка потока до ожидания сигнала.
waiter.Reset() сброс текущего состояния AutoResetEvent. Следующий вызов WaitOne приведет к блокировке потока.
Осталось только вызвать метод Set для каждого AutoResetEvent.
public void Send(T signal) { lock (sync) { buffer = signal; foreach(var autoResetEvent in events.Values) { autoResetEvent.Set(); } } }
Проверить данную модель можно тестом:
Тест
private void SendTest(string name = "") { ISignal<string> signal; if (string.IsNullOrEmpty(name)) { signal = SignalFactory.GetInstanse<string>(); // создаем локальный сигнал } else { signal = SignalFactory.GetInstanse<string>(name); } var task1 = Task.Factory.StartNew(() => // старт потока { for (int i = 0; i < 10; i++) { // блокировка потока, ожидание сигнала var message = signal.Receive(); Debug.WriteLine($"Thread 1 {message}"); } }); var task2 = Task.Factory.StartNew(() => // старт потока { for (int i = 0; i < 10; i++) { // блокировка потока, ожидание сигнала var message = signal.Receive(); Debug.WriteLine($"Thread 2 {message}"); } }); for (int i = 0; i < 10; i++) { // отправка сигнала ожидающим потокам. signal.Send($"Ping {i}"); Thread.Sleep(50); } }
Листинг класса Signal
using System.Collections.Generic; using System.Threading; namespace Signal { internal class Signal<T> : ISignal<T> { private T buffer; Dictionary<int,AutoResetEvent> events = new Dictionary<int, AutoResetEvent>(); private volatile object sync = new object(); private bool isDisposabled = false; ~Signal() { if (!isDisposabled) { Dispose(); } } public T Receive() { var waiter = GetEvents(); waiter.WaitOne(); waiter.Reset(); return buffer; } public T Receive(int timeOut) { var waiter = GetEvents(); waiter.WaitOne(timeOut); waiter.Reset(); return buffer; } public void Send(T signal) { lock (sync) { buffer = signal; foreach(var autoResetEvent in events.Values) { autoResetEvent.Set(); } } } private AutoResetEvent GetEvents() { var threadId = Thread.CurrentThread.ManagedThreadId; AutoResetEvent autoResetEvent; if (!events.ContainsKey(threadId)) { autoResetEvent = new AutoResetEvent(false); events.Add(threadId, autoResetEvent); } else { autoResetEvent = events[threadId]; } return autoResetEvent; } public void Dispose() { foreach(var resetEvent in events.Values) { resetEvent.Dispose(); } isDisposabled = true; } } }
Данной реализации есть куда расти в плане надежности. В исходниках есть межпроцессорная реализация этой идеи с передачей сигнала через shared memory, если будет интересно могу написать об этом отдельную статью.
Исходники на Гитхабе
