Это продолжение цикла статей про asyncio
. Начало здесь.
6. Веб-сервер aiohttp и другие жители асинхронного мира
Продолжаем готовить asyncio
. Теперь мы уже знаем достаточно, чтобы написать модный асинхронный микросервис. Реализуем известный архитектурный паттерн "API-шлюз". Это довольно простая штука. По запросу на свой API-интерфейс приложение собирает данные из других API, обрабатывает и возвращает результат пользователю. При этом пользователь знает только одну точку входа, а все внутренние подробности (кто куда и зачем сходил) от него скрыты.
В предыдущей главе мы научились запрашивать погоду у сервиса api.openweathermap.org. Давайте его слегка украдем импортозаместим. Вернее сказать, русифицируем. Пусть пользователь нашего сервиса передает название города на русском языке в параметрах GET-запроса и получает ответ в виде json опять-таки на великом и могучем. А откуда мы взяли информацию о погоде, пользователю знать не положено. Может у нас собственные метеостанции в каждой деревне стоят, поди проверь.
Мы уже освоили http-клиента библиотеки aiohttp
, с помощью которого можно обращаться к внешним API. Оказывается, в этой же библиотеке есть и все необходимое для создания полноценного http-сервера. Для начала напишем просто зеркальный прокси:
Пример 6.1
import asyncio
import json
from aiohttp import ClientSession, web
async def get_weather(city):
async with ClientSession() as session:
url = f'http://api.openweathermap.org/data/2.5/weather'
params = {'q': city, 'APPID': '2a4ff86f9aaa70041ec8e82db64abf56'}
async with session.get(url=url, params=params) as response:
weather_json = await response.json()
try:
return weather_json["weather"][0]["main"]
except KeyError:
return 'Нет данных'
async def handle(request):
city = request.rel_url.query['city']
weather = await get_weather(city)
result = {'city': city, 'weather': weather}
return web.Response(text=json.dumps(result, ensure_ascii=False))
async def main():
app = web.Application()
app.add_routes([web.get('/weather', handle)])
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, 'localhost', 8080)
await site.start()
while True:
await asyncio.sleep(3600)
if __name__ == '__main__':
asyncio.run(main())
В асинхронной функции get_weather
ничего нового нет, мы ее лишь слегка причесали, чтобы запрос погоды для несуществующего города не приводил к трагическим последствиям для всего нашего приложения. За обработку запроса отвечает функция handle
("ручка" на сленге бэкендеров). Из запроса извлекается параметр city и передается в get_weather
. Далее формируется результирующий ответ в виде json. Адрес нашего сервиса и тип запроса задается вapp.add_routes
.
Стоп! А где же задачи? Не переживайте. Когда мы имеем дело с асинхронными веб-фреймворками (а aiohttp
— это именно фреймворк, хоть и супер-минималистический), вся работа по созданию и запуску задач asyncio
происходит у фреймворка "под капотом". Нам, как разработчикам, теперь нет нужды беспокоится об этих низменных деталях. Приложение мирно спит в бесконечном неблокирующем цикле, пока не придет запрос GET на определенный URL. Как только это произойдет, отработает логика в ручке. И снова баю-бай до следующего запроса. Но если первый запрос еще не успел обработаться, как поступил следующий, фреймворк отработает его в отдельной задаче, не дожидаясь (по возможности) окончания обработки первого. В этом сама суть асинхронности.
Заходим браузером на адрес: localhost:8080/weather?city=Sochi
и получаем симпатичный json:
{"city": "Sochi", "weather": "Clouds"}
Кстати, если вы всерьез решили заняться бэкенд-разработкой, одним браузером вам никак не обойтись. Потребуется инструмент, позволяющий залезать вглубь HTTP. Стандарт де-факто здесь Postman, но в природе существуют и альтернативные решения.
Скелет нашего приложения готов, теперь начинаем наращивать на нем мышцы. Воспользуемся бесплатным API переводчика libretranslate.de:
Пример 6.2
import asyncio
import json
from aiohttp import ClientSession, web
async def get_weather(city):
async with ClientSession() as session:
url = f'http://api.openweathermap.org/data/2.5/weather'
params = {'q': city, 'APPID': '2a4ff86f9aaa70041ec8e82db64abf56'}
async with session.get(url=url, params=params) as response:
weather_json = await response.json()
try:
return weather_json["weather"][0]["main"]
except KeyError:
return 'Нет данных'
async def get_translation(text, source, target):
async with ClientSession() as session:
url = 'https://libretranslate.de/translate'
data = {'q': text, 'source': source, 'target': target, 'format': 'text'}
async with session.post(url, json=data) as response:
translate_json = await response.json()
try:
return translate_json['translatedText']
except KeyError:
return text
async def handle(request):
city_ru = request.rel_url.query['city']
city_en = await get_translation(city_ru, 'ru', 'en')
weather_en = await get_weather(city_en)
weather_ru = await get_translation(weather_en, 'en', 'ru')
result = {'city': city_ru, 'weather': weather_ru}
return web.Response(text=json.dumps(result, ensure_ascii=False))
async def main():
app = web.Application()
app.add_routes([web.get('/weather', handle)])
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, 'localhost', 8080)
await site.start()
while True:
await asyncio.sleep(3600)
if __name__ == '__main__':
asyncio.run(main())
Теперь в ручке дважды вызывается асинхронная функция get_translation
(обратите внимание, на этот раз мы передаем параметры внешнему сервису в виде json через тело запроса POST) и, вуаля:
localhost:8080/weather?city=Сочи
{"city": "Сочи", "weather": "Облака"}
Над всей Испанией безоблачное небо, а в деревне Гадюкино опять идут дожди...
Но что это за микросервис без логгера? Однако использовать в насквозь асинхронном приложении привычную синхронную (а значит блокирующую) библиотеку logging
— это не наш путь. Воспользуемся правильной асинхронной библиотекой логгирования aiologger
:
Пример 6.3
import asyncio
import json
from aiohttp import ClientSession, web
from aiologger.loggers.json import JsonLogger
logger = JsonLogger.with_default_handlers(
level='DEBUG',
serializer_kwargs={'ensure_ascii': False},
)
async def get_weather(city):
async with ClientSession() as session:
url = f'http://api.openweathermap.org/data/2.5/weather'
params = {'q': city, 'APPID': '2a4ff86f9aaa70041ec8e82db64abf56'}
async with session.get(url=url, params=params) as response:
weather_json = await response.json()
try:
return weather_json["weather"][0]["main"]
except KeyError:
return 'Нет данных'
async def get_translation(text, source, target):
await logger.info(f'Поступил запрос на на перевод слова: {text}')
async with ClientSession() as session:
url = 'https://libretranslate.de/translate'
data = {'q': text, 'source': source, 'target': target, 'format': 'text'}
async with session.post(url, json=data) as response:
translate_json = await response.json()
try:
return translate_json['translatedText']
except KeyError:
logger.error(f'Невозможно получить перевод для слова: {text}')
return text
async def handle(request):
city_ru = request.rel_url.query['city']
await logger.info(f'Поступил запрос на город: {city_ru}')
city_en = await get_translation(city_ru, 'ru', 'en')
weather_en = await get_weather(city_en)
weather_ru = await get_translation(weather_en, 'en', 'ru')
result = {'city': city_ru, 'weather': weather_ru}
return web.Response(text=json.dumps(result, ensure_ascii=False))
async def main():
app = web.Application()
app.add_routes([web.get('/weather', handle)])
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, 'localhost', 8080)
await site.start()
while True:
await asyncio.sleep(3600)
if __name__ == '__main__':
asyncio.run(main())
Обратите внимание, асинхронный логгер ни на миллисекунду не задержит работу нашего приложения в целом. Он будет использовать паузы, пока мы ожидаем запроса от пользователя или ответа от внешнего сервиса. Поэтому, если мы будем наблюдать за его работой под серьезной нагрузкой, может обнаружиться, что он заметно отстает от реального времени. Но в конечном итоге все что должно будет выведено в лог и ни один символ не потеряется.
Ну а теперь вишенка на торте — асинхронный доступ к базе данных. Предположим, в процессе работы нашего приложения нам надо что-то писать в БД, ну, например, сохранять поступившие запросы (ничего более умного мне как-то в голову не пришло). Самая простая БД в мире, как известно, — это SQLite. И для нее, к счастью, есть асинхронный драйвер aiosqlite
. Пробуем:
Пример 6.4
import json
import aiosqlite
import asyncio
from aiohttp import ClientSession, web
from aiologger.loggers.json import JsonLogger
from datetime import datetime
logger = JsonLogger.with_default_handlers(
level='DEBUG',
serializer_kwargs={'ensure_ascii': False},
)
async def create_table():
async with aiosqlite.connect('weather.db') as db:
await db.execute('CREATE TABLE IF NOT EXISTS requests '
'(date text, city text, weather text)')
await db.commit()
async def save_to_db(city, weather):
async with aiosqlite.connect('weather.db') as db:
await db.execute('INSERT INTO requests VALUES (?, ?, ?)',
(datetime.now(), city, weather))
await db.commit()
async def get_weather(city):
async with ClientSession() as session:
url = f'http://api.openweathermap.org/data/2.5/weather'
params = {'q': city, 'APPID': '2a4ff86f9aaa70041ec8e82db64abf56'}
async with session.get(url=url, params=params) as response:
weather_json = await response.json()
try:
return weather_json["weather"][0]["main"]
except KeyError:
return 'Нет данных'
async def get_translation(text, source, target):
await logger.info(f'Поступил запрос на на перевод слова: {text}')
async with ClientSession() as session:
url = 'https://libretranslate.de/translate'
data = {'q': text, 'source': source, 'target': target, 'format': 'text'}
async with session.post(url, json=data) as response:
translate_json = await response.json()
try:
return translate_json['translatedText']
except KeyError:
logger.error(f'Невозможно получить перевод для слова: {text}')
return text
async def handle(request):
city_ru = request.rel_url.query['city']
await logger.info(f'Поступил запрос на город: {city_ru}')
city_en = await get_translation(city_ru, 'ru', 'en')
weather_en = await get_weather(city_en)
weather_ru = await get_translation(weather_en, 'en', 'ru')
result = {'city': city_ru, 'weather': weather_ru}
await save_to_db(city_ru, weather_ru)
return web.Response(text=json.dumps(result, ensure_ascii=False))
async def main():
await create_table()
app = web.Application()
app.add_routes([web.get('/weather', handle)])
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, 'localhost', 8080)
await site.start()
while True:
await asyncio.sleep(3600)
if __name__ == '__main__':
asyncio.run(main())
Для продвинутых
Документация aiohttp
не рекомендует создавать новую ClientSession
для каждого запроса. В высоконагруженных приложениях это может привести к снижению производительности. Правильным решением является создание единственной сессии для всего приложения (или, как вариант, нескольких сессий — отдельно для каждого внешнего сервиса). Реализовать это можно так:
Пример 6.5
import json
import aiosqlite
import asyncio
from aiohttp import ClientSession, web
from aiologger.loggers.json import JsonLogger
from datetime import datetime
logger = JsonLogger.with_default_handlers(
level='DEBUG',
serializer_kwargs={'ensure_ascii': False},
)
# здесь будут храниться объекты, общие для всего приложения
app_storage = {}
async def create_table():
async with aiosqlite.connect('weather.db') as db:
await db.execute('CREATE TABLE IF NOT EXISTS requests '
'(date text, city text, weather text)')
await db.commit()
async def save_to_db(city, weather):
async with aiosqlite.connect('weather.db') as db:
await db.execute('INSERT INTO requests VALUES (?, ?, ?)',
(datetime.now(), city, weather))
await db.commit()
async def get_weather(city):
url = f'http://api.openweathermap.org/data/2.5/weather'
params = {'q': city, 'APPID': '2a4ff86f9aaa70041ec8e82db64abf56'}
# используем единую сессию
async with app_storage['session'].get(url=url, params=params) as response:
weather_json = await response.json()
try:
return weather_json["weather"][0]["main"]
except KeyError:
return 'Нет данных'
async def get_translation(text, source, target):
await logger.info(f'Поступил запрос на на перевод слова: {text}')
url = 'https://libretranslate.de/translate'
data = {'q': text, 'source': source, 'target': target, 'format': 'text'}
# используем единую сессию
async with app_storage['session'].post(url, json=data) as response:
translate_json = await response.json()
try:
return translate_json['translatedText']
except KeyError:
logger.error(f'Невозможно получить перевод для слова: {text}')
return text
async def handle(request):
city_ru = request.rel_url.query['city']
await logger.info(f'Поступил запрос на город: {city_ru}')
city_en = await get_translation(city_ru, 'ru', 'en')
weather_en = await get_weather(city_en)
weather_ru = await get_translation(weather_en, 'en', 'ru')
result = {'city': city_ru, 'weather': weather_ru}
await save_to_db(city_ru, weather_ru)
return web.Response(text=json.dumps(result, ensure_ascii=False))
async def main():
# создаем единую сессию для всего приложения
app_storage['session'] = ClientSession()
# контекстный менеджер чтобы сессия корректно закрылась после завершения приложения
async with app_storage['session']:
await create_table()
app = web.Application()
app.add_routes([web.get('/weather', handle)])
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, 'localhost', 8080)
await site.start()
while True:
await asyncio.sleep(3600)
if __name__ == '__main__':
asyncio.run(main())
Можете заглянуть внутрь созданного на лету файла weather.db
(только используйте не текстовый просмотрщик, а какую-нибудь утилиту для работы с БД, например, DBeaver). Для каждого запроса создается соответствующая запись в таблице requests
. И снова никаких блокировок, мы ведь живем в асинхронном мире.
В заключение этого раздела хочу вас поздравить. Теперь вы имеете в руках все необходимое для создания собственных асинхронных веб-приложений. Неважно какой фреймворк вы будете использовать: FastAPI
, Tornado
, Falcon
или какой-нибудь еще. Принцип останется тот же самый как в старом добром aiohttp
: создаем ручку и в ней нанизываем "шашлык" из вызовов асинхронных функций. Главное, за чем необходимо следить — это чтобы в эти функции не затесался какой-нибудь блокирующий зловред из скучной пыльной синхронной вселенной.
На этом временно прощаемся.
Продолжение здесь.