Храним состояние чата на стеке

    Всё новое — это хорошо забытое старое!

    Сейчас многие пишут различных ботов, которые в IM общаются с пользователем и как-то помогают пользователю жить.



    Если Вы посмотрите на код многих ботов, то он обычно сводится к одному и тому же паттерну:


    • приходит сообщение
    • оно передаётся пользовательскому обработчику сообщений (callback)

    Это в общем-то универсальный способ написания ботов. Он подходит и для чатов с одним человеком и для ботов, подключаемых в группы. С этим способом всё хорошо кроме одного: код даже простых ботов часто бывает довольно запутан.


    Давайте попробуем его распутать.


    Начну с дисклаймеров:


    1. То что описано в этой статье подходит для ботов вида бот <-> один человек.
    2. Код, который приведён в данной статье — является кодом-скетчем. Написан специально для этой статьи за 15 минут. Так что не судите строго.
    3. Я применял подобный подход в бизнесе: с балансированием нагрузки. Но, увы, мой продакшен код имеет много инфраструктурных зависимостей и так просто его не опубликовать. Поэтому в статье используется этот скетч. Я коснусь вопросов развития парадигмы (опишу куда и как мы развивали).

    Ну а теперь поехали.


    В качестве опоры рассмотрим асинхронную библиотеку aiogram, python3.7+. По ссылке есть пример простого echo-бота.


    Скопирую его сюда:


    Посмотреть код
    """
    This is a echo bot.
    It echoes any incoming text messages.
    """
    
    import logging
    
    from aiogram import Bot, Dispatcher, executor, types
    
    API_TOKEN = 'BOT TOKEN HERE'
    
    # Configure logging
    logging.basicConfig(level=logging.INFO)
    
    # Initialize bot and dispatcher
    bot = Bot(token=API_TOKEN)
    dp = Dispatcher(bot)
    
    @dp.message_handler(regexp='(^cat[s]?$|puss)')
    async def cats(message: types.Message):
        with open('data/cats.jpg', 'rb') as photo:
            '''
            # Old fashioned way:
            await bot.send_photo(
                message.chat.id,
                photo,
                caption='Cats are here ',
                reply_to_message_id=message.message_id,
            )
            '''
    
            await message.reply_photo(photo, caption='Cats are here ')
    
    @dp.message_handler()
    async def echo(message: types.Message):
        # old style:
        # await bot.send_message(message.chat.id, message.text)
    
        await message.answer(message.text)
    
    if __name__ == '__main__':
        executor.start_polling(dp, skip_updates=True)

    Видим, что организация бота — традиционная. Каждый раз, когда пользователь нам что-то пишет — вызывается функция-обработчик.


    Что плохого в этой парадигме?


    То, что функция-обработчик для реализации сложных диалогов должна на каждом своём вызове восстанавливать свой стейт из какого-то хранилища.


    Если взглянуть на большинство ботов поддерживающих какой-то бизнес (например приём на работу), то они задают пользователю 1..N вопросов, затем по итогу этих вопросов что-то делают (например сохраняют анкету в БД).


    Если бы можно было писать бота в традиционном стиле (а не колбечном), то можно было бы хранить данные пользователя прямо на стеке.


    Давайте попробуем это сделать.


    Я набросал скетч модуля, подключив который можно использовать с этой библиотекой:


    Посмотреть код
    # Файл - chat_dispatcher.py
    import asyncio
    
    class ChatDispatcher:
        class Timeout(RuntimeError):
            def __init__(self, last_message):
                self.last_message = last_message
                super().__init__('timeout exceeded')
    
        def __init__(self, *,
                     chatcb,
                     shardcb = lambda message: message.from_user.id,
                     inactive_timeout = 15 * 60):
            self.chatcb = chatcb
            self.shardcb = shardcb
            self.inactive_timeout = inactive_timeout
            self.chats = {}
    
        async def handle(self, message):
            shard = self.shardcb(message)
    
            loop = asyncio.get_event_loop()
    
            if shard not in self.chats:
                self.chats[shard] = {
                    'task': self.create_chat(loop, shard),
                    'messages': [],
                    'wait': asyncio.Event(),
                    'last_message': None,
                }
            self.chats[shard]['messages'].append(message)
            self.chats[shard]['wait'].set()
    
        def create_chat(self, loop, shard):
            async def _chat_wrapper():
                try:
                    await self.chatcb(self.get_message(shard))
                finally:
                    del self.chats[shard]
    
            return loop.create_task(_chat_wrapper())
    
        def get_message(self, shard):
            async def _get_message(inactive_timeout=self.inactive_timeout):
                while True:
                    if self.chats[shard]['messages']:
                        last_message = self.chats[shard]['messages'].pop(0)
                        self.chats[shard]['last_message'] = last_message
                        return last_message
    
                    try:
                        await asyncio.wait_for(self.chats[shard]['wait'].wait(),
                                               timeout=inactive_timeout)
                    except asyncio.TimeoutError:
                        self.chats[shard]['wait'].set()
                        raise self.Timeout(self.chats[shard]['last_message'])
    
                    if not self.chats[shard]['messages']:
                        self.chats[shard]['wait'].clear()
            return _get_message
    

    Небольшие пояснения:


    Инстанцируется класс ChatDispatcher с передачей ему следующих параметров:


    1. функции шардинга входящих сообщений (почему названо шардингом — позднее, когда коснёмся больших нагрузок). Функция возвращает уникализированное число указывающее на диалог. В примере — просто возвращает идентификатор пользователя.
    2. функции которая будет выполнять работу обслуживания чата.
    3. Значение таймаута по неактивности пользователя.

    Описание работы:


    1. В ответ на первое сообщение пользователя создаётся асинхронная задача, которая будет обслуживать диалог. Эта задача будет работать до тех пор пока диалог не завершится.
    2. Чтобы получить сообщение пользователя мы запрашиваем его в явной форме. Пример чата echo:
      async def chat(get_message):
          message = await get_message()
          await message.answer(message.text)
    3. Отвечаем на сообщения так, как предлагает нам библиотека (message.answer).

    Давайте попробуем написать бота в этой парадигме


    Полный пример кода тут
    # Файл bot.py
    
    import asyncio
    import re
    from .chat_dispatcher import ChatDispatcher
    import logging
    from aiogram import Bot, Dispatcher, executor, types
    
    API_TOKEN ='Сюда впишите токен Вашего бота'
    
    logging.basicConfig(level=logging.INFO)
    bot = Bot(token=API_TOKEN)
    dp = Dispatcher(bot)
    
    async def chat(get_message):
        try:
            message = await get_message()
            await message.answer('Умею складывать числа, введите первое число')
    
            first = await get_message()
            if not re.match('^\d+$', str(first.text)):
                await first.answer('это не число, начните сначала: /start')
                return
    
            await first.answer('Введите второе число')
            second = await get_message()
    
            if not re.match('^\d+$', str(second.text)):
                await second.answer('это не число, начните сначала: /start')
                return
    
            result = int(first.text) + int(second.text)
            await second.answer('Будет %s (/start - сначала)' % result)
    
        except ChatDispatcher.Timeout as te:
            await te.last_message.answer('Что-то Вы долго молчите, пойду посплю')
            await te.last_message.answer('сначала - /start')
    
    chat_dispatcher = ChatDispatcher(chatcb=chat,
                                     inactive_timeout=20)
    
    @dp.message_handler()
    async def message_handle(message: types.Message):
        await chat_dispatcher.handle(message)
    
    if __name__ == '__main__':
        executor.start_polling(dp, skip_updates=True)
    

    Написанный пример бота — просто складывает пару чисел и выдаёт результат.


    Выглядит результат работы так:



    Ну а теперь рассмотрим поближе код. Инстанцирование не должно вызывать вопросы.


    Интеграция с нашим скетчем сделана так что в стандартном обработчике мы вызываем await chat_dispatcher.handle(message). А чат мы описали в функции chat, повторю сюда его код:


    async def chat(get_message):
        try:
            message = await get_message()
            await message.answer('Умею складывать числа, введите первое число')
    
            first = await get_message()
            if not re.match('^\d+$', str(first.text)):
                await first.answer('это не число, начните сначала: /start')
                return
    
            await first.answer('Введите второе число')
            second = await get_message()
    
            if not re.match('^\d+$', str(second.text)):
                await second.answer('это не число, начните сначала: /start')
                return
    
            result = int(first.text) + int(second.text)
            await second.answer('Будет %s (/start - сначала)' % result)
    
        except ChatDispatcher.Timeout as te:
            await te.last_message.answer('Что-то Вы долго молчите, пойду посплю')
            await te.last_message.answer('сначала - /start')
    

    Код обслуживания чата — просто запрашивает один за другим данные у пользователя. Ответы пользователя просто складируются на стеке (переменные first, second, message).


    Функция get_message может выбросить исключение, если пользователь ничего не вводит в течение установленного таймаута (и ей же можно передать таймаут по месту).


    Стейт диалога — прямо связан с номером строки внутри этой функции. Продвигаясь вниз по коду — мы продвигаемся по схеме диалога. Внести изменения в ветку диалога — не просто, а очень просто!
    Таким образом стейт-машины не нужны. В этой парадигме можно писать очень сложные диалоги и понимать их код будет значительно проще чем код с callback'ами.


    Недостатки


    Куда ж без них.


    1. На каждого активного пользователя приходится одна таск-корутина. В среднем один CPU нормально обслуживает около 1000 пользователей, потом начинаются задержки.
    2. Рестарт всего демона — приводит к прекращению всех диалогов (и перезапуску их).
    3. Код [из примера] не приспособлен к масштабированию нагрузки и интернационализации.

    Если со второй проблемой понятно что делать: перехватить сигнал останова и сообщить пользователям "у меня тут ЧП, пожар, вернусь немного позднее". То последняя проблема может вызывать сложности. Давайте рассмотрим её:


    Масштабирование нагрузки


    Очевидно, нагруженные боты надо пускать на многих бакендах сразу. Соответственно будет использоваться webHook режим работы.


    Если просто балансировать webHook между скажем двумя бакендами, то очевидно нужно как-то обеспечить чтобы один и тот же пользователь приходил к одной и той же корутине, которая ведёт с ним диалог.


    Мы это сделали следующим образом.


    1. На балансере парсим JSON входящего сообщения (message)
    2. Выбираем из него идентификатор пользователя
    3. По идентификатору вычисляем номер бакенда (== шарда). Например по алгоритму user_id % Nshards.
    4. Перенаправляем запрос шарду.

    Идентификатор пользователя — становится ключем шардирования между корутинами диалогов и основой для вычисления шард-номера бакенда в балансере.


    Код такого балансера простой — пишется на любом языке за 10 минут. Не буду его приводить.


    Заключение


    Если писать ботов в этой парадигме, то можно довольно просто переделывать диалоги с одного на другой. При этом что важно — новый программист легко разбирается в коде диалогов, которые кто-то сделал до него.


    Почему большинство пишет ботов в колбечной архитектуре — я не знаю.


    Раньше писали в такой парадигме. Обслуживание чатов в таком стиле было принято в эпоху IRC и ботов для него. Так что я не претендую на какую-то новизну.


    И ещё. Если использовать эту парадигму на языке с оператором goto, то это будет как раз красивый пример применения goto (циклы в диалогах красиво делаются на goto). К сожалению это не о Python.

    AdBlock похитил этот баннер, но баннеры не зубы — отрастут

    Подробнее
    Реклама

    Комментарии 20

      +1

      Пытались как-то реализовать бота подобным способом и умудриться решить проблемы перезапуска и масштабирования. Решение в итоге нашли, но реализовывать не стали, т.к. в рамках наших задач особой нагрузки не было, да и взаимодействий много было и пришлось бы прилично так потратить времени на доработку.
      Суть была в том, что, по хорошему, все входящие и исходящие сигналы бота должны быть (де-)сериализуемы для хранения где-то (например, в БД или кэше). Так диалог мог бы быть поднят/восстановлен в любом из нодов, на которых работал бот. Беда, как раз, в том, что даже вызовы методов сервисов и отправку их результатов пришлось бы оборачивать в сообщения.

        0

        эм, зачем?


        смысл же в том, что в 99℅ случаев восстанавливать диалог из БД не нужно


        большинство задач о ботах — одноразовые (в том числе сложные) диалоги


        обычно никто не ведёт переговоры с ботом так, чтобы диалог длился сутки напролёт

          +1

          Согласен, потому и забили.

            +1

            но вы забили на решение позволяющее писать простой код
            в пользу более сложного


            если я правильно Вас понял


            если писать ботов "традиционно", то это стейтмашины и лапша из колбеков

              +1

              Нет, не совсем так. Забили на восстановление диалога из истории.
              Сами диалоги до сих пор на асинхронных корутинах работают:)

          0
          Беда, как раз, в том, что даже вызовы методов сервисов и отправку их результатов пришлось бы оборачивать в сообщения.

          почему между запросом first и second в примере нельзя просто вставить вызов метода сервиса или БД?

            +1

            Потому, что при восстановлении код будет исполнен второй раз и запрос/вызов также будет вызван второй раз.

              +1

              я не смог понять проблему :(

                0

                всё, я вроде понял что Вы делали


                Вы сохраняли диалог от его начала где-то. А затем при восстановлении стейта "проигрывали" его, так?


                тогда получается вместе с диалогами нужно хранить маркеры начала, конца и маркер обозначающий что бот сделал (или собирается) какой-то модифицирующий запрос.


                При восстановлении — не восстанавливаем если есть хоть один модифицирующий маркер и вроде всё.
                не решаем 100% проблем восстановления, но бОльшую часть решаем.


                ну и если учесть что большинство ботов делают записи в БД только в конце диалога...


                то в общем эту тему можно покопать.


                надо помедитировать над ней

                  0

                  кстати возможность "проигрывать" диалог — позволит удобно писать тесты на диалоги.


                  Это хорошая штука!

            0

            А почему вы решили использовать именно Event, а не Future?

              0

              многократное использование
              в принципе пойдет много всего
              и простые локи в том числе

                0

                В данном случае многократное использование лишь усложняет код. Например, у вас откуда-то появился цикл в функции _get_message

                  0

                  там действительно можно переписать без цикла. но это не связано с выбором примитива синхронизации.


                  Там просто пока обрабатывается одно сообщение пользователя — может быть пауза (например в БД куда-то ходим или вообще в интернет) и может прийти другое сообщение пользователя.


                  соответственно когда мы дождались события в буффере может уже быть на обработку более чем одно сообщение.
                  в этом случае нельзя в следующий вызов ждать, но по очистке буфера надо перезавести объект. Будь там Future или Event с моим алгоритмом был бы такой же цикл.


                  Теоретически цикл можно убрать и clear вызывать под условием что буффер пуст. будет без цикла

                    0
                    Там просто пока обрабатывается одно сообщение пользователя — может быть пауза и может прийти другое сообщение пользователя

                    Это вы уже закрыли проверкой if self.chats[shard]['messages']:


                    clear вызывать под условием что буффер пуст.

                    Эта проверка в вашем коде тоже уже есть...

                      0

                      на самом деле цикл там вообще необходим. просто это место до конца не дописано.


                      смотрите что там задумано.


                      inactive_timeout в общем случае стоит обычно в большом значении (минут 15-25)
                      соответственно wait на такое большое время плох


                      чем плох — тем что если захотим stop демону сделать — то файберы придётся кенселить. поэтому я думал там цикл делать который спит по 0.5 скажем секунд и до таймаута.


                      то есть чтобы демон имел корректный stop.
                      но потом подумал о том что это код для статьи — будет перегруженным и убрал.


                      если кто-то возьмёт этот код за прототим ему придётся решать эту проблему так (как я задумал) или иначе (кенселы сделать или всем set в Event заслать).


                      вот в общем для этого.


                      переписать можно — меньше строк сильно не получится.
                      работает — и хорошо :)


                      какая-то такая логика :)

                        0

                        Во-первых, нет ничего плохого в том, чтобы отменить ваш файбер.


                        Во-вторых, вы всегда можете сделать для Event вызов set, а для Future — set_exception или cancel.


                        Нет никакой необходимости делать активное ожидание с малым тайм-аутом только ради своевременной остановки сервера.

                          0
                          Во-первых, нет ничего плохого в том, чтобы отменить ваш файбер.

                          отмена — самое плохое что можно придумать.
                          если с ним например связан скажем коннект к БД он потеряет консистентность.


                          очень много багов из за отмен бывает.
                          например aiohttp отменяет файберы если клиент отконнектился
                          а обработчик, который использует asyncpg получается отменён
                          а пул asyncpg становится неконсистентным.


                          и это популярные модули.


                          в общем cancel — это самое большое зло которое можно придумать для файбера.


                          правильный паттерн — файбер работает в цикле while self.is_run:, а затем самостоятельно завершается. как-то так.


                          Нет никакой необходимости делать активное ожидание с малым тайм-аутом только ради своевременной остановки сервера.

                          я про этот вариант тоже написал — его можно разбудить сигналом от stop (таким же как сигнал от пользовательского сообщения).


                          в общем и правда можно переписать без цикла

                            0

                            Э-э-э, а в Питоне что-то мешает обработать отмену так же как любое другое исключение? Вроде на других языках такой проблемы нет...

                              0

                              если библиотеки которыми Вы пользуетесь не делают этого — то это исправить трудно.


                              вообще Python имеет самое плохое качество библиотек по моему. Увы.
                              но приходится на нём писать — поскольку на него все отмигрировали

            Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

            Самое читаемое