Асинхронные микросервисы на Python
Микросервисы – это парадигма, где приложение разбивается на небольшие независимые компоненты, каждый из которых отвечает за конкретную функцию. Это как отделы в офисе, каждый офис – это отдельный сервис, который может быть разработан, масштабирован и развернут независимо.
Почему асинхронность так важна для наших микросервисов? Представьте себе множество людей, ожидающих в лифте – каждый из них хочет двигаться своим темпом, и никто не хочет ждать, когда лифт подойдет к нужному этажу. Так и в мире микросервисов – каждый сервис может заниматься своей задачей, не блокируя другие. Асинхронность позволяет нам этим заниматься: вместо того чтобы ждать ответа от одного сервиса, мы можем отправить запрос другому и эффективно использовать время, пока ждем ответа.
Асинхронность в микросервисах позволяет нам связывать компоненты приложения так, чтобы они работали в гармонии, минимизируя задержки и максимизируя эффективность.
Асинхронное программирование в Python
Ключевыми инструментами асинхронного программирования в Python являются ключевые слова async
и await
. Когда мы объявляем функцию с использованием ключевого слова async
, мы говорим Python, что это асинхронная функция и она может быть приостановлена во время выполнения.
Мы также используем оператор await
внутри асинхронной функции для "ожидания" завершения асинхронной операции. В этот момент управление переходит к другим асинхронным задачам, пока выполняется операция, на которой мы ждем.
Примеры асинхронного программирования на Python:
Асинхронные сетевые запросы с использованием библиотеки
aiohttp
:
import aiohttp
import asyncio
async def fetch_data(session, url):
async with session.get(url) as response:
return await response.text()
async def main():
urls = [
"https://api.example.com/data/1",
"https://api.example.com/data/2",
"https://api.example.com/data/3"
]
async with aiohttp.ClientSession() as session:
tasks = [fetch_data(session, url) for url in urls]
results = await asyncio.gather(*tasks)
for result in results:
print(result)
asyncio.run(main())
Разберем пример:
Мы определяем функцию
fetch_data
, которая принимает два аргумента:session
иurl
. Вместо создания новой сессии каждый раз для каждого запроса, мы передаем одну сессию в функциюfetch_data
, чтобы повторно использовать ее для всех запросов. Это уменьшает накладные расходы на создание и уничтожение сессии для каждого запроса и повышает производительность.В функции
main
мы создаем списокurls
, содержащий URL-адреса, к которым мы хотим выполнить асинхронные запросы.Затем мы создаем
ClientSession
с помощью ключевого словаasync with
. Это позволяет нам гарантировать, что сессия будет закрыта после завершения всех запросов, что важно для корректного управления ресурсами.Затем мы создаем список
tasks
, содержащий асинхронные вызовы функцииfetch_data
с использованиемsession
и каждого URL из спискаurls
.Затем мы используем
asyncio.gather(*tasks)
, чтобы выполнить все асинхронные задачи параллельно. Это позволяет нам выполнять запросы к разным URL-адресам параллельно, что улучшает общее время выполнения программы.Когда все задачи завершаются,
asyncio.gather
возвращает результаты в виде спискаresults
.Данный пример был предложен пользователем @baldr.
Асинхронная обработка больших объемов данных с помощью библиотеки
aiomultiprocess
:
import aiomultiprocess
async def process_data(data_chunk):
# Выполняем длительную обработку данных
# и возвращаем результат
return processed_data
async def main():
data = [...] # Большой объем данных
chunk_size = 1000
chunks = [data[i:i+chunk_size] for i in range(0, len(data), chunk_size)]
async with aiomultiprocess.Pool() as pool:
results = await pool.map(process_data, chunks)
# Объединяем результаты обработки данных
final_result = merge_results(results)
print(final_result)
asyncio.run(main())
В этом примере мы используем библиотеку aiomultiprocess
для асинхронной обработки большого объема данных. Мы разбиваем данные на части и обрабатываем каждую часть параллельно в нескольких процессах. Затем мы объединяем результаты обработки данных для получения конечного результата.
Фреймворки и библиотеки для асинхронных микросервисов
Конечно, давайте глубже погрузимся в асинхронные фреймворки и библиотеки на Python, и рассмотрим примеры их использования более подробно.
Асинхронные фреймворки: FastAPI и Sanic
FastAPI - это настоящая жемчужина среди асинхронных фреймворков. Он разработан с учетом современных стандартов и требований, предоставляя разработчикам удивительный опыт в создании веб-сервисов. Проектирование API в FastAPI – это настоящее удовольствие благодаря его декларативному подходу. Декораторы позволяют описать параметры запроса, возвращаемые значения и обработчики событий очень легко и интуитивно понятно.
Например, создание простого веб-сервиса для конвертации валюты с использованием FastAPI может выглядеть так:
from fastapi import FastAPI
app = FastAPI()
@app.get("/convert")
async def convert_currency(amount: float, from_currency: str, to_currency: str):
# Здесь можно реализовать логику конвертации валюты
converted_amount = ...
return {"converted_amount": converted_amount}
FastAPI также активно использует типовые подсказки Python (Type Hints) для автоматической валидации запросов и генерации интерактивной документации. Например, если вы передадите несоответствующий тип данных, FastAPI предупредит вас об этом, что спасает от многих потенциальных ошибок.
Sanic – это еще один асинхронный фреймворк, который ставит скорость и производительность в центр своей сути. Его асинхронная природа и асинхронные обработчики запросов позволяют обрабатывать большое количество одновременных подключений без значительных потерь в производительности. Sanic также предоставляет гибкость в работе с HTTP-запросами и поддерживает асинхронные операции, что делает его прекрасным выбором для создания быстрых и отзывчивых веб-приложений.
Использование библиотеки asyncio
Библиотека asyncio
– это must have для асинхронного программирования в Python. Ее инструменты позволяют создавать асинхронные функции и корутины, которые могут быть объединены в асинхронные задачи для параллельного выполнения.
Пример использования asyncio
для выполнения асинхронных операций с базой данных:
import asyncio
import asyncpg
async def fetch_data_from_db():
conn = await asyncpg.connect(user='user', password='password',
database='mydb', host='127.0.0.1')
data = await conn.fetch('SELECT * FROM mytable')
await conn.close()
return data
async def main():
result = await fetch_data_from_db()
print(result)
asyncio.run(main())
В этом примере мы создаем асинхронную функцию fetch_data_from_db()
, которая асинхронно подключается к базе данных, выполняет запрос и затем закрывает соединение. Затем, с помощью asyncio.run(main())
, мы запускаем нашу основную асинхронную задачу.
Пример разработки асинхронного микросервиса на Python
Разработаем простой, но мощный микросервис с использованием FastAPI и асинхронных функций. Будем шаг за шагом создавать сервис, который обрабатывает запросы, взаимодействует с базой данных и предоставляет данные.
Для начала, убедимся, что у нас установлен FastAPI. Если нет, давайте установим его с помощью pip
:
pip install fastapi
И так, представьте, что нам нужно создать микросервис для управления задачами в списке. Начнем с создания файла main.py
и импортирования необходимых модулей:
from fastapi import FastAPI
app = FastAPI()
Теперь создадим асинхронные обработчики для наших запросов. Для примера, давайте реализуем добавление и получение списка задач.
Добавление задачи:
tasks = []
@app.post("/tasks/")
async def create_task(task: str):
tasks.append(task)
return {"message": "Task created successfully"}
Получение списка задач:
@app.get("/tasks/")
async def get_tasks():
return {"tasks": tasks}
Теперь наш микросервис может принимать POST-запросы для создания новых задач и GET-запросы для получения списка задач. Но что, если мы хотим сохранять задачи в базе данных? Давайте внедрим асинхронное взаимодействие с базой данных с использованием aiomysql
.
Взаимодействие с базой данных
Сначала установим aiomysql
:
pip install aiomysql
Теперь добавим код для работы с базой данных:
import aiomysql
async def create_pool():
pool = await aiomysql.create_pool(
host="localhost", user="user",
password="password", db="tasks_db",
autocommit=True
)
return pool
async def save_task_to_db(task):
async with pool.acquire() as conn:
async with conn.cursor() as cursor:
await cursor.execute("INSERT INTO tasks (task_name) VALUES (%s)", (task,))
Изменим обработчик создания задачи:
pool = None
@app.on_event("startup")
async def startup_db():
global pool
pool = await create_pool()
@app.on_event("shutdown")
async def shutdown_db():
global pool
pool.close()
await pool.wait_closed()
@app.post("/tasks/")
async def create_task(task: str):
await save_task_to_db(task)
return {"message": "Task created successfully"}
Теперь наш микросервис сохраняет задачи в базу данных, обеспечивая надежное хранение данных между запросами.
Вот как выглядит полный пример разработки асинхронного микросервиса на Python с использованием FastAPI и асинхронных функций:
from fastapi import FastAPI
import aiomysql
app = FastAPI()
pool = None
async def create_pool():
pool = await aiomysql.create_pool(
host="localhost", user="user",
password="password", db="tasks_db",
autocommit=True
)
return pool
async def save_task_to_db(task):
async with pool.acquire() as conn:
async with conn.cursor() as cursor:
await cursor.execute("INSERT INTO tasks (task_name) VALUES (%s)", (task,))
@app.on_event("startup")
async def startup_db():
global pool
pool = await create_pool()
@app.on_event("shutdown")
async def shutdown_db():
global pool
pool.close()
await pool.wait_closed()
tasks = []
@app.post("/tasks/")
async def create_task(task: str):
tasks.append(task)
await save_task_to_db(task)
return {"message": "Task created successfully"}
@app.get("/tasks/")
async def get_tasks():
return {"tasks": tasks}
Таким образом, мы разработали асинхронный микросервис на Python с использованием FastAPI и асинхронных функций. Этот микросервис способен принимать запросы на создание и получение задач, а также сохранять задачи в базе данных для долгосрочного хранения. Это всего лишь начало возможностей асинхронной разработки на Python, и вы можете дальше расширять и улучшать этот микросервис, добавляя новые функции и улучшая производительность.
Управление асинхронными операциями
Обработка ошибок и исключений в асинхронных функциях
Когда мы работаем с асинхронными операциями, неизбежно встает вопрос об обработке ошибок. В асинхронных функциях ошибки могут возникнуть как в самой функции, так и в асинхронных вызовах, которые она выполняет. Пример обработки ошибок с использованием try
и except
:
async def divide(a, b):
try:
result = a / b
return result
except ZeroDivisionError:
return "Division by zero is not allowed"
Также мы можем использовать asyncio.gather()
для асинхронного выполнения нескольких операций и обработки исключений:
import asyncio
async def main():
try:
results = await asyncio.gather(
divide(10, 2),
divide(20, 0),
divide(30, 3)
)
except Exception as e:
print(f"An error occurred: {e}")
else:
print(results)
asyncio.run(main())
Мониторинг и логирование асинхронных процессов
Эффективный мониторинг и логирование асинхронных процессов - это ключевой аспект в создании надежных и отказоустойчивых приложений. Для логирования асинхронных событий мы можем использовать стандартную библиотеку logging
:
import asyncio
import logging
logging.basicConfig(level=logging.INFO)
async def my_task():
logging.info("Task started")
await asyncio.sleep(2)
logging.info("Task completed")
async def main():
await asyncio.gather(my_task(), my_task())
asyncio.run(main())
Тестирование асинхронного кода
Тестирование асинхронного кода имеет свои особенности, но с правильными подходами можно создать надежные и понятные тесты. Для тестирования асинхронных функций мы можем использовать библиотеку pytest
и ее асинхронные возможности:
import asyncio
import pytest
async def async_add(a, b):
await asyncio.sleep(1) # имитируем асинхронную операцию
return a + b
@pytest.mark.asyncio
async def test_async_add():
result = await async_add(2, 3)
assert result == 5
Обработка ошибок, мониторинг, логирование и тестирование – все это важные инструменты, позволяющие создавать надежные и эффективные асинхронные приложения на Python. Ваше понимание этих аспектов значительно обогатит вашу разработку и поможет создать более качественное программное обеспечение.
Заключение
В заключении, асинхронные микросервисы на Python предоставляют нам мощные инструменты для создания высокопроизводительных, масштабируемых и отзывчивых приложений.
Данная статья подготовлена в преддверии старта курса Microservice Architecture. На странице курса вы можете подробно ознакомиться с программой, а также зарегистрироваться на бесплатные вебинары.