Pull to refresh

Tic Tac Toe, часть 6: Flask и Celery/RabbitMQ

Reading time3 min
Views9K
Tic Tac Toe: содержание цикла статей

Попробуем подключить Celery/RabbitMQ к нашему проекту. В качестве основы возьмем проект с Flask'ом. Celery займется вычислением случайного числа.


Установка проекта

Клонируем проект на свой компьютер:


git clone https://github.com/nomhoi/tic-tac-toe-part6.git

Запускаем контейнеры:


cd tic-tac-toe-part6
docker-compose up -d

Выполняем сборку веб-приложения:


cd front
npm install
npm run-script build

Открываем броузер по адресу http://localhost.


Docker контейнеры

Сервис nginx остался без изменений. В сервис flask добавили установку пакета Celery в файл requirements.txt и смонтировали папку с исходником проекта Celery:


    volumes:
      - ./flask:/code
      - ./celery/app/proj:/code/proj

Добавились новые сервисы celery и rabbit.


  celery:
    container_name: celery
    build:
      context: celery/
      dockerfile: Dockerfile
    volumes:
      - ./celery/app:/app
    depends_on:
      - rabbit
    networks:
      - backend

  rabbit:
    container_name: rabbit
    hostname: rabbit
    image: rabbitmq:3.7.15-alpine
    environment:
      - RABBITMQ_DEFAULT_USER=admin
      - RABBITMQ_DEFAULT_PASS=CT2gNABH8eJ9yVh
    ports:
      - "5672:5672"
    networks:
      - backend

Сервис celery

Сервис celery выполнен на базе этого туториала. Кто не знаком с Celery, имеет смысл тут-же пройтись по этому туториалу:


$ docker exec -it celery python
Python 3.7.3 (default, May 11 2019, 02:00:41)
[GCC 8.3.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from proj.tasks import add
>>> add.delay(2, 2)
<AsyncResult: 43662174-657f-4dd3-ab1a-22f5950c8794>
>>>

Как видим, наш проект в Celery оформлен в виде пакета proj. В задачи Celery в файл tasks.py добавлена наша задача getnumber:


@app.task
def getnumber():
    return randrange(9)

Сервис flask

Напомню, что в этот сервис мы добавили пакет Celery и смонтировали папку с проектом proj. Исходный код этого проекта теперь присутствует в двух сервисах flask и celery.


from flask import Flask, jsonify
from proj.tasks import getnumber
from proj.celery import app as celery

app = Flask(__name__)

@app.route('/number')
def number():
    task = getnumber.delay()
    return task.id

@app.route('/result/<task_id>')
def result(task_id):
    task = getnumber.AsyncResult(task_id)
    result = task.get(timeout = 3)
    response = {
        'state': task.state,
        'number': result,
    }
    return jsonify(response)

В обработчике number мы вызываем задачу getnumber, которая выполняется в воркере celery и возвращаем идентификатор задачи. В обработчике result мы получаем результат выполненной задачи по идентификатору и возвращаем ответ в JSON формате фронтенду.


Фронтенд

В диспетчере нашей игры по нажатию кнопки Get random number сначала отправляем запрос бэкенду по адресу /number и получаем от него идентификатор задачи Celery. После этого в функции getResult периодически отправляем запрос бэкенду на получение результата по адресу /result/<task_id>.


async function getResult(task_id) {
    var i = 1;
    var timerId = setTimeout(async function go() {
        console.log("Result request: " + i);
        console.log("Task Id: " + task_id)
        const res = await fetch(`result/` + task_id);
        const response = await res.text();

        if (res.ok) {
            let result = JSON.parse(response);
            console.log(result)

            if (result.state === 'SUCCESS') {
                let i = parseInt(result.number);

                if ($status === 1 || $history.state.squares[i]) {
                    promise_number = result.number + ' - busy';
                    return;
                }

                promise_number = i;
                history.push(new Command($history.state, i));
                return;
            }
        }

        if (i < 5) 
            setTimeout(go, 500);
        i++;
    }, 500);
}   

Изменили вывод результатов запросов к бэкенду:


{#await promise}
    <p>...подождите</p>
{:then taskid}
    <p>Task Id: {taskid}</p>
{:catch error}
    <p style="color: red">{error.message}</p>
{/await}

{#await promise_number}
    <p>...подождите</p>
{:then number}
    <p>Number: {number}</p>
{:catch error}
    <p style="color: red">{error.message}</p>
{/await}

Домашнее задание

На самом деле сейчас результат приходит сразу после первого запроса. Попробуйте нашего интеллектуального агента живущего в celery сделать немного задумчивым, чтобы не сразу выдавал ответ.


Время от времени начинает приходить ошибка от flask'a "500 (INTERNAL SERVER ERROR)", это в celery поднимается исключение "celery.exceptions.TimeoutError: The operation timed out.". Помогает только перезагрузка сервисов. Пока не копал в чем дело, пожалуйста, посмотрите.


В getResult обрабатывается ответ только с состоянием SUCCESS, в остальных случаях выполняется повторный запрос. Можно добавить обработку ответов с FAILURE и PENDING. Формирование ответа в обработчике result также может зависеть от состояния задачи.


Вместо брокера сообщений RabbitMQ можно попробовать подключить Redis.


Можно попробовать выполнять запросы из приложения через брокеры сообщений.


А также попробовать выполнить это на базе примера с Boost.Beast.


Репозиторий на GitHub

https://github.com/nomhoi/tic-tac-toe-part6

Tags:
Hubs:
Total votes 6: ↑6 and ↓0+6
Comments0

Articles