Привет! Я Артём, скромный платформенный инженер: код пишу, метрики смотрю и иногда даже понимаю, что происходит. В работе мне часто приходится сталкиваться с Observability‑инструментами. Одним из таких инструментов, о котором я хотел бы рассказать - OpenTelemetry Collector. Это мощный инструмент, который позволяет работать с различной телеметрией и строить гибкие пайплайны для метрик, логов и трейсов.
Но иногда возможностей стандартного набора компонентов не хватает, чтобы справиться с поставленными задачами без использования костылей и изоленты. Тогда на сцену выходят кастомные компоненты для Otel-Collector.

Содержание
Введение
В стандартном наборе opentelemetry-collector-contrib много готовых процессоров: фильтрация, нормализация, лимит памяти и так далее. Но у нас возникла задача: обогащать спаны данными из внешнего справочника (в нашем случае выбор пал на CSV‑файл).
Например, у нас есть спан с атрибутом SolObjectID.
А в CSV лежит таблица, где этому SolObjectID соответствует поле trace_code, очень нужное нам, по каким-то причинам. Хочется, чтобы в трейсе появлялся этот trace_code без дополнительного изменения кода в сервисах. К сожалению, готовых решений в contrib версии OpenTelemetry Collector не предусмотрено, однако, такой функционал очень даже хочется иметь. Взять и написать свой процессор - кажется хорошей идеей.
В этой статье я расскажу про свой опыт написания кастомного процессора для otel-collector. Он обогащает спаны данными из CSV‑файла: находит совпадение по атрибуту и добавляет дополнительные поля прямо внутрь трейсов. Мы разберём архитектуру процессора, посмотрим код и конфигурацию, а в конце покажу, как собрать и запустить Collector с этим расширением.
Архитектура кастомного процессора
Любой компонент в Collector (receiver, processor, exporter, connector) устроен по одному принципу: у него есть Config, Factory и сама логика (в нашем случае Processor).
1. Config (config.go)
Config описывает, какие параметры мы можем задать в YAML‑конфигурации Collector.
В OpenTelemetry Collector каждый компонент имеет свой конфиг. Collector ожидает, что этот конфиг будет реализовывать интерфейс component.Config.
Collector не знает заранее, какие именно поля есть у нашего конфига. Но он знает, что любой конфиг - это объект, который реализует интерфейс component.Config. Это позволяет Collector работать с любым компонентом одинаково.
Фрагмент Config кода:
type Config struct { CSVPath string `mapstructure:"csv_path"` MatchField string `mapstructure:"match_field"` EnrichColumns []string `mapstructure:"enrich_columns"` ReloadInterval time.Duration `mapstructure:"reload_interval"` } var _ component.Config = (*Config)(nil)
Что здесь важно:
CSVPath- путь к CSV‑файлу.MatchField- атрибут спана, по которому ищем совпадения.EnrichColumns- список колонок, которые нужно добавить в спан.ReloadInterval- как часто обновлять данные из CSV без перезапуска Collector.
Пример использования в collector-config.yaml:
processors: csvenricherprocessor: csv_path: "/etc/mapping.csv" match_field: "SolObjectID" enrich_columns: [ "trace_code", "product_id" ] reload_interval: "5m"
2. Factory (factory.go)
Factory - это «фабрика» для создания процессора. Collector работает по контракту: каждый компонент обязан предоставить фабрику, чтобы Collector мог его корректно зарегистрировать и собрать в пайплайн.
Фабрика отвечает за три ключевых момента:
Тип компонента - строковый идентификатор, по которому Collector узнаёт наш процессор.
var ( strType = component.MustNewType("csvenricherprocessor") )
Конфигурация по умолчанию - значения, которые будут использоваться, если параметры не заданы в YAML.
func createDefaultConfig() component.Config { return &Config{ MatchField: "SolObjectID", // дефолтное поле для поиска EnrichColumns: []string{"trace_code", "another_code"}, // дефолтные колонки для обогащения } }
Создание самого процессора - функция, которая принимает контекст, конфигурацию и
nextConsumer(следующий в цепочке компонент), а возвращает рабочий экземпляр процессора.
func createTracesProcessor( ctx context.Context, set processor.Settings, cfg component.Config, nextConsumer consumer.Traces, ) (processor.Traces, error) { processorCfg, ok := cfg.(*Config) if !ok { return nil, fmt.Errorf("configuration parsing error") } // создаём наш процессор proc, err := newProcessor(processorCfg, set.Logger) if err != nil { return nil, fmt.Errorf("cannot create csvenricher processor: %w", err) } // оборачиваем в хелпер return processorhelper.NewTraces( ctx, set, cfg, nextConsumer, proc.processTraces, processorhelper.WithCapabilities(consumer.Capabilities{MutatesData: true}), processorhelper.WithStart(proc.start), processorhelper.WithShutdown(proc.shutdown), ) }
Всё это объединяется в фабрике:
func NewFactory() processor.Factory { return processor.NewFactory( strType, createDefaultConfig, processor.WithTraces(createTracesProcessor, stability), ) }
Таким образом, Factory:
регистрирует наш процессор в Collector (под уникальным именем csvenricherprocessor);
гарантирует, что у процессора всегда есть валидная конфигурация (даже если YAML пустой);
описывает, как создать рабочий экземпляр для обработки трейсов;
подключает lifecycle-хуки: start, shutdown, processTraces.
Без фабрики Collector просто не сможет «узнать» о существовании процессора и включить его в пайплайн.
3. Processor (processor.go)
Processor - это место, где реализуется бизнес-логика обработки данных.
В отличие от Factory (которая только регистрирует и создаёт компонент), Processor отвечает за всё: загрузку данных, обработку входящих спанов и graceful shutdown.
Разберём ключевые части.
Структура процессора
type csvEnricherProcessor struct { logger *zap.Logger config *Config csvData []map[string]string // мапа с данными из таблицы matchIndex map[string]int // индекс для поиска по значению `MatchField` mu sync.RWMutex ticker *time.Ticker stopChan chan struct{} }
Что тут есть:
logger- для логирования (стандартная практика в Collector).config- ссылка на Config, чтобы знать, где искать CSV, какое поле использовать и т.д.csvDataиmatchIndex- подготовленные данные из CSV для быстрого поиска.mu-sync.RWMutexдля потокобезопасного доступа к данным (Collector обрабатывает данные конкурентно).tickerиstopChan- управление циклом периодической перезагрузки CSV.
Жизненный цикл
Processor должен уметь запускаться и корректно завершаться:
start() - поднимает бэкграунд-процесс для регулярной перезагрузки CSV.
func (p *csvEnricherProcessor) start(ctx context.Context, host component.Host) error { p.logger.Info("Starting CSV Enricher Processor") // Если интервал <= 0, просто загружаем один раз и выходим if p.config.ReloadInterval <= 0 { ... return nil } p.logger.Info("Starting background CSV reload loop", zap.Duration("interval", p.config.ReloadInterval)) go func() { ticker := time.NewTicker(p.config.ReloadInterval) defer ticker.Stop() for { select { case <-ticker.C: p.logger.Info("Reloading CSV data") if err := p.loadCSVData(); err != nil { p.logger.Warn("Failed to reload CSV data", zap.Error(err)) } case <-ctx.Done(): p.logger.Info("CSV reload context done, stopping reload loop") return } } }() return nil }
shutdown() - останавливает тикеры и чистит ресурсы.
func (p *csvEnricherProcessor) shutdown(ctx context.Context) error { p.logger.Info("Shutting Down CSV Enricher Processor") if p.ticker != nil { p.ticker.Stop() } close(p.stopChan) return nil }
loadCSVData() - загрузка данных из csv в мапу
func (p *csvEnricherProcessor) loadCSVData() error { file, err := os.Open(p.config.CSVPath) ... records, err := reader.ReadAll() ... // headers + построение индекса по MatchField }
Тут алгоритм такой:
Открываем файл.
Читаем все строки.
Берём заголовок (header).
Строим
map[matchValue]→ индекс строки для быстрого поиска.
Обработка трейсов
Collector передаёт процессору батч трейсов через функцию processTraces.
func (p *csvEnricherProcessor) processTraces(ctx context.Context, td ptrace.Traces) (ptrace.Traces, error) { p.mu.RLock() defer p.mu.RUnlock() resourceSpans := td.ResourceSpans() for i := 0; i < resourceSpans.Len(); i++ { resourceSpan := resourceSpans.At(i) // enrichment на уровне resource resourceAttrs := resourceSpan.Resource().Attributes() p.enrichResource(resourceAttrs) } return td, nil }
В нашем случае мы решили обогащать атрибуты ресурса (Resource.Attributes), а не отдельные спаны.
Почему так:
ресурс привязан к сервису/экземпляру, и логично обогащать его.
меньше дублирования - обогащение на уровне Resource применяется ко всем спанам внутри.
Логика enrichment
func (p *csvEnricherProcessor) enrichResource(resourceAttrs pcommon.Map) { matchValue, exists := resourceAttrs.Get(p.config.MatchField) if !exists || matchValue.Type() != pcommon.ValueTypeStr { return } ... recordIdx, found := p.matchIndex[matchValue.Str()] ... for _, column := range p.config.EnrichColumns { if value, ok := record[column]; ok { resourceAttrs.PutStr(column, value) } } }
Если в ресурсных атрибутах есть поле MatchField, мы ищем его в CSV и добавляем все указанные колонки как новые атрибуты.
Итого, Processor:
управляет данными (CSV → память → быстрый поиск);
добавляет атрибуты к ресурсам;
живёт по жизненному циклу (start/shutdown);
потокобезопасен (
sync.RWMutex);умеет обновлять данные «на лету» без рестарта Collector.
Как встроить процессор в Collector
Чтобы кастомный процессор оказался в финальном бинарнике Collector, нужно пересобрать его с помощью otelcol‑builder. Подробно про сборку бинарника можно прочитать в официальной документации
Мы же готовим Docker Image, поэтому опишу, как собрать кастомный образ.
Шаг 1. Настройка builder-config.yaml
В файле builder-config.yaml в конец блока processors добавить ссылку на репозиторий с кодом кастомного процессора, версия релиза обязательна.
processors: - gomod: go.opentelemetry.io/collector/processor/batchprocessor v0.128.0 - gomod: go.opentelemetry.io/collector/processor/memorylimiterprocessor v0.128.0 ... # наш процессор - gomod: github.com/hiphopzeliboba/csvenricherprocessorr v0.3.0
При разработке процессора или его тестировании, можно использовать replaces и подключить локальный пакет:
replaces: - github.com/hiphopzeliboba/csvenricherprocessor => /path/to/local/csvenricherprocessor
В файле builder-config.yaml указать корректный name и output_path.
output_path должен соответствовать пути ENTRYPOINT ["/otelcol-contrib"] в Dockerfile
dist: module: github.com/open-telemetry/opentelemetry-collector-contrib/cmd/otelcontribcol name: otelcol-contrib decription: Local OpenTelemetry Collector Contrib binary, testing only. version: 0.128.0-dev output_path: ./otelcol-contrib
Шаг 2. Сборка Docker Image
Пример готового Doсkerfile можно найти тут
docker build -t opentelemetry-collector-contrib-custom:1.128.4 .
Шаг 3. Запускаем Collector (локально, для тестов)
Тут мы пробрасываем необходимый collector-config.yaml и монтируем enrich.csv внутрь контейнера
docker run -it --rm \ -v $(pwd)/enrich.csv:/data/enrich.csv \ -v $(pwd)/collector-config.yaml:/otelcol/collector-config.yaml \ -p 4317:4317 -p 4318:4318 -p 8888:8888 \ --name otelcol-custom opentelemetry-collector-contrib-custom:0.128.4
На этом этапе всё должно заработать :-)
Тестирование
Чтобы проверить работу процессора, я написал маленькую утилиту test_tracer на Go. Она отправляет тестовые спаны с нужными атрибутами, чтобы убедиться, что обогащение работает. (Работает на стандартных портах Collector)
cd test_tracer/ go mod tidy go run tracer.go
Итог
Мы реализовали кастомный процессор для OpenTelemetry Collector, который:
читает данные из CSV;
сопоставляет их по ключу с атрибутами спана;
добавляет новые поля в трассировки;
обновляет справочник без перезапуска Collector.
Благодаря модульной архитектуре Collector, сделать такой процессор оказалось не так сложно - главное понять три кита: Config, Factory и Processor.
Кастомные компоненты - это удобный способ расширить возможности OpenTelemetry Collector под свои бизнес‑нужды.
Если стандартных компонентов не хватает, можно писать свои: достаточно один раз разобраться в структуре, и дальше получится быстро собирать собственные «кирпичики» для Observability.Надеюсь, эта статья будет полезна и поможет вам сэкономить кучу времени, экспериментов и поисков по коду contrib‑репозитория :-).
Репозиторий c проектом
Полный код проекта доступен на GitHub:
hiphopzeliboba/otel-collector-contrib-custom-processor
