Как стать автором
Обновить
86.88

Policy as Code в Apache Kafka: опыт внедрения Open Policy Agent

Уровень сложностиСредний
Время на прочтение24 мин
Количество просмотров1.1K

Статья рассматривает внедрение Open Policy Agent (OPA) для управления авторизацией в кластерах Apache Kafka на bare metal‑серверах.

В рамках статьи проанализированы ограничения стандартных ACL и предложено решение на основе Open Policy Agent (OPA), обеспечивающее декларативное управление доступом через Policy as Code (PaaC). Особое внимание уделено обновлению OPA Kafka Plugin: создан pull request, в котором устранены уязвимости привносимые в OPA библиотекой Guava и реализован переход на более производительную библиотеку Caffeine. Описан процесс интеграции OPA с Kafka, включая автоматизацию доставки политик через Bundle API и S3-хранилище.


1. Краткая постановка проблемы

На фоне опыта в развертывании нескольких кластеров Kafka появилось понимание того, что стандартные Access Control List'ы (ACL) неудобны по следующим причинам:

  • сложность в конфигурировании через одну из вспомогательных утилит Kafka (bash‑скрипты, поставляемые в комплекте);

  • необходимость вести реестр разрешений отдельно от ПО‑конфигуратора;

  • даже в инструменте RedPanda, который нивелирует первый недостаток и позволяет достаточно удобно управлять разрешениями в UI, отсутствует валидация применяемых изменений со стороны кого‑либо уполномоченного на валидацию таких изменений в момент их применения;

  • отсутствие полноценной поддержки CI\CD, позволяющего реализовать идемпотентное изменение политик доступа.

Таким образом родилась задача по поиску альтернативы, которая позволяла бы осуществлять авторизацию, обеспечивала возможность декларативного объявления политик, позволяла настроить универсальную и понятную пользователям автоматизацию обновления политик, а также могла работать с механизмом SASL_SSL + GSSAPI (Kerberos‑аутентификатором).

2. Обзор рассмотренных альтернатив

Начать анализ продуктов было решено с крупных проектов и затем, если те будут обладать критическими недостатками (исходя из постановки задачи в приведенном случае), рассматривать и менее популярные решения. Такое ранжирование решений было выбрано в том числе потому, что не на последнем месте стояли такие требования, как охват в комьюнити по решению в целом, его поддержка и развитие, а также наличие достаточно подробной документации.

2.1. Apache Ranger 

Apache Ranger— это фреймворк для обеспечения, мониторинга и управления комплексной безопасностью данных в первую очередь на платформе Hadoop, но также может быть интегрирован и с некоторыми другими продуктами Apache, в том числе и с Kafka. Однако инструмент ориентирован прежде всего на работу в UI или с REST API, политики, которые могут быть описаны в виде кода, из‑за универсальности инструмента, приобретают тяжело поддерживаемый человеком формат, если работать с ними именно как с кодом. Пример политики для Kafka.

2.2. OPA 

Open Policy Agent (OPA) — это инструмент, который позволяет «написать свою политику». Изначально позиционируется как облачный инструмент для управления политиками по принципу Policy as a Code (PaaC), имеет множество интеграций с различными системами (в том числе, за счет простоты интеграции), включая плагин для Kafka. Возможность написания собственной логики политики предоставляет широкие возможности для кастомизации и решения именно поставленной задачи. Однако, как написано в знаменитом приветственном слове утилиты sudo:

«С большой силой приходит большая ответственность»

Когда появляется возможность управления логикой разрешений на столь низком уровне, стоит очень внимательно относиться к тестированию полученных политик.

Именно на данном инструменте были сосредоточены дальнейшие усилия, в силу вводных, он был оценен как весьма перспективный для решения поставленной задачи.

3. Kafka + OPA

На официальном сайте OPA представлено руководство по интеграции OPA в качестве авторизатора Kafka. Данное руководство предлагает тестовый стенд, состоящий из Kafka (на Zookeeper), самого OPA, который развернут по отношению к Kafka внешним образом, а также web‑сервера Nginx, который выступает в качестве сервера распространения политик. Примечательным является то, что Kafka используется совместно с OPA Kafka Plugin, который представляет собой набор переопределенных методов базового класса org.apache.kafka.server.authorizer.Authorizer. Он обеспечивает передачу контекста в OPA‑сервер и получение от него ответа о разрешении или запрете действия, переданного ранее в контексте. Несмотря на постоянное развитие самого OPA и регулярные релизы, было обращено внимание на то, что на данный момент у плагина дата последнего релиза — 9 марта 2023 года (1.5.1). Также возникли проблемы с библиотекой Guava при установке плагина на Kafka 3.7. Поэтому было принято решение обновить плагин и создать PR в официальный репозиторий. Подробнее будет описано в разделе ниже.

3.1. OPA Kafka Plugin 1.5.2

Кроме конфликта зависимостей с библиотекой Guava в Kafka 3.7 и OPA Kafka Plugin, потребность обновить версию OPA Kafka Plugin обусловлена наличием уязвимостей в Guava 30.1-jre:

Уязвимости были исправлены в master ветке OPA Kafka Plugin, но релиз (1.5.2) с ними на момент написания статьи так и не вышел.

Автор обновления плагина за работой
Автор обновления плагина за работой

В процессе анализа исходного кода OPA Kafka Plugin обнаружили, что из всего большого набора утилит Guava, используется только функциональность кеширования. Однако для локального кеширования в JVM на текущий момент есть лучшая альтернатива GuavaCaffeine.

В чем его преимущества:

После замены Guava на Caffeine в OPA Kafka Plugin и заведения PR (на момент выхода статьи еще не смержен мейнтейнером OPA Kafka Plugin):

  • сократился размер fat‑jar OPA Kafka Plugin (с 2.9 MB до 917 Kb);

  • ухудшения производительности плагина на нашем workload не было обнаружено;

  • уязвимости и конфликт зависимостей с Kafka 3.7 были устранены.

3.2. Рассмотрение различных вариантов развертывания OPA-сервер, их преимуществ и недостатков

Как было сказано выше, в примере интеграции OPA и Kafka на официальном сайте представлен вариант, когда OPA развернут в качестве внешнего (по отношению к Kafka) сервера. Учитывая, что OPA не является кластеризируемым продуктом, обеспечение отказоустойчивости ложится на плечи конечного пользователя или же дополнительного ПО (как в случае с k8s оператором). OPA может быть развернут как на машинах, так и с использованием средств контейнеризации и оркестрации (Docker image, k8s operator).

Также OPA может быть импортирован в качестве библиотеки на Go или же установлен локально на машине, где он будет использован. При этом на официальном сайте приведены рекомендации по размещению OPA максимально «близко» к сервису, который его использует.

Исходя из соображений минимизации использования сетевого канала, а также отказоустойчивости для ситуации, когда происходит потеря сетевого соединения между Kafka и внешней OPA‑инсталляции, было принято решение о развертывании OPA сервера на каждом брокере Kafka и контроллере KRaft, а обращения плагина направлять по loopback‑интерфейсу. Сам сервер развернут в качестве исполняемого файла и обернут в systemd‑модуль.

4. Предоставление политик для OPA Server

OPA сервер, который принимает решение о предоставлении доступа, опирается на политики, переданные данному серверу. Политики могут предоставляться в нескольких форматах, а также иметь разные способы распространения. В рамках статьи будут рассмотрены только несколько основных методов, которые использовались при настройке интеграции OPA + Kafka.

4.1. Рассматриваемые форматы политик 

OPA поддерживает передачу файлов с политиками и данными, на основе которых происходит принятие решений, (policy.rego, data.json) — как аргументы запуска сервера, а также использование предварительно собранных bundle с данными файлами.

4.1.1. Использование явных файлов политик 

При использовании явных локальных файлов политики необходимо обеспечить их идентичность и возможность централизованного обновления для каждого из серверов (т.к. OPA не является кластерезуемым решением, и в этом решении отсутствует единая точка распространения политик). Данный способ также поддерживает обновление активных политик «на лету» — без рестарта сервиса, но, имея возможность разместить политики на внешнем хранилище, лучше рассмотреть следующий способ.

4.1.2. Использование bundle с политиками

Bundle представляет собой архив, в который уже добавлены все политики с дата‑файлами по каждому из сервисов (либо для одного сервиса). Данный метод является предпочтительным, потому что он поддерживает возможность автоматизированного обновления политик из внешних источников, но при этом добавляет сложность в виде необходимости предварительной сборки bundle. Несмотря на сложности со сборкой, было принято решение использовать именно данный метод предоставления политик.

4.2. Методы сборки bundle 

Будет рассмотрено два основных способа сборки bundle политик для сервера OPA:

  • builder в составе OPA Operator (на кластере K8s);

  • использование пакета OPA для ручной сборки.

4.2.1. Сборка с использованием OPA Operator

Использование оператора в k8s потенциально могло бы решить проблему со сборкой и распространением bundle, а также их декларативным объявлением с помощью ConfigMap и их деплоем посредством GitOps‑инструмента, например, ArgoCD. Но было принято решение отказаться от данного метода, конкретную реализацию и причины можно прочитать отдельно под спойлером.

Сборка с использованием OPA Operator

Для экспериментов был выбран Stackable Operator for OPA (OpenPolicyAgent), который позволяет объявить CRD OpaCluster, который порождает DaemonSet с OPA, а также обрабатывает ConfigMap'ы с label opa.stackable.tech/bundle: "true", у каждого Pod'а, порожденного DaemonSet'ом, присутствует два контейнера:

  • opa — контейнер с OPA, который запущен в режиме сервера, а bundle получает от контейнера bundle‑builder;

  • bundle‑builder — контейнер для внутренних нужд, который собирает bundle на основе политик, размещенных в ConfigMap (все политики собираются в единый bundle, поскольку OPA должна работать с одним bundle). Данный контейнер предназначен для внутреннего использования и недоступен по умолчанию для обращения напрямую, лишь из контейнера opa.

Подразумевается, что сервисы k8s могут обращаться по DNS‑имени Service для получения «решения» на основе политик, загруженных в OPA, однако в рассматриваемом случае куда больший интерес представляла возможность обращения напрямую к bundle‑builder для получения bundle.tar.gz для последующего использования в OPA, установленного на Kafka. Для этого манифесты развертывания CRD OpaCluster были дополнены следующими абстракциями.

Не рекомендуется к использованию именно в приведенном виде, поскольку данные манифесты предназначались для тестирования!

# You can create new cluster simply copy-paste this file and replace 
# 1. "opa-example-cluster.mycorp.local" to your ingress address
# 2. "opa-example-cluster" to your new cluster name

---
# OPA cluster settings
apiVersion: opa.stackable.tech/v1alpha1
kind: OpaCluster
metadata:
  name: opa-example-cluster
  namespace: opa-example-cluster
spec:
  image:
    productVersion: "1.0.1"
    stackableVersion: "0.0.0-dev"
    repo: "stackable"
  servers:
    roleGroups:
      default:
        config:
          resources:
            cpu:
              min: 250m
              max: 500m
            memory:
              limit: 4Gi

---
# Ingress (optional)
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: opa-example-cluster
  namespace: opa-example-cluster
spec:
  ingressClassName: nginx
  rules:
  - host: opa-example-cluster.mycorp.local
    http:
      paths:
      - backend:
          service:
            name: opa-example-cluster
            port:
              number: 8081
        path: /
        pathType: ImplementationSpecific

---
apiVersion: v1
kind: Service
metadata:
  name: opa-example-cluster-policy-builder
  namespace: opa-example-cluster
spec:
  ports:
  - name: http
    port: 3030
    protocol: TCP
    targetPort: 3030
  selector:
    app.kubernetes.io/component: server
    app.kubernetes.io/instance: opa-example-cluster
    app.kubernetes.io/name: opa
  type: ClusterIP

---
# Ingress for bundles (optional)
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: opa-example-cluster-bundles
  namespace: opa-example-cluster
spec:
  ingressClassName: nginx
  rules:
  - host: opa-example-cluster.mycorp.local
    http:
      paths:
      - backend:
          service:
            name: opa-example-cluster-policy-builder
            port:
              number: 3030
        path: /opa/v1/opa/
        pathType: ImplementationSpecific

В приведенном случае появлялась возможность получения bundle.tar.gz по ссылке https://opa‑example‑cluster.mycorp.local/opa/v1/opa/bundle.tar.gz, которую можно было указать для OPA на Kafka, после чего bundle мог использоваться.

Объявление политики в этом случае представляло собой объявление ConfigMap. Пример приведен ниже:

---
apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka
  labels:
    opa.stackable.tech/bundle: "true"
data:
  data.yaml: |
    ---
    cluster_admins:
      - kafka/demouser1
    cluster_viewers:
      - kafka/demouser2

    topics_rules:
      - topic: demo
        users:
        - username: kafka/demouser2
          permissions:
          - topic_write
          - topic_read


  kafka.rego: |
    package kafka.authz


    ## POLICY START
    # Default policy: deny everything that is not explicitly allowed
    default allow := false

    # High-level allow rules
    allow if {
        allow_cluster_admin
    }
    allow if {
        allow_cluster_viewer
    }
    allow if {
        allow_topic_opertions
    }

    allow if {
        allow_non_topic_operations
    }
    ## POLICY END

    ## PERMISSIONS CONFIGURATION START
    # Static permissions for kafka actions
    CLUSTER_VIEWER_PERMISSIONS := {
        "TOPIC": ["DESCRIBE", "DESCRIBE_CONFIGS"],
        "GROUP": ["READ", "DESCRIBE"],
        "CLUSTER": ["DESCRIBE", "DESCRIBE_CONFIGS"]
    }

    # Key of JSON match names used in topics_rules.users.permissions
    USER_PERMISSIONS := {
        "read": 
        {
            "TOPIC": ["READ", "DESCRIBE"],
            "GROUP": ["READ", "DESCRIBE"],
            "TRANSACTIONAL_ID": ["DESCRIBE", "WRITE"]
        },
        "write": 
        {
            "TOPIC": ["WRITE", "DESCRIBE"],
            "GROUP": ["READ", "DESCRIBE"],
            "TRANSACTIONAL_ID": ["DESCRIBE", "WRITE"],
            "CLUSTER": ["IDEMPOTENT_WRITE"]
        }
    }

    ## PERMISSIONS CONFIGURATION END

    ## GET CONFIG FROM JSON START
    # Get data from policy
    topics_rules := data.topics_rules
    cluster_admins := data.cluster_admins
    cluster_viewers := data.cluster_viewers

    ## GET CONFIG FROM JSON END

    ## POLICY DETERMINATOR START
    # Low-level functions for evaluate roles and permissions

    allow_cluster_admin if {
        some cluster_admin in cluster_admins
        # Match cluster admin user
        input.requestContext.principal.name == replace(cluster_admin, "/", "+")
    }

    allow_cluster_viewer if {
        some cluster_viewer in cluster_viewers
        # Match cluster viewer user
        input.requestContext.principal.name == replace(cluster_viewer, "/", "+")
        # Match permissions
        input.action.operation == CLUSTER_VIEWER_PERMISSIONS[input.action.resourcePattern.resourceType][_]
    }

    allow_topic_opertions if {
        some topic_rules in topics_rules
        # Match topic name
        input.action.resourcePattern.name == topic_rules.topic
        some user in topic_rules.users
        # Match topic user
        input.requestContext.principal.name == replace(user.username, "/", "+")
        some permission in user.permissions
        # Match permissions for topic resources
        input.action.operation == USER_PERMISSIONS[permission][input.action.resourcePattern.resourceType][_]
    }

    allow_non_topic_operations if {
        some topic_rules in topics_rules
        some user in topic_rules.users
        # Match user
        input.requestContext.principal.name == replace(user.username, "/", "+")
        some permission in user.permissions
        # Match permissions for non topic resources
        input.action.resourcePattern.resourceType != "TOPIC"
        input.action.operation == USER_PERMISSIONS[permission][input.action.resourcePattern.resourceType][_]
    }
    # ## POLICY DETERMINATOR END

При этом внутренняя структура формируемого bundle получалась следующая:

bundle.tar.gz
└── bundles
    └── kafka
        ├── data.yaml
        └── kafka.rego

Особенность работы оператора заключается в том, что наименование директории, в которой будут располагаться файлы политики (policy.rego и data.json файлы), будет соответствовать наименованию ConfigMap (в приведенном примере — kafka).

Основными причинами, по которым данный подход не получил развития в статье, являются следующие два факта:

  • со стороны оператора нет возможности осуществить валидацию на стадии написания ConfigMap. т. е. неверная с точки зрения OPA и верная с точки зрения синтаксиса k8s ConfigMap не позволит собраться новой версии bundle. Тем не менее, в силу особенностей OPA, продолжит работать предыдущая (успешно собранная) версия bundle, однако доставка изменений политик будет ограничена в этот момент;

  • второй недостаток был обнаружен в ходе тестирования, заключается он в том, что удаление ConfigMap с политикой из k8s не приводит к удалению политики из bundle. Пример:

    bundle.tar.gz
    └── bundles
        ├── demo-prod-kafka
        │   ├── data.json
        │   └── kafka.rego
        ├── kafka
        │   ├── data.json
        │   ├── data.yaml
        │   └── kafka.rego
        ├── kafka-demo
        │   ├── data.json
        │   └── kafka.rego
        └── kafkademoprod
            ├── data.json
            └── kafka.rego

Так выглядит bundle, отдаваемый bundle‑builder'ом после того, как были созданы и удалены ConfigMap'ы demo‑prod‑kafka, kafka‑demo, kafkademoprod. ConfigMap kafka, хоть и присутствует, но был изменен data.json на data.yaml, тем не менее обе версии присутствуют в bundle.

4.2.2. Локальная сборка с использованием бинарного файла OPA

Bundle может быть собран вручную с использованием пакета OPA, такой способ наиболее прост и понятен при использовании с разными службами, но требует отдельный образ и/или хост для осуществления сборки.

Сборка осуществляется с выбором директории, в которой находятся файлы *.rego и *.yaml\*.yml\*.json, также возможно указать дополнительные параметры; пример использования:

opa build ./ --output bundle-tests.tar.gz --ignore '.gitlab-ci.yml*'

При этом структура bundle будет несколько отличаться от собираемого с использованием OPA Operator:

bundle.tar.gz
├── data.json
├── kafka.rego
└── .manifest

Было принято решение использовать данный метод в связи с большим выбором дополнительных параметров и кастомизации, а также в связи со спецификой сетевой архитектуры компании. Решение с автоматической сборкой и деплоем в хранилище bundle будет описано далее (PaaC + GitLab CI).

4.3. Способы динамического обновления политик 

Доставка политик из внешних источников для OPA Server может осуществляться различными способами, которые подробно описаны в статье, один из них — Bundle API (в режиме pull), также поддерживается режим динамического обновления политики при работе с файлами, эти способы и будут рассмотрены ниже.

4.3.1. Обновление файлов политики

При использовании локальных файлов политик и данных также можно указать дополнительный параметр --watch, чтобы сервер ОРА следил за обновлением локальных файлов и мог без перезапуска сервиса обновлять текущие политики.

4.3.2. Использование метода Bundle API

Данный метод позволяет указать внешнее хранилище для bundle, к которому будет обращаться сервер OPA для обновления политики. Внешним хранилищем может выступать HTTP‑сервер, S3-bucket, Google Cloud Storage и т. д., а также в конфигурации имеется поддержка указания нескольких внешних источников для реализации резервирования поставщиков политик. В статье будет рассмотрен вариант с HTTP‑сервером и S3-bucket, о которых написано далее.

Именно такой способ распространения bundle предлагается в примере интеграции OPA и Kafka на официальном сайте OPA.

4.3.2.1. Использование внешнего HTTP(s) сервера

Загрузка и распространение bundle с помощью HTTP‑сервера работают; более того это подтверждает то, что приведено в документации OPA: при обращении к серверу распространения bundle OPA предварительно запрашивает hash‑сумму целевого файла (в своих логах данная hash‑сумма называется Etag и в приведенном случае использует алгоритм SHA-1) и осуществляет загрузку файла лишь в том случае, если hash‑сумма на сервере не совпала с представленной локально у OPA.

Также сервер OPA в случае потери соединения с сервером распространения политик или же при отсутствии запрашиваемого файла, будет использовать текущую активную политику, в случае успеха загрузки bundle, он будет использован далее.

Причина, по которой было решено отказаться от данного варианта, состоит в том, что в официальной документации OPA большее количество времени уделено варианту с AWS S3 хранилищем (но также может быть использовано S3-совместимое), и такой вариант позиционируется как целевой.

4.3.2.2. Использование внешнего S3 хранилища

Интеграция с S3 также работает без проблем, но есть небольшие нюансы, связанные с использованием Minio вместо AWS S3. Задача заключалась том, чтобы реализовать хранение bundle на корпоративном S3, не используя облачные решения, а также дополнительно разбить кластеры Kafka для использования разных S3 Bucket (и GitLab репозиториев со своим GitLab CI). Про особенности использования Minio будет расписано в разделе реализации, ведь именно данный способ был выбран целевым.

5. Наша реализация на Kafka 3.7

Для тестирования интеграции был развернут тестовый стенд, схожий по версиям компонентов и архитектуре с продуктовым. Основные характеристики и спецификация кластера Kafka:

  • использование BareMetal‑серверов как брокеров;

  • использование VM‑серверов для KRaft;

  • использование S3 Minio как сервер распространения политик для OPA;

  • версия используемого пакета Kafka — 3.7.

Тестовый кластер Apache Kafka состоит из 8 серверов: 3 KRaft + 5 Broker. Архитектура кластера не предоставляет возможность вынести на отдельный сервер функциональность OPA Server, поэтому было принято решение устанавливать OPA непосредственно на сервера Kafka 1 в 1 (подробнее было упомянуто в пункте 3.2).

Тип сервера

ОС

Kafka Role

OPA Server

Кол‑во

VM

RedOS

Controller

Server+Plugin

3 шт.

BareMetal

RedOS

Broker

Server+Plugin

5 шт.

5.2. Настройка OPA Server

Настройку и развертывание OPA Server было принято решение реализовать с использованием Ansible (как и основной инструмент развертывания кластера Kafka). Настройка производилась с учетом того, что будет использоваться схема с Bundle‑политиками и сервером распространения в виде корпоративного S3.

Способы определения параметров запуска

OPA Server поддерживает конфигурирование как с использованием YAML‑файла конфигурации, так и с переопределением конкретных параметров непосредственно в строке запуска.

В связи с этим были выделены основные параметры для настройки OPA Server:

Параметр

Значение

Описание

disable-telemetry

Не задается

Отключение телеметрии

decision_logs.console

false

Отключение подробного логирования запросов

server

Не задается

Запуск в режиме сервера

addr

localhost:8181

Основной слушающий адрес для запросов

diagnostic-addr

0.0.0.0:9181

Слушающий адрес для диагностики и метрик

bundles.authz.service

s3

Название внутреннего сервиса для метода authz

bundles.authz.resource

bundle.tar.gz

Название файла Bundle для применения политик к методу authz

services.s3.url

https://s3.mycorp.local/kafka‑policies/

Адрес S3 с путем до нужного бакета и директории

services.s3.credentials.s3_signing.profile_credentials.path

/opt/opa/.s3_cred

Путь на сервере до файла с учетными данными от S3

При использовании данных параметров OPA запустится в режиме сервера и будет слушать запросы с локального адреса, а также периодически обновлять политики в виде bundle с внешнего S3 хранилища.

5.3. Настройка Kafka + OPA Plugin

Плагин для Apache Kafka поставляет в виде.jar файла, который был немного изменен нами в связи с возникшими проблемами с кешем (подробнее в пункте 3.1).

Предварительно необходимо загрузить и установить OPA Kafka Plugin, это можно сделать любым из двух доступных способов:

  • поместить пакет в расширении .jar в каталог плагинов для Kafka (предварительно определив его в конфигурации Kafka);

  • поместить пакет в расширении .jar в classpath Kafka JVM.

Также необходимо провести настройку непосредственно самой Kafka, указав необходимый класс для авторизации, а также дополнительные параметры для него:

Параметр

Значение

Описание

authorizer.class.name

org.openpolicyagent.kafka.OpaAuthorizer

Указание использования класса OPA Plugin для авторизации запросов

opa.authorizer.allow.on.error

False

Политика по умолчанию при недоступности сервера OPA

opa.authorizer.cache.expire.after.seconds

60

Время жизни кеша запросов от OPA Plugin к OPA Server

opa.authorizer.cache.initial.capacity

50000

Выделяемые размер кеша при запуске приложения

opa.authorizer.cache.maximum.size

50000

Максимальный размер кеша

opa.authorizer.url

http://localhost:8181/v1/data/kafka/authz/allow

URL OPA Server

Важно отметить, что использование Kafka с механизмом SASL SSL (Kerberos) подразумевает отображение имен пользователей, как принципалов в домене Krb, на который выдан TGT. Но по умолчанию Kafka (по крайней мере версии 3.7) не умеет работать с двусоставными принципалами, поэтому необходимо дополнительно настроить маппинг, а также вынести брокеров и контроллеров в отдельную группу суперпользователей (администраторов) для того, чтобы для данной группы не проводилась дополнительная проверка — авторизация.

Параметр

Значение

Описание

sasl.kerberos.principal.to.local.rules

RULE:[2:$1+$2@$0](kafka[+]kafka-[0-9]*.mycorp.local@MYCORP.LOCAL)s/(.*)/kafka+admins/

Маппинг принципалов kafka/kafka-[0-9]*.mycorp.local в отдельную группу kafka+admins

RULE:[2:$1+$2@$0](.*@MYCORP.LOCAL)s/@MYCORP.LOCAL//

Маппинг для двусоставных принципалов

RULE:[1:$1@$0](.*@MYCORP.LOCAL)s/@MYCORP.LOCAL//

Маппинг для односоставных принципалов

super.users

User:kafka+admins

Выделенный пользователь/группа, с которыми не будет проводиться проверка политик доступа

После данных настроек кластер Kafka будет направлять любые запросы от любых пользователей (кроме принципалов, которые маппятся как kafka+admins) на сервер OPA для прохождения валидации.

Об особенностях работы Kafka в контекте авторизации

Kafka — кластерное решение, которое позволяет обращаться к любому из брокеров для предоставления информации по API и/или взаимодействия консьюмера и продюсера, но при этом авторизация работает только в рамках одной ноды: то есть, если вы обращаетесь к ноде Х, но необходимая информация или ресурс находятся на ноде Y, то проверка доступа — авторизация будет проводиться именно на ноде Y. Именно по этой причине было принято решение разворачивать функциональность OPA в том числе и на нодах KRaft.

6. Автоматизация сборки и доставки политик методом PaaC

Автоматизация представляет собой использование GitLab CI для реализации пошаговой сборки, проверки и деплоя на внешнее хранилище политик в виде bundle.

Pipeline выполняет шаги check и test для каждого коммита в репозиторий, чтобы удостовериться в корректности синтаксиса *.rego файлов и провести тестирование (подробнее о тестировании — ниже), после чего в ветке разработчика позволяет в ручном режиме осуществить сборку (build) и доставку bundle (deploy) на целевое хранилище в тестовую локацию. Также настроено автоматическое удаление (remove) тестового bundle по истечении времени ожидания или при ручном триггере этого действия. Отправка политик в продуктивное расположение на целевом хранилище происходит только после одобрения и мержа MR в ветку по умолчанию репозитория.

6.1. Структура репозитория

Репозиторий содержит следующие типы файлов:

  • .gitlab-ci.yml — файл пайплайна, реализующего проверку *.rego файлов, тестирование бандлов, их сборку и доставку на целевое хранилище;

  • *.rego — файлы описания политики (в этих файлах реализуется логика принятия решения) на языке rego;

  • *.test.rego — файлы для тестирования политик; описывают тест в формате ожидаемого результата работы политики и отправляемого вывода, написаны на языке rego. Файлы по данной маске не входят в целевой bundle;

  • *.yml, *.json, *.yaml — все файлы с этими расширениями и не являющиеся скрытыми (не начинающиеся с .) будут добавлены в собираемый bundle в качестве data.json, при этом все файлы объединяются в один. Стратегия разрешения конфликтов не определена четко, поэтому отсутствие конфликтов должно обеспечиваться автором!

Для pipeline подготовлен шаблон (ops/kafka-policies/cicd-templates/opa-templates.yml), содержащий основные шаги и позволяющий переиспользовать данные шаги и настраивать для каждого проекта, представляющего репозиторий с PaaC для каждого кластера Kafka (в примере: для кластера example-kafka-cluster реализован pipeline в файле ops/kafka-policies/example-kafka-cluster/.gitlab-ci.yml).

ops/kafka-policies/cicd-templates/opa-templates.yml
# ops/kafka-policies/cicd-templates/opa-templates.yml

variables:
  OPA_IMAGE: openpolicyagent/opa:1.1.0-debug
  MINIO_MC_IMAGE: minio/mc:RELEASE.2025-04-08T15-39-49Z-cpuv1


.opa_template:
  image: $OPA_IMAGE
  dependencies: []

.opa_check_job:
  extends: .opa_template
  stage: check
  script:
    - opa check ./ --format json

.opa_test_job:
  extends: .opa_template
  stage: test
  script:
    - opa build ./ --output bundle-tests.tar.gz --ignore '.*'
    - opa test bundle-tests.tar.gz --verbose

.opa_build_job:
  extends: .opa_template
  stage: build
  script:
    - opa build ./ --output bundle.tar.gz --revision ${CI_COMMIT_SHORT_SHA} --ignore '.*' --ignore '*.test.rego'
  artifacts:
    paths:
      - bundle.tar.gz
    expire_in: 1 hour

.opa_deploy_job:
  image: $MINIO_MC_IMAGE
  stage: deploy
  dependencies:
    - build
  script:
    - |
      mc alias set s3_alias https://$S3_HOST $S3_ACCESS_KEY $S3_SECRET_KEY && \
      mc cp bundle.tar.gz s3_alias/${S3_BUCKET}/${CI_PROJECT_NAME}/bundle.tar.gz

.opa_remove_job:
  image: $MINIO_MC_IMAGE
  stage: remove
  script:
    - |
      mc alias set s3_alias https://$S3_HOST $S3_ACCESS_KEY $S3_SECRET_KEY && \
      mc rm --force s3_alias/${S3_BUCKET}/${CI_PROJECT_NAME}/bundle.tar.gz
ops/kafka-policies/example-kafka-cluster/.gitlab-ci.yml
# ops/kafka-policies/example-kafka-cluster/.gitlab-ci.yml

stages:
  - check
  - test
  - build
  - deploy
  - remove

workflow:
  rules:
    - if: $CI_PIPELINE_SOURCE == "push"
    - if: $CI_PIPELINE_SOURCE == "web"
    - if: $CI_PIPELINE_SOURCE == "merge_request_event"


include:
  - project: 'ops/kafka-policies/cicd-templates'
    file: 'opa-templates.yml'

variables:
  OPA_IMAGE: openpolicyagent/opa:1.1.0-debug
  MINIO_MC_IMAGE: minio/mc:RELEASE.2025-04-08T15-39-49Z-cpuv1


check:
  extends: .opa_check_job


test:
  extends: .opa_test_job


build:
  extends: .opa_build_job
  rules:
    - if: $CI_PIPELINE_SOURCE == "merge_request_event"
      when: never
    - if: $CI_PIPELINE_SOURCE == "push"
    - if: $CI_PIPELINE_SOURCE == "web"


.opa_deploy_dev_env:
  variables:
    S3_BUCKET: kafka-policies-dev/${CI_COMMIT_REF_SLUG}

deploy-bundle-dev:
  extends:
    - .opa_deploy_dev_env
    - .opa_deploy_job
  environment:
    name: dev
    on_stop: remove-bundle-dev
    auto_stop_in: 1 week
  rules:
    - if: $CI_COMMIT_BRANCH == $CI_DEFAULT_BRANCH
      when: never
    - if: $CI_PIPELINE_SOURCE == "merge_request_event"
      when: never
    - when: manual


remove-bundle-dev:
  extends:
    - .opa_deploy_dev_env
    - .opa_remove_job
  environment:
    name: dev
    action: stop
  rules:
    - if: $CI_COMMIT_BRANCH == $CI_DEFAULT_BRANCH
      when: never
    - if: $CI_PIPELINE_SOURCE == "merge_request_event"
      when: never
    - when: manual


.opa_deploy_prod_env:
  variables:
    S3_BUCKET: kafka-policies

deploy-bundle-prod:
  extends:
    - .opa_deploy_prod_env
    - .opa_deploy_job
  environment:
    name: prod
  rules:
    - if: $CI_COMMIT_BRANCH == $CI_DEFAULT_BRANCH

Переменные*, использованные в pipeline, объявленные в разделе CI/CD Variables:

Имя переменной

Описание

S3_HOST

S3 хост, который будет использован утилитой minio mc**

S3_ACCESS_KEY

S3 access key, который будет использован утилитой minio mc

S3_SECRET_KEY

S3 secret key, который будет использован утилитой minio mc

* в нашем конкретном случае данные переменные хранятся в Hashicorp Vault, но для упрощения приведенного pipeline предполагаем, что они также размещены в CI/CD Variables Gitab.

** если Ваш s3 обладает самоподписанным сертификатом, то рекомендуется добавить его в образ minio/mc. В целях тестирования можно использовать флаг --insecure для всех команд mc.

6.2. Написание тестов для политик

Иногда, при изменении логики политики или объемных изменениях в ACL полезным бывает написание unit‑тестов, в данном репозитории файлы тестов называются по маске *.test.rego. Такая маска нужна для того, чтобы включить тесты на этапе тестов и исключить их нахождение в bundle, отправляемом на целевой удаленный сервер.

Тест имитирует ввод со стороны плагина Kafka, который должен попадать в OPA, а далее сравнивает ожидаемый результат от OPA и фактически полученный. Ошибочное завершение тестов вызывает ошибку pipeline и запрещает дальнейшие действия по распространению политики на удаленный сервер.

Простейший файл теста выглядит следующим образом:

package authz_test
 
import data.kafka.authz.allow
 
test_default_denied if {
  not allow with input as {}
}

test_default_denied — название теста, которое будет отображаться в логах стейджа test. В данном случае тест проверяет, что пустой ввод со стороны Kafka приведет к отказу в доступе (not allow).

Следующий пример иллюстрирует более сложный ввод, а также то, что доступ пользователю выдан с правами вьюера:

test_authorized_viewer_user_to_describe_allowed if {
  allow with input as {
    "action": {
      "operation": "DESCRIBE",
      "resourcePattern": {
        "name": "any-topic-name",
        "resourceType": "TOPIC"
      }
    },
    "requestContext": {
      "listenerName": "SASL_SSL",
      "principal": {
        "name": "kafka+demouser1",
        "principalType": "User"
      },
      "securityProtocol": "SASL_SSL"
    }
  }
}

В данном случае написание в примере значения поля name в виде kafka+demouser1 не является опечаткой! Поскольку именно в таком виде Kafka передает принципал OPA (заменяя / на + в принципале, это ограничение Kafka).

Более подробно о написании тестов для OPA можно прочесть по ссылке Policy Testing.

6.3. Написание логики политик

Логика политик сосредоточена в файлах под маской .rego, но не *.test.rego, написание логики происходит на языке rego. Главным при изменении/добавлении логики является аккуратное тестирование на различных вариантах ввода, отсутствие таких тестов может привести к ложноположительным или ложноотрицательным срабатываниям политики относительно предполагаемого результата.

Для локального тестирования удобно использовать плагин для Visual Studio Code, но также локальное тестирование может проводиться с помощью команды opa eval

Особое внимание проверяющего MR следует обратить при изменении логики в репозитории!

Выводы

По итогам проведенной работы можно с уверенностью сказать, что поставленные цели были достигнуты, а именно:

  • настроена масштабируемая и удобно‑управляемая (с использование PaaC‑подхода) авторизация на кластере Kafka;

  • политики доступа хранятся в GitLab и защищены от несанкционированного изменения (использован GitOps‑подход, позволяющий иметь единую точку правды и контроль за изменениями политик, верификацию этих изменений в привычном для пользователей формате);

  • написан Pipeline, позволяющий выполнять предварительное тестирование, упаковку и доставку политик на целевое хранилище;

  • имеется возможность быстрого дооснащения имеющихся кластеров Kafka данной системой авторизации, а также развертывание новых кластеров вкупе с OPA.

Также большим плюсом можно считать возможность модернизации: добавление новых ролей для более гранулярного разграничения прав, либо же, наоборот, объединение нескольких ролей, если это потребуется в дальнейшем — в отличие от встроенных Kafka ACL, которые вообще не поддерживают RBAC.


Мы благодарим Вас за прочтение нашей статьи и надеемся, что она поможет Вам в решении Ваших задач.

Авторы материала
Авторы материала

Авторы:

Денис Куничкин (Denis Kunichkin) — ведущий инженер по внедрению, t2

Данила Маланьин (Danila Malanin) — ведущий инженер по внедрению, t2

Обновление плагина:

Александр Кириллов (Alexander Kirillov) — ведущий инженер по разработке сервисов больших данных и искусственного интеллекта, t2

Теги:
Хабы:
+7
Комментарии10

Публикации

Информация

Сайт
t2.ru
Дата регистрации
Дата основания
Численность
5 001–10 000 человек
Местоположение
Россия