Системы оркестрации контейнеров существенно упростили управление многокомпонентными системами, в том числе основанными на микросервисной архитектуре. Но остался открытым вопрос организации надежного обмена сообщениями между микросервисами, координации последовательности операций при распределенной архитектуре. В этой статье мы рассмотрим подход Incubating (CNCF)-проекта Dapr (Distributed Application Runtime) по использованию Sidecar-контейнеров в Kubernetes для реализации микросервисной архитектуры, основанной на событиях.
Dapr предлагает для использования механизм обнаружения и взаимодействия между микросервисами (через вызовы методов и обмен сообщениями в модели pub-sub), но также позволяет управлять секретами, отслеживать действия при распределенных вычислениях, реализовать механизмы семафоров для блокировки общих ресурсов, выполнять связывание входных и выходных данных и событий с внешними хранилищами, а также сохранять и восстанавливать после сбоев состояние приложения и доставлять до распределенных приложений общую конфигурацию. Dapr использует концепцию акторов (actors) для описания логики и публикует HTTP и gRPC интерфейсы для взаимодействия с библиотекой. При этом Dapr создает только обобщенные интерфейсы, а реальное хранение секретов и состояния обеспечивается другими системами (например, SPIFFE или Postgres). Важной особенностью Dapr является использование Sidecar-контейнеров, которые присоединяются к существующим Pod’ам и выступают в роли посредника во взаимодействии с остальными сервисами. Таким образом каждый микросервис взаимодействует только с известным его Dapr Runtime (без необходимости встраивания его в код приложения), запущенный в том же pod, и никак не зависит от расположения других микросервисов (которые могут быть расположены в том числе в других облачных провайдерах), используемых решений для хранения состояния и секретов и другого. Dapr может работать как самостоятельный оркестратор (через утилиту командной строки dapr), так и совместно с Kubernetes (в этом случае для управления используется Dapr operator). Во втором случае конфигурация определяется через аннотации Deployment / DaemonSet / StatefulSet, а также CRD Component для конфигурирования хранилищ системных сервисов.
Для просмотра текущего состояния (распределенной конфигурации, доступных микросервисов и др.) Dapr устанавливает дополнительный Dashboard с доступом через веб-браузер.
В настоящий момент Dapr поддерживает SDK для следующих языков программирования (но теоретически можно использовать и любой другой, достаточно сделать реализацию клиента для HTTP или gRPC API):
C++
Go
Java / Kotlin (в том числе, с Spring Boot)
JavaScript (поддерживается также Express)
.NET
PHP
Python (может использоваться с Flask, FastAPI, gRPC)
Rust
Для настройки Dapr установим утилиту командой строки. После установки мы можем выполнить инициализацию Dapr:
dapr init
При установке в standalone-режиме дополнительно запускаются контейнеры dapr_redis для хранения состояния, dapr_zipkin для отслеживания истории и измерений времени при обработке распределенных вызовов, dapr_placement для управления запуском управляющих контейнеров. Для установки Dapr под управлением kubernetes нужно указать дополнительный флаг --kubernetes (и можно изменить пространство имен для запуска оператора --namespace) и --wait для ожидания завершения запуска и настройки оператора и вспомогательных контейнеров:
dapr init --kubernetes --namespace darp_system --wait
Далее запустим dashboard (dapr dashboard
) и подключимся к нему по адресу http://localhost:8080. Как можно увидеть, по умолчанию dapr создает хранилище конфигурации и состояния и механизм pub-sub.
Попробуем теперь сделать простое приложение, которое будет сохранять состояние при завершении и восстанавливать его при запуске. Для примера будем создавать код на Python. Прежде всего установим зависимости.
pip install dapr
И создадим код, который будет взаимодействовать с компонентом хранения состояния в Dapr:
from dapr.clients import DaprClient
store_name = 'statestore'
key = 'invocation_count'
with DaprClient() as d:
try:
state = d.get_state(store_name, key=key)
counter = int(state.data)
except:
counter = 0
counter+=1
print(f'New counter is {counter}')
d.save_state(store_name, key=key, value=str(counter))
Теперь запустим приложение с дополнительным управляющим контейнером dapr:
dapr run --app-id myapp -- python3 counter.py
При нескольких последовательных запусках значение счетчика будет сохраняться и увеличиваться на 1 при каждом запуске.
Опубликуем теперь метод для использования другими микросервисами, для этого импортируем класс App
из dapr.ext.grpc и аннотируем метод через @app.method
с указанием названия метода (можно будет использовать совместно с идентификатором приложения для вызова метода, например через команду dapr invoke
). Аннотированный метод будет получать объект типа InvokeMethodRequest
(из него можно получить дополнительные данные, добавленные к запросу) и возвращать InvokeMethodResponse
с кодированной байтовой последовательностью с ответом (может быть, например, json):
from dapr.ext.grpc import App, InvokeMethodRequest, InvokeMethodResponse
from dapr.clients import DaprClient
from dapr.clients.grpc._request import TransactionalStateOperation, TransactionOperationType
from dapr.clients.grpc._state import StateItem
store_name = 'statestore'
key = 'invocation_count'
app = App()
@app.method('increment_counter')
def increment(request: InvokeMethodRequest) -> InvokeMethodResponse:
with DaprClient() as d:
try:
state = d.get_state(store_name, key=key)
counter = int(state.data)
except:
counter = 0
counter+=1
d.save_state(store_name, key=key, value=str(counter))
return InvokeMethodResponse(str(counter).encode(), 'text/plain; charset=UTF-8')
app.run(10080)
Созданное приложение опубликуем на произвольный порт и укажем его при запуске сценария через dapr run, а затем подключимся (для теста) через утилиту командной строки:
dapr run --app-id myapp --app-port 10080 --app-protocol grpc -- python3 counter_service.py &
dapr invoke --app-id myapp --method increment_counter
При вызове dapr invoke будет отображаться ответ функции, опубликованной в приложении myapp (число увеличивается на 1 при каждом запуске dapr invoke). Для просмотра списка активных приложений можно использовать dapr list:
dapr list
APP ID HTTP PORT GRPC PORT APP PORT COMMAND AGE CREATED DAPRD PID CLI PID
myapp 50386 50387 10080 python3 counter_s... 9s 2022-12-14 23:52.08 56930 56924
Важно, что dapr invoke можно использовать только при self-hosted установке (через dapr init в Docker, без использования Kubernetes). Поэтому рассмотрим также, как можно выполнить взаимодействие между микросервисами. Для этого нам понадобится уже известный объект класса DaprClient
и метод invoke_method, который принимает идентификатор вызываемого приложение, название метода и дополнительные данные (при необходимости).
import json
import time
from dapr.clients import DaprClient
with DaprClient() as d:
resp = d.invoke_method(
'myapp',
'increment_counter',
)
print(resp.text(), flush=True)
теперь при запуске приложения из этого сценария мы будем видеть увеличивающееся целочисленное значение в логах выполнения приложения (в случае с Kubernetes их можно просмотреть через dapr logs, для self-hosted установки они доступны в выводе команды dapr run).
dapr run --app-id caller -- python3 caller.py
✅ You're up and running! Both Dapr and your app logs will appear here.
INFO[0000] placement tables updated, version: 0 app_id=client instance=d-zolotov-osx scope=dapr.runtime.actor.internal.placement type=log ver=1.9.5
== APP == 16
✅ Exited App successfully
Далее рассмотрим сценарий взаимодействия микросервисов через механизм pub/sub-очереди. В этом варианте использования ответ не предполагается получение ответа для сообщения, но публикующее обработчик приложение должно вернуть одно из значений статуса обработки (success - успешно, drop - сообщение пропущено и не должно быть обработано, retry - произошла ошибка и сообщение должно быть обработано позднее).
from dapr.ext.grpc import App
from dapr.clients.grpc._response import TopicEventResponse
from cloudevents.sdk.event import v1
import json
from dapr.clients import DaprClient
store_name = 'statestore'
pubsub_name = 'pubsub'
key = 'invocation_counter'
topic = 'counter'
app = App()
print('Publisher is initialized')
@app.subscribe(pubsub_name=pubsub_name, topic=topic)
def increment(event: v1.Event) -> TopicEventResponse:
with DaprClient() as d:
try:
state = d.get_state(store_name, key=key)
counter = int(state.data)
except:
counter = 0
counter+=1
d.save_state(store_name, key=key, value=str(counter))
print(f'New counter value is {counter}')
return TopicEventResponse("success")
app.run(10080)
Для удобства отладки можно дополнительно указать опцию --enable-api-logging для отображения обращений в API Dapr (например, чтение-запись состояния, подписка на очередь и другие). Также для корректного отображения вывода в консоль для сервиса добавим флаг -u (unbuffered) для python3:
dapr run --app-id myapp --app-port 10080 --app-protocol grpc --enable-api-logging -- python3 -u counter_service.py
Теперь мы можем протестировать отправку сообщения через консоль (для self-hosted решения):
dapr publish --pubsub pubsub --topic counter --publish-app-id="myapp"
или через клиента
import json
import time
from dapr.clients import DaprClient
with DaprClient() as d:
d.publish_event(
pubsub_name='pubsub',
topic_name='counter',
data=""
)
dapr run --app-id client -- python3 caller.py
Здесь важно отметить различия в сценариях вызова сервиса (ожидается ответ, вызов предполагает доступность вызываемого метода приложения в момент вызова) и отправки сообщения (сообщения будут сохраняться, даже если обработчик сейчас недоступен, и будут отправлены последовательно на обработку при запуске приложения, подписанного на эту очередь). Но если мы проведем простой эксперимент и отправим несколько сообщений при отключенном myapp и затем его запустим, то мы увидим несколько одинаковых сообщений об увеличении счетчика (в связи с тем, что обработчики будут запущены параллельно и начнут конкурировать за общий ресурс). Попробуем исправить эту ситуацию через использование Distributed Lock, для этого необходимо настроить lockstore, для этого создадим файл описания компонента в ~/.dapr/components/lockstore.yaml.
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: statestore
spec:
type: state.redis
version: v1
metadata:
- name: redisHost
value: localhost:6379
- name: redisPassword
value: ""
- name: actorStateStore
value: "true"
Также, чтобы исключить состояние гонки при вызове самого метода захвата блокировки, добавим произвольную задержку:
from dapr.ext.grpc import App
from dapr.clients.grpc._response import TopicEventResponse
from cloudevents.sdk.event import v1
import json
import time
import random
from dapr.clients import DaprClient
store_name = 'statestore'
lockstore_name = 'lockstore'
lock_name = 'counter_lock'
pubsub_name = 'pubsub'
client_id = 'myapp'
expiration = 10 # для исключения бесконечной блокировки
key = 'invocation_counter'
topic = 'counter'
app = App()
print('Publisher is initialized')
@app.subscribe(pubsub_name=pubsub_name, topic=topic)
def increment(event: v1.Event) -> TopicEventResponse:
with DaprClient() as d:
time.sleep(random.randint(1,500)/1000) # исключаем race condition для try_lock
with d.try_lock(lockstore_name, lock_name, client_id, expiration):
try:
state = d.get_state(store_name, key=key)
counter = int(state.data)
except:
counter = 0
counter+=1
d.save_state(store_name, key=key, value=str(counter))
print(f'New counter value is {counter}')
return TopicEventResponse("success")
app.run(10080)
Теперь множественные сообщения из очереди будут обрабатываться корректно и каждое из событий приведет к увеличению счетчика на 1.
Альтернативным способом удаленного вызова методов является использование акторов. Добавим необходимую зависимость:
pip install dapr-ext-fastapi-dev
(также поддерживаются другие серверы, например Flask).
Код сервера будет представлять собой REST API с регистрацией точки подключения как актора:
from dapr.actor import Actor, ActorInterface, actormethod
from dapr.actor.runtime.runtime import ActorRuntime
from dapr.actor.runtime.config import ActorRuntimeConfig, ActorTypeConfig
from dapr.clients import DaprClient
from dapr.ext.fastapi import DaprActor
from fastapi import FastAPI
store_name = "statestore"
key = "invocation_counter"
class CounterActorInterface(ActorInterface):
@actormethod("Increment")
async def increment(self) -> int:
pass
class CounterActor(
Actor, CounterActorInterface
): # дополнительно можно подключить Remindable и реализовать set_reminder, receive_reminder, set_timer, timer_callback
def __init__(self, ctx, actor_id):
super(CounterActor, self).__init__(ctx, actor_id)
async def increment(self):
with DaprClient() as d:
try:
state = d.get_state(store_name, key=key)
counter = int(state.data)
except:
counter = 0
counter += 1
d.save_state(store_name, key=key, value=str(counter))
print(f"New counter value is {counter}")
return counter
config = ActorRuntimeConfig()
ActorRuntime.set_actor_config(config)
app = FastAPI(title=f"{CounterActor.__name__}Service")
actor = DaprActor(app)
@app.on_event("startup")
async def fastapi_startup():
await actor.register_actor(CounterActor)
Запустим приложение, публикующее этого актора:
dapr run --app-id myapp --app-port 3000 --enable-api-logging -- uvicorn --port 3000 counter_actor:app
Для проверки создадим простого клиента:
import asyncio
from dapr.actor import Actor, ActorProxy, ActorId, ActorInterface, actormethod
class CounterActorInterface(ActorInterface):
@actormethod("Increment")
async def increment(self) -> int:
pass
async def main():
proxy = ActorProxy.create('CounterActor', ActorId('1'), CounterActorInterface)
result = await proxy.invoke_method("Increment")
value = result.decode("utf8")
print(f'New counter is {value}')
asyncio.run(main())
Также нередко для доступа к внешним сервисам или при создании интеграций нужно обеспечить надежное хранение секретов и возможность их получить из микросервисов. В следующем примере мы посмотрим как можно получить доступ к единому хранилищу секретов (в нашем случае на основе простого json-файла, но может использовать любое поддерживаемое vault-решение). Добавим конфигурацию для secretsstore в ~/.dapr/config.yaml:
apiVersion: dapr.io/v1alpha1
kind: Configuration
metadata:
name: daprConfig
spec:
tracing:
samplingRate: "1"
zipkin:
endpointAddress: http://localhost:9411/api/v2/spans
secrets:
scopes:
- storeName: "localsecretstore"
defaultAccess: "deny"
allowedSecrets: ["secretKey",]
добавим конфигурацию компонента:
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: localsecretstore
namespace: default
spec:
type: secretstores.local.file
metadata:
- name: secretsFile
value: secrets.json
- name: nestedSeparator
value: ":"
и создадим json-файл с конфигурацией ключей (secrets.json):
{
"secretKey": "mysecretvalue",
}
теперь доработаем код и получим значение секрета:
secretKey = 'secretKey'
secretStoreName = 'localsecretstore'
//...
print(f'Secret value is {d.get_secret(secretStoreName, secretKey).secret["secretKey"]}')
Мы рассмотрели основные сценарии использования Dapr для организации взаимодействия микросервисов. Во второй части статьи мы поговорим о возможностях наблюдения за выполнением последовательности вызовов (distributed tracing) и об интеграции с внешними источниками данных (binding).
Использование Dapr в Kubernetes принципиально не отличается от self-hosted установки (кроме того, что будут недоступны команды dapr publish и dapr invoke). Конфигурация компонентов (хранилище секретов и состояния, брокер очереди сообщений и др.) определяются через CRD Component (рассмотренные выше примеры будут работать и в Kubernetes). Дополнительно можно использовать следующие команды dapr:
components - отображает зарегистрированные компоненты и их конфигурацию
configurations - отображает известные конфигурации dapr
logs - просмотр логов для присоединенного dapr-контейнера
status - информация о состоянии сервисов dapr (включая оператор и инжектор, который используется для присоединения sidecar)
upgrade - обновление контейнеров dapr до актуальной версии
При публикации приложения через Deployment / StatefulSet / DaemonSet в kubernetes необходимые атрибуты (app-id, app-port и другие) могут быть определены в аннотациях шаблона (spec.template.annotations: dapr.io/enabled=true для присоединения sidecar к поду, dapr.io/app-id - идентификатор процесса, dapr.io/app-port - порт для публикации).
Примеры кода с конфигурацией можно посмотреть в официальном репозитории (например, для Python), а также найти рассмотренные выше фрагменты кода и сценарии для запуска в github-репозитории.
Материал подготовлен в преддверии старта курса Microservice Architecture от OTUS. Узнать подробнее о курсе и зарегистрироваться на бесплатный урок можно по ссылке ниже.