Как стать автором
Обновить

PostgreSQL libpq connection pool

Время на прочтение5 мин
Количество просмотров44K
Для работы с PostgreSQL на языке С++, есть замечательная библиотека libpq. Библиотека отлично документирована, есть даже полный перевод на русский язык, от компании PostgresPRO.

При написании серверного бекэнда, столкнулся с тем, что в этой библиотеке нет никакого пула коннектов, а работа с БД, предполагалась в довольно интенсивном режиме и одного коннекта было явно мало. Каждый раз устанавливать соединение для отправки полученных данных, было бы просто безумием, т.к. соединение самая долгая операция, решено было написать свой пул коннектов.

Идея в том, что мы при старте программы создаём несколько соединений и храним их в очереди.

Когда приходят данные, мы просто берём свободное соединеие из очереди, а если свободных соединений нет — ждём когда появится, используем его для вставки данных, а затем помещаем соединение обратно. Идея довольна простая, быстро реализуема и самое главное, скорость работы очень высокая.

Создадим в PostgreSQL базу с именем demo, табличкой demo такой

структуры
-- Table: public.demo

-- DROP TABLE public.demo;

CREATE TABLE public.demo
(
  id integer NOT NULL DEFAULT nextval('demo_id_seq'::regclass),
  name character varying(256),
  CONSTRAINT demo_pk PRIMARY KEY (id)
)
WITH (
  OIDS=FALSE
);
ALTER TABLE public.demo
  OWNER TO postgres;


Пишем класс который будет представлять собой соединение к БД, параметры соединения, будут прописаны прямо в коде для его упрощения, в реальности конечно их надо хранить в конфигурационном файле и при запуске считывать оттуда, чтобы при изменении парметров сервера, не приходилось перекомпилировать программу.

pgconnection.h
#ifndef PGCONNECTION_H
#define PGCONNECTION_H

#include <memory>
#include <mutex>
#include <libpq-fe.h>

class PGConnection
{
public:
    PGConnection();
    std::shared_ptr<PGconn> connection() const;

private:
    void establish_connection();

    std::string m_dbhost = "localhost";
    int         m_dbport = 5432;
    std::string m_dbname = "demo";
    std::string m_dbuser = "postgres";
    std::string m_dbpass = "postgres";

    std::shared_ptr<PGconn>  m_connection;

};


#endif //PGCONNECTION_H

pgconnection.cpp
#include "pgconnection.h"
PGConnection::PGConnection()
{
    m_connection.reset( PQsetdbLogin(m_dbhost.c_str(), std::to_string(m_dbport).c_str(), nullptr, nullptr, m_dbname.c_str(), m_dbuser.c_str(), m_dbpass.c_str()), &PQfinish );

    if (PQstatus( m_connection.get() ) != CONNECTION_OK && PQsetnonblocking(m_connection.get(), 1) != 0 )
    {
       throw std::runtime_error( PQerrorMessage( m_connection.get() ) );
    }

}


std::shared_ptr<PGconn> PGConnection::connection() const
{
    return m_connection;
}


Чтобы предотвратить возможную утечку ресурсов, соединиение мы будем хранить в умном указателе.

В конструкторе мы вызывам функцию PQsetdbLogin, которая устанавливает соединение к БД, возвращая указатель на соедининие PGconn* и переводим соедининие в асинхронный режим работы.

При завершении работы, соединие должно быть удалено функцией PQfinish, которой передается указатель, возвращённый функцией PQsetdbLogin. Поэтому последним параметром в вызове m_connection.reset() мы передаём адрес функции &PQfinish. Когда умный указатель выйдет из области видимости и счётчик ссылок обнулиться, он вызовет эту функцию, тем самым корректно завершив соединие.

Теперь нам нужен класс, который будет создавать, хранить и управлять работой пула коннектов.

pgbackend.h

#ifndef PGBACKEND_H
#define PGBACKEND_H

#include <memory>
#include <mutex>
#include <string>
#include <queue>
#include <condition_variable>
#include <libpq-fe.h>
#include "pgconnection.h"


class PGBackend
{
public:
    PGBackend();
    std::shared_ptr<PGConnection> connection();
    void freeConnection(std::shared_ptr<PGConnection>);

private:
    void createPool();
    
    std::mutex m_mutex;
    std::condition_variable m_condition;
    std::queue<std::shared_ptr<PGConnection>> m_pool;

    const int POOL = 10;


};

#endif //PGBACKEND_H


pgbackend.cpp

#include <iostream>
#include <thread>
#include <fstream>
#include <sstream>
#include "pgbackend.h"

PGBackend::PGBackend()
{

    createPool();
  
}

void PGBackend::createPool()
{
    std::lock_guard<std::mutex> locker_( m_mutex );

    for ( auto i = 0; i< POOL; ++i ){
         m_pool.emplace ( std::make_shared<PGConnection>() );
    }
}

std::shared_ptr<PGConnection> PGBackend::connection()
{

    std::unique_lock<std::mutex> lock_( m_mutex );

    while ( m_pool.empty() ){
            m_condition.wait( lock_ );
    }

    auto conn_ = m_pool.front();
    m_pool.pop();

    return  conn_;
}


void PGBackend::freeConnection(std::shared_ptr<PGConnection> conn_)
{
    std::unique_lock<std::mutex> lock_( m_mutex );
    m_pool.push( conn_ );
    lock_.unlock();
    m_condition.notify_one();
}


В функции createPool создаём пул соедининий, я установил 10 соединений. Далее — создаём класс PGBackend, и работаем с ним через функции connection — котороя возвращает свободное соединение к БД, и freeConnection — которая помещяет соединие обратно в очередь.

Всё это работает на основе условных переменных, если очередь пуста, значит свободных соединений нет, и поток засыпает, пока не будет разбужен через условную переменную.

Простейший пример, в котором используется наш бекэнд с пулом коннектов, приведён в файле main.cpp. В «боевых условия» у вас конечно будет какой-то цикл событий, при наступлении которых будет проводиться работа с БД. У меня это boost::asio, которая работает асинхронно и принимая события из сети, пишет всё в БД. Приводить здесь её излишне, чтобы не усложнять идею с пулом коннектов. Здесь мы просто создаём 50 потоков, которые работают с сервером через один экземляр PGBackend.

main.cpp

#include <thread>
#include <iostream>
#include "pgbackend.h"

void testConnection(std::shared_ptr<PGBackend> pgbackend)
{
//получаем свободное соединение
    auto conn = pgbackend->connection();

    std::string demo = "SELECT max(id) FROM demo; " ;
    PQsendQuery( conn->connection().get(), demo.c_str() );

    while ( auto res_ = PQgetResult( conn->connection().get()) ) {
        if (PQresultStatus(res_) == PGRES_TUPLES_OK && PQntuples(res_)) {
            auto ID = PQgetvalue (res_ ,0, 0);
            std::cout<< ID<<std::endl;
        }

        if (PQresultStatus(res_) == PGRES_FATAL_ERROR){
            std::cout<< PQresultErrorMessage(res_)<<std::endl;
        }

        PQclear( res_ );
    }

//возвращаем соединение в очередь
    pgbackend->freeConnection(conn);

}


int main(int argc, char const *argv[])
{
	
	auto pgbackend = std::make_shared<PGBackend>();

    
    std::vector<std::shared_ptr<std::thread>> vec;

    for ( size_t i = 0; i< 50 ; ++i ){

        vec.push_back(std::make_shared<std::thread>(std::thread(testConnection, pgbackend)));
    }

    for(auto &i : vec) {
        i.get()->join();
    }

 
    return 0;
}



Компилируется это всё командой:

g++ main.cpp pgbackend.cpp pgconnection.cpp -o pool -std=c++14 -I/usr/include/postgresql/ -lpq -lpthread

Будьте внимательны с количеством соединений БД — этот параметр задаётся параметром max_connections (integer).

Исходный код
Теги:
Хабы:
Всего голосов 26: ↑25 и ↓1+24
Комментарии20

Публикации

Истории

Работа

Программист C++
102 вакансии
QT разработчик
3 вакансии

Ближайшие события