Привет! Меня зовут Артем Гаврилов и я работаю в Tarantool. Сегодня я расскажу, как быстро создать объектное хранилище на основе платформы in-memory вычислений Tarantool и распределённой файловой системы IPFS (InterPlanetary File System).
Мы рассмотрим пример шардирования стороннего приложения с помощью Tarantool и сделаем MVP объектного хранилища с отказоустойчивостью на уровне ЦОДа, в то время как более простые решения отказоустойчивы только на уровне нескольких серверов.
Тем, кто знаком с IPFS, вероятно, будет интересно читать начиная с раздела «С чем мы столкнёмся».
IPFS — это вообще что?
Если кратко, IPFS (от англ. InterPlanetary File System) — это полностью распределённое файловое хранилище. Оно обладает несколькими характерными свойствами:
- Доступ к данным по их хешу.
- Управление и доступ к файлам через протокол HTTP.
- Все файлы на ноде делятся на закреплённые (pinned) и незакреплённые. Различие между ними в том, что первые никогда не удаляются из ноды. По-настоящему нода хранит именно закреплённые файлы, а все остальные — это просто кеш на диске.
При отсутствии файла на конкретной ноде IPFS скачает его с другой. Закрепление делается вручную через API. - Если запросить у сервера файл, который на нём не хранится, то сервер сначала скачает его как незакреплённый, а потом отдаст по HTTP. А если ему не хватит места, то вытеснит один из незакреплённых файлов.
- Хранилище публично, все ноды связаны и образуют глобальный кластер. Однако данные, которые никто не запрашивает, по логике предыдущего пункта со временем уйдут из всех нод и навсегда потеряются.
Таким образом каждая нода IPFS является сразу шлюзом в HTTP, кешем и холодным хранилищем. Из этого следует, что система сама может переносить данные между нодами. Но IPFS нужна помощь в выборе того, как складывать данные для холодного хранения. Также крайне желательно всегда обращаться к серверу, на котором закреплён нужный файл.
Почему мы выбрали IPFS?
На то есть несколько причин.
- IPFS — законченный продукт, готовый хранить большие объёмы данных.
- IPFS широко применяется в различных сферах. Так, например, он используется криптовалютой Ethereum для реализации NFT токенов: в блокчейне сохраняются только 256 бит хеша файла, а сам файл публикуется в IPFS, не занимая место в блокчейне.
- Некоторые крупные компании используют публичные серверы, для кеширования своих данных (или для обхода блокировок). Например, Cloudflare.
- IPFS также устраняет некоторые проблемы безопасности. В частности, адресация на основе содержимого позволяет хранить и рассчитывать хеши в метаданных без создания отдельного решения.
Подробне про преимущества IPFS можно почитать здесь.
При чём тут Tarantool?
С помощью Tarantool (точнее, Tarantool Cartridge) мы будем шардировать данные и искать IPFS-ноды, на которых эти данные расположены.
Tarantool подходит для этой задачи, так как:
- У платформы есть готовый механизм шардирования (vshard), к которому можно добавить свою логику.
- Tarantool может общаться по HTTP с IPFS.
Кроме того, платформа Tarantool — это база данных со встроенным сервером приложений, что также сильно упростит реализацию.
Почему не IPFS Cluster?
Существует нативное решение IPFS Cluster, однако у него есть недостатки. Он просто считает количество копий закреплённых данных, а не распределяет их по наборам метрик, как типовой модуль шардирования Tarantool vshard. Это влияет на уровень отказоустойчивости: если собирать набор реплик из серверов в разных ЦОДах, то можно добиться отказоустойчивости даже при проблеме с одним из ЦОДов (а такие бывают). Помимо прочего, IPFS-кластер ограничивает нас отсутствием метаданных и поисковыми индексами только по хешу.
С чем мы столкнёмся?
Длительные операции
Работа с большими файлами — всегда долгая операция. При этом триггеры также накладывают ограничение: они должны выполняться в рамках одной транзакции и как следствие не могут делать сетевые вызовы. Поэтому надо будет предусмотреть асинхронное обновление.
Асинхронная обработка предполагает работу с очередями, и для этого мы будем использовать библиотеку Tarantool-queue, которая позволяет заводить локальные (не распределённые) очереди задач.
Большой объём трафика
Надо будет очень аккуратно пользоваться HTTP, не читая лишний раз тело запроса и активно пользуясь редиректами.
Сторонний API, под который надо подстроиться
IPFS — готовый продукт, менять исходный код которого мы не можем. Это означает, что наши возможности ограничиваются возможностями его API.
Базовая концепция решения
Мы будем придерживаться такой концепции:
- Разворачиваем рядом с каждым хранилищем Tarantool ноду IPFS.
- Управляем IPFS-нодой только с локального хранилища.
- При этом на IPFS-ноде лежат те же объекты, что и в локальном хранилище.
- Задачи, связанные с IPFS, выполняем в очередях.
- Когда мы хотим закрепить или открепить новый файл, мы запишем данные о нём в хранилище и поставим в соответствующую очередь задачу.
В конечном итоге получится такая топология:
Минимальный вариант: поиск по хешам
Схема данных и очереди задач
Делаем две записи о хеше:
- Шардированную — которая находится на тех серверах, на чьих локальных нодах IPFS мы хотим закрепить файлы (желаемое расположение данных).
- Нешардированную — которая соответствует последнему состоянию (закреплённости) в локальном хранилище (фактическое расположение данных).
Также сделаем две очереди: на закрепление (добавление) и открепление (удаление) данных:
local queue = require('queue')
local function init_schema(is_master)
local pinned = box.schema.space.create('pinned',{format={
{'ipfs_id', 'string'},
{'bucket_id', 'unsigned'}
}, if_not_exists=true})
pinned:create_index('primary', {parts={{1, 'string'}}, unique=true, if_not_exists=true})
pinned:create_index('bucket_id', {parts={{2, 'unsigned'}}, unique=false, if_not_exists=true})
if is_master then
queue.create_tube('ipfs_add', 'fifottl', {if_not_exists = true})
queue.create_tube('ipfs_remove', 'fifottl', {if_not_exists = true})
end
local pinned_no_shard = box.schema.space.create('pinned_no_shard',{format={
{'ipfs_id', 'string'},
{'status', 'string'}
}, if_not_exists=true})
pinned_no_shard:create_index('primary', {parts={{1, 'string'}}, unique=true, if_not_exists=true})
end
return {
init_schema=init_schema
}
Задачи на взаимодействие с IPFS будем ставить при записи в шардированную очередь, а при их выполнении будем обновлять данные в нешардированной.
Немного технических полезностей
Небольшой модуль для взаимодействия с IPFS
Нам понадобятся:
- чтение конфигурации IPFS (чтобы узнать, куда кидать запрос);
- get_gateway_address — функция для получения адреса шлюза; не мудрим, просто добавим в конфигурацию IPFS-ноды свой параметр gateway_pub_address;
- управление закреплением файлов(добавление и удаление).
local http = require('http.client').new()
local json = require('json')
local api_prefix = 'http://127.0.0.1:5001'
local function get_config()
local resp = http:post(api_prefix..'/api/v0/config/show')
if resp.status == 200 then
return json.decode(resp.body)
else
error(resp.reason)
end
end
local function get_address()
local config, err = get_config()
if err ~= nil then
return nil, err
end
return assert(config.gateway_pub_address, 'no gateway_pub_address in ipfs config')
end
local function pin_add(ipfs_id)
local resp = http:post(api_prefix..'/api/v0/pin/add?arg='..ipfs_id)
assert(resp.status == 200, resp.reason)
end
local function pin_rm(ipfs_id)
local resp = http:post(api_prefix..'/api/v0/pin/rm?arg='..ipfs_id)
assert(resp.status == 200, resp.reason)
end
return {
get_config = get_config,
get_address = get_address,
pin_add = pin_add,
pin_rm = pin_rm
}
Выполнение операции на всём наборе реплик одновременно
Наша очередь, а также спейс с нешардированными данными будут синхронно реплицироваться на весь набор реплик. При этом для полного повторения топологии нам надо положить рядом с каждым экземпляром Tarantool экземпляр IPFS. Поэтому при выполнении задачи из очереди надо вызывать функцию на всём наборе реплик. Для этого напишем небольшую функцию:
local function get_curr_replicaset()
local me = cartridge_lua_api_topology.get_self()
return cartridge.admin_get_servers(me.uuid)[1].replicaset
end
local function call_on_whole_replicaset(func, params, options)
local servers_uri = {}
for _, server in pairs(get_curr_replicaset().servers) do
table.insert(servers_uri, server.uri)
end
local map_results, map_errors = cartridge_pool.map_call(func, params, {uri_list = servers_uri, timeout = options.timeout})
assert(map_errors == nil, map_errors)
return map_results
end
Как сделать отложенную синхронизацию с IPFS
Покажем в картинках, как будет происходить работа с асинхронным данными. Обратите внимание, что помимо простых операций записи и удаления у нас есть операция решардинга.Чтобы её правильно выполнять, будем при обычном удалении делать пометки в нешардируемом спейсе. Если пометка есть, то можно удалять сразу, иначе надо дождаться закрепления пина на новом хранилище.
Добавление
Удаление
Решардинг
Реализация
Нам понадобится обрабатывать следующие запросы к роутеру:
- HTTP-запрос, перенаправления на объект в IPFS:
GET /tnt_ipfs/pin/:ipfs_id
- HTTP-запрос на добавление пина:
PUT /tnt_ipfs/pin/:ipfs_id
- HTTP-запрос на удаление пина:
DELETE /tnt_ipfs/pin/:ipfs_id
- HTTP-запрос на получение статуса пина:
GET /tnt_ipfs/pin/:ipfs_id/status
А также следующие запросы к хранилищу:
- Запрос, который отдаст нам адрес соответствующего ему шлюза:
ipfs.get_local_ipfs_addr
- Запрос на добавление пина:
ipfs.pin_write
- Запрос на удаление пина:
ipfs.pin_delete
- Запрос на получение статуса пина:
ipfs.pin_status
- Запрос между хранилищами на вызов:
ipfs.pin_add
—ipfs.internal.pin_add
- Запрос между хранилищами на вызов:
ipfs.pin_rm
—ipfs.internal.pin_rm
Код для роли роутера
local function init(opts) -- luacheck: no unused args
local httpd = assert(cartridge.service_get('httpd'), "Failed to get httpd service")
httpd:route({method = 'GET', path = '/tnt_ipfs/pin/:ipfs_id', public = true}, function(req)
local ipfs_id = req:stash('ipfs_id')
local bucket_id = vshard.router.bucket_id_mpcrc32(ipfs_id)
local gateway_addr, err = vshard.router.callbro(bucket_id, 'ipfs.get_local_ipfs_addr')
if err ~= nil then
return {
status = 500,
body = err
}
end
return req:redirect_to(gateway_addr..'/ipfs/'..ipfs_id)
end)
httpd:route({method = 'PUT', path = '/tnt_ipfs/pin/:ipfs_id', public = true}, function(req)
local ipfs_id = req:stash('ipfs_id')
local bucket_id = vshard.router.bucket_id_mpcrc32(ipfs_id)
local _, err = vshard.router.callrw(bucket_id, 'ipfs.pin_write', {{ipfs_id, bucket_id}})
if err ~= nil then
return {
status = 500,
body = err
}
end
return {status = 200}
end)
httpd:route({method = 'DELETE', path = '/tnt_ipfs/pin/:ipfs_id', public = true}, function(req)
local ipfs_id = req:stash('ipfs_id')
local bucket_id = vshard.router.bucket_id_mpcrc32(ipfs_id)
local _, err = vshard.router.callrw(bucket_id, 'ipfs.pin_delete', {ipfs_id})
if err ~= nil then
return {
status = 500,
body = err
}
end
return {status = 200}
end)
httpd:route({method = 'GET', path = '/tnt_ipfs/pin/:ipfs_id/status', public = true}, function(req)
local ipfs_id = req:stash('ipfs_id')
local bucket_id = vshard.router.bucket_id_mpcrc32(ipfs_id)
local pin_status, err = vshard.router.callro(bucket_id, 'ipfs.pin_status', {ipfs_id})
if err ~= nil then
return {
status = 500,
body = err
}
end
return {
status = 200,
body = pin_status
}
end)
return true
end
Код для роли хранилища
Тут всё немного сложнее:
- Напишем в отдельном файле реализацию всех API-запросов:
local ipfs = require('app.ipfs')
local function get_local_ipfs_addr()
return ipfs.get_address()
end
local function pin_write(tuple)
box.space.pinned:put(tuple)
end
local function pin_delete(ipfs_id)
box.space.pinned_no_shard:put({ipfs_id, 'CAN_UNPIN'})
box.space.pinned:delete(ipfs_id)
end
local function pin_status(ipfs_id)
if box.space.pinned:get(ipfs_id) ~= nil then
-- Want to pin
if box.space.pinned_no_shard:get(ipfs_id) ~= nil then
return 'Pinned'
else
return 'Wait pining'
end
else
-- Want to unpin
local pns = box.space.pinned_no_shard:get(ipfs_id)
if pns ~= nil then
if pns.status == 'CAN_UNPIN' then
return 'Wait unpining'
else
return 'Wait reshard'
end
else
return 'Not found'
end
end
end
return {
ipfs = {
get_local_ipfs_addr = get_local_ipfs_addr,
pin_write = pin_write,
pin_delete = pin_delete,
pin_status = pin_status,
internal = {
pin_add = ipfs.pin_add,
pin_rm = ipfs.pin_rm
}
}
}
- Нам понадобится триггер для постановки задач в очередь:
local function on_replace(old, new, s, op)
if old == nil then
if new == nil then
-- ничего не делаем
else
queue.tube.ipfs_add:put(new.ipfs_id, {delay=1})
end
else
if new == nil then
queue.tube.ipfs_remove:put(old.ipfs_id, {delay=1})
else
if old.ipfs_id ~= new.ipfs_id then
queue.tube.ipfs_add:put(new.ipfs_id, {delay=1})
queue.tube.ipfs_remove:put(old.ipfs_id, {delay=1})
end
end
end
end
- Ещё нам понадобятся воркеры для обработки задач:
local ran = false
local function add_queue_worker()
while ran do
local task = queue.tube.ipfs_add:take()
-- проверяем, актуально ли задание
local p = box.space.pinned:get(task[3])
local pns = box.space.pinned_no_shard:get(task[3])
if p == nil or pns ~= nil then
queue.tube.ipfs_add:ack(task[1])
else
-- пытаемся закрепить
local ok, err = pcall(function()
call_on_whole_replicaset('ipfs.internal.pin_add', {task[3]})
end)
if ok then -- удалось
box.space.pinned_no_shard:put({task[3], 'PINNED'})
queue.tube.ipfs_add:ack(task[1])
else -- не удалось
log.error(err)
queue.tube.ipfs_add:release(task[1], {delay=10})
end
end
end
end
local function rm_queue_worker()
while ran do
local task = queue.tube.ipfs_remove:take()
-- проверяем, актуально ли задание
local p = box.space.pinned:get(task[3])
local pns = box.space.pinned_no_shard:get(task[3])
if p ~= nil or pns == nil then
queue.tube.ipfs_add:ack(task[1])
else
local ok, err
-- проверяем, появился ли объект на новом месте
ok, err = pcall(function()
local pns = box.space.pinned_no_shard:get(task[3])
if pns and pns.status == 'PINNED' then
-- проверяем, появился ли пин на новом месте
local bucket_id = vshard.router.bucket_id_mpcrc32(task[3])
assert(vshard.router.callro(bucket_id, 'ipfs.pin_status', {task[3]}) == 'Pinned')
end
end)
if ok == nil then --рано откреплять
queue.tube.ipfs_remove:release(task[1], {delay=10})
else -- пытаемся открепить
ok, err = pcall(function()
call_on_whole_replicaset('ipfs.internal.pin_rm', {task[3]})
end)
if ok ~= nil then -- удалось
box.space.pinned_no_shard:delete(task[3])
queue.tube.ipfs_remove:ack(task[1])
else -- не удалось
log.error(err)
queue.tube.ipfs_remove:release(task[1], {delay=10})
end
end
end
end
end
- Добавим функции для включения и выключения триггера и воркеров:
local function activate()
if not ran then
-- запускаем воркеры
ran = true
fiber.create(add_queue_worker)
fiber.create(rm_queue_worker)
-- включаем триггеры
if #box.space.pinned:on_replace() == 0 then
box.space.pinned:on_replace(on_replace)
end
end
end
local function deactivate()
if ran then
-- останавливаем воркеры
ran = false
-- выключаем триггеры
if #box.space.pinned:on_replace() == 1 then
box.space.pinned:on_replace(nil, on_replace)
end
end
end
- Ну и наконец запихнём в роль включение триггера и воркеров только для мастера:
local function init(opts) -- luacheck: no unused args
init_schema.init_schema(opts.is_master)
rawset(_G, 'ipfs', storage_api)
if opts.is_master then
storage_master_service.activate()
else
storage_master_service.deactivate()
end
return true
end
local function stop() -- luacheck: no unused args
rawset(_G, 'ipfs')
storage_master_service.deactivate()
return true
end
local function apply_config(conf, opts) -- luacheck: no unused args
init_schema.init_schema(opts.is_master)
if opts.is_master then
storage_master_service.activate()
else
storage_master_service.deactivate()
end
return true
end
Запускаем и тестируем
Запускать будем в Docker. При этом в один контейнер поместим сразу и Tarantool, и IPFS (по одному экземпляру). Для этого воспользуемся cartridge pack docker
и следующим Docker-файлом:
FROM centos:7
RUN yum -y update; yum -y install wget; \
wget https://dist.ipfs.io/go-ipfs/v0.12.2/go-ipfs_v0.12.2_linux-amd64.tar.gz; \
tar -xvzf go-ipfs_v0.12.2_linux-amd64.tar.gz; \
cd go-ipfs; \
./install.sh
Теперь с помощью docker-compose соберём кластер (полный Docker-файл можно посмотреть в репозитории):
version: '2.1'
services:
Tarantool-ipfs1:
image: tnt_ipfs:0.1.0.0
container_name: Tarantool-ipfs1
environment:
- IPFS_PATH=/data-ipfs
- CARTRIDGE_DATA_DIR=/data
- TARANTOOL_INSTANCE_NAME=r1-router
- TARANTOOL_ADVERTISE_URI=172.19.1.1:3301
- TARANTOOL_CLUSTER_COOKIE=secret
- TARANTOOL_HTTP_PORT=8001
ports:
- 4001:4001 # IPFS p2p
- 5001:5001 # IPFS API
- 8081:8081 # IPFS Gateway
- 3301:3301 # Tarantool Iproto
- 8001:8001 # Tarantool HTTP
volumes:
- ../data/Tarantool-ipfs1:/data
- ../data/Tarantool-ipfs1-ipfs:/data-ipfs
networks: # Нужно. чтобы Tarantool не ругался при запуске на смену IP
main:
ipv4_address: 172.19.1.1
...
networks:
main:
ipam:
config:
- subnet: 172.19.1.0/16
Чтобы IPFS запускалась вместе с Tarantool, добавим в начало init.lua строку require('os').execute('ipfs daemon --init --writable --enable-gc &')
Остаётся сконфигурировать IPFS. Сделать это можно только из Docker, так как по умолчанию API принимает запросы только от 127.0.0.1. Также надо указать IP, на котором мы открывали порт для входящих соединений.
#!/usr/bin/env bash
my_ip="X.X.X.X"
for i in 1 2 3 4 5; do
docker-compose exec "Tarantool-ipfs$i" curl --request POST -L --url "http://127.0.0.1:5001/api/v0/config?arg=gateway_pub_address&arg=http://192.168.1.69:808$i"
docker-compose exec "Tarantool-ipfs$i" curl --request POST -L --url "http://127.0.0.1:5001/api/v0/config?arg=Addresses.API&arg=/ip4/0.0.0.0/tcp/5001"
docker-compose exec "Tarantool-ipfs$i" curl --request POST -L --url "http://127.0.0.1:5001/api/v0/config?arg=Addresses.Announce&arg=\[\"/ip4/$my_ip/tcp/400$i\"\]&json=1"
docker-compose exec "Tarantool-ipfs$i" curl --request POST -L --url "http://127.0.0.1:5001/api/v0/config?arg=Addresses.Gateway&arg=/ip4/0.0.0.0/tcp/8081"
docker-compose exec "Tarantool-ipfs$i" curl --request POST -L --url "http://127.0.0.1:5001/api/v0/config/show"
docker-compose restart "Tarantool-ipfs$i"
done
Заработало. Теперь можно поиграть, получив, например, вот этот объект: Qme7ss3ARVgxv6rXqVPiikMJ8u2NLgmgszg13pYrDKEoiu
. При этом IPFS будет поначалу довольно долго его искать, прежде чем закрепить, но закрепив, будет отдавать мгновенно.
И ещё немного скриншотов. Для REST API я предпочитаю использовать Insomnia. Файлик с коллекцией запросов в конце статьи.
Для теста мы будем использовать вот этот хеш: Qme7ss3ARVgxv6rXqVPiikMJ8u2NLgmgszg13pYrDKEoiu
.
- Получим по API IPFS список закреплённых сейчас файлов. Изначально он не пустой, это нормально.
- Посмотрим статус закрепления:
GET /tnt_ipfs/pin/Qme7ss3ARVgxv6rXqVPiikMJ8u2NLgmgszg13pYrDKEoiu/status
.
Как и ожидалось, о закреплении пока ничего не известно. - Воспользуемся нашим API и закрепим наш объект
PUT /tnt_ipfs/pin/Qme7ss3ARVgxv6rXqVPiikMJ8u2NLgmgszg13pYrDKEoiu
.
Посмотрим статус закрепления снова, если делать это очень быстро, то можно поймать момент, когда он будет
Wait pining
, но потом станетPinned
:
- Снова получим по API IPFS список закреплённых сейчас файлов.
Файл закреплён и теперь точно никуда не уйдёт из хранилища. - Посмотрим на сам файл
GET /tnt_ipfs/pin/Qme7ss3ARVgxv6rXqVPiikMJ8u2NLgmgszg13pYrDKEoiu
:
Если раскрыть раздел Timeline, то можно увидеть тот самый редирект.
- Попробуем открепить
DELETE /tnt_ipfs/pin/Qme7ss3ARVgxv6rXqVPiikMJ8u2NLgmgszg13pYrDKEoiu
:
Посмотрим статус закрепления снова, если делать это очень быстро, то можно поймать момент, когда он будет
Wait pining
, но потом станетPinned
:
Обратите внимание, что GET /tnt_ipfs/pin/Qme7ss3ARVgxv6rXqVPiikMJ8u2NLgmgszg13pYrDKEoiu
вполне успешно выполнится, даже если файл не закреплён. Однако, если вы ранее не запрашивали или не закрепляли файл, запрос будет выполняться дольше. Дело в том что IPFS Gateway будет пытаться найти файл на чужих нодах. А незакрепленный файл со временем может быть вытеснен из нашей ноды. И если при этом никто в интернете его не сохранит, то он и вовсе будет безвозвратно утрачен.
Добавляем индексный слой
Если вы не знакомы с индексными слоями, то общие идеи их построения можно посмотреть здесь: https://habr.com/ru/company/vk/blog/657789/.
Схема данных
Индексный слой будет состоять из следующих записей:
path | bucket_id | name | is_dir | ipfs_id |
---|---|---|---|---|
полный путь до родительского каталога | bucket_id | имя | является ли запись каталогом | хеш в IPFS |
*bucket_id строится по path
Таким образом можно будет рекурсивно искать файлы.
Сразу добавим в код создание спейсов:
local objects = box.schema.space.create('objects',{format={
{'path', 'string'},
{'bucket_id', 'unsigned'},
{'name', 'string'},
{'is_dir', 'boolean'},
{'ipfs_id', 'string', is_nullable = true}
}, if_not_exists=true})
objects:create_index('primary', {parts={{1, 'string'},{3, 'string'}}, unique=true, if_not_exists=true})
objects:create_index('bucket_id', {parts={{2, 'unsigned'}}, unique=false, if_not_exists=true})
Реализация
Нам понадобится обрабатывать такие запросы к роутеру:
- HTTP-запрос на добавление директории:
PUT /tnt_ipfs/storage/*full_path?type=dir
- HTTP-запрос на добавление объекта:
PUT /tnt_ipfs/storage/*full_path?type=object&ipfs_id=...
- HTTP-запрос, перенаправляющий на объект в IPFS или отдающий листинг директории:
GET /tnt_ipfs/storage/*full_path
- HTTP-запрос на удаление директории или объекта:
DELETE /tnt_ipfs/storage/*full_path
И запросы к хранилищу:
- Запрос на добавление директорий или объектов:
storage.add
- Запрос на удаление директорий или объектов:
storage.del
- Запрос на получение информации о директории или объекте:
storage.get
- Запрос на листинг директории:
storage.ls
Код для роли роутера
local function convert_path(full_path)
local path, name = full_path:match("^(.*)/(.+)$")
if not path then
return '', full_path
end
return path, name
end
...
httpd:route({method = 'PUT', path = '/tnt_ipfs/storage/*full_path', public = true}, function(req)
local full_path = req:stash('full_path')
local path, name = convert_path(full_path)
local bucket_id = vshard.router.bucket_id_mpcrc32(path)
local type = req:query_param('type')
if type == 'dir' then
local _, err = vshard.router.callrw(bucket_id, 'storage.add', {path, bucket_id, name, true})
if err ~= nil then
return {
status = 500,
body = err
}
end
return {status = 200}
elseif type == 'object' then
local ipfs_id = req:query_param('ipfs_id')
local bucket_id2 = vshard.router.bucket_id_mpcrc32(ipfs_id)
local _, err = vshard.router.callrw(bucket_id2, 'ipfs.pin_write', {{ipfs_id, bucket_id2}})
if err ~= nil then
return {
status = 500,
body = err
}
end
_, err = vshard.router.callrw(bucket_id, 'storage.add', {path, bucket_id, name, false, ipfs_id})
if err ~= nil then
return {
status = 500,
body = err
}
end
return {status = 200}
end
return {status = 400}
end)
httpd:route({method = 'DELETE', path = '/tnt_ipfs/storage/*full_path', public = true}, function(req)
local full_path = req:stash('full_path')
local path, name = convert_path(full_path)
local bucket_id = vshard.router.bucket_id_mpcrc32(path)
local deleted, err = vshard.router.callrw(bucket_id, 'storage.del', {path, name})
if deleted == nil or err ~= nil then
return {
status = 500,
body = err
}
end
if deleted.is_dir then
return {status = 200}
else
bucket_id = vshard.router.bucket_id_mpcrc32(deleted.ipfs_id)
local _, err2 = vshard.router.callrw(bucket_id, 'ipfs.pin_delete', {deleted.ipfs_id})
if err2 ~= nil then
return {
status = 500,
body = err2
}
end
return {status = 200}
end
end)
httpd:route({method = 'GET', path = '/tnt_ipfs/storage/*full_path', public = true}, function(req)
local full_path = req:stash('full_path')
local path, name = convert_path(full_path)
local bucket_id = vshard.router.bucket_id_mpcrc32(path)
local main_record, err
if path == '' then --каталог в корне
main_record = {is_dir=true}
else
main_record, err = vshard.router.callro(bucket_id, 'storage.get', {path,name})
if err ~= nil then
return {
status = 500,
body = err
}
end
if main_record == nil then
return {status = 404}
end
end
if main_record.is_dir then
bucket_id = vshard.router.bucket_id_mpcrc32(full_path)
local result, err2 = vshard.router.callro(bucket_id, 'storage.ls', {full_path})
if err2 ~= nil then
return {
status = 500,
body = err2
}
end
return {
status = 200,
body = result
}
else
bucket_id = vshard.router.bucket_id_mpcrc32(main_record.ipfs_id)
local gateway_addr, err2 = vshard.router.callbro(bucket_id, 'ipfs.get_local_ipfs_addr')
if err2 ~= nil then
return {
status = 500,
body = err2
}
end
return req:redirect_to(gateway_addr..'/ipfs/'..main_record.ipfs_id)
end
end)
API хранилища
local function storage_add(path,bucket_id,name,is_dir,ipfs_id)
if is_dir then
box.space.objects:put({path,bucket_id,name,true})
else
box.space.objects:put({path,bucket_id,name,false,ipfs_id})
end
end
local function storage_del(path,name)
local data = box.space.objects:delete({path,name})
assert(data ~= nil)
return data:tomap({names_only=true})
end
local function storage_get(path,name)
local data = box.space.objects:get({path,name})
assert(data ~= nil)
return data:tomap({names_only=true})
end
local function storage_ls(path)
local result = {}
for _,i in box.space.objects:pairs({path}) do
table.insert(result, {name = i.name, is_dir = i.is_dir})
end
return result
end
Запускаем и тестируем
- Создадим каталог
PUT /tnt_ipfs/storage/a?type=dir
. Его мы увидеть не сможем, потому что он пустой. - Создадим второй каталог внутри первого:
PUT /tnt_ipfs/storage/a/b?type=dir
. - Посмотрим содержимое первого каталога:
GET /tnt_ipfs/storage/a
.
- Создадим объект внутри первого каталога:
PUT /tnt_ipfs/storage/a/c?type=object&ipfs_id=Qme7ss3ARVgxv6rXqVPiikMJ8u2NLgmgszg13pYrDKEoiu
. - Снова посмотрим содержимое первого каталога:
GET /tnt_ipfs/storage/a
.
- Посмотрим на сам файл:
GET /tnt_ipfs/storage/a/c
:
Если в разделе Timeline всё тот же редирект.
- Попробуем удалять файлы и директории:
DELETE /tnt_ipfs/storage/a/b
:
DELETE /tnt_ipfs/storage/a/с
:
Смотрим, как всё классно. Обсуждаем, что получилось
Чем хорошо это решение?
Приложив не так много усилий, мы получили работающий MVP объектного хранилища. И
если заменить gateway_pub_address
, то можно использовать серверы сторонних провайдеров для кеша ваших данных.
Решение гарантирует отказоустойчивость при проблемах с серверами в n-1 ЦОДах при условии, что все наборы реплик собраны из n хранилищ в разных ЦОДах. Также мы получили из коробки балансировку по всем нодам IPFS.
Как можно применить это на практике?
- Хранить большие объёмы публичных данных.
- Бекапить большие объёмы данных, уже залитых в IPFS. К примеру, NFT-токены записаны в блокчейне Ethereum как хеши, при этом сами файлы находятся в IPFS.
- По описанным в статье принципам можно шардировать и другие приложения.
Чего нехватает, чтобы это решение можно было назвать Prod-Ready?
Есть много причин считать это решение, скорее, MVP, нежели Prod-Ready. Однако его доработка явно выходит за рамки статьи.
Инструменты. Настоящее Prod-Ready решение должно содержать инструменты для мониторинга, исправления ошибок и замера использования пространства.
Reference counter. По-хорошему, на закреплениях должен быть Reference counter, чтобы их можно было безопасно добавлять в поиск несколько раз (иначе при удалении одного места в поиске произойдёт открепление и данные могут потеряться).
Добавление метаданных. В настоящих объектных хранилищах есть метаданные, которые могут использоваться для поиска, управления длительностью хранения файлов и прочего.
Разграничение прав. Хоть это и публичное хранилище, управление правами на запись в поисковый слой и ограничение пространства для разных пользователей всё равно является полезной функциональностью.
Альтернативные варианты шардирования. Существует более продвинутые варианты шардирования, учитывающие объём данных, нагрузку на диск и сеть. Их также можно применить к конечному приложению, поставив для управления и поиска Tarantool. Однако это не будет простым повторением шардирования, что многократно усложнит задачу.
Репозиторий с полным кодом: https://github.com/Artem3213212/TNT-IPFS
Скачать Tarantool можно на официальном сайте, а получить помощь — в Telegram-чате.