
Статья рассматривает внедрение Open Policy Agent (OPA) для управления авторизацией в кластерах Apache Kafka на bare metal‑серверах.
В рамках статьи проанализированы ограничения стандартных ACL и предложено решение на основе Open Policy Agent (OPA), обеспечивающее декларативное управление доступом через Policy as Code (PaaC). Особое внимание уделено обновлению OPA Kafka Plugin: создан pull request, в котором устранены уязвимости привносимые в OPA библиотекой Guava и реализован переход на более производительную библиотеку Caffeine. Описан процесс интеграции OPA с Kafka, включая автоматизацию доставки политик через Bundle API и S3-хранилище.
1. Краткая постановка проблемы
На фоне опыта в развертывании нескольких кластеров Kafka появилось понимание того, что стандартные Access Control List'ы (ACL) неудобны по следующим причинам:
сложность в конфигурировании через одну из вспомогательных утилит Kafka (bash‑скрипты, поставляемые в комплекте);
необходимость вести реестр разрешений отдельно от ПО‑конфигуратора;
даже в инструменте RedPanda, который нивелирует первый недостаток и позволяет достаточно удобно управлять разрешениями в UI, отсутствует валидация применяемых изменений со стороны кого‑либо уполномоченного на валидацию таких изменений в момент их применения;
отсутствие полноценной поддержки CI\CD, позволяющего реализовать идемпотентное изменение политик доступа.
Таким образом родилась задача по поиску альтернативы, которая позволяла бы осуществлять авторизацию, обеспечивала возможность декларативного объявления политик, позволяла настроить универсальную и понятную пользователям автоматизацию обновления политик, а также могла работать с механизмом SASL_SSL + GSSAPI (Kerberos‑аутентификатором).
2. Обзор рассмотренных альтернатив
Начать анализ продуктов было решено с крупных проектов и затем, если те будут обладать критическими недостатками (исходя из постановки задачи в приведенном случае), рассматривать и менее популярные решения. Такое ранжирование решений было выбрано в том числе потому, что не на последнем месте стояли такие требования, как охват в комьюнити по решению в целом, его поддержка и развитие, а также наличие достаточно подробной документации.
2.1. Apache Ranger
Apache Ranger— это фреймворк для обеспечения, мониторинга и управления комплексной безопасностью данных в первую очередь на платформе Hadoop, но также может быть интегрирован и с некоторыми другими продуктами Apache, в том числе и с Kafka. Однако инструмент ориентирован прежде всего на работу в UI или с REST API, политики, которые могут быть описаны в виде кода, из‑за универсальности инструмента, приобретают тяжело поддерживаемый человеком формат, если работать с ними именно как с кодом. Пример политики для Kafka.
2.2. OPA
Open Policy Agent (OPA) — это инструмент, который позволяет «написать свою политику». Изначально позиционируется как облачный инструмент для управления политиками по принципу Policy as a Code (PaaC), имеет множество интеграций с различными системами (в том числе, за счет простоты интеграции), включая плагин для Kafka. Возможность написания собственной логики политики предоставляет широкие возможности для кастомизации и решения именно поставленной задачи. Однако, как написано в знаменитом приветственном слове утилиты sudo:
«С большой силой приходит большая ответственность»
Когда появляется возможность управления логикой разрешений на столь низком уровне, стоит очень внимательно относиться к тестированию полученных политик.
Именно на данном инструменте были сосредоточены дальнейшие усилия, в силу вводных, он был оценен как весьма перспективный для решения поставленной задачи.
3. Kafka + OPA
На официальном сайте OPA представлено руководство по интеграции OPA в качестве авторизатора Kafka. Данное руководство предлагает тестовый стенд, состоящий из Kafka (на Zookeeper), самого OPA, который развернут по отношению к Kafka внешним образом, а также web‑сервера Nginx, который выступает в качестве сервера распространения политик. Примечательным является то, что Kafka используется совместно с OPA Kafka Plugin, который представляет собой набор переопределенных методов базового класса org.apache.kafka.server.authorizer.Authorizer
. Он обеспечивает передачу контекста в OPA‑сервер и получение от него ответа о разрешении или запрете действия, переданного ранее в контексте. Несмотря на постоянное развитие самого OPA и регулярные релизы, было обращено внимание на то, что на данный момент у плагина дата последнего релиза — 9 марта 2023 года (1.5.1). Также возникли проблемы с библиотекой Guava при установке плагина на Kafka 3.7. Поэтому было принято решение обновить плагин и создать PR в официальный репозиторий. Подробнее будет описано в разделе ниже.
3.1. OPA Kafka Plugin 1.5.2
Кроме конфликта зависимостей с библиотекой Guava в Kafka 3.7 и OPA Kafka Plugin, потребность обновить версию OPA Kafka Plugin обусловлена наличием уязвимостей в Guava 30.1-jre:
Уязвимости были исправлены в master ветке OPA Kafka Plugin, но релиз (1.5.2) с ними на момент написания статьи так и не вышел.

В процессе анализа исходного кода OPA Kafka Plugin обнаружили, что из всего большого набора утилит Guava, используется только функциональность кеширования. Однако для локального кеширования в JVM на текущий момент есть лучшая альтернатива Guava — Caffeine.
В чем его преимущества:
в Caffeine удалось оптимизировать задержки, связанные с использованием кеша, пропускная способность эквивалента Guava;
авторы Guava рекомендуют использовать для кеширования Caffeine, который обновляется и поддерживается лучше. Примеры комментариев авторов Guava на этот счет:
с точки зрения software architecture и безопасности предпочтительнее использовать узкоспециализированные библиотеки (Caffeine) по сравнению с большим набором common utils, закрывающим большой набор потребностей разработчиков (Guava).
После замены Guava на Caffeine в OPA Kafka Plugin и заведения PR (на момент выхода статьи еще не смержен мейнтейнером OPA Kafka Plugin):
сократился размер fat‑jar OPA Kafka Plugin (с 2.9 MB до 917 Kb);
ухудшения производительности плагина на нашем workload не было обнаружено;
уязвимости и конфликт зависимостей с Kafka 3.7 были устранены.
3.2. Рассмотрение различных вариантов развертывания OPA-сервер, их преимуществ и недостатков
Как было сказано выше, в примере интеграции OPA и Kafka на официальном сайте представлен вариант, когда OPA развернут в качестве внешнего (по отношению к Kafka) сервера. Учитывая, что OPA не является кластеризируемым продуктом, обеспечение отказоустойчивости ложится на плечи конечного пользователя или же дополнительного ПО (как в случае с k8s оператором). OPA может быть развернут как на машинах, так и с использованием средств контейнеризации и оркестрации (Docker image, k8s operator).
Также OPA может быть импортирован в качестве библиотеки на Go или же установлен локально на машине, где он будет использован. При этом на официальном сайте приведены рекомендации по размещению OPA максимально «близко» к сервису, который его использует.
Исходя из соображений минимизации использования сетевого канала, а также отказоустойчивости для ситуации, когда происходит потеря сетевого соединения между Kafka и внешней OPA‑инсталляции, было принято решение о развертывании OPA сервера на каждом брокере Kafka и контроллере KRaft, а обращения плагина направлять по loopback‑интерфейсу. Сам сервер развернут в качестве исполняемого файла и обернут в systemd‑модуль.
4. Предоставление политик для OPA Server
OPA сервер, который принимает решение о предоставлении доступа, опирается на политики, переданные данному серверу. Политики могут предоставляться в нескольких форматах, а также иметь разные способы распространения. В рамках статьи будут рассмотрены только несколько основных методов, которые использовались при настройке интеграции OPA + Kafka.
4.1. Рассматриваемые форматы политик
OPA поддерживает передачу файлов с политиками и данными, на основе которых происходит принятие решений, (policy.rego, data.json) — как аргументы запуска сервера, а также использование предварительно собранных bundle с данными файлами.
4.1.1. Использование явных файлов политик
При использовании явных локальных файлов политики необходимо обеспечить их идентичность и возможность централизованного обновления для каждого из серверов (т.к. OPA не является кластерезуемым решением, и в этом решении отсутствует единая точка распространения политик). Данный способ также поддерживает обновление активных политик «на лету» — без рестарта сервиса, но, имея возможность разместить политики на внешнем хранилище, лучше рассмотреть следующий способ.
4.1.2. Использование bundle с политиками
Bundle представляет собой архив, в который уже добавлены все политики с дата‑файлами по каждому из сервисов (либо для одного сервиса). Данный метод является предпочтительным, потому что он поддерживает возможность автоматизированного обновления политик из внешних источников, но при этом добавляет сложность в виде необходимости предварительной сборки bundle. Несмотря на сложности со сборкой, было принято решение использовать именно данный метод предоставления политик.
4.2. Методы сборки bundle
Будет рассмотрено два основных способа сборки bundle политик для сервера OPA:
builder в составе OPA Operator (на кластере K8s);
использование пакета OPA для ручной сборки.
4.2.1. Сборка с использованием OPA Operator
Использование оператора в k8s потенциально могло бы решить проблему со сборкой и распространением bundle, а также их декларативным объявлением с помощью ConfigMap и их деплоем посредством GitOps‑инструмента, например, ArgoCD. Но было принято решение отказаться от данного метода, конкретную реализацию и причины можно прочитать отдельно под спойлером.
Сборка с использованием OPA Operator
Для экспериментов был выбран Stackable Operator for OPA (OpenPolicyAgent), который позволяет объявить CRD OpaCluster, который порождает DaemonSet с OPA, а также обрабатывает ConfigMap'ы с label opa.stackable.tech/bundle: "true"
, у каждого Pod'а, порожденного DaemonSet'ом, присутствует два контейнера:
opa — контейнер с OPA, который запущен в режиме сервера, а bundle получает от контейнера bundle‑builder;
bundle‑builder — контейнер для внутренних нужд, который собирает bundle на основе политик, размещенных в ConfigMap (все политики собираются в единый bundle, поскольку OPA должна работать с одним bundle). Данный контейнер предназначен для внутреннего использования и недоступен по умолчанию для обращения напрямую, лишь из контейнера opa.
Подразумевается, что сервисы k8s могут обращаться по DNS‑имени Service для получения «решения» на основе политик, загруженных в OPA, однако в рассматриваемом случае куда больший интерес представляла возможность обращения напрямую к bundle‑builder для получения bundle.tar.gz
для последующего использования в OPA, установленного на Kafka. Для этого манифесты развертывания CRD OpaCluster
были дополнены следующими абстракциями.
Не рекомендуется к использованию именно в приведенном виде, поскольку данные манифесты предназначались для тестирования!
# You can create new cluster simply copy-paste this file and replace
# 1. "opa-example-cluster.mycorp.local" to your ingress address
# 2. "opa-example-cluster" to your new cluster name
---
# OPA cluster settings
apiVersion: opa.stackable.tech/v1alpha1
kind: OpaCluster
metadata:
name: opa-example-cluster
namespace: opa-example-cluster
spec:
image:
productVersion: "1.0.1"
stackableVersion: "0.0.0-dev"
repo: "stackable"
servers:
roleGroups:
default:
config:
resources:
cpu:
min: 250m
max: 500m
memory:
limit: 4Gi
---
# Ingress (optional)
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: opa-example-cluster
namespace: opa-example-cluster
spec:
ingressClassName: nginx
rules:
- host: opa-example-cluster.mycorp.local
http:
paths:
- backend:
service:
name: opa-example-cluster
port:
number: 8081
path: /
pathType: ImplementationSpecific
---
apiVersion: v1
kind: Service
metadata:
name: opa-example-cluster-policy-builder
namespace: opa-example-cluster
spec:
ports:
- name: http
port: 3030
protocol: TCP
targetPort: 3030
selector:
app.kubernetes.io/component: server
app.kubernetes.io/instance: opa-example-cluster
app.kubernetes.io/name: opa
type: ClusterIP
---
# Ingress for bundles (optional)
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: opa-example-cluster-bundles
namespace: opa-example-cluster
spec:
ingressClassName: nginx
rules:
- host: opa-example-cluster.mycorp.local
http:
paths:
- backend:
service:
name: opa-example-cluster-policy-builder
port:
number: 3030
path: /opa/v1/opa/
pathType: ImplementationSpecific
В приведенном случае появлялась возможность получения bundle.tar.gz по ссылке https://opa‑example‑cluster.mycorp.local/opa/v1/opa/bundle.tar.gz
, которую можно было указать для OPA на Kafka, после чего bundle мог использоваться.
Объявление политики в этом случае представляло собой объявление ConfigMap. Пример приведен ниже:
---
apiVersion: v1
kind: ConfigMap
metadata:
name: kafka
labels:
opa.stackable.tech/bundle: "true"
data:
data.yaml: |
---
cluster_admins:
- kafka/demouser1
cluster_viewers:
- kafka/demouser2
topics_rules:
- topic: demo
users:
- username: kafka/demouser2
permissions:
- topic_write
- topic_read
kafka.rego: |
package kafka.authz
## POLICY START
# Default policy: deny everything that is not explicitly allowed
default allow := false
# High-level allow rules
allow if {
allow_cluster_admin
}
allow if {
allow_cluster_viewer
}
allow if {
allow_topic_opertions
}
allow if {
allow_non_topic_operations
}
## POLICY END
## PERMISSIONS CONFIGURATION START
# Static permissions for kafka actions
CLUSTER_VIEWER_PERMISSIONS := {
"TOPIC": ["DESCRIBE", "DESCRIBE_CONFIGS"],
"GROUP": ["READ", "DESCRIBE"],
"CLUSTER": ["DESCRIBE", "DESCRIBE_CONFIGS"]
}
# Key of JSON match names used in topics_rules.users.permissions
USER_PERMISSIONS := {
"read":
{
"TOPIC": ["READ", "DESCRIBE"],
"GROUP": ["READ", "DESCRIBE"],
"TRANSACTIONAL_ID": ["DESCRIBE", "WRITE"]
},
"write":
{
"TOPIC": ["WRITE", "DESCRIBE"],
"GROUP": ["READ", "DESCRIBE"],
"TRANSACTIONAL_ID": ["DESCRIBE", "WRITE"],
"CLUSTER": ["IDEMPOTENT_WRITE"]
}
}
## PERMISSIONS CONFIGURATION END
## GET CONFIG FROM JSON START
# Get data from policy
topics_rules := data.topics_rules
cluster_admins := data.cluster_admins
cluster_viewers := data.cluster_viewers
## GET CONFIG FROM JSON END
## POLICY DETERMINATOR START
# Low-level functions for evaluate roles and permissions
allow_cluster_admin if {
some cluster_admin in cluster_admins
# Match cluster admin user
input.requestContext.principal.name == replace(cluster_admin, "/", "+")
}
allow_cluster_viewer if {
some cluster_viewer in cluster_viewers
# Match cluster viewer user
input.requestContext.principal.name == replace(cluster_viewer, "/", "+")
# Match permissions
input.action.operation == CLUSTER_VIEWER_PERMISSIONS[input.action.resourcePattern.resourceType][_]
}
allow_topic_opertions if {
some topic_rules in topics_rules
# Match topic name
input.action.resourcePattern.name == topic_rules.topic
some user in topic_rules.users
# Match topic user
input.requestContext.principal.name == replace(user.username, "/", "+")
some permission in user.permissions
# Match permissions for topic resources
input.action.operation == USER_PERMISSIONS[permission][input.action.resourcePattern.resourceType][_]
}
allow_non_topic_operations if {
some topic_rules in topics_rules
some user in topic_rules.users
# Match user
input.requestContext.principal.name == replace(user.username, "/", "+")
some permission in user.permissions
# Match permissions for non topic resources
input.action.resourcePattern.resourceType != "TOPIC"
input.action.operation == USER_PERMISSIONS[permission][input.action.resourcePattern.resourceType][_]
}
# ## POLICY DETERMINATOR END
При этом внутренняя структура формируемого bundle получалась следующая:
bundle.tar.gz
└── bundles
└── kafka
├── data.yaml
└── kafka.rego
Особенность работы оператора заключается в том, что наименование директории, в которой будут располагаться файлы политики (policy.rego
и data.json
файлы), будет соответствовать наименованию ConfigMap (в приведенном примере — kafka
).
Основными причинами, по которым данный подход не получил развития в статье, являются следующие два факта:
со стороны оператора нет возможности осуществить валидацию на стадии написания ConfigMap. т. е. неверная с точки зрения OPA и верная с точки зрения синтаксиса k8s ConfigMap не позволит собраться новой версии bundle. Тем не менее, в силу особенностей OPA, продолжит работать предыдущая (успешно собранная) версия bundle, однако доставка изменений политик будет ограничена в этот момент;
второй недостаток был обнаружен в ходе тестирования, заключается он в том, что удаление ConfigMap с политикой из k8s не приводит к удалению политики из bundle. Пример:
bundle.tar.gz └── bundles ├── demo-prod-kafka │ ├── data.json │ └── kafka.rego ├── kafka │ ├── data.json │ ├── data.yaml │ └── kafka.rego ├── kafka-demo │ ├── data.json │ └── kafka.rego └── kafkademoprod ├── data.json └── kafka.rego
Так выглядит bundle, отдаваемый bundle‑builder'ом после того, как были созданы и удалены ConfigMap'ы demo‑prod‑kafka
, kafka‑demo
, kafkademoprod
. ConfigMap kafka
, хоть и присутствует, но был изменен data.json
на data.yaml
, тем не менее обе версии присутствуют в bundle.
4.2.2. Локальная сборка с использованием бинарного файла OPA
Bundle может быть собран вручную с использованием пакета OPA, такой способ наиболее прост и понятен при использовании с разными службами, но требует отдельный образ и/или хост для осуществления сборки.
Сборка осуществляется с выбором директории, в которой находятся файлы *.rego
и *.yaml\*.yml\*.json
, также возможно указать дополнительные параметры; пример использования:
opa build ./ --output bundle-tests.tar.gz --ignore '.gitlab-ci.yml*'
При этом структура bundle будет несколько отличаться от собираемого с использованием OPA Operator:
bundle.tar.gz
├── data.json
├── kafka.rego
└── .manifest
Было принято решение использовать данный метод в связи с большим выбором дополнительных параметров и кастомизации, а также в связи со спецификой сетевой архитектуры компании. Решение с автоматической сборкой и деплоем в хранилище bundle будет описано далее (PaaC + GitLab CI).
4.3. Способы динамического обновления политик
Доставка политик из внешних источников для OPA Server может осуществляться различными способами, которые подробно описаны в статье, один из них — Bundle API (в режиме pull), также поддерживается режим динамического обновления политики при работе с файлами, эти способы и будут рассмотрены ниже.
4.3.1. Обновление файлов политики
При использовании локальных файлов политик и данных также можно указать дополнительный параметр --watch
, чтобы сервер ОРА следил за обновлением локальных файлов и мог без перезапуска сервиса обновлять текущие политики.
4.3.2. Использование метода Bundle API
Данный метод позволяет указать внешнее хранилище для bundle, к которому будет обращаться сервер OPA для обновления политики. Внешним хранилищем может выступать HTTP‑сервер, S3-bucket, Google Cloud Storage и т. д., а также в конфигурации имеется поддержка указания нескольких внешних источников для реализации резервирования поставщиков политик. В статье будет рассмотрен вариант с HTTP‑сервером и S3-bucket, о которых написано далее.

Именно такой способ распространения bundle предлагается в примере интеграции OPA и Kafka на официальном сайте OPA.
4.3.2.1. Использование внешнего HTTP(s) сервера
Загрузка и распространение bundle с помощью HTTP‑сервера работают; более того это подтверждает то, что приведено в документации OPA: при обращении к серверу распространения bundle OPA предварительно запрашивает hash‑сумму целевого файла (в своих логах данная hash‑сумма называется Etag и в приведенном случае использует алгоритм SHA-1) и осуществляет загрузку файла лишь в том случае, если hash‑сумма на сервере не совпала с представленной локально у OPA.
Также сервер OPA в случае потери соединения с сервером распространения политик или же при отсутствии запрашиваемого файла, будет использовать текущую активную политику, в случае успеха загрузки bundle, он будет использован далее.
Причина, по которой было решено отказаться от данного варианта, состоит в том, что в официальной документации OPA большее количество времени уделено варианту с AWS S3 хранилищем (но также может быть использовано S3-совместимое), и такой вариант позиционируется как целевой.
4.3.2.2. Использование внешнего S3 хранилища
Интеграция с S3 также работает без проблем, но есть небольшие нюансы, связанные с использованием Minio вместо AWS S3. Задача заключалась том, чтобы реализовать хранение bundle на корпоративном S3, не используя облачные решения, а также дополнительно разбить кластеры Kafka для использования разных S3 Bucket (и GitLab репозиториев со своим GitLab CI). Про особенности использования Minio будет расписано в разделе реализации, ведь именно данный способ был выбран целевым.
5. Наша реализация на Kafka 3.7

Для тестирования интеграции был развернут тестовый стенд, схожий по версиям компонентов и архитектуре с продуктовым. Основные характеристики и спецификация кластера Kafka:
использование BareMetal‑серверов как брокеров;
использование VM‑серверов для KRaft;
использование S3 Minio как сервер распространения политик для OPA;
версия используемого пакета Kafka — 3.7.
Тестовый кластер Apache Kafka состоит из 8 серверов: 3 KRaft + 5 Broker. Архитектура кластера не предоставляет возможность вынести на отдельный сервер функциональность OPA Server, поэтому было принято решение устанавливать OPA непосредственно на сервера Kafka 1 в 1 (подробнее было упомянуто в пункте 3.2).
Тип сервера | ОС | Kafka Role | OPA Server | Кол‑во |
VM | RedOS | Controller | Server+Plugin | 3 шт. |
BareMetal | RedOS | Broker | Server+Plugin | 5 шт. |
5.2. Настройка OPA Server
Настройку и развертывание OPA Server было принято решение реализовать с использованием Ansible (как и основной инструмент развертывания кластера Kafka). Настройка производилась с учетом того, что будет использоваться схема с Bundle‑политиками и сервером распространения в виде корпоративного S3.
Способы определения параметров запуска
OPA Server поддерживает конфигурирование как с использованием YAML‑файла конфигурации, так и с переопределением конкретных параметров непосредственно в строке запуска.
В связи с этим были выделены основные параметры для настройки OPA Server:
Параметр | Значение | Описание |
| Не задается | Отключение телеметрии |
|
| Отключение подробного логирования запросов |
| Не задается | Запуск в режиме сервера |
|
| Основной слушающий адрес для запросов |
|
| Слушающий адрес для диагностики и метрик |
|
| Название внутреннего сервиса для метода |
|
| Название файла Bundle для применения политик к методу |
|
| Адрес S3 с путем до нужного бакета и директории |
|
| Путь на сервере до файла с учетными данными от S3 |
При использовании данных параметров OPA запустится в режиме сервера и будет слушать запросы с локального адреса, а также периодически обновлять политики в виде bundle с внешнего S3 хранилища.
5.3. Настройка Kafka + OPA Plugin
Плагин для Apache Kafka поставляет в виде.jar файла, который был немного изменен нами в связи с возникшими проблемами с кешем (подробнее в пункте 3.1).
Предварительно необходимо загрузить и установить OPA Kafka Plugin, это можно сделать любым из двух доступных способов:
поместить пакет в расширении .jar в каталог плагинов для Kafka (предварительно определив его в конфигурации Kafka);
поместить пакет в расширении .jar в classpath Kafka JVM.
Также необходимо провести настройку непосредственно самой Kafka, указав необходимый класс для авторизации, а также дополнительные параметры для него:
Параметр | Значение | Описание |
|
| Указание использования класса OPA Plugin для авторизации запросов |
|
| Политика по умолчанию при недоступности сервера OPA |
|
| Время жизни кеша запросов от OPA Plugin к OPA Server |
|
| Выделяемые размер кеша при запуске приложения |
|
| Максимальный размер кеша |
|
| URL OPA Server |
Важно отметить, что использование Kafka с механизмом SASL SSL (Kerberos) подразумевает отображение имен пользователей, как принципалов в домене Krb, на который выдан TGT. Но по умолчанию Kafka (по крайней мере версии 3.7) не умеет работать с двусоставными принципалами, поэтому необходимо дополнительно настроить маппинг, а также вынести брокеров и контроллеров в отдельную группу суперпользователей (администраторов) для того, чтобы для данной группы не проводилась дополнительная проверка — авторизация.
Параметр | Значение | Описание |
|
| Маппинг принципалов |
| Маппинг для двусоставных принципалов | |
| Маппинг для односоставных принципалов | |
|
| Выделенный пользователь/группа, с которыми не будет проводиться проверка политик доступа |
После данных настроек кластер Kafka будет направлять любые запросы от любых пользователей (кроме принципалов, которые маппятся как kafka+admins
) на сервер OPA для прохождения валидации.
Об особенностях работы Kafka в контекте авторизации
Kafka — кластерное решение, которое позволяет обращаться к любому из брокеров для предоставления информации по API и/или взаимодействия консьюмера и продюсера, но при этом авторизация работает только в рамках одной ноды: то есть, если вы обращаетесь к ноде Х, но необходимая информация или ресурс находятся на ноде Y, то проверка доступа — авторизация будет проводиться именно на ноде Y. Именно по этой причине было принято решение разворачивать функциональность OPA в том числе и на нодах KRaft.
6. Автоматизация сборки и доставки политик методом PaaC
Автоматизация представляет собой использование GitLab CI для реализации пошаговой сборки, проверки и деплоя на внешнее хранилище политик в виде bundle.
Pipeline выполняет шаги check и test для каждого коммита в репозиторий, чтобы удостовериться в корректности синтаксиса *.rego
файлов и провести тестирование (подробнее о тестировании — ниже), после чего в ветке разработчика позволяет в ручном режиме осуществить сборку (build) и доставку bundle (deploy) на целевое хранилище в тестовую локацию. Также настроено автоматическое удаление (remove) тестового bundle по истечении времени ожидания или при ручном триггере этого действия. Отправка политик в продуктивное расположение на целевом хранилище происходит только после одобрения и мержа MR в ветку по умолчанию репозитория.
6.1. Структура репозитория
Репозиторий содержит следующие типы файлов:
.gitlab-ci.yml
— файл пайплайна, реализующего проверку *.rego файлов, тестирование бандлов, их сборку и доставку на целевое хранилище;*.rego
— файлы описания политики (в этих файлах реализуется логика принятия решения) на языке rego;*.test.rego
— файлы для тестирования политик; описывают тест в формате ожидаемого результата работы политики и отправляемого вывода, написаны на языке rego. Файлы по данной маске не входят в целевой bundle;*.yml
,*.json
,*.yaml
— все файлы с этими расширениями и не являющиеся скрытыми (не начинающиеся с.
) будут добавлены в собираемый bundle в качестве data.json, при этом все файлы объединяются в один. Стратегия разрешения конфликтов не определена четко, поэтому отсутствие конфликтов должно обеспечиваться автором!
Для pipeline подготовлен шаблон (ops/kafka-policies/cicd-templates/opa-templates.yml
), содержащий основные шаги и позволяющий переиспользовать данные шаги и настраивать для каждого проекта, представляющего репозиторий с PaaC для каждого кластера Kafka (в примере: для кластера example-kafka-cluster
реализован pipeline в файле ops/kafka-policies/example-kafka-cluster/.gitlab-ci.yml
).
ops/kafka-policies/cicd-templates/opa-templates.yml
# ops/kafka-policies/cicd-templates/opa-templates.yml
variables:
OPA_IMAGE: openpolicyagent/opa:1.1.0-debug
MINIO_MC_IMAGE: minio/mc:RELEASE.2025-04-08T15-39-49Z-cpuv1
.opa_template:
image: $OPA_IMAGE
dependencies: []
.opa_check_job:
extends: .opa_template
stage: check
script:
- opa check ./ --format json
.opa_test_job:
extends: .opa_template
stage: test
script:
- opa build ./ --output bundle-tests.tar.gz --ignore '.*'
- opa test bundle-tests.tar.gz --verbose
.opa_build_job:
extends: .opa_template
stage: build
script:
- opa build ./ --output bundle.tar.gz --revision ${CI_COMMIT_SHORT_SHA} --ignore '.*' --ignore '*.test.rego'
artifacts:
paths:
- bundle.tar.gz
expire_in: 1 hour
.opa_deploy_job:
image: $MINIO_MC_IMAGE
stage: deploy
dependencies:
- build
script:
- |
mc alias set s3_alias https://$S3_HOST $S3_ACCESS_KEY $S3_SECRET_KEY && \
mc cp bundle.tar.gz s3_alias/${S3_BUCKET}/${CI_PROJECT_NAME}/bundle.tar.gz
.opa_remove_job:
image: $MINIO_MC_IMAGE
stage: remove
script:
- |
mc alias set s3_alias https://$S3_HOST $S3_ACCESS_KEY $S3_SECRET_KEY && \
mc rm --force s3_alias/${S3_BUCKET}/${CI_PROJECT_NAME}/bundle.tar.gz
ops/kafka-policies/example-kafka-cluster/.gitlab-ci.yml
# ops/kafka-policies/example-kafka-cluster/.gitlab-ci.yml
stages:
- check
- test
- build
- deploy
- remove
workflow:
rules:
- if: $CI_PIPELINE_SOURCE == "push"
- if: $CI_PIPELINE_SOURCE == "web"
- if: $CI_PIPELINE_SOURCE == "merge_request_event"
include:
- project: 'ops/kafka-policies/cicd-templates'
file: 'opa-templates.yml'
variables:
OPA_IMAGE: openpolicyagent/opa:1.1.0-debug
MINIO_MC_IMAGE: minio/mc:RELEASE.2025-04-08T15-39-49Z-cpuv1
check:
extends: .opa_check_job
test:
extends: .opa_test_job
build:
extends: .opa_build_job
rules:
- if: $CI_PIPELINE_SOURCE == "merge_request_event"
when: never
- if: $CI_PIPELINE_SOURCE == "push"
- if: $CI_PIPELINE_SOURCE == "web"
.opa_deploy_dev_env:
variables:
S3_BUCKET: kafka-policies-dev/${CI_COMMIT_REF_SLUG}
deploy-bundle-dev:
extends:
- .opa_deploy_dev_env
- .opa_deploy_job
environment:
name: dev
on_stop: remove-bundle-dev
auto_stop_in: 1 week
rules:
- if: $CI_COMMIT_BRANCH == $CI_DEFAULT_BRANCH
when: never
- if: $CI_PIPELINE_SOURCE == "merge_request_event"
when: never
- when: manual
remove-bundle-dev:
extends:
- .opa_deploy_dev_env
- .opa_remove_job
environment:
name: dev
action: stop
rules:
- if: $CI_COMMIT_BRANCH == $CI_DEFAULT_BRANCH
when: never
- if: $CI_PIPELINE_SOURCE == "merge_request_event"
when: never
- when: manual
.opa_deploy_prod_env:
variables:
S3_BUCKET: kafka-policies
deploy-bundle-prod:
extends:
- .opa_deploy_prod_env
- .opa_deploy_job
environment:
name: prod
rules:
- if: $CI_COMMIT_BRANCH == $CI_DEFAULT_BRANCH
Переменные*, использованные в pipeline, объявленные в разделе CI/CD Variables:
Имя переменной | Описание |
S3_HOST | S3 хост, который будет использован утилитой minio mc** |
S3_ACCESS_KEY | S3 access key, который будет использован утилитой minio mc |
S3_SECRET_KEY | S3 secret key, который будет использован утилитой minio mc |
* в нашем конкретном случае данные переменные хранятся в Hashicorp Vault, но для упрощения приведенного pipeline предполагаем, что они также размещены в CI/CD Variables Gitab.
** если Ваш s3 обладает самоподписанным сертификатом, то рекомендуется добавить его в образ minio/mc. В целях тестирования можно использовать флаг --insecure
для всех команд mc.
6.2. Написание тестов для политик
Иногда, при изменении логики политики или объемных изменениях в ACL полезным бывает написание unit‑тестов, в данном репозитории файлы тестов называются по маске *.test.rego
. Такая маска нужна для того, чтобы включить тесты на этапе тестов и исключить их нахождение в bundle, отправляемом на целевой удаленный сервер.
Тест имитирует ввод со стороны плагина Kafka, который должен попадать в OPA, а далее сравнивает ожидаемый результат от OPA и фактически полученный. Ошибочное завершение тестов вызывает ошибку pipeline и запрещает дальнейшие действия по распространению политики на удаленный сервер.
Простейший файл теста выглядит следующим образом:
package authz_test
import data.kafka.authz.allow
test_default_denied if {
not allow with input as {}
}
test_default_denied
— название теста, которое будет отображаться в логах стейджа test
. В данном случае тест проверяет, что пустой ввод со стороны Kafka приведет к отказу в доступе (not allow).
Следующий пример иллюстрирует более сложный ввод, а также то, что доступ пользователю выдан с правами вьюера:
test_authorized_viewer_user_to_describe_allowed if {
allow with input as {
"action": {
"operation": "DESCRIBE",
"resourcePattern": {
"name": "any-topic-name",
"resourceType": "TOPIC"
}
},
"requestContext": {
"listenerName": "SASL_SSL",
"principal": {
"name": "kafka+demouser1",
"principalType": "User"
},
"securityProtocol": "SASL_SSL"
}
}
}
В данном случае написание в примере значения поля name
в виде kafka+demouser1
не является опечаткой! Поскольку именно в таком виде Kafka передает принципал OPA (заменяя /
на +
в принципале, это ограничение Kafka).
Более подробно о написании тестов для OPA можно прочесть по ссылке Policy Testing.
6.3. Написание логики политик
Логика политик сосредоточена в файлах под маской .rego
, но не *.test.rego
, написание логики происходит на языке rego. Главным при изменении/добавлении логики является аккуратное тестирование на различных вариантах ввода, отсутствие таких тестов может привести к ложноположительным или ложноотрицательным срабатываниям политики относительно предполагаемого результата.
Для локального тестирования удобно использовать плагин для Visual Studio Code, но также локальное тестирование может проводиться с помощью команды opa eval
Особое внимание проверяющего MR следует обратить при изменении логики в репозитории!
Выводы
По итогам проведенной работы можно с уверенностью сказать, что поставленные цели были достигнуты, а именно:
настроена масштабируемая и удобно‑управляемая (с использование PaaC‑подхода) авторизация на кластере Kafka;
политики доступа хранятся в GitLab и защищены от несанкционированного изменения (использован GitOps‑подход, позволяющий иметь единую точку правды и контроль за изменениями политик, верификацию этих изменений в привычном для пользователей формате);
написан Pipeline, позволяющий выполнять предварительное тестирование, упаковку и доставку политик на целевое хранилище;
имеется возможность быстрого дооснащения имеющихся кластеров Kafka данной системой авторизации, а также развертывание новых кластеров вкупе с OPA.
Также большим плюсом можно считать возможность модернизации: добавление новых ролей для более гранулярного разграничения прав, либо же, наоборот, объединение нескольких ролей, если это потребуется в дальнейшем — в отличие от встроенных Kafka ACL, которые вообще не поддерживают RBAC.
Мы благодарим Вас за прочтение нашей статьи и надеемся, что она поможет Вам в решении Ваших задач.

Авторы:
Денис Куничкин (Denis Kunichkin) — ведущий инженер по внедрению, t2
Данила Маланьин (Danila Malanin) — ведущий инженер по внедрению, t2
Обновление плагина:
Александр Кириллов (Alexander Kirillov) — ведущий инженер по разработке сервисов больших данных и искусственного интеллекта, t2