В предыдущей статье мы распарсили реплей одного матча по Dota 2 и нашли хайлайты с помощью кластеризации. В данной статье увеличим масштаб и напишем сервис для параллельного парсинга реплеев на Celery и Flask.
Под катом
Собираем ссылки на реплеи матчей The International 2021
Распараллеливаем парсинг матчей с помощью Celery
Пишем минимальное API на Flask
Запускаем парсинг
Делаем выводы
Все ссылки на код и использованные материалы вы найдете в конце статьи
Собираем ссылки на реплеи матчей The International 2021
Качать репелеи через клиент игры — замечательная опция, но довольно нудная для десятков и сотен матчей. Поэтому мы пойдем иным путем и достанем ссылки на реплеи через OpenDota API.
Для начала нам потребуется узнать Tournament ID для TI 2021. Например, для этого можно зайти на Dotabuff, найти в поиске страницу турнира и пристально посмотреть на URL.

Сделаем запрос к API и получим Match ID идентификаторы всех матчей с турнира.
import requests tournament_id = 13256 r = requests.get( f'https://api.opendota.com/api/leagues/{tournament_id}/matches') matches = r.json() len(matches) > 487
Теперь для каждого матча нужно сходить в Replay API. Здесь стоит учесть, что бесплатная версия API позволяет делать до 60 запросов в минуту.
import time limit = 14 replays = [] for i, m in enumerate(matches): if i == limit: break match_id = m['match_id'] r = requests.get( 'https://api.opendota.com/api/replays/', params=dict(match_id=match_id) ) r.raise_for_status() replays.append(r.json()) time.sleep(0.05) replays > [ [{'match_id': 6078908851, 'cluster': 236, 'replay_salt': 908561694}], [{'match_id': 6056286110, 'cluster': 187, 'replay_salt': 1627987562}], ...
Ссылки на реплеи получаются подставлением значений в шаблон
http://replay{cluster}.valve.net/570/{match_id}_{replay_salt}.dem.bz2.
Нюанс в том, что не все ссылки окажутся рабочими, т.к. в некоторых матчах игроки рестартуют лобби. Отфильтруем их, сделав HEAD запросы.
urls = [] for replay in replays: cluster = replay[0]['cluster'] match_id = replay[0]['match_id'] replay_salt = replay[0]['replay_salt'] url = f'http://replay{cluster}.valve.net/570/{match_id}_{replay_salt}.dem.bz2' r = requests.head(url) print(f'Status: {r.status_code}, URL: {url}') if r.status_code == 200: urls.append(url) len(urls) > 10
Из первых 14 ссылок 10 оказались рабочими. Вполне достаточно для теста, т.к. в среднем один .dem реплей весит 100 MB.
Распараллеливаем парсинг реплеев с помощью Celery
Здесь я не буду подробно останавливаться на Celery, на эту тему уже написано множество статей. Просто напомню, что он позволяет выполнять очереди из задач. В качестве брокера я использовал локальный Redis. Воркеры также работали локально, а всю работу по созданию отдельных процессов под задачи берет на себя Celery.
Структура проекта выглядит следующим образом.
$ tree -P '*.py' -I '__pycache__' src/ ├── async_parser │ ├── celery.py │ └── tasks.py ├── server.py ├── settings.py └── sync_parser.py 1 directory, 5 files
Ниже я приведу выжимку кода. Полную версию вы найдете по ссылке на репозиторий в конце статьи.
Подготовительные работы
Для начала поднимем Redis.
sudo docker run --name dota-redis -p 6379:6379 -d redis
А также запустим на локальном порту 5600 Clarity Parser. Подробнее о нем вы можете почитать в предыдущей части.
git clone https://github.com/arch1baald/clarity-parser.git parser sudo docker build -t odota/parser parser/ sudo docker run -d --name clarity-parser -p 5600:5600 odota/parser
Суть
В файле celery.py определяем объект Celery.
from celery import Celery app = Celery( 'async_parser', broker=REDIS_URL, backend=REDIS_URL, include=['async_parser.tasks'], accept=['json'] ) app.start()
А в файле tasks.py определим две основные задачи.
download_save(URL)— Скачать.demреплей и сохранить локальноparse(dem_path)— Прогнать реплей через Clarity Parser, получить лог событий в формате.jsonlinesи сохранить локально
А чтобы парсинг не начинался до того, как завершилась загрузка реплея воспользуемся celery.chain().
1. Загрузка реплея
Задачи для Celery помечаются специальным декоратором @app.task().
def download(url): logger.info(f'Downloading: {url}...') r = requests.get(url) r.raise_for_status() compressed_dem = r.content logger.info(f'Decompressing: {url}...') dem = decompress(compressed_dem) return dem @app.task() def download_save(url): right = url.split('/')[-1] match_salt = right.replace('dem.bz2', '') file_name = match_salt.split('_')[0] file_name += '.dem' path = os.path.join(REPLAY_DIR, file_name) if os.path.exists(path): logger.info(f'Dem file already exists: {path}...') return path dem = download(url) with open(path, 'wb') as fout: fout.write(dem) logger.info(f'Saved to {path}...') return path
Чтобы не раздувать Redis, задача возвращает путь к файлу реплея на диске, а не содержимое самого файла. В случае, если вы хотите обработать все матчи с турнира, можно использовать S3 вместо локального диска.
2. Парсинг
@app.task() def parse(dem_path, remove_dem=False): jsonlines_path = dem_path.replace('.dem', '.jsonlines') logger.info(f'Parsing {jsonlines_path}...') cmd = f'curl localhost:5600 --data-binary "@{dem_path}" > {jsonlines_path}' subprocess.run(cmd, shell=True) if os.path.getsize(jsonlines_path) == 0: os.remove(jsonlines_path) raise ClarityParserException( f'Result file is empty: {jsonlines_path}...\nDid you forget to run odota/parser?') if os.path.exists(dem_path) and remove_dem: logger.info(f'Removing temporary file {dem_path}...') os.remove(dem_path) return jsonlines_path
Финальная функция, объединяющая предыдущие задачи в последовательность. Обратите внимание, что в parse мы не передаем аргумент dem_path, потому что chain сам подставляет результат из download_save.s(url).
def download_parse_save(url): res = chain(download_save.s(url), parse.s())() return res
Запустим Celery.
celery -A async_parser worker -l INFO
Пишем минимальное API на Flask
В данный момент наша очередь задач на парсинг пуста. Чтобы это исправить, напишем простое API, которое принимает в качестве параметра ссылку на реплей и добавляет задачу в очередь.
В файле server.py напишем логику для ручки localhost:5000/parse.
from flask import Flask, request, jsonify from async_parser.tasks import download_parse_save app = Flask(__name__) @app.route('/parse') def parse(): dem_url = request.args.get('url') logger.info(f'{dem_url=}') if dem_url is None: return jsonify(dict( success=False, error='Demo URL not found' )), 400 async_result = download_parse_save(dem_url) logger.info(f'{async_result}') return jsonify(dict( success=True, url=dem_url, job_id=async_result.id ))
Запустим веб-сервер.
FLASK_APP=server flask run
Запускаем парсинг
Для этого сделаем запрос к серверу.
curl -X GET 'http://localhost:5000/parse?url=http://replay191.valve.net/570/6216665747_89886887.dem.bz2'
Запрос обрабатывает Flask API и добавляет цепочку из двух задач в очередь с помощью
download_parse_save(dem_url)Celery worker видит задачу
download_save(url), идет по ссылкеhttp://replay191.valve.net/570/6216665747_89886887.dem.bz2на CDN Valve, скачивает реплей и сохраняет на дискеCelery worker видит задачу
parse(dem_path)и делает запрос к Clarity ParserРезультат работы Clarity Parser сохраняется в формате
.jsonlinesлога на диске
Отлично! Теперь вспомним, что изначально нашей задачей было распарсить 10 матчей с The International 2021, ссылки на которые мы получили ранее. Для этого запустим простой скрипт.
with open(os.path.join(REPLAY_DIR, 'urls.txt'), 'r') as fin: for url in fin: url = url.strip() try: r = requests.get('http://localhost:5000/parse', params=dict(url=url)) r.raise_for_status() except Exception as e: logger.info(e) continue logger.info(r.json()) time.sleep(0.05)
Отойдем на пару минут, нальем чаю, а по возвращению обнаружим файлы с результатами.
ls replays 6066863360.jsonlines 6215020578.jsonlines 6216545156.jsonlines 6227492909.jsonlines 6079386505.jsonlines 6215346651.jsonlines 6216665747.jsonlines urls.txt 6214179880.jsonlines 6216526891.jsonlines 6227203516.jsonlines
Пример содержимого одного из файлов.

Делаем выводы
Вы дочитали статью — вы великолепны. Теперь вы можете парсить сотни матчей по Dota 2 за разумное время.
В следующей части мы вернемся к теме поиска хайлайтов, улучшим алгоритм и обкатаем его на нескольких матчах.
