В этой статье хочу рассказать о том, как написать полезный сервис, для получения ИНН по персональным данным (паспортные данные). ИНН физического лица получаем с использованием сайта https://service.nalog.ru/. Похожая функциональность, скорее всего, уже где-то была реализована. Основная идея статьи - поделиться опытом работы с Python в части создания законченного проекта с использованием контейнера зависимостей, создания слушателей для RabbitMQ и работой с базой данных MongoDB. Работа с клиентами сервиса реализована через RabbitMQ в режиме непрерывного чтения очереди, отправкой результата в выходную очередь. Сервис будет жить в Kubernetes, что требует наличие liveness и readiness проб. Для этого используется веб-сервер.

Общие сведения
Сервис будем реализовывать на Python 3.10 с использованием библиотек aio-pika, fastapi, pydantic, motor и других библиотек, которые будут указаны в pyproject.toml проекта. В качестве базы данных используем MongoDB 4+. Обращение к сервису налоговой выполняется при помощи библиотеки aiohttp. Проект размещён в публичном доступе на GitHub.
Приложение функционирует как слушатель входной очереди и веб-сервер для отдачи liveness и readiness-проб. При получении сообщения из очереди, из заголовка reply-to вычитывается имя выходной очереди, в которую будет направлен ответ. Обработка запроса передаётся в сервис, который проверяет наличие похожего запроса в базе данных. В случае отсутствия данных по клиенту, выполняется запрос к внешнему сервису. Внешний сервис может обработать какое-то количество сообщений без запроса капчи. После превышения лимитов, которые доподлинно не известны (но изменяются при общей повышенной нагрузке), сообщение помещается в мёртвую очередь и через указанное в настройках время возвращается в обработку.
Подготовительные работы для базы данных не требуется. При первом подключении к MongoDB будут созданы необходимые коллекции и индексы.
Контракт общения с сервисом
Определим контракт входного сообщения в формате JSON:
{ "requestId": str, "firstName": str, "lastName": str, "middleName": str, "birthDate": date, "documentSerial": str, "documentNumber": str, "documentDate": date }
Все поля интуитивно понятны. Атрибут requestId должен быть уникален в пределах всех сообщений, имеет смысл передавать его как строковое представление GUID.
Имя выходной очереди может передаваться через поле reply-to заголовка сообщения.
Контракт выходного сообщения будет следующим:
{ "requestId": str, "inn": str, "cached": bool, "details": str, "elapsedTime": float }
В ответе будем отдавать код запроса, собственно ИНН, время, за которое отработал сервис и признак кэшированного ответа.
Структура проекта
Общая структура директорий проекта следующая.
src |--inn_service |--clients |--connection_managers |--core |--infrastructure |--controllers |--handlers |--http_server |--queue_manager |--models |--repositories |--serializers |--services main.py .env.example .gitignore docker-compose.yaml pyproject.yaml
В корневой директории будут размещаться инструменты запуска проекта: docker-compose, make-файл запуска линтинга и тестов. Собственно проект размещён в src/inn_service и содержит:
clients - клиенты для подключения к действительным поставщикам данных (nalog.ru и прочие);
connection_managers - инфраструктурные подключения к базе данных, очередям;
core - общий код приложения (собственно приложение, контейнер);
infrastructure - менеджер обработчиков очередей, сами обработчики, инфраструктурные контроллеры;
models - модели приложения, DTO-объекты;
repositories - репозиторий для работы с базой данных;
serializers - сериализаторы входных запросов, данных для отправки в провайдер ИНН;
services - сервисы приложения.
Работу по созданию виртуального подключения переложим на PyCharm и poetry. Краткая команда установки: poetry install.
Настройки приложения
Начнём разработку с создания настроек приложения, используя BaseSettings из пакета pydantic.
В файле settings.py будут находиться настройки.
Класс настроек settings.py
class Settings(BaseSettings): app_name: str = 'INN service' app_request_retry_times: int # Количество попыток обработки внешнего запроса app_request_retry_sec: int # Время задержки в секундах перед повторной обработкой запроса http_host: str http_port: int http_handler: str = 'asyncio' mongo_host: str mongo_port: str mongo_user: str mongo_pass: str mongo_name: str mongo_rs: Optional[str] = None mongo_auth: str mongo_timeout_server_select: int = 5000 rabbitmq_host: str rabbitmq_port: int rabbitmq_user: str rabbitmq_pass: str rabbitmq_vhost: str rabbitmq_exchange_type: str rabbitmq_prefetch_count: int rabbitmq_source_queue_name: str client_nalog_url: str # Адрес внешнего сервиса для получения ИНН client_nalog_timeout_sec: int # Таймаут ожидания ответа от сервиса client_nalog_retries: int # Количество попыток запросов к внешнему сервису client_nalog_wait_sec: int # Время ожидания между попытками client_nalog_retries @property def mongo_dsn(self) -> str: mongo_dsn = 'mongodb://{}:{}@{}:{}/{}'.format( self.mongo_user, self.mongo_pass, self.mongo_host, self.mongo_port, self.mongo_auth ) if self.mongo_rs: mongo_dsn += f'?replicaSet={self.mongo_rs}' return mongo_dsn @property def rabbitmq_dsn(self) -> str: return 'amqp://{}:{}@{}:{}/{}'.format( self.rabbitmq_user, self.rabbitmq_pass, self.rabbitmq_host, self.rabbitmq_port, self.rabbitmq_vhost )
Предлагаю не указывать значения по умолчанию для настроек. Если что-то пойдёт не так, то сразу увидим проблему. В этот момент можно подготовить сразу и файл .env.example, содержащий настройки по-умолчанию для сервиса.
Подключения к инфраструктуре
Создадим слой подключения к инфраструктуре rabbitmq, mongodb через компоненты aio-pika и motor:
poetry add motor aio-pika fast fastapi uvicorn injector

Слой подключения будет размещаться в connection_managers и предназначен для организации подключения к базе данных и менеджеру очередей. Добавим две миксины для создания механизма регистрации автозапуска и завершения приложения. Механизм автозапуска функций применяется при старте приложения для инициализации подключения к RabbitMQ и MongoDB, а также для создания индексов в коллекции базы данных. В случае возникновения ошибок при подключении, приложение не стартует и выдаётся ошибка в логи.
class StartupEventMixin(ABC): @abstractmethod def startup(self) -> Coroutine: raise NotImplementedError class ShutdownEventMixin(ABC): @abstractmethod def shutdown(self) -> Coroutine: raise NotImplementedError
На примере RabbitConnectionManager продемонстрируем реализацию.
class RabbitConnectionManager(StartupEventMixin, ShutdownEventMixin, EventLiveProbeMixin): def startup(self) -> Coroutine: return self.create_connection() def shutdown(self) -> Coroutine: return self.close_connection() async def create_connection(self) -> None: self.logger.info('Create connection RabbitMQ') try: self._connection = await connect_robust(self._dsn) self._connection.reconnect_callbacks.add(self.on_connection_restore) self._connection.close_callbacks.add(self.on_close_connection) self.connected = True except ConnectionError as exc: err_message = f'Rabbit connection problem: {exc}' self.logger.error(err_message) raise ConnectionError(err_message) async def close_connection(self) -> None: if self._connection: await self._connection.close() # ... некоторый код пропущен, полная версия на гитхабе def on_close_connection(self, *args): self.logger.error('Lost connection to RabbitMQ...') self.connected = False def on_connection_restore(self, *args): self.logger.info('Connection to RabbitMQ has been restored...') self._channel = None self._exchange = None self.connected = True
При подключении к RabbitMQ устанавливаются функции коллбэков для реагирования на потерю соединения и его восстановление.
Менеджер обработчиков
Менеджер обработчиков предназначен для управления слушателями (consumers) очередей. В проекте используется концепция "мёртвых очередей", которая позволяет отложить сообщение на некоторое время и вернуться к его обработке позже. Причиной для этого может являться долгий ответ от провайдера, временные ошибки провайдера, требование ввода капчи из-за нагрузки. Достаточно подробно механизм мёртвых очередей технически разобран в статье Отложенные ретраи силами RabbitMQ. Каждый обработчик очереди должен хранить и возвращать признак использования ретраев, время между возвратами в основную очередь на обработку, а также имя очереди, которую планирует слушать. Основной код обработчика находится в run_handler. От функции ожидается True при успешной обработке, либо непоправимой ошибке запроса (некорректное тело сообщения) и False, если запрос не удалось обработать, но следует повторить позднее.
Код базового обработчика:
class BaseHandler(ABC): def __init__( self, settings: Settings, logger: AppLogger, rabbitmq_connection: RabbitConnectionManager ) -> None: self.settings = settings self.logger = logger self.rabbitmq_connection = rabbitmq_connection @abstractmethod def get_use_retry(self) -> bool: raise NotImplementedError def get_retry_ttl(self) -> int: return 0 @abstractmethod def get_source_queue(self) -> str: raise NotImplementedError def convert_seconds_to_mseconds(self, value: int) -> int: return value * 1000 @abstractmethod async def run_handler( self, message: dict, request_id: Optional[str], result_queue: Optional[str], count_retry: Optional[int] = 0 ) -> bool: raise NotImplementedError
Собственно единственный наследник класса RequestHandler, реализующий приём и обработку сообщения:
Класс RequestHandler
class RequestHandler(BaseHandler): def __init__( self, settings: Settings, logger: AppLogger, rabbitmq_connection: RabbitConnectionManager, service: InnService ) -> None: super().__init__(settings, logger, rabbitmq_connection) self.source_queue_name = self.settings.rabbitmq_source_queue_name self.retry_times = self.settings.app_request_retry_times self.retry_sec = self.settings.app_request_retry_sec self.service = service def get_source_queue(self) -> str: return self.source_queue_name def get_use_retry(self) -> bool: return True def get_retry_ttl(self) -> int: return self.retry_sec async def run_handler( self, message: dict, request_id: Optional[str], result_queue: Optional[str], count_retry: Optional[int] = 0 ) -> bool: if count_retry > self.retry_times: self.logger.warning(f'Request {request_id} was rejected by excess attempts {self.retry_times} times') return True self.logger.info(f'Get request {request_id} for response {result_queue}') client_data = RequestSerializer.parse_obj(message) response = await self.service.get_client_inn(client_data) if result_queue: json_message = response.dict() await self.rabbitmq_connection.send_data_by_queue(json_message, result_queue) return True
При получении сообщения проверяем количество повторного попадания в очередь через параметр count_retry. В случае превышения - отправляем статус обработки сообщения (ошибку) в выходную очередь и приостанавливаем обработку данного сообщения. RequestSerializer.parse_obj(message) не обёрнут в try...except блок потому как менеджер очередей контролирует ошибки преобразования сообщений ValidationError.
Работа с базой данных
Выбор на MongoDB пал из-за простоты использования, отсутствия миграций, гибкой схемы обработки данных. В задаче нет необходимости в хранении зависимых данных, оформлении связей между таблицами. Для работы с данными будем использовать паттерн Репозиторий.
В базовом репозитории расположены функции работы с данными, индексами в нотации Mongo, а в конкретных классах реализуем необходимые сервису функции. Создание индексов выполняется при старте приложения в фоновом режиме (флаг background), для чего используется имплементация миксины StartupEventMixin. Запросы набора данных поддерживают пагинацию и сортировку.
Конкретный класс создаётся на каждую отдельную коллекцию. В проекте один репозиторий для клиентских запросов. Модель для хранения данных находится в директории models и называется ClientDataModel. Клиентская модель создана с типизацией, поддерживаемой MongoDB (datetime вместо date), для атрибута created_at указана функция генерации значения по умолчанию через default_factory. Также в модель добавлена функция подсчёта времени обработки запроса elapsed_time и метод класса для создания объекта из клиентского запроса.
class ClientDataModel(BaseModel): created_at: datetime = Field(default_factory=datetime.utcnow) request_id: str first_name: str last_name: str middle_name: str birth_date: datetime birth_place: str = Field(default='') passport_num: str document_date: datetime executed_at: Optional[datetime] inn: Optional[str] error: Optional[str] @classmethod def create_from_request(cls, request: RequestMqSerializer) -> 'ClientDataModel': return ClientDataModel( request_id=request.request_id, first_name=request.first_name, last_name=request.last_name, middle_name=request.middle_name, birth_date=datetime.combine(request.birth_date, datetime.min.time()), passport_num='{} {}'.format(request.document_serial, request.document_number), document_date=datetime.combine(request.document_date, datetime.min.time()), ) @property def elapsed_time(self) -> float: end = self.executed_at or datetime.utcnow() return (end - self.created_at).total_seconds()
Код базового репозитория
class BaseRepository(StartupEventMixin): def __init__(self, mongodb_connection_manager: MongoConnectionManager, setting: Settings) -> None: self.mongodb_connection_manager = mongodb_connection_manager self.db_name = setting.mongo_name @property def collection_name(self) -> str: raise NotImplementedError @property def collection_indexes(self) -> Iterable[IndexDef]: raise NotImplementedError def startup(self) -> Coroutine: return self.create_indexes() async def create_index(self, field_name: str, sort_id: int) -> None: connection = await self.mongodb_connection_manager.get_connection() collection = connection[self.db_name][self.collection_name] await collection.create_index([(field_name, sort_id), ], background=True) async def create_indexes(self) -> None: tasks = [] for index_item in self.collection_indexes: tasks.append(self.create_index(index_item.name, index_item.sort)) asyncio.ensure_future(asyncio.gather(*tasks)) async def get_one_document(self, criteria: dict) -> Optional[dict]: connection = await self.mongodb_connection_manager.get_connection() collection = connection[self.db_name][self.collection_name] return await collection.find_one(criteria) async def get_list_document( self, criteria: dict, sort_criteria: Optional[list] = None, limit: Optional[int] = 0, skip: Optional[int] = 0, ) -> List[dict]: if not sort_criteria: sort_criteria = [] connection = await self.mongodb_connection_manager.get_connection() cursor = connection[self.db_name][self.collection_name].find( criteria, limit=limit, skip=skip, sort=sort_criteria ) result = list() async for data in cursor: result.append(data) return result async def save_document(self, data: dict) -> str: connection = await self.mongodb_connection_manager.get_connection() result = await connection[self.db_name][self.collection_name].insert_one(data) return result.inserted_id async def update_document(self, criteria: dict, data: dict) -> None: connection = await self.mongodb_connection_manager.get_connection() await connection[self.db_name][self.collection_name].update_one(criteria, {'$set': data})
Сервисный слой
Сервисный слой выполняет всю необходимую обработку с данными.
обращение в базу данных для поиска аналогичного запроса (request_id и паспортные данные);
отдаёт результат, если данные были найдены;
выполняет запрос к API;
сохраняет результат запроса в базу данных;
возвращает ответ.
В сервисном слое попытался абстрагироваться от работы с инфраструктурой. Возврат ответа производится в вызывающую функцию, которая должна знать куда вернуть ответ. В данном случае, менеджер очередей "знает" куда ему ответить благодаря наличию поля reply-to в заголовке запроса. Возвращаемое значение оформлено в виде DTO-объекта (RequestDTO).
Код класса InnService
class InnService: def __init__( self, settings: Settings, logger: AppLogger, client: NalogApiClient, storage: RequestRepository ) -> None: self.settings = settings self.logger = logger self.client = client self.storage_repository = storage async def get_client_inn_from_storage(self, client_data: RequestSerializer) -> Optional[RequestModel]: client_passport = f'{client_data.document_serial} {client_data.document_number}' client_request = await self.storage_repository.find_request(client_passport, client_data.request_id) return client_request def update_status(self, model: RequestModel, inn: str, error: str) -> None: model.inn = inn model.error = error async def get_client_inn(self, client_data: RequestSerializer) -> RequestDTO: """Получение клиентского ИНН""" start_process = datetime.utcnow() model = RequestModel.create_from_request(client_data) # Получить данные из БД existing_data = await self.get_client_inn_from_storage(client_data) if existing_data: elapsed_time = (datetime.utcnow() - start_process).total_seconds() return RequestDTO( request_id=client_data.request_id, inn=existing_data.inn, elapsed_time=elapsed_time, cashed=True ) # Сделать фактический запрос в Nalog API request = NalogApiRequestSerializer.create_from_request(client_data) error, result = None, '' try: result = await self.client.send_request_for_inn(request) except NalogApiClientException as exception: self.logger.error('Error request to Nalog api service', details=str(exception)) error = str(exception) self.update_status(model, result, error) await self.storage_repository.save_request(model) return RequestDTO( request_id=model.request_id, inn=model.inn, details=model.error, elapsed_time=model.elapsed_time )
Второй сервис в приложении - это сервис опроса инфраструктуры для health-check. Инфраструктурные менеджеры, которые необходимо мониторить, должны наследоваться от миксины EventLiveProbeMixin и реализовать функцию is_connected.
Клиент
Клиент NalogApiClient предназначен для выполнения POST запроса к https://service.nalog.ru/inn.do и разбора статуса ответа. Функция непосредственного оформления запроса обёрнута в retry декоратор повторителя запроса при возникновении ошибок. Настройки повторителя в общих настройках приложения.
class NalogApiClient: CLIENT_EXCEPTIONS = ( NalogApiClientException, aiohttp.ClientProxyConnectionError, aiohttp.ServerTimeoutError, ) def __init__(self, settings: Settings, logger: AppLogger): self.nalog_api_service_url = settings.client_nalog_url self.request_timeout = settings.client_nalog_timeout_sec self.retries_times = settings.client_nalog_retries self.retries_wait = settings.client_nalog_wait_sec self.logger = logger self.timeout = aiohttp.ClientTimeout(total=self.request_timeout) @property def _headers(self): return { "Accept": "application/json, text/javascript, */*; q=0.01", "Accept-Language": "ru-RU,ru", "Connection": "keep-alive", "Origin": "https://service.nalog.ru", "Referer": self.nalog_api_service_url, "Sec-Fetch-Dest": "empty", "Sec-Fetch-Mode": "cors", "Sec-Fetch-Site": "same-origin", "Sec-GPC": "1", "X-Requested-With": "XMLHttpRequest", } async def send_request_for_inn(self, nalog_api_request: NalogApiRequestSerializer) -> Optional[str]: self.logger.debug(f'Request to nalog api service for {nalog_api_request.client_fullname}') form_data = nalog_api_request.dict(by_alias=True) @retry(self.CLIENT_EXCEPTIONS, logger=self.logger, attempts=self.retries_times, wait_sec=self.retries_wait) async def make_request(client_session: aiohttp.ClientSession): async with client_session.post(url=self.nalog_api_service_url, data=form_data) as response: if response.status not in [http.HTTPStatus.OK, http.HTTPStatus.NOT_FOUND]: response_text = await response.text() raise NalogApiClientException(response_text) data = await response.json() code = data.get('code') captcha_required = data.get('captchaRequired') if captcha_required: raise NalogApiClientException(f'Captcha required for request {nalog_api_request.client_fullname}') if code == 0: return 'no inn' elif code == 1: return data.get('inn') else: raise NalogApiClientException(f'Unable to parse response! Details: {response}') async with aiohttp.ClientSession(timeout=self.timeout, headers=self._headers) as session: return await make_request(session)
Контейнер
Контейнер предназначен для сборки необходимых зависимостей и передачи их в приложение. Наш контейнер собран в классе ApplicationContainer. Все зависимости пробрасываются в виде синглтонов @singleton и регистрируются как провайдеры зависимостей типов @provider предоставляемых библиотекой injector. При написании тестов необходимо подготовить другой контейнер с актуальными fake или stub-объектами.
Основной интерес по работе с контейнером сосредоточен в классе ContainerManager, который используется для проверки реализации миксин EventSubscriberMixin и EventLiveProbeMixin. Функция get_event_collection формирует списки функций обратного вызова для старта и выхода из приложения. Проход по спискам и вызов функций обратного вызова реализован в функциях: run_startup и run_shutdown.
class ContainerManager: def __init__(self, cls_container: Type[Container]) -> None: self._container = Injector(cls_container()) self._bindings = self._container.binder._bindings def get_container(self) -> Injector: return self._container def get_live_probe_handlers(self) -> List[Type[Callable]]: result = [] binding_collection = [binding for binding in self._bindings] for binding in binding_collection: if issubclass(binding, EventLiveProbeMixin): binding_obj = self._container.get(binding) result.append(binding_obj.is_connected) return result def get_startup_handlers(self): handlers = [] binding_collection = [binding for binding in self._bindings] for binding in binding_collection: if issubclass(binding, StartupEventMixin): binding_obj = self._container.get(binding) handlers.append(binding_obj.startup()) return handlers def get_shutdown_handlers(self): handlers = [] binding_collection = [binding for binding in self._bindings] for binding in binding_collection: if issubclass(binding, ShutdownEventMixin): binding_obj = self._container.get(binding) handlers.append(binding_obj.shutdown()) return handlers async def run_startup(self) -> None: exception = None for handler in self.get_startup_handlers(): if exception: handler.close() else: try: await handler except Exception as exc: exception = exc if exception is not None: raise exception async def run_shutdown(self) -> None: handlers = [] for handler in self.get_shutdown_handlers(): handlers.append(handler) await asyncio.gather(*handlers)
Собственно сам контейнер, в котором производится инициализация нужных экземпляров классов. При написании тестов будет создан аналогичный контейнер.
class ApplicationContainer(Container): @singleton @provider def provide_settings(self) -> Settings: return Settings() # ... немного кода пропущено @singleton @provider def provide_mongodb_connection(self, settings: Settings, logger: AppLogger) -> MongoConnectionManager: return MongoConnectionManager(settings, logger) @singleton @provider def provide_rabbitmq_connection(self, settings: Settings, logger: AppLogger) -> RabbitConnectionManager: return RabbitConnectionManager(settings, logger) @singleton @provider def provide_nalog_api_client(self, settings: Settings, logger: AppLogger) -> NalogApiClient: return NalogApiClient(settings, logger) @singleton @provider def provide_request_repository(self, settings: Settings, mongo_connection: MongoConnectionManager) -> RequestRepository: return RequestRepository(mongo_connection, settings)
Приложение
Основная задача приложения - собрать всё воедино и запустить общий поток выполнения. Код сборки приложения предельно простой, инициализацию классов выполняет менеджер контейнера. Сборка приложения выполняется следующими шагами:
получение контейнера, передача его в менеджер контейнеров;
инициализация event_loop;
добавление обработчиков для очередей;
запуск инициализаторов для инфраструктурного слоя (реализующих startup миксины);
запуск веб-сервера FastAPI для отдачи health-check;
включение глобального обработчика ошибок.
class Application: def __init__(self, cls_container: Type[Container]) -> None: self.loop = asyncio.get_event_loop() self.container_manager = ContainerManager(cls_container) self.container = self.container_manager.get_container() self.settings = self.container.get(Settings) self.logger = self.container.get(AppLogger) self.live_probe_service = self.container.get(LiveProbeService) self.queue_manager = self.container.get(QueueManager) self.app_name = self.settings.app_name self.http_server = None def init_application(self): self.http_server = ServerAPIManager(self.container) request_handler = self.container.get(RequestHandler) self.queue_manager.add_handler(request_handler) live_probe_handlers = self.container_manager.get_live_probe_handlers() for handler in live_probe_handlers: self.live_probe_service.add_component(handler) def run(self) -> None: self.logger.info(f'Starting application {self.app_name}') self.init_application() try: self.loop.run_until_complete(self.container_manager.run_startup()) tasks = asyncio.gather( self.http_server.serve(), self.queue_manager.run_handlers_async(), ) self.loop.run_until_complete(tasks) self.loop.run_forever() except BaseException as exception: exit(1) finally: self.loop.run_until_complete(self.container_manager.run_shutdown()) self.loop.close() self.logger.info('Application disabled')
Приложение стартует из main-скрипта с использованием небольшой библиотеки typer. Маленькая библиотека имеет возможность удобно обрабатывать параметры командной строки.
import typer from core.application import Application from app_container import ApplicationContainer def main(): try: application = Application(ApplicationContainer) application.run() except BaseException as exc: typer.echo(f'Error starting application. Details: {str(exc)}') if __name__ == "__main__": typer.run(main)
Как это всё запустить?
Проект содержит файл docker-compose для сборки и запуска. Так же необходимо необходимо скопировать файл .env.example в файл .env .
docker compose build docker compose up
После выполнения этих команд, будет запущен экземпляр mongodb на 27017 порту и rabbitmq на 5672 порту с админкой на 15672. В административную панель RabbitMQ можно зайти по адресу http://localhost:15672. В разделе очередей необходимо создать новую очередь, в которую будут направляться результаты работы сервиса и прибиндить её к exchange по умолчанию (direct).
Продолжение следует
В статье рассмотрена тема разработки приложения на Python с использованием очередей, контейнером зависимостей и поддержкой health-check. Предлагаю обсудить архитектуру в комментариях, а затем продолжить развивать сервис. Следующими итерациями планирую добавить гипотетического не бесплатного клиента, которого будем использовать после определённого количества запросов в бесплатный сервис. И в завершении написать тесты.
Материалы, которые могут быть полезны для понимания материала:
RabbitMQ Tutorials (англ) - https://www.rabbitmq.com/getstarted.html
Презентация Dependency injection от создателя Dependency Injector (рус). В проекте используется библиотека injector, но общий смысл romanmogylatov6331 доносит понятно - https://www.youtube.com/watch?v=VdtxdDeG7RA