Привет!

Меня зовут Александр, я руковожу backend-разработкой в КТS. Сегодня расскажу, как написать асинхронный краулер.

Такая задача часто встречается на практике, когда нужно реализовать периодическую синхронизацию/обкачку между сервисами. 

Статья написана по мотивам вебинара, который мы проводили в рамках курса «Асинхронное программирование на Python для джуниор-разработчиков». Если интересно, загляните посмотреть.

Что будет в статье:

Цель

У нас есть краулер, который обкачивает страницы. Это может быть поисковый бот Google, который ходит по сайтам, скачивает данные, кладет в базу и индексирует, или какой-нибудь агрегатор: аптек, маркетплейсов и т.д.

Задача в том, что краулер должен работать и не положить сервис, который он обкачивает. 

Код для начала работы:

import asyncio
from dataclasses import dataclass
from typing import Optional

class Pool:
    def __init__(self, max_rate: int, interval: int = 1, concurrent_level: Optional[int] = None):
        self.max_rate = max_rate
        self.interval = interval
        self.concurrent_level = concurrent_level
        
async def start(pool):
  	await asyncio.sleep(5)
    
def main():
    loop = asyncio.get_event_loop()
    
    try:
        loop.run_until_complete(start())
    except KeyboardInterrupt:
        loop.close()

if __name__ == '__main__':
    main()

Краулеру нужно посетить и скачать много страниц, следовательно, много раз обратиться к ресурсу. Мы можем позволить себе отправлять много запросов, но сервис, на который мы приходим, может не выдержать большой нагрузки. Поэтому к источнику данных нужно ходить управляемо — сделать rate-limit. 

Если в какой-то момент задача прервалась, или мы сами решили остановить краулер, нужно сделать корректную и аккуратную остановку работы. Для этого начатые задачи должны завершиться, а новые задачи из очереди должны прекратить поступать. 

Исходный код

У нас есть сущность Pool. Эта сущность умеет управлять количеством запросов в единицу времени. Pool принимает:

  • max_rate — максимальное количество запросов

  • interval — интервал. Если мы передаем значения max_rate = 5 и interval = 1, в секунду может исполняться 5 запросов

  • concurrent_level — обозначает допустимое количество параллельных запросов

max_rate и concurrent_level могут не совпадать, когда время выполнения запроса больше, чем interval. Например, мы делаем 5 запросов в секунду, как заявлено в переменных, но API все равно отвечает медленнее. Чтобы не положить сервис, мы вводим переменную concurrent_level.

Планировщик

Для начала нужно написать что-то, что позволит делать ровно 5 запросов в секунду, не обращая внимание на время запроса. Для этого мы запустим планировщик, который назовем scheduler. Он будет просыпаться раз в секунду и ставить количество задач, равное max_rate. Планировщик не ждет их исполнения, просто создает 5 задач каждую секунду. 

Дополним class Pool и напишем функцию scheduler:

from task import Task

class Pool:
    def __init__(self, max_rate: int, interval: int = 1, concurrent_level: Optional[int] = None):
        self.max_rate = max_rate
        self.interval = interval
        self.concurrent_level = concurrent_level
        self.is_running = False
        
     async def _scheduler(self):
        while self.is_running:
            for _ in range(self.max_rate):
                pass

Обратите внимание на две вещи:

  • функция бесконечная, пока работает наш краулер

  • раз в период функция выполняет max_rate раз какое-то действие

Задача для краулера

Scheduler должен откуда-то взять задачи, которые нужно запланировать. Для этого нам нужно сделать очередь, которую мы возьмем из библиотеки asyncio. Примитив называется asyncio.Queue(). В class Pool дописываем:

class Pool:
    def __init__(self, max_rate: int, interval: int = 1, concurrent_level: Optional[int] = None):
        self.max_rate = max_rate
        self.interval = interval
        self.concurrent_level = concurrent_level
        self.is_running = False
        self._queue = asyncio.Queue()

Теперь мы просыпаемся раз в интервал и получаем количество задач, равное max_rate. Но нужно что-то сделать, чтобы они исполнялись.

Для этого в asyncio есть функция create_task. Она запускает выполнение корутины, но при этом не дожидается ее исполнения, а создает фоновую задачу. В create_task передадим метод perform.

    async def _scheduler(self):
        while self.is_running:
            for _ in range(self.max_rate):
                task = await self._queue.get()
              	asyncio.create_task(task.perform))
            await asyncio.sleep(self.interval)

Пробный запуск

Давайте попробуем все это запустить. Сделаем функцию start и таким же образом запустим scheduler. Нам нужно не ждать его, а просто запустить в фоне корутину с помощью create_task:

    async def _scheduler(self):
        while self.is_running:
            for _ in range(self.max_rate):
                task = await self._queue.get()
                asyncio.create_task(self._worker(task))
            await asyncio.sleep(self.interval)
    
    def start(self):
        self.is_running = True
        asyncio.create_task(self._scheduler())

В будущем для корректного завершения работы краулера нужно завершить работу scheduler. Для этого нужно вызвать cancel у задачи, поэтому возвращаемое значение из create_task мы сохраняем в переменную scheduler_task:

class Pool:
    def __init__(self, max_rate: int, interval: int = 1, concurrent_level: Optional[int] = None):
        self.max_rate = max_rate
        self.interval = interval
        self.concurrent_level = concurrent_level
        self.is_running = False
        self._queue = asyncio.Queue()
        self._scheduler_task: Optional[asyncio.Task] = None

Выставим rate-limit на 3 и внутри start запустим наш Pool:

def start(self):
    self.is_running = True
    self._scheduler_task = asyncio.create_task(self._scheduler())
        
async def start(pool):
    pool = Pool(3)
    pool.start()
    await asyncio.sleep(5)

Запускаем и видим, что ничего не произошло:

Это потому, что внутри очереди ничего нет. Мы сделали старт и поспали 5 секунд, а на момент окончания задачи у нас осталась фоновая задача scheduler.  

Промежуточный итог

  1. У нас есть Pool с параметрами:
    ограничение количества запросов max_rate
    — интервал активизации планировщика interval
    — максимальное количество параллельных запросов concurrent_level

  2. Мы написали планировщик scheduler, который работает постоянно, просыпается раз в объявленный интервал, достает из очереди max_rate задач и запускает их исполнение.

  3. Задача task — просто дата-класс с функцией perform. Для описания поведения задачи нужно создать класс-наследник и в нем переопределить perform.

  4. Еще мы написали функцию start, в которой выставили признак работы is_running и в фоне запустили наш планировщик.

Функции put и join

Перед тем, как запустить Pool, попробуем положить туда задачку. Для этого напишем функцию put, которая принимает задачу и кладет ее в нашу внутреннюю очередь. 

Дополнительно добавим tid (task_id) и print в код задачи:

import asyncio
from dataclasses import dataclass

@dataclass
class Task:
    tid: int

    async def perform(self, pool):
        print('start perform', self.tid)
        await asyncio.sleep(3)
        print('complete perform', self.tid)

И добавим 10 задач перед стартом pool:

async def start(pool):
    pool = Pool(3)
    for tid in range(10):
        await pool.put(Task(tid))
    pool.start()
    await asyncio.sleep(5)

Добавим еще кое-что. У стандартной библиотеки queue есть метод join. Тогда краулер будет ждать не 5 секунд, как мы указали в начале, а до тех пор, пока очередь не опустеет:

async def start(pool):
    pool = Pool(3)
    for tid in range(10):
        await pool.put(Task(tid))
    pool.start()
    await pool.join()

Запустим и посмотрим, что произойдет:

Хотя все зависло, планировщик работал. 

Вы можете увидеть, что задача выполняется 3 секунды. И, несмотря на то, что предыдущие задачи еще не завершились, планировщик все равно создает новые. Это плохо, потому что если API отвечает медленнее, чем мы шлем к нему запросы, есть вероятность «положить» сервис. Эту проблему мы решим чуть позже.

Чтобы join отработал, нужно помечать задачи выполненными. Не будем усложнять код scheduler и сделаем отдельную функцию _worker. В нее перенесем perform и ниже добавим self._queue.task_done(). Это означает, что задачу мы выполнили:

async def _worker(self, task: Task):
    await task.perform(self)
    self._queue.task_done()

Обратите внимание, что _worker вызывается без await, потому что scheduler не должен ждать его завершения. Иначе он не успеет запланировать задачи. 

В scheduler вместо perform нужно передать _worker и task:

async def _scheduler(self):
    while self.is_running:
        for _ in range(self.max_rate):
            task = await self._queue.get()
            asyncio.create_task(self._worker(task))
        await asyncio.sleep(self.interval)

Снова попробуем запустить:

Программа завершилась, но осталось предупреждение о том, что scheduler остался работать в фоне. Функцию stop напишем чуть позже.

Semaphore

На этом этапе видим, что:

  • метод start запускает наш Pool и планировщик scheduler

  • планировщик раз в секунду ставит новые задачи и запускает _worker

  • _worker эти задачи выполняет

  • метод join ждет, пока очередь не станет пустой

Если время выполнения задач больше интервала активизации планировщика (interval), он накидывает дополнительные задачи сверху тех, которые еще не выполнились.

В таком случае количество параллельных запросов к сервису за interval будет больше rate_limit. Поэтому нужно ограничить количество параллельных запросов. Для этого нам потребуется переменная concurrent_level, которая по умолчанию равна rate_limit.

В asyncio есть примитив синхронизации Semaphore. С его помощью можно ограничить количество параллельных исполняемых worker. Если количество запланированных задач больше заданного значения, мы ждем их исполнения. В нашем примере задач 3.

Объявим Semaphore и передадим в него либо concurrent_level, либо max_rate. 

Когда worker начинает исполняться, нам нужно занять Semaphore. Для этого используем «асинхронный контекстный менеджер»: async with self._sem. Мы занимаем Semaphore, пока не закончатся операции ниже — await task.perform(self) и self._queue.task_done(). 

async def _worker(self, task: Task):
    async with self._sem:
        await task.perform(self)
        self._queue.task_done()

Добавим Semaphore внутрь scheduler, чтобы scheduler не запускал новые worker'ы, если количество параллельных worker'ов уже достигло максимума:

async def _scheduler(self):
    while self.is_running:
        for _ in range(self.max_rate):
            async with self._sem:
                task = await self._queue.get()
                asyncio.create_task(self._worker(task))
        await asyncio.sleep(self.interval)

Запускаем:

Мы добавили 3 задачи и ждем, пока они исполнятся. Таким образом мы соблюдаем максимальное параллельное количество запросов. 

Остановка фонового планировщика

У нас осталась проблема с корректным завершением планировщика. После завершения остановки краулера появляется предупреждение о незавершенной корутине.

Чтобы этого не было, напишем функцию stop:

async def stop(self):
    self.is_running = False
    self._scheduler_task.cancel()

Теперь после того, как внутри пула закончатся задачи, его нужно корректно остановить. Добавим метод stop в конце функции start:

async def start():
    pool = Pool(3)
    for tid in range(10):
      await pool.put(Task(tid))
    pool.start()
    await pool.join()
    await pool.stop()

Попробуем:

Теперь все работает корректно. 

Мы остановили планировщик, когда задачи в очереди закончились. Но если мы остановим краулер в процессе работы, начнут появляться предупреждения о том, что какая-то задача не завершилась:

А чем больше время выполнения perform, тем больше будет таких уведомлений. 

Поэтому нам нужно ожидать, когда все worker завершатся. Для этого введем дополнительную переменную, обозначающую количество параллельно работающих worker: concurrent_workers. Изначально она равна 0. При запуске воркера мы увеличиваем concurrent_workers на 1. При выходе, наоборот, уменьшаем на 1:

async def _worker(self, task: FetchTask):
    async with self._sem:
        self._cuncurrent_workers += 1
        await task.perform(self)
        self._queue.task_done()
    self._cuncurrent_workers -= 1

Теперь нужно как-то сказать функции stop, что все параллельные worker завершились. Это произойдет, когда is_running будет false и concurrent_workers станет равной 0.

Для этого есть примитив синхронизации Event. В нашем коде мы добавим его в Pool и назовем stop_event. Это переменная, на которой можно ждать await self._stop_event.wait() до тех пор, пока кто-то не вызовет self._stop_event.set():

class Pool:
    def __init__(self, max_rate: int, interval: int = 1, concurrent_level: Optional[int] = None):
        self.max_rate = max_rate
        self.interval = interval
        self.concurrent_level = concurrent_level
        self.is_running = False
        self._queue = asyncio.Queue()
        self._scheduler_task: Optional[asyncio.Task] = None
        self._sem = asyncio.Semaphore(concurrent_level or max_rate)
        self._cuncurrent_workers = 0
        self._stop_event = asyncio.Event()

Если равна, то все worker завершили свою работу, планировщик отменен и не создает новые задачи. В таком случае все компоненты Pool остановлены или завершили свою работу — программу можно завершать.

Но если concurrent_workers не равна 0, нам нужно внутри метода stop подождать событие stop_event:

async def stop(self):
    self.is_running = False
    self._scheduler_task.cancel()
    if self._cuncurrent_workers != 0:
        await self._stop_event.wait()

Когда Pool остановлен, последний работающий worker должен отправить уведомление:

    async def _worker(self, task: FetchTask):
        async with self._sem:
            self._cuncurrent_workers += 1
            await task.perform(self)
            self._queue.task_done()
        self._cuncurrent_workers -= 1
        if not self.is_running and self._cuncurrent_workers == 0:
            self._stop_event.set()

Обновим функцию main, чтобы все корректно работало:

def main():
    loop = asyncio.get_event_loop()
    pool = Pool(3)
    try:
        loop.run_until_complete(start(pool))
    except KeyboardInterrupt:
        loop.run_until_complete(pool.stop())
        loop.close()

Теперь все работает. После нажатия Ctrl + C выполняются оставшиеся задачи, и программа завершается:

Работа краулера на примере обкачки нашего блога на Хабре

Мы реализовали механику пула на нашей абстрактной задачке task.

Для следующего этапа я подготовил задачу FetchTask. 

Скрытый текст
MAX_DEPTH = 2
PARSED_URLS = set()


@dataclass
class FetchTask(Task):
    url: URL
    depth: int

    def parser(self, data: str) -> List['FetchTask']:
        if self.depth + 1 > MAX_DEPTH:
            return []
        soup = BeautifulSoup(data, 'lxml')
        res = []
        for link in soup.find_all('a', href=True):
            new_url = URL(link['href'])
            if new_url.host is None and new_url.path.startswith('/'):
                new_url = URL.build(
                    scheme=self.url.scheme,
                    host=self.url.host,
                    path=new_url.path,
                    query_string=new_url.query_string
                )
                if new_url in PARSED_URLS:
                    continue
                PARSED_URLS.add(new_url)
                res.append(FetchTask(
                    tid=self.tid,
                    url=new_url,
                    depth=self.depth + 1
                ))
        return res

    async def perform(self, pool):
        async with aiohttp.ClientSession() as session:
            async with session.get(self.url) as resp:
                print(self.url, resp.status)
                data = await resp.text()
                res: List[FetchTask] = await asyncio.get_running_loop().run_in_executor(
                    None, self.parser, data
                )
                for task in res:
                    await pool.put(task)

Внутри функции parcer есть переменная soup,  которая объявлена как soup = BeautifulSoup(data, ’lxml’). Дам небольшие пояснения.

BeautifulSoup — парсер для анализа HTML/XML. 

lxml — реализация HTML/XML парсера. Из-за GIL мы специально запускаем res внутри функции perform через executor:

async def perform(self, pool):
    async with aiohttp.ClientSession() as session:
      async with session.get(self.url) as resp:
          print(self.url, resp.status)
          data = await resp.text()
          res: List[FetchTask] = await asyncio.get_running_loop().run_in_executor(
            	None, self.parser, data
          )
          for task in res:
            	await pool.put(task)

GIL — блокировка, которая запрещает параллельные потоки в Python. Но если вы пишите расширение на С, есть возможность «отпустить» GIL.

Парсер lxml написан на С. У себя под капотом он умеет отпускать GIL и выполняться в отдельном потоке. Это относится и к некоторым другим расширениям: https://lxml.de/2.0/FAQ.html#id1

В fetch_task также переопределяем функцию perform, в которой нужно сходить в сеть. Для этого я взял aiohttp client.

В задаче FetchTask мы идем на указанный URL, оттуда получаем данные и запускаем executor для их обработки. Нужно взять все ссылки в документе, перейти на них и тоже обкачать:

    def parser(self, data: str) -> List['FetchTask']:
        if self.depth + 1 > MAX_DEPTH:
            return []
        soup = BeautifulSoup(data, 'lxml')
        res = []
        for link in soup.find_all('a', href=True):
            new_url = URL(link['href'])
            if new_url.host is None and new_url.path.startswith('/'):
                new_url = URL.build(
                    scheme=self.url.scheme,
                    host=self.url.host,
                    path=new_url.path,
                    query_string=new_url.query_string
                )
                if new_url in PARSED_URLS:
                    continue
                PARSED_URLS.add(new_url)
                res.append(FetchTask(
                    tid=self.tid,
                    url=new_url,
                    depth=self.depth + 1
                ))
        return res

В конце мы добавляем в результат новую задачу и увеличиваем на 1 глубину depth.

Например, когда мы поставили задачку habr.com, глубина была равна 1. Мы скачали этот документ, в котором есть и другие ссылки: блоги Mail.ru, Yandex или KTS. Когда мы стали обкачивать следующие страницы, глубина увеличилась до 2. Этот параметр нужен для ограничения количества обкачиваемых ресурсов, фактически — глубины. 

Обратите внимание, что у нас есть список посещенных страничек PARSED_URLS. Так мы не будем дважды посещать одни и те же страницы.

Теперь импортируем задачи в краулер из fetch_task и изменяем start:

async def start(pool):
    await pool.put(
        FetchTask(URL('https://habr.com/ru/company/kts/blog/'), 1)
    )
    pool.start()
    await pool.join()
    await pool.stop()

Выставляем 3 запроса в секунду и смотрим, как наш краулер потихоньку обкачивает Хабр:

Спасибо за внимание

На этом все! Спасибо всем, кто дочитал статью.

Если сталкивались с подобными задачами, пожалуйста, поделитесь своим опытом в комментариях.


Другие наши статьи по бэкенду и асинхронному программированию для начинающих:

Другие наши статьи по бэкенду и асинхронному программированию для продвинутого уровня:


Несколько слов о курсе по асинхронному программированию на Python ?

Этот курс — маст хев для тех, кто хочет прокачать харды и стать специалистом, который не боится сложных проектов.

  • Вы разберётесь, как работает асинхронное программи­рование и где его лучше применять.

  • Научитесь мыслить нелинейно и сможете продумывать более сложные архитектуры приложений.

  • Получите опыт работы с микросервисами и узнаете best practices написания асинхронных приложений на Python.

? Посмотреть программу курса и дату старта: https://vk.cc/cmUy1v