
ORM, или объектно-реляционное отображение — это программная технология, которая позволяет взаимодействовать с базами данных с использованием объектно-ориентированной парадигмы. Вместо того чтобы писать SQL-запросы напрямую для работы с данными в базе данных, можно использовать ORM, чтобы взаимодействовать с данными, как если бы они были объектами в вашем коде.
Не бывало ли вам интересно, как работает изнутри такая идейно простая концепция? Благодаря чему достигается удобство работы? Сегодня мы напишем ORM самостоятельно и узнаем, какие инструменты python нам для этого понадобятся.
Ремарка: sqlite3 выбран из-за простоты, нетрудно заменить обращения к нему на обращения к любой удобной для вас базе данных. По синтаксису я ориентировался на джанго.
▍ Базовые типы данных
В sqlite3 существуют: INTEGER — вещественное число с указанной точностью, TEXT — текст, BLOB — двоичные данные, REAL — число с плавающей запятой(float24), NUMERIC — то же, что и INTEGER.
CREATE TABLE "example" (
"Field1" INTEGER NOT NULL,
"Field2" TEXT UNIQUE,
"Field3" BLOB,
"Field4" REAL DEFAULT 123,
"Field5" NUMERIC
);
У каждого из них есть параметры NULL, UNIQUE, DEFAULT, так что первым делом пишем класс, который будут наследовать все остальные:
class BaseType:
field_type: str #название типа данных поля, например, "INTEGER"
def __init__(self, unique: bool = False, null: bool = True, default: int = None):
self.unique = unique
self.null = null
self.default = default
На основе него прописываем остальные базовые классы:
class IntegerField(BaseType):
field_type = 'INTEGER'
class TextField(BaseType):
field_type = 'TEXT'
class BlobField(BaseType):
field_type = 'BLOB'
class RealField(BaseType):
field_type = 'REAL'
class NumericField(BaseType):
field_type = 'NUMERIC'
▍ Пользовательские модели
Я хочу, чтобы пользовательские модели выглядел максимально просто, например, так:
class Box(models.Model):
name = TextField()
width = IntegerField()
height = IntegerField()
Для этого реализуем родительский класс Model. Он должен задавать объекты с заданными переменными. Для этого пишем инициализатор:
class Model:
def __init__(self, *args, **kwargs):
fields = [el for el in vars(self.__class__) if not el.startswith("__")] #поля, которые мы создали в модели (в данном случае name, width, height)
for i, value in enumerate(args):
setattr(self, fields[i], value)#задаем переменные переданные с помощью args
for field, value in kwargs.items():#задаем переменные переданные с помощью kwargs
setattr(self, field, value)
Все методы, которые, мы напишем для класса Model будут работать с любым объектом, который мы зададим.
Про args и kwargs
1. *args:
— *args позволяет передавать произвольное количество позиционных аргументов в функцию.
— Аргументы, переданные как *args, собираются в кортеж (tuple) внутри функции.
— Вы можете использовать любое имя вместо «args», но общепринято использовать именно «args».
Пример:
2. **kwargs:
— **kwargs позволяет передавать произвольное количество именованных аргументов (ключ-значение) в функцию.
— Аргументы, переданные как **kwargs, собираются в словарь (dictionary) внутри функции.
— Вы можете использовать любое имя вместо «kwargs», но общепринято использовать именно «kwargs».
Пример:
Используя *args и **kwargs, вы можете создавать более гибкие функции, которые могут обрабатывать разные наборы аргументов.
— *args позволяет передавать произвольное количество позиционных аргументов в функцию.
— Аргументы, переданные как *args, собираются в кортеж (tuple) внутри функции.
— Вы можете использовать любое имя вместо «args», но общепринято использовать именно «args».
Пример:
def print_args(*args):
for arg in args:
print(arg)
print_args(1, 2, 3) # Выводит: 1, 2, 3
2. **kwargs:
— **kwargs позволяет передавать произвольное количество именованных аргументов (ключ-значение) в функцию.
— Аргументы, переданные как **kwargs, собираются в словарь (dictionary) внутри функции.
— Вы можете использовать любое имя вместо «kwargs», но общепринято использовать именно «kwargs».
Пример:
def print_kwargs(**kwargs):
for key, value in kwargs.items():
print(f"{key}: {value}")
print_kwargs(name="John", age=30, city="New York") # Выводит: name: John, age: 30, city: New York
Используя *args и **kwargs, вы можете создавать более гибкие функции, которые могут обрабатывать разные наборы аргументов.
Давайте также создадим сразу метод json(), чтобы возвращать объект в виде словаря, он понадобится нам, например, для api или удобного вывода в консоль.
def json(self):
attributes = {}
for key, value in vars(self).items():
if not key.startswith("__") and not callable(value):#проверка на системные методы и поля
attributes[key] = value
return attributes
В данный момент мы уже можем пользоваться моделями, но пока что без базы данных:
class Box(models.Model):
name = TextField()
width = IntegerField()
height = IntegerField()
print(Box('Box 1', 1, 1).json())#выведет {'name': 'Box 1', 'width': 1, 'height': 1}
▍ Добавляем sqlite3
Для использования базы данных я хочу, чтобы каждый объект имел поле objects — менеджер объектов, через который мы и будем обращаться к sqlite.
Например, так:
Box.objects.add(Box('BOX 1', 1, 1))
Box.objects.get(name='BOX 1')
Box.objects.filter(width=1, height=1)
Box.objects.delete(name="BOX 1")
Мы хотим, чтобы каждый объект имел это поле, но при этом поведение различалось в зависимости от модели объекта. Для решения этого создадим прокси-класс, который определяет модель и возвращает нужный менеджер.
class ProxyObjects:
def __get__(self, instance, owner):
return Object(owner)
Про __get__
__get__ — это метод в Python, который используется для определения поведения при доступе к атрибуту объекта. Он является частью протокола дескрипторов в Python и позволяет объектам контролировать доступ к своим атрибутам.
__get__ определяется внутри класса, который также может иметь методы __set__ и __delete__, если необходимо управлять операциями присваивания и удаления атрибутов.
Пример использования __get__:
В этом примере MyDescriptor является дескриптором, который определяет поведение при доступе к атрибуту my_attribute класса MyClass. Метод __get__ определяет, что происходит при чтении значения этого атрибута через экземпляр obj.
__get__ определяется внутри класса, который также может иметь методы __set__ и __delete__, если необходимо управлять операциями присваивания и удаления атрибутов.
Пример использования __get__:
class MyDescriptor:
def __get__(self, instance, owner):
if instance is None:
# Если доступ к дескриптору осуществляется через класс, а не через экземпляр,
# то instance будет равен None, и мы можем вернуть сам дескриптор или другое значение.
return self
else:
# В этом случае instance - это экземпляр объекта, owner - это класс, к которому относится атрибут.
# Мы можем вернуть значение атрибута или выполнить другие действия при доступе к нему.
return instance._value
class MyClass:
def __init__(self, value):
self._value = value
# Используем дескриптор MyDescriptor для атрибута 'my_attribute'
my_attribute = MyDescriptor()
# Создаём экземпляр класса
obj = MyClass(42)
# Доступ к атрибуту 'my_attribute' будет вызывать метод __get__ дескриптора MyDescriptor
print(obj.my_attribute) # Выведет: 42
В этом примере MyDescriptor является дескриптором, который определяет поведение при доступе к атрибуту my_attribute класса MyClass. Метод __get__ определяет, что происходит при чтении значения этого атрибута через экземпляр obj.
Первое, что нужно сделать для работы с базой данных — создать таблицу. Для этого пишем метод:
import sqlite3 # Необходимо импортировать библиотеку для работы с SQLite
class Object:
def __init__(self, object_type: type):
# Конструктор класса принимает тип объекта (класс) и сохраняет его в атрибуте object_type.
self.object_type = object_type
def __createTable__(self):
# Метод для создания таблицы в базе данных, основанной на атрибутах класса object_type.
# Устанавливаем соединение с базой данных
conn = sqlite3.connect(db_name)
cursor = conn.cursor()
# Создаём список custom_fields для хранения определений полей таблицы.
custom_fields = []
# Проходимся по атрибутам класса object_type и извлекаем информацию о полях.
for key, value in vars(self.object_type).items():
if not key.startswith("__") and not callable(value):
field_name = key
field_type = value.field_type
is_unique = value.unique
is_null = value.null
default_value = value.default
# Создаём строку с определением поля и добавляем её в список custom_fields.
field_declaration = [f'"{field_name}" {field_type}']
if is_unique:
field_declaration.append('UNIQUE')
if not is_null:
field_declaration.append('NOT NULL')
if default_value is not None:
field_declaration.append(f'DEFAULT {default_value}')
custom_fields.append(' '.join(field_declaration))
# Создаём SQL-запрос для создания таблицы с определёнными полями.
create_table_sql = f'''
CREATE TABLE IF NOT EXISTS {self.object_type.__name__} (
{", ".join(custom_fields)}
);
'''
# Выполняем SQL-запрос.
cursor.execute(create_table_sql)
# Фиксируем изменения и закрываем соединение с базой данных.
conn.commit()
conn.close()
Создавать таблицу нужно для каждой пользовательской модели — для этого удобно использовать простой декоратор для класса:
def simple_orm(class_: type):
EXTERN_TYPES[class_.__name__] = class_ # Сохраняем в словарь моделей
class_.objects.__createTable__() # Создаём таблицу в бд
return class_
Про декораторы
Декоратор в Python — это функция, которая принимает другую функцию и добавляет к её поведению какое-то дополнительное функциональное или метаинформационное украшение, не изменяя саму функцию. Декораторы позволяют изменять или расширять поведение функций или методов, не модифицируя их код. Они являются мощным инструментом для реализации множества различных задач в Python.
Декораторы используются с помощью символа "@" перед определением функции, которую они декорируют. Вот пример использования декоратора:
В этом примере my_decorator — это декоратор, который добавляет вывод текста до и после вызова функции say_hello. Затем декоратор @my_decorator применяется к функции say_hello, и при вызове say_hello() будет выполнено дополнительное действие, предусмотренное декоратором.
Декораторы часто используются для следующих задач:
Python предоставляет множество встроенных декораторов, таких как @staticmethod, @classmethod, @property и другие, а также вы можете создавать свои собственные декораторы в соответствии с вашими потребностями.
Декораторы используются с помощью символа "@" перед определением функции, которую они декорируют. Вот пример использования декоратора:
def my_decorator(func):
def wrapper():
print("Что-то происходит перед вызовом функции")
func()
print("Что-то происходит после вызова функции")
return wrapper
@my_decorator
def say_hello():
print("Привет, мир!")
say_hello()
В этом примере my_decorator — это декоратор, который добавляет вывод текста до и после вызова функции say_hello. Затем декоратор @my_decorator применяется к функции say_hello, и при вызове say_hello() будет выполнено дополнительное действие, предусмотренное декоратором.
Декораторы часто используются для следующих задач:
- Логирования: Запись логов для функций или методов.
- Аутентификации: Проверка прав доступа перед вызовом функции.
- Кэширования: Сохранение результатов функции для ускорения будущих вызовов с теми же аргументами.
- Измерения времени выполнения: Оценка производительности функции.
- Модификации поведения: Изменение или расширение функциональности функции.
Python предоставляет множество встроенных декораторов, таких как @staticmethod, @classmethod, @property и другие, а также вы можете создавать свои собственные декораторы в соответствии с вашими потребностями.
Теперь при инициализации модели в бд создаётся таблица с соответствующими полями:

▍ JsonField
В sqlite3 нет JsonField по умолчанию, так что мы реализуем его на основе текста. Для начала добавляем его в базовые типы:
class JsonField(BaseType):
field_type = 'JSON'
Json, по сути, ведёт себя, как текст, за исключением того, что при создании и изменении надо использовать json.dumps(), а при получении — json.loads().
▍ ForeignKey
В реляционных базах данных, «foreign key» (внешний ключ) — это структурный элемент, который используется для установления связей между двумя таблицами. Внешний ключ представляет собой один или несколько столбцов в одной таблице, которые связаны с первичным ключом (обычно) в другой таблице. Эта связь позволяет базе данных поддерживать целостность данных и обеспечивать связи между данными в разных таблицах.
ForeignKey должен возвращать объект заданного типа по значению заданного поля.
class ForeignKey(BaseType):
field_type = 'FOREIGN_KEY'
def __init__(self, object_class: type, foreign_field: str, unique: bool = False,
null: bool = True, default=None):
self.object_class = object_class,
self.foreign_field = foreign_field,
self.unique = unique
self.null = null
self.default = default
Я реализую его просто, как json объект с параметрами type, key, value, например:
{"type": "Box", "key": "name", "value": "BOX 1"}.
Теперь мы с помощью него можем делать так:
@simple_orm
class Box(models.Model):
name = TextField()
width = IntegerField()
height = IntegerField()
@simple_orm
class Circle(models.Model):
box = ForeignKey(object_class=Box, foreign_field='name')
name = TextField()
radius = IntegerField()
data = JsonField()
box = Box.objects.add(Box('BOX 1', 1, 1))
circle = Circle.objects.add(Circle(box, "CIRCLE 1", 5, {'data': 5}))
print(circle.json()) #{'box': <__main__.Box object at 0x7f7637f6d850>, 'name': 'CIRCLE 1', 'radius': 5, 'data': {'data': 5}}
▍ Добавляем CRUD
Полный код Object
class Object:
def __init__(self, object_type):
self.object_type = object_type
def add(self, obj):
conn = sqlite3.connect(db_name)
cursor = conn.cursor()
d = copy.copy(obj.__dict__)
object_type_name = self.object_type.__name__
for key, value in vars(self.object_type).items():
if not key.startswith("__") and not callable(value):
if type(value) in BASIC_TYPES:
continue
if type(value) == JsonField:
d[key] = json.dumps(d[key])
if type(value) == ForeignKey:
d[key] = json.dumps({'type': value.object_class[0].__name__, 'key': value.foreign_field[0], 'value': getattr(d[key], value.foreign_field[0])})
insert_sql = f'INSERT INTO {object_type_name} ({", ".join(obj.__dict__.keys())}) VALUES ({", ".join(["?"] * len(obj.__dict__))});'
values = tuple(d.values())
cursor.execute(insert_sql, values)
conn.commit()
conn.close()
return obj
def save(self, obj):
conn = sqlite3.connect(db_name)
cursor = conn.cursor()
d = copy.copy(obj.__dict__)
object_type_name = self.object_type.__name__
for key, value in vars(self.object_type).items():
if not key.startswith("__") and not callable(value):
if type(value) in BASIC_TYPES:
continue
if type(value) == JsonField:
d[key] = json.dumps(d[key])
if type(value) == ForeignKey:
d[key] = json.dumps({'type': value.object_class[0].__name__, 'key': value.foreign_field[0], 'value': getattr(d[key], value.foreign_field[0])})
upsert_sql = f'INSERT OR REPLACE INTO {object_type_name} ({", ".join(obj.__dict__.keys())}) VALUES ({", ".join(["?"] * len(obj.__dict__))});'
values = tuple(d.values())
cursor.execute(upsert_sql, values)
conn.commit()
conn.close()
return obj
def get(self, **kwargs):
conn = sqlite3.connect(db_name)
cursor = conn.cursor()
object_type_name = self.object_type.__name__
attr_value_pairs = [(attr, value) for attr, value in kwargs.items()]
where_clauses = [f'{attr} = ?' for attr, _ in attr_value_pairs]
where_clause = ' AND '.join(where_clauses)
select_by_attrs_sql = f'SELECT * FROM {object_type_name} WHERE {where_clause};'
values = tuple(value for _, value in attr_value_pairs)
cursor.execute(select_by_attrs_sql, values)
row = cursor.fetchone()
conn.close()
if row:
obj = self.object_type()
for i, value in enumerate(row):
if type(getattr(obj, cursor.description[i][0])) == JsonField:
setattr(obj, cursor.description[i][0], json.loads(value))
elif type(getattr(obj, cursor.description[i][0])) == ForeignKey:
value = json.loads(value)
setattr(obj, cursor.description[i][0], EXTERN_TYPES[value['type']].objects.get(**{value['key']: value['value']}))
else:
setattr(obj, cursor.description[i][0], value)
return obj
else:
return None
def delete(self, **kwargs):
conn = sqlite3.connect(db_name)
cursor = conn.cursor()
object_type_name = self.object_type.__name__
attr_value_pairs = [(attr, value) for attr, value in kwargs.items()]
where_clauses = [f'{attr} = ?' for attr, _ in attr_value_pairs]
where_clause = ' AND '.join(where_clauses)
delete_by_attrs_sql = f'DELETE FROM {object_type_name} WHERE {where_clause};'
values = tuple(value for _, value in attr_value_pairs)
cursor.execute(delete_by_attrs_sql, values)
conn.commit()
conn.close()
def filter(self, **kwargs):
conn = sqlite3.connect(db_name)
cursor = conn.cursor()
object_type_name = self.object_type.__name__
attr_value_pairs = [(attr, value) for attr, value in kwargs.items()]
where_clauses = [f'{attr} = ?' for attr, _ in attr_value_pairs]
where_clause = ' AND '.join(where_clauses)
select_by_attrs_sql = f'SELECT * FROM {object_type_name} WHERE {where_clause};'
values = tuple(value for _, value in attr_value_pairs)
cursor.execute(select_by_attrs_sql, values)
rows = cursor.fetchall()
conn.close()
objects = []
for row in rows:
obj = self.object_type()
for i, value in enumerate(row):
if type(getattr(obj, cursor.description[i][0])) == JsonField:
setattr(obj, cursor.description[i][0], json.loads(value))
elif type(getattr(obj, cursor.description[i][0])) == ForeignKey:
value = json.loads(value)
setattr(obj, cursor.description[i][0],
EXTERN_TYPES[value['type']].objects.get(**{value['key']: value['value']}))
else:
setattr(obj, cursor.description[i][0], value)
objects.append(obj)
return objects
def all(self):
conn = sqlite3.connect(db_name)
cursor = conn.cursor()
object_type_name = self.object_type.__name__
select_all_sql = f'SELECT * FROM {object_type_name};'
cursor.execute(select_all_sql)
rows = cursor.fetchall()
conn.close()
objects = []
for row in rows:
obj = self.object_type()
for i, value in enumerate(row):
if type(getattr(obj, cursor.description[i][0])) == JsonField:
setattr(obj, cursor.description[i][0], json.loads(value))
elif type(getattr(obj, cursor.description[i][0])) == ForeignKey:
value = json.loads(value)
setattr(obj, cursor.description[i][0],
EXTERN_TYPES[value['type']].objects.get(**{value['key']: value['value']}))
else:
setattr(obj, cursor.description[i][0], value)
objects.append(obj)
return objects
def __createTable__(self):
conn = sqlite3.connect(db_name)
cursor = conn.cursor()
custom_fields = []
for key, value in vars(self.object_type).items():
if not key.startswith("__") and not callable(value):
field_name = key
field_type = value.field_type
is_unique = value.unique
is_null = value.null
default_value = value.default
if value.field_type == 'FOREIGN_KEY':
field_type = "TEXT"
if value.field_type == 'JSON':
field_type = 'TEXT'
field_declaration = [f'"{field_name}" {field_type}']
if is_unique:
field_declaration.append('UNIQUE')
if not is_null:
field_declaration.append('NOT NULL')
if default_value is not None:
field_declaration.append(f'DEFAULT {default_value}')
custom_fields.append(' '.join(field_declaration))
create_table_sql = f'''
CREATE TABLE IF NOT EXISTS {self.object_type.__name__} (
{", ".join(custom_fields)}
);
'''
cursor.execute(create_table_sql)
conn.commit()
conn.close()
▍ Список объектов
Сейчас функции all() и filter() возвращают list состоящий из объектов. Это неудобно, ведь нельзя, например, удалить все объекты. Исправим это, добавив класс ListOfObjects:
class ListOfObjects:
def __init__(self, objects):
self.objects = objects
def filter(self, **kwargs):
filtered_objects = []
for obj in self.objects:
if all(getattr(obj, attr, None) == value for attr, value in kwargs.items()):
filtered_objects.append(obj)
return ListOfObjects(filtered_objects)
def delete(self):
for obj in self.objects:
obj.delete()
def json(self):
object_dicts = [obj.json() for obj in self.objects]
return object_dicts
▍ Примеры
models.py
import simple_orm.models as models
from simple_orm.models import IntegerField, TextField, ForeignKey, JsonField
from simple_orm.models import simple_orm
from pathlib import Path
import sys
PATH = Path(__file__).absolute().parent.parent
sys.path.append(str(PATH))
@simple_orm
class Box(models.Model):
name = TextField()
width = IntegerField()
height = IntegerField()
@simple_orm
class Circle(models.Model):
box = ForeignKey(object_class=Box, foreign_field='name')
name = TextField()
radius = IntegerField()
data = JsonField()
main.py
from models import Box, Circle
box = Box.objects.add(Box('BOX 1', 1, 1))
circle = Circle.objects.add(Circle(box, "CIRCLE 1", 5, {'data': 5}))
print(circle.json())
print(Box.objects.filter(width=1, height=1).json())
print(Circle.objects.get(name="CIRCLE 1").json())
Box.objects.delete(name="BOX 1")
print(Box.objects.all().json())
▍ Заключение
В процессе создания своего ORM мы использовали много сложных инструментов языка python, которые помогли написать короткий и красивый код, решающий довольно сложную задачу.
Да, он не идеальный, вы можете предложить его улучшения в комментариях.
Полный код есть на https://github.com/leo-need-more-coffee/simple-orm
Библиотека для python: https://pypi.org/project/sqlite3-simple-orm
Скачать с помощью pip: pip install sqlite3-simple-orm
Узнавайте о новых акциях и промокодах первыми из нашего Telegram-канала ?
