
Всем привет! Продолжаем разбираться с io_uring. Сегодня попробуем использовать io_uring для решения прикладных задач. Сначала мигрируем приложения из предыдущего материала с C на GO, затем подготовим почву для написания полноценных TCP серверов. Ну и без сравнения производительности не обойдется. Обойдемся без длинного вступления, вперед экспериментировать!
Дисклеймер
Это вторая статья из серии, посвященной io_uring. Чтобы понимать, о чем речь — ознакомьтесь с первым материалом.
Сегодня оставим в стороне теорию io_uring, но cфокусируемся на практических аспектах применения этой технологии, так что кода будет много. Для практических экспериментов была выбрана работа с сокетами, а в качестве эталона — runtime GO. Таким образом, в этом материале, код будет на языке GO, да и применение io_uring рассмотрим в контексте GO.
Если данная тема будет неинтересна, то подходите к статье номер 3. Там снова будет обсуждение фич io_uring, настроек и режимов работы, тюнинга производительности.
Echo-сервер, тушёный с приправами

Прежде чем соперничать с runtime'ом GO, немного поговорим об инструменте, который будем использовать. Для работы с io_uring можно было воспользоваться связкой CGO + liburing, но, так как особых помех для работы с системными вызовами семейства io_uring в GO нет (ну ладно, они есть, но говорить сегодня о них не будем), почему бы не сделать полностью нативную библиотеку? Давайте перепишем echo-сервер из прошлой статьи с помощью библиотеки go-uring:
GO echo-server
package main import ( "errors" "flag" "fmt" "github.com/godzie44/go-uring/uring" "log" "strconv" "syscall" ) const MaxConns = 4096 const Backlog = 512 const MaxMsgLen = 2048 type connType int const ( _ connType = iota ACCEPT READ WRITE ) // Для каждого активного соединения будем держать в памяти connInfo структуру. // fd файловый дескриптор сокета. // typ - состояние в котором находится сокет (ожидает чтения/записи/accept'а). // sendOp, recvOp - закешированные операции чтения из сокета/записи в сокет. type connInfo struct { fd int typ connType sendOp *uring.SendOp recvOp *uring.RecvOp } // Для каждого соединения предалоцируем операции чтения и записи. // Переиспользование операция чтения и записи снизит нагрузку на GC. func makeConns() [MaxConns]connInfo { var conns [MaxConns]connInfo for fd := range conns { conns[fd].recvOp = uring.Recv(uintptr(fd), nil, 0) conns[fd].sendOp = uring.Send(uintptr(fd), nil, 0) } return conns } // Буфер для соединений. var conns = makeConns() // Для каждого соединения инициализируем буфера для записи/чтения. func makeBuffers() [][]byte { buffs := make([][]byte, MaxConns) for i := range buffs { buffs[i] = make([]byte, MaxMsgLen) } return buffs } var buffs = makeBuffers() func main() { flag.Parse() port, _ := strconv.Atoi(flag.Arg(0)) // Создаем серверный сокет и слушаем порт. // Отметим, что при создании сокета флаг O_NON_BLOCK не устанавливается, // при этом все операции read/write и тд преобразуются в неблокирующие системные вызовы внутри io_uring serverSockFd, err := syscall.Socket(syscall.AF_INET, syscall.SOCK_STREAM, 0) checkErr(err) defer syscall.Close(serverSockFd) checkErr(syscall.SetsockoptInt(serverSockFd, syscall.SOL_SOCKET, syscall.SO_REUSEADDR, 1)) checkErr(syscall.Bind(serverSockFd, &syscall.SockaddrInet4{ Port: port, })) checkErr(syscall.Listen(serverSockFd, Backlog)) fmt.Printf("io_uring echo server listening for connections on port: %d\n", port) // Создаем экземпляр io_uring, не используем никакие кастомные настройки. // Вместимость SQ/SQ буферов устанавливаем в 4096 элементов. ring, err := uring.New(4096) checkErr(err) defer ring.Close() // Проверяем наличие фичи IORING_FEAT_FAST_POLL. // Для нас это наиболее "перформящая" фича в данном приложении, // фактически это встроенный в io_uring движок для поллинга I/O. if !ring.Params.FastPollFeature() { checkErr(errors.New("IORING_FEAT_FAST_POLL not available")) } // Добавляем первую операцию в SQ - слушаем серверный сокет на предмет новых входящих соединений. acceptOp := uring.Accept(uintptr(serverSockFd), 0) addAccept(ring, acceptOp) cqes := make([]*uring.CQEvent, Backlog) var cqe *uring.CQEvent for { // Сабмитим все SQE которые были добавлены на предыдущей итерации.. _, err = ring.Submit() checkErr(err) // Ждем когда в CQ буфере появится хотя бы одно CQE. _, err = ring.WaitCQEvents(1) if errors.Is(err, syscall.EAGAIN) || errors.Is(err, syscall.EINTR) { continue } checkErr(err) // Помещаем все "готовые" CQE в буфер cqes. n := ring.PeekCQEventBatch(cqes) for i := 0; i < n; i++ { cqe = cqes[i] // В поле user_data находится индекс соответствующего connInfo // в которой находится служебная информация по сокету. ud := conns[cqe.UserData] typ := ud.typ res := cqe.Res ring.SeenCQE(cqe) // Проверяя тип мы идентифицируем операцию результат которой находится в CQE (accept / recv / send). switch typ { case ACCEPT: addRead(ring, int(res)) addAccept(ring, acceptOp) case READ: if res <= 0 { _ = syscall.Shutdown(ud.fd, syscall.SHUT_RDWR) } else { addWrite(ring, ud.fd, res) } case WRITE: addRead(ring, ud.fd) } } } } // addAccept - добавляет в SQ accept операцию, fd - файловый дескриптор сервер сокета. func addAccept(ring *uring.Ring, acceptOp *uring.AcceptOp) { conns[acceptOp.Fd()].fd = acceptOp.Fd() conns[acceptOp.Fd()].typ = ACCEPT err := ring.QueueSQE(acceptOp, 0, uint64(acceptOp.Fd())) checkErr(err) } // addRead - добавляет в SQ read операцию, fd - файловый дескриптор клиентского сокета. func addRead(ring *uring.Ring, fd int) { buff := buffs[fd] ci := &conns[fd] ci.fd = fd ci.typ = READ ci.recvOp.SetBuffer(buff) err := ring.QueueSQE(ci.recvOp, 0, uint64(fd)) checkErr(err) } // addWrite - добавляет в SQ write операцию, fd - файловый дескриптор клиентского сокета. func addWrite(ring *uring.Ring, fd int, bytesRead int32) { buff := buffs[fd] ci := &conns[fd] ci.fd = fd ci.typ = WRITE ci.sendOp.SetBuffer(buff[:bytesRead]) err := ring.QueueSQE(ci.sendOp, 0, uint64(fd)) checkErr(err) } func checkErr(err error) { if err != nil { log.Fatal(err) } }
Код на GO получился несколько компактнее чем этот же сервер на plain C. С другой стороны, оба исходника довольно схожи как семантически, так и синтаксически. А что по поводу производительности? С подробным описанием методики сравнения можно ознакомиться здесь. Рассмотрим исключительно результаты:
c: 100 bytes: 128 | c: 50 bytes: 1024 | c: 500 bytes: 128 | c: 500 bytes: 1024 | c: 1000 bytes: 128 | c: 1000 bytes: 1024 | |
io_uring-echo-server (C lang) | 235356 | 224783 | 173670 | 155477 | 149407 | 139987 |
echo-server | 227884 | 222709 | 169001 | 150275 | 143664 | 128783 |
Отлично! GO-код не сильно проигрывает коду на C в контексте I/O нагрузки. Но, есть проблема, реализация транспортного слоя прибита гвоздями к логике приложения, что не позволяет использовать ее где-либо кроме как в echo-сервере. Для того чтобы писать полноценные приложения с io_uring в GO, нам нужно реализовать интерфейсы из пакета net: net.Conn и net.Listener. А для этого нужно разобраться в реализации стандартных net.Conn и net.Listener для протокола TCP и написать свои реализации. Но обо всем по порядку.
В логове net package
Рассмотрим необходимые нам интерфейсы подробнее.
net.Listener
Задача listener'а — слушать определенный порт и при подключении к серверу очередного клиента создать структуру реализующую net.Conn, для работы с этим клиентом. Соответствующий интерфейс:
// A Listener is a generic network listener for stream-oriented protocols. // // Multiple goroutines may invoke methods on a Listener simultaneously. type Listener interface { // Accept waits for and returns the next connection to the listener. Accept() (Conn, error) // Close closes the listener. // Any blocked Accept operations will be unblocked and return errors. Close() error // Addr returns the listener's network address. Addr() Addr }
net.Conn
net.Conn - абстракция над клиентским соединением, позволяет выполнять чтение и запись в соответствующий клиентский сокет. Интерфейс (для удобства немного упрощен):
type Conn interface{ // Read reads data from the connection. Read(b []byte) (n int, err error) // Write writes data to the connection. Write(b []byte) (n int, err error) // Close closes the connection. Close() error }
Соответствующие реализации этих интерфейсов лежат в основе работы транспортного уровня в GO. Имея на руках реализации net.Listner и net.Conn для, например, TCP протокола можно реализовать любой вышестоящий протокол, будь то HTTP или любой другой. К примеру, вот так будет выглядеть echo-server с использованием стандартных net.TCPListener и net.TCPConn:
обычный echo-server
package main import ( "io" "log" "net" ) const MaxConns = 4096 const MaxMsgLen = 2048 func initBuffs() [][]byte { buffs := make([][]byte, MaxConns) for i := range buffs { buffs[i] = make([]byte, MaxMsgLen) } return buffs } var buffs = initBuffs() func main() { listener, err := net.ListenTCP("tcp", &net.TCPAddr{ Port: 8080, }) checkErr(err) for { conn, err := listener.Accept() checkErr(err) go handleConn(conn) } } func handleConn(conn net.Conn) { f, _ := conn.(*net.TCPConn).File() buff := buffs[int(f.Fd())] for { n, err := conn.Read(buff) if err == io.EOF || n == 0 { checkErr(conn.Close()) return } checkErr(err) _, err = conn.Write(buff[:n]) checkErr(err) } } func checkErr(err error) { if err != nil { log.Fatal(err) } }
Все довольно просто, в бесконечном цикле принимаем входящие соединения, а затем, для каждого соединения, создаем горутину в которой начинаем читать данные из клиентского сокета и писать туда же. Но представьте, что какой-то из клиентов решил не передавать нам данные, к примеру, в течение минуты. В таком случае поток, на котором находится горутина выполняющая чтение, заблокируется? Естественно нет! Любой GO разработчик знает — это не проблема для нашего сервера. Но каким образом, казалось бы, блокирующие вызовы conn.Read не блокируют поток исполнения? И тут в дело вступает runtime, вернее его компонент — netpoller.
netpoller - тропа через block I/O
Рассмотрим как горутины работают с I/O операциями над сокетами на примере чтения. В GO горутина (G) желающая прочитать данные из сокета, сначала попытается выполнить чтение в неблокирующем режиме. Если чтение в настоящий момент невозможно (к примеру, в сокете нет данных) — G будет заблокирована. При этом поток выполнения (M) не блокируется, а G инициирующая чтение уходит в состояние сна. Ее место на логическом процессоре (P) займет другая горутина. Наконец, когда сокет будет готов для I/O (в нашем случае появились данные для чтения) - планировщик разбудит G, и наша программа выполнит запланированный системный вызов.
Для реализации подобного механизма необходим компонент, который:
позволяет G добавить себя в очередь для оповещения о готовности I/O
будет использован планировщиком, чтобы понять, каким G требуется процессорное время в связи с появлением I/O events
Этот компонент runtime'а называется netpoller и в том числе, благодаря ему, сервера, написанные на GO, обладают такой скоростью.
А вот его упрощенный интерфейс, который нас интересует:
func poll_runtime_pollWait(pd *pollDesc, mode int) int func netpoll(delay int64) gList
poll_runtime_pollWait - предоставляет возможность для G подписаться на интересующее I/O событие в сокете. Структура pollDesc описывает файловый дескриптор сокета, а mode - тип события, которого следует ждать (read/write)
netpoll - эта функция используется планировщиком для получения списка G которым нужно дать процессорное время
Реализация netpoller зависит от конкретной OS. В Linux netpoller реализован с помощью семейства системных вызовов epoll. Таким образом, в GO используется паттерн подписки на I/O события:

Итак, с помощью netpoller'а можно делать системные вызовы, не боясь заблокировать приложение. В свою очередь, с помощью io_uring можно добиться тех же целей, но другими средствами. Концепция событий отходит в сторону, ведь с io_uring можно просто поставить необходимую операцию в очередь на исполнение и позднее узнать результат операции.

Тут нужно сделать ремарку: можно точно так же отлавливать I/O события и с помощью io_uring операции IORING_OP_POLL_ADD, правда, на производительности такой подход отражается скорее в негативную сторону.
Приручение reactor
Ну что же, теперь можно написать конкурента netpoller'у с использованием io_uring. Назовем его reactor. Суть идеи проста, реактор выполняет операции поддерживаемые io_uring и дергает callback в userspace по завершению той или иной операции. Таким образом, reactor будет полноценным асинхронным I/O бэкендом. Вот его наивная реализация:
reactor.go
type Callback func(event uring.CQEvent) type Reactor struct { ring *uring.Ring callbacks map[uint64]Callback callbacksLock sync.Mutex queueSQELock sync.Mutex currentNonce uint64 submitSignal chan struct{} } func New(ring *uring.Ring) *Reactor { return &Reactor{ ring: ring, submitSignal: make(chan struct{}), callbacks: map[uint64]Callback{}, } } func (r *Reactor) Run(ctx context.Context) { wg := &sync.WaitGroup{} //запускаем два компонента реактора //consumer - обрабатывает новые CQE //publisher - отправляет новые SQE на обработку wg.Add(2) go func() { defer wg.Done() r.runConsumer(ctx) }() go func() { defer wg.Done() r.runPublisher(ctx) }() wg.Wait() } // сабмитим новые SQE в отдельной горутине func (r *Reactor) runPublisher(ctx context.Context) { defer close(r.submitSignal) for { select { //при поступлении сигнала обрабатываем новые SQE case <-r.submitSignal: r.queueSQELock.Lock() _, err := r.ring.Submit() r.queueSQELock.Unlock() if err != nil { log.Println("io_uring submit", err) } case <-ctx.Done(): return } } } // получаем результаты выполнения команд и дергаем соответствующие колбеки func (r *Reactor) runConsumer(ctx context.Context) { cqeBuff := make([]*uring.CQEvent, 512) // цикл обработки результатов операций (CQE) for { // командуем Publisher'у обработать новые SQE r.submitSignal <- struct{}{} // ждем хотябы одну завершенную операцию _, err := r.ring.WaitCQEventsWithTimeout(1, time.Millisecond) if errors.Is(err, syscall.EAGAIN) || errors.Is(err, syscall.EINTR) || errors.Is(err, syscall.ETIME) { runtime.Gosched() goto CheckCtxAndContinue } if err != nil { log.Println("io_uring wait", err) goto CheckCtxAndContinue } for n := r.ring.PeekCQEventBatch(cqeBuff); n > 0; n = r.ring.PeekCQEventBatch(cqeBuff) { for i := 0; i < n; i++ { cqe := cqeBuff[i] nonce := cqe.UserData //находим соответствующий callback r.callbacksLock.Lock() cb := r.callbacks[nonce] delete(r.callbacks, nonce) r.callbacksLock.Unlock() cb(uring.CQEvent{ UserData: cqe.UserData, Res: cqe.Res, Flags: cqe.Flags, }) } //сообщаем io_uring о N просмотренных CQE r.ring.AdvanceCQ(uint32(n)) } CheckCtxAndContinue: select { case <-ctx.Done(): return default: continue } } } //Queue добавляет операцию в SQ. func (r *Reactor) Queue(op uring.Operation, cb Callback) (uint64, error) { //генерируем уникальное значение UserData nonce := r.nextNonce() r.queueSQELock.Lock() defer r.queueSQELock.Unlock() //помещаем операцию в SQ err := r.ring.QueueSQE(op, 0, nonce) if err == nil { r.callbacksLock.Lock() r.callbacks[nonce] = cb r.callbacksLock.Unlock() } return nonce, err } func (r *Reactor) nextNonce() uint64 { return atomic.AddUint64(&r.currentNonce, 1) }
C помощью подобного компонента можно добавить операцию в SQ очередь io_uring и реактивно обработать результат этой операции. Внутри параллельно работают consumer для добавления операций в SQ и publisher для обработки результатов из CQ.
Теперь, при помощи reactor'а, можно реализовать заветные net.Listener и net.Conn интерфейсы.
net.Listener. Реализация с использованием reactor (очевидные участки кода выброшены, ссылка на полную реализацию будет ниже)
type Listener struct { fd int reactor *Reactor acceptChan chan uring.CQEvent addr net.Addr } func NewListener(addr string, reactor *Reactor) (*Listener, error) { tcpAddr, err := net.ResolveTCPAddr("tcp", addr) if err != nil { return nil, err } // serverSocket - создает и биндит серверный сокет sockFd, err := serverSocket(tcpAddr) if err != nil { return nil, err } return &Listener{ fd: sockFd, addr: tcpAddr, reactor: reactor, acceptChan: make(chan uring.CQEvent), }, nil } func (l *Listener) Accept() (net.Conn, error) { //помещаем accept операцию в реактор op := uring.Accept(uintptr(l.fd), 0) l.reactor.Queue(op, func(event uring.CQEvent) { l.acceptChan <- event }) // ждем появление подключения cqe := <-l.acceptChan if err := cqe.Error(); err != nil { return nil, err } fd := int(cqe.Res) rAddr, _ := op.Addr() tc := newConn(fd, l.addr, rAddr, l.reactor) return tc, nil }
net.Conn. Реализация с использованием reactor (очевидные участки кода выброшены, ссылка на полную реализацию будет ниже)
type Conn struct { Fd int lAddr, rAddr net.Addr reactor *Reactor readChan, writeChan chan uring.CQEvent readLock, writeLock sync.Mutex } func newConn(fd int, lAddr, rAddr net.Addr, r *Reactor) *Conn { return &Conn{ lAddr: lAddr, rAddr: rAddr, Fd: fd, reactor: r, readChan: make(chan uring.CQEvent), writeChan: make(chan uring.CQEvent), } } func (c *Conn) Read(b []byte) (n int, err error) { c.readLock.Lock() defer c.readLock.Unlock() //помещаем Recv операцию в реактор op := uring.Recv(uintptr(c.Fd), b, 0) c.reactor.Queue(op, func(event uring.CQEvent) { c.readChan <- event }) //ждем завершения чтения из сокета cqe := <-c.readChan if err = cqe.Error(); err != nil { return 0, &net.OpError{Op: "read", Net: "tcp", Source: c.lAddr, Addr: c.rAddr, Err: err} } if cqe.Res == 0 { err = &net.OpError{Op: "read", Net: "tcp", Source: c.lAddr, Addr: c.rAddr, Err: io.EOF} } runtime.KeepAlive(b) return int(cqe.Res), err } func (c *Conn) Write(b []byte) (n int, err error) { c.writeLock.Lock() defer c.writeLock.Unlock() //помещаем Send операцию в реактор op := uring.Send(uintptr(c.Fd), b, 0) c.reactor.Queue(op, func(event uring.CQEvent) { c.writeChan <- event }) //ждем завершения записи в сокет cqe := <-c.writeChan if err = cqe.Error(); err != nil { return 0, &net.OpError{Op: "write", Net: "tcp", Source: c.lAddr, Addr: c.rAddr, Err: err} } if cqe.Res == 0 { err = &net.OpError{Op: "write", Net: "tcp", Source: c.lAddr, Addr: c.rAddr, Err: io.ErrUnexpectedEOF} } runtime.KeepAlive(b) return int(cqe.Res), err } func (c *Conn) Close() error { return syscall.Close(c.Fd) }
Проверим на практике новые реализации, давно мы не писали echo-server! Вот его полный код (вместе с reactor'ом, net.Listener и net.Conn).
Сравним новый сервер с io_uring reactor'ом под капотом, с таким же сервером, который использует net.TCPListener и net.TCPConn из стандартной бибилиотеки (а значит, под капотом там netpoller). Как обычно смотрим на rps.
c: 100 bytes: 128 | c: 50 bytes: 1024 | c: 500 bytes: 128 | c: 500 bytes: 1024 | c: 1000 bytes: 128 | c: 1000 bytes: 1024 | |
net/http | 132664 | 139206 | 133039 | 139171 | 133480 | 139617 |
reactor (io_uring) | 30892 | 30077 | 39192 | 38375 | 46120 | 51204 |
Да уж, сравнение не в нашу пользу. Текущая реализация не использует на полную ресурсы железа и возможности io_uring — расплата за это низкая производительность. Тем не менее в плане функциональности даже такой, наивной, реализации reactor достаточно чтобы заменить собой netpoller. Кроме того, reactor можно использовать не только в контексте socket I/O, но и для всех прочих I/O.

О завершающей части
Конечно, такая низкая производительность не может нас устроить. Поэтому в третьей и заключительной части разберемся, почему решение с "наивным" reactor работает так медленно, рассмотрим способы ускорения (как с помощью настройки io_uring, так и с помощью оптимизаций кода reactor), а также поставим точку (хотя скорее многоточие) в битве с netpoller. Спасибо за внимание, stay tuned.
Дата-центр ITSOFT — размещение и аренда серверов и стоек в двух дата-центрах в Москве. За последние годы UPTIME 100%. Размещение GPU-ферм и ASIC-майнеров, аренда GPU-серверов, лицензии связи, SSL-сертификаты, администрирование серверов и поддержка сайтов.
