ORM, или объектно-реляционное отображение — это программная технология, которая позволяет взаимодействовать с базами данных с использованием объектно-ориентированной парадигмы. Вместо того чтобы писать SQL-запросы напрямую для работы с данными в базе данных, можно использовать ORM, чтобы взаимодействовать с данными, как если бы они были объектами в вашем коде.

Не бывало ли вам интересно, как работает изнутри такая идейно простая концепция? Благодаря чему достигается удобство работы? Сегодня мы напишем ORM самостоятельно и узнаем, какие инструменты python нам для этого понадобятся.

Ремарка: sqlite3 выбран из-за простоты, нетрудно заменить обращения к нему на обращения к любой удобной для вас базе данных. По синтаксису я ориентировался на джанго.

▍ Базовые типы данных


В sqlite3 существуют: INTEGER — вещественное число с указанной точностью, TEXT — текст, BLOB — двоичные данные, REAL — число с плавающей запятой(float24), NUMERIC — то же, что и INTEGER.

CREATE TABLE "example" (
    "Field1"    INTEGER NOT NULL,
    "Field2"    TEXT UNIQUE,
    "Field3"    BLOB,
    "Field4"    REAL DEFAULT 123,
    "Field5"    NUMERIC
);

У каждого из них есть параметры NULL, UNIQUE, DEFAULT, так что первым делом пишем класс, который будут наследовать все остальные:

class BaseType:
  field_type: str #название типа данных поля, например, "INTEGER"

  def __init__(self, unique: bool = False, null: bool = True, default: int = None):
      self.unique = unique
      self.null = null
      self.default = default

На основе него прописываем остальные базовые классы:

class IntegerField(BaseType):
  field_type = 'INTEGER'


class TextField(BaseType):
  field_type = 'TEXT'


class BlobField(BaseType):
  field_type = 'BLOB'


class RealField(BaseType):
  field_type = 'REAL'


class NumericField(BaseType):
  field_type = 'NUMERIC'

▍ Пользовательские модели


Я хочу, чтобы пользовательские модели выглядел максимально просто, например, так:

class Box(models.Model):
  name = TextField()
  width = IntegerField()
  height = IntegerField()

Для этого реализуем родительский класс Model. Он должен задавать объекты с заданными переменными. Для этого пишем инициализатор:

class Model:
  def __init__(self, *args, **kwargs):
      fields = [el for el in vars(self.__class__) if not el.startswith("__")] #поля, которые мы создали в модели (в данном случае name, width, height)
      for i, value in enumerate(args):
          setattr(self, fields[i], value)#задаем переменные переданные с помощью args

      for field, value in kwargs.items():#задаем переменные переданные с помощью kwargs
          setattr(self, field, value)

Все методы, которые, мы напишем для класса Model будут работать с любым объектом, который мы зададим.

Про args и kwargs
1. *args:
— *args позволяет передавать произвольное количество позиционных аргументов в функцию.
— Аргументы, переданные как *args, собираются в кортеж (tuple) внутри функции.
— Вы можете использовать любое имя вместо «args», но общепринято использовать именно «args».
Пример:

def print_args(*args):
for arg in args:
    print(arg)

print_args(1, 2, 3)  # Выводит: 1, 2, 3

2. **kwargs:
— **kwargs позволяет передавать произвольное количество именованных аргументов (ключ-значение) в функцию.
— Аргументы, переданные как **kwargs, собираются в словарь (dictionary) внутри функции.
— Вы можете использовать любое имя вместо «kwargs», но общепринято использовать именно «kwargs».
Пример:

def print_kwargs(**kwargs):
for key, value in kwargs.items():
    print(f"{key}: {value}")

print_kwargs(name="John", age=30, city="New York")  # Выводит: name: John, age: 30, city: New York


Используя *args и **kwargs, вы можете создавать более гибкие функции, которые могут обрабатывать разные наборы аргументов.

Давайте также создадим сразу метод json(), чтобы возвращать объект в виде словаря, он понадобится нам, например, для api или удобного вывода в консоль.

def json(self):
  attributes = {}
  for key, value in vars(self).items():
      if not key.startswith("__") and not callable(value):#проверка на системные методы и поля
          attributes[key] = value

  return attributes

В данный момент мы уже можем пользоваться моделями, но пока что без базы данных:

class Box(models.Model):
  name = TextField()
  width = IntegerField()
  height = IntegerField()


print(Box('Box 1', 1, 1).json())#выведет {'name': 'Box 1', 'width': 1, 'height': 1}

▍ Добавляем sqlite3


Для использования базы данных я хочу, чтобы каждый объект имел поле objects — менеджер объектов, через который мы и будем обращаться к sqlite.
Например, так:

Box.objects.add(Box('BOX 1', 1, 1))
Box.objects.get(name='BOX 1')
Box.objects.filter(width=1, height=1)
Box.objects.delete(name="BOX 1")

Мы хотим, чтобы каждый объект имел это поле, но при этом поведение различалось в зависимости от модели объекта. Для решения этого создадим прокси-класс, который определяет модель и возвращает нужный менеджер.

class ProxyObjects:
  def __get__(self, instance, owner):
      return Object(owner)

Про __get__
__get__ — это метод в Python, который используется для определения поведения при доступе к атрибуту объекта. Он является частью протокола дескрипторов в Python и позволяет объектам контролировать доступ к своим атрибутам.

__get__ определяется внутри класса, который также может иметь методы __set__ и __delete__, если необходимо управлять операциями присваивания и удаления атрибутов.

Пример использования __get__:

class MyDescriptor:
def __get__(self, instance, owner):
    if instance is None:
        # Если доступ к дескриптору осуществляется через класс, а не через экземпляр,
        # то instance будет равен None, и мы можем вернуть сам дескриптор или другое значение.
        return self
    else:
        # В этом случае instance - это экземпляр объекта, owner - это класс, к которому относится атрибут.
        # Мы можем вернуть значение атрибута или выполнить другие действия при доступе к нему.
        return instance._value

class MyClass:
def __init__(self, value):
    self._value = value

# Используем дескриптор MyDescriptor для атрибута 'my_attribute'
my_attribute = MyDescriptor()

# Создаём экземпляр класса
obj = MyClass(42)

# Доступ к атрибуту 'my_attribute' будет вызывать метод __get__ дескриптора MyDescriptor
print(obj.my_attribute)  # Выведет: 42

В этом примере MyDescriptor является дескриптором, который определяет поведение при доступе к атрибуту my_attribute класса MyClass. Метод __get__ определяет, что происходит при чтении значения этого атрибута через экземпляр obj.

Первое, что нужно сделать для работы с базой данных — создать таблицу. Для этого пишем метод:

import sqlite3  # Необходимо импортировать библиотеку для работы с SQLite

class Object:
def __init__(self, object_type: type):
    # Конструктор класса принимает тип объекта (класс) и сохраняет его в атрибуте object_type.
    self.object_type = object_type

def __createTable__(self):
    # Метод для создания таблицы в базе данных, основанной на атрибутах класса object_type.

    # Устанавливаем соединение с базой данных
    conn = sqlite3.connect(db_name)
    cursor = conn.cursor()

    # Создаём список custom_fields для хранения определений полей таблицы.
    custom_fields = []

    # Проходимся по атрибутам класса object_type и извлекаем информацию о полях.
    for key, value in vars(self.object_type).items():
        if not key.startswith("__") and not callable(value):
            field_name = key
            field_type = value.field_type
            is_unique = value.unique
            is_null = value.null
            default_value = value.default

            # Создаём строку с определением поля и добавляем её в список custom_fields.
            field_declaration = [f'"{field_name}" {field_type}']

            if is_unique:
                field_declaration.append('UNIQUE')
            if not is_null:
                field_declaration.append('NOT NULL')
            if default_value is not None:
                field_declaration.append(f'DEFAULT {default_value}')

            custom_fields.append(' '.join(field_declaration))

    # Создаём SQL-запрос для создания таблицы с определёнными полями.
    create_table_sql = f'''
    CREATE TABLE IF NOT EXISTS {self.object_type.__name__} (
        {", ".join(custom_fields)}
    );
    '''

    # Выполняем SQL-запрос.
    cursor.execute(create_table_sql)

    # Фиксируем изменения и закрываем соединение с базой данных.
    conn.commit()
    conn.close()

Создавать таблицу нужно для каждой пользовательской модели — для этого удобно использовать простой декоратор для класса:

def simple_orm(class_: type):
  EXTERN_TYPES[class_.__name__] = class_ # Сохраняем в словарь моделей
  class_.objects.__createTable__() # Создаём таблицу в бд

  return class_

Про декораторы
Декоратор в Python — это функция, которая принимает другую функцию и добавляет к её поведению какое-то дополнительное функциональное или метаинформационное украшение, не изменяя саму функцию. Декораторы позволяют изменять или расширять поведение функций или методов, не мод��фицируя их код. Они являются мощным инструментом для реализации множества различных задач в Python.

Декораторы используются с помощью символа "@" перед определением функции, которую они декорируют. Вот пример использования декоратора:

def my_decorator(func):
def wrapper():
    print("Что-то происходит перед вызовом функции")
    func()
    print("Что-то происходит после вызова функции")
return wrapper

@my_decorator
def say_hello():
print("Привет, мир!")

say_hello()

В этом примере my_decorator — это декоратор, который добавляет вывод текста до и после вызова функции say_hello. Затем декоратор @my_decorator применяется к функции say_hello, и при вызове say_hello() будет выполнено дополнительное действие, предусмотренное декоратором.

Декораторы часто используются для следующих задач:

  1. Логирования: Запись логов для функций или методов.
  2. Аутентификации: Проверка прав доступа перед вызовом функции.
  3. Кэширования: Сохранение результатов функции для ускорения будущих вызовов с теми же аргументами.
  4. Измерения времени выполнения: Оценка производительности функции.
  5. Модификации поведения: Изменение или расширение функциональности функции.

Python предоставляет множество встроенных декораторов, таких как @staticmethod, @classmethod, @property и другие, а также вы можете создавать свои собственные декораторы в соответствии с вашими потребностями.

Теперь при инициализации модели в бд создаётся таблица с соответствующими полями:



▍ JsonField


В sqlite3 нет JsonField по умолчанию, так что мы реализуем его на основе текста. Для начала добавляем его в базовые типы:

class JsonField(BaseType):
  field_type = 'JSON'

Json, по сути, ведёт себя, как текст, за исключением того, что при создании и изменении надо использовать json.dumps(), а при получении — json.loads().

▍ ForeignKey


В реляционных базах данных, «foreign key» (внешний ключ) — это структурный элемент, который используется для установления связей между двумя таблицами. Внешний ключ представляет собой один или несколько столбцов в одной таблице, которые связаны с первичным ключом (обычно) в другой таблице. Эта связь позволяет базе данных поддерживать целостность данных и обеспечивать связи между данными в разных таблицах.

ForeignKey должен возвращать объект заданного типа по значению заданного поля.

class ForeignKey(BaseType):
  field_type = 'FOREIGN_KEY'

  def __init__(self, object_class: type, foreign_field: str, unique: bool = False,
                null: bool = True, default=None):
      self.object_class = object_class,
      self.foreign_field = foreign_field,
      self.unique = unique
      self.null = null
      self.default = default

Я реализую его просто, как json объект с параметрами type, key, value, например:

{"type": "Box", "key": "name", "value": "BOX 1"}.

Теперь мы с помощью него можем делать так:

@simple_orm
class Box(models.Model):
  name = TextField()
  width = IntegerField()
  height = IntegerField()


@simple_orm
class Circle(models.Model):
  box = ForeignKey(object_class=Box, foreign_field='name')
  name = TextField()
  radius = IntegerField()
  data = JsonField()


box = Box.objects.add(Box('BOX 1', 1, 1))
circle = Circle.objects.add(Circle(box, "CIRCLE 1", 5, {'data': 5}))
print(circle.json()) #{'box': <__main__.Box object at 0x7f7637f6d850>, 'name': 'CIRCLE 1', 'radius': 5, 'data': {'data': 5}}

▍ Добавляем CRUD


Полный код Object
class Object:
  def __init__(self, object_type):
      self.object_type = object_type

  def add(self, obj):
      conn = sqlite3.connect(db_name)
      cursor = conn.cursor()

      d = copy.copy(obj.__dict__)

      object_type_name = self.object_type.__name__
      for key, value in vars(self.object_type).items():
          if not key.startswith("__") and not callable(value):
              if type(value) in BASIC_TYPES:
                  continue
              if type(value) == JsonField:
                  d[key] = json.dumps(d[key])
              if type(value) == ForeignKey:
                  d[key] = json.dumps({'type': value.object_class[0].__name__, 'key': value.foreign_field[0], 'value': getattr(d[key], value.foreign_field[0])})

      insert_sql = f'INSERT INTO {object_type_name} ({", ".join(obj.__dict__.keys())}) VALUES ({", ".join(["?"] * len(obj.__dict__))});'

      values = tuple(d.values())
      cursor.execute(insert_sql, values)
      conn.commit()
      conn.close()
      return obj

  def save(self, obj):
      conn = sqlite3.connect(db_name)
      cursor = conn.cursor()

      d = copy.copy(obj.__dict__)

      object_type_name = self.object_type.__name__
      for key, value in vars(self.object_type).items():
          if not key.startswith("__") and not callable(value):
              if type(value) in BASIC_TYPES:
                  continue
              if type(value) == JsonField:
                  d[key] = json.dumps(d[key])
              if type(value) == ForeignKey:
                  d[key] = json.dumps({'type': value.object_class[0].__name__, 'key': value.foreign_field[0], 'value': getattr(d[key], value.foreign_field[0])})

      upsert_sql = f'INSERT OR REPLACE INTO {object_type_name} ({", ".join(obj.__dict__.keys())}) VALUES ({", ".join(["?"] * len(obj.__dict__))});'

      values = tuple(d.values())
      cursor.execute(upsert_sql, values)
      conn.commit()
      conn.close()
      return obj

  def get(self, **kwargs):
      conn = sqlite3.connect(db_name)
      cursor = conn.cursor()

      object_type_name = self.object_type.__name__

      attr_value_pairs = [(attr, value) for attr, value in kwargs.items()]

      where_clauses = [f'{attr} = ?' for attr, _ in attr_value_pairs]
      where_clause = ' AND '.join(where_clauses)
      select_by_attrs_sql = f'SELECT * FROM {object_type_name} WHERE {where_clause};'

      values = tuple(value for _, value in attr_value_pairs)

      cursor.execute(select_by_attrs_sql, values)
      row = cursor.fetchone()
      conn.close()

      if row:
          obj = self.object_type()
          for i, value in enumerate(row):
              if type(getattr(obj, cursor.description[i][0])) == JsonField:
                  setattr(obj, cursor.description[i][0], json.loads(value))
              elif type(getattr(obj, cursor.description[i][0])) == ForeignKey:
                  value = json.loads(value)
                  setattr(obj, cursor.description[i][0], EXTERN_TYPES[value['type']].objects.get(**{value['key']: value['value']}))
              else:
                  setattr(obj, cursor.description[i][0], value)

          return obj
      else:
          return None

  def delete(self, **kwargs):
      conn = sqlite3.connect(db_name)
      cursor = conn.cursor()

      object_type_name = self.object_type.__name__

      attr_value_pairs = [(attr, value) for attr, value in kwargs.items()]

      where_clauses = [f'{attr} = ?' for attr, _ in attr_value_pairs]
      where_clause = ' AND '.join(where_clauses)
      delete_by_attrs_sql = f'DELETE FROM {object_type_name} WHERE {where_clause};'
      values = tuple(value for _, value in attr_value_pairs)

      cursor.execute(delete_by_attrs_sql, values)
      conn.commit()
      conn.close()

  def filter(self, **kwargs):
      conn = sqlite3.connect(db_name)
      cursor = conn.cursor()

      object_type_name = self.object_type.__name__

      attr_value_pairs = [(attr, value) for attr, value in kwargs.items()]

      where_clauses = [f'{attr} = ?' for attr, _ in attr_value_pairs]
      where_clause = ' AND '.join(where_clauses)
      select_by_attrs_sql = f'SELECT * FROM {object_type_name} WHERE {where_clause};'

      values = tuple(value for _, value in attr_value_pairs)

      cursor.execute(select_by_attrs_sql, values)
      rows = cursor.fetchall()
      conn.close()

      objects = []
      for row in rows:
          obj = self.object_type()
          for i, value in enumerate(row):
              if type(getattr(obj, cursor.description[i][0])) == JsonField:
                  setattr(obj, cursor.description[i][0], json.loads(value))
              elif type(getattr(obj, cursor.description[i][0])) == ForeignKey:
                  value = json.loads(value)
                  setattr(obj, cursor.description[i][0],
                          EXTERN_TYPES[value['type']].objects.get(**{value['key']: value['value']}))
              else:
                  setattr(obj, cursor.description[i][0], value)

          objects.append(obj)

      return objects

  def all(self):
      conn = sqlite3.connect(db_name)
      cursor = conn.cursor()

      object_type_name = self.object_type.__name__
      select_all_sql = f'SELECT * FROM {object_type_name};'

      cursor.execute(select_all_sql)
      rows = cursor.fetchall()
      conn.close()

      objects = []
      for row in rows:
          obj = self.object_type()
          for i, value in enumerate(row):
              if type(getattr(obj, cursor.description[i][0])) == JsonField:
                  setattr(obj, cursor.description[i][0], json.loads(value))
              elif type(getattr(obj, cursor.description[i][0])) == ForeignKey:
                  value = json.loads(value)
                  setattr(obj, cursor.description[i][0],
                          EXTERN_TYPES[value['type']].objects.get(**{value['key']: value['value']}))
              else:
                  setattr(obj, cursor.description[i][0], value)
          objects.append(obj)

      return objects

  def __createTable__(self):
      conn = sqlite3.connect(db_name)
      cursor = conn.cursor()
      custom_fields = []
      for key, value in vars(self.object_type).items():
          if not key.startswith("__") and not callable(value):
              field_name = key
              field_type = value.field_type
              is_unique = value.unique
              is_null = value.null
              default_value = value.default

              if value.field_type == 'FOREIGN_KEY':
                  field_type = "TEXT"
              if value.field_type == 'JSON':
                  field_type = 'TEXT'

              field_declaration = [f'"{field_name}" {field_type}']

              if is_unique:
                  field_declaration.append('UNIQUE')
              if not is_null:
                  field_declaration.append('NOT NULL')
              if default_value is not None:
                  field_declaration.append(f'DEFAULT {default_value}')

              custom_fields.append(' '.join(field_declaration))

      create_table_sql = f'''
      CREATE TABLE IF NOT EXISTS {self.object_type.__name__} (
          {", ".join(custom_fields)}
      );
      '''
      cursor.execute(create_table_sql)

      conn.commit()
      conn.close()


▍ Список объектов


Сейчас функции all() и filter() возвращают list состоящий из объектов. Это неудобно, ведь нельзя, например, удалить все объекты. Исправим это, добавив класс ListOfObjects:

class ListOfObjects:
  def __init__(self, objects):
      self.objects = objects

  def filter(self, **kwargs):
      filtered_objects = []
      for obj in self.objects:
          if all(getattr(obj, attr, None) == value for attr, value in kwargs.items()):
              filtered_objects.append(obj)
      return ListOfObjects(filtered_objects)

  def delete(self):
      for obj in self.objects:
          obj.delete()

  def json(self):
      object_dicts = [obj.json() for obj in self.objects]
      return object_dicts

▍ Примеры


models.py

import simple_orm.models as models
from simple_orm.models import IntegerField, TextField, ForeignKey, JsonField
from simple_orm.models import simple_orm
from pathlib import Path
import sys

PATH = Path(__file__).absolute().parent.parent
sys.path.append(str(PATH))


@simple_orm
class Box(models.Model):
name = TextField()
width = IntegerField()
height = IntegerField()


@simple_orm
class Circle(models.Model):
box = ForeignKey(object_class=Box, foreign_field='name')
name = TextField()
radius = IntegerField()
data = JsonField()

main.py

from models import Box, Circle

box = Box.objects.add(Box('BOX 1', 1, 1))
circle = Circle.objects.add(Circle(box, "CIRCLE 1", 5, {'data': 5}))
print(circle.json())

print(Box.objects.filter(width=1, height=1).json())
print(Circle.objects.get(name="CIRCLE 1").json())
Box.objects.delete(name="BOX 1")
print(Box.objects.all().json())

▍ Заключение


В процессе создания своего ORM мы использовали много сложных инструментов языка python, которые помогли написать короткий и красивый код, решающий довольно сложную задачу.
Да, он не идеальный, вы можете предложить его улучшения в комментариях.

Полный код есть на https://github.com/leo-need-more-coffee/simple-orm
Библиотека для python: https://pypi.org/project/sqlite3-simple-orm
Скачать с помощью pip: pip install sqlite3-simple-orm

Узнавайте о новых акциях и промокодах первыми из нашего Telegram-канала ?