company_banner

Пример event-driven приложения на основе вебхуков в объектном S3-хранилище Mail.ru Cloud Solutions


    Rube Goldberg coffee machine

    Event-driven architecture повышает ценовую эффективность используемых ресурсов, потому что они задействуются только в тот момент, когда они нужны. Существует масса вариантов, как это реализовать и не создавать дополнительные облачные сущности в качестве worker-приложений. И сегодня я расскажу не про FaaS, а про вебхуки. Я покажу учебный пример обработки событий с помощью вебхуков объектного хранилища.

    Пара слов об объектном хранилище и о вебхуках. Объектные хранилища позволяют хранить любые данные в облаке в виде объектов, доступных по S3 или другому API (в зависимости от реализации) через HTTP/HTTPS. Вебхуки (webhooks) в общем случае — это пользовательские обратные вызовы по HTTP. Обычно они запускаются событием, например, отправкой кода в репозиторий или комментарием, публикуемым в блоге. Когда происходит событие, исходный сайт отправляет HTTP-запрос на URL-адрес, указанный для вебхука. В результате можно сделать так, чтобы события на одном сайте вызывали действия на другом (wiki). В случае, когда исходным сайтом выступает объектное хранилище, в роли событий выступают изменения его содержимого.

    Примеры простых кейсов, когда можно использовать такую автоматизацию:

    1. Создание копий всех объектов в другом облачном хранилище. Копии должны создаваться «на лету», при любом добавлении или изменении файлов.
    2. Автоматическое создание серий миниатюр графических файлов, добавление водяных знаков к фотографиям, другие модификации изображений.
    3. Оповещение о приходе новых документов (к примеру, распределенная бухгалтерская служба выкладывает в облако отчеты, а финмониторинг получает оповещения о новых отчетах, проверяет и анализирует их).
    4. Чуть более сложные кейсы подразумевают, к примеру, формирование запроса к Kubernetes, который создает под с нужными контейнерами, передает в него параметры задачи и после обработки сворачивает контейнер.

    В качестве примера мы сделаем вариант задачи 1, когда изменения в бакете объектного хранилища Mail.ru Cloud Solutions (MCS) с помощью вебхуков синхронизируются в объектном хранилище AWS. В реальном нагруженном кейсе следует предусмотреть асинхронную работу за счет регистрации вебхуков в очереди, но для учебной задачи мы сделаем реализацию без этого.

    Схема работы


    Протокол взаимодействия детально описан в руководстве по S3-вебхукам на MCS. В схеме работы есть следующие элементы:

    • Сервис публикации, который находится на стороне S3-хранилища и публикует HTTP-запросы при срабатывании webnhook.
    • Сервер приема вебхуков, который слушает обращения сервиса публикации по HTTP и выполняет соответствующие действия. Сервер может быть написан на любом языке, в нашем примере мы напишем сервер на Go.

    Особенность реализации вебхуков в S3 API — регистрация сервера приема вебхуков на сервисе публикации. В частности, сервер приема вебхуков должен подтвердить подписку на сообщения сервиса публикации (в других реализациях вебхуков обычно подтверждать подписку не требуется).

    Соответственно, сервер приема вебхуков должен поддерживать две основных операции:

    • отвечать на запрос сервиса публикации о подтверждении регистрации,
    • обрабатывать приходящие события.

    Установка сервера приема вебхуков


    Для запуска сервера приема вебхуков нужен Linux-сервер. В данной статье для примера используем виртуальный инстанс, который развертываем на MCS.

    Установим необходимое ПО и запустим сервер приема вебхуков.

    ubuntu@ubuntu-basic-1-2-10gb:~$ sudo apt-get install git
    Reading package lists... Done
    Building dependency tree
    Reading state information... Done
    The following packages were automatically installed and are no longer required:
      bc dns-root-data dnsmasq-base ebtables landscape-common liblxc-common 
    liblxc1 libuv1 lxcfs lxd lxd-client python3-attr python3-automat 
    python3-click python3-constantly python3-hyperlink
      python3-incremental python3-pam python3-pyasn1-modules 
    python3-service-identity python3-twisted python3-twisted-bin 
    python3-zope.interface uidmap xdelta3
    Use 'sudo apt autoremove' to remove them.
    Suggested packages:
      git-daemon-run | git-daemon-sysvinit git-doc git-el git-email git-gui 
    gitk gitweb git-cvs git-mediawiki git-svn
    The following NEW packages will be installed:
      git
    0 upgraded, 1 newly installed, 0 to remove and 46 not upgraded.
    Need to get 3915 kB of archives.
    After this operation, 32.3 MB of additional disk space will be used.
    Get:1 http://MS1.clouds.archive.ubuntu.com/ubuntu bionic-updates/main 
    amd64 git amd64 1:2.17.1-1ubuntu0.7 [3915 kB]
    Fetched 3915 kB in 1s (5639 kB/s)
    Selecting previously unselected package git.
    (Reading database ... 53932 files and directories currently installed.)
    Preparing to unpack .../git_1%3a2.17.1-1ubuntu0.7_amd64.deb ...
    Unpacking git (1:2.17.1-1ubuntu0.7) ...
    Setting up git (1:2.17.1-1ubuntu0.7) ...

    Клонируем папку с сервером приема вебхуков:

    ubuntu@ubuntu-basic-1-2-10gb:~$ git clone
    https://github.com/RomanenkoDenys/s3-webhook.git
    Cloning into 's3-webhook'...
    remote: Enumerating objects: 48, done.
    remote: Counting objects: 100% (48/48), done.
    remote: Compressing objects: 100% (27/27), done.
    remote: Total 114 (delta 20), reused 45 (delta 18), pack-reused 66
    Receiving objects: 100% (114/114), 23.77 MiB | 20.25 MiB/s, done.
    Resolving deltas: 100% (49/49), done.

    Запустим сервер:

    ubuntu@ubuntu-basic-1-2-10gb:~$ cd s3-webhook/
    ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ sudo ./s3-webhook -port 80

    Подписка на сервис публикации


    Зарегистрировать свой сервер приема вебхуков можно через API либо web-интерфейс. Для простоты будем регистрировать через web-интерфейс:

    1. Идем в раздел бакетов в кабинете управления.
    2. Заходим в бакет, для которого будем настраивать wеbhooks, и нажимаем на шестеренку:



    Переходим на вкладку Webhooks и нажимаем Добавить:


    Заполняем поля:



    ID — название вебхука.

    Event — какие события передавать. Мы задали передачу всех событий, которые происходят при работе с файлами (добавление и удаление).

    URL — адрес сервера приема вебхуков.

    Filter prefix/suffix — фильтр, который позволяет генерировать вебхуки только на объекты, названия которых соответствуют определенным правилам. Например, чтобы вебхук срабатывал только файлы с расширением .png, в Filter suffix надо написать «png».

    В текущий момент поддерживаются для обращения к серверу приема вебхуков только порты 80 и 443.

    Нажмем Добавить hook и увидим следующее:


    Hook добавлен.

    Сервер приема вебхуков в логах показывает прохождение процесса регистрации хука:

    ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ sudo ./s3-webhook -port 80
    2020/06/15 12:01:14 [POST] incoming HTTP request from 
    95.163.216.92:42530
    2020/06/15 12:01:14 Got timestamp: 2020-06-15T15:01:13+03:00 TopicArn: 
    mcs5259999770|myfiles-ash|s3:ObjectCreated:*,s3:ObjectRemoved:* Token: 
    E2itMqAMUVVZc51pUhFWSp13DoxezvRxkUh5P7LEuk1dEe9y URL: 
    http://89.208.199.220/webhook
    2020/06/15 12:01:14 Generate responce signature: 
    3754ce36636f80dfd606c5254d64ecb2fd8d555c27962b70b4f759f32c76b66d

    Регистрация закончена. В следующем разделе более детально рассмотрим алгоритм работы сервера приема вебхуков.

    Описание сервера приема вебхуков


    В нашем примере сервер написан на Go. Разберем основные принципы его работы.

    package main
    
    // Generate hmac_sha256_hex
    func HmacSha256hex(message string, secret string) string {
    }
    
    // Generate hmac_sha256
    func HmacSha256(message string, secret string) string {
    }
    
    // Send subscription confirmation
    func SubscriptionConfirmation(w http.ResponseWriter, req *http.Request, body []byte) {
    }
    
    // Send subscription confirmation
    func GotRecords(w http.ResponseWriter, req *http.Request, body []byte) {
    }
    
    // Liveness probe
    func Ping(w http.ResponseWriter, req *http.Request) {
        // log request
        log.Printf("[%s] incoming HTTP Ping request from %s\n", req.Method, req.RemoteAddr)
        fmt.Fprintf(w, "Pong\n")
    }
    
    //Webhook
    func Webhook(w http.ResponseWriter, req *http.Request) {
    }
    
    func main() {
    
        // get command line args
        bindPort := flag.Int("port", 80, "number between 1-65535")
        bindAddr := flag.String("address", "", "ip address in dot format")
        flag.StringVar(&actionScript, "script", "", "external script to execute")
        flag.Parse()
    
        http.HandleFunc("/ping", Ping)
        http.HandleFunc("/webhook", Webhook)
    
    log.Fatal(http.ListenAndServe(*bindAddr+":"+strconv.Itoa(*bindPort), nil))
    }

    Рассмотрим основные функции:

    • Ping() — роут, отвечающий по URL/ping, простейшая реализация liveness probe.
    • Webhook() — основной роут, обработчик URL/вебхука:
      • подтверждает регистрацию на сервисе публикации (переход в функцию SubscriptionConfirmation),
      • обрабатывает приходящие вебхуки (функция Gotrecords).
    • Функции HmacSha256 и HmacSha256hex — реализации алгоритмов шифрования HMAC-SHA256 и HMAC-SHA256 с выводом в виде строки 16-ричных чисел для подстчета сигнатуры.
    • main — главная функция, обрабатывает параметры командной строки и регистрирует обработчики URL.

    Параметры командной строки, принимаемые сервером:

    • -port — порт, на котором сервер будет слушать.
    • -address — IP-адрес, который будет слушать сервер.
    • -script — внешняя программа, которая вызывается на каждый пришедший хук.

    Рассмотрим подробнее некоторые функции:

    //Webhook
    func Webhook(w http.ResponseWriter, req *http.Request) {
    
        // Read body
        body, err := ioutil.ReadAll(req.Body)
        defer req.Body.Close()
        if err != nil {
            http.Error(w, err.Error(), 500)
            return
        }
    
        // log request
        log.Printf("[%s] incoming HTTP request from %s\n", req.Method, req.RemoteAddr)
        // check if we got subscription confirmation request
        if strings.Contains(string(body), 
    "\"Type\":\"SubscriptionConfirmation\"") {
            SubscriptionConfirmation(w, req, body)
        } else {
            GotRecords(w, req, body)
        }
    
    }

    Эта функция определяет, что пришло — запрос на подтверждение регистрации либо вебхук. Как следует из документации, в случае подтверждения регистрации приходит следующая структура Json в запросе Post:

    POST http://test.com HTTP/1.1
    x-amz-sns-messages-type: SubscriptionConfirmation
    content-type: application/json
    
    {
        "Timestamp":"2019-12-26T19:29:12+03:00",
        "Type":"SubscriptionConfirmation",
        "Message":"You have chosen to subscribe to the topic $topic. To confirm the subscription you need to response with calculated signature",
        "TopicArn":"mcs2883541269|bucketA|s3:ObjectCreated:Put",
        "SignatureVersion":1,
        "Token":«RPE5UuG94rGgBH6kHXN9FUPugFxj1hs2aUQc99btJp3E49tA»
    }

    На этот запрос нужно ответить:

    content-type: application/json
    
    {"signature":«ea3fce4bb15c6de4fec365d36bcebbc34ccddf54616d5ca12e1972f82b6d37af»}

    Где сигнатура вычисляется как:

    signature = hmac_sha256(url, hmac_sha256(TopicArn, 
    hmac_sha256(Timestamp, Token)))

    Если же приходит вебхук, то структура Post-запроса выглядит так:

    POST <url> HTTP/1.1
    x-amz-sns-messages-type: SubscriptionConfirmation
    
    { "Records":
        [
            {
                "s3": {
                    "object": {
                        "eTag":"aed563ecafb4bcc5654c597a421547b2",
                        "sequencer":1577453615,
                        "key":"some-file-to-bucket",
                        "size":100
                    },
                "configurationId":"1",
                "bucket": {
                    "name": "bucketA",
                    "ownerIdentity": {
                        "principalId":"mcs2883541269"}
                    },
                    "s3SchemaVersion":"1.0"
                },
                "eventVersion":"1.0",
                "requestParameters":{
                    "sourceIPAddress":"185.6.245.156"
                },
                "userIdentity": {
                    "principalId":"2407013e-cbc1-415f-9102-16fb9bd6946b"
                },
                "eventName":"s3:ObjectCreated:Put",
                "awsRegion":"ru-msk",
                "eventSource":"aws:s3",
                "responseElements": {
                    "x-amz-request-id":"VGJR5rtJ"
                }
            }
        ]
    }

    Соответственно, в зависимости от запроса нужно понять, как обрабатывать данные. Я выбрал в качестве индикатора запись "Type":"SubscriptionConfirmation", поскольку она присутствует в запросе на подтверждение подписки и не присутствует в вебхуке. Исходя из наличия/отсутствия этой записи в POST-запросе, дальнейшее выполнение программы переходит либо в функцию SubscriptionConfirmation, либо в функцию GotRecords.

    Функцию SubscriptionConfirmation детально рассматривать не будем, она реализована по принципам, изложенным в документации. Изучить исходный код этой функции можно в git-репозитории проекта.

    Функция GotRecords разбирает приходящий запрос и для каждого объекта Record вызывает внешний скрипт (имя которого было передано в параметре -script) с параметрами:

    • имя бакета
    • ключ объекта
    • действие:
      • copy — если в исходном запросе EventName = ObjectCreated | PutObject | PutObjectCopy
      • delete — если в исходном запросе EventName = ObjectRemoved | DeleteObject

    Таким образом, если прилетел хук c Post-запросом, как описано выше, и параметр -script=script.sh то скрипт будет вызван следующим образом:

    script.sh  bucketA some-file-to-bucket copy

    Следует понимать, что данный сервер приема вебхуков — это не законченное production-решение, а упрощенный пример возможной реализации.

    Пример работы


    Сделаем синхронизацию файлов основного бакета в MCS в резервный бакет в AWS. Основной бакет называется myfiles-ash, резервный — myfiles-backup (конфигурация бакета в AWS выходит за пределы данной статьи). Соответственно, когда файл кладется в основной бакет, его копия должна появиться в резервном, когда удаляется из основного — удаляться в резервном.

    Работать с бакетами будем утилитой awscli, с которой совместимо как облачное хранилище MCS, так и облачное хранилище AWS.

    ubuntu@ubuntu-basic-1-2-10gb:~$ sudo apt-get install awscli
    Reading package lists... Done
    Building dependency tree
    Reading state information... Done
    After this operation, 34.4 MB of additional disk space will be used.
    Unpacking awscli (1.14.44-1ubuntu1) ...
    Setting up awscli (1.14.44-1ubuntu1) ...

    Сконфигурируем доступ к API S3 MCS:

    ubuntu@ubuntu-basic-1-2-10gb:~$ aws configure --profile mcs
    AWS Access Key ID [None]: hdywEPtuuJTExxxxxxxxxxxxxx
    AWS Secret Access Key [None]: hDz3SgxKwXoxxxxxxxxxxxxxxxxxx
    Default region name [None]:
    Default output format [None]:

    Сконфигурируем доступ к API S3 AWS:

    ubuntu@ubuntu-basic-1-2-10gb:~$ aws configure --profile aws
    AWS Access Key ID [None]: AKIAJXXXXXXXXXXXX
    AWS Secret Access Key [None]: dfuerphOLQwu0CreP5Z8l5fuXXXXXXXXXXXXXXXX
    Default region name [None]:
    Default output format [None]:

    Проверим доступы:

    К AWS:

    ubuntu@ubuntu-basic-1-2-10gb:~$ aws s3 ls --profile aws
    2020-07-06 08:44:11 myfiles-backup

    Для MCS, при работе команды надо добавлять --endpoint-url:

    ubuntu@ubuntu-basic-1-2-10gb:~$ aws s3 ls --profile mcs --endpoint-url 
    https://hb.bizmrg.com
    2020-02-04 06:38:05 databasebackups-0cdaaa6402d4424e9676c75a720afa85
    2020-05-27 10:08:33 myfiles-ash

    Доступ получен.

    Теперь напишем скрипт обработки приходящего хука, назовем его s3_backup_mcs_aws.sh

    #!/bin/bash
    # Require aws cli
    # if file added — copy it to backup bucket
    # if file removed — remove it from backup bucket
    # Variables
    ENDPOINT_MCS="https://hb.bizmrg.com"
    AWSCLI_MCS=`which aws`" --endpoint-url ${ENDPOINT_MCS} --profile mcs s3"
    AWSCLI_AWS=`which aws`" --profile aws s3"
    BACKUP_BUCKET="myfiles-backup"
    
    SOURCE_BUCKET="${1}"
    SOURCE_FILE="${2}"
    ACTION="${3}"
    
    SOURCE="s3://${SOURCE_BUCKET}/${SOURCE_FILE}"
    TARGET="s3://${BACKUP_BUCKET}/${SOURCE_FILE}"
    TEMP="/tmp/${SOURCE_BUCKET}/${SOURCE_FILE}"
    
    case ${ACTION} in
        "copy")
        ${AWSCLI_MCS} cp "${SOURCE}" "${TEMP}"
        ${AWSCLI_AWS} cp "${TEMP}" "${TARGET}"
        rm ${TEMP}
        ;;
    
        "delete")
        ${AWSCLI_AWS} rm ${TARGET}
        ;;
    
        *)
        echo "Usage: ${0} sourcebucket sourcefile copy/delete"
        exit 1
        ;;
    esac

    Запускаем сервер:

    ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ sudo ./s3-webhook -port 80 -
    script scripts/s3_backup_mcs_aws.sh

    Проверяем, как это сработает. Через веб-интерфейс MCS добавим файл test.txt в бакет myfiles-ash. В логах в консоли видно, что был сделан запрос на сервер вебхуков:

    2020/07/06 09:43:08 [POST] incoming HTTP request from 
    95.163.216.92:56612
    download: s3://myfiles-ash/test.txt to ../../../tmp/myfiles-ash/test.txt
    upload: ../../../tmp/myfiles-ash/test.txt to 
    s3://myfiles-backup/test.txt

    Проверим содержимое бакета myfiles-backup в AWS:

    ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ aws s3 --profile aws ls 
    myfiles-backup
    2020-07-06 09:43:10       1104 test.txt

    Теперь через веб-интерфейс удалим файл из бакета myfiles-ash.

    Логи сервера:

    2020/07/06 09:44:46 [POST] incoming HTTP request from 
    95.163.216.92:58224
    delete: s3://myfiles-backup/test.txt

    Содержимое бакета:

    ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ aws s3 --profile aws ls 
    myfiles-backup
    ubuntu@ubuntu-basic-1-2-10gb:~$

    Файл удален, задача решена.

    Заключение и ToDo


    Весь код, используемый в этой статье, лежит в моем репозитории. Там же лежат примеры скриптов и примеры подсчета сигнатур для регистрации вебхуков.

    Данный код — не более чем пример того, как можно использовать S3-вебхуки в своей деятельности. Как я сказал в начале, если планировать использование такого сервера в продуктиве, необходимо как минимум переписывать сервер под асинхронную работу: приходящие вебхуки регистрировать в очереди (RabbitMQ или NATS), а оттуда их разбирать и обрабатывать worker-приложениями. Иначе при массированном приходе вебхуков можно столкнуться с нехваткой ресурсов сервера для выполнения задач. Наличие же очередей позволяет разносить сервер и workers, а также решать вопросы с повтором задач в случае сбоев. Так же желательно менять логирование на более подробное и более стандартизованное.

    Успехов!

    В нашем телеграм-канале — новости об этом и других сервисах на облачной платформе Mail.ru Cloud Solutions.

    Еще почитать по теме:

    Mail.ru Group
    Строим Интернет

    Комментарии 0

    Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

    Самое читаемое