Всем привет! Продолжаем разбираться с io_uring. Сегодня попробуем использовать io_uring для решения прикладных задач. Сначала мигрируем приложения из предыдущего материала с C на GO, затем подготовим почву для написания полноценных TCP серверов. Ну и без сравнения производительности не обойдется. Обойдемся без длинного вступления, вперед экспериментировать!
Дисклеймер
Это вторая статья из серии, посвященной io_uring. Чтобы понимать, о чем речь — ознакомьтесь с первым материалом.
Сегодня оставим в стороне теорию io_uring, но cфокусируемся на практических аспектах применения этой технологии, так что кода будет много. Для практических экспериментов была выбрана работа с сокетами, а в качестве эталона — runtime GO. Таким образом, в этом материале, код будет на языке GO, да и применение io_uring рассмотрим в контексте GO.
Если данная тема будет неинтересна, то подходите к статье номер 3. Там снова будет обсуждение фич io_uring, настроек и режимов работы, тюнинга производительности.
Echo-сервер, тушёный с приправами
Прежде чем соперничать с runtime'ом GO, немного поговорим об инструменте, который будем использовать. Для работы с io_uring можно было воспользоваться связкой CGO + liburing, но, так как особых помех для работы с системными вызовами семейства io_uring в GO нет (ну ладно, они есть, но говорить сегодня о них не будем), почему бы не сделать полностью нативную библиотеку? Давайте перепишем echo-сервер из прошлой статьи с помощью библиотеки go-uring:
GO echo-server
package main
import (
"errors"
"flag"
"fmt"
"github.com/godzie44/go-uring/uring"
"log"
"strconv"
"syscall"
)
const MaxConns = 4096
const Backlog = 512
const MaxMsgLen = 2048
type connType int
const (
_ connType = iota
ACCEPT
READ
WRITE
)
// Для каждого активного соединения будем держать в памяти connInfo структуру.
// fd файловый дескриптор сокета.
// typ - состояние в котором находится сокет (ожидает чтения/записи/accept'а).
// sendOp, recvOp - закешированные операции чтения из сокета/записи в сокет.
type connInfo struct {
fd int
typ connType
sendOp *uring.SendOp
recvOp *uring.RecvOp
}
// Для каждого соединения предалоцируем операции чтения и записи.
// Переиспользование операция чтения и записи снизит нагрузку на GC.
func makeConns() [MaxConns]connInfo {
var conns [MaxConns]connInfo
for fd := range conns {
conns[fd].recvOp = uring.Recv(uintptr(fd), nil, 0)
conns[fd].sendOp = uring.Send(uintptr(fd), nil, 0)
}
return conns
}
// Буфер для соединений.
var conns = makeConns()
// Для каждого соединения инициализируем буфера для записи/чтения.
func makeBuffers() [][]byte {
buffs := make([][]byte, MaxConns)
for i := range buffs {
buffs[i] = make([]byte, MaxMsgLen)
}
return buffs
}
var buffs = makeBuffers()
func main() {
flag.Parse()
port, _ := strconv.Atoi(flag.Arg(0))
// Создаем серверный сокет и слушаем порт.
// Отметим, что при создании сокета флаг O_NON_BLOCK не устанавливается,
// при этом все операции read/write и тд преобразуются в неблокирующие системные вызовы внутри io_uring
serverSockFd, err := syscall.Socket(syscall.AF_INET, syscall.SOCK_STREAM, 0)
checkErr(err)
defer syscall.Close(serverSockFd)
checkErr(syscall.SetsockoptInt(serverSockFd, syscall.SOL_SOCKET, syscall.SO_REUSEADDR, 1))
checkErr(syscall.Bind(serverSockFd, &syscall.SockaddrInet4{
Port: port,
}))
checkErr(syscall.Listen(serverSockFd, Backlog))
fmt.Printf("io_uring echo server listening for connections on port: %d\n", port)
// Создаем экземпляр io_uring, не используем никакие кастомные настройки.
// Вместимость SQ/SQ буферов устанавливаем в 4096 элементов.
ring, err := uring.New(4096)
checkErr(err)
defer ring.Close()
// Проверяем наличие фичи IORING_FEAT_FAST_POLL.
// Для нас это наиболее "перформящая" фича в данном приложении,
// фактически это встроенный в io_uring движок для поллинга I/O.
if !ring.Params.FastPollFeature() {
checkErr(errors.New("IORING_FEAT_FAST_POLL not available"))
}
// Добавляем первую операцию в SQ - слушаем серверный сокет на предмет новых входящих соединений.
acceptOp := uring.Accept(uintptr(serverSockFd), 0)
addAccept(ring, acceptOp)
cqes := make([]*uring.CQEvent, Backlog)
var cqe *uring.CQEvent
for {
// Сабмитим все SQE которые были добавлены на предыдущей итерации..
_, err = ring.Submit()
checkErr(err)
// Ждем когда в CQ буфере появится хотя бы одно CQE.
_, err = ring.WaitCQEvents(1)
if errors.Is(err, syscall.EAGAIN) || errors.Is(err, syscall.EINTR) {
continue
}
checkErr(err)
// Помещаем все "готовые" CQE в буфер cqes.
n := ring.PeekCQEventBatch(cqes)
for i := 0; i < n; i++ {
cqe = cqes[i]
// В поле user_data находится индекс соответствующего connInfo
// в которой находится служебная информация по сокету.
ud := conns[cqe.UserData]
typ := ud.typ
res := cqe.Res
ring.SeenCQE(cqe)
// Проверяя тип мы идентифицируем операцию результат которой находится в CQE (accept / recv / send).
switch typ {
case ACCEPT:
addRead(ring, int(res))
addAccept(ring, acceptOp)
case READ:
if res <= 0 {
_ = syscall.Shutdown(ud.fd, syscall.SHUT_RDWR)
} else {
addWrite(ring, ud.fd, res)
}
case WRITE:
addRead(ring, ud.fd)
}
}
}
}
// addAccept - добавляет в SQ accept операцию, fd - файловый дескриптор сервер сокета.
func addAccept(ring *uring.Ring, acceptOp *uring.AcceptOp) {
conns[acceptOp.Fd()].fd = acceptOp.Fd()
conns[acceptOp.Fd()].typ = ACCEPT
err := ring.QueueSQE(acceptOp, 0, uint64(acceptOp.Fd()))
checkErr(err)
}
// addRead - добавляет в SQ read операцию, fd - файловый дескриптор клиентского сокета.
func addRead(ring *uring.Ring, fd int) {
buff := buffs[fd]
ci := &conns[fd]
ci.fd = fd
ci.typ = READ
ci.recvOp.SetBuffer(buff)
err := ring.QueueSQE(ci.recvOp, 0, uint64(fd))
checkErr(err)
}
// addWrite - добавляет в SQ write операцию, fd - файловый дескриптор клиентского сокета.
func addWrite(ring *uring.Ring, fd int, bytesRead int32) {
buff := buffs[fd]
ci := &conns[fd]
ci.fd = fd
ci.typ = WRITE
ci.sendOp.SetBuffer(buff[:bytesRead])
err := ring.QueueSQE(ci.sendOp, 0, uint64(fd))
checkErr(err)
}
func checkErr(err error) {
if err != nil {
log.Fatal(err)
}
}
Код на GO получился несколько компактнее чем этот же сервер на plain C. С другой стороны, оба исходника довольно схожи как семантически, так и синтаксически. А что по поводу производительности? С подробным описанием методики сравнения можно ознакомиться здесь. Рассмотрим исключительно результаты:
c: 100 bytes: 128 | c: 50 bytes: 1024 | c: 500 bytes: 128 | c: 500 bytes: 1024 | c: 1000 bytes: 128 | c: 1000 bytes: 1024 | |
io_uring-echo-server (C lang) | 235356 | 224783 | 173670 | 155477 | 149407 | 139987 |
echo-server | 227884 | 222709 | 169001 | 150275 | 143664 | 128783 |
Отлично! GO-код не сильно проигрывает коду на C в контексте I/O нагрузки. Но, есть проблема, реализация транспортного слоя прибита гвоздями к логике приложения, что не позволяет использовать ее где-либо кроме как в echo-сервере. Для того чтобы писать полноценные приложения с io_uring в GO, нам нужно реализовать интерфейсы из пакета net: net.Conn и net.Listener. А для этого нужно разобраться в реализации стандартных net.Conn и net.Listener для протокола TCP и написать свои реализации. Но обо всем по порядку.
В логове net package
Рассмотрим необходимые нам интерфейсы подробнее.
net.Listener
Задача listener'а — слушать определенный порт и при подключении к серверу очередного клиента создать структуру реализующую net.Conn, для работы с этим клиентом. Соответствующий интерфейс:
// A Listener is a generic network listener for stream-oriented protocols.
//
// Multiple goroutines may invoke methods on a Listener simultaneously.
type Listener interface {
// Accept waits for and returns the next connection to the listener.
Accept() (Conn, error)
// Close closes the listener.
// Any blocked Accept operations will be unblocked and return errors.
Close() error
// Addr returns the listener's network address.
Addr() Addr
}
net.Conn
net.Conn - абстракция над клиентским соединением, позволяет выполнять чтение и запись в соответствующий клиентский сокет. Интерфейс (для удобства немного упрощен):
type Conn interface{
// Read reads data from the connection.
Read(b []byte) (n int, err error)
// Write writes data to the connection.
Write(b []byte) (n int, err error)
// Close closes the connection.
Close() error
}
Соответствующие реализации этих интерфейсов лежат в основе работы транспортного уровня в GO. Имея на руках реализации net.Listner и net.Conn для, например, TCP протокола можно реализовать любой вышестоящий протокол, будь то HTTP или любой другой. К примеру, вот так будет выглядеть echo-server с использованием стандартных net.TCPListener и net.TCPConn:
обычный echo-server
package main
import (
"io"
"log"
"net"
)
const MaxConns = 4096
const MaxMsgLen = 2048
func initBuffs() [][]byte {
buffs := make([][]byte, MaxConns)
for i := range buffs {
buffs[i] = make([]byte, MaxMsgLen)
}
return buffs
}
var buffs = initBuffs()
func main() {
listener, err := net.ListenTCP("tcp", &net.TCPAddr{
Port: 8080,
})
checkErr(err)
for {
conn, err := listener.Accept()
checkErr(err)
go handleConn(conn)
}
}
func handleConn(conn net.Conn) {
f, _ := conn.(*net.TCPConn).File()
buff := buffs[int(f.Fd())]
for {
n, err := conn.Read(buff)
if err == io.EOF || n == 0 {
checkErr(conn.Close())
return
}
checkErr(err)
_, err = conn.Write(buff[:n])
checkErr(err)
}
}
func checkErr(err error) {
if err != nil {
log.Fatal(err)
}
}
Все довольно просто, в бесконечном цикле принимаем входящие соединения, а затем, для каждого соединения, создаем горутину в которой начинаем читать данные из клиентского сокета и писать туда же. Но представьте, что какой-то из клиентов решил не передавать нам данные, к примеру, в течение минуты. В таком случае поток, на котором находится горутина выполняющая чтение, заблокируется? Естественно нет! Любой GO разработчик знает — это не проблема для нашего сервера. Но каким образом, казалось бы, блокирующие вызовы conn.Read не блокируют поток исполнения? И тут в дело вступает runtime, вернее его компонент — netpoller.
netpoller - тропа через block I/O
Рассмотрим как горутины работают с I/O операциями над сокетами на примере чтения. В GO горутина (G) желающая прочитать данные из сокета, сначала попытается выполнить чтение в неблокирующем режиме. Если чтение в настоящий момент невозможно (к примеру, в сокете нет данных) — G будет заблокирована. При этом поток выполнения (M) не блокируется, а G инициирующая чтение уходит в состояние сна. Ее место на логическом процессоре (P) займет другая горутина. Наконец, когда сокет будет готов для I/O (в нашем случае появились данные для чтения) - планировщик разбудит G, и наша программа выполнит запланированный системный вызов.
Для реализации подобного механизма необходим компонент, который:
позволяет G добавить себя в очередь для оповещения о готовности I/O
будет использован планировщиком, чтобы понять, каким G требуется процессорное время в связи с появлением I/O events
Этот компонент runtime'а называется netpoller и в том числе, благодаря ему, сервера, написанные на GO, обладают такой скоростью.
А вот его упрощенный интерфейс, который нас интересует:
func poll_runtime_pollWait(pd *pollDesc, mode int) int
func netpoll(delay int64) gList
poll_runtime_pollWait - предоставляет возможность для G подписаться на интересующее I/O событие в сокете. Структура pollDesc описывает файловый дескриптор сокета, а mode - тип события, которого следует ждать (read/write)
netpoll - эта функция используется планировщиком для получения списка G которым нужно дать процессорное время
Реализация netpoller зависит от конкретной OS. В Linux netpoller реализован с помощью семейства системных вызовов epoll. Таким образом, в GO используется паттерн подписки на I/O события:
Итак, с помощью netpoller'а можно делать системные вызовы, не боясь заблокировать приложение. В свою очередь, с помощью io_uring можно добиться тех же целей, но другими средствами. Концепция событий отходит в сторону, ведь с io_uring можно просто поставить необходимую операцию в очередь на исполнение и позднее узнать результат операции.
Тут нужно сделать ремарку: можно точно так же отлавливать I/O события и с помощью io_uring операции IORING_OP_POLL_ADD, правда, на производительности такой подход отражается скорее в негативную сторону.
Приручение reactor
Ну что же, теперь можно написать конкурента netpoller'у с использованием io_uring. Назовем его reactor. Суть идеи проста, реактор выполняет операции поддерживаемые io_uring и дергает callback в userspace по завершению той или иной операции. Таким образом, reactor будет полноценным асинхронным I/O бэкендом. Вот его наивная реализация:
reactor.go
type Callback func(event uring.CQEvent)
type Reactor struct {
ring *uring.Ring
callbacks map[uint64]Callback
callbacksLock sync.Mutex
queueSQELock sync.Mutex
currentNonce uint64
submitSignal chan struct{}
}
func New(ring *uring.Ring) *Reactor {
return &Reactor{
ring: ring,
submitSignal: make(chan struct{}),
callbacks: map[uint64]Callback{},
}
}
func (r *Reactor) Run(ctx context.Context) {
wg := &sync.WaitGroup{}
//запускаем два компонента реактора
//consumer - обрабатывает новые CQE
//publisher - отправляет новые SQE на обработку
wg.Add(2)
go func() {
defer wg.Done()
r.runConsumer(ctx)
}()
go func() {
defer wg.Done()
r.runPublisher(ctx)
}()
wg.Wait()
}
// сабмитим новые SQE в отдельной горутине
func (r *Reactor) runPublisher(ctx context.Context) {
defer close(r.submitSignal)
for {
select {
//при поступлении сигнала обрабатываем новые SQE
case <-r.submitSignal:
r.queueSQELock.Lock()
_, err := r.ring.Submit()
r.queueSQELock.Unlock()
if err != nil {
log.Println("io_uring submit", err)
}
case <-ctx.Done():
return
}
}
}
// получаем результаты выполнения команд и дергаем соответствующие колбеки
func (r *Reactor) runConsumer(ctx context.Context) {
cqeBuff := make([]*uring.CQEvent, 512)
// цикл обработки результатов операций (CQE)
for {
// командуем Publisher'у обработать новые SQE
r.submitSignal <- struct{}{}
// ждем хотябы одну завершенную операцию
_, err := r.ring.WaitCQEventsWithTimeout(1, time.Millisecond)
if errors.Is(err, syscall.EAGAIN) || errors.Is(err, syscall.EINTR) || errors.Is(err, syscall.ETIME) {
runtime.Gosched()
goto CheckCtxAndContinue
}
if err != nil {
log.Println("io_uring wait", err)
goto CheckCtxAndContinue
}
for n := r.ring.PeekCQEventBatch(cqeBuff); n > 0; n = r.ring.PeekCQEventBatch(cqeBuff) {
for i := 0; i < n; i++ {
cqe := cqeBuff[i]
nonce := cqe.UserData
//находим соответствующий callback
r.callbacksLock.Lock()
cb := r.callbacks[nonce]
delete(r.callbacks, nonce)
r.callbacksLock.Unlock()
cb(uring.CQEvent{
UserData: cqe.UserData,
Res: cqe.Res,
Flags: cqe.Flags,
})
}
//сообщаем io_uring о N просмотренных CQE
r.ring.AdvanceCQ(uint32(n))
}
CheckCtxAndContinue:
select {
case <-ctx.Done():
return
default:
continue
}
}
}
//Queue добавляет операцию в SQ.
func (r *Reactor) Queue(op uring.Operation, cb Callback) (uint64, error) {
//генерируем уникальное значение UserData
nonce := r.nextNonce()
r.queueSQELock.Lock()
defer r.queueSQELock.Unlock()
//помещаем операцию в SQ
err := r.ring.QueueSQE(op, 0, nonce)
if err == nil {
r.callbacksLock.Lock()
r.callbacks[nonce] = cb
r.callbacksLock.Unlock()
}
return nonce, err
}
func (r *Reactor) nextNonce() uint64 {
return atomic.AddUint64(&r.currentNonce, 1)
}
C помощью подобного компонента можно добавить операцию в SQ очередь io_uring и реактивно обработать результат этой операции. Внутри параллельно работают consumer для добавления операций в SQ и publisher для обработки результатов из CQ.
Теперь, при помощи reactor'а, можно реализовать заветные net.Listener и net.Conn интерфейсы.
net.Listener. Реализация с использованием reactor (очевидные участки кода выброшены, ссылка на полную реализацию будет ниже)
type Listener struct {
fd int
reactor *Reactor
acceptChan chan uring.CQEvent
addr net.Addr
}
func NewListener(addr string, reactor *Reactor) (*Listener, error) {
tcpAddr, err := net.ResolveTCPAddr("tcp", addr)
if err != nil {
return nil, err
}
// serverSocket - создает и биндит серверный сокет
sockFd, err := serverSocket(tcpAddr)
if err != nil {
return nil, err
}
return &Listener{
fd: sockFd,
addr: tcpAddr,
reactor: reactor,
acceptChan: make(chan uring.CQEvent),
}, nil
}
func (l *Listener) Accept() (net.Conn, error) {
//помещаем accept операцию в реактор
op := uring.Accept(uintptr(l.fd), 0)
l.reactor.Queue(op, func(event uring.CQEvent) {
l.acceptChan <- event
})
// ждем появление подключения
cqe := <-l.acceptChan
if err := cqe.Error(); err != nil {
return nil, err
}
fd := int(cqe.Res)
rAddr, _ := op.Addr()
tc := newConn(fd, l.addr, rAddr, l.reactor)
return tc, nil
}
net.Conn. Реализация с использованием reactor (очевидные участки кода выброшены, ссылка на полную реализацию будет ниже)
type Conn struct {
Fd int
lAddr, rAddr net.Addr
reactor *Reactor
readChan, writeChan chan uring.CQEvent
readLock, writeLock sync.Mutex
}
func newConn(fd int, lAddr, rAddr net.Addr, r *Reactor) *Conn {
return &Conn{
lAddr: lAddr,
rAddr: rAddr,
Fd: fd,
reactor: r,
readChan: make(chan uring.CQEvent),
writeChan: make(chan uring.CQEvent),
}
}
func (c *Conn) Read(b []byte) (n int, err error) {
c.readLock.Lock()
defer c.readLock.Unlock()
//помещаем Recv операцию в реактор
op := uring.Recv(uintptr(c.Fd), b, 0)
c.reactor.Queue(op, func(event uring.CQEvent) {
c.readChan <- event
})
//ждем завершения чтения из сокета
cqe := <-c.readChan
if err = cqe.Error(); err != nil {
return 0, &net.OpError{Op: "read", Net: "tcp", Source: c.lAddr, Addr: c.rAddr, Err: err}
}
if cqe.Res == 0 {
err = &net.OpError{Op: "read", Net: "tcp", Source: c.lAddr, Addr: c.rAddr, Err: io.EOF}
}
runtime.KeepAlive(b)
return int(cqe.Res), err
}
func (c *Conn) Write(b []byte) (n int, err error) {
c.writeLock.Lock()
defer c.writeLock.Unlock()
//помещаем Send операцию в реактор
op := uring.Send(uintptr(c.Fd), b, 0)
c.reactor.Queue(op, func(event uring.CQEvent) {
c.writeChan <- event
})
//ждем завершения записи в сокет
cqe := <-c.writeChan
if err = cqe.Error(); err != nil {
return 0, &net.OpError{Op: "write", Net: "tcp", Source: c.lAddr, Addr: c.rAddr, Err: err}
}
if cqe.Res == 0 {
err = &net.OpError{Op: "write", Net: "tcp", Source: c.lAddr, Addr: c.rAddr, Err: io.ErrUnexpectedEOF}
}
runtime.KeepAlive(b)
return int(cqe.Res), err
}
func (c *Conn) Close() error {
return syscall.Close(c.Fd)
}
Проверим на практике новые реализации, давно мы не писали echo-server! Вот его полный код (вместе с reactor'ом, net.Listener и net.Conn).
Сравним новый сервер с io_uring reactor'ом под капотом, с таким же сервером, который использует net.TCPListener и net.TCPConn из стандартной бибилиотеки (а значит, под капотом там netpoller). Как обычно смотрим на rps.
c: 100 bytes: 128 | c: 50 bytes: 1024 | c: 500 bytes: 128 | c: 500 bytes: 1024 | c: 1000 bytes: 128 | c: 1000 bytes: 1024 | |
net/http | 132664 | 139206 | 133039 | 139171 | 133480 | 139617 |
reactor (io_uring) | 30892 | 30077 | 39192 | 38375 | 46120 | 51204 |
Да уж, сравнение не в нашу пользу. Текущая реализация не использует на полную ресурсы железа и возможности io_uring — расплата за это низкая производительность. Тем не менее в плане функциональности даже такой, наивной, реализации reactor достаточно чтобы заменить собой netpoller. Кроме того, reactor можно использовать не только в контексте socket I/O, но и для всех прочих I/O.
О завершающей части
Конечно, такая низкая производительность не может нас устроить. Поэтому в третьей и заключительной части разберемся, почему решение с "наивным" reactor работает так медленно, рассмотрим способы ускорения (как с помощью настройки io_uring, так и с помощью оптимизаций кода reactor), а также поставим точку (хотя скорее многоточие) в битве с netpoller. Спасибо за внимание, stay tuned.
Дата-центр ITSOFT — размещение и аренда серверов и стоек в двух дата-центрах в Москве. За последние годы UPTIME 100%. Размещение GPU-ферм и ASIC-майнеров, аренда GPU-серверов, лицензии связи, SSL-сертификаты, администрирование серверов и поддержка сайтов.