Измеряем потери пакетов с помощью Zabbix External Check и C++ Boost.Asio
Начало
Всем хорош Zabbix, за исключением маленьких таймаутов для встроенных и внешних проверок. В файле конфигурации сервера указан максимальный таймаут 30 секунд, чего совершенно недостаточно, например, для пинга с использованием 10 000 пакетов, даже если мы установим интервал в 100 мс. Как мы знаем, по российским нормативам потери пакетов в сетях связи не должны превышать один пакет на тысячу. Особо требовательные заказчики трактуют эту цифру по-своему: всё что больше или равно 0,05% можно округлить до тех самых 0,1% или одного пакета на тысячу. Поэтому я решил для самых критичных узлов, особенно при поступлении заявки, использовать внешнюю программу (назовём её losshd), которая будет в несколько потоков измерять потери пакетов и записывать результаты в базу, а для Zabbix использовать простую утилиту (назовём её getloss), которая быстро вытащит из базы необходимое значение.
Распишем алгоритм работы нашей утилиты:
По запросу
getloss -hlocalhost --dbname=zabbix --dbuser=zabbix --dbpass=mypassword --address=192.168.0.1
утилита проверяет, есть ли в таблице для демона losshd адрес 192.168.0.1. Если да – достаёт из базы значение поля loss и выводит его в стандартный вывод. Если нет – добавляет его в таблицу и выводит значение 0 (будем считать, что потерь нет, пока их наличие не доказано);Далее мы обновляем в таблице счётчик
last_read.
А вот алгоритм работы демона losshd:
Считываем из базы все ip-адреса, требующие проверки;
Создаём отдельные потоки для отправки пакетов ICMP Echo Request для каждого адреса (далее - сендеры);
Создаём один общий поток для ловли ответов – ICMP Echo Reply (далее - ресивер);
Ожидаем завершения работы всех потоков;
Считаем процент потерь для каждого IP;
Записываем обновления в базу;
После этого мы удаляем из таблицы все строки, у которых
last_readпоследний раз обновлялся более недели назад. Так мы снизим нагрузку на демонlosshd, убирая невостребованные проверки;Начинаем всё с начала.
Итак, приступим к реализации. Я не буду размещать здесь весь код, иначе статья получится слишком объёмной. Можете посмотреть его в моём гитхабе: https://github.com/Lordgprs/losshd.
Сначала создаём таблицу для внешней проверки. Для простоты сделаем её в той же базе, с которой работает Zabbix server. Я использую PostgreSQL:
CREATE TABLE ext_packetlosshd_dbg ( ip inet NOT NULL PRIMARY KEY, loss DOUBLE PRECISION NOT NULL DEFAULT 0, last_update TIMESTAMP WITHOUT TIME ZONE NOT NULL DEFAULT NOW(), last_read TIMESTAMP WITHOUT TIME ZONE NOT NULL DEFAULT NOW() );
Наш код на C++ будет состоять из шести файлов: options.h, options.cpp, getloss.h, getloss.cpp, losshd.h, losshd.cpp. Я приведу выборочно объявления и определения классов и методов, чтобы не перегружать статью.
Утилита getloss
Пишем базовый класс для работы с параметрами командной строки:
class Options { public: Options() = delete; Options(const Options &) = delete; Options(const Options &&) = delete; Options & operator=(const Options &) = delete; Options & operator=(const Options &&) = delete; Options (int, char **); protected: po::options_description desc_; po::variables_map vm_; }; Options::Options(int argc, char **argv): desc_("Allowed options") { }
Здесь удалены все конструкторы по умолчанию, указываем только свой конструктор от параметров argc и argv (впрочем, тоже пустой). Перечень опций и их значения мы планируем использовать в классах-потомках, поэтому делаем их protected.
Пишем класс распознавания опций для утилиты getloss:
class OptionsGetloss : public Options { public: OptionsGetloss() = delete; OptionsGetloss(const OptionsGetloss &) = delete; OptionsGetloss(const OptionsGetloss &&) = delete; OptionsGetloss & operator=(const OptionsGetloss &) = delete; OptionsGetloss & operator=(const OptionsGetloss &&) = delete; OptionsGetloss(int argc, char **argv); void CheckOptions(); std::string get_dbname() const; std::string get_dbhost() const; std::string get_dbuser() const; std::string get_dbpass() const; std::string get_address() const; };
Конструктор и метод, который проверяет аргументы:
OptionsGetloss::OptionsGetloss(int argc, char *argv[]) : Options(argc, argv) { desc_.add_options() ("help", "this help") ("dbname,n", po::value<std::string>(), "database name") ("dbhost,h", po::value<std::string>(), "database server address") ("dbuser,U", po::value<std::string>(), "database user name") ("dbpass,P", po::value<std::string>(), "database password") ("address,A", po::value<std::string>(), "address to ping"); po::store(po::parse_command_line(argc, argv, desc_), vm_); po::notify(vm_); CheckOptions(); } void OptionsGetloss::CheckOptions() { if (vm_.count("help") != 0) { std::cout << desc_ << std::endl; std::exit(EXIT_SUCCESS); } if ( vm_.count("dbname") == 0 || vm_.count("dbhost") == 0 || vm_.count("dbuser") == 0 || vm_.count("address") == 0 ) { std::cerr << "Error via processing arguments!" << std::endl << std::endl; std::cerr << desc_ << std::endl; std::exit(EXIT_FAILURE); } }
Класс для работы с СУБД:
class Database { public: Database() = delete; Database(const OptionsGetloss &); ~Database(); double get_loss(const std::string &) const; private: pqxx::connection conn_; pqxx::work mutable txn_; }; double Database::get_loss(const std::string &ip) const { double loss = 0; std::string req = "SELECT loss FROM ext_packetlosshd_dbg WHERE ip = '" + ip + "'"; auto result = txn_.exec(req); if (result.size() > 0) { loss = result[0][0].as<double>(); txn_.exec("UPDATE ext_packetlosshd_dbg SET last_read = NOW() WHERE ip = '" + ip + "'"); } else txn_.exec("INSERT INTO ext_packetlosshd_dbg (ip) VALUES ('" + ip + "')"); return loss; }
В функции main() мы создаём объекты для парсинга опций и соединения с СУБД и получаем цифру с потерями из базы согласно вышеописанному алгоритму:
int main(int argc, char **argv) { OptionsGetloss options(argc, argv); Database db(options); std::cout << db.get_loss(options.get_address()) << std::endl; return EXIT_SUCCESS; }
Выводим полученное значение в стандартный вывод, чтобы сервер Zabbix смог считать его оттуда. Перевод строки здесь оставлен для отладки - необходимости в нём нет.
Демон losshd
Теперь займёмся сердцем нашей проверки: демоном losshd. Описание структур Ipv4 и Icmp, а также основные методы, такие как подсчёт контрольной суммы, практически без изменений взяты из документации к Boost.Asio:
Структуры Ipv4 и Icmp
class Ipv4 { public: Ipv4(); char8_t version() const; uint16_t header_length() const; char8_t tos() const; uint32_t time_to_live() const; boost::asio::ip::address_v4 source_address() const; uint32_t source_address_uint32() const; friend std::istream &operator>>(std::istream &is, Ipv4 &header); private: char8_t data_[60]; }; class Icmp { public: enum { kEchoReply = 0, kDestinationUnreachable = 3, kSourceQuench = 4, kRedirect = 5, kEchoRequest = 8, kTimeExceeded = 11, kParameterProblem = 12, kTimestampRequest = 13, kTimestampReply = 14, kInfoRequest = 15, kInfoReply = 16, kAddressRequest = 17, kAddressReply = 18 }; Icmp(); char8_t type() const; char8_t code() const; uint16_t identifier() const; uint16_t sequence_number() const; void type(char8_t); void code(char8_t); void checksum(uint16_t); void identifier(uint16_t); void sequence_number(uint16_t); void CalculateChecksum(auto body_begin, auto body_end); friend std::istream& operator>> (std::istream &inputStream, Icmp &header); friend std::ostream& operator<< (std::ostream &outputStream, const Icmp &header); private: uint16_t Decode(int32_t a, int32_t b) const; void Encode(int32_t a, int32_t b, uint16_t n); char8_t data_[8]; }; Ipv4::Ipv4() { std::fill (data_, data_ + sizeof(data_), 0); } char8_t Ipv4::version() const { return (data_[0] >> 4) & 0xF; } uint16_t Ipv4::header_length() const { return (data_[0] & 0xF) << 2; } char8_t Ipv4::tos() const { return data_[1]; } uint32_t Ipv4::time_to_live() const { return data_[8]; } boost::asio::ip::address_v4 Ipv4::source_address() const { boost::asio::ip::address_v4::bytes_type bytes = { {data_[12], data_[13], data_[14], data_[15]} }; return boost::asio::ip::address_v4(bytes); } uint32_t Ipv4::source_address_uint32() const { return (data_[12] << 24) | (data_[13] << 16) | (data_[14] << 8) | data_[15]; } Icmp::Icmp() { std::fill(data_, data_ + sizeof(data_), 0); } char8_t Icmp::type() const { return data_[0]; } char8_t Icmp::code() const { return data_[1]; } uint16_t Icmp::identifier() const { return Decode(4, 5); } uint16_t Icmp::sequence_number() const { return Decode(6, 7); } void Icmp::type(char8_t n) { data_[0] = n; } void Icmp::code(char8_t n) { data_[1] = n; } void Icmp::checksum(uint16_t n) { Encode(2, 3, n); } void Icmp::identifier(uint16_t n) { Encode(4, 5, n); } void Icmp::sequence_number(uint16_t n) { Encode(6, 7, n); } void Icmp::CalculateChecksum(auto body_begin, auto body_end) { uint32_t sum = (type() << 8) + code() + identifier() + sequence_number(); auto body_iterator = body_begin; while (body_iterator != body_end) { sum += (static_cast<char8_t>(*body_iterator++) << 8); if (body_iterator != body_end) sum += static_cast<char8_t>(*body_iterator++); } sum = (sum >> 16) + (sum & 0xFFFF); sum += (sum >> 16); checksum(static_cast<uint16_t>(~sum)); } uint16_t Icmp::Decode(const int32_t a, const int32_t b) const { return (data_[a] << 8) + data_[b]; } void Icmp::Encode(const int32_t a, const int32_t b, const uint16_t n) { data_[a] = static_cast<char8_t>(n >> 8); data_[b] = static_cast<char8_t>(n & 0xFF); } std::istream& operator>>(std::istream& input_stream, Icmp &header) { return input_stream.read(reinterpret_cast<char *>(header.data_), 8); } std::ostream& operator<<(std::ostream& output_stream, const Icmp &header) { return output_stream.write( reinterpret_cast<const char *>(header.data_), 8); } std::istream &operator>>(std::istream &is, Ipv4 &header) { is.read(reinterpret_cast<char *>(header.data_), 20); if (header.version() != 4) is.setstate(std::ios::failbit); std::streamsize options_length = header.header_length() - 20; if (options_length < 0 || options_length > 40) is.setstate(std::ios::failbit); else is.read(reinterpret_cast<char *>(header.data_) + 20, options_length); return is; }
Создадим классы IcmpSender и IcmpReceiver для соответственно отправки и получения icmp-пакетов:
class IcmpSender { public: IcmpSender() = delete; IcmpSender(const IcmpSender &) = delete; IcmpSender(const IcmpSender &&) = delete; IcmpSender & operator=(const IcmpSender &) = delete; IcmpSender & operator=(const IcmpSender &&) = delete; IcmpSender(boost::asio::io_context &, const char *, uint16_t, uint16_t, std::mutex &, std::condition_variable &); private: void StartSend(); icmp::endpoint destination_; icmp::socket socket_; uint16_t sequence_number_; uint16_t count_; chrono::steady_clock::time_point time_sent_; boost::asio::streambuf reply_buffer_; std::size_t interval_; std::size_t size_; std::mutex &mtx_; std::condition_variable &condition_; }; class IcmpReceiver { public: IcmpReceiver() = delete; IcmpReceiver(const IcmpReceiver &) = delete; IcmpReceiver(const IcmpReceiver &&) = delete; IcmpReceiver & operator=(const IcmpReceiver &) = delete; IcmpReceiver & operator=(const IcmpReceiver &&) = delete; IcmpReceiver(boost::asio::io_context &, int &, std::unordered_map<uint32_t, uint32_t> &, std::mutex &, std::condition_variable &); private: void StartReceive(); void CheckIfSendersExist(); void HandleReceive(std::size_t length); icmp::socket socket_; boost::asio::streambuf reply_buffer_; int &senders_count_; std::unordered_map<uint32_t, uint32_t> &ping_results_; std::mutex &mtx_; std::condition_variable &condition_; bool senders_unlocked_ = false; boost::asio::deadline_timer dt_; const boost::posix_time::time_duration kReceiveTimerFrequency = boost::posix_time::seconds(5); };
Пройдёмся по некоторым полям класса IcmpSender:
destination_- IP, который будем пинговать;socket_- сокет Asio для отправки ICMP-пакетов;sequence_number_- номер пакета ICMP;count_,interval_,size_- соответственно, количество отправляемых пакетов, интервал между отправками и размер пакета;mtx_иcondition_variable_- ссылки на мьютекс и переменную состояния для синхронизации потоков.
В классе IcmpReceiver у нас будут следующие поля:
Сокет
socket_и буферreply_buffer_для получения ответа ICMP;senders_count_- ссылка на переменную, содержащую количество активных потоков-сендеров. Когда это поле будет равно нулю, мы прекратим ловить ответы;ping_results_- результаты нашего пинга (количество принятых пакетов). В качестве ключа используем IP-адрес, переведённый в 32-битное число;Так же как в классе-сендере, здесь мы пользуемся ссылками на общий мьютекс и переменную состояния;
kReceiveTimerFrequency- этой константой мы задаём количество секунд, через которое таймер будет проверять, остались ли активные потоки-сендеры.
Рассмотрим процесс отправки пакетов:
void IcmpSender::StartSend() { std::string body(size_ - sizeof(Icmp), '\0'); // Create an ICMP header Icmp icmp; icmp.type(Icmp::kEchoRequest); icmp.code(0); icmp.identifier(static_cast<uint16_t>(getpid())); for (size_t i = 0; i < count_; i++) { icmp.sequence_number(++sequence_number_); icmp.CalculateChecksum(body.begin(), body.end()); // Encode the request packet boost::asio::streambuf requestBuffer; std::ostream outputStream(&requestBuffer); outputStream << icmp << body; // Send the request socket_.send_to(requestBuffer.data(), destination_); std::this_thread::sleep_for(std::chrono::milliseconds(interval_)); } }
Здесь всё просто: заполняем объект данными и отправляем его в сокет с использованием перегруженного оператора сдвига. Между итерациями просто засыпаем на заданное полем interval_ количество миллисекунд.
Теперь рассмотрим процесс ловли ответов. Здесь уже всё посложнее:
void IcmpReceiver::StartReceive() { // Discard any data already in the buffer reply_buffer_.consume(reply_buffer_.size()); if ((!senders_unlocked_) || (senders_unlocked_ && senders_count_ > 0)) { socket_.async_receive(reply_buffer_.prepare(65536), boost::bind(&IcmpReceiver::HandleReceive, this, boost::placeholders::_2)); dt_.async_wait(boost::bind(&IcmpReceiver::CheckIfSendersExist, this)); } if (!senders_unlocked_) { { std::unique_lock<std::mutex> ul(mtx_); // Notifying senders: we are ready to catch echo replies condition_.notify_all(); } senders_unlocked_ = true; } }
Привязываем сокет к методу HandleReceive, который и будет обрабатывать входящие пакеты. Инициализируем таймер, который проверяет, живы ли потоки-сендеры (CheckIfSendersExists). Мы готовы принимать пакеты - сообщаем это остальным потокам используя метод notify_all() переменной состояния. Когда же потоки-сендеры прекратят свою работу, останавливаем таймер и сокет:
void IcmpReceiver::CheckIfSendersExist() { int count; { std::unique_lock<std::mutex> ul(mtx_); count = senders_count_; } // Stopping catching replies if all senders have been finished if (count == 0) { dt_.cancel(); socket_.cancel(); } }
Пока же отправка идёт, ловим пакеты:
void IcmpReceiver::HandleReceive(std::size_t length) { reply_buffer_.commit(length); // Decode the reply packet std::istream input_stream(&reply_buffer_); Ipv4 ipv4; Icmp icmp; input_stream >> ipv4 >> icmp; if (input_stream && icmp.type() == Icmp::kEchoReply && icmp.identifier() == static_cast<uint16_t>(getpid())) { dt_.cancel(); uint32_t src = ipv4.source_address_uint32(); { std::lock_guard lg(mtx_); ping_results_[src]++; } } { std::lock_guard lg(mtx_); if(senders_count_ > 0) StartReceive(); } }
Увеличиваем количество принятых пакетов для IP, с которого пришёл ответ, в контейнере ping_results_.
Итак, мы разобрались как отправлять и принимать пакеты. Напишем класс-планировщик, который будет отвечать за инициализацию потоков и запись результата:
class Scheduler { public: Scheduler(OptionsLosshd &); ~Scheduler(); void Run(); void Clean(); private: uint32_t GetIpFromString (const std::string &str_ip); std::string GetIpFromUint32 (const uint32_t ip) const; std::vector<std::string> GetAddressesForPing() const; auto CreateReceiver(); auto CreateSender(); OptionsLosshd &options_; pqxx::connection conn_; pqxx::nontransaction mutable txn_; std::mutex mtx_; std::vector<std::string> address_list_; std::unordered_map<uint32_t, uint32_t> ping_results_; std::condition_variable condition_; };
Конструктору класса Scheduler мы передаём только ссылку на опции командной строки. Методы CreateReceiver и CreateSender созданы для увеличения читабельности кода - они возвращают лямбда-выражение, передаваемое в конструктор потока std::thread. У нашего планировщика следующие приватные поля:
options_- набор опций командной строки;conn_- соед��нение с СУБД;txn_- обработка запросов к базе данных;address_list_- перечень адресов, которые мы будем пинговать;ping_results_- уже знакомый нам контейнер с результатами пинга;condition_,mtx_- синхронизация потоков.
Получаем перечень адресов, которые будем пинговать, попутно удаляя из таблицы адреса, статистику по которым не снимали более недели:
std::vector<std::string> Scheduler::GetAddressesForPing() const { std::string req = ""; std::vector<std::string> addresses; txn_.exec("DELETE FROM ext_packetlosshd_dbg WHERE last_read + interval '7 days' < NOW()"); for (auto row: txn_.exec("SELECT ip FROM ext_packetlosshd_dbg LIMIT 100")) addresses.push_back(row[0].c_str()); return addresses; }
Запускаем потоки:
void Scheduler::Run() { std::vector<std::thread> threads; int senders_count = address_list_.size(); // Starting ICMP receiver std::thread t(CreateReceiver(), std::ref(senders_count), std::ref(ping_results_), std::ref(mtx_), std::ref(condition_)); // Starting ICMP senders for (size_t i = 0; i < address_list_.size(); i++) { std::thread t(CreateSender(), address_list_[i], std::ref(senders_count), std::ref(mtx_), std::ref(condition_), std::ref(options_)); threads.push_back(std::move(t)); } // Joining senders for (size_t i = 0; i < threads.size(); i++) threads.at(i).join(); // Joining receiver t.join(); std::cout << "End of collecting results:" << std::endl; for (auto i: ping_results_) { std::cout << "from IP " << GetIpFromUint32(i.first) << " received: " << i.second << std::endl; txn_.exec("UPDATE ext_packetlosshd_dbg SET \ loss = " + std::to_string( static_cast<double>(options_.get_count() - i.second) / options_.get_count() * 100) + ", " + " \ last_update = NOW() \ WHERE \ ip = '" + GetIpFromUint32(i.first) + "'"); } std::cout << std::endl; }
После завершения их работы складываем результаты в базу данных.
Отдельно хочу рассмотреть поток-сендер:
auto Scheduler::CreateSender() { return [](std::string address, int &senders, std::mutex &mtx, std::condition_variable &condition, OptionsLosshd &options) { { std::unique_lock<std::mutex> ul(mtx); // Waiting for receiver starts condition.wait(ul); } boost::asio::io_context io_context; IcmpSender pinger(io_context, address.c_str(), options.get_count(), options.get_interval(), mtx, condition); { std::lock_guard lg(mtx); senders--; } }; }
Все потоки-сендеры не начинают отправку пакетов, пока не получат уведомление от потока-ресивера с использованием метода notify_all().
После завершения сбора статистики необходимо подготовиться к новой итерации. Для этого заново получаем актуальный перечень адресов для мониторинга и очищаем контейнер с результатами:
void Scheduler::Clean() { ping_results_.clear(); address_list_ = GetAddressesForPing(); for (auto address: address_list_) ping_results_.insert({GetIpFromString(address), 0}); }
Ну и определим наконец функцию main():
int main(int argc, char *argv[]) { constexpr auto kPauseBetweenIterations = std::chrono::seconds(5); OptionsLosshd options(argc, argv); if (options.is_daemon()) { std::cout << "Running as a daemon..." << std::endl; auto pid = fork(); if (pid > 0) return EXIT_SUCCESS; if (pid < 0) { std::cerr << "Error while doing fork()! Exiting..."; return EXIT_FAILURE; } umask(0); setsid(); if (chdir("/") < 0) { std::cerr << "Error while attempting chdir()!" << std::endl; return EXIT_FAILURE; }; close(STDIN_FILENO); close(STDOUT_FILENO); close(STDERR_FILENO); } Scheduler scheduler(options); while (true) { std::cout << "New iteration started. Pinging hosts..." << std::endl; scheduler.Run(); scheduler.Clean(); std::this_thread::sleep_for(kPauseBetweenIterations); } return EXIT_SUCCESS; }
Реализуем работу демона классическим для UNIX способом: используя системный вызов fork().
Сборка и запуск
Собираем и запускаем наш проект. Я буду это делать на Ubuntu 22.04 LTS с использованием CMake и conan:
conanfile.txt
[requires] boost/1.74.0 libpqxx/6.4.8 [generators] CMakeDeps CMakeToolchain
CMakeLists.txt
cmake_policy(SET CMP0048 NEW) project(LossHD VERSION 1.0.3) cmake_minimum_required(VERSION 3.18) set(CMAKE_CXX_FLAGS "-O2") find_package(Boost REQUIRED COMPONENTS program_options REQUIRED) find_package(libpqxx REQUIRED) add_executable(losshd src/losshd.cpp src/options.cpp) set_property(TARGET losshd PROPERTY CXX_STANDARD 20) set_property(TARGET losshd PROPERTY CXX_STANDARD_REQUIRED On) set_property(TARGET losshd PROPERTY CXX_EXTENSIONS Off) target_link_libraries(losshd boost_program_options pqxx) add_executable(getloss src/getloss.cpp src/options.cpp) set_property(TARGET getloss PROPERTY CXX_STANDARD 20) set_property(TARGET getloss PROPERTY CXX_STANDARD_REQUIRED On) set_property(TARGET getloss PROPERTY CXX_EXTENSIONS Off) target_link_libraries(getloss boost_program_options pqxx)
mkdir build cd build conan install .. -of ../conan cmake .. -DCMAKE_TOOLCHAIN_FILE=../conan/conan_toolchain.cmake \ -DCMAKE_BUILD_TYPE=Release cmake --build .
Мы получили в каталоге build два исполняемых файла: getloss и losshd. Запускаем losshd:
./losshd -nzabbix -h127.0.0.1 -Upostgres -Ppassword -i100 -s1400 -c10000 --daemon
Копируем getloss в каталог /usr/local/bin. В каталоге с внешними проверками zabbix создаём bash-скрипт getloss.sh:
#!/bin/bash /usr/local/bin/getloss -nzabbix -h127.0.0.1 -Upostgres -Ppassword -A $1
Настройка Zabbix
Создаём новый пустой шаблон. Я назвал его ICMP Packet Loss HD. В шаблоне создаём новый элемент данных:

Добавляем шаблон узлам сети, которым требуется точное измерение качества связи. Ждём и наслаждаемся такими графиками:


Заключение
Сервисом уже можно пользоваться, но необходимо всё же доделать некоторые вещи:
Реализовать логирование в произвольный поток вместо
std::cout, а возможно и в базу;Добавить обработку исключений библиотеки libpqxx (обрыв соединения с СУБД и т.д.).
Поскольку у меня есть нехватка качественного код-ревью, буду рад любой критике, которая сделает мой код лучше. Всем спасибо за внимание, надеюсь и вам помогла моя заметка.
