"Дисклеймер: Перед началом изложения хочу отметить, что описываемый в этой статье проект был реализован мной и моей напарницей Викой в рамках хакатона, организованного Мастерской Яндекс Практикума и компанией 'Лента'. Цель хакатона - создание интерфейса и предсказательной модели для прогнозирования спроса на товары собственного производства заказчика. В данной статье я сфокусируюсь исключительно на backend части проекта, описав наш опыт и решения, которые были применены для достижения поставленных задач и буду приводить только один пример реализации, остальное можно будет посмотреть в репозитории проекта .Все идеи и решения, описанные в статье, были разработаны в условиях ограниченного времени хакатона и могут быть доработаны и изменены в будущем, но это не точно."

Однако, здравствуйте. Несмотря на то, что я только что завершил расширенный курс "Python Backend-Developer +" от Яндекс Практикум и являюсь, так сказать, "недоджуном", я постараюсь максимально подробно рассказать, что нам (мне и моей коллеге) удалось реализовать, чтобы завоевать приз в номинации "Лучший backend-разработчик" на хакатоне от ООО "Лента". Не будем останавливаться на процессе подачи заявки на участие и всём, что предшествовало этому процессу; перейдём сразу к моменту, когда нас распределили по командам, и колесо хакатона закрутилось с полной мощностью.

Всего в хакатоне приняло участие 20 команд, в каждой по 11 человек. Состав команды выглядел следующим образом:

  • 1 Project-менеджер — (отвечает за синхронизацию Команды и выполнение задач в дедлайны Конкурса)

  • 3 Дизайнера — (креативщики Команды; отвечают за UI/UX, дизайн макетов)

  • 3 Data Scientist—(работает с данными для решения задачи, развития Продукта)

  • 2 Backend-разработчика —(работа над ML-продуктом, отвечают и управляет поступающими запросами и обеспечивает взаимодействие между другими компонентами системы)

  • 2 Frontend-разработчика —( отвечают за визуализацию данных)

Общая задача команды заключалась в создании предсказательной модели и её интерфейса для прогнозирования спроса на товары собственного производства заказчика ООО "Лента". Правильно построенный процесс работы и общий результат команды зависел от слаженного взаимодействия каждого подразделения, и требовался постоянный тесный контакт, особенно с фронтенд-разработчиками и в завершающей стадии с Data Science-командой. С первого дня у нас проходили как совместные митапы, так и отдельные встречи с каждым подразделением по мере возникновения вопросов. Также на старте необходимо было определить лидера в каждом направлении. Старшим разработчиком был назначен я. Было страшно, ведь вдруг не справлюсь? Однако, как только началась работа, эти сомнения рассеялись. После вводных инструктажей от организаторов и старших направлений, мы с моей напарницей Викой стали планировать план разработке проекта, выбирать технологии и определять, с чего начнём. Но обо всём по порядку.

Стек технологий, который нужно было использовать на бекенде, был Django DRF , это было единственное ограничени. Требования к бекенду выглядели следующим образом:

  • Отдаёт данные на фронт. По запросу от фронта идёт в БД, выбирает необходимые данные, подготавливает (если требуется), отдаёт в ответе на запрос.

  • Добавление фактических данных. Принимает входящий запрос на добавление исторических данных по продажам, обрабатывает их и складывает в БД.

  • Запускает и управляет процессом инференса. По расписанию (раз в день) или после обновления исторических данных начинает процесс прогнозирования. Для этого идёт в БД, выбирает необходимые данные, передаёт их в ML сервис, получает прогноз и складывает его в БД.

Проектирование базы данных

Первый и самый главный шаг, который нам предстояло сделать, это проектирование базы данных. Казалось бы, ничего сложного: имеем мок-данные, бери и проектируй. Но от того, насколько правильно и эффективно спроектирована база данных, будет зависеть скорость работы приложения, возможность масштабирования проекта в будущем, а также удобство поддержки и развития проекта. Мы понимали, что это один из ключевых этапов разработки любого приложения, так как база данных является основой для хранения и обработки данных.

Первоначальный вариант, который был спроектирован, в последствии был изменён ещё пару раз, что конечно очень выбивало из колеи, когда ты уже был на 99,99% уверен, что с базой всё решено, а в последний момент возникали проблемы. Так, на финальном этапе оказалось, что данные, которые в модели SALES, имеют некорректную связь. Из-за этого данные продаж определенного товара в магазине передавались в DS не в нужной структуре. То есть, если в магазине товар продавался более двух дней, то это должно было отображаться следующим образом:

Пример 1

{"data": [
  {"store": "store1",
   "sku": "sku1",
   "fact": [
     {"date": "2023-01-15",
      "sales_type": 0,
      "sales_units": 5,
      "sales_units_promo": 4, 
      "sales_rub": 6.7,
      "sales_run_promo": 8.9},
     {"date": "2023-01-16",
      "sales_type": 0,
      "sales_units": 2,
      "sales_units_promo": 6, 
      "sales_rub": 9.6,
      "sales_run_promo": 3.4},
     ...
    ]     
 },
   ]
  }

А структура ответ была такая:

Пример 2

{"data": [
  {"store": "store1",
   "sku": "sku1",
   "fact": [
     {"date": "2023-01-15",
      "sales_type": 0,
      "sales_units": 5,
      "sales_units_promo": 4, 
      "sales_rub": 6.7,
      "sales_run_promo": 8.9},
  {"store": "store1",
   "sku": "sku1",
   "fact": 
     {"date": "2023-01-16",
      "sales_type": 0,
      "sales_units": 2,
      "sales_units_promo": 6, 
      "sales_rub": 9.6,
      "sales_run_promo": 3.4},
     ...
    ]     
 },]} 

Данная структура не позволяла должным образом анализировать DS модели данные продаж товара в определённом магазине, и в итоге пришлось признать этот факап, причём за 2-3 дня до сдачи проекта. Этой проблемы можно было избежать, если бы на самом старте мы провели синхронизацию с DS-командой. Но этот момент был упущен: они думали, что мы будем ориентироваться на мок-данные, а мы думали и реализовали это по-другому. В конечном итоге мы оперативно решили эту проблему, и финальная реализация модели SALES и сериализатора выглядит следующим образом:

class SalesRecord(models.Model):
    """Модель записей продаж"""
    date = models.DateField(
        verbose_name='Дата'
        )
    
    sales_type = models.IntegerField(
        verbose_name='Флаг наличия промо'
        )
    sales_units = models.IntegerField(
        verbose_name='Число проданных товаров без признака промо'
        )
    sales_units_promo = models.IntegerField(
        verbose_name='Число проданных товаров с признаком промо'
        )
    sales_rub = models.DecimalField(
        max_digits=10, 
        decimal_places=2, 
        verbose_name='Продажи без признака промо в РУБ'
        )
    sales_rub_promo = models.DecimalField(
        max_digits=10, 
        decimal_places=2, 
        verbose_name='Продажи с признаком промо в РУБ;'
        )

    class Meta:
        verbose_name = 'Запись продаж'
        verbose_name_plural = 'Записи продаж'

    def __str__(self):
        return f"{self.date}, {self.sales_rub_promo}"


class Sales(models.Model):
    """Модель данных продаж по магазинам."""
    store = models.ForeignKey(
        Store,
        on_delete=models.CASCADE,
        related_name='store_sales',
        verbose_name='Магазин'
        )
    sku = models.ForeignKey(
        Category,
        on_delete=models.CASCADE,
        related_name='sku_sales',
        verbose_name='Товар'
        )
    facts = models.ManyToManyField(
        SalesRecord,
        related_name='sales_records',
        verbose_name='Записи продаж'
        )

    class Meta:
        verbose_name = 'Продажа'
        verbose_name_plural = 'Продажи'
class SalesRecordSerialazier(serializers.ModelSerializer):
    """Сериалайзер записи фактических исторических данных."""

    date = serializers.DateField()
    sales_type = serializers.IntegerField()
    sales_units = serializers.IntegerField()
    sales_units_promo = serializers.IntegerField()
    sales_rub = serializers.DecimalField(
        max_digits=10,
        decimal_places=2,
    )
    sales_rub_promo = serializers.DecimalField(
        max_digits=10,
        decimal_places=2,
    )

    class Meta:
        model = SalesRecord
        fields = ('date',
                  'sales_type',
                  'sales_units',
                  'sales_units_promo',
                  'sales_rub',
                  'sales_rub_promo',)


class SalesSerializer(serializers.ModelSerializer):
    """Сериалайзер продаж. GET запрос"""

    fact = SalesRecordSerialazier(source='facts', many=True, read_only=True)
    store = serializers.SerializerMethodField()
    sku = serializers.SerializerMethodField()

    class Meta:
        model = Sales
        fields = [
            'store',
            'sku',
            'fact',
        ]

    def get_store(self, obj):
        """
        Метод возвращает заголовок магазина (захешированный ID магазина) 
        связанного с объектом продажи. Если по каким-то причинам название 
        магазина не может быть получено, метод возвращает None и печатает 
        сообщение об ошибке.
        """
        try:
            return str(obj.store.store.title)
        except AttributeError as e:
            print(f"Ошибка при получении магазина для объекта продаж {obj.id}: {str(e)}")
            return None

    def get_sku(self, obj):
        """
        Метод возвращает SKU продукта связанного с объектом продажи.
        Если SKU не может быть получен, метод возвращает None.
        """
        if hasattr(obj, 'sku') and hasattr(obj.sku, 'sku'):
            return str(obj.sku.sku)
        return None

Модель SalesRecord хранит детальные записи продаж товаров по дням, включая такие параметры, как количество проданных товаров и выручка, с учетом и без учета проводимых промоакций. Эти записи затем ассоциируются с конкретными магазинами и товарами в модели Sales, где используются связи "многие ко многим" (ManyToManyField). Это обеспечивает гибкость и удобство при получении информации о продажах товара в различных магазинах.

Заполнение базы данных

База данных была тщательно спроектирована, но её ценность возрастает только тогда, когда она наполнена данными. Заказчик предоставил нам обезличенные и зашифрованные данные в формате .csv для дальнейшей работы. Я не стану подробно останавливаться на этом этапе, стоит отметить, что на написание скриптов для импорта данных в базу ушло примерно сутки. Эти скрипты выполняли не только функцию загрузки данных, но и их первичной обработки и валидации, чтобы в дальнейшем обеспечить корректность и целостность данных в системе.

class Command(BaseCommand):
    help = "Импорт данных о продажах из файла в БД"

    def handle(self, *args, **options):
        with open('data/sales_df_train.csv', encoding='utf-8') as file:
            file_reader = csv.reader(file)
            next(file_reader)
            for row in file_reader:
                try:
                    store_id = StoreID.objects.get(title=row[0])
                    store = Store.objects.get(store=store_id)
                    sku = Category.objects.get(sku=row[1])
                except ObjectDoesNotExist as e:
                    self.stdout.write(self.style.ERROR(f"Объект не найден: {e}"))
                    continue
                date = datetime.strptime(row[2], '%Y-%m-%d').date()
                sales_type = int(row[3])
                sales_units = float(row[4])
                sales_units_promo = float(row[5])
                sales_rub = float(row[6])
                sales_rub_promo = float(row[7])
                sales_record, created = SalesRecord.objects.get_or_create(
                    date=date,
                    sales_type=sales_type,
                    defaults={
                        'sales_units': sales_units,
                        'sales_units_promo': sales_units_promo,
                        'sales_rub': sales_rub,
                        'sales_rub_promo': sales_rub_promo,
                    }
                )
                sales, created = Sales.objects.get_or_create(
                    store=store,
                    sku=sku,
                )
                sales.facts.add(sales_record)
                self.stdout.write(self.style.SUCCESS(f"Успешно добавлено/обновлено запись продаж для {store} и {sku} на {date}"))

Сначала скрипт открывает файл и считывает его содержимое. Затем он итерирует по строкам файла, для каждой строки извлекает необходимую информацию о магазине, товаре и деталях продаж. После этого происходит поиск соответствующих объектов в базе данных и создание записи о продажах. В случае успешного добавления или обновления данных в базу данных, в консоль выводится соответствующее сообщение об успехе. Важно отметить, что в случае отсутствия необходимых объектов магазина или товара, сценарий выведет сообщение об ошибке и перейдет к следующей строке файла.

Реализация API на базе Django Rest Framework

Следующим ключевым этапом в разработке нашего проекта стала реализация API, который обеспечивает взаимодействие между моделью машинного обучения и фронтендом. Нами были настроены необходимые эндпоинты и функции обработки запросов, а также сконфигурированы URL-маршруты для корректного перенаправления запросов к нужным обработчикам.

Весь процесс работы с API был документирован с использованием Swagger, что значительно облегчает интеграцию и взаимодействие с системой для всех заинтересованных сторон. В общем и целом, на данном этапе серьезных проблем и затруднений не возникло. Однако стоит отметить, что нам пришлось поработать над настройкой фильтрации для API, так как фронтенд выразил желание иметь возможность множественной фильтрации в запросах. То есть, например, для модели SALES должна быть возможность в одном запросе указать несколько магазинов и товаров. В качестве решения этой задачи было решено использовать базовый класс в Django Rest Framework (DRF) - BaseFilterBackend, который позволяет создавать собственные фильтры для представлений API.Почему был выбран именно он ?

  • Гибкость и расширяемость: Мы можем создавать собственные фильтры, которые точно соответствуют вашим нуждам и требованиям нашего API.

  • Улучшение производительности: Фильтрация данных на стороне сервера может улучшить производительность API, так как клиенту будет отправляться только та информация, которая ему действительно нужна.

  • Поддержка сложных фильтров: С помощью BaseFilterBackend можно реализовать сложные фильтры, включая фильтрацию по нескольким параметрам, диапазонам значений, поиск по подстроке и другие.

И пример, как это было реализовано:

class SaleFilterBackend(BaseFilterBackend):
    """
    Фильтр, который позволяет фильтровать данные по продажам на основе 
    - store (магазин)
    - sku (захэшированное id товара)
    - date (Дата(день))
    - sales_type (флаг наличия промо)
    - sales_units (число проданных товаров без признака промо)
    - sales_units_promo (число проданных товаров с признаком промо)
    - sales_rub (продажи без признака промо в РУБ)
    - sales_rub_promo (продажи с признаком промо в РУБ;)
    """

    def filter_queryset(self, request, queryset, view):

        store_param = request.query_params.get('store')
        sku_param = request.query_params.get('sku')
        date_param = request.query_params.get('date')
        sales_type_param = request.query_params.get('sales_type')
        sales_units_param = request.query_params.get('sales_units')
        sales_units_promo_param = request.query_params.get('sales_units_promo')
        sales_rub_param = request.query_params.get('sales_rub')
        sales_rub_promo_param = request.query_params.get('sales_rub_promo')

        if store_param is not None:
            store_list = store_param.split(",")
            queryset = queryset.filter(store__store__title__in=store_list)

        if sku_param is not None:
            sku_list = sku_param.split(",")
            queryset = queryset.filter(sku__sku__in=sku_list)

        if date_param is not None:
            from dateutil.parser import parse
            date_list = [parse(date_str).date() for date_str
                         in date_param.split(",")]
            queryset = queryset.filter(fact__date__in=date_list)

        if sales_type_param is not None:
            sales_type_list = [int(x) for x in sales_type_param.split(",")]
            queryset = queryset.filter(fact__sales_type__in=sales_type_list)

        if sales_units_param is not None:
            sales_units_list = [int(x) for x in sales_units_param.split(",")]
            queryset = queryset.filter(fact__sales_units__in=sales_units_list)

        if sales_units_promo_param is not None:
            sales_units_promo_list = [int(x) for x in
                                      sales_units_promo_param.split(",")]
            queryset = queryset.filter(
                fact__sales_units_promo__in=sales_units_promo_list)

        if sales_rub_param is not None:
            sales_rub_list = [Decimal(x) for x in sales_rub_param.split(",")]
            queryset = queryset.filter(fact__sales_rub__in=sales_rub_list)

        if sales_rub_promo_param is not None:
            sales_rub_promo_list = [Decimal(x) for x in
                                    sales_rub_promo_param.split(",")]
            queryset = queryset.filter(
                fact__sales_rub_promo__in=sales_rub_promo_list)

        return queryset

После добавления основных API эндпоинтов, чтобы ускорить коллективную работу фронтенд и DS команды, мы упаковали бекенд и базу данных в Docker, наполнили её данными и развернули всё на арендованном сервере. Затем мы приступили к реализации взаимодействия бекенда и DS модели, а также записи возвращаемого прогноза в базу данных.

Взаимодействие бекенда и DS модели

На этом этапе нам предстояло определиться, как мы будем реализовывать взаимодействие между бекендом и DS моделью. Заказчик выразил желание видеть следующую реализацию: раз в сутки должен запускаться инференс, при котором в DS модель поступают данные из базы о продажах за сутки, категориях и магазинах, где был продан данный товар. Затем модель обрабатывает эти данные и возвращает прогнозы (например, сколько нужно произвести товара собственного производства) на ближайшие 14 дней, после чего данные записываются в базу данных.

Первоначальная идея заключалась в размещении ML сервиса и бекенда в разных контейнерах с настройкой крона для запуска DS модели один раз в сутки. Однако мы пришли к выводу, что такой подход имеет свои недостатки. Во-первых, использование крона не позволяет эффективно контролировать процесс выполнения задачи, что может привести к проблемам в случае сбоев или ошибок. Во-вторых, крон не предоставляет гибкости в управлении расписанием запуска, что может быть неудобно при изменении требований к процессу инференса. Поэтому мы решили использовать другой подход.

ML сервис был интегрирован непосредственно в бекенд, что упростило взаимодействие между этими двумя компонентами. Для запуска процесса инференса мы выбрали систему асинхронных задач Celery, которая позволяет асинхронно обрабатывать задачи, такие как запуск моделей машинного обучения, тем самым разгружая основной поток выполнения и повышая эффективность системы. С помощью Celery Beat мы настроили планировщик, который запускает процесс инференса раз в сутки. Кроме того, с помощью Celery Flower мы можем в реальном времени мониторить статус выполнения задач и управлять ими, что обеспечивает полный контроль над процессом. Реализация задачи выглядит следующим образом:

@shared_task
def main(today=date.today()):
    _logger.info(f"Запуск основного процесса с указанием даты начала: {today}")
    forecast_dates = [today + timedelta(days=d) for d in range(1, 15)]
    forecast_dates_str = [el.strftime("%Y-%m-%d") for el in forecast_dates]
    _logger.info("Получение информации о категориях...")
    categs_info = get_categories()
    for store in get_stores():
        _logger.info(f"Обработка магазинов: {store['store']}...")
        result = []
        for item in get_sales(store=store["store"]):
            _logger.debug(f"Обработка товара: {item['sku']} для магазина: {store['store']}...")
            item_info = categs_info.get(item["sku"])
            if item_info is None:
                _logger.warning(f"Нет информации о категории для элемента {item['sku']} в магазине {store['store']}")
                continue
            sales = item.get("fact")
            if sales is None:
                _logger.warning(f"Отсутствие данных о продажах для товара {item['sku']}, магазин {store['store']}")
                continue
            _logger.debug(f"Прогнозирование для {item['sku']}...")
            prediction = forecast(sales=sales, item_info=item_info, store_info=store)
            result.append({
                "store": store["store"],
                "sku": item["sku"],
                "forecast_date": today.strftime("%Y-%m-%d"),
                "sales_units": [
                    {"date": date, "target": round(target)} for date, target in zip(forecast_dates_str, prediction)
                ]
            })
        if result:
            for r in result:
                _logger.debug(f"Отправка прогноза для магазина: {store['store']}, SKU {r['sku']} to the API...")
                response = requests.post(get_address(URL_FORECAST), json=r, headers={'Content-Type': 'application/json'})
                if response.status_code == 201:
                    _logger.info("Данные успешно отправлены!")
                else:
                    _logger.error(f"Не удалось отправить данные! Код состояния: {response.status_code}, Response text: {response.text}")
        else:
            _logger.warning(f"Нет данных для передачи в магазин {store['store']}")

if __name__ == "__main__":
    main.delay()

Заключение

Это были тяжелые, но интересные 18 дней разработки. Полноценный рабочий день с 9:00 и до самой ночи, многочисленные правки багов, мозговые штурмы и отсутствие сил в конце каждого дня. Однако, каждый из нас внес свой вклад в общее дело, и этот процесс разработки стал для нас ценным опытом. В конечном итоге всё было не зря, мы заняли 2-ое общекомандное место, а также выиграли в номинации 'Лучший backend-разработчик'. Это большое достижение для нашей команды, и мы гордимся своей работой. Впереди нас ждут новые вызовы и новые проекты для каждого из нас, но мы уверены, что справимся с ними не хуже, чем с этим.

И да, когда меня на следующем собеседовании спросят о моём лучшем достижении в роли разработчика, я с уверенностью отвечу, что таких достижений у меня два. Первое – это занятое второе место на хакатоне и награда "Лучший backend-разработчик". А о моём втором значимом достижении я расскажу в ближайшем посте, подготовив для вас ещё одну захватывающую историю, не уступающую по вдохновению этой. И кстати, я ищу работу.

PS. Ссылка на репозиторий, кому интересно.