Event-driven архитектура в Kubernetes
Kubernetes, как система оркестрации, позволяет автоматизировать процесс развертывания сложных приложений и восстанавливать ожидаемое состояние кластера после сбоев. В общем случае приложение представляет собой резидентно запущенные контейнеры, которые обрабатывают запросы клиентов в цикле обработки событий, при этом при росте нагрузки могут создаваться дополнительные реплики (с использованием механизма Horizontal Pod Autoscaling). Однако, нередко бывают случаи, когда сервис используется не очень часто, но при этом в запущенном состоянии он забирает большое количество оперативной памяти или процессорного времени, и желательно обеспечить механизм запуска сервиса по запросу (или по внешнему событию). Для реализации такого варианта использования сейчас доступен инструмент knative, который был принят в марте 2022 года в качестве incubating-проекта в CNCF (Cloud Native Computing Foundation). В этой статье мы разберемся с основными понятиями knative и попробуем создать архитектуру приложения, основанную на событиях, с использованием eventing-возможностей knative.
Прежде всего отметим, что knative реализует модель "функция как сервис" по аналогии с AWS Lambda, Google Cloud Functions и другими похожими реализациями. KNative обеспечивает собственный механизм масштабирования (опирается на HPA), который ориентируется на запрос ресурсов экземплярами сервиса (функциями) и количество запросов. KNative может работать с существующим кластером Kubernetes, а также использовать один из вариантов установки на собственный компьютер (minikube, microk8s, …). В случае с microk8s его можно разрешить как плагин:
snap install microk8s --classic
microk8s enable community
microk8s enable knative
microk8s start
Для установки на кластер Kubernetes можно использовать Yaml-файлы для Serving (отвечает за запуск, регистрацию и удаление функций) и Eventing (управление очередями сообщений). В первой части мы будем иметь дело только с Serving:
kubectl apply -f https://github.com/knative/serving/releases/download/knative-v1.6.0/serving-crds.yaml
kubectl apply -f https://github.com/knative/serving/releases/download/knative-v1.6.0/serving-core.yaml
kubectl apply -f https://github.com/knative/net-kourier/releases/download/knative-v1.6.0/kourier.yaml
kubectl patch configmap/config-network \
--namespace knative-serving \
--type merge \
--patch '{"data":{"ingress-class":"kourier.ingress.networking.knative.dev"}}'
kubectl apply -f https://github.com/knative/serving/releases/download/knative-v1.6.0/serving-default-domain.yaml
kubectl apply -f https://github.com/knative/serving/releases/download/knative-v1.6.0/serving-hpa.yaml
Как можно видеть, Knative включает в себя также Ingress-контроллер, который используется для обработки входящего трафика и запуска (или масштабирования) функций.
Также для управления KNative можно установить оператор:
kubectl apply -f https://github.com/knative/operator/releases/download/knative-v1.6.0/operator.yaml
После этого можно зарегистрировать необходимые компоненты (KNative serving, Ingress Controller):
apiVersion: v1
kind: Namespace
metadata:
name: knative-serving
---
apiVersion: operator.knative.dev/v1beta1
kind: KnativeServing
metadata:
name: knative-serving
namespace: knative-serving
---
apiVersion: operator.knative.dev/v1beta1
kind: KnativeServing
metadata:
name: knative-serving
namespace: knative-serving
spec:
ingress:
kourier:
enabled: true
config:
network:
ingress-class: "kourier.ingress.networking.knative.dev"
Кроме Kourier также могут использоваться ingress-контроллеры Istio или Contour.
После установки любым способом в кластере появится новое пространство имен knative-serving, включающее служебные процессы ingress-контроллера (controller, для переадресации входящих запросов по названию домена или по заголовку Host в запросе), Autoscaler (для автоматического запуска-остановки экземпляров контейнеров), Activator (отвечает за пересылку запросов и отправку метрик в autoscaler). При установке через оператор также добавляются webhook для отслеживания изменений в конфигурации KNative (например, в Kind: KnativeServing) и выполнения действий по согласованию состояния развертывания в соответствии с новыми параметрами.
Теперь, когда мы установили управляющие процессы knative, перейдем к установке инструментов управления. Для управления функциями установим инструмент командной строки kn:
wget https://github.com/knative/client/releases/download/knative-v1.6.0/kn-linux-amd64
chmod +x kn-linux-amd64
sudo mv kn-linux-amd64 /usr/local/bin/kn
Основной единицей управления в KNative является сервис (Service), который также определен как тип ресурса Kubernetes ksvc (или kservice). Для развертывания сервиса можно использовать как команду kn service create, так и определение ресурса через yaml. При создании ресурса обязательно определить название образа контейнера, который будет обрабатывать входящий запрос (--image), а также ограничения и опции, которые обычно используются в Deployment:
--arg
список значений для аргументов командной строки для передачи в entrypoint контейнера;--cmd
переопределение команды для вызова в entrypoint;--concurrency-limit
максимальное количество запросов для обработки одной репликой;--env
- изменение переменных окружения для контейнера;--revision-name
- определении ревизии (используется для версионирования разных версий функции);--limit
,--request
- ограничение и изначальный запрос ресурсов (например, cpu, memory, …);--mount
- монтирование ConfigMap (cm:name), Secret (secret:name) или именованный volume (определяется в --volume) в указанный каталог в контейнере;--port
- определение номера порта, на котором приложение прослушивает запросы--probe-liveness
- определение способа проверки доступности (например, http:0.0.0.0:8080:/ping);--probe-readiness
- проверка готовности сервиса к обработке запросов (например, exec:/checkreadiness.sh);--scale-min
,--scale-max
,--scale-init
- минимальное, максимальное и начальное значение количества реплик сервиса (любое значение может быть 0, но max должен быть ≥min);--scale-metric
- какая метрика используется для оценки необходимости изменения количества реплик (например, rps для оценки количества запросов в секунду);--scale-window
- интервал между обновлениями количества активных реплик (например, 5s);--volume
- подключение разделов (например, volumename=pvc: связывает pvc с именем name и соответствующий volume (используется в mount).
С точки зрения разработчика функция представляет собой веб-сервис, который принимает запросы на расположении / и отдает ответ, который будет доставлен клиенту. Например, простая функция Hello, World на Go может выглядеть так:
package main
import (
"fmt"
"net/http"
"os"
)
func handler(w http.ResponseWriter, r *http.Request) {
serviceName := os.Getenv("SERVICE_NAME")
fmt.Fprintf(w, "%s is called!\n", serviceName)
}
func main() {
http.HandleFunc("/", handler)
http.ListenAndServe(":8080", nil)
}
Пока мы будем использовать готовый контейнер из примеров и создадим новый сервис:
kn service create hello --image gcr.io/knative-samples/helloworld-go
После создания сервиса мы увидим адрес для подключения к функции. Обратите внимание, что для внешнего подключения будет использоваться адрес, к которому присоединился kourier и его можно узнать с помощью команды:
kubectl get svc -n knative-serving kourier
Если knative будет развернут при отсутствии поставщика внешних адресов для LoadBalancer, можно заменить тип сервиса на ClusterIP и добавить для него ExternalIPs для публикации. Также можно установить, например, metallb (в microk8s может быть добавлен через microk8s enable metallb:192.168.100.250-192.168.100.254).
Для корректного обращения к функции нужно будет указывать зарегистрированный адрес в заголовке Host.
curl -H 'Host: hello.default.example.com' http://externalip
Посмотреть список активных функций можно командой:
kubectl get ksvc
NAME URL LATESTCREATED LATESTREADY READY REASON
hello http://hello.default.example.com hello-00001 hello-00001 True
Как можно видеть, здесь также указывается актуальная ревизия, которая и будет принимать запросы по умолчанию. При публикации новой версии трафик переключается на нее, но с помощью опции --traffic
можно указать какая доля трафика будет отправлена на какую ревизию (например, для проведения экспериментов).
Доступные ревизии также могут быть получены через kubectl get revisions
, связь Host и соответствующих функций через kubectl get routes
. Также для управления развертыванием в CI/CD можно использовать ресурсы в apiVersion: serving.knative.dev/v1
(Service
, Revision
, Route
).
Теперь, когда мы умеем регистрировать отдельные функции и привязывать их к доменам, можно перейти к созданию архитектуры, основанной на событиях и поговорить про eventing.
В eventing ключевыми можно обозначить поставщиков событий (event producers), получателей событий (event consumers) и посредника между ними, в роли которого чаще всего используется брокер очередей (event sink). Sink выполняет функцию маршрутизации событий от поставщиков к получателям и также во многих случаях решает задачу буферизации (накопления) сообщений, если на момент отправки от producer не было ни одного consumer.
С точки зрения KNative функции могут выполняться последовательно (при этом на каждом этапе функция принимает событие на вход, выполняет обработку и формирует новое событие на выход, т.е. является одновременно и consumer (тело http-запроса) и producer (результат обработки запроса). Последняя функция в цепочке не возвращает результат. Также выполнение может происходить параллельно.
Прежде всего установим KNative Eventing. При использовании ручной установки это можно выполнить через применение CRD:
kubectl apply -f https://github.com/knative/eventing/releases/download/knative-v1.6.0/eventing-crds.yaml kubectl apply -f https://github.com/knative/eventing/releases/download/knative-v1.6.0/eventing-core.yaml
Для установки через оператор можно создать необходимые ресурсы:
apiVersion: v1
kind: Namespace
metadata:
name: knative-eventing
---
apiVersion: operator.knative.dev/v1beta1
kind: KnativeEventing
metadata:
name: knative-eventing
namespace: knative-eventing
Для реализации sink можно использовать каналы в памяти (ненадежно, поскольку будут потеряны при перезапуске):
kubectl apply -f https://github.com/knative/eventing/releases/download/knative-v1.6.0/in-memory-channel.yaml
Альтернативно можно установить поддержку каналов с использованием Apache Kafka:
kubectl apply -f https://github.com/knative-sandbox/eventing-kafka-broker/releases/download/knative-v1.6.2/eventing-kafka-controller.yaml kubectl apply -f https://github.com/knative-sandbox/eventing-kafka-broker/releases/download/knative-v1.6.2/eventing-kafka-channel.yaml
Источником событий для запуска последовательности может быть как функция (в этом случае они объединяются через Sequence), так и настроенная очередь (например, можно создать KafkaSource):
apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
name: kafka-source
spec:
consumerGroup: knative-group
bootstrapServers:
- my-cluster-kafka-bootstrap.kafka:9092
topics:
- knative-demo-topic
sink:
ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: event-display
В ref указывается в какой Service будут отправляться все новые сообщения в topic knative-demo-topic. Также источниками данных может быть периодическое событие (PingSource), внешний запрос (ApiServerSource), данные из Apache Camel (CamelSource), а также из произвольного контейнера (ContainerSource, он должен их отправлять через POST-запрос по протоколу Cloud Events). Регистрация источника может быть выполнена как через применение yaml-ресурса в apiVersion sources.knative.dev/v1beta1, так и через командную строку:
kn source ping create mysource --sink <...>
Для создания последовательности обработки событий нужно прежде всего зарегистрировать брокер:
kn broker create main
или через yaml:
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
name: default
При создании брокера можно указать расположение конфигурации (для автоматического извлечения параметров подключения) и класс брокера (для создания собственных реализаций). Далее мы соединим три сервиса в последовательность обработки событий:
apiVersion: flows.knative.dev/v1
kind: Sequence
metadata:
name: sequence
spec:
channelTemplate:
apiVersion: messaging.knative.dev/v1
kind: InMemoryChannel
steps:
- ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: first
- ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: second
- ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: third
reply:
ref:
kind: Broker
apiVersion: eventing.knative.dev/v1
name: default
---
apiVersion: sources.knative.dev/v1
kind: PingSource
metadata:
name: ping-source
spec:
schedule: "*/2 * * * *"
contentType: "application/json"
data: '{"message": "Hello world!"}'
sink:
ref:
apiVersion: eventing.knative.dev/v1
kind: Broker
name: default
---
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
name: sequence-trigger
spec:
broker: default
filter:
attributes:
type: dev.knative.sources.ping
subscriber:
ref:
apiVersion: flows.knative.dev/v1
kind: Sequence
name: sequence
Здесь используется канал для передачи сообщений в памяти. Сообщения через брокер запустят последовательность (здесь используется триггер для фильтрации сообщений по типу и запуску соответствующей последовательности), выход первой функции поступит на вход второй, затем аналогично на третью. В действительности можно было бы обойтись и без брокера (напрямую передавать выход PingSource в sink Sequence), но тогда мы не сможем воспользоваться возможностями буферизации брокеров (например, Apache Kafka). Объект reply указывает, куда будет отправлен ответ последней функции последовательности (в нашем случае - обратно в брокер), здесь также можно указать другую последовательность или сервис (в этом случае обработка на нем завершится). Объект reply может отсутствовать, если результат обработки не имеет значения.
Спецификация для параллельной обработки состоит из списка branches, в каждом из которых определяются filter (отбор релевантных событий) и subscriber (обработчик событий). Оба объекта используют ref для указания на соответствующий сервис или даже целую последовательность или другую параллельную обработку. Также может использоваться uri для вызова внешнего сервиса для обработки события. Результат может быть аккумулирован и отправлен через reply в сервис или поток обработки (Sequence или Parallel).
apiVersion: flows.knative.dev/v1
kind: Parallel
metadata:
name: me-odd-even-parallel
spec:
channelTemplate:
apiVersion: messaging.knative.dev/v1
kind: InMemoryChannel
branches:
- filter:
ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: filter
subscriber:
ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: transformer
Таким образом, KNative обеспечивает не только управление запуском функций в соответствии с актуальной нагрузкой, но и способы взаимодействия функций в модели потока событий, которые могут иметь как внешнее происхождение (например, из Kafka, API-запроса или любого другого источника, который может быть создан программно), так и являться выводом предыдущего сервиса или потока, которые координируются через механизмы KNative Eventing и позволяют организовать настройку взаимодействие компонентов приложения, построенного на событийно-ориентированной архитектуре, с использованием yaml-конфигураций внутри кластера Kubernetes.
Статья подготовлена в преддверии старта курса "DevOps практики и инструменты".