"Дисклеймер: Перед началом изложения хочу отметить, что описываемый в этой статье проект был реализован мной и моей напарницей Викой в рамках хакатона, организованного Мастерской Яндекс Практикума и компанией 'Лента'. Цель хакатона - создание интерфейса и предсказательной модели для прогнозирования спроса ��а товары собственного производства заказчика. В данной статье я сфокусируюсь исключительно на backend части проекта, описав наш опыт и решения, которые были применены для достижения поставленных задач и буду приводить только один пример реализации, остальное можно будет посмотреть в репозитории проекта .Все идеи и решения, описанные в статье, были разработаны в условиях ограниченного времени хакатона и могут быть доработаны и изменены в будущем, но это не точно."

Однако, здравствуйте. Несмотря на то, что я только что завершил расширенный курс "Python Backend-Developer +" от Яндекс Практикум и являюсь, так сказать, "недоджуном", я постараюсь максимально подробно рассказать, что нам (мне и моей коллеге) удалось реализовать, чтобы завоевать приз в номинации "Лучший backend-разработчик" на хакатоне от ООО "Лента". Не будем останавливаться на процессе подачи заявки на участие и всём, что предшествовало этому процессу; перейдём сразу к моменту, когда нас распределили по командам, и колесо хакатона закрутилось с полной мощностью.

Всего в хакатоне приняло участие 20 команд, в каждой по 11 человек. Состав команды выглядел следующим образом:

  • 1 Project-менеджер — (отвечает за синхронизацию Команды и выполнение задач в дедлайны Конкурса)

  • 3 Дизайнера — (креативщики Команды; отвечают за UI/UX, дизайн макетов)

  • 3 Data Scientist—(работает с данными для решения задачи, развития Продукта)

  • 2 Backend-разработчика —(работа над ML-продуктом, отвечают и управляет поступающими запросами и обеспечивает взаимодействие между другими компонентами системы)

  • 2 Frontend-разработчика —( отвечают за визуализацию данных)

Общая задача команды заключалась в создании предсказательной модели и её интерфейса для прогнозирования спроса на товары собственного производства заказчика ООО "Лента". Правильно построенный процесс работы и общий результат команды зависел от слаженного взаимодействия каждого подразделения, и требовался постоянный тесный контакт, особенно с фронтенд-разработчиками и в завершающей стадии с Data Science-командой. С первого дня у нас проходили как совместные митапы, так и отдельные встречи с каждым подразделением по мере возникновения вопросов. Также на старте необходимо было определить лидера в каждом направлении. Старшим разработчиком был назначен я. Было страшно, ведь вдруг не справлюсь? Однако, как только началась работа, эти сомнения рассеялись. После вводных инструктажей от организаторов и старших направлений, мы с моей напарницей Викой стали планировать план разработке проекта, выбирать технологии и определять, с чего начнём. Но обо всём по порядку.

Стек технологий, который нужно было использовать на бекенде, был Django DRF , это было единственное ограничени. Требования к бекенду выглядели следующим образом:

  • Отдаёт данные на фронт. По запросу от фронта идёт в БД, выбирает необходимые данные, подготавливает (если требуется), отдаёт в ответе на запрос.

  • Добавление фактических данных. Принимает входящий запрос на добавление исторических данных по продажам, обрабатывает их и складывает в БД.

  • Запускает и управляет процессом инференса. По расписанию (раз в день) или после обновления исторических данных начинает процесс прогнозирования. Для этого идёт в БД, выбирает необходимые данные, передаёт их в ML сервис, получает прогноз и складывает его в БД.

Проектирование базы данных

Первый и самый главный шаг, который нам предстояло сделать, это проектирование базы данных. Казалось бы, ничего сложного: имеем мок-данные, бери и проектируй. Но от того, насколько правильно и эффективно спроектирована база данных, будет зависеть скорость работы приложения, возможность масштабирования проекта в будущем, а также удобство поддержки и развития проекта. Мы понимали, что это один из ключевых этапов разработки любого приложения, так как база данных является основой для хранения и обработки данных.

Первоначальный вариант, который был спроектирован, в последствии был изменён ещё пару раз, что конечно очень выбивало из колеи, когда ты уже был на 99,99% уверен, что с базой всё решено, а в последний момент возникали проблемы. Так, на финальном этапе оказалось, что данные, которые в модели SALES, имеют некорректную связь. Из-за этого данные продаж определенного товара в магазине передавались в DS не в нужной структуре. То есть, если в магазине товар продавался более двух дней, то это должно было отображаться следующим образом:

Пример 1

{"data": [
  {"store": "store1",
   "sku": "sku1",
   "fact": [
     {"date": "2023-01-15",
      "sales_type": 0,
      "sales_units": 5,
      "sales_units_promo": 4, 
      "sales_rub": 6.7,
      "sales_run_promo": 8.9},
     {"date": "2023-01-16",
      "sales_type": 0,
      "sales_units": 2,
      "sales_units_promo": 6, 
      "sales_rub": 9.6,
      "sales_run_promo": 3.4},
     ...
    ]     
 },
   ]
  }

А структура ответ была такая:

Пример 2

{"data": [
  {"store": "store1",
   "sku": "sku1",
   "fact": [
     {"date": "2023-01-15",
      "sales_type": 0,
      "sales_units": 5,
      "sales_units_promo": 4, 
      "sales_rub": 6.7,
      "sales_run_promo": 8.9},
  {"store": "store1",
   "sku": "sku1",
   "fact": 
     {"date": "2023-01-16",
      "sales_type": 0,
      "sales_units": 2,
      "sales_units_promo": 6, 
      "sales_rub": 9.6,
      "sales_run_promo": 3.4},
     ...
    ]     
 },]} 

Данная структура не позволяла должным образом анализировать DS модели данные продаж товара в определённом магазине, и в итоге пришлось признать этот факап, причём за 2-3 дня до сдачи проекта. Этой проблемы можно было избежать, если бы на самом старте мы провели синхронизацию с DS-командой. Но этот момент был упущен: они думали, что мы будем ориентироваться на мок-данные, а мы думали и реализовали это по-другому. В конечном итоге мы оперативно решили эту проблему, и финальная реализация модели SALES и сериализатора выглядит следующим образом:

class SalesRecord(models.Model):
    """Модель записей продаж"""
    date = models.DateField(
        verbose_name='Дата'
        )
    
    sales_type = models.IntegerField(
        verbose_name='Флаг наличия промо'
        )
    sales_units = models.IntegerField(
        verbose_name='Число проданных товаров без признака промо'
        )
    sales_units_promo = models.IntegerField(
        verbose_name='Число проданных товаров с признаком промо'
        )
    sales_rub = models.DecimalField(
        max_digits=10, 
        decimal_places=2, 
        verbose_name='Продажи без признака промо в РУБ'
        )
    sales_rub_promo = models.DecimalField(
        max_digits=10, 
        decimal_places=2, 
        verbose_name='Продажи с признаком промо в РУБ;'
        )

    class Meta:
        verbose_name = 'Запись продаж'
        verbose_name_plural = 'Записи продаж'

    def __str__(self):
        return f"{self.date}, {self.sales_rub_promo}"


class Sales(models.Model):
    """Модель данных продаж по магазинам."""
    store = models.ForeignKey(
        Store,
        on_delete=models.CASCADE,
        related_name='store_sales',
        verbose_name='Магазин'
        )
    sku = models.ForeignKey(
        Category,
        on_delete=models.CASCADE,
        related_name='sku_sales',
        verbose_name='Товар'
        )
    facts = models.ManyToManyField(
        SalesRecord,
        related_name='sales_records',
        verbose_name='Записи продаж'
        )

    class Meta:
        verbose_name = 'Продажа'
        verbose_name_plural = 'Продажи'
class SalesRecordSerialazier(serializers.ModelSerializer):
    """Сериалайзер записи фактических исторических данных."""

    date = serializers.DateField()
    sales_type = serializers.IntegerField()
    sales_units = serializers.IntegerField()
    sales_units_promo = serializers.IntegerField()
    sales_rub = serializers.DecimalField(
        max_digits=10,
        decimal_places=2,
    )
    sales_rub_promo = serializers.DecimalField(
        max_digits=10,
        decimal_places=2,
    )

    class Meta:
        model = SalesRecord
        fields = ('date',
                  'sales_type',
                  'sales_units',
                  'sales_units_promo',
                  'sales_rub',
                  'sales_rub_promo',)


class SalesSerializer(serializers.ModelSerializer):
    """Сериалайзер продаж. GET запрос"""

    fact = SalesRecordSerialazier(source='facts', many=True, read_only=True)
    store = serializers.SerializerMethodField()
    sku = serializers.SerializerMethodField()

    class Meta:
        model = Sales
        fields = [
            'store',
            'sku',
            'fact',
        ]

    def get_store(self, obj):
        """
        Метод возвращает заголовок магазина (захешированный ID магазина) 
        связанного с объектом продажи. Если по каким-то причинам название 
        магазина не может быть получено, метод возвращает None и печатает 
        сообщение об ошибке.
        """
        try:
            return str(obj.store.store.title)
        except AttributeError as e:
            print(f"Ошибка при получении магазина для объекта продаж {obj.id}: {str(e)}")
            return None

    def get_sku(self, obj):
        """
        Метод возвращает SKU продукта связанного с объектом продажи.
        Если SKU не может быть получен, метод возвращает None.
        """
        if hasattr(obj, 'sku') and hasattr(obj.sku, 'sku'):
            return str(obj.sku.sku)
        return None

Модель SalesRecord хранит детальные записи продаж товаров по дням, включая такие параметры, как количество проданных товаров и выручка, с учетом и без учета проводимых промоакций. Эти записи затем ассоциируются с конкретными магазинами и товарами в модели Sales, где используются связи "многие ко многим" (ManyToManyField). Это обеспечивает гибкость и удобство при получении информации о продажах товара в различных магазинах.

Заполнение базы данных

База данных была тщательно спроектирована, но её ценность возрастает только тогда, когда она наполнена данными. Заказчик предоставил нам обезличенные и зашифрованные данные в формате .csv для дальнейшей работы. Я не стану подробно останавливаться на этом этапе, стоит отметить, что на написание скриптов для импорта данных в базу ушло примерно сутки. Эти скрипты выполняли не только функцию загрузки данных, но и их первичной обработки и валидации, чтобы в дальнейшем обеспечить корректность и целостность данных в системе.

class Command(BaseCommand):
    help = "Импорт данных о продажах из файла в БД"

    def handle(self, *args, **options):
        with open('data/sales_df_train.csv', encoding='utf-8') as file:
            file_reader = csv.reader(file)
            next(file_reader)
            for row in file_reader:
                try:
                    store_id = StoreID.objects.get(title=row[0])
                    store = Store.objects.get(store=store_id)
                    sku = Category.objects.get(sku=row[1])
                except ObjectDoesNotExist as e:
                    self.stdout.write(self.style.ERROR(f"Объект не найден: {e}"))
                    continue
                date = datetime.strptime(row[2], '%Y-%m-%d').date()
                sales_type = int(row[3])
                sales_units = float(row[4])
                sales_units_promo = float(row[5])
                sales_rub = float(row[6])
                sales_rub_promo = float(row[7])
                sales_record, created = SalesRecord.objects.get_or_create(
                    date=date,
                    sales_type=sales_type,
                    defaults={
                        'sales_units': sales_units,
                        'sales_units_promo': sales_units_promo,
                        'sales_rub': sales_rub,
                        'sales_rub_promo': sales_rub_promo,
                    }
                )
                sales, created = Sales.objects.get_or_create(
                    store=store,
                    sku=sku,
                )
                sales.facts.add(sales_record)
                self.stdout.write(self.style.SUCCESS(f"Успешно добавлено/обновлено запись продаж для {store} и {sku} на {date}"))

Сначала скрипт открывает файл и считывает его содержимое. Затем он итерирует по строкам файла, для каждой строки извлекает необходимую информацию о магазине, товаре и деталях продаж. После этого происходит поиск соответствующих объектов в базе данных и создание записи о продажах. В случае успешного добавления или обновления данных в базу данных, в консоль выводится соответствующее сообщение об успехе. Важно отметить, что в случае отсутствия необходимых объектов магазина или товара, сценарий выведет сообщение об ошибке и перейдет к следующей строке файла.

Реализация API на базе Django Rest Framework

Следующим ключевым этапом в разработке нашего проекта стала реализация API, который обеспечивает взаимодействие между моделью машинного обучения и фронтендом. Нами были настроены необходимые эндпоинты и функции обработки запросов, а также сконфигурированы URL-маршруты для корректного перенаправления запросов к нужным обработчикам.

Весь процесс работы с API был документирован с использованием Swagger, что значительно облегчает интеграцию и взаимодействие с системой для всех заинтересованных сторон. В общем и целом, на данном этапе серьезных проблем и затруднений не возникло. Однако стоит отметить, что нам пришлось поработать над настройкой фильтрации для API, так как фронтенд выразил желание иметь возможность множественной фильтрации в запросах. То есть, например, для модели SALES должна быть возможность в одном запросе указать несколько магазинов и товаров. В качестве решения этой задачи было решено использовать базовый класс в Django Rest Framework (DRF) - BaseFilterBackend, который позволяет создавать собственные фильтры для представлений API.Почему был выбран именно он ?

  • Гибкость и расширяемость: Мы можем создавать собственные фильтры, которые точно соответствуют вашим нуждам и требованиям нашего API.

  • Улучшение производительности: Фильтрация данных на стороне сервера может улучшить производительность API, так как клиенту будет отправляться только та информация, которая ему действительно нужна.

  • Поддержка сложных фильтров: С помощью BaseFilterBackend можно реализовать сложные фильтры, включая фильтрацию по нескольким параметрам, диапазонам значений, поиск по подстроке и другие.

И пример, как это было реализовано:

class SaleFilterBackend(BaseFilterBackend):
    """
    Фильтр, который позволяет фильтровать данные по продажам на основе 
    - store (магазин)
    - sku (захэшированное id товара)
    - date (Дата(день))
    - sales_type (флаг наличия промо)
    - sales_units (число проданных товаров без признака промо)
    - sales_units_promo (число проданных товаров с признаком промо)
    - sales_rub (продажи без признака промо в РУБ)
    - sales_rub_promo (продажи с признаком промо в РУБ;)
    """

    def filter_queryset(self, request, queryset, view):

        store_param = request.query_params.get('store')
        sku_param = request.query_params.get('sku')
        date_param = request.query_params.get('date')
        sales_type_param = request.query_params.get('sales_type')
        sales_units_param = request.query_params.get('sales_units')
        sales_units_promo_param = request.query_params.get('sales_units_promo')
        sales_rub_param = request.query_params.get('sales_rub')
        sales_rub_promo_param = request.query_params.get('sales_rub_promo')

        if store_param is not None:
            store_list = store_param.split(",")
            queryset = queryset.filter(store__store__title__in=store_list)

        if sku_param is not None:
            sku_list = sku_param.split(",")
            queryset = queryset.filter(sku__sku__in=sku_list)

        if date_param is not None:
            from dateutil.parser import parse
            date_list = [parse(date_str).date() for date_str
                         in date_param.split(",")]
            queryset = queryset.filter(fact__date__in=date_list)

        if sales_type_param is not None:
            sales_type_list = [int(x) for x in sales_type_param.split(",")]
            queryset = queryset.filter(fact__sales_type__in=sales_type_list)

        if sales_units_param is not None:
            sales_units_list = [int(x) for x in sales_units_param.split(",")]
            queryset = queryset.filter(fact__sales_units__in=sales_units_list)

        if sales_units_promo_param is not None:
            sales_units_promo_list = [int(x) for x in
                                      sales_units_promo_param.split(",")]
            queryset = queryset.filter(
                fact__sales_units_promo__in=sales_units_promo_list)

        if sales_rub_param is not None:
            sales_rub_list = [Decimal(x) for x in sales_rub_param.split(",")]
            queryset = queryset.filter(fact__sales_rub__in=sales_rub_list)

        if sales_rub_promo_param is not None:
            sales_rub_promo_list = [Decimal(x) for x in
                                    sales_rub_promo_param.split(",")]
            queryset = queryset.filter(
                fact__sales_rub_promo__in=sales_rub_promo_list)

        return queryset

После добавления основных API эндпоинтов, чтобы ускорить коллективную работу фронтенд и DS команды, мы упаковали бекенд и базу данных в Docker, наполнили её данными и развернули всё на арендованном сервере. Затем мы приступили к реализации взаимодействия бекенда и DS модели, а также записи возвращаемого прогноза в базу данных.

Взаимодействие бекенда и DS модели

На этом этапе нам предстояло определиться, как мы будем реализовывать взаимодействие между бекендом и DS моделью. Заказчик выразил желание видеть следующую реализацию: раз в сутки должен запускаться инференс, при котором в DS модель поступают данные из базы о продажах за сутки, категориях и магазинах, где был продан данный товар. Затем модель обрабатывает эти данные и возвращает прогнозы (например, сколько нужно произвести товара собственного производства) на ближайшие 14 дней, после чего данные записываются в базу данных.

Первоначальная идея заключалась в размещении ML сервиса и бекенда в разных контейнерах с настройкой крона для запуска DS модели один раз в сутки. Однако мы пришли к выводу, что такой подход имеет свои недостатки. Во-первых, использование крона не позволяет эффективно контролировать процесс выполнения задачи, что может привести к проблемам в случае сбоев или ошибок. Во-вторых, крон не предоставляет гибкости в управлении расписанием запуска, что может быть неудобно при изменении требований к процессу инференса. Поэтому мы решили использовать другой подход.

ML сервис был интегрирован непосредственно в бекенд, что упростило взаимодействие между этими двумя компонентами. Для запуска процесса инференса мы выбрали систему асинхронных задач Celery, которая позволяет асинхронно обрабатывать з��дачи, такие как запуск моделей машинного обучения, тем самым разгружая основной поток выполнения и повышая эффективность системы. С помощью Celery Beat мы настроили планировщик, который запускает процесс инференса раз в сутки. Кроме того, с помощью Celery Flower мы можем в реальном времени мониторить статус выполнения задач и управлять ими, что обеспечивает полный контроль над процессом. Реализация задачи выглядит следующим образом:

@shared_task
def main(today=date.today()):
    _logger.info(f"Запуск основного процесса с указанием даты начала: {today}")
    forecast_dates = [today + timedelta(days=d) for d in range(1, 15)]
    forecast_dates_str = [el.strftime("%Y-%m-%d") for el in forecast_dates]
    _logger.info("Получение информации о категориях...")
    categs_info = get_categories()
    for store in get_stores():
        _logger.info(f"Обработка магазинов: {store['store']}...")
        result = []
        for item in get_sales(store=store["store"]):
            _logger.debug(f"Обработка товара: {item['sku']} для магазина: {store['store']}...")
            item_info = categs_info.get(item["sku"])
            if item_info is None:
                _logger.warning(f"Нет информации о категории для элемента {item['sku']} в магазине {store['store']}")
                continue
            sales = item.get("fact")
            if sales is None:
                _logger.warning(f"Отсутствие данных о продажах для товара {item['sku']}, магазин {store['store']}")
                continue
            _logger.debug(f"Прогнозирование для {item['sku']}...")
            prediction = forecast(sales=sales, item_info=item_info, store_info=store)
            result.append({
                "store": store["store"],
                "sku": item["sku"],
                "forecast_date": today.strftime("%Y-%m-%d"),
                "sales_units": [
                    {"date": date, "target": round(target)} for date, target in zip(forecast_dates_str, prediction)
                ]
            })
        if result:
            for r in result:
                _logger.debug(f"Отправка прогноза для магазина: {store['store']}, SKU {r['sku']} to the API...")
                response = requests.post(get_address(URL_FORECAST), json=r, headers={'Content-Type': 'application/json'})
                if response.status_code == 201:
                    _logger.info("Данные успешно отправлены!")
                else:
                    _logger.error(f"Не удалось отправить данные! Код состояния: {response.status_code}, Response text: {response.text}")
        else:
            _logger.warning(f"Нет данных для передачи в магазин {store['store']}")

if __name__ == "__main__":
    main.delay()

Заключение

Это были тяжелые, но интересные 18 дней разработки. Полноценный рабочий день с 9:00 и до самой ночи, многочисленные правки багов, мозговые штурмы и отсутствие сил в конце каждого дня. Однако, каждый из нас внес свой вклад в общее дело, и этот процесс разработки стал для нас ценным опытом. В конечном итоге всё было не зря, мы заняли 2-ое общекомандное место, а также выиграли в номинации 'Лучший backend-разработчик'. Это большое достижение для нашей команды, и мы гордимся своей работой. Впереди нас ждут новые вызовы и новые проекты для каждого из нас, но мы уверены, что справимся с ними не хуже, чем с этим.

И да, когда меня на следующем собеседовании спросят о моём лучшем достижении в роли разработчика, я с уверенностью отвечу, что таких достижений у меня два. Первое – это занятое второе место на хакатоне и награда "Лучший backend-разработчик". А о моём втором значимом достижении я расскажу в ближайшем посте, подготовив для вас ещё одну захватывающую историю, не уступающую по вдохновению этой. И кстати, я ищу работу.

PS. Ссылка на репозиторий, кому интересно.