# How to build a high-performance application on Tarantool from scratch

I joined Mail.ru Group in 2013 and needed a queue for one of my tasks. First, I decided to check what the company already had. I was told about a product called Tarantool, so I looked at how it worked and decided that adding a queue broker to it could work perfectly well.

I contacted Kostja Osipov, the senior expert in Tarantool, and the next day he gave me a 250-line script that handled almost everything I needed. Since that moment, I have been in love with Tarantool. It turned out that a small amount of code in a fairly simple scripting language could give this DBMS entirely new capabilities.

Today, I’m going to tell you how to build your own queue in Tarantool 2.2.

At that time, I was fond of Beanstalkd, a simple and fast queue broker. It offered a user-friendly interface, task status tracking per connection (if a client disconnected, its task went back to the queue), and convenient handling of delayed tasks. I wanted to make something like that.

Here is how the service works: a queue broker accepts and stores tasks, and there are two kinds of clients: producers, which send tasks (the put method), and consumers, which take tasks (the take method).

This is what a task’s lifecycle looks like. A task is sent with the put method and enters the ready state. The take operation changes its status to taken. A taken task can be either acknowledged (ack) and removed, or returned to ready (release).
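The lifecycle can also be written down as a small transition table. This is a plain-Lua sketch that only models the states and does not touch Tarantool; the names `TRANSITIONS` and `next_state`, as well as the pseudo-states `none` and `removed`, are made up for illustration.

```lua
-- Allowed transitions between task states, as described above.
-- "put" creates a task, so it starts from the pseudo-state "none".
local TRANSITIONS = {
    none  = { put = 'ready' },
    ready = { take = 'taken' },
    taken = { ack = 'removed', release = 'ready' },
}

-- Returns the state a task ends up in, or nil plus an error message
-- if the operation is not allowed in the current state.
local function next_state(state, op)
    local allowed = TRANSITIONS[state]
    local target = allowed and allowed[op]
    if not target then
        return nil, string.format("cannot %s a task in state %s", op, state)
    end
    return target
end

local s = next_state('none', 'put')   -- ready
s = next_state(s, 'take')             -- taken
s = next_state(s, 'release')          -- ready again
```

Anything not listed in the table, such as acknowledging a task nobody has taken, is rejected.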

## Environment setup

Today, Tarantool is also a LuaJIT interpreter. To start working with it, you need an entry point: an initial file, init.lua. After that, box.cfg() must be called to start the DBMS internals.

For local development, it is enough to start the interactive console. Create and run a file as follows:

require'strict'.on()
box.cfg{}

require'console'.start()
os.exit()

The console is interactive and can be put to use right away. There is no need to install, set up, and learn multiple tools. All it takes is 10 to 15 lines of code on any local machine.

Another piece of advice from me is to use strict mode. Lua is quite lax about variable declarations, and this mode helps catch some of the resulting errors. If you build Tarantool yourself in DEBUG mode, strict mode is on by default.

Let’s run our file with Tarantool:

tarantool init.lua

You’ll see something like:

2020-07-09 20:00:11.344 [30043] main/102/init.lua C> Tarantool 2.2.3-1-g98ecc909a
2020-07-09 20:00:11.345 [30043] main/102/init.lua C> log level 5
2020-07-09 20:00:11.346 [30043] main/102/init.lua I> mapping 268435456 bytes for memtx tuple arena...
2020-07-09 20:00:11.347 [30043] main/102/init.lua I> mapping 134217728 bytes for vinyl tuple arena...
2020-07-09 20:00:11.370 [30043] main/102/init.lua I> instance uuid 38c59892-263e-42de-875c-8f67539191a3
2020-07-09 20:00:11.371 [30043] main/102/init.lua I> initializing an empty data directory
2020-07-09 20:00:11.408 [30043] main/102/init.lua I> assigned id 1 to replica 38c59892-263e-42de-875c-8f67539191a3
2020-07-09 20:00:11.408 [30043] main/102/init.lua I> cluster uuid 7723bdf4-24e8-4957-bd6c-6ab502a1911c
2020-07-09 20:00:11.425 [30043] snapshot/101/main I> saving snapshot `./00000000000000000000.snap.inprogress'
2020-07-09 20:00:11.437 [30043] snapshot/101/main I> done
2020-07-09 20:00:11.439 [30043] main/102/init.lua I> ready to accept requests
2020-07-09 20:00:11.439 [30043] main/104/checkpoint_daemon I> scheduled next checkpoint for Thu Jul  9 21:11:59 2020
tarantool>

## Writing a queue

Let’s create a file queue.lua to write our app. We can add all of it right to init.lua, but working with an independent file is handier.

Now, connect the queue as a module from the init.lua file:

require'strict'.on()

box.cfg{}

queue = require 'queue'

require'console'.start()
os.exit()

All the following modifications will be made in queue.lua.

As we’re making a queue, we need a place to store the task data. Let’s create a space, i.e. a data table. It can be created without options, but we’ll add a couple right away. So that restarts work, we have to indicate that the space is created only if it doesn’t exist yet (if_not_exists). Another thing: in Tarantool, you can specify the field format with a description of the content (and it is a good idea to do so).

I’m going to take a very simple structure for the queue. I’ll need only task ids, their statuses, and some arbitrary data. A space can’t store data without a primary index, so we create one on id. Make sure the field type in the format matches the one in the index.

box.schema.create_space('queue',{ if_not_exists = true; })

box.space.queue:format( {
{ name = 'id';     type = 'number' },
{ name = 'status'; type = 'string' },
{ name = 'data';   type = '*'      },
} );

box.space.queue:create_index('primary', {
parts = { 1,'number' };
if_not_exists = true;
})

Then, we make a queue table that will contain our functions, attributes, and methods. First of all, we define two functions: putting a task (put) and taking a task (take).

The queue will track the states of the tasks. For status values, we’ll make another table. Numbers or strings can be used as values, but I like one-symbol codes: they can be semantically meaningful, and they take little space to store. First of all, we create two statuses: R=READY and T=TAKEN.

local queue = {}

local STATUS = {}
STATUS.READY = 'R'
STATUS.TAKEN = 'T'

function queue.put(...)

end

function queue.take(...)

end

return queue

How do we make put? Easy as pie. We need to generate an id and insert the data into the space with the READY status. There are many ways to generate an identifier, but we’ll take clock.realtime. It automatically defines the order of messages in the queue. However, remember that the clock may be readjusted, which would put messages in the wrong order. Another thing is that a task with the same value may already be in the queue. You can check whether a task with the same id exists and, in case of a collision, just add one unit or, as the code below does, read the clock again. This takes microseconds, and a collision is highly unlikely, so efficiency won’t be affected.

All the arguments of the function shall be added to our task:

local clock = require 'clock'

-- Use the current time in nanoseconds as the id;
-- re-read the clock if that id is already taken.
local function gen_id()
    local new_id
    repeat
        new_id = clock.realtime64()
    until not box.space.queue:get(new_id)
    return new_id
end

function queue.put(...)
    local id = gen_id()
    return box.space.queue:insert{ id, STATUS.READY, { ... } }
end
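The text also mentions the other way to resolve a collision: adding one unit to the id. Here is a minimal plain-Lua sketch of that variant. The `used` table and the `now` function are stand-ins invented for the example; in the queue they would correspond to the space lookup and to clock.realtime64().

```lua
-- Stand-ins for illustration only: `used` plays the role of the space
-- lookup, and `now` plays the role of clock.realtime64().
local used = {}
local function now() return 1000 end   -- a clock that never advances

-- Take the clock value as the id; on a collision, bump it by one
-- until a free id is found.
local function gen_id()
    local id = now()
    while used[id] do
        id = id + 1
    end
    used[id] = true
    return id
end

local a = gen_id()   -- 1000
local b = gen_id()   -- 1001: same clock value, so the id is bumped
local c = gen_id()   -- 1002
```

Either variant keeps ids unique; bumping by one avoids re-reading the clock, at the price of ids that may run slightly ahead of real time.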

After we’ve written the put function, we can restart Tarantool and call this function right away. The task is added to the queue and comes back as a tuple. We can put arbitrary data and even nested structures into it. Tarantool stores tuples packed with MessagePack, which makes storing such structures easy.

tarantool> queue.put("hello")
---
- [1594325382148311477, 'R', ['hello']]
...

tarantool> queue.put("my","data",1,2,3)
---
- [1594325394527830491, 'R', ['my', 'data', 1, 2, 3]]
...

tarantool> queue.put({ complex = { struct = "data" }})
---
- [1594325413166109943, 'R', [{'complex': {'struct': 'data'}}]]
...

Everything we put remains within the space. We can use space commands to see what we have there:

tarantool> box.space.queue:select()
---
- - [1594325382148311477, 'R', ['hello']]
  - [1594325394527830491, 'R', ['my', 'data', 1, 2, 3]]
  - [1594325413166109943, 'R', [{'complex': {'struct': 'data'}}]]
...

Now, we need to learn how to take tasks. For this, we make a take function. We take the tasks that are ready for processing, i.e., the ones with the READY status. We could scan the primary key and find the first ready task, but with a lot of tasks to process this approach won’t work. We’ll need a dedicated index on the status field. One of the main differences between Tarantool and key-value databases is that Tarantool lets you create diverse indexes, almost like in relational databases: on various fields, composite ones, of different kinds.

So we create a second index whose first part is the status field. This is what we will search by. The second part is id. It keeps tasks with the same status in ascending order.

box.space.queue:create_index('status', {
parts = { 2, 'string', 1, 'number' };
if_not_exists = true;
})

Let’s use built-in functions for our selection. There’s a special iterator that is applied to an index as pairs. We pass a part of the key to it. Here, we are dealing with a composite index of two fields. We use the first one for searching and the second one for ordering. We tell the system to find the tuples whose first index part equals the READY status, and it returns them ordered by the second part of the index. If we find anything, we take that task, update it, and return it. The update is required so that nobody else calling take gets the same task. If there are no tasks, we return nil.

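function queue.take()
    -- first ready task in id order, or nil if there is none
    local found = box.space.queue.index.status
        :pairs({STATUS.READY},{ iterator = 'EQ' }):nth(1)
    if found then
        return box.space.queue
            :update( {found.id}, {{'=', 2, STATUS.TAKEN }})
    end
    return
end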
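To see why searching by status also yields the oldest task, here is a toy plain-Lua model of the composite index. It is only an illustration: a real index is a tree and does not scan linearly, and the `tasks` array and `first_with_status` helper are invented for this sketch.

```lua
-- A toy model of the composite (status, id) index: a sorted array.
local tasks = {
    { id = 30, status = 'R' },
    { id = 10, status = 'T' },
    { id = 20, status = 'R' },
    { id = 40, status = 'T' },
}

-- Order by status first, then by id, like a two-part index key.
table.sort(tasks, function(a, b)
    if a.status ~= b.status then return a.status < b.status end
    return a.id < b.id
end)

-- Equality search on the first key part: the first match is the
-- task with the smallest id among those with that status.
local function first_with_status(status)
    for _, t in ipairs(tasks) do
        if t.status == status then return t end
    end
    return nil
end

local found = first_with_status('R')   -- the task with id 20
```

Because ids come from the clock, the smallest id among ready tasks is the one that was put first.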

Please, note that the first tuple level in Tarantool is an array. It has no names, but only numbers, and that’s why the field number used to be required at operations like update. Let’s make an auxiliary element – a table, to match the field names and numbers. To compile such a table we can use the format we’ve already written:

local F = {}
for no,def in pairs(box.space.queue:format()) do
F[no] = def.name
F[def.name] = no
end

For better visibility, we can correct descriptions of indexes like:

box.space.queue:format( {
{ name = 'id';     type = 'number' },
{ name = 'status'; type = 'string' },
{ name = 'data';   type = '*'      },
} );

local F = {}
for no,def in pairs(box.space.queue:format()) do
F[no] = def.name
F[def.name] = no
end

box.space.queue:create_index('primary', {
parts = { F.id, 'number' };
if_not_exists = true;
})

box.space.queue:create_index('status', {
parts = { F.status, 'string', F.id, 'number' };
if_not_exists = true;
})

Now we can implement take in full:

function queue.take(...)
    for _,t in
        box.space.queue.index.status
        :pairs({ STATUS.READY },{ iterator = 'EQ' })
    do
        return box.space.queue:update({t.id},{
            { '=', F.status, STATUS.TAKEN }
        })
    end
    return
end

Let’s check how it works. For this, we’ll put one task and call take twice. If by that moment we have any data in the space, we can clear it with the command box.space.queue:truncate():

tarantool> queue.put("my","data",1,2,3)
---
- [1594325927025602515, 'R', ['my', 'data', 1, 2, 3]]
...

tarantool> queue.take()
---
- [1594325927025602515, 'T', ['my', 'data', 1, 2, 3]]
...

tarantool> queue.take()
---
...

The first take returns the task we’ve put. When we call take for the second time, nil is returned, because there are no more ready tasks (with the R status). To make sure, we run a select on the space:

tarantool> box.space.queue:select()
---
- - [1594325927025602515, 'T', ['my', 'data', 1, 2, 3]]
...

The consumer that takes a task must either acknowledge that it has been processed or release it without processing. In the latter case, somebody else will be able to take the task. For this, two functions are used: ack and release. They receive the task’s id and look it up. If the task’s status shows it’s been taken, we handle it. These functions are really similar: one removes processed tasks, and the other returns them to the ready status.

function queue.ack(id)
    local t = assert(box.space.queue:get{id},"Task not exists")
    if t and t.status == STATUS.TAKEN then
        return box.space.queue:delete{t.id}
    else
        error("Task not taken")
    end
end

function queue.release(id)
    local t = assert(box.space.queue:get{id},"Task not exists")
    if t and t.status == STATUS.TAKEN then
        return box.space.queue:update({t.id},{{'=', F.status, STATUS.READY }})
    else
        error("Task not taken")
    end
end

Let’s see how it works with all four functions. We put two tasks, take the first of them, and then release it. It returns to the R status. The second take call takes the same task. If we acknowledge it, it is removed. The third take call takes the second task. The order is preserved. Once a task has been taken, it isn’t available to anybody else.

The session consists of two put calls followed by take, release, take, ack, take, and ack. The final take finds the queue empty:

---
- null
...

This is a properly working queue. We can already write a consumer to process the tasks. However, there is a problem. When we call take, the function instantly returns either a task or nothing. If we write a loop for task processing and start it, it will spin unproductively, doing nothing and simply wasting CPU.

while true do
    local task = queue.take()
    if task then
        -- ...
    end
end

To fix this, we’ll need a primitive called a channel, which lets fibers exchange messages. In fact, it’s a FIFO queue for inter-fiber communication. One fiber puts tasks when we access the database over the network or via the console. Our Lua code runs in that fiber, and it needs some primitive to tell another fiber, the one waiting for a task, that a new task is available.

This is how a channel works: it can have a buffer of N slots, where messages can sit even if no one is reading the channel. Alternatively, a channel can be created without a buffer; then a message is accepted only when somebody is already waiting for it. Let’s say we create a channel with a two-element buffer. It has two slots for put. If one consumer is waiting on the channel, that makes a third slot for put. If we send messages via this channel, three put operations will go through without blocking, but the fourth put will block the fiber that is sending. This is how inter-fiber communication is set up. If you happen to be familiar with channels in Go, these are essentially the same.
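The slot arithmetic from the example above (a buffer of two plus one waiting consumer) can be checked with a toy model. This is plain Lua and deliberately not fiber.channel: nothing here blocks or switches fibers, and `new_channel` and `try_put` are names invented for the sketch.

```lua
-- Toy model of the slot accounting described above. It is NOT
-- fiber.channel: nothing here blocks or switches fibers.
local function new_channel(capacity)
    return { capacity = capacity, buffered = 0, readers = 0 }
end

-- A non-blocking put succeeds if a reader is waiting (the message is
-- handed over directly) or if the buffer still has a free slot.
local function try_put(ch)
    if ch.readers > 0 then
        ch.readers = ch.readers - 1
        return true
    end
    if ch.buffered < ch.capacity then
        ch.buffered = ch.buffered + 1
        return true
    end
    return false   -- a real put would block here
end

local ch = new_channel(2)
ch.readers = 1                 -- one consumer is already waiting
local results = {}
for i = 1, 4 do
    results[i] = try_put(ch)   -- true, true, true, false
end
```

With a capacity of zero the same model shows the unbuffered case: a put goes through only while a reader is waiting.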

Let’s slightly modify the take function. First of all, we add a new argument, timeout, meaning that we’re ready to wait for a task for a set period of time. We make a loop that searches for a ready task. If none is found, the loop computes how much longer it may wait.

Now, let’s make a channel to wait on for that remaining time. While the fiber is asleep on the channel, it can be woken up externally by sending a message via the channel.

local fiber = require 'fiber'
queue._wait = fiber.channel()

function queue.take(timeout)
    if not timeout then timeout = 0 end
    local now = fiber.time()
    local found
    while not found do
        found = box.space.queue.index.status
            :pairs({STATUS.READY},{ iterator = 'EQ' }):nth(1)
        if not found then
            -- time left until the deadline; give up when it runs out
            local left = (now + timeout) - fiber.time()
            if left <= 0 then return end
            queue._wait:get(left)
        end
    end
    return box.space.queue
        :update( {found.id}, {{'=', F.status, STATUS.TAKEN }})
end

Altogether, take tries to take a task, and if it succeeds, the task is returned. If there is no task, take can wait for it for the rest of the timeout. Besides, the other party, the one that creates the task, will be able to wake this fiber up.
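The waiting logic can be tried out without fibers by injecting the clock and the wait function. This is a plain-Lua sketch under that assumption; `take_with_timeout` and its parameters are hypothetical and only mirror the shape of the loop in take.

```lua
-- Model of the wait loop in take(): `clock` and `wait` are injected
-- so the sketch runs without fibers. Both names are hypothetical.
local function take_with_timeout(find_ready, timeout, clock, wait)
    local deadline = clock() + (timeout or 0)
    while true do
        local task = find_ready()
        if task then return task end
        local left = deadline - clock()
        if left <= 0 then return nil end   -- timed out
        wait(left)                         -- sleep until woken or timeout
    end
end

-- Fake time: each wait is cut short after 0.1 s, as if a producer
-- woke us up; the task shows up once 0.25 s have passed.
local t = 0
local function clock() return t end
local function wait(left) t = t + math.min(left, 0.1) end
local function find_ready() if t >= 0.25 then return 'task' end end

local got = take_with_timeout(find_ready, 3, clock, wait)         -- 'task'
t = 0
local none = take_with_timeout(function() end, 0.3, clock, wait)  -- nil
```

Recomputing the remaining time on every pass is what keeps the total wait within the timeout even when the fiber is woken several times without finding a task.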

To make testing more convenient, we can load the fiber module globally in the init.lua file:

fiber = require 'fiber'

Let’s see how this works without waking the fiber up. In a separate fiber, we’ll put a task with a 0.1 sec. delay, i.e. at first the queue will be empty, and the task will appear 0.1 sec. after the start. Meanwhile, we’ll call take with a 3 sec. timeout. After the start, take tries to find a task, and since there’s none, it goes to sleep for 3 sec. After 3 sec. it wakes up, searches again, and finds the task.

tarantool> do
    box.space.queue:truncate()
    fiber.create(function()
        fiber.sleep(0.1)
        queue.put("task 3")
    end)
    local start = fiber.time()
    return queue.take(3), { wait = fiber.time() - start }
end

---
- wait: 3.0017817020416
...

Now, let’s make take wake up when a task appears. For this, we’ll take our old put function and have it send a message via the channel. The message can be literally anything. Let it be true here.

Previously, I demonstrated that put can block if the channel has no free slot. At the same time, the task producer doesn’t care whether there are consumers on the other side, and it shouldn’t get blocked waiting for one. So it’s reasonable to use a zero timeout here. If there are consumers out there that need to be told about the new task, we’ll wake them up. Otherwise, the message simply won’t be sent. An alternative option is to check whether the channel has any active readers.

function queue.put(...)
    local id = gen_id()

    -- notify a waiting consumer, if any, without blocking
    if queue._wait:has_readers() then
        queue._wait:put(true,0)
    end

    return box.space.queue:insert{ id, STATUS.READY, { ... } }
end

Now the take code is going to work in a totally different way. We create a task after 0.1 sec., and take instantly wakes up and receives it. We’ve got rid of the hot loop that kept spinning while waiting for tasks. If we don’t put a task, the fiber will wait for 3 seconds.

tarantool> do
    box.space.queue:truncate()
    fiber.create(function()
        fiber.sleep(0.1)
        queue.put("task 3")
    end)
    local start = fiber.time()
    return queue.take(3), { wait = fiber.time() - start }
end

---
- wait: 0.10164666175842
...

We’ve tested how things work within the instance, and now let’s try some networking. First of all, let’s create a server. For that, we add the listen option to box.cfg in our init.lua file (it is the address and port to listen on). We’ll also need to grant permissions. Right now we’re not going to study privilege setup in detail, so let’s just give every connection full rights. For details on access rights, see the access control section of the Tarantool documentation.

require'strict'.on()
fiber = require 'fiber'

box.cfg{
listen = '127.0.0.1:3301'
}
box.schema.user.grant('guest', 'super', nil, nil, { if_not_exists = true })

queue = require 'queue'

require'console'.start()
os.exit()

Let’s create a producer client for task generation. Tarantool already has a module that facilitates connection to another Tarantool.

#!/usr/bin/env tarantool

if #arg < 1 then
error("Need arguments",0)
end

local netbox = require 'net.box'
local conn = netbox.connect('127.0.0.1:3301')

local yaml = require 'yaml'
local res = conn:call('queue.put',{unpack(arg)})
print(yaml.encode(res))
conn:close()

$ tarantool producer.lua "hi"
--- [1594327270675788959, 'R', ['hi']]
...

The consumer will connect, call take with a timeout, and process the result. If it receives the task, we’ll print or release it but won’t process it yet. Let’s say, we’ve received the task.

#!/usr/bin/env tarantool

local netbox = require 'net.box'
local conn = netbox.connect('127.0.0.1:3301')
local yaml = require 'yaml'

while true do
    local task = conn:call('queue.take', { 1 })
    if task then
        print("Got task: ", yaml.encode(task))
        conn:call('queue.release', { task.id })
    else
        print "No more tasks"
    end
end

But when we try to release the task, something odd happens:

$ tarantool consumer.lua
--- [1594327270675788959, 'T', ['hi']]
...

ER_EXACT_MATCH: Invalid key part count in an exact match (expected 1, got 0)

Let’s look into this. When the consumer is started again, we’ll see that on the previous run it took the task but wasn’t able to return it. An error occurred, and the task got stuck. Such tasks become unavailable to other consumers, and there is nobody to return them, as the code that took them has already exited.

$ tarantool consumer.lua
No more tasks
No more tasks

Select shows that the tasks have been taken.

tarantool> box.space.queue:select()
---
- - [1594327004302379957, 'T', ['task 3']]
  - [1594327270675788959, 'T', ['hi']]
...

We have several issues at once here. Let’s start with the automatic release of the tasks in case the client disconnects. Tarantool contains triggers for client connection and disconnection. If we add them, we’ll be able to learn about connection and disconnection events.

local log = require 'log'

box.session.on_connect(function()
    log.info( "connected %s from %s", box.session.id(), box.session.peer() )
end)

box.session.on_disconnect(function()
    log.info( "disconnected %s from %s", box.session.id(), box.session.peer() )
end)

2020-07-09 20:52:09.107 [32604] main/115/main I> connected 2 from 127.0.0.1:36652
2020-07-09 20:52:10.260 [32604] main/116/main I> disconnected 2 from nil
2020-07-09 20:52:10.823 [32604] main/116/main I> connected 3 from 127.0.0.1:36654
2020-07-09 20:52:11.541 [32604] main/115/main I> disconnected 3 from nil

There is this term, session id, and we can check the IP address used to connect, as well as the time of disconnection. However, calling session.peer() actually calls getpeername(2) right over the socket. That’s why at disconnection we don’t see who’s disconnected (as getpeername is called over a closed socket). Let’s do some minor hacking, then. Tarantool has box.session.storage, a temporary table to which anything you wish can be saved during the session lifetime. At connection, we can keep in mind the ones connected to know who’s disconnected. This will make debugging easier.

box.session.on_connect(function()
    box.session.storage.peer = box.session.peer()
    log.info( "connected %s from %s", box.session.id(), box.session.storage.peer )
end)

box.session.on_disconnect(function()
    log.info( "disconnected %s from %s", box.session.id(), box.session.storage.peer )
end)

So, we have a client disconnection event.
And we need to somehow release the tasks it has taken. Let’s introduce the term “possession of the task.” The session that has taken the task ought to answer for it. Let’s make two tables to save these data and modify the take function:

queue.taken = {}; -- list of tasks taken
queue.bysid = {}; -- list of tasks for the specific session

function queue.take(timeout)
    if not timeout then timeout = 0 end
    local now = fiber.time()
    local found
    while not found do
        found = box.space.queue.index.status
            :pairs({STATUS.READY},{ iterator = 'EQ' }):nth(1)
        if not found then
            local left = (now + timeout) - fiber.time()
            if left <= 0 then return end
            queue._wait:get(left)
        end
    end

    local sid = box.session.id()
    log.info("Register %s by %s", found.id, sid)
    queue.taken[ found.id ] = sid
    queue.bysid[ sid ] = queue.bysid[ sid ] or {}
    queue.bysid[ sid ][ found.id ] = true

    return box.space.queue
        :update( {found.id}, {{'=', F.status, STATUS.TAKEN }})
end

We’ll use this table to memorize that a certain task has been taken by a certain session. We’ll also need to modify the task returning code, ack, and release. Let’s make a single common function to check if the task is there and if it has been taken by a specific session. Then, it will be impossible to take the task under one connection and then return it under another one, or request its deletion as completed.

local function get_task( id )
    if not id then error("Task id required", 2) end
    local t = box.space.queue:get{id}
    if not t then
        error(string.format( "Task {%s} was not found", id ), 2)
    end
    if not queue.taken[id] then
        error(string.format( "Task %s not taken by anybody", id ), 2)
    end
    if queue.taken[id] ~= box.session.id() then
        error(string.format( "Task %s taken by %d. Not you (%d)",
            id, queue.taken[id], box.session.id() ), 2)
    end
    return t
end

Now ack and release functions become very simple. We use them to call get_task, which checks if the task is possessed by us and if it is taken. Then we can work with it.
function queue.ack(id)
    local t = get_task(id)
    queue.taken[ t.id ] = nil
    queue.bysid[ box.session.id() ][ t.id ] = nil
    return box.space.queue:delete{t.id}
end

function queue.release(id)
    local t = get_task(id)
    if queue._wait:has_readers() then queue._wait:put(true,0) end
    queue.taken[ t.id ] = nil
    queue.bysid[ box.session.id() ][ t.id ] = nil
    return box.space.queue
        :update({t.id},{{'=', F.status, STATUS.READY }})
end

To reset the statuses of all the tasks to R, an SQL or a Lua snippet can be used:

box.execute[[ update "queue" set "status" = 'R' where "status" = 'T' ]]

box.space.queue.index.status:pairs({'T'}):each(function(t)
    box.space.queue:update({t.id},{{'=',2,'R'}})
end)

When we call the consumer again, it replies: task ID required.

$ tarantool consumer.lua
...

Thus, we’ve found the first problem in our code. When we work in Tarantool, a tuple is always associated with the space. The latter has a format, and the format has field names. That’s why we can use field names in a tuple. When we take it beyond the database, a tuple becomes just an array with a number of fields. If we refine the format of return from the function, we’ll be able to return not tuples, but objects with names. For this, we’ll apply the method :tomap{ names_only = true }:

function queue.put(...)
--- ...
return box.space.queue
:insert{ id, STATUS.READY, { ... } }
:tomap{ names_only = true }
end

function queue.take(timeout)
--- ...
return box.space.queue
:update( {found.id}, {{'=', F.status, STATUS.TAKEN }})
:tomap{ names_only = true }
end

function queue.ack(id)
--- ...
return box.space.queue:delete{t.id}:tomap{ names_only = true }
end

function queue.release(id)
    --- ...
    return box.space.queue
        :update({t.id},{{'=', F.status, STATUS.READY }})
        :tomap{ names_only = true }
end

return queue
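What the conversion amounts to can be pictured in plain Lua. The `to_map` helper below is a hypothetical stand-in, not the tuple method itself: it pairs the field names from a format with the positional values of an array.

```lua
-- Hypothetical helper: pair field names from a format with the
-- positional values of a tuple represented as a plain array.
local format = {
    { name = 'id' }, { name = 'status' }, { name = 'data' },
}

local function to_map(fmt, tuple)
    local map = {}
    for no, def in ipairs(fmt) do
        map[def.name] = tuple[no]
    end
    return map
end

local task = to_map(format, { 42, 'T', { 'hi' } })
-- task.id is 42, task.status is 'T', task.data[1] is 'hi'
```

A table with named keys survives the trip over the network as a map, so a client can read task.id from the result.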

With the return format changed, we’ll encounter another issue.

$ tarantool consumer.lua
--- {'status': 'T', 'data': ['hi'], 'id': 1594327270675788959}
...

ER_PROC_LUA: queue.lua:117: Task 1594327270675788959ULL not taken by anybody

If we try to release the task the system will answer that we haven’t taken it. Moreover, we will see the same ID, but with a suffix – ULL.

Here we encounter a quirk of the LuaJIT extension FFI (Foreign Function Interface). Let’s look into it. We add five values to a table, using various notations of the number 1 as keys.

tarantool> t = {}
tarantool> t[1] = 1
tarantool> t["1"] = 2
tarantool> t[1LL] = 3
tarantool> t[1ULL] = 4
tarantool> t[1ULL] = 5
tarantool> t
---
- 1: 1
  1: 5
  1: 4
  '1': 2
  1: 3
...

We might expect two keys (string + number) or three (string + number + LL). But all the keys appear in the table separately: we still see all the values 1, 2, 3, 4, 5. Moreover, in the serialized output we can’t tell regular, signed, and unsigned numbers apart.

tarantool> return t[1], t['1'], t[1LL], t[1ULL]
---
- 1
- 2
- null
- null
...

However, the most amusing thing happens when we try to extract data from the table. It goes well with regular Lua types (number and string), but not with LL (long long) and ULL (unsigned long long). They belong to a separate type, cdata, intended for working with C language types. When used as a key in a Lua table, cdata is hashed by address, not by value. Two numbers, even if they are equal in value, simply have different addresses. So when we add a ULL key to the table, we can’t look it up by the same value.

That’s why we’ll have to change our queue and the ownership bookkeeping a bit. This is a forced move, but it will let us use arbitrary keys in the future. Somehow, we need to transform our key into a string or a number. Let’s use MessagePack. Tarantool uses it to store tuples, and it will pack our values just like Tarantool itself does. With it, we’ll transform an arbitrary key into a string that will become the key in our table.

local msgpack = require 'msgpack'

local function keypack( key )
return msgpack.encode( key )
end

local function keyunpack( data )
return msgpack.decode( data )
end

Then we add key packing to take and use the packed key in the table. In the function get_task we need to check that the key has been passed in the correct format, and if not, convert it to int64. After that, we use keypack to pack the key with MessagePack. As this packed key is needed by all the functions that use it, we return it from get_task, so that ack and release can use it to clean up the session tables.

function queue.take(timeout)
    if not timeout then timeout = 0 end
    local now = fiber.time()
    local found
    while not found do
        found = box.space.queue.index.status
            :pairs({STATUS.READY},{ iterator = 'EQ' }):nth(1)
        if not found then
            local left = (now + timeout) - fiber.time()
            if left <= 0 then return end
            queue._wait:get(left)
        end
    end

    local sid = box.session.id()
    log.info("Register %s by %s", found.id, sid)
    local key = keypack( found.id )
    queue.taken[ key ] = sid
    queue.bysid[ sid ] = queue.bysid[ sid ] or {}
    queue.bysid[ sid ][ key ] = true

    return box.space.queue
        :update( {found.id}, {{'=', F.status, STATUS.TAKEN }})
        :tomap{ names_only = true }
end

local function get_task( id )
    if not id then error("Task id required", 2) end
    id = tonumber64(id)
    local key = keypack(id)
    local t = box.space.queue:get{id}
    if not t then
        error(string.format( "Task {%s} was not found", id ), 2)
    end
    if not queue.taken[key] then
        error(string.format( "Task %s not taken by anybody", id ), 2)
    end
    if queue.taken[key] ~= box.session.id() then
        error(string.format( "Task %s taken by %d. Not you (%d)",
            id, queue.taken[key], box.session.id() ), 2)
    end
    return t, key
end

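function queue.ack(id)
    local t, key = get_task(id)
    queue.taken[ key ] = nil
    queue.bysid[ box.session.id() ][ key ] = nil
    return box.space.queue:delete{t.id}:tomap{ names_only = true }
end

function queue.release(id)
    local t, key = get_task(id)
    queue.taken[ key ] = nil
    queue.bysid[ box.session.id() ][ key ] = nil
    if queue._wait:has_readers() then queue._wait:put(true,0) end
    return box.space.queue
        :update({t.id},{{'=', F.status, STATUS.READY }})
        :tomap{ names_only = true }
end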

As we have a disconnection trigger, we know when a session that possesses certain keys has disconnected. We can take all the keys of that session and automatically return the tasks to their initial state, ready. Besides, the session may still have take calls waiting for a task. Let’s set a mark in session.storage so that such calls don’t take anything.

box.session.on_disconnect(function()
    log.info( "disconnected %s from %s", box.session.id(), box.session.storage.peer )
    box.session.storage.destroyed = true

    local sid = box.session.id()
    local bysid = queue.bysid[ sid ]
    if bysid then
        while next(bysid) do
            for key, id in pairs(bysid) do
                log.info("Autorelease %s by disconnect", id);
                queue.taken[key] = nil
                bysid[key] = nil
                local t = box.space.queue:get(id)
                if t then
                    -- wake a waiting consumer and return the task to ready
                    if queue._wait:has_readers() then queue._wait:put(true,0) end
                    box.space.queue:update({t.id},{{'=', F.status, STATUS.READY }})
                end
            end
        end
        queue.bysid[ sid ] = nil
    end
end)

function queue.take(timeout)
    if not timeout then timeout = 0 end
    local now = fiber.time()
    local found
    while not found do
        found = box.space.queue.index.status
            :pairs({STATUS.READY},{ iterator = 'EQ' }):nth(1)
        if not found then
            local left = (now + timeout) - fiber.time()
            if left <= 0 then return end
            queue._wait:get(left)
        end
    end

    -- the session went away while we were waiting: take nothing
    if box.session.storage.destroyed then return end

    local sid = box.session.id()
    log.info("Register %s by %s", found.id, sid)
    local key = keypack( found.id )
    queue.taken[ key ] = sid
    queue.bysid[ sid ] = queue.bysid[ sid ] or {}
    queue.bysid[ sid ][ key ] = found.id

    return box.space.queue
        :update( {found.id}, {{'=', F.status, STATUS.TAKEN }})
        :tomap{ names_only = true }
end
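The ownership bookkeeping is easy to get wrong, so here is a toy plain-Lua model of the two tables on their own. No Tarantool API is involved, and `register` and `release_session` are names invented for the sketch.

```lua
-- Toy model of the ownership tables; no Tarantool API is used.
local taken = {}   -- task key   -> session id that holds it
local bysid = {}   -- session id -> set of task keys it holds

local function register(sid, key)
    taken[key] = sid
    bysid[sid] = bysid[sid] or {}
    bysid[sid][key] = true
end

-- Called when a session goes away: forget its tasks and return
-- their keys so the caller can put them back to the ready state.
local function release_session(sid)
    local released = {}
    for key in pairs(bysid[sid] or {}) do
        taken[key] = nil
        released[#released + 1] = key
    end
    bysid[sid] = nil
    table.sort(released)
    return released
end

register(1, 'a'); register(1, 'b'); register(2, 'c')
local freed = release_session(1)   -- { 'a', 'b' }
```

Keeping both directions (task to session and session to tasks) makes the ownership check in get_task and the cleanup on disconnect equally cheap.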

For testing purposes, tasks can be taken as a group:

tarantoolctl connect 127.0.0.1:3301 <<< 'queue.take()'

While debugging, you might notice that if you take some tasks and then bring the queue down, after a restart the tasks aren’t possessed by anybody (because all connections were dropped at shutdown), yet they still have the taken status. That’s why we’ll add a status reset at startup. When the database starts, it releases all the taken tasks.

while true do
local t = box.space.queue.index.status:pairs({STATUS.TAKEN}):nth(1)
if not t then break end
box.space.queue:update({ t.id }, {{'=', F.status, STATUS.READY }})
log.info("Autoreleased %s at start", t.id)
end

Now we have a queue ready for operation.

Now, we only have to add delayed tasks. For that, let’s add a new field and a matching index. We’re going to use this field to store the time when a task’s state should be changed. To this end, we’re modifying the put function and adding a new status: W=WAITING.

box.space.queue:format( {
{ name = 'id';     type = 'number' },
{ name = 'status'; type = 'string' },
{ name = 'runat';  type = 'number' },
{ name = 'data';   type = '*'      },
} )

box.space.queue:create_index('runat', {
parts = { F.runat, 'number', F.id, 'number' };
if_not_exists = true;
})

STATUS.WAITING = 'W'

Since we are changing the schema, and this is still development mode, let's drop the old schema (via the console):

box.space.queue:drop()
box.snapshot()

Now, let’s restart our queue.

Next, we add delay support to put and release. If a delay is passed, the task gets the WAITING status, and we record the time at which it should be processed. We also need something to process such tasks, and background fibers fit well. At any moment we can create a fiber that is not tied to any connection and works in the background. Let's make a fiber that runs forever and waits for the nearest scheduled task.

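function queue.put(data, opts)
    local id = gen_id()

    local runat = 0
    local status = STATUS.READY

    if opts and opts.delay then
        -- Delayed task: remember when it becomes ready
        runat = clock.realtime() + tonumber(opts.delay)
        status = STATUS.WAITING
    else
        -- Wake up a consumer waiting in take(), if there is one
        if queue._wait:has_readers() then
            queue._wait:put(true,0)
        end
    end

    return box.space.queue
        :insert{ id, status, runat, data }
        :tomap{ names_only=true }
end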

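function queue.release(id, opts)
    -- get_task checks that the task exists and is held by this session
    local t, key = get_task(id)
    queue.taken[ key ] = nil
    queue.bysid[ box.session.id() ][ key ] = nil

    local runat = 0
    local status = STATUS.READY

    if opts and opts.delay then
        runat = clock.realtime() + tonumber(opts.delay)
        status = STATUS.WAITING
    else
        if queue._wait:has_readers() then queue._wait:put(true,0) end
    end

    return box.space.queue
        :update({t.id},{{ '=', F.status, status },{ '=', F.runat, runat }})
        :tomap{ names_only = true }
end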

When the time comes for a task, the fiber changes its status from waiting to ready and notifies the clients that may be waiting for a task.

Now let's put a delayed task. Call take and make sure there are no ready tasks. Call it again with a timeout long enough to cover the delay. As soon as the task appears, the log shows that it was made ready by the queue.runat fiber.

queue._runat = fiber.create(function()
    fiber.name('queue.runat')
    while true do
        local remaining

        local now = clock.realtime()
        -- Walk the scheduled tasks (runat > 0) in order of their time
        for _,t in box.space.queue.index.runat
            :pairs( { 0 }, { iterator = 'GT' })
        do
            if t.runat > now then
                -- The nearest task is still in the future
                remaining = t.runat - now
                break
            else
                if t.status == STATUS.WAITING then
                    log.info("Runat: W->R %s",t.id)
                    if queue._wait:has_readers() then queue._wait:put(true,0) end
                    box.space.queue:update({ t.id }, {
                        {'=', F.status, STATUS.READY },
                        {'=', F.runat, 0 },
                    })
                else
                    log.error("Runat: bad status %s for %s", t.status, t.id)
                    box.space.queue:update({ t.id },{{ '=', F.runat, 0 }})
                end
            end
        end

        if not remaining or remaining > 1 then remaining = 1 end
        fiber.sleep(remaining)
    end
end)
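
One pass of this fiber can be modelled without a database. The sketch below is plain Lua under stated assumptions: tasks is an array already sorted by runat (the job the runat index does in Tarantool), entries with runat = 0 are skipped the way the GT iterator skips them, and runat_pass is a hypothetical name.

```lua
local WAITING, READY = 'W', 'R'

-- One pass of the scheduler over tasks sorted by runat.
-- Promotes every due task to READY and returns how long to sleep
-- before the next pass (capped at 1 second, like the fiber).
local function runat_pass(tasks, now)
    local remaining
    for _, t in ipairs(tasks) do
        if t.runat > 0 then            -- the index scan starts after runat = 0
            if t.runat > now then
                remaining = t.runat - now
                break                  -- everything further is even later
            end
            if t.status == WAITING then
                t.status = READY
            end
            t.runat = 0                -- the task leaves the scheduled range
        end
    end
    if not remaining or remaining > 1 then remaining = 1 end
    return remaining
end

local tasks = {
    { id = 1, status = READY,   runat = 0   },
    { id = 2, status = WAITING, runat = 100 },
    { id = 3, status = WAITING, runat = 103 },
    { id = 4, status = WAITING, runat = 110 },
}

print(runat_pass(tasks, 102))             -- prints 1: task 3 is due in one second
print(tasks[2].status, tasks[3].status)   -- prints R and W
```

The one-second cap matters because a task with an earlier time can be put while the fiber sleeps; waking up at least once a second bounds how late such a task can become ready.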

## Monitoring

Never forget to monitor the queue, because it can grow too large or run empty. We can count the number of tasks in each status and send these numbers to a monitoring system.

function queue.stats()
return {
total   = box.space.queue:len(),
waiting = box.space.queue.index.status:count({STATUS.WAITING}),
taken   = box.space.queue.index.status:count({STATUS.TAKEN}),
}
end

tarantool> queue.stats()
---
- taken: 2
  waiting: 5
  total: 17
...

tarantool> local clock = require 'clock' local s = clock.time() local r = queue.stats() return r, clock.time() - s
---
- taken: 2
  waiting: 5
  total: 17
- 0.00057339668273926
...

Such monitoring works quite fast as long as there are not too many tasks. The normal state of a queue is empty. But suppose we have a million tasks. The stats function still returns correct values, but it becomes rather slow. The cause is the index:count call, which always walks through every matching index entry. Let's cache the counter values.

queue._stats = {}
for k,v in pairs(STATUS) do
queue._stats[v] = 0LL
end

for _,t in box.space.queue:pairs() do
queue._stats[ t[F.status] ] = (queue._stats[ t[F.status] ] or 0LL)+1
end

function queue.stats()
return {
total   = box.space.queue:len(),
waiting = queue._stats[ STATUS.WAITING ],
taken   = queue._stats[ STATUS.TAKEN ],
}
end

Now this function works very fast regardless of the number of records. What remains is to keep the counters up to date: every operation that changes a task's status has to decrement one counter and increment another. We could update them by hand inside each function, but that invites mistakes and inconsistencies. Luckily, Tarantool has space triggers, which see every change made to the space. Even if you run space:update or space:delete by hand, the trigger takes that into account, so the counters always follow the statuses actually stored in the database. At startup we count the values of all the counters once.

box.space.queue:on_replace(function(old,new)
if old then
queue._stats[ old[ F.status ] ] = queue._stats[ old[ F.status ] ] - 1
end
if new then
queue._stats[ new[ F.status ] ] = queue._stats[ new[ F.status ] ] + 1
end
end)
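
To see that these few lines are enough to keep the counters exact, here is a plain-Lua model. It is only a sketch: on_replace is called by hand where Tarantool would call the trigger, tuples are plain tables, and ordinary numbers replace the 0LL counters.

```lua
local stats = { R = 0, T = 0, W = 0 }

-- Same logic as the space trigger: old is nil on insert, new is nil on delete.
local function on_replace(old, new)
    if old then stats[old.status] = stats[old.status] - 1 end
    if new then stats[new.status] = stats[new.status] + 1 end
end

-- A task goes through put -> take -> release -> take -> ack.
local ready = { id = 1, status = 'R' }
local taken = { id = 1, status = 'T' }

on_replace(nil, ready)     -- put: insert
on_replace(ready, taken)   -- take: update R -> T
on_replace(taken, ready)   -- release: update T -> R
on_replace(ready, taken)   -- take again
print(stats.R, stats.T, stats.W)   -- prints 0, 1 and 0
on_replace(taken, nil)     -- ack: delete
print(stats.R, stats.T, stats.W)   -- prints 0, 0 and 0
```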

There is one more operation that a space trigger cannot see but that affects the space's content: space:truncate(). To track truncation, we can put a trigger on the system space _truncate.

box.space._truncate:on_replace(function(old,new)
if new.id == box.space.queue.id then
for k,v in pairs(queue._stats) do
queue._stats[k] = 0LL
end
end
end)

After that everything works accurately and consistently. The statistics can be sent over the network. Tarantool has convenient non-blocking sockets that can be used at a fairly low level, almost as in C.

To demonstrate how it works, let's send the metrics in the Graphite format over UDP:

local socket = require 'socket'
local errno = require 'errno'

local graphite_host = '127.0.0.1'
local graphite_port = 2003

local ai = socket.getaddrinfo(graphite_host, graphite_port, 1, { type = 'SOCK_DGRAM' })
local addr, port
-- Use the first resolved address
for _,info in pairs(ai or {}) do
    addr, port = info.host, info.port
    break
end
if not addr then error("Failed to resolve host") end

queue._monitor = fiber.create(function()
fiber.name('queue.monitor')
fiber.yield()
local remote = socket('AF_INET', 'SOCK_DGRAM', 'udp')
while true do
for k,v in pairs(queue.stats()) do
local msg = string.format("queue.stats.%s %s %s\n", k, tonumber(v), math.floor(fiber.time()))
local res = remote:sendto(addr, port, msg)
if not res then
log.error("Failed to send: %s", errno.strerror(errno()))
end
end
fiber.sleep(1)
end
end)

or using TCP:

local socket = require 'socket'
local errno = require 'errno'

local graphite_host = '127.0.0.1'
local graphite_port = 2003

queue._monitor = fiber.create(function()
fiber.name('queue.monitor')
fiber.yield()
while true do
local remote =  require 'socket'.tcp_connect(graphite_host, graphite_port)
if not remote then
log.error("Failed to connect to graphite %s",errno.strerror())
fiber.sleep(1)
else
while true do
local data = {}
for k,v in pairs(queue.stats()) do
table.insert(data,string.format("queue.stats.%s %s %s\n",k,tonumber(v),math.floor(fiber.time())))
end
data = table.concat(data,'')
if not remote:send(data) then
log.error("%s",errno.strerror())
break
end
fiber.sleep(1)
end
end
end
end)
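
Both variants send the same payload: one line per metric in the Graphite plaintext format, "path value timestamp" followed by a newline. The formatting step can be checked on its own with a small plain-Lua sketch; graphite_lines is a hypothetical helper, and it uses %d with ordinary numbers, whereas the fibers above convert the 0LL counters with tonumber first.

```lua
-- Build Graphite plaintext lines: "<path> <value> <unix timestamp>\n".
-- Keys are sorted only to make the output deterministic.
local function graphite_lines(prefix, stats, now)
    local keys = {}
    for k in pairs(stats) do keys[#keys + 1] = k end
    table.sort(keys)

    local lines = {}
    for _, k in ipairs(keys) do
        lines[#lines + 1] = string.format("%s.%s %d %d\n", prefix, k, stats[k], now)
    end
    return table.concat(lines)
end

io.write(graphite_lines('queue.stats', { total = 17, waiting = 5, taken = 2 }, 1700000000))
-- queue.stats.taken 2 1700000000
-- queue.stats.total 17 1700000000
-- queue.stats.waiting 5 1700000000
```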

An important feature of the Tarantool platform is hot code reloading. Regular apps rarely need it, but when the database holds gigabytes of data and every restart takes time, hot reloading is quite helpful.

When Lua loads code with require, it interprets the file and caches the returned value in the system table package.loaded under the module's name. Subsequent require calls for the same module do not read the file again; they return the cached value. To make Lua read and interpret the file again, delete the corresponding entry from package.loaded[…] and call require once more. You also have to remember what the runtime had preloaded, because built-in modules have no files to reload from. The simplest reload code looks like this:

require'strict'.on()
fiber = require 'fiber'

box.cfg{
listen = '127.0.0.1:3301'
}
box.schema.user.grant('guest', 'super', nil, nil, { if_not_exists = true })

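local not_first_run = rawget(_G,'_NOT_FIRST_RUN')
_NOT_FIRST_RUN = true
if not_first_run then
    -- Reload: forget every module that was not part of the runtime
    for k,v in pairs(package.loaded) do
        if not preloaded[k] then
            package.loaded[k] = nil
        end
    end
else
    -- First run: remember what the runtime has preloaded
    preloaded = {}
    for k,v in pairs(package.loaded) do
        preloaded[k] = true
    end
end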

queue = require 'queue'

require'console'.start()
os.exit()

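Since code reloading is a routine task for Tarantool applications, there is a ready-made module for it, package.reload, which takes care of this bookkeeping and keeps a reload counter in package.reload.count. With it, the entry point stays short:

require'strict'.on()
fiber = require 'fiber'
require 'package.reload'

box.cfg{
    listen = '127.0.0.1:3301'
}
box.schema.user.grant('guest', 'super', nil, nil, { if_not_exists = true })

queue = require 'queue'

require'console'.start()
os.exit()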
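
The caching behaviour of require is easy to verify in plain Lua. This is a small sketch, assuming nothing about Tarantool: the throwaway module demo_mod is hypothetical and is registered through package.preload so that no file on disk is needed. With a real file the effect is the same, except that require reads the file again.

```lua
-- A throwaway module registered through package.preload
-- (hypothetical module name 'demo_mod').
local loads = 0
package.preload['demo_mod'] = function()
    loads = loads + 1
    return { version = loads }
end

local first  = require 'demo_mod'
local second = require 'demo_mod'      -- served from package.loaded
print(loads, first == second)          -- prints 1 and true

package.loaded['demo_mod'] = nil       -- forget the cached result
local third = require 'demo_mod'       -- the loader runs again
print(loads, third.version)            -- prints 2 and 2
```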

To make the code reloadable, you have to write it in a slightly different way. Keep in mind that the same code is executed more than once: first at the initial start and then again at every reload. We have to handle both cases explicitly.

local queue = {}
local old = rawget(_G,'queue')
if old then
queue.taken = old.taken
queue.bysid = old.bysid
queue._triggers = old._triggers
queue._stats = old._stats
queue._wait = old._wait
queue._runch = old._runch
queue._runat = old._runat
else
queue.taken = {}
queue.bysid = {}
queue._triggers = {}
queue._stats = {}
queue._wait = fiber.channel()
queue._runch = fiber.cond()
while true do
local t = box.space.queue.index.status:pairs({STATUS.TAKEN}):nth(1)
if not t then break end
box.space.queue:update({ t.id }, {{'=', F.status, STATUS.READY }})
log.info("Autoreleased %s at start", t.id)
end
for k,v in pairs(STATUS) do
queue._stats[v] = 0LL
end
for _,t in box.space.queue:pairs() do
queue._stats[ t[F.status] ] = (queue._stats[ t[F.status] ] or 0LL)+1
end
log.info("Perform initial stat counts %s", box.tuple.new{ queue._stats })
end
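
Stripped of the queue specifics, the pattern looks as follows. This is a plain-Lua sketch under stated assumptions: load_queue is a hypothetical stand-in for executing queue.lua, and only one piece of state is carried over.

```lua
-- load_queue() plays the role of executing queue.lua: the first call is
-- the initial start, every later call is a reload.
local function load_queue()
    local queue = {}
    local old = rawget(_G, 'queue')
    if old then
        queue.taken = old.taken        -- reload: adopt the live state
    else
        queue.taken = {}               -- first start: create it
    end
    -- Functions are simply defined again on every load
    function queue.count()
        local n = 0
        for _ in pairs(queue.taken) do n = n + 1 end
        return n
    end
    return queue
end

queue = load_queue()                   -- what `queue = require 'queue'` does
queue.taken['42'] = 1                  -- state accumulated at run time

local before = queue
queue = load_queue()                   -- simulated reload
print(queue ~= before, queue.taken == before.taken, queue.count())
-- prints true, true and 1
```

The module table and its functions are new after the reload, while the data they work on is the very same table as before.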

Trigger reloading needs care as well. If you leave things as they are, every reload installs one more trigger. However, the trigger functions accept the old function as a second argument and return the installed one. So we save the result of the installation in a variable and pass it back as that argument. At the first start the variable is empty and a new trigger is installed; at every subsequent load the existing trigger is replaced.

queue._triggers.on_replace = box.space.queue:on_replace(function(old,new)
if old then
queue._stats[ old[ F.status ] ] = queue._stats[ old[ F.status ] ] - 1
end
if new then
queue._stats[ new[ F.status ] ] = queue._stats[ new[ F.status ] ] + 1
end
end, queue._triggers.on_replace)

queue._triggers.on_truncate = box.space._truncate:on_replace(function(old,new)
if new.id == box.space.queue.id then
for k,v in pairs(queue._stats) do
queue._stats[k] = 0LL
end
end
end, queue._triggers.on_truncate)

queue._triggers.on_connect = box.session.on_connect(function()
box.session.storage.peer = box.session.peer()
log.info( "connected %s from %s", box.session.id(), box.session.storage.peer )
end, queue._triggers.on_connect)

queue._triggers.on_disconnect = box.session.on_disconnect(function()
log.info( "disconnected %s from %s", box.session.id(), box.session.storage.peer )
box.session.storage.destroyed = true

local sid = box.session.id()
local bysid = queue.bysid[ sid ]
if bysid then
while next(bysid) do
for key, id in pairs(bysid) do
log.info("Autorelease %s by disconnect", id);
queue.taken[key] = nil
bysid[key] = nil
local t = box.space.queue:get(id)
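if t then
    -- Return the task to the queue and wake up a waiting consumer
    if queue._wait:has_readers() then queue._wait:put(true,0) end
    box.space.queue:update({t.id},{{'=', F.status, STATUS.READY }})
end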
end
end
queue.bysid[ sid ] = nil
end
end, queue._triggers.on_disconnect)

Fibers are another essential part of reloading. A fiber is started in the background, and we don't control it in any way. It is written as a while true loop, so it never stops and does not reload on its own. To communicate with it we need a channel or, better, a condition variable: fiber.cond.

There are several approaches to reloading fibers. For example, the old ones can be killed with fiber.kill, but this is not very reliable, because kill may arrive at the wrong moment. This is why we usually use a generation attribute: a fiber keeps working only within the generation in which it was created. At code reload the generation changes, and the fiber finishes cleanly. Moreover, by checking the status of the fiber from the previous generation, we can prevent several fibers from working simultaneously.

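-- package.reload.count is the generation counter of the package.reload
-- module, which init.lua is expected to require
queue._runat = fiber.create(function(queue, gen, old_fiber)
    fiber.name('queue.runat.'..gen)

    -- Do not start working until the previous generation's fiber is gone
    while package.reload.count == gen and old_fiber and old_fiber:status() ~= 'dead' do
        log.info("Waiting for old to die")
        queue._runch:wait(0.1)
    end

    log.info("Started...")
    while package.reload.count == gen do
        local remaining

        local now = clock.realtime()

        for _,t in box.space.queue.index.runat
            :pairs( {0}, { iterator = 'GT' })
        do
            if t.runat > now then
                remaining = t.runat - now
                break
            else
                if t.status == STATUS.WAITING then
                    log.info("Runat: W->R %s",t.id)
                    if queue._wait:has_readers() then queue._wait:put(true,0) end
                    box.space.queue:update({ t.id }, {
                        { '=', F.status, STATUS.READY },
                        { '=', F.runat, 0 },
                    })
                else
                    log.error("Runat: bad status %s for %s", t.status, t.id)
                    box.space.queue:update({ t.id },{{ '=', F.runat, 0 }})
                end
            end
        end

        if not remaining or remaining > 1 then remaining = 1 end
        queue._runch:wait(remaining)
    end

    -- Let the next generation know that this fiber is done
    queue._runch:broadcast()
    log.info("Finished")
end, queue, package.reload.count, queue._runat)
-- Wake up the previous generation's fiber so that it notices the reload
queue._runch:broadcast()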
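
The generation idea itself does not depend on Tarantool. Below is a minimal plain-Lua sketch in which coroutines stand in for fibers and a local variable stands in for the reload counter; start_worker and the log format are assumptions made for the illustration.

```lua
local generation = 0     -- stands in for the reload counter

-- Start a worker bound to the current generation. The coroutine does one
-- step per resume and finishes as soon as the generation has moved on.
local function start_worker(log)
    local gen = generation
    return coroutine.create(function()
        while generation == gen do
            log[#log + 1] = 'tick ' .. gen
            coroutine.yield()          -- a fiber would sleep or wait here
        end
        log[#log + 1] = 'finished ' .. gen
    end)
end

local log = {}
local old = start_worker(log)
coroutine.resume(old)                  -- tick 0
coroutine.resume(old)                  -- tick 0

generation = generation + 1            -- "reload"
local new = start_worker(log)
coroutine.resume(old)                  -- the old worker notices and exits
coroutine.resume(new)                  -- tick 1

print(coroutine.status(old), coroutine.status(new))   -- prints dead and suspended
print(table.concat(log, ', '))
-- tick 0, tick 0, finished 0, tick 1
```

The worker is never interrupted in the middle of a step: it checks the generation only at the top of its loop, which is exactly the point where stopping is safe.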

Finally, at code reload you get an error saying that the console is already running. This is how to deal with it:

if not fiber.self().storage.console then
require'console'.start()
os.exit()
end

## Let’s summarize

We've written a working network queue with delayed processing, automatic task return by means of triggers, and statistics forwarding to Graphite over TCP, and we've explored quite a few issues along the way. On average modern hardware, such a queue easily handles 20+ thousand messages per second. It contains about 300 lines of code and can be written in a day, reading the documentation included.

Final files:

queue.lua
local clock = require 'clock'
local errno = require 'errno'
local fiber = require 'fiber'
local log = require 'log'
local msgpack = require 'msgpack'
local socket = require 'socket'

box.schema.create_space('queue',{ if_not_exists = true; })

box.space.queue:format( {
{ name = 'id';     type = 'number' },
{ name = 'status'; type = 'string' },
{ name = 'runat';  type = 'number' },
{ name = 'data';   type = '*'      },
} );

local F = {}
for no,def in pairs(box.space.queue:format()) do
F[no] = def.name
F[def.name] = no
end

box.space.queue:create_index('primary', {
parts = { F.id, 'number' };
if_not_exists = true;
})

box.space.queue:create_index('status', {
parts = { F.status, 'string', F.id, 'number' };
if_not_exists = true;
})

box.space.queue:create_index('runat', {
parts = { F.runat, 'number', F.id, 'number' };
if_not_exists = true;
})

local STATUS = {}
STATUS.READY = 'R'
STATUS.TAKEN = 'T'
STATUS.WAITING = 'W'

local queue = {}
local old = rawget(_G,'queue')
if old then
queue.taken = old.taken
queue.bysid = old.bysid
queue._triggers = old._triggers
queue._stats = old._stats
queue._wait = old._wait
queue._runch = old._runch
queue._runat = old._runat
else
queue.taken = {}
queue.bysid = {}
queue._triggers = {}
queue._stats = {}
queue._wait = fiber.channel()
queue._runch = fiber.cond()
while true do
local t = box.space.queue.index.status:pairs({STATUS.TAKEN}):nth(1)
if not t then break end
box.space.queue:update({ t.id }, {{'=', F.status, STATUS.READY }})
log.info("Autoreleased %s at start", t.id)
end

for k,v in pairs(STATUS) do queue._stats[v] = 0LL end
for _,t in box.space.queue:pairs() do
queue._stats[ t[F.status] ] = (queue._stats[ t[F.status] ] or 0LL)+1
end
log.info("Perform initial stat counts %s", box.tuple.new{ queue._stats })
end

local function gen_id()
local new_id
repeat
new_id = clock.realtime64()
until not box.space.queue:get(new_id)
return new_id
end

local function keypack( key )
return msgpack.encode( key )
end

local function keyunpack( data )
return msgpack.decode( data )
end

queue._triggers.on_replace = box.space.queue:on_replace(function(old,new)
if old then
queue._stats[ old[ F.status ] ] = queue._stats[ old[ F.status ] ] - 1
end
if new then
queue._stats[ new[ F.status ] ] = queue._stats[ new[ F.status ] ] + 1
end
end, queue._triggers.on_replace)

queue._triggers.on_truncate = box.space._truncate:on_replace(function(old,new)
if new.id == box.space.queue.id then
for k,v in pairs(queue._stats) do
queue._stats[k] = 0LL
end
end
end, queue._triggers.on_truncate)

queue._triggers.on_connect = box.session.on_connect(function()
box.session.storage.peer = box.session.peer()
end, queue._triggers.on_connect)

queue._triggers.on_disconnect = box.session.on_disconnect(function()
box.session.storage.destroyed = true
local sid = box.session.id()
local bysid = queue.bysid[ sid ]
if bysid then
log.info( "disconnected %s from %s", box.session.id(), box.session.storage.peer )
while next(bysid) do
for key, id in pairs(bysid) do
log.info("Autorelease %s by disconnect", id);
queue.taken[key] = nil
bysid[key] = nil
local t = box.space.queue:get(id)
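if t then
    -- Return the task to the queue and wake up a waiting consumer
    if queue._wait:has_readers() then queue._wait:put(true,0) end
    box.space.queue:update({t.id},{{'=', F.status, STATUS.READY }})
end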
end
end
queue.bysid[ sid ] = nil
end
end, queue._triggers.on_disconnect)

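-- package.reload.count is the generation counter of the package.reload
-- module, which init.lua is expected to require
queue._runat = fiber.create(function(queue, gen, old_fiber)
    fiber.name('queue.runat.'..gen)

    -- Do not start working until the previous generation's fiber is gone
    while package.reload.count == gen and old_fiber and old_fiber:status() ~= 'dead' do
        log.info("Waiting for old to die")
        queue._runch:wait(0.1)
    end

    log.info("Started...")
    while package.reload.count == gen do
        local remaining

        local now = clock.realtime()

        for _,t in box.space.queue.index.runat
            :pairs( {0}, { iterator = 'GT' })
        do
            if t.runat > now then
                remaining = t.runat - now
                break
            else
                if t.status == STATUS.WAITING then
                    log.info("Runat: W->R %s",t.id)
                    if queue._wait:has_readers() then queue._wait:put(true,0) end
                    box.space.queue:update({ t.id }, {
                        { '=', F.status, STATUS.READY },
                        { '=', F.runat, 0 },
                    })
                else
                    log.error("Runat: bad status %s for %s", t.status, t.id)
                    box.space.queue:update({ t.id },{{ '=', F.runat, 0 }})
                end
            end
        end

        if not remaining or remaining > 1 then remaining = 1 end
        queue._runch:wait(remaining)
    end

    -- Let the next generation know that this fiber is done
    queue._runch:broadcast()
    log.info("Finished")
end, queue, package.reload.count, queue._runat)
-- Wake up the previous generation's fiber so that it notices the reload
queue._runch:broadcast()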

local graphite_host = '127.0.0.1'
local graphite_port = 2003
queue._monitor = fiber.create(function(gen)
    fiber.name('queue.mon.'..gen)
    fiber.yield()
    -- Outer loop: (re)connect while this generation is current
    while package.reload.count == gen do
        local remote = socket.tcp_connect(graphite_host, graphite_port)
        if not remote then
            log.error("Failed to connect to graphite %s",errno.strerror())
            fiber.sleep(1)
        else
            -- Inner loop: send the counters once a second
            while package.reload.count == gen do
                local data = {}
                for k,v in pairs(queue.stats()) do
                    table.insert(data,string.format("queue.stats.%s %s %s\n",k,tonumber(v),math.floor(fiber.time())))
                end
                data = table.concat(data,'')
                if not remote:send(data) then
                    log.error("%s",errno.strerror())
                    break
                end
                fiber.sleep(1)
            end
            remote:close()
        end
    end
end, package.reload.count)

function queue.put(data, opts)
    local id = gen_id()

    local runat = 0
    local status = STATUS.READY

    if opts and opts.delay then
        runat = clock.realtime() + tonumber(opts.delay)
        status = STATUS.WAITING
    else
        if queue._wait:has_readers() then
            queue._wait:put(true,0)
        end
    end

    return box.space.queue
        :insert{ id, status, runat, data }
        :tomap{ names_only=true }
end

function queue.take(timeout)
    if not timeout then timeout = 0 end
    local now = fiber.time()
    local found
    while not found do
        found = box.space.queue.index.status
            :pairs({STATUS.READY}, { iterator = 'EQ' }):nth(1)
        if not found then
            local left = (now + timeout) - fiber.time()
            if left <= 0 then return end
            queue._wait:get(left)
        end
    end

    if box.session.storage.destroyed then return end

    local sid = box.session.id()
    log.info("Register %s by %s", found.id, sid)
    local key = keypack( found.id )
    queue.taken[ key ] = sid
    queue.bysid[ sid ] = queue.bysid[ sid ] or {}
    queue.bysid[ sid ][ key ] = found.id

    return box.space.queue
        :update( {found.id}, {{'=', F.status, STATUS.TAKEN }})
        :tomap{ names_only = true }
end

-- Checks that the task exists and is held by the calling session;
-- returns the tuple and its packed key
local function get_task( id )
    if not id then error("Task id required", 2) end
    id = tonumber64(id)
    local key = keypack(id)
    local t = box.space.queue:get{id}
    if not t then
        error(string.format( "Task {%s} was not found", id ), 2)
    end
    if not queue.taken[key] then
        error(string.format( "Task %s not taken by anybody", id ), 2)
    end
    if queue.taken[key] ~= box.session.id() then
        error(string.format( "Task %s taken by %d. Not you (%d)",
            id, queue.taken[key], box.session.id() ), 2)
    end
    return t, key
end

function queue.ack(id)
    local t, key = get_task(id)
    queue.taken[ key ] = nil
    queue.bysid[ box.session.id() ][ key ] = nil
    return box.space.queue:delete{t.id}:tomap{ names_only = true }
end

function queue.release(id, opts)
    local t, key = get_task(id)
    queue.taken[ key ] = nil
    queue.bysid[ box.session.id() ][ key ] = nil

    local runat = 0
    local status = STATUS.READY

    if opts and opts.delay then
        runat = clock.realtime() + tonumber(opts.delay)
        status = STATUS.WAITING
    else
        if queue._wait:has_readers() then queue._wait:put(true,0) end
    end

    return box.space.queue
        :update({t.id},{{'=', F.status, status },{ '=', F.runat, runat }})
        :tomap{ names_only = true }
end

function queue.stats()
return {
total   = box.space.queue:len(),
waiting = queue._stats[ STATUS.WAITING ],
taken   = queue._stats[ STATUS.TAKEN ],
}
end

return queue

init.lua
require'strict'.on()
fiber = require 'fiber'
require 'package.reload'

box.cfg{
listen = '127.0.0.1:3301'
}
box.schema.user.grant('guest', 'super', nil, nil, { if_not_exists = true })

queue = require 'queue'

if not fiber.self().storage.console then
require'console'.start()
os.exit()
end