Я прочитал статью, и меня поразило, сколько сомнительных решений можно использовать для одной простой задачи.
В этой статье я расскажу, как правильно создать сервис для конкурентных обновлений остатков данных в базе данных. Буду использовать .NET, C#, Entity Framework и Postgres.
Условия
В базе данных хранятся остаток товаров и количество зарезервированных товаров по каждой паре (товар, склад)
Инвариант системы - для всех пар (товар, склад) количество зарезервированных товаров должно быть не больше остатка
Должен быть метод размещения заказа (резерва), который:
Получает
Id
заказа и массив строк заказа (Id товара
,Id склада
,количество
)Данные о заказе и позициях заказа сохраняет в базе данных, обновляет резервы
Для каждой позиции заказа обновляет количество зарезервированных товаров = количество зарезервированных товаров + количество в заказе
Работает транзакционно - выполняется полностью или не выполняется вовсе, не нарушает инварианты, не влияет на другие операции и другие операции не влияют на результат, если вернул положительный ответ, то данные уже не потеряются (ACID)
Система не должна падать с ошибками под нагрузкой
Каркас приложения
Для создания приложения выполняю команды
dotnet new webapi
dotnet add package Microsoft.EntityFrameworkCore
dotnet add package Microsoft.EntityFrameworkCore.Design
Я буду использовать Entity Framework, так как хочу создавать схему в коде и использовать миграции для обновления.
Модель
[PrimaryKey(nameof(ItemId), nameof(WarehouseId))]
public class Stock
{
public int ItemId { get; set; }
public int WarehouseId { get; set; }
public int Quantity { get; set; }
public int Reserved { get; set; }
}
[JsonObjectCreationHandling(JsonObjectCreationHandling.Populate)]
public class Order
{
[Key]
public Guid Id { get; set; }
public List<OrderLine> Lines { get; } = new List<OrderLine>();
}
[PrimaryKey(nameof(OrderId), nameof(ItemId), nameof(WarehouseId))]
public class OrderLine
{
[JsonIgnore]
public Guid OrderId { get; set; }
public int ItemId { get; set; }
public int WarehouseId { get; set; }
public int Quantity { get; set; }
}
По умолчанию применяем минимум четвертую нормальную форму, чтобы не требовалось массовых операций при добавлении\изменении\удалении одного элемента модели.
Первичные ключи гарантируют уникальность, то есть нельзя будет дважды вставить один заказ или сделать две строки заказа с одним и тем же товаром и складом. Кроме того все базы данных при назначении ключей создают индексы по ключевым полям, что может ускорить поиск.
Класс контекста
public class StockApiDataContext(DbContextOptions<StockApiDataContext> options) : DbContext(options)
{
public DbSet<Order> Orders { get; set; } = null!;
public DbSet<OrderLine> OrderLines { get; set; } = null!;
public DbSet<Stock> Stock { get; set; } = null!;
protected override void OnModelCreating(ModelBuilder modelBuilder)
{
var tableBuilder = modelBuilder.Entity<Stock>();
tableBuilder.Property(s => s.Reserved).HasDefaultValue(0);
}
}
Значение по умолчанию для поля Reserved нужно только для более удобного наполнения тестовыми данными.
База данных
В качестве сервера баз данных я буду использовать docker-контейнер, запущенный локально
docker run -d \
--name postgres \
-p 5432:5432 \
-e POSTGRES_PASSWORD=P@ssw0rd \
postgres
В проект добавлю пакеты для работы с Postgres
dotnet add package Npgsql.EntityFrameworkCore.PostgreSQL
dotnet add package EFCore.NamingConventions
Последний пакет дает возможность изменить соглашение именования объектов в базе данных с PascalCase
на snake_case
, что позволяет писать запросы к postgres без экранирования имен.
Для создания миграций установлю инструменты
dotnet tool install --global dotnet-ef
Код приложения для подключения к базе данных и применения миграций
var builder = WebApplication.CreateBuilder(args);
// ...
builder.Services.AddDbContext<StockApiDataContext>(opt =>
opt.UseNpgsql(builder.Configuration.GetConnectionString("DefaultConnection"))
.UseSnakeCaseNamingConvention()
);
// ...
var app = builder.Build();
//...
using (var scope = app.Services.CreateScope())
{
var db = scope.ServiceProvider.GetRequiredService<StockApiDataContext>();
await db.Database.MigrateAsync();
}
Теперь можно создать миграции
dotnet ef migrations add Initial
Код сервиса
app.MapPost("/place-order",
async (Order order,
StockApiDataContext ctx,
CancellationToken ct) =>
{
var lines = from l in order.Lines
group l by new { l.ItemId, l.WarehouseId } into g
select new OrderLine()
{
ItemId = g.Key.ItemId,
WarehouseId = g.Key.WarehouseId,
Quantity = g.Aggregate(0, (s, l) => s + l.Quantity)
};
lines = lines.ToArray();
order.Lines.Clear();
order.Lines.AddRange(lines);
ctx.Orders.Add(order);
await ctx.SaveChangesAsync(ct);
})
Данные сущности Order ��аполняются с помощью привязки значений, при этом поле
Order.Lines
не имеет сеттера, но указав атрибутJsonObjectCreationHandling(JsonObjectCreationHandling.Populate)
, можно заполнять значениями существующие объекты, а не создавать новые.В запросе может прийти множество строк заказа с одинаковыми
ItemId
иWarehouseId
, они склеиваются в одну, суммируя количество.
Код только сохраняет заказы в базу, не обновляет и не проверяет остатки. Используем его для получения бейзлайна по быстродействию.
Тест быстродействия
Использую k6 от графаны
winget install k6 --source winget
Код теста
import http from 'k6/http';
function rand(min, max) {
return Math.round(Math.random() * (max - min) + min);
}
const itemsCount = 10;
const warehousesCount = 10;
export const options = {
iterations: itemsCount * warehousesCount * 300,
vus: 50,
};
export default function () {
const url = __ENV.applicationUrl ?? 'http://localhost:5011';
const params = {
headers: {
'Content-Type': 'application/json',
},
};
const count = rand(1, itemsCount);
let lines = [];
for (let index = 0; index < count; index++) {
lines.push({
itemId: rand(1, itemsCount),
warehouseId: rand(1, warehousesCount),
quantity: rand(1, 5)
})
}
const payload = JSON.stringify({ id: crypto.randomUUID(), lines: lines });
http.post(`${url}/place-order`, payload, params);
}
Тест генерирует запрос в котором от 1 до 10 позиций, в которых случайные id для товаров и складов в диапазоне от 1 до 10 и от 1 до 5 единиц товара в каждой позиции.
Тест выполняется параллельно в 50 потоков
Общее количество итераций = 300 х 10 х 10 = 30 000
Так как количество строк заказа будет варьироваться между тестами, то и время забега будет немного отличаться, поэтому я буду делать несколько замеров и выкладывать средний.
Нагрузочный тест
Запуск приложения
dotnet run -c Release -- Logging:LogLevel:Microsoft.EntityFrameworkCore.Database.Command="Error"
По умолчанию ef core все запросы логирует в консоль, чтобы консоль не искажала результаты замера быстродействие я оставил вывод в консоль только ошибок.
Перед каждым забегом я сбрасываю состояние базы данных sql-скриптом
TRUNCATE TABLE orders CASCADE;
TRUNCATE TABLE stock;
INSERT INTO stock
select *, random(10000, 100000),0 as quantity from generate_series(1,10) item_id, generate_series(1,10) warehouse_id;
Запуск теста (в другом терминале)
k6 run k6test.js
Результат
HTTP
http_req_duration..............: avg=16.03ms min=4.85ms med=14.86ms max=385.25ms p(90)=20.7ms p(95)=23.46ms
{ expected_response:true }...: avg=16.03ms min=4.85ms med=14.86ms max=385.25ms p(90)=20.7ms p(95)=23.46ms
http_req_failed................: 0.00% 0 out of 30000
http_reqs......................: 30000 3074.002522/s
Это бейзлайн, относительно которого я буду оценивать быстродействие.
Код смотреть тут: https://github.com/gandjustas/habr-post-stock-api/tree/initial
Обновление остатков
Добавлю логику обновления остатков на складах, пока без проверки
app.MapPost("/place-order/",
async (Order order,
StockApiDataContext ctx,
CancellationToken ct) =>
{
var lines = from l in order.Lines
group l by new { l.ItemId, l.WarehouseId } into g
select new OrderLine()
{
ItemId = g.Key.ItemId,
WarehouseId = g.Key.WarehouseId,
Quantity = g.Aggregate(0, (s, l) => s + l.Quantity)
};
lines = lines.ToArray();
order.Lines.Clear();
order.Lines.AddRange(lines);
await using var t = await ctx.Database.BeginTransactionAsync(ct);
ctx.Orders.Add(order);
await ctx.SaveChangesAsync(ct);
foreach (var l in order.Lines)
{
await ctx.Stock
.Where(s => s.ItemId == l.ItemId
&& s.WarehouseId == l.WarehouseId)
.ExecuteUpdateAsync(setter =>
setter.SetProperty(s => s.Reserved,
s => s.Reserved + l.Quantity),
ct);
}
await t.CommitAsync(ct);
})
И сразу же получаем взаимоблокировки (deadlock) при нагрузочном тесте и сыпятся ошибки в консоли.
Почему происходит deadlock
База данных имеет по умолчанию уровень изоляции read committed, то есть не может прочитать данные, которые были изменены, но еще не закоммичены другой транзакцией.
Для этого Postgres (и другие СУБД) накладывает блокировку на запись строки и снимает её только в конце транзакции.
Когда параллельных транзакций больше одной и они меняют больше одной строки, то может случиться так:
Транзакция А успела заблокировать строку 1
Транзакция Б успела заблокирова��ь строку 2
Транзакция А повисла в ожидании снятия блокировки строки 2
Транзакция Б повисла в ожидании снятия блокировки строки 1
Ситуация, описанная выше, называется взаимоблокировка (deadlock). Postgres (и другие СУБД) определяет, что появилась такая блокировка и отменяет одну из заблокированных транзакций, чтобы другие могли выполняться.
Как бороться со взаимоблокировками
Простое правило:
Все транзакции должны накладывать блокировки в одном и том же порядке
Для этого просто отсортируем позиции по Id товара и склада
// ...
foreach (var l in order.Lines.OrderBy(l => l.ItemId).ThenBy(l => l.WarehouseId))
// ...
Результаты забега
HTTP
http_req_duration..............: avg=92.86ms min=4.45ms med=23.4ms max=6.03s p(90)=186.04ms p(95)=390.5ms
{ expected_response:true }...: avg=92.86ms min=4.45ms med=23.4ms max=6.03s p(90)=186.04ms p(95)=390.5ms
http_req_failed................: 0.00% 0 out of 30000
http_reqs......................: 30000 536.144972/s
Из-за ожиданий блокировок некоторые транзакции висят по несколько секунд. Пропускная способность упала почти в 6 раз.
Код смотреть тут: https://github.com/gandjustas/habr-post-stock-api/tree/stock-update-loop
Обновление остатков одним запросом
Предыдущий вариант плох тем, что мы очень много раз бегаем в базу, отправляя отдельные запросы на обновление остатков, хотя саму логику обновления мы можем сделать одним запросом
app.MapPost("/place-order/",
async (Order order,
StockApiDataContext ctx,
CancellationToken ct) =>
{
var lines = from l in order.Lines
group l by new { l.ItemId, l.WarehouseId } into g
select new OrderLine()
{
ItemId = g.Key.ItemId,
WarehouseId = g.Key.WarehouseId,
Quantity = g.Aggregate(0, (s, l) => s + l.Quantity)
};
lines = lines.ToArray();
order.Lines.Clear();
order.Lines.AddRange(lines);
await using var t = await ctx.Database.BeginTransactionAsync(ct);
ctx.Orders.Add(order);
await ctx.SaveChangesAsync(ct);
var q = from l in ctx.OrderLines
where l.OrderId == order.Id
join s in ctx.Stock
on new { l.ItemId, l.WarehouseId }
equals new { s.ItemId, s.WarehouseId }
select new { s, l };
await q.ExecuteUpdateAsync(setter =>
setter.SetProperty(x => x.s.Reserved,
x => x.s.Reserved + x.l.Quantity),
ct);
await t.CommitAsync(ct);
})
К сожалению такой код будет вызывать deadlock, так как Postgres не гарантирует в каком порядке будут накладываться блокировки на строки. Это зависит от порядка записей на диске, а порядок записей на диске зависит от предыдущих операций обновления.
Результаты забега при появлении deadlock
HTTP
http_req_duration..............: avg=87.22ms min=4.28ms med=19.75ms max=13.26s p(90)=160.89ms p(95)=326.21ms
{ expected_response:true }...: avg=86.34ms min=4.28ms med=19.74ms max=13.26s p(90)=160.48ms p(95)=321.98ms
http_req_failed................: 0.05% 15 out of 30000
http_reqs......................: 30000 571.569136/s
Результаты забега когда взаимоблокировки не возникают
HTTP
http_req_duration..............: avg=69.83ms min=4.15ms med=19.94ms max=5.32s p(90)=155.74ms p(95)=280.21ms
{ expected_response:true }...: avg=69.83ms min=4.15ms med=19.94ms max=5.32s p(90)=155.74ms p(95)=280.21ms
http_req_failed................: 0.00% 0 out of 30000
http_reqs......................: 30000 713.321511/s
Это значительно быстрее, чем обновление остатков с цикле, но нужно как-то бороться с deadlock_ами.
Код смотреть тут: https://github.com/gandjustas/habr-post-stock-api/tree/stock-check-query
Что делать с deadlock
Если вы не можете обеспечить одинаковый порядок блокирования ресурсов, то в общем случае есть две стратегии: выбор более строгого режима изоляции транзакций и увеличение гранулярности блокировок.
Но в Postgres увеличение уровня изоляции до repeatable read или serializable приведет к тому, что ошибок станет еще больше, так как postgres откатывает транзакции при нарушении изоляции.
Увеличение гранулярности может помочь, но:
Это приведет к уменьшению параллельности запросов, вплоть до полостью последовательного выполнения, что снизит пропускную способность
Потребует руками писать SQL-код, так как у EF Core нет методов для ручного управления блокировками
Поэтому лучше смириться с тем, что часть запросов будут отваливаться и просто повторять их. Тем более в EF Core встроен механизм отказоустойчивости, он автоматически повторяет запросы, если считает что ошибка некритична.
Такой механизм полезен не только для Postgres, но и других баз данных, так как они тоже могут выдавать ошибки взаимоблокировок и другие временные (transient) ошибки.
Повторение запросов
Для повторения запросов надо это повторение включить
// ...
builder.Services.AddDbContext<StockApiDataContext>(opt =>
opt.UseNpgsql(builder.Configuration.GetConnectionString("DefaultConnection"),
npgOptions => npgOptions.EnableRetryOnFailure()) // включить повторение запросов
.UseSnakeCaseNamingConvention()
);
// ...
А также нужно использовать "стратегию" в коде, чтобы группа запросов повторялась в одной транзакции
app.MapPost("/place-order/",
async (Order order,
StockApiDataContext ctx,
CancellationToken ct) =>
{
var lines = from l in order.Lines
group l by new { l.ItemId, l.WarehouseId } into g
orderby g.Key.ItemId, g.Key.WarehouseId // добавлено
select new OrderLine()
{
ItemId = g.Key.ItemId,
WarehouseId = g.Key.WarehouseId,
Quantity = g.Aggregate(0, (s, l) => s + l.Quantity)
};
lines = lines.ToArray();
order.Lines.Clear();
order.Lines.AddRange(lines);
var db = ctx.Database;
await db.CreateExecutionStrategy().ExecuteInTransactionAsync(async ct =>
{
ctx.Orders.Add(order);
await ctx.SaveChangesAsync(ct);
var q = from l in ctx.OrderLines
where l.OrderId == order.Id
join s in ctx.Stock
on new { l.ItemId, l.WarehouseId }
equals new { s.ItemId, s.WarehouseId }
select new { s, l };
await q.ExecuteUpdateAsync(setter =>
setter.SetProperty(x => x.s.Reserved,
x => x.s.Reserved + x.l.Quantity),
ct);
}, ct => Task.FromResult(false), ct);
})
Также добавил сортировку строк по ItemId
и WarehouseId
перед добавлением в базу данных, чтобы сократить количество взаимоблокировок.
Результаты забега
HTTP
http_req_duration..............: avg=74.72ms min=4.56ms med=19.61ms max=6.32s p(90)=146.08ms p(95)=284.93ms
{ expected_response:true }...: avg=74.72ms min=4.56ms med=19.61ms max=6.32s p(90)=146.08ms p(95)=284.93ms
http_req_failed................: 0.00% 0 out of 30000
http_reqs......................: 30000 666.887768/s
Код смотреть тут: https://github.com/gandjustas/habr-post-stock-api/tree/stock-update-query-retry
Проверка остатков
Для этого достаточно добавить одну строку в код выше
await q.ExecuteUpdateAsync(/* прощено */);
// Проверка остатков
if (await q.AnyAsync(x =>
x.s.Quantity < x.s.Reserved,
ct)) throw new Exception("Oversell");
Так как предыдущий код, обновляющий остатки, уже навешивает блокировки на все строки, то проверка этих же строк будет изолирована от других транзакций.
Если инвариант будет нарушен, но EF Core не будет повторять запрос, так как такая ошибка не считается "временной".
Результаты забега
HTTP
http_req_duration..............: avg=79.47ms min=4.68ms med=21.88ms max=4.36s p(90)=168.33ms p(95)=320.29ms
{ expected_response:true }...: avg=79.47ms min=4.68ms med=21.88ms max=4.36s p(90)=168.33ms p(95)=320.29ms
http_req_failed................: 0.00% 0 out of 30000
http_reqs......................: 30000 627.168904/s
Нагрузочный тест делается в условиях когда запасы заведомо превышают все резервы заказов, так как ошибки сильно влияют на время выполнения запросов.
Код смотреть тут: https://github.com/gandjustas/habr-post-stock-api/tree/stock-check-query
Итого
Вся необходимая логика реализована, без ручных блокировок и на уровне изоляции read committed
В каждом запросе среднем 5 позиций заказа, получаем (627.168904*5)=3 135.84452 обновлений остатков в секунду
В тесте всего 10 разных товаров, на один товар приходится (3 135.84452/10)=313.584452, округленно 314 обновлений остатков в секунду
Это на домашнем компе, с Postgres в docker в wsl
Во время теста показатели скорости записи на диск не превышали 5 мб\сек
Мне кажется что 600 заказов в секунду и 300 резервов на один товар это вполне достаточная производительность для любого маркетплейса.
Логика в базе данных
По житейской логике заказ всегда должен создаваться вместе с резервом товаров по этому заказу. Поэтому нет необходимости каждый раз из приложения отправлять одни и те же запросы на обновление и контроль остатков. В базе данных есть средства делать это автоматически.
На этом месте общественность может возмутиться: "как же так, бизнес-логика в базе данных это плохо". Но плохо это, если бизнес-логика в базе данных обновляется и проверяется независимо от приложения. Тогда очень легко допустить несогласованные изменения.
К счастью для EF есть расширение, которое позволяет делать триггеры в базе данных на C#
dotnet add package Laraue.EfCoreTriggers.PostgreSql
Нужно добавить расширение к контексту
builder.Services.AddDbContext<StockApiDataContext>(opt =>
opt.UseNpgsql(builder.Configuration.GetConnectionString("DefaultConnection"),
npgOptions => npgOptions.EnableRetryOnFailure())
.UseSnakeCaseNamingConvention()
.UsePostgreSqlTriggers() // добавлено
);
И добавить необходимые триггеры и проверки в модель
// Изменения в классе StockApiDataContext, остальной код не меняется
protected override void OnModelCreating(ModelBuilder modelBuilder)
{
var tableBuilder = modelBuilder.Entity<Stock>();
var reservedProp = tableBuilder.Property(s => s.Reserved).HasDefaultValue(0);
var quantityProp = tableBuilder.Property(s => s.Quantity);
tableBuilder.ToTable(t =>
t.HasCheckConstraint("check_stock", $"{quantityProp.Metadata.GetColumnName()} >= {reservedProp.Metadata.GetColumnName()}"));
modelBuilder.Entity<OrderLine>()
.AfterInsert(t =>
t.Action(a =>
a.Update<Stock>(
(l, s) => s.ItemId == l.New.ItemId
&& s.WarehouseId == l.New.WarehouseId,
(l, s) => new Stock {
Reserved = s.Reserved + l.New.Quantity }
)
)
);
}
Такой код создает триггер, который на каждое добавление OrderLine
вызывает обновление таблицы Stock
. Контроль остатков выполняется обычным check constraint.
Код метода при этом упрощается
app.MapPost("/place-order/", async (Order order,
StockApiDataContext ctx,
CancellationToken ct) =>
{
var lines = from l in order.Lines
group l by new { l.ItemId, l.WarehouseId } into g
orderby g.Key.ItemId, g.Key.WarehouseId
select new OrderLine()
{
ItemId = g.Key.ItemId,
WarehouseId = g.Key.WarehouseId,
Quantity = g.Aggregate(0, (s, l) => s + l.Quantity)
};
lines = lines.ToArray();
order.Lines.Clear();
order.Lines.AddRange(lines);
ctx.Orders.Add(order);
await ctx.SaveChangesAsync(ct);
})
Результаты забега
HTTP
http_req_duration..............: avg=74.77ms min=2.66ms med=17.56ms max=5.57s p(90)=139.43ms p(95)=291.77ms
{ expected_response:true }...: avg=74.77ms min=2.66ms med=17.56ms max=5.57s p(90)=139.43ms p(95)=291.77ms
http_req_failed................: 0.00% 0 out of 30000
http_reqs......................: 30000 666.194313/s
Я подошел к границе Парето-эффективности
Выигрыш по сравнению с предыдущим вариантом 7-8%
Обновление логики триггеров теперь требует миграций, даже во время написания кода я потратил заметно больше времени, чем в варианте с запросами
Код все еще универсальный, для смены провайдера надо будет поменять только пакеты регистрацию в контейнере
Код смотреть тут: https://github.com/gandjustas/habr-post-stock-api/tree/trigger-and-constraint
Увеличиваем нагрузку
В k6 тесте поменяю количество виртуальных пользователей с 50 до 100
export const options = {
iterations: itemsCount * warehousesCount * 300,
vus: 100, // было 50
};
В результате получаю
HTTP
http_req_duration..............: avg=165.63ms min=3.03ms med=27.67ms max=57.48s p(90)=197.06ms p(95)=453.54ms
{ expected_response:true }...: avg=165.63ms min=3.03ms med=27.67ms max=57.48s p(90)=197.06ms p(95)=453.54ms
http_req_failed................: 0.00% 0 out of 30000
http_reqs......................: 30000 489.284481/s
В логах Postgres появляется много сообщений `FATAL: sorry, too many clients already`. Повторение запросов Entity Framework помогает и тут, повторяя запросы пока они не выполнятся. Но если увеличить количество vus до 150, то запросы начинают отваливаться по ошибке. По-умолчанию механизм повторений EF Core пытается повторить запрос 5 раз.
В Postgres по умолчанию max_connections=100
. Это число можно увеличить, но с ростом количества соединений увеличивается и потребление памяти. Теоретически верхняя граница количества подключений равна максимальному количеству открытых портов, что порядка 65 тысяч. Однако, если доступный объем памяти будет исчерпан, операционная система завершит работу Postgres. Таким образом, всегда существует предел, который может быть превышен при высокой нагрузке.
Для Microsoft SQL Server такой проблемы нет. Для Postgres можно настроить Maximum Pool Size, чтобы подключений в пуле было меньше, чем свободных подключений в postgres. Тогда попытка получить еще одно подключение зависнет в ожидании на Timeout, который по умолчанию 15 сек.
При Maximum Pool Size=80
и vus: 100
результаты такие
HTTP
http_req_duration..............: avg=223.56ms min=32.59ms med=127.14ms max=13.24s p(90)=266.08ms p(95)=470.36ms
{ expected_response:true }...: avg=223.56ms min=32.59ms med=127.14ms max=13.24s p(90)=266.08ms p(95)=470.36ms
http_req_failed................: 0.00% 0 out of 30000
http_reqs......................: 30000 666.724807/s
Поэтому далее идут оптимизации, которые вы почти никогда не будете применять на практике.
Очередь запросов к базе данных
Первое правило создания очередей: не создавай очередей (с) Джейсон Стэйтем
Очередь чтения и записи на диск, блокировки в базе данных, механизм повторов, пул соединений, ThreadPool в .net - это все уже очереди, возможно стоит настроить их параметры и не заниматься созданием своих очередей. Поищите альтернативные решения, прежде чем изобретать свою очередь.
Для начала напишу такой код, как хочу получить
app.MapPost("/place-order/",
async (Order order,
DbQueueService<StockApiDataContext> worker, // вместо StockApiDataContext ctx
CancellationToken ct) =>
{
var lines = from l in order.Lines
group l by new { l.ItemId, l.WarehouseId } into g
orderby g.Key.ItemId, g.Key.WarehouseId
select new OrderLine()
{
ItemId = g.Key.ItemId,
WarehouseId = g.Key.WarehouseId,
Quantity = g.Aggregate(0, (s, l) => s + l.Quantity)
};
lines = lines.ToArray();
order.Lines.Clear();
order.Lines.AddRange(lines);
await worker.ExecuteAsync(async (ctx, ct) => // добавлено
{
ctx.Orders.Add(order);
await ctx.SaveChangesAsync(ct);
}, ct);
})
Чтобы такой код заработал мне необходимо зарегистрировать класс в контейнере
builder.Services.Configure<DbQueueServiceOptions>(o => { });
builder.Services.AddSingleton<DbQueueService<StockApiDataContext>>();
// регистрация синглтона в качестве Hosted Service
builder.Services.AddHostedService(sp =>
sp.GetRequiredService<DbQueueService<StockApiDataContext>>());
Сам класс очереди на базе BackgroundService
internal class DbQueueService<TContext>(
IOptions<DbQueueServiceOptions> options,
IServiceProvider sp) : BackgroundService where TContext : DbContext
{
private readonly record struct QueueItem(
TaskCompletionSource Source,
Func<TContext, CancellationToken, Task> Action,
CancellationToken CancellationToken);
private Channel<QueueItem> channel = Channel.CreateUnbounded<QueueItem>(new() { SingleReader = true });
public async Task ExecuteAsync(
Func<TContext, CancellationToken, Task> action,
CancellationToken ct = default)
{
ArgumentNullException.ThrowIfNull(action);
TaskCompletionSource tcs = new();
await channel.Writer.WriteAsync(new(tcs, action, ct), ct);
await tcs.Task;
}
// From BackgroundService
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var reader = channel.Reader;
while (await reader.WaitToReadAsync(stoppingToken))
{
List<QueueItem> batch = new();
while (batch.Count < options.Value.MaxItemsInBatch
&& reader.TryRead(out var item)) batch.Add(item);
await ProcessBatch(sp, batch, stoppingToken);
}
}
private async Task ProcessBatch(
IServiceProvider sp,
IEnumerable<QueueItem> batch,
CancellationToken stoppingToken)
{
await using var scope = sp.CreateAsyncScope();
await using var ctx = scope.ServiceProvider.GetRequiredService<TContext>();
foreach (var item in batch)
{
if (stoppingToken.IsCancellationRequested)
{
item.Source.SetCanceled(stoppingToken);
continue;
}
var result = item.Action(ctx, item.CancellationToken);
await result.WaitAsync(item.CancellationToken)
.ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
item.Source.SetFromTask(result);
}
}
}
internal class DbQueueServiceOptions
{
public int MaxItemsInBatch { get; set; } = 50;
}
Для организации передачи от вызывающего кода к обработчику очереди используется System.Threading.Channels.Channel, в принципе нет ни одного разумного аргумента не использовать эти классы для любых очередей
В качестве элемента очереди используется
record struct
, чтобы сократить количество аллокацийА для передачи результата вызывающему коду используется TaskCompletionSource
public ExecuteAsync
отправляет элемент в канал и возвращает Taskprotected override ExecuteAsync
ждет когда в канале будет хотя бы один элемент, получает из канала все что есть и отправляет весь батч на обработку.ProcessBatch
получает батч, создает контекст EF и в цикле выполняет функцииВсе выполняется в одном потоке, без конкурентности запросов
Результаты забега (для vus: 100)
HTTP
http_req_duration..............: avg=493.12ms min=8.26ms med=496.76ms max=676.42ms p(90)=510.17ms p(95)=514.03ms
{ expected_response:true }...: avg=493.12ms min=8.26ms med=496.76ms max=676.42ms p(90)=510.17ms p(95)=514.03ms
http_req_failed................: 0.00% 0 out of 30000
http_reqs......................: 30000 202.400329/s
Пропускная способность не зависит от количества виртуальных пользователей, но время обработки растет. Запросы приходят быстрее, чем удается их обрабатывать в один поток.
Код смотреть тут: https://github.com/gandjustas/habr-post-stock-api/tree/write-queue
Параллельная обработка батчей
Предыдущий вариант оказался очень медленным из-за обработки в одном потоке. Чтобы сделать параллельную обработку нескольких батчей достаточно поменять один метод класса очереди
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
List<Task> tasks = new();
var reader = channel.Reader;
while (await reader.WaitToReadAsync(stoppingToken))
{
List<QueueItem> batch = new();
if (tasks.Count >= options.Value.MaxConcurrentBatches)
await Task.WhenAny(tasks.ToArray());
while (batch.Count < options.Value.MaxItemsInBatch
&& reader.TryRead(out var item)) batch.Add(item);
tasks.RemoveAll(t => t.IsCompleted);
tasks.Add(ProcessBatch(sp, batch, stoppingToken));
}
await Task.WhenAny(tasks.ToArray(), stoppingToken);
}
Теперь код не ожидает завершения обработки батча, а записывает задачу в List и продолжает цикл. И только если в List уже достаточно много задач, то мы дожидается завершения любой из них, а уже потом запускает новую.
Будет запускаться параллельно столько батчей, сколько указано в опции MaxConcurrentBatches
.
В рамках теста я вычислил оптимальный параметр по умолчанию для моей конфигурации
internal class DbQueueServiceOptions
{
public int MaxItemsInBatch { get; set; } = 50;
public int MaxConcurrentBatches { get; set; } = 15;
}
Результаты забега (для vus: 100)
HTTP
http_req_duration..............: avg=136.85ms min=3.42ms med=98.93ms max=924.03ms p(90)=303.1ms p(95)=382.81ms
{ expected_response:true }...: avg=136.85ms min=3.42ms med=98.93ms max=924.03ms p(90)=303.1ms p(95)=382.81ms
http_req_failed................: 0.00% 0 out of 30000
http_reqs......................: 30000 728.288949/s
Это еще на 9% лучше, чем результат без очереди. Кроме того снижает количество использованных соединений. В реальности, кроме запросов на создание резерва, будут еще запросы на получение остатков и они будут гораздо меньше упираться в количество соединений.
Но еще раз повторю, что в большинстве случаев вам это не нужно.
Код смотреть тут: https://github.com/gandjustas/habr-post-stock-api/tree/write-queue-with-parallel
Оптимизация хранения
То что будет дальше вам никогда не понадобится в продуктивных приложениях, даже если вы работает в большом маркетплейсе. Пример только для демонстрации возможностей, особенностей работы и расширения кругозора.
Для дальнейшей оптимизации можно сохранять данные о строках заказа в самом заказе в виде массивов.
Модель будет выглядеть так
[PrimaryKey(nameof(ItemId), nameof(WarehouseId))]
public class Stock
{
public int ItemId { get; set; }
public int WarehouseId { get; set; }
public int Quantity { get; set; }
public int Reserved { get; set; }
}
public class Order
{
[Key]
public Guid Id { get; set; }
public List<int> ItemIds { get; set; } = [];
public List<int> WarehouseIds { get; set; } = [];
public List<int> Quantities { get; set; } = [];
}
А код сервиса так
app.MapPost("/place-order/", async (OrderModel order,
DbQueueService<StockApiDataContext> worker,
CancellationToken ct) =>
{
var lines = from l in order.Lines
group l by new { l.ItemId, l.WarehouseId } into g
orderby g.Key.ItemId, g.Key.WarehouseId
select new
{
g.Key.ItemId,
g.Key.WarehouseId,
Quantity = g.Aggregate(0, (s, l) => s + l.Quantity)
};
Order dbOrder = new() { Id = order.Id };
foreach (var l in lines)
{
dbOrder.ItemIds.Add(l.ItemId);
dbOrder.WarehouseIds.Add(l.WarehouseId);
dbOrder.Quantities.Add(l.Quantity);
}
await worker.ExecuteAsync(async (ctx, ct) =>
{
ctx.Orders.Add(dbOrder);
await ctx.SaveChangesAsync(ct);
}, ct);
})
Для входных данных отдельная модель
public record OrderModel(Guid Id, ICollection<OrderLineModel> Lines);
public record OrderLineModel(int ItemId, int WarehouseId, int Quantity);
Самое важное - триггер, который должен обновить остатки. Так как невозможно такой триггер написать на C#, то придется сделать это на pgSQL
protected override void OnModelCreating(ModelBuilder modelBuilder)
{
var tableBuilder = modelBuilder.Entity<Stock>();
var reservedProp = tableBuilder.Property(s => s.Reserved).HasDefaultValue(0);
var quantityProp = tableBuilder.Property(s => s.Quantity);
tableBuilder.ToTable(t =>
t.HasCheckConstraint("check_stock", $"{quantityProp.Metadata.GetColumnName()} >= {reservedProp.Metadata.GetColumnName()}"));
modelBuilder.Entity<Order>()
.AfterInsert(t =>
t.Action(a =>
a.ExecuteRawSql("""
UPDATE stock
SET reserved = reserved + l.q
FROM unnest({0},{1},{2}) AS l(i,w,q)
WHERE (item_id,warehouse_id) = (l.i,l.w);
""",
tableRef => tableRef.New.ItemIds,
tableRef => tableRef.New.WarehouseIds,
tableRef => tableRef.New.Quantities
)
)
);
}
Ключевое - функция unnest, которая преобразует массивы в строки.
Но такой код, к сожалению, теряет обновления остатков.
Если после теста выполнить запрос, который разворачивает все массивы из заказов и группирует по item_id
и warehouse_id
и сравним с количеством зарезервированных товаров в таблице stock
, то такой запрос вернет много несовпадений
select s.*, t.r
from stock s
join (
select l.i, l.w, SUM(l.q) as r
from orders o,
lateral unnest(o.item_ids,o.warehouse_ids,o.quantities) as l(i,w,q)
group by 1,2) t on (s.item_id,s.warehouse_id) = (t.i,t.w)
where s.reserved <> t.r
Я не смог победить эту проблему даже ручными блокировками. С запросом ниже все равно появляются потерянные обновления.
WITH l AS (
SELECT s.ctid, l.q
FROM stock s
JOIN unnest({0},{1},{2}) AS l(i,w,q)
on (s.item_id,s.warehouse_id) = (l.i,l.w)
FOR NO KEY UPDATE
)
UPDATE stock s
SET reserved = reserved + l.q
FROM l
WHERE s.ctid = l.ctid;
Если установить MaxConcurrentBatches = 1
, то проблема пропадает, но и скорость становится неприемлемо низкой.
Также проблему решает поднятие уровня изоляции до repeatable read, но это вызывает огромное количество конфликтов и скорость также падает.
У меня есть подозрение, что это какой-то баг Postgres, но у меня не хватает знаний ни подтвердить, ни опровергнуть эту гипотезу.
Код смотреть тут: https://github.com/gandjustas/habr-post-stock-api/tree/store-lines-in-array
Обновление остатков в триггере в цикле
В одном из постов на StackOverflow (к сожалению не сохранил ссылку) увидел совет использовать цикл в триггере.
FOR x IN SELECT l.*
FROM UNNEST(NEW.item_ids,NEW.warehouse_ids,NEW.quantities) as l(i,w,q)
LOOP
UPDATE stock s
SET reserved = reserved + x.q
WHERE (s.item_id,s.warehouse_id) = (x.i,x.w);
END LOOP;
Такой триггер решает проблему без уменьшения параллелизма и без повышения уровня изоляции. Но этот триггер требует объявить переменную x
в функции триггера, а используемое расширение для EF не дает такой возможности.
Но никто не мешает непосредственно в коде миграции исправить триггер
migrationBuilder.Sql("""
CREATE FUNCTION "LC_TRIGGER_AFTER_INSERT_ORDER"() RETURNS trigger as $LC_TRIGGER_AFTER_INSERT_ORDER$
DECLARE x RECORD;
BEGIN
FOR x IN SELECT l.*
FROM UNNEST(NEW.item_ids,NEW.warehouse_ids,NEW.quantities) as l(i,w,q)
LOOP
UPDATE stock s
SET reserved = reserved + x.q
WHERE (s.item_id,s.warehouse_id) = (x.i,x.w);
END LOOP;
RETURN NEW;
END;
$LC_TRIGGER_AFTER_INSERT_ORDER$ LANGUAGE plpgsql;
CREATE TRIGGER LC_TRIGGER_AFTER_INSERT_ORDER AFTER INSERT
ON "orders"
FOR EACH ROW EXECUTE PROCEDURE "LC_TRIGGER_AFTER_INSERT_ORDER"();
""");
Теперь все работает без конфликтов и аномалий
Результаты забега (для vus: 100)
HTTP
http_req_duration..............: avg=133.57ms min=2.87ms med=97.05ms max=949.55ms p(90)=296.95ms p(95)=378.7ms
{ expected_response:true }...: avg=133.57ms min=2.87ms med=97.05ms max=949.55ms p(90)=296.95ms p(95)=378.7ms
http_req_failed................: 0.00% 0 out of 30000
http_reqs......................: 30000 746.43081/s
Код смотреть тут: https://github.com/gandjustas/habr-post-stock-api/tree/trigger-with-loop
Идемпотентность
Все предыдущие версии кода будут возвращать ошибку если попытаться создать еще один заказ с тем же Id
. Теперь я сделаю код идемпотентным, как в API сервиса, так и на уровне базы данных
app.MapPut("/place-order/{id}", async (Guid id, ICollection<OrderLineModel> lines,
DbQueueService<StockApiDataContext> worker,
CancellationToken ct) =>
{
var q = from l in lines
group l by new { l.ItemId, l.WarehouseId } into g
orderby g.Key.ItemId, g.Key.WarehouseId
select new
{
g.Key.ItemId,
g.Key.WarehouseId,
Quantity = g.Aggregate(0, (s, l) => s + l.Quantity)
};
List<int> itemIds = [], warehouseIds = [], quantities = [];
foreach (var l in q)
{
itemIds.Add(l.ItemId);
warehouseIds.Add(l.WarehouseId);
quantities.Add(l.Quantity);
}
await worker.ExecuteAsync(async (ctx, ct) =>
{
var db = ctx.Database;
await db.CreateExecutionStrategy().ExecuteInTransactionAsync(async ct =>
{
await db.ExecuteSqlAsync($"""
INSERT INTO orders
VALUES({id},{itemIds},{warehouseIds},{quantities})
ON CONFLICT (id) DO NOTHING
""", ct);
}, ct => Task.FromResult(false), ct);
}, ct);
})
PUT
вместоPOST
, что сообщает потребителю о том, что метод идемпотентенВ url для
PUT
необходимо указатьId
Для вставки строки в БД я использую запрос, который просто ничего не делает, если
Id
заказа в базе уже есть
Результаты забега (для vus: 100)
HTTP
http_req_duration..............: avg=132ms min=3.3ms med=94.79ms max=1.21s p(90)=292.3ms p(95)=378.37ms
{ expected_response:true }...: avg=132ms min=3.3ms med=94.79ms max=1.21s p(90)=292.3ms p(95)=378.37ms
http_req_failed................: 0.00% 0 out of 30000
http_reqs......................: 30000 755.053098/s
Пропускная способность на 13% выше варианта без очереди и postgres-специфичных оптимизаций, и всего на 3.7% выше варианта без оптимизаций.
Выводы
Задача обновления остатков решается на уровне изоляции read committed без блокировок высокой гранулярности.
Закон Парето для оптимизации кода выполняется - 80% достижимого быстродействия достигается при 20% усилий, оставшиеся 20% результата требуют 80% усилий. Главное знать куда прикладывать усилия.
Дедлоки и прочие временные ошибки не всегда являются проблемой, иногда они являются частью нормальной работы. Просто включите повторение запросов.
Стоит использовать Entity Framework как для управления схемой базы данных, так и для реализации запросов. Можно не использовать change tracker, а использовать EF как генератор команд и интерфейс работы с базой.
Репозиторий со всеми коммитами тут
PS. Я бы остановился на варианте с логикой в базе данных и не стал бы прикручивать очередь, ограничился настройкой параметров пула и таймаутов.