Pull to refresh
1011.63
OTUS
Цифровые навыки от ведущих экспертов

Микросервисы на основе событий с Dapr

Reading time11 min
Views5.1K

Системы оркестрации контейнеров существенно упростили управление многокомпонентными системами, в том числе основанными на микросервисной архитектуре. Но остался открытым вопрос организации надежного обмена сообщениями между микросервисами, координации последовательности операций при распределенной архитектуре. В этой статье мы рассмотрим подход Incubating (CNCF)-проекта Dapr (Distributed Application Runtime) по использованию Sidecar-контейнеров в Kubernetes для реализации микросервисной архитектуры, основанной на событиях. 

Dapr предлагает для использования механизм обнаружения и взаимодействия между микросервисами (через вызовы методов и обмен сообщениями в модели pub-sub), но также позволяет управлять секретами, отслеживать действия при распределенных вычислениях, реализовать механизмы семафоров для блокировки общих ресурсов, выполнять связывание входных и выходных данных и событий с внешними хранилищами, а также сохранять и восстанавливать после сбоев состояние приложения и доставлять до распределенных приложений общую конфигурацию. Dapr использует концепцию акторов (actors) для описания логики и публикует HTTP и gRPC интерфейсы для взаимодействия с библиотекой. При этом Dapr создает только обобщенные интерфейсы, а реальное хранение секретов и состояния обеспечивается другими системами (например, SPIFFE или Postgres). Важной особенностью Dapr является использование Sidecar-контейнеров, которые присоединяются к существующим Pod’ам и выступают в роли посредника во взаимодействии с остальными сервисами. Таким образом каждый микросервис взаимодействует только с известным его Dapr Runtime (без необходимости встраивания его в код приложения), запущенный в том же pod, и никак не зависит от расположения других микросервисов (которые могут быть расположены в том числе в других облачных провайдерах), используемых решений для хранения состояния и секретов и другого. Dapr может работать как самостоятельный оркестратор (через утилиту командной строки dapr), так и совместно с Kubernetes (в этом случае для управления используется Dapr operator). Во втором случае конфигурация определяется через аннотации Deployment / DaemonSet / StatefulSet, а также CRD Component для конфигурирования хранилищ системных сервисов.

 Для просмотра текущего состояния (распределенной конфигурации, доступных микросервисов и др.) Dapr устанавливает дополнительный Dashboard с доступом через веб-браузер.

В настоящий момент Dapr поддерживает SDK для следующих языков программирования (но теоретически можно использовать и любой другой, достаточно сделать реализацию клиента для HTTP или gRPC API):

  • C++

  • Go

  • Java / Kotlin (в том числе, с Spring Boot)

  • JavaScript (поддерживается также Express)

  • .NET

  • PHP

  • Python (может использоваться с Flask, FastAPI, gRPC)

  • Rust

Для настройки Dapr установим утилиту командой строки. После установки мы можем выполнить инициализацию Dapr:

dapr init

При установке в standalone-режиме дополнительно запускаются контейнеры dapr_redis для хранения состояния, dapr_zipkin для отслеживания истории и измерений времени при обработке распределенных вызовов, dapr_placement для управления запуском управляющих контейнеров. Для установки Dapr под управлением kubernetes нужно указать дополнительный флаг --kubernetes (и можно изменить пространство имен для запуска оператора --namespace) и --wait для ожидания завершения запуска и настройки оператора и вспомогательных контейнеров:

dapr init --kubernetes --namespace darp_system --wait

Далее запустим dashboard (dapr dashboard) и подключимся к нему по адресу http://localhost:8080. Как можно увидеть, по умолчанию dapr создает хранилище конфигурации и состояния и механизм pub-sub.

Попробуем теперь сделать простое приложение, которое будет сохранять состояние при завершении и восстанавливать его при запуске. Для примера будем создавать код на Python. Прежде всего установим зависимости.

pip install dapr

И создадим код, который будет взаимодействовать с компонентом хранения состояния в Dapr:

from dapr.clients import DaprClient

store_name = 'statestore'
key = 'invocation_count'

with DaprClient() as d:
    try:
        state = d.get_state(store_name, key=key)
        counter = int(state.data)
    except:
        counter = 0
    counter+=1
    print(f'New counter is {counter}')
    d.save_state(store_name, key=key, value=str(counter))

Теперь запустим приложение с дополнительным управляющим контейнером dapr:

dapr run --app-id myapp -- python3 counter.py

При нескольких последовательных запусках значение счетчика будет сохраняться и увеличиваться на 1 при каждом запуске. 

Опубликуем теперь метод для использования другими микросервисами, для этого импортируем класс App из dapr.ext.grpc и аннотируем метод через @app.method с указанием названия метода (можно будет использовать совместно с идентификатором приложения для вызова метода, например через команду dapr invoke). Аннотированный метод будет получать объект типа InvokeMethodRequest (из него можно получить дополнительные данные, добавленные к запросу) и возвращать InvokeMethodResponse с кодированной байтовой последовательностью с ответом (может быть, например, json):

from dapr.ext.grpc import App, InvokeMethodRequest, InvokeMethodResponse

from dapr.clients import DaprClient

from dapr.clients.grpc._request import TransactionalStateOperation, TransactionOperationType
from dapr.clients.grpc._state import StateItem

store_name = 'statestore'
key = 'invocation_count'

app = App()

@app.method('increment_counter')
def increment(request: InvokeMethodRequest) -> InvokeMethodResponse:
    with DaprClient() as d:
        try:
            state = d.get_state(store_name, key=key)
            counter = int(state.data)
        except:
            counter = 0
        counter+=1
        d.save_state(store_name, key=key, value=str(counter))
        return InvokeMethodResponse(str(counter).encode(), 'text/plain; charset=UTF-8')

app.run(10080)

Созданное приложение опубликуем на произвольный порт и укажем его при запуске сценария через dapr run, а затем подключимся (для теста) через утилиту командной строки:

dapr run --app-id myapp --app-port 10080 --app-protocol grpc -- python3 counter_service.py &
dapr invoke --app-id myapp --method increment_counter

При вызове dapr invoke будет отображаться ответ функции, опубликованной в приложении myapp (число увеличивается на 1 при каждом запуске dapr invoke). Для просмотра списка активных приложений можно использовать dapr list:

dapr list


  APP ID  HTTP PORT  GRPC PORT  APP PORT  COMMAND               AGE  CREATED              DAPRD PID  CLI PID
  myapp   50386      50387      10080     python3 counter_s...  9s   2022-12-14 23:52.08  56930      56924

Важно, что dapr invoke можно использовать только при self-hosted установке (через dapr init в Docker, без использования Kubernetes).  Поэтому рассмотрим также, как можно выполнить взаимодействие между микросервисами. Для этого нам понадобится уже известный объект класса DaprClient и метод invoke_method, который принимает идентификатор вызываемого приложение, название метода и дополнительные данные (при необходимости).

import json
import time

from dapr.clients import DaprClient

with DaprClient() as d:
    resp = d.invoke_method(
        'myapp',
        'increment_counter',
    )

    print(resp.text(), flush=True)

теперь при запуске приложения из этого сценария мы будем видеть увеличивающееся целочисленное значение в логах выполнения приложения (в случае с Kubernetes их можно просмотреть через dapr logs, для self-hosted установки они доступны в выводе команды dapr run).

dapr run --app-id caller -- python3 caller.py

✅  You're up and running! Both Dapr and your app logs will appear here.

INFO[0000] placement tables updated, version: 0          app_id=client instance=d-zolotov-osx scope=dapr.runtime.actor.internal.placement type=log ver=1.9.5
== APP == 16
✅  Exited App successfully

Далее рассмотрим сценарий взаимодействия микросервисов через механизм pub/sub-очереди. В этом варианте использования ответ не предполагается получение ответа для сообщения, но публикующее обработчик приложение должно вернуть одно из значений статуса обработки (success - успешно, drop - сообщение пропущено и не должно быть обработано, retry - произошла ошибка и сообщение должно быть обработано позднее). 

from dapr.ext.grpc import App
from dapr.clients.grpc._response import TopicEventResponse

from cloudevents.sdk.event import v1
import json

from dapr.clients import DaprClient

store_name = 'statestore'
pubsub_name = 'pubsub'
key = 'invocation_counter'
topic = 'counter'

app = App()

print('Publisher is initialized')

@app.subscribe(pubsub_name=pubsub_name, topic=topic)
def increment(event: v1.Event) -> TopicEventResponse:
    with DaprClient() as d:
        try:
            state = d.get_state(store_name, key=key)
            counter = int(state.data)
        except:
            counter = 0
        counter+=1
        d.save_state(store_name, key=key, value=str(counter))
        print(f'New counter value is {counter}')
        return TopicEventResponse("success")

app.run(10080)

Для удобства отладки можно дополнительно указать опцию --enable-api-logging для отображения обращений в API Dapr (например, чтение-запись состояния, подписка на очередь и другие). Также для корректного отображения вывода в консоль для сервиса добавим флаг -u (unbuffered) для python3:

dapr run --app-id myapp --app-port 10080 --app-protocol grpc --enable-api-logging -- python3 -u counter_service.py

Теперь мы можем протестировать отправку сообщения через консоль (для self-hosted решения):

dapr publish --pubsub pubsub --topic counter --publish-app-id="myapp"

или через клиента

import json
import time

from dapr.clients import DaprClient

with DaprClient() as d:
    d.publish_event(
        pubsub_name='pubsub',
        topic_name='counter',
        data=""
    )
dapr run --app-id client -- python3 caller.py

Здесь важно отметить различия в сценариях вызова сервиса (ожидается ответ, вызов предполагает доступность вызываемого метода приложения в момент вызова) и отправки сообщения (сообщения будут сохраняться, даже если обработчик сейчас недоступен, и будут отправлены последовательно на обработку при запуске приложения, подписанного на эту очередь). Но если мы проведем простой эксперимент и отправим несколько сообщений при отключенном myapp и затем его запустим, то мы увидим несколько одинаковых сообщений об увеличении счетчика (в связи с тем, что обработчики будут запущены параллельно и начнут конкурировать за общий ресурс). Попробуем исправить эту ситуацию через использование Distributed Lock, для этого необходимо настроить lockstore, для этого создадим файл описания компонента в ~/.dapr/components/lockstore.yaml.

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: statestore
spec:
  type: state.redis
  version: v1
  metadata:
  - name: redisHost
    value: localhost:6379
  - name: redisPassword
    value: ""
  - name: actorStateStore
    value: "true"

Также, чтобы исключить состояние гонки при вызове самого метода захвата блокировки, добавим произвольную задержку:

from dapr.ext.grpc import App
from dapr.clients.grpc._response import TopicEventResponse

from cloudevents.sdk.event import v1
import json
import time
import random

from dapr.clients import DaprClient

store_name = 'statestore'
lockstore_name = 'lockstore'
lock_name = 'counter_lock'
pubsub_name = 'pubsub'
client_id = 'myapp'
expiration = 10		# для исключения бесконечной блокировки
key = 'invocation_counter'
topic = 'counter'

app = App()

print('Publisher is initialized')

@app.subscribe(pubsub_name=pubsub_name, topic=topic)
def increment(event: v1.Event) -> TopicEventResponse:
    with DaprClient() as d:
        time.sleep(random.randint(1,500)/1000)   # исключаем race condition для try_lock
        with d.try_lock(lockstore_name, lock_name, client_id, expiration):
            try:
                state = d.get_state(store_name, key=key)
                counter = int(state.data)
            except:
                counter = 0
            counter+=1
            d.save_state(store_name, key=key, value=str(counter))
            print(f'New counter value is {counter}')
            return TopicEventResponse("success")

app.run(10080)

Теперь множественные сообщения из очереди будут обрабатываться корректно и каждое из событий приведет к увеличению счетчика на 1.

Альтернативным способом удаленного вызова методов является использование акторов. Добавим необходимую зависимость:

pip install dapr-ext-fastapi-dev

(также поддерживаются другие серверы, например Flask). 

Код сервера будет представлять собой REST API с регистрацией точки подключения как актора:

from dapr.actor import Actor, ActorInterface, actormethod
from dapr.actor.runtime.runtime import ActorRuntime
from dapr.actor.runtime.config import ActorRuntimeConfig, ActorTypeConfig
from dapr.clients import DaprClient
from dapr.ext.fastapi import DaprActor
from fastapi import FastAPI

store_name = "statestore"
key = "invocation_counter"


class CounterActorInterface(ActorInterface):
    @actormethod("Increment")
    async def increment(self) -> int:
        pass


class CounterActor(
    Actor, CounterActorInterface
):  # дополнительно можно подключить Remindable и реализовать set_reminder, receive_reminder, set_timer, timer_callback
    def __init__(self, ctx, actor_id):
        super(CounterActor, self).__init__(ctx, actor_id)

    async def increment(self):
        with DaprClient() as d:
            try:
                state = d.get_state(store_name, key=key)
                counter = int(state.data)
            except:
                counter = 0
            counter += 1
            d.save_state(store_name, key=key, value=str(counter))
            print(f"New counter value is {counter}")
            return counter


config = ActorRuntimeConfig()
ActorRuntime.set_actor_config(config)

app = FastAPI(title=f"{CounterActor.__name__}Service")
actor = DaprActor(app)


@app.on_event("startup")
async def fastapi_startup():
    await actor.register_actor(CounterActor)

Запустим приложение, публикующее этого актора:

dapr run --app-id myapp --app-port 3000 --enable-api-logging -- uvicorn --port 3000 counter_actor:app

Для проверки создадим простого клиента:

import asyncio
from dapr.actor import Actor, ActorProxy, ActorId, ActorInterface, actormethod

class CounterActorInterface(ActorInterface):
    @actormethod("Increment")
    async def increment(self) -> int:
        pass

async def main():
    proxy = ActorProxy.create('CounterActor', ActorId('1'), CounterActorInterface)
    result = await proxy.invoke_method("Increment")
    value = result.decode("utf8")
    print(f'New counter is {value}')

asyncio.run(main())

Также нередко для доступа к внешним сервисам или при создании интеграций нужно обеспечить надежное хранение секретов и возможность их получить из микросервисов. В следующем примере мы посмотрим как можно получить доступ к единому хранилищу секретов (в нашем случае на основе простого json-файла, но может использовать любое поддерживаемое vault-решение). Добавим конфигурацию для secretsstore в ~/.dapr/config.yaml:

apiVersion: dapr.io/v1alpha1
kind: Configuration
metadata:
  name: daprConfig
spec:
  tracing:
    samplingRate: "1"
    zipkin:
      endpointAddress: http://localhost:9411/api/v2/spans
  secrets:
    scopes:
      - storeName: "localsecretstore"
        defaultAccess: "deny"
        allowedSecrets: ["secretKey",]

добавим конфигурацию компонента:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: localsecretstore
  namespace: default
spec:
  type: secretstores.local.file
  metadata:
  - name: secretsFile
    value: secrets.json
  - name: nestedSeparator
    value: ":"

и создадим json-файл с конфигурацией ключей (secrets.json):

{
    "secretKey": "mysecretvalue",
}

теперь доработаем код и получим значение секрета:

secretKey = 'secretKey'
secretStoreName = 'localsecretstore'

//...
    print(f'Secret value is {d.get_secret(secretStoreName, secretKey).secret["secretKey"]}')

Мы рассмотрели основные сценарии использования Dapr для организации взаимодействия микросервисов. Во второй части статьи мы поговорим о возможностях наблюдения за выполнением последовательности вызовов (distributed tracing) и об интеграции с внешними источниками данных (binding).

Использование Dapr в Kubernetes принципиально не отличается от self-hosted установки (кроме того, что будут недоступны команды dapr publish и dapr invoke). Конфигурация компонентов (хранилище секретов и состояния, брокер очереди сообщений и др.) определяются через CRD Component (рассмотренные выше примеры будут работать и в Kubernetes). Дополнительно можно использовать следующие команды dapr:

  • components - отображает зарегистрированные компоненты и их конфигурацию

  • configurations - отображает известные конфигурации dapr

  • logs - просмотр логов для присоединенного dapr-контейнера

  • status - информация о состоянии сервисов dapr (включая оператор и инжектор, который используется для присоединения sidecar)

  • upgrade - обновление контейнеров dapr до актуальной версии

При публикации приложения через Deployment / StatefulSet / DaemonSet в kubernetes необходимые атрибуты (app-id, app-port и другие) могут быть определены в аннотациях шаблона (spec.template.annotations: dapr.io/enabled=true для присоединения sidecar к поду, dapr.io/app-id - идентификатор процесса, dapr.io/app-port - порт для публикации).

Примеры кода с конфигурацией можно посмотреть в официальном репозитории (например, для Python), а также найти рассмотренные выше фрагменты кода и сценарии для запуска в github-репозитории.

Материал подготовлен в преддверии старта курса Microservice Architecture от OTUS. Узнать подробнее о курсе и зарегистрироваться на бесплатный урок можно по ссылке ниже.

Tags:
Hubs:
If this publication inspired you and you want to support the author, do not hesitate to click on the button
Total votes 10: ↑9 and ↓1+10
Comments0

Articles

Information

Website
otus.ru
Registered
Founded
Employees
101–200 employees
Location
Россия
Representative
OTUS