Для работы с PostgreSQL на языке С++, есть замечательная библиотека libpq. Библиотека отлично документирована, есть даже полный перевод на русский язык, от компании PostgresPRO.
При написании серверного бекэнда, столкнулся с тем, что в этой библиотеке нет никакого пула коннектов, а работа с БД, предполагалась в довольно интенсивном режиме и одного коннекта было явно мало. Каждый раз устанавливать соединение для отправки полученных данных, было бы просто безумием, т.к. соединение самая долгая операция, решено было написать свой пул коннектов.
Идея в том, что мы при старте программы создаём несколько соединений и храним их в очереди.
Когда приходят данные, мы просто берём свободное соединеие из очереди, а если свободных соединений нет — ждём когда появится, используем его для вставки данных, а затем помещаем соединение обратно. Идея довольна простая, быстро реализуема и самое главное, скорость работы очень высокая.
Создадим в PostgreSQL базу с именем demo, табличкой demo такой
Пишем класс который будет представлять собой соединение к БД, параметры соединения, будут прописаны прямо в коде для его упрощения, в реальности конечно их надо хранить в конфигурационном файле и при запуске считывать оттуда, чтобы при изменении парметров сервера, не приходилось перекомпилировать программу.
Чтобы предотвратить возможную утечку ресурсов, соединиение мы будем хранить в умном указателе.
В конструкторе мы вызывам функцию PQsetdbLogin, которая устанавливает соединение к БД, возвращая указатель на соедининие PGconn* и переводим соедининие в асинхронный режим работы.
При завершении работы, соединие должно быть удалено функцией PQfinish, которой передается указатель, возвращённый функцией PQsetdbLogin. Поэтому последним параметром в вызове m_connection.reset() мы передаём адрес функции &PQfinish. Когда умный указатель выйдет из области видимости и счётчик ссылок обнулиться, он вызовет эту функцию, тем самым корректно завершив соединие.
Теперь нам нужен класс, который будет создавать, хранить и управлять работой пула коннектов.
В функции createPool создаём пул соедининий, я установил 10 соединений. Далее — создаём класс PGBackend, и работаем с ним через функции connection — котороя возвращает свободное соединение к БД, и freeConnection — которая помещяет соединие обратно в очередь.
Всё это работает на основе условных переменных, если очередь пуста, значит свободных соединений нет, и поток засыпает, пока не будет разбужен через условную переменную.
Простейший пример, в котором используется наш бекэнд с пулом коннектов, приведён в файле main.cpp. В «боевых условия» у вас конечно будет какой-то цикл событий, при наступлении которых будет проводиться работа с БД. У меня это boost::asio, которая работает асинхронно и принимая события из сети, пишет всё в БД. Приводить здесь её излишне, чтобы не усложнять идею с пулом коннектов. Здесь мы просто создаём 50 потоков, которые работают с сервером через один экземляр PGBackend.
Компилируется это всё командой:
Будьте внимательны с количеством соединений БД — этот параметр задаётся параметром max_connections (integer).
Исходный код
При написании серверного бекэнда, столкнулся с тем, что в этой библиотеке нет никакого пула коннектов, а работа с БД, предполагалась в довольно интенсивном режиме и одного коннекта было явно мало. Каждый раз устанавливать соединение для отправки полученных данных, было бы просто безумием, т.к. соединение самая долгая операция, решено было написать свой пул коннектов.
Идея в том, что мы при старте программы создаём несколько соединений и храним их в очереди.
Когда приходят данные, мы просто берём свободное соединеие из очереди, а если свободных соединений нет — ждём когда появится, используем его для вставки данных, а затем помещаем соединение обратно. Идея довольна простая, быстро реализуема и самое главное, скорость работы очень высокая.
Создадим в PostgreSQL базу с именем demo, табличкой demo такой
структуры
-- Table: public.demo -- DROP TABLE public.demo; CREATE TABLE public.demo ( id integer NOT NULL DEFAULT nextval('demo_id_seq'::regclass), name character varying(256), CONSTRAINT demo_pk PRIMARY KEY (id) ) WITH ( OIDS=FALSE ); ALTER TABLE public.demo OWNER TO postgres;
Пишем класс который будет представлять собой соединение к БД, параметры соединения, будут прописаны прямо в коде для его упрощения, в реальности конечно их надо хранить в конфигурационном файле и при запуске считывать оттуда, чтобы при изменении парметров сервера, не приходилось перекомпилировать программу.
pgconnection.h
#ifndef PGCONNECTION_H #define PGCONNECTION_H #include <memory> #include <mutex> #include <libpq-fe.h> class PGConnection { public: PGConnection(); std::shared_ptr<PGconn> connection() const; private: void establish_connection(); std::string m_dbhost = "localhost"; int m_dbport = 5432; std::string m_dbname = "demo"; std::string m_dbuser = "postgres"; std::string m_dbpass = "postgres"; std::shared_ptr<PGconn> m_connection; }; #endif //PGCONNECTION_H
pgconnection.cpp
#include "pgconnection.h" PGConnection::PGConnection() { m_connection.reset( PQsetdbLogin(m_dbhost.c_str(), std::to_string(m_dbport).c_str(), nullptr, nullptr, m_dbname.c_str(), m_dbuser.c_str(), m_dbpass.c_str()), &PQfinish ); if (PQstatus( m_connection.get() ) != CONNECTION_OK && PQsetnonblocking(m_connection.get(), 1) != 0 ) { throw std::runtime_error( PQerrorMessage( m_connection.get() ) ); } } std::shared_ptr<PGconn> PGConnection::connection() const { return m_connection; }
Чтобы предотвратить возможную утечку ресурсов, соединиение мы будем хранить в умном указателе.
В конструкторе мы вызывам функцию PQsetdbLogin, которая устанавливает соединение к БД, возвращая указатель на соедининие PGconn* и переводим соедининие в асинхронный режим работы.
При завершении работы, соединие должно быть удалено функцией PQfinish, которой передается указатель, возвращённый функцией PQsetdbLogin. Поэтому последним параметром в вызове m_connection.reset() мы передаём адрес функции &PQfinish. Когда умный указатель выйдет из области видимости и счётчик ссылок обнулиться, он вызовет эту функцию, тем самым корректно завершив соединие.
Теперь нам нужен класс, который будет создавать, хранить и управлять работой пула коннектов.
pgbackend.h
#ifndef PGBACKEND_H #define PGBACKEND_H #include <memory> #include <mutex> #include <string> #include <queue> #include <condition_variable> #include <libpq-fe.h> #include "pgconnection.h" class PGBackend { public: PGBackend(); std::shared_ptr<PGConnection> connection(); void freeConnection(std::shared_ptr<PGConnection>); private: void createPool(); std::mutex m_mutex; std::condition_variable m_condition; std::queue<std::shared_ptr<PGConnection>> m_pool; const int POOL = 10; }; #endif //PGBACKEND_H
pgbackend.cpp
#include <iostream> #include <thread> #include <fstream> #include <sstream> #include "pgbackend.h" PGBackend::PGBackend() { createPool(); } void PGBackend::createPool() { std::lock_guard<std::mutex> locker_( m_mutex ); for ( auto i = 0; i< POOL; ++i ){ m_pool.emplace ( std::make_shared<PGConnection>() ); } } std::shared_ptr<PGConnection> PGBackend::connection() { std::unique_lock<std::mutex> lock_( m_mutex ); while ( m_pool.empty() ){ m_condition.wait( lock_ ); } auto conn_ = m_pool.front(); m_pool.pop(); return conn_; } void PGBackend::freeConnection(std::shared_ptr<PGConnection> conn_) { std::unique_lock<std::mutex> lock_( m_mutex ); m_pool.push( conn_ ); lock_.unlock(); m_condition.notify_one(); }
В функции createPool создаём пул соедининий, я установил 10 соединений. Далее — создаём класс PGBackend, и работаем с ним через функции connection — котороя возвращает свободное соединение к БД, и freeConnection — которая помещяет соединие обратно в очередь.
Всё это работает на основе условных переменных, если очередь пуста, значит свободных соединений нет, и поток засыпает, пока не будет разбужен через условную переменную.
Простейший пример, в котором используется наш бекэнд с пулом коннектов, приведён в файле main.cpp. В «боевых условия» у вас конечно будет какой-то цикл событий, при наступлении которых будет проводиться работа с БД. У меня это boost::asio, которая работает асинхронно и принимая события из сети, пишет всё в БД. Приводить здесь её излишне, чтобы не усложнять идею с пулом коннектов. Здесь мы просто создаём 50 потоков, которые работают с сервером через один экземляр PGBackend.
main.cpp
#include <thread> #include <iostream> #include "pgbackend.h" void testConnection(std::shared_ptr<PGBackend> pgbackend) { //получаем свободное соединение auto conn = pgbackend->connection(); std::string demo = "SELECT max(id) FROM demo; " ; PQsendQuery( conn->connection().get(), demo.c_str() ); while ( auto res_ = PQgetResult( conn->connection().get()) ) { if (PQresultStatus(res_) == PGRES_TUPLES_OK && PQntuples(res_)) { auto ID = PQgetvalue (res_ ,0, 0); std::cout<< ID<<std::endl; } if (PQresultStatus(res_) == PGRES_FATAL_ERROR){ std::cout<< PQresultErrorMessage(res_)<<std::endl; } PQclear( res_ ); } //возвращаем соединение в очередь pgbackend->freeConnection(conn); } int main(int argc, char const *argv[]) { auto pgbackend = std::make_shared<PGBackend>(); std::vector<std::shared_ptr<std::thread>> vec; for ( size_t i = 0; i< 50 ; ++i ){ vec.push_back(std::make_shared<std::thread>(std::thread(testConnection, pgbackend))); } for(auto &i : vec) { i.get()->join(); } return 0; }
Компилируется это всё командой:
g++ main.cpp pgbackend.cpp pgconnection.cpp -o pool -std=c++14 -I/usr/include/postgresql/ -lpq -lpthread
Будьте внимательны с количеством соединений БД — этот параметр задаётся параметром max_connections (integer).
Исходный код
