Как мы упростили работу с данными с помощью пайплайна: пошаговый план
Привет, меня зовут Руслан Шкарин. Я Senior Software Engineer и это моя первая статья в DIY-медиа вАЙТИ от beeline cloud. Расскажу, как занимался построением пайплайна для сбора и анализа логов системы в реальном времени для сервиса, который обходил сотни тысяч веб-страниц и парсил контактные данные.
Зачем нам пайплайн
Пайплайн нужен был для того, чтобы выявить ошибки и аномалии в системе, а также записать отфильтрованные логи в OpenSearch. В реальном времени, при большом объеме данных невозможно просмотреть глазами журнал с логами ошибок, поэтому нужно автоматизированное решение. Например, если мы настраиваем анализатор скачанной веб-страницы на поиск номера телефона, то в 100 случаях разбор происходит по плану, а затем один из номеров оказывается записан в нестандартном формате, и в лог попадает ошибка. Человеку понадобятся часы, чтобы просмотреть все записи и понять, в чем проблема, а пайплайн найдет ошибку за секунды.
Так как задачей было регулярно собирать контактные данные с веб-страниц, нам требовалась стабильно работающая система. Представим, что анализатор неисправен (например, пропущена ошибка в коде после очередного релиза). В таком случае скрипт будет работать вхолостую до тех пор, пока ошибка не будет обнаружена. Теряется время, за которое можно было бы выполнить много полезной работы.
Наша система настроена на генерацию уведомлений при достижении заданного порога — количества ошибок. Как это было сделано, я опишу далее.
Какие сервисы мы использовали и что сделали
Для анализа логов в реальном времени мы использовали сервисы от Amazon — CloudWatch, Kinesis Firehose, Lambda, OpenSearch, S3.
Расскажу по порядку, что и для чего было сделано в той части системы, которая занималась анализом данных.
Предварительная настройка сервисов
Мы использовали Kinesis Firehose Delivery Stream — гибкий сервис от AWS, предназначенный для упрощения и автоматизации процесса загрузки потоковых данных в различные сервисы AWS, такие как Amazon S3, Amazon Redshift, Amazon Elasticsearch Service и, в нашем случае, Amazon OpenSearch Service. Для решения задачи выбрали именно Firehose из-за автоматического масштабирования и способности обрабатывать большие объемы данных в реальном времени.
CloudWatch. В разделе Subscription filters создали фильтр для нужных нам логов. По сути, это «папка», которая отсекает логи из других систем в AWS-аккаунте.
В качестве фильтра использовали заранее созданный Firehose Delivery Stream, который принимал логи. Delivery Stream агрегирует записи заданное количество времени или пока не будет достигнут заданный объем — в нашем случае оптимально было агрегировать логи на протяжении 15 минут.
Математика была такая:
четыре сервера обходили примерно 200 000 страниц в сутки, то есть 2,3 страницы в секунду;
на 1000 посещенных страниц приходилось в среднем 20 ошибок — это 2%;
за 900 секунд (15 минут) серверы обходили 2070 страниц (2,3 страницы в секунду × 900);
2% ошибок на 2070 страницах — это, грубо говоря, 41 ошибка за 15 минут, или 164 ошибки в час.
Разработка функции для обработки логов
Затем мы создали Lambda-функцию, которая занималась получением и обработкой логов, агрегированных в Delivery Stream. После обработки запись попадала обратно в Delivery Stream для последующей передачи в OpenSearch.
Здесь следует заметить, что в случае помещения записи обратно в Delivery Stream не происходила бесконечная рекурсия. Когда Kinesis Firehose отправляет данные на обработку в Lambda, она не просто «помещает» их обратно в тот же поток Firehose. После обработки данных Lambda-функцией они возвращаются не в начало потока, а в раздел, отвечающий за доставку в конечные места назначения — OpenSearch или S3. Firehose понимает, что возвращенные Lambda данные уже обработаны, и не отправляет их в Lambda повторно.
Пример кода Lambda-функции(Node.js), которая занималась обработкой логов:
exports.handler = async(event) = >{
let output = event.records.map((record) = >{
let entry = Buffer.from(record.data, 'base64').toString('utf8');
let log = JSON.parse(entry);
// Проверяем, что уровень лога — 'error'
if (log.level === 'error') {
// Создаем новый объект с необходимыми полями
const errorLog = {
host: JSON.parse(log.data).host,
// Парсим JSON-строку из поля data
status: log.level,
time: log.time,
msg: log.msg
};
let outputData = Buffer.from(JSON.stringify(errorLog)).toString('base64');
return {
recordId: record.recordId,
result: 'Ok',
data: outputData,
// Возвращаем обработанный лог в Firehose
};
}
// Для логов без ошибок возвращаем 'Dropped', чтобы Firehose их не отправлял в OpenSearch
return {
recordId: record.recordId,
result: 'Dropped',
data: record.data,
};
});
return {
records: output
};
};
Сбор логов для анализа и хранения
Далее мы настроили OpenSearch, который принимал записи из Delivery Stream. OpenSearch использовался отделом контроля качества для дальнейшего анализа отфильтрованных и обработанных логов. Для этого данные визуализировали на дашбордах, а также создавали алерты на случай обнаружении аномалий.
Для поиска аномалий использовали SQL-запросы в OpenSearch такого вида:
SELECT host, COUNT(*) as error_count
FROM collector_index
WHERE status = 'error'
AND time >= now() - INTERVAL 1 HOUR
GROUP BY host
HAVING COUNT(*) > 164
Здесь 164 — это пороговое число ошибок за час, которое мы рассчитали ранее. Если количество ошибок превышало это значение, срабатывал алерт.
По мере развития проекта количество ошибок уменьшалось, и настройки приходилось менять.
Кроме того, вынесение функционала анализа в отдельный сервис позволило масштабировать и усложнять систему обработки в будущем.
Помимо OpenSearch, дополнительно мы настроили отправку логов в S3 bucket. Это нужно для дополнительного резервирования на время разработки, чтобы при необходимости иметь возможность просмотреть историю. В качестве типа хранилища использовали S3 ‘One Zone–Infrequent Access’ — для экономии средств.
Заключение
Отличительной чертой этой системы является то, что ее можно использовать для обработки и анализа любых данных в реальном (например, Kinesis Data Stream + Lambda) или практически реальном (например, Kinesis Data Stream + Kinesis Data Firehose) времени, так как Kinesis Data Firehose в качестве input может принимать данные как от Amazon-сервисов, так и от абсолютно любых других источников с помощью AWS API или SDK.
Благодаря тому, что решение полностью облачное, нам не нужно управлять собственными серверами, настройка и отладка которых занимала бы намного больше времени. На весь проект ушло примерно полтора месяца: около двух недель на построение прототипа и еще месяц на отладку и шлифовку до нужной нам работоспособности.
И главное, нам стало проще анализировать, что происходит в большом потоке данных. Когда система выросла, нам стало не хватать обычного логирования всего подряд, потребовалась дополнительная обработка данных в реальном времени. Пайплайн отлично справился с этой задачей.
Другие статьи с вАЙТИ
#Разработка
Внедряем CRM. 7 шагов, основанных на личном опыте
Рассказываю, что помогает нам успешно внедрять CRM в бизнес-процессы клиентов.
Замена сотрудников техподдержки чат-ботом: «подводные камни» и советы
Моя специальность — чат-боты, и я расскажу, как их создать и заставить работать на ваш бизнес.
Jmix — любовь с первого взгляда, если ты Java-программист
Объясняю, почему нашей компании нужен только Jmix: подробно об инструментах и интерфейсе.
#Kubernetes
Устанавливаем отказоустойчивый кластер Kubernetes
Показываю, как сделать надежный кластер с помощью дополнительные серверов и балансировщика нагрузки.
Пошаговое руководство Как установить кластер Kubernetes с CRI-O в качестве Container Engine.
Как провести аудит безопасности Kubernetes — с помощью утилит Kube-bench, Kube-hunter, Kubescape, Trivy, собрать итоговый отчет о проверке и исправить уязвимости.
#Хранение данных
Как нам помог опыт использования отечественного BI-инструмента
Как мы свели все данные о бизнесе в BI-систему и что получили благодаря миграции.
Защита личных данных пациентов. Как работать с конфиденциальными медданными
Как работает с личными данными компания, которая проводит дистанционные медицинские осмотры.
Как перенести 2 ТБ данных из одного дата-центра в другой при низкой скорости интернета
Что нужно сделать, чтобы быстро объединить два кластера MongoDB, которые работают в разных ЦОД.