Отправляем письма с помощью asyncio и aiohttp из Django приложения

    Всем привет!

    Я занимаюсь разработкой и поддержкой сервиса уведомлений в Ostrovok.ru. Сервис написан на Python3 и Django. Помимо транзакционных писем, пушей и сообщений, сервис также берёт на себя задачи по массовым рассылкам коммерческих предложений (не спам! trust me, отписки у нас работают лучше подписок) пользователям, давшим на это согласие. Со временем база активных получателей разрослась до более миллиона адресов, к чему почтовый сервис не был готов. Я хочу рассказать о том, как новые возможности Python позволили ускорить массовые рассылки и сэкономить ресурсы и с какими проблемами нам пришлось столкнуться при работе с ними.



    Исходная реализация


    Изначально массовые рассылки были реализованы самым простым способом: на каждого получателя в очередь помещалась задача, которую забирал один из 60 массовых воркеров (особенность наших очередей заключается в том, что каждый воркер работает в отдельном процессе), подготавливал для нее контекст, рендерил шаблон, отправлял HTTP запрос в Mailgun для отправки письма и создавал в базе запись о том, что письмо отправлено. Вся рассылка занимала до 12 часов, отправляя около 0.3 писем в секунду с каждого воркера и блокируя рассылки маленьких кампаний.



    Асинхронное решение


    Быстрое профилирование показало, что большую часть времени воркеры тратят на установку соединений с Mailgun'ом, поэтому мы стали группировать задачи в чанки, по чанку на каждый воркер. Воркеры стали использовать одно соединение с Mailgun'ом, что позволило сократить время рассылок до 9 часов, отправляя каждым воркером в среднем 0,5 писем в секунду. Последующее профилирование снова показало, что работа с сетью по-прежнему занимает большую часть времени, что и подтолкнуло нас к идее использовать asyncio.

    Перед тем как поместить всю обработку в asyncio цикл, нам нужно было продумать решение ряда проблем:

    1. Django ORM пока ещё не умеет работать с asyncio, однако во время выполнения запросов освобождает GIL. Это значит, что запросы к базе могут выполняться в отдельном потоке и не блокировать работу основного цикла.
    2. Актуальные версии aiohttp требуют Python версии 3.6 и выше, что в момент реализации потребовало обновить докер образ. Эксперименты на более старых версиях aiohttp и Python 3.5 показали, что скорость отправки на этих версиях гораздо ниже, чем на новых, и сопоставима с последовательной отправкой.
    3. Хранение большого количества asyncio корутин быстро ведёт к расходованию всей памяти. Это значит, что нельзя заготовить заранее все корутины для писем и вызвать цикл для их обработки, необходимо подготавливать данные по мере отправки уже сформированных писем.

    Учитывая все особенности, создадим внутри каждого из воркеров свой asyncio цикл с подобием ThreadPool паттерна, состоящего из:

    • Одного или более производителей (producer), работающих с базой данных через Django ORM в отдельном потоке через asyncio.ThreadPoolExecutor. Производитель старается агрегировать запросы получения данных в маленькие батчи, рендерит шаблоны для полученных данных через Jinja2 и складывает данные для отправок в очередь задач.

    def get_campaign_send_data(ids: Iterable[int]) -> Iterable[Mapping[str, Any]]:
        """Формируем данные для отправки писем, здесь происходит работа с Django ORM и рендером шаблонов."""
        return [{'id': id} for id in ids]
    
    
    async def mail_campaign_producer(ids: Iterable[int], task_queue: asyncio.Queue) -> None:
        """
        Группируем получателей в подчанки и формируем для них данные для отправки, которые помещаем в очередь.
        Формирование данных требует работы с базой, поэтому выполняем его в ThreadPoolExecutor.
        """
        loop = asyncio.get_event_loop()
        total = len(ids)
        for subchunk_start in range(0, total, PRODUCER_SUBCHUNK_SIZE):
            subchunk_ids = ids[subchunk_start : min(subchunk_start + PRODUCER_SUBCHUNK_SIZE, total)]
            send_tasks = await loop.run_in_executor(None, get_campaign_send_data, subchunk_ids)
            for task in send_tasks:
                await task_queue.put(task)
    

    • Нескольких сотен отправщиков писем – asyncio корутины, которые в бесконечном цикле читают данные из очереди задач, отправляют сетевые запросы для каждой из них и складывают результат (ответ, или исключение) в очередь отчётов.

    async def send_mail(data: Mapping[str, Any], session: aiohttp.ClientSession) -> Union[Mapping[str, Any], Exception]:
        """Отправляем запрос во внешний сервис."""
        async with session.post(REQUEST_URL, data=data) as response:
            if response.status_code != 200:
                raise Exception
        return data
    
    
    async def mail_campaign_sender(
        task_queue: asyncio.Queue, result_queue: asyncio.Queue, session: aiohttp.ClientSession
    ) -> None:
        """
        Забираем из очереди данные и отправляем сетевые запросы.
        Нужно не забывать вызывать task_done, чтобы вызывающий код понял, когда завершится отправка.
        """
        while True:
            try:
                task_data = await task_queue.get()
                result = await send_mail(task_data, session)
                await result_queue.put(result)
            except asyncio.CancelledError:
                # Корректно обрабатываем остановку корутины
                raise
            except Exception as exception:
                # Обрабатываем ошибки отправки писем
                await result_queue.put(exception)
            finally:
                task_queue.task_done()
    

    • Одного или нескольких воркеров, группирующих данные из очереди отчётов и помещающих в базу данных bulk запросом информацию о результате отправки письма.

    def process_campaign_results(results: Iterable[Union[Mapping[str, Any], Exception]]) -> None:
        """Обрабатываем результаты отправок: исключения и успех и помещаем их в базу данных"""
        pass
    
    
    async def mail_campaign_reporter(task_queue: asyncio.Queue, result_queue: asyncio.Queue) -> None:
        """
        Группируем отчёты в список и передаём на обработку в ThreadPoolExecutor,
        чтобы положить в базу данных информацию об отправках.
        """
        loop = asyncio.get_event_loop()
        results_chunk = []
        while True:
            try:
                results_chunk.append(await result_queue.get())
                if len(results_chunk) >= REPORTER_BATCH_SIZE:
                    await loop.run_in_executor(None, process_campaign_results, results_chunk)
                    results_chunk.clear()
            except asyncio.CancelledError:
                await loop.run_in_executor(None, process_campaign_results, results_chunk)
                results_chunk.clear()
                raise
            finally:
                result_queue.task_done()
    

    • Очереди задач, являющейся экземпляром asyncio.Queue, ограниченной по максимальному количеству элементов, чтобы производитель не переполнял её, расходуя всю память.
    • Очереди отчётов, также являющуюся экземпляром asyncio.Queue с ограничением на максимальное количество элементов.
    • Асинхронного метода, который создаёт очереди, воркеры, и завершает рассылку по их остановке.

    async def send_mail_campaign(
        recipient_ids: Iterable[int], session: aiohttp.ClientSession, loop: asyncio.AbstractEventLoop = None
    ) -> None:
        """
        Создаёт очереди и запускает воркеры для обработки.
        Дожидается завершения формирования получателей, после ждёт окончания отправки и сохранения отчётов.
        """
        executor = ThreadPoolExecutor(max_workers=PRODUCERS_COUNT + 1)
        loop = loop or asyncio.get_event_loop()
        loop.set_default_executor(executor)
    
        task_queue = asyncio.Queue(maxsize=2 * SENDERS_COUNT, loop=loop)
        result_queue = asyncio.Queue(maxsize=2 * SENDERS_COUNT, loop=loop)
    
        producers = [
            asyncio.ensure_future(mail_campaign_producer(recipient_ids, task_queue)) for _ in range(PRODUCERS_COUNT)
        ]
        consumers = [
            asyncio.ensure_future(mail_campaign_sender(task_queue, result_queue, session)) for _ in range(SENDERS_COUNT)
        ]
        reporter = asyncio.ensure_future(mail_campaign_reporter(task_queue, result_queue))
    
        # Дожидаемся, когда все письма будут подготовлены
        done, _ = await asyncio.wait(producers)
    
        # Когда завершатся все отправки, останавливаем воркеров
        await task_queue.join()
        while consumers:
            consumers.pop().cancel()
    
        # Когда завершится сохранение отчётов, также останавливаем соответствующий воркер
        await result_queue.join()
        reporter.cancel()
    

    • Синхронного кода, который создаёт цикл и начинает рассылку.

    async def close_session(future: asyncio.Future, session: aiohttp.ClientSession) -> None:
        """
        Закрываем сессию, когда вся обработка завершена.
        Документация aiohttp рекомендует добавить задержку перед закрытием сессии.
        """
        await asyncio.wait([future])
        await asyncio.sleep(0.250)
        await session.close()
    
    
    def mail_campaign_send_chunk(recipient_ids: Iterable[int]) -> None:
        """
        Точка входа для начала рассылки.
        Принимает идентификаторы получателей, создаёт asyncio цикл и запускает корутину отправки.
        """
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    
        # Session
        connector = aiohttp.TCPConnector(limit_per_host=0, limit=0)
        session = aiohttp.ClientSession(
            connector=connector, auth=aiohttp.BasicAuth('api', API_KEY), loop=loop, read_timeout=60
        )
    
        send_future = asyncio.ensure_future(send_mail_campaign(recipient_ids, session, loop=loop))
        cleanup_future = asyncio.ensure_future(close_session(send_future, session))
        loop.run_until_complete(asyncio.wait([send_future, cleanup_future]))
        loop.close()
    

    После реализации такого решения время отправки массовых рассылок сократилось до часа при таких же объёмах рассылок и 12 задействованных воркерах. То есть каждый воркер отправляет 20-25 писем в секунду, что в 50-80 раз производительнее исходного решения. Потребление памяти воркеров сохранилось на исходном уровне, загрузка процессора немного выросла, утилизация сети возросла многократно, что является ожидаемым эффектом. Также выросло количество соединений с базой данных, поскольку каждый из потоков воркеров-производителей и воркеров, сохраняющих отчёты, активно работают с базой. При этом освободившиеся воркеры могут рассылать небольшие рассылки в то время, как отправляется массовая кампания.



    Несмотря на все преимущества, такая реализация имеет ряд сложностей, которые необходимо учитывать:

    1. Необходимо быть осторожными при обработке ошибок. Необработанное исключение может завершить выполнение воркера, из-за чего кампания «подвиснет».
    2. При завершении отправки необходимо не потерять отчёты по получателям, не заполнившие чанк до конца, и сохранить их в базу данных.
    3. Усложняется логика принудительной остановки возобновления кампаний, поскольку после остановки рассылающих воркеров, необходимо сопоставлять, каким получателям были отправлены письма, а каким – нет.
    4. Через какое-то время сотрудники поддержки Mailgun связались с нами и попросили снизить скорость отправки, потому что почтовые сервисы начинают временно отклонять письма, если частота их отправок превышает пороговое значение. Это легко сделать, уменьшив количество воркеров.
    5. Нельзя было бы использовать asyncio, если какой-то из этапов отправки писем выполнял бы требовательные к ресурсам процессора опрерации. Рендер шаблонов с использованием jinja2 оказался не очень ресурсоёмкой операцией и практически не оказывает влияния на скорость отправки.
    6. Использование asyncio для рассылок требует, чтобы обработчики очереди рассылок запускались отдельными процессами.

    Надеюсь, наш опыт будет вам полезен! Если остались вопросы или появились идеи, пишите в комментариях!
    Ostrovok.ru
    Company

    Comments 4

      +1
      Интересный момент в коде:
      subchunk_ids = ids[subchunk_start : min(subchunk_start + PRODUCER_SUBCHUNK_SIZE, total)]

      Всю голову сломал зачем нужен min. Не в первый раз вижу такой прием, не могу понять что он дает.

        0
        В данной ситуации этот приём действительно ничего не даёт, поскольку срез списка с выходом за его длину корректно вернёт элементы до конца списка (или пустой список). Это может быть полезно при обработке списка чанками, чтобы на последней итерации не выйти за его границу, что в приведённом коде не будет ошибкой. Спасибо за замечание!
        +1
        Действительно интересный пример работы с asyncio объектами, но почему не захотели связываться с нативным способом массовой рассылки в Mailgun? Там для каждого email можно установить свой контекст. А трекать логи и ошибки через API.
          0

          Мы рассматривали такой вариант, но у мейлгана довольно простой механизм рендера шаблонов с подстановой значений, чего нам не достаточно. Мы используем практически все возможности jinja2, работая при этом с шаблонами в рантайме, включая наследование шаблонов, макросы, l10n и множественные формы слов, автоматическую постановку utm меток, а также ленивую генерацию контекстов для шаблонов. Поэтому мы решили сначала попробовать ускорить рассылки, не жертвуя гибкостью сервиса

        Only users with full accounts can post comments. Log in, please.