Kubernetes, как система оркестрации, позволяет автоматизировать процесс развертывания сложных приложений и восстанавливать ожидаемое состояние кластера после сбоев. В общем случае приложение представляет собой резидентно запущенные контейнеры, которые обрабатывают запросы клиентов в цикле обработки событий, при этом при росте нагрузки могут создаваться дополнительные реплики (с использованием механизма Horizontal Pod Autoscaling). Однако, нередко бывают случаи, когда сервис используется не очень часто, но при этом в запущенном состоянии он забирает большое количество оперативной памяти или процессорного времени, и желательно обеспечить механизм запуска сервиса по запросу (или по внешнему событию). Для реализации такого варианта использования сейчас доступен инструмент knative, который был принят в марте 2022 года в качестве incubating-проекта в CNCF (Cloud Native Computing Foundation). В этой статье мы разберемся с основными понятиями knative и попробуем создать архитектуру приложения, основанную на событиях, с использованием eventing-возможностей knative.

Прежде всего отметим, что knative реализует модель "функция как сервис" по аналогии с AWS Lambda, Google Cloud Functions и другими похожими реализациями. KNative обеспечивает собственный механизм масштабирования (опирается на HPA), который ориентируется на запрос ресурсов экземплярами сервиса (функциями) и количество запросов. KNative может работать с существующим кластером Kubernetes, а также использовать один из вариантов установки на собственный компьютер (minikube, microk8s, …). В случае с microk8s его можно разрешить как плагин:

snap install microk8s --classic
microk8s enable community
microk8s enable knative
microk8s start

Для установки на кластер Kubernetes можно использовать Yaml-файлы для Serving (отвечает за запуск, регистрацию и удаление функций) и Eventing (управление очередями сообщений). В первой части мы будем иметь дело только с Serving:

kubectl apply -f https://github.com/knative/serving/releases/download/knative-v1.6.0/serving-crds.yaml
kubectl apply -f https://github.com/knative/serving/releases/download/knative-v1.6.0/serving-core.yaml
kubectl apply -f https://github.com/knative/net-kourier/releases/download/knative-v1.6.0/kourier.yaml
kubectl patch configmap/config-network \
  --namespace knative-serving \
  --type merge \
  --patch '{"data":{"ingress-class":"kourier.ingress.networking.knative.dev"}}'
kubectl apply -f https://github.com/knative/serving/releases/download/knative-v1.6.0/serving-default-domain.yaml
kubectl apply -f https://github.com/knative/serving/releases/download/knative-v1.6.0/serving-hpa.yaml

Как можно видеть, Knative включает в себя также Ingress-контроллер, который используется для обработки входящего трафика и запуска (или масштабирования) функций.

Также для управления KNative можно установить оператор:

kubectl apply -f https://github.com/knative/operator/releases/download/knative-v1.6.0/operator.yaml

После этого можно зарегистрировать необходимые компоненты (KNative serving, Ingress Controller):

apiVersion: v1
kind: Namespace
metadata:
  name: knative-serving
---
apiVersion: operator.knative.dev/v1beta1
kind: KnativeServing
metadata:
  name: knative-serving
  namespace: knative-serving
---
apiVersion: operator.knative.dev/v1beta1
kind: KnativeServing
metadata:
  name: knative-serving
  namespace: knative-serving
spec:
  ingress:
    kourier:
      enabled: true
  config:
    network:
      ingress-class: "kourier.ingress.networking.knative.dev"

Кроме Kourier также могут использоваться ingress-контроллеры Istio или Contour.

После установки любым способом в кластере появится новое пространство имен knative-serving, включающее служебные процессы ingress-контроллера (controller, для переадресации входящих запросов по названию домена или по заголовку Host в запросе), Autoscaler (для автоматического запуска-остановки экземпляров контейнеров), Activator (отвечает за пересылку запросов и отправку метрик в autoscaler). При установке через оператор также добавляются webhook для отслеживания изменений в конфигурации KNative (например, в Kind: KnativeServing) и выполнения действий по согласованию состояния развертывания в соответствии с новыми параметрами.

Теперь, когда мы установили управляющие процессы knative, перейдем к установке инструментов управления. Для управления функциями установим инструмент командной строки kn:

wget https://github.com/knative/client/releases/download/knative-v1.6.0/kn-linux-amd64
chmod +x kn-linux-amd64
sudo mv kn-linux-amd64 /usr/local/bin/kn

Основной единицей управления в KNative является сервис (Service), который также определен как тип ресурса Kubernetes ksvc (или kservice). Для развертывания сервиса можно использовать как команду kn service create, так и определение ресурса через yaml. При создании ресурса обязательно определить название образа контейнера, который будет обрабатывать входящий запрос (--image), а также ограничения и опции, которые обычно используются в Deployment:

  • --arg список значений для аргументов командной строки для передачи в entrypoint контейнера;

  • --cmd переопределение команды для вызова в entrypoint;

  • --concurrency-limit максимальное количество запросов для обработки одной репликой;

  • --env - изменение переменных окружения для контейнера;

  • --revision-name - определении ревизии (используется для версионирования разных версий функции);

  • --limit, --request - ограничение и изначальный запрос ресурсов (например, cpu, memory, …);

  • --mount - монтирование ConfigMap (cm:name), Secret (secret:name) или именованный volume (определяется в --volume) в указанный каталог в контейнере;

  • --port - определение номера порта, на котором приложение прослушивает запросы

  • --probe-liveness - определение способа проверки доступности (например, http:0.0.0.0:8080:/ping);

  • --probe-readiness - проверка готовности сервиса к обработке запросов (например, exec:/checkreadiness.sh);

  • --scale-min, --scale-max, --scale-init - минимальное, максимальное и начальное значение количества реплик сервиса (любое значение может быть 0, но max должен быть ≥min);

  • --scale-metric - какая метрика используется для оценки необходимости изменения количества реплик (например, rps для оценки количества запросов в секунду);

  • --scale-window - интервал между обновлениями количества активных реплик (например, 5s);

  • --volume - подключение разделов (например, volumename=pvc: связывает pvc с именем name и соответствующий volume (используется в mount).

С точки зрения разработчика функция представляет собой веб-сервис, который принимает запросы на расположении / и отдает ответ, который будет доставлен клиенту. Например, простая функция Hello, World на Go может выглядеть так:

package main

import (
	"fmt"
	"net/http"
	"os"
)

func handler(w http.ResponseWriter, r *http.Request) {
	serviceName := os.Getenv("SERVICE_NAME")
	fmt.Fprintf(w, "%s is called!\n", serviceName)
}

func main() {
	http.HandleFunc("/", handler)
	http.ListenAndServe(":8080", nil)
}

Пока мы будем использовать готовый контейнер из примеров и создадим новый сервис:

kn service create hello --image gcr.io/knative-samples/helloworld-go 

После создания сервиса мы увидим адрес для подключения к функции. Обратите внимание, что для внешнего подключения будет использоваться адрес, к которому присоединился kourier и его можно узнать с помощью команды:

kubectl get svc -n knative-serving kourier

Если knative будет развернут при отсутствии поставщика внешних адресов для LoadBalancer, можно заменить тип сервиса на ClusterIP и добавить для него ExternalIPs для публикации. Также можно установить, например, metallb (в microk8s может быть добавлен через microk8s enable metallb:192.168.100.250-192.168.100.254).

Для корректного обращения к функции нужно будет указывать зарегистрированный адрес в заголовке Host.

curl -H 'Host: hello.default.example.com' http://externalip

Посмотреть список активных функций можно командой:

kubectl get ksvc 
NAME    URL                                LATESTCREATED   LATESTREADY   READY   REASON 
hello   http://hello.default.example.com   hello-00001     hello-00001   True   

Как можно видеть, здесь также указывается актуальная ревизия, которая и будет принимать запросы по умолчанию. При публикации новой версии трафик переключается на нее, но с помощью опции --traffic можно указать какая доля трафика будет отправлена на какую ревизию (например, для проведения экспериментов).

Доступные ревизии также могут быть получены через kubectl get revisions, связь Host и соответствующих функций через kubectl get routes. Также для управления развертыванием в CI/CD можно использовать ресурсы в apiVersion: serving.knative.dev/v1 (Service, Revision, Route).

Теперь, когда мы умеем регистрировать отдельные функции и привязывать их к доменам, можно перейти к созданию архитектуры, основанной на событиях и поговорить про eventing.

В eventing ключевыми можно обозначить поставщиков событий (event producers), получателей событий (event consumers) и посредника между ними, в роли которого чаще всего используется брокер очередей (event sink). Sink выполняет функцию маршрутизации событий от поставщиков к получателям и также во многих случаях решает задачу буферизации (накопления) сообщений, если на момент отправки от producer не было ни одного consumer.

С точки зрения KNative функции могут выполняться последовательно (при этом на каждом этапе функция принимает событие на вход, выполняет обработку и формирует новое событие на выход, т.е. является одновременно и consumer (тело http-запроса) и producer (результат обработки запроса). Последняя функция в цепочке не возвращает результат. Также выполнение может происходить параллельно.

Прежде всего установим KNative Eventing. При использовании ручной установки это можно выполнить через применение CRD:

kubectl apply -f https://github.com/knative/eventing/releases/download/knative-v1.6.0/eventing-crds.yaml kubectl apply -f https://github.com/knative/eventing/releases/download/knative-v1.6.0/eventing-core.yaml 

Для установки через оператор можно создать необходимые ресурсы:

apiVersion: v1
kind: Namespace
metadata:
  name: knative-eventing
---
apiVersion: operator.knative.dev/v1beta1
kind: KnativeEventing
metadata:
  name: knative-eventing
  namespace: knative-eventing

Для реализации sink можно использовать каналы в памяти (ненадежно, поскольку будут потеряны при перезапуске):

kubectl apply -f https://github.com/knative/eventing/releases/download/knative-v1.6.0/in-memory-channel.yaml 

Альтернативно можно установить поддержку каналов с использованием Apache Kafka:

kubectl apply -f https://github.com/knative-sandbox/eventing-kafka-broker/releases/download/knative-v1.6.2/eventing-kafka-controller.yaml kubectl apply -f https://github.com/knative-sandbox/eventing-kafka-broker/releases/download/knative-v1.6.2/eventing-kafka-channel.yaml 

Источником событий для запуска последовательности может быть как функция (в этом случае они объединяются через Sequence), так и настроенная очередь (например, можно создать KafkaSource):

apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: kafka-source
spec:
  consumerGroup: knative-group
  bootstrapServers:
  - my-cluster-kafka-bootstrap.kafka:9092
  topics:
  - knative-demo-topic
  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: event-display

В ref указывается в какой Service будут отправляться все новые сообщения в topic knative-demo-topic. Также источниками данных может быть периодическое событие (PingSource), внешний запрос (ApiServerSource), данные из Apache Camel (CamelSource), а также из произвольного контейнера (ContainerSource, он должен их отправлять через POST-запрос по протоколу Cloud Events). Регистрация источника может быть выполнена как через применение yaml-ресурса в apiVersion sources.knative.dev/v1beta1, так и через командную строку:

kn source ping create mysource --sink <...>

Для создания последовательности обработки событий нужно прежде всего зарегистрировать брокер:

kn broker create main

или через yaml:

apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
 name: default

При создании брокера можно указать расположение конфигурации (для автоматического извлечения параметров подключения) и класс брокера (для создания собственных реализаций). Далее мы соединим три сервиса в последовательность обработки событий:

apiVersion: flows.knative.dev/v1
kind: Sequence
metadata:
  name: sequence
spec:
  channelTemplate:
    apiVersion: messaging.knative.dev/v1
    kind: InMemoryChannel
  steps:
    - ref:
        apiVersion: serving.knative.dev/v1
        kind: Service
        name: first
    - ref:
        apiVersion: serving.knative.dev/v1
        kind: Service
        name: second
    - ref:
        apiVersion: serving.knative.dev/v1
        kind: Service
        name: third
  reply:
    ref:
      kind: Broker
      apiVersion: eventing.knative.dev/v1
      name: default
---
apiVersion: sources.knative.dev/v1
kind: PingSource
metadata:
  name: ping-source
spec:
  schedule: "*/2 * * * *"
  contentType: "application/json"
  data: '{"message": "Hello world!"}'
  sink:
    ref:
      apiVersion: eventing.knative.dev/v1
      kind: Broker
      name: default
---
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: sequence-trigger
spec:
  broker: default
  filter:
    attributes:
      type: dev.knative.sources.ping
  subscriber:
    ref:
      apiVersion: flows.knative.dev/v1
      kind: Sequence
      name: sequence

Здесь используется канал для передачи сообщений в памяти. Сообщения через брокер запустят последовательность (здесь используется триггер для фильтрации сообщений по типу и запуску соответствующей последовательности), выход первой функции поступит на вход второй, затем аналогично на третью. В действительности можно было бы обойтись и без брокера (напрямую передавать выход PingSource в sink Sequence), но тогда мы не сможем воспользоваться возможностями буферизации брокеров (например, Apache Kafka). Объект reply указывает, куда будет отправлен ответ последней функции последовательности (в нашем случае - обратно в брокер), здесь также можно указать другую последовательность или сервис (в этом случае обработка на нем завершится). Объект reply может отсутствовать, если результат обработки не имеет значения.

Спецификация для параллельной обработки состоит из списка branches, в каждом из которых определяются filter (отбор релевантных событий) и subscriber (обработчик событий). Оба объекта используют ref для указания на соответствующий сервис или даже целую последовательность или другую параллельную обработку. Также может использоваться uri для вызова внешнего сервиса для обработки события. Результат может быть аккумулирован и отправлен через reply в сервис или поток обработки (Sequence или Parallel).

apiVersion: flows.knative.dev/v1
kind: Parallel
metadata:
  name: me-odd-even-parallel
spec:
  channelTemplate:
    apiVersion: messaging.knative.dev/v1
    kind: InMemoryChannel
  branches:
  - filter:
      ref:
        apiVersion: serving.knative.dev/v1
        kind: Service
        name: filter
    subscriber:
      ref:
        apiVersion: serving.knative.dev/v1
        kind: Service
        name: transformer

Таким образом, KNative обеспечивает не только управление запуском функций в соответствии с актуальной нагрузкой, но и способы взаимодействия функций в модели потока событий, которые могут иметь как внешнее происхождение (например, из Kafka, API-запроса или любого другого источника, который может быть создан программно), так и являться выводом предыдущего сервиса или потока, которые координируются через механизмы KNative Eventing и позволяют организовать настройку взаимодействие компонентов приложения, построенного на событийно-ориентированной архитектуре, с использованием yaml-конфигураций внутри кластера Kubernetes.

Статья подготовлена в преддверии старта курса "DevOps практики и инструменты".