Разработка плана действий
Передо мной стояла задача создать карту подключений пользователей по миру к нашим серверам для использовании статистических данных при оптимизации размещения новых мощностей и распределения общей нагрузки. Из инструментов сборов логов рассматривались Loki, Logstash, Fluent Bit. В итоге был выбран Fluent Bit из-за его относительно легкой настройки, оптимизации и наличия возможности написания собственных скриптов для агрегации на Lua. Получение геоданных из IP предполагается посредством использования баз данных для GeoIP2 от MaxMind.
Выявление ошибок и оптимизация
Первоначально, сбор логов Nginx происходил в стандартном формате с помощью регулярного выражения. Позже, решено было изменить формат логов Nginx на JSON, изначально структурированный формат, что позволило собирать логи более точно.
Однако, спустя какое-то время была выявлена еще одна проблема, касающаяся данных в полях upstream_*. Так как вышестоящих серверов может быть несколько, Nginx складывает данные по каждому из них в соответствующие поля через запятую.
В ходе продумывания плана действий, обращение к базе данных GeoIP2 планировалось на стороне Fluent Bit через встроенный фильтр. В последствие было принято решение отказаться от данной идеи и перенести получение геоданных на сторону Elasticsearch, что позволило сэкономить трафик и оптимизировать агрегацию данных.
Установка и настройка Fluent Bit
Установка Fluent Bit производилась с помощью Helm чарта. Содержимое файла values.yaml представлено ниже. Также был написан скрипт на Lua для разделения полей логов Nginx, идущих через запятую.
values.yaml
kind: DaemonSet serviceAccount: create: true rbac: create: true nodeAccess: false eventsAccess: false podSecurityPolicy: create: false openShift: enabled: false hostNetwork: false service: type: ClusterIP port: 2020 serviceMonitor: enabled: true dashboards: enabled: false resources: limits: cpu: 100m memory: 200Mi requests: cpu: 50m memory: 50Mi flush: 1 logLevel: info metricsPort: 2020 luaScripts: field_filter.lua: | <field_filter.lua> config: service: | [SERVICE] Daemon Off Flush {{ .Values.flush }} Log_Level {{ .Values.logLevel }} Parsers_File /fluent-bit/etc/parsers.conf Parsers_File /fluent-bit/etc/conf/custom_parsers.conf HTTP_Server On HTTP_Listen 0.0.0.0 HTTP_Port {{ .Values.metricsPort }} Health_Check On inputs: | [INPUT] Name tail Tag ingress-nginx-controller.* Path /var/log/containers/*ingress-nginx-controller*.log parser cri DB /var/log/flb_ingress-nginx-controller.db Mem_Buf_Limit 128MB Skip_Long_Lines On filters: | [FILTER] Name parser Match ingress-nginx-controller.* Key_Name message Parser ingress_nginx_controller [FILTER] Name Lua Match ingress-nginx-controller.* type_array_key upstream_addr upstream_response_time upstream_response_length upstream_status script /fluent-bit/scripts/field_filter.lua call field_filter outputs: | [OUTPUT] Name es Match ingress-nginx-controller.* Host elasticsearch-proxy.fluent-bit.svc.cluster.local Port 9200 Index ingress-nginx-controller HTTP_User <user> HTTP_Passwd <password> compress gzip tls Off tls.verify Off Trace_Error On Suppress_Type_Name On Buffer_Size 2MB customParsers: | [PARSER] Name ingress_nginx_controller Format json Time_Keep Off Time_Key time Time_Format %Y-%m-%dT%H:%M:%S%z
Скрипт для разделения полей на Lua. В зависимости от возвращаемого кода запись может быть изменена (код 2), отброшена (код -1) или остаться неизменной (код 0).
field_filter.lua
function field_filter(tag, timestamp, record) code = 0 if record.x_forward_for == nil or record.server_addr == nil then code = -1 return code, timestamp, record end function split_values(v, delimiter) result = {} for match in (v):gmatch("([^" .. delimiter .. "\\s][^\\" .. delimiter .. "]*[^" .. delimiter .. "\\s]*)") do if match == "-" then match = -1 end table.insert(result, match) end return result end function process_value(v) code = 0 if v ~= nil then result = split_values(v, ", ") code = 2 end return code, result end code, record.upstream_addr = process_value(record.upstream_addr) code, record.upstream_response_time = process_value(record.upstream_response_time) code, record.upstream_response_length = process_value(record.upstream_response_length) code, record.upstream_status = process_value(record.upstream_status) return code, timestamp, record end
Установка и настройка Elasticsearch
Установка Elasticsearch была выполнена на виртуальной машине из официальных репозиториев. Пройдены стандартные процедуры по установке SSL сертификатов на Kibana, заведение системного пользователя для Fluent Bit. После запуска системы было произведено тестовое наполнение данными из Fluent Bit с автоматическим созданием индекса. Как и ожидалось, некоторые поля индекса имели тип Text и не соотвествовали типам реальных данных. После ручной правки типов данных индекс приобрел таковую структуру.
index_mappings.json
{ "mappings": { "dynamic": "false", "dynamic_templates": [], "properties": { "@timestamp": { "type": "date" }, "bytes_sent": { "type": "long" }, "client_geo_ip": { "dynamic": "false", "properties": { "city_name": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "continent_name": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "country_iso_code": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "country_name": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "location": { "type": "geo_point", "ignore_malformed": false, "ignore_z_value": true }, "region_iso_code": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "region_name": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } } } }, "duration": { "type": "float" }, "http_referrer": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "http_user_agent": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "method": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "path": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "remote_addr": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "remote_user": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "request_id": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "request_length": { "type": "long" }, "request_proto": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "request_query": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "request_time": { "type": "float" }, "server_addr": { "type": "ip", "fields": { "keyword": { "type": "keyword" } } }, "server_geo_ip": { "dynamic": "false", "properties": { "city_name": { "type": "text", "fields": { "keyword": { "type": "keyword" } } }, "continent_name": { "type": "text", "fields": { "keyword": { "type": "keyword" } } }, "country_iso_code": { "type": "text", "fields": { "keyword": { "type": "keyword" } } }, "country_name": { "type": "text", "fields": { "keyword": { "type": "keyword" } } }, "location": { "type": "geo_point" }, "region_iso_code": { "type": "text", "fields": { "keyword": { "type": "keyword" } } }, "region_name": { "type": "text", "fields": { "keyword": { "type": "keyword" } } } } }, "status": { "type": "short", "ignore_malformed": false, "coerce": true }, "upstream_addr": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "upstream_response_length": { "type": "long", "ignore_malformed": false, "coerce": true }, "upstream_response_time": { "type": "float", "ignore_malformed": false, "coerce": true }, "upstream_status": { "type": "short", "ignore_malformed": false, "coerce": true }, "vhost": { "type": "text", "fields": { "keyword": { "type": "wildcard", "ignore_above": 256 } } }, "x_forward_for": { "type": "ip", "fields": { "keyword": { "type": "keyword" } } } } } }
Перед попаданием в индекс, данные проходят дополнительную стадию обработки (Ingest Pipeline) для извлечения геоданных по адресам клиента и сервера из полей x_forward_for и server_addr по базе данных GeoIP2. Эта информация хранится объектах client_geo_ip и server_geo_ip соответственно.
Финальный результат
После настройки всех необходимых систем и сбора некоторого количества данных были настроены визуализации и дешборды для более понятного представления общей картины. Одной из таких визуализация является тепловая карта подключений пользователей, представленная на рисунке ниже.

Данная статья является моей первой статьей на Хабр. Я буду рад узнать о других способах реализации данного кейса, а также о советах по оптимизации текущей установки со стороны более опытных в этом деле людей. :)
