Появилась задача: имеется большой поток данных от множества клиентов, который нужно обработать очень быстро (желательно в реалтайме), сложить в БД и разослать другому множеству клиентов, при этом данные не структурируются ни в таблицы, ни в документы. Выбор технологий для реализации пал на Redis + C++.
Если заглянуть сюда, то видим 4 варианта библиотек для реализации своего клиента для Redis под данную задачу: hiredis, credis, libredis и C++ Client.
Казалось бы, последнее — это то, что нужно. Но эта библиотека находится в статусе «expiremental», что сразу же намекает на то, что нужно вооружаться напильником, энтузиазмом и большим терпением. А деньги платят не за количество труда, а за результат. К тому же функциональность этого клиента пока что оставляет желать лучшего.
Libredis. Эта штука видимо является плагином к движку PHP. Не подходит.
Credis. Очень простенькая библиотека, позволяющая выполнять только базовые команды Redis, без возможности выстроить составную команду (эта возможность стоит у них сейчас в TODO), а ведь есть модель данных, и хотелось бы делать атомарно несколько запросов одновременно.
Итак, выбор пал на hiredis — официальный клиент.
Следуя свежим веяниям IT-технологий и учитывая огромное количество клиентов, я задумался о том, что необходимо использовать неблокирующее соединение с Redis, тем более эта СУБД предоставляет такую возможность. В свою очередь hiredis предоставляет адаптеры для использования с 3 библиотеками событийно-ориентированной обработки данных: это ae, libevent и libev.
Тут важно подчеркнуть, что клиент для Redis — это всего лишь часть сервиса, которую нужно отправить в отдельный поток и каким-то образом отправлять в этот поток данные из других потоков, которые обрабатывают соединения с клиентами.
Ae. Не понял, что за библиотека, гугл мне не помог, если кто знает — просьба откомментить пруфлинком.
Libevent. Нет поддержки версий 2.x.x, только 1.x.x. Этот факт делает невозможным использование функционала в многопоточном приложении.
Итак, выбор пал на libev. Далее подробнее.
Статей, как оказалось, нет под данной теме. Есть публикации от производителей hiredis о том, как осуществить однопоточную событийно-ориентированную обработку данных (вот пруф) и есть документация по libev, в которой четко сказано: многопоточность не поддерживается по умолчанию, потому что нет однозначного алгоритма сделать потокобезопасную реализацию, но есть возможность создать отдельный watcher с типом ev_async, который будет принимать данные из других потоков.
Хочу еще сразу сделать оговорку: я попробовал несколько вариантов написания кода, но либо знаний не хватает, либо надо глубже в исходники рыть, вобщем в итоге лучшим пока что вариантом оказалось просто скопировать предоставленный адаптер для libev в свой проект и дописать необходимый функционал.
Итак. Для начала в структуру события redisLibevEvents добавляем еще один watcher:
Далее. Добавляем функцию-callback для обработки событий, приходящих из других потоков:
Далее. В функцию привязывания контекста к циклу обработки событий, которая называется redisLibevAttach, добавляем в конец следущее:
Теперь не хватает только одного: получения указателя на созданный watcher, так как он нужен для отправления событий. Для этого обрабатываем еще разок напильником функцию redisLibevAttach:
Собственно после уже можно использовать.
Функция инициализации для цикла событий:
Функция потока:
Ну и функция добавления нового события:
Ну вот и все. Полученная смесь C и C++ вполне жизнеспособна, хоть и не очень красиво выглядит.
Надеюсь кому-нибудь пригодится, пока не вышли официальные годные реализации на С++.
P.S.: Читайте продолжение во второй части.
Выбор библиотеки для Redis
Если заглянуть сюда, то видим 4 варианта библиотек для реализации своего клиента для Redis под данную задачу: hiredis, credis, libredis и C++ Client.
Казалось бы, последнее — это то, что нужно. Но эта библиотека находится в статусе «expiremental», что сразу же намекает на то, что нужно вооружаться напильником, энтузиазмом и большим терпением. А деньги платят не за количество труда, а за результат. К тому же функциональность этого клиента пока что оставляет желать лучшего.
Libredis. Эта штука видимо является плагином к движку PHP. Не подходит.
Credis. Очень простенькая библиотека, позволяющая выполнять только базовые команды Redis, без возможности выстроить составную команду (эта возможность стоит у них сейчас в TODO), а ведь есть модель данных, и хотелось бы делать атомарно несколько запросов одновременно.
Итак, выбор пал на hiredis — официальный клиент.
Выбор событийно-ориентированной библиотеки
Следуя свежим веяниям IT-технологий и учитывая огромное количество клиентов, я задумался о том, что необходимо использовать неблокирующее соединение с Redis, тем более эта СУБД предоставляет такую возможность. В свою очередь hiredis предоставляет адаптеры для использования с 3 библиотеками событийно-ориентированной обработки данных: это ae, libevent и libev.
Тут важно подчеркнуть, что клиент для Redis — это всего лишь часть сервиса, которую нужно отправить в отдельный поток и каким-то образом отправлять в этот поток данные из других потоков, которые обрабатывают соединения с клиентами.
Ae. Не понял, что за библиотека, гугл мне не помог, если кто знает — просьба откомментить пруфлинком.
Libevent. Нет поддержки версий 2.x.x, только 1.x.x. Этот факт делает невозможным использование функционала в многопоточном приложении.
Итак, выбор пал на libev. Далее подробнее.
Hiredis+libev+multithread
Статей, как оказалось, нет под данной теме. Есть публикации от производителей hiredis о том, как осуществить однопоточную событийно-ориентированную обработку данных (вот пруф) и есть документация по libev, в которой четко сказано: многопоточность не поддерживается по умолчанию, потому что нет однозначного алгоритма сделать потокобезопасную реализацию, но есть возможность создать отдельный watcher с типом ev_async, который будет принимать данные из других потоков.
Хочу еще сразу сделать оговорку: я попробовал несколько вариантов написания кода, но либо знаний не хватает, либо надо глубже в исходники рыть, вобщем в итоге лучшим пока что вариантом оказалось просто скопировать предоставленный адаптер для libev в свой проект и дописать необходимый функционал.
Итак. Для начала в структуру события redisLibevEvents добавляем еще один watcher:
typedef struct redisLibevEvents {
redisAsyncContext *context;
struct ev_loop *loop;
int reading, writing;
ev_io rev, wev;
ev_async aev; // Это специально обученный watcher для получения событий из других потоков.
} redisLibevEvents;
Далее. Добавляем функцию-callback для обработки событий, приходящих из других потоков:
void redisLibevAsyncEvent(EV_P_ ev_async *watcher, int revents) { // Функция написана по образу и подобию callback-ов для watcher-ов типа ev_io.
#if EV_MULTIPLICITY
((void)loop);
#endif
((void)revents);
redisLibevEvents *e = (redisLibevEvents*)watcher->data;
redisAsyncHandleRead(e->context); // Отправляем событие в обработку.
free((redisLibevEvents*)watcher->data); // Тут освобождаем память, которую будем динамически выделять для создания события.
}
Далее. В функцию привязывания контекста к циклу обработки событий, которая называется redisLibevAttach, добавляем в конец следущее:
ev_async_init(&e->aev, redisLibevAsyncEvent); // Инициализируем watcher.
ev_async_start(e->loop, &e->aev); // И запускаем.
Теперь не хватает только одного: получения указателя на созданный watcher, так как он нужен для отправления событий. Для этого обрабатываем еще разок напильником функцию redisLibevAttach:
int redisLibevAttach(EV_P_ redisAsyncContext *ac, ev_async **_pEvAsync) {
...
*_pEvAsync = &e->aev;
return REDIS_OK;
}
Собственно после уже можно использовать.
Функция инициализации для цикла событий:
m_pRedisAsyncContext = redisAsyncConnect(m_strIP, m_nDBPort); // Создаем контекст.
m_pEventLoop = ev_loop_new(EVFLAG_AUTO); // Создаем кастомный цикл событий.
redisLibevAttach(m_pEventLoop, m_pRedisAsyncContext, &m_pAEV); // Привязываем к циклу событий контекст.
redisAsyncSetConnectCallback(m_pRedisAsyncContext, redisConnectCallbackFunction); // Привязываем callback для события соединения с Redis.
redisAsyncSetDisconnectCallback(m_pRedisAsyncContext, redisDisconnectCallbackFunction); // Привязываем callback для дисконнекта.
Функция потока:
ev_loop(m_pEventLoop, 0); // Запуск цикла обработки событий.
Ну и функция добавления нового события:
// Сначала ожидаем семафор, или мьютекс (кому как больше нравится) для синхронизации потоков.
redisAsyncCommand(m_pRedisAsyncContext, redisAsyncCommandCallbackFunction, _pPrivData, _pcQuery); // Добавляем в контекст запрос (_pcQuery) с информацией для его идентификации (_pPrivData) и callback-функцией для получения результата.
redisLibevEvents * pRedisLibevEvent = (redisLibevEvents*)malloc(sizeof(redisLibevEvents)); // Создаем структуру для события.
pRedisLibevEvent->context = m_pRedisAsyncContext; // Помещаем контекст в событие.
m_pAEV->data = pRedisLibevEvent; // Рассказываем watcher-у про созданное событие.
ev_async_send(m_pEventLoop, m_pAEV); // Отправляем событие на обработку.
// Отпускаем семафор/мьютекс.
Ну вот и все. Полученная смесь C и C++ вполне жизнеспособна, хоть и не очень красиво выглядит.
Надеюсь кому-нибудь пригодится, пока не вышли официальные годные реализации на С++.
P.S.: Читайте продолжение во второй части.