Привет всем! Меня зовут Сергей Камардин, я программист команды Почты Mail.Ru.
Это статья о том, как мы разработали высоконагруженный WebSocket-сервер на Go.
Если тема WebSocket вам близка, но Go — не совсем, надеюсь, статья все равно покажется вам интересной с точки зрения идей и приемов оптимизации.
1. Предисловие
Чтобы обозначить контекст рассказа, стоит сказать пару слов о том, для чего нам понадобился такой сервер.
В Почте Mail.Ru есть множество систем, состояние которых меняется. Очевидно, что такой системой является и хранилище писем пользователей. Об изменении состояния — о событиях — можно узнавать несколькими способами. В основном это либо периодический опрос системы (polling), либо — в обратном направлении — уведомления со стороны системы об изменении ее состояния.
У обоих способов есть свои плюсы и минусы, однако если говорить о почте, то чем быстрее пользователь получит новое письмо — тем лучше. Polling в почте — это около 50 тысяч HTTP-запросов в секунду, 60% которых возвращают статус 304, что означает отсутствие изменений в ящике.
Поэтому, чтобы сократить нагрузку на серверы и ускорить доставку писем пользователям, было решено изобрести велосипед написать publisher-subscriber сервер (он же bus, message-broker или event-channel), который, с одной стороны, получает сообщения об изменении состояний, а с другой — подписки на такие сообщения.
Было:
+-----------+ +-----------+ +-----------+
| | ◄-------+ | | ◄-------+ | |
| Storage | | API | HTTP | Browser |
| | +-------► | | +-------► | |
+-----------+ +-----------+ +-----------+
Стало:
+-------------+ +---------+ WebSocket +-----------+
| Storage | | API * | +-----------► | Browser |
+-------------+ +---------+ (3) +-----------+
+ (2) ▲
| |
(1) ▼ +
+---------------------------------+
| Bus |
+---------------------------------+
На первой схеме отображено то, как было раньше. Браузер периодически ходил в API и спрашивал об изменениях на Storage (хранилище писем).
На второй — новый вариант архитектуры. Браузер устанавливает WebSocket-соединение с API, по которому происходит уведомление о событиях Storage. API является клиентом к серверу Bus и отправляет ему данные своих подписчиков (об этом сервере речи сегодня идти не будет; возможно, расскажу о нем в следующих публикациях). В момент получения нового письма Storage посылает об этом уведомление в Bus (1), Bus — своим подписчикам (2). API определяет, какому соединению отправить полученное уведомление, и посылает его в браузер пользователю (3).
Как вы могли догадаться, речь сегодня пойдет об API, или WebSocket-сервере. Забегая вперед, скажу, что на сервере будет около 3 миллионов живых соединений. Эта цифра еще не раз всплывет в последующем рассказе об оптимизациях.
2. Idiomatic way
Посмотрим на то, как бы мы реализовали некоторые части нашего сервера, используя возможности Go, без использования системных вызовов в собственном коде.
Прежде чем рассматривать работу с net/http
, поговорим об отправке и получении данных. Данные, которые находятся над протоколом WebSocket (например, json-конверты), здесь и далее я стану называть пакетами. Давайте начнем реализацию структуры Channel
, которая будет содержать в себе логику получения и отправки пакетов через WebSocket-соединение.
2.1. Channel struct
// Packet represents application level data.
type Packet struct {
...
}
// Channel wraps user connection.
type Channel struct {
conn net.Conn // WebSocket connection.
send chan Packet // Outgoing packets queue.
}
func NewChannel(conn net.Conn) *Channel {
c := &Channel{
conn: conn,
send: make(chan Packet, N),
}
go c.reader()
go c.writer()
return c
}
Хочу обратить ваше внимание на запуск двух горутин чтения и записи. Для каждой горутины нужен свой стек, который в зависимости от операционной системы и версии Go может иметь начальный размер от 2 до 8 Кбайт. Если учесть цифру, названную ранее (3 миллиона живых соединений), то на все соединения нам потребуется 24 Гбайт памяти (при стеке в 4 Кбайт). И это без учета памяти, выделяемой на структуру Channel
, очередь исходящих пакетов ch.send
и другие внутренние поля.
2.2. Горутины I/O
Посмотрим на реализацию «читателя» из соединения:
func (c *Channel) reader() {
// We make buffered read to reduce read syscalls.
buf := bufio.NewReader(c.conn)
for {
pkt, _ := readPacket(buf)
c.handle(pkt)
}
}
Достаточно просто, верно? Мы используем буфер, чтобы сократить количество syscall’ов на чтение и вычитывать сразу столько, сколько позволит нам размер buf
. В бесконечном цикле мы ожидаем поступления новых данных в соединение и читаем следующий пакет. Попрошу запомнить слова ожидаем поступления новых данных: к ним мы еще вернемся позже.
Парсинг и обработка входящих пакетов останутся в стороне, поскольку это неважно для тех оптимизаций, о которых будет идти речь. А вот на buf
все же стоит обратить внимание сейчас: по умолчанию это 4 Кбайт и значит это еще 12 Гбайт памяти. Аналогичная ситуация с «писателем»:
func (c *Channel) writer() {
// We make buffered write to reduce write syscalls.
buf := bufio.NewWriter(c.conn)
for pkt := range c.send {
_ := writePacket(buf, pkt)
buf.Flush()
}
}
Мы итерируемся по каналу исходящих пакетов c.send
и пишем их в буфер. Это, как внимательный читатель уже мог догадаться, еще 4 Кбайт и 12 Гбайт памяти на наши 3 миллиона соединений.
2.3. HTTP
Простая реализация Channel
у нас есть, теперь нужно раздобыть WebSocket-соединение, с которым мы будем работать. Поскольку мы все еще находимся под заголовком Idiomatic way, то сделаем это в соответствующем ключе.
Если вы не знакомы с тем, как работает WebSocket, то стоит сказать, что клиент переходит на протокол WebSocket с помощью специального механизма в HTTP, который называется Upgrade. После успешной обработки Upgrade-запроса сервер и клиент используют TCP-соединение для обмена бинарными WebSocket-фреймами.
Вот тут описана структура фрейма внутри соединения.
import (
"net/http"
"some/websocket"
)
http.HandleFunc("/v1/ws", func(w http.ResponseWriter, r *http.Request) {
conn, _ := websocket.Upgrade(r, w)
ch := NewChannel(conn)
//...
})
Обратим внимание на то, что http.ResponseWriter
внутри себя содержит буфер записи bufio.Writer
на 4 Кбайт, а для инициализации *http.Request
происходит выделение буфера чтения bufio.Reader
тоже на 4 Кбайт.
Независимо от используемой библиотеки WebSocket после успешного ответа на Upgrade-запрос сервер получает буферы I/O вместе с TCP-соединением при вызове responseWriter.Hijack()
.
Hint: в некоторых случаях при помощиgo:linkname
можно вернуть буферы в пулnet/http
через вызовnet/http.putBufio{Reader,Writer}
.
Таким образом, нам нужно еще 24 Гбайт памяти на 3 миллиона соединений.
Итого уже 72 Гбайт памяти на приложение, которое еще ничего не делает!
3. Оптимизации
Стоит освежить в памяти то, о чем мы говорили в предисловии, и вспомнить, как ведет себя пользовательское соединение. После перехода на WebSocket клиент посылает пакет с интересующими его событиями — т. е. подписывается на события. После этого (не считая технических сообщений типа ping/pong
) клиент может ничего больше не отправить за все время жизни соединения.
Время жизни соединения может быть от нескольких секунд до нескольких дней.
Получается, что наши Channel.reader()
и Channel.writer()
большую часть времени находятся в ожидании обработки данных на получение или отправку. А вместе с ними данных ожидают и буферы I/O, каждый по 4 Кбайт.
Теперь становится очевидным то, что можно сделать некоторые вещи лучше, не так ли?
3.1. netpoll
Помните реализацию Channel.reader()
, который ожидал поступления новых данных, блокируясь на вызове conn.Read()
внутри bufio.Reader
? При наличии данных в соединении runtime go «будил» нашу горутину и позволял прочитать очередной пакет. После этого горутина снова блокировалась на ожидании новых данных. Давайте посмотрим, как runtime в go понимает, что горутину нужно «разбудить».
Заглянув в реализацию conn.Read()
, мы увидим, что внутри происходит вызов net.netFD.Read()
:
// net/fd_unix.go
func (fd *netFD) Read(p []byte) (n int, err error) {
//...
for {
n, err = syscall.Read(fd.sysfd, p)
if err != nil {
n = 0
if err == syscall.EAGAIN {
if err = fd.pd.waitRead(); err == nil {
continue
}
}
}
//...
break
}
//...
}
Сокеты в go неблокирующие. EAGAIN говорит о том, что данных в сокете нет, и, чтобы не блокироваться на чтении из пустого сокета, ОС возвращает нам управление.
Мы видим, что происходит системный вызов read()
из файлового дескриптора соединения. В случае если чтение возвращает ошибку EAGAIN
, runtime делает вызов pollDesc.waitRead()
:
// net/fd_poll_runtime.go
func (pd *pollDesc) waitRead() error {
return pd.wait('r')
}
func (pd *pollDesc) wait(mode int) error {
res := runtime_pollWait(pd.runtimeCtx, mode)
//...
}
Если покопать глубже, то мы увидим, что в Linux netpoll
реализован с помощью epoll
. Почему бы нам не использовать такой же подход для своих соединений? Мы могли бы выделять буфер на чтение и запускать горутину только тогда, когда это действительно нужно: когда в сокете точно есть данные.
На github.com/golang/go есть issue на экспорт функций netpoll.
3.2. Избавляемся от горутин
Предположим, что у нас есть реализация netpoll для Go. Теперь мы можем не запускать горутину Channel.reader()
с буфером внутри, а «подписаться» вместо этого на событие наличия данных в соединении:
ch := NewChannel(conn)
// Make conn to be observed by netpoll instance.
// Note that EventRead is identical to EPOLLIN on Linux.
poller.Start(conn, netpoll.EventRead, func() {
// We spawn goroutine here to prevent poller wait loop
// to become locked during receiving packet from ch.
go Receive(ch)
})
// Receive reads a packet from conn and handles it somehow.
func (ch *Channel) Receive() {
buf := bufio.NewReader(ch.conn)
pkt := readPacket(buf)
c.handle(pkt)
}
С Channel.writer()
дело обстоит проще — мы можем запускать горутину и аллоцировать буфер только тогда, когда мы собираемся отправить пакет:
func (ch *Channel) Send(p Packet) {
if c.noWriterYet() {
go ch.writer()
}
ch.send <- p
}
После чтения исходящих пакетов из ch.send
(одного или нескольких) writer завершит свою работу и освободит стек и буфер.
Отлично! Мы сэкономили 48 Гбайт — избавились от стека и буферов I/O внутри двух постоянно «работающих» горутин.
3.3. Контроль ресурсов
Большое количество соединений — это не только большое потребление памяти. При разработке сервера у нас не раз случались race condition’ы и deadlock’и, которые очень часто сопровождались так называемым self-DDoS — ситуацией, когда клиенты приложения безудержно пытались подсоединиться к серверу и еще больше доламывали ломали его.
Например, если вдруг по какой-то причине мы не могли обрабатывать ping/pong
сообщения, но обработчик idle-соединений продолжал такие соединения отключать (предполагая, что соединения закрыты некорректно, поэтому от них нет данных), то получалось, что клиент, вместо того чтобы после подключения ожидать событий, терял соединение каждые N секунд и пробовал подключиться снова.
Было бы здорово, если бы заблокированный или перегруженный сервер просто переставал принимать новые соединения, а балансер перед ним (например, nginx) переходил бы к следующему экземпляру сервера.
Более того, независимо от нагруженности сервера, если вдруг все клиенты по любой из причин захотят отправить нам пакет, сэкономленные ранее 48 Гбайт будут снова в деле — по сути, мы вернемся к изначальному состоянию горутины и буфера на каждое соединение.
3.3.1 Goroutine pool
Мы можем ограничить количество одновременно обрабатываемых пакетов с помощью пула горутин. Вот как выглядит наивная реализация такого пула:
package gpool
func New(size int) *Pool {
return &Pool{
work: make(chan func()),
sem: make(chan struct{}, size),
}
}
func (p *Pool) Schedule(task func()) error {
select {
case p.work <- task:
case p.sem <- struct{}{}:
go p.worker(task)
}
}
func (p *Pool) worker(task func()) {
defer func() { <-p.sem }
for {
task()
task = <-p.work
}
}
И теперь наш код с netpoll принимает следующий вид:
pool := gpool.New(128)
poller.Start(conn, netpoll.EventRead, func() {
// We will block poller wait loop when
// all pool workers are busy.
pool.Schedule(func() {
Receive(ch)
})
})
То есть теперь мы выполняем чтение пакета не сразу при обнаружении данных в сокете, но при первой возможности занять свободную горутину в пуле.
Аналогично мы поменяем Send()
:
pool := gpool.New(128)
func (ch *Channel) Send(p Packet) {
if c.noWriterYet() {
pool.Schedule(ch.writer)
}
ch.send <- p
}
Вместо go ch.writer()
мы хотим делать запись в одной из переиспользуемых горутин. Таким образом, в случае пула из N
горутин мы гарантируем то, что при N одновременно обрабатываемых запросах и пришедшем N + 1
мы не будем аллоцировать N + 1
буфер на чтение. Пул горутин также позволяет лимитировать Accept()
и Upgrade()
новых соединений и избегать большинства ситуаций с DDoS.
3.4. Zero-copy upgrade
Давайте уйдем немного в сторону протокола WebSocket. Как уже говорилось выше, клиент переходит на протокол WebSocket с помощью HTTP-запроса Upgrade. Вот как это выглядит:
GET /ws HTTP/1.1
Host: mail.ru
Connection: Upgrade
Sec-Websocket-Key: A3xNe7sEB9HixkmBhVrYaA==
Sec-Websocket-Version: 13
Upgrade: websocket
HTTP/1.1 101 Switching Protocols
Connection: Upgrade
Sec-Websocket-Accept: ksu0wXWG+YmkVx+KQR2agP0cQn4=
Upgrade: websocket
То есть HTTP-запрос и его заголовки в нашем случае нужны только для того, чтобы перейти на протокол WebSocket. Это знание, а также то, что хранится внутри http.Request
, наводит нас на мысль, что, возможно, в целях оптимизации мы могли бы отказаться от ненужных аллокаций и копирований при разборе HTTP-запроса и уйти от стандартного сервера net/http
.
http.Request
содержит, например,поле с одноименным типом Header
, которое безусловно заполняется всеми заголовками запроса путем копирования данных из соединения в строки. Представьте, сколько лишних данных можно держать внутри этого поля, например при большом размере заголовкаCookie
.
Но что взять взамен?
3.4.1. Реализации WebSocket
К сожалению, все существовавшие на момент оптимизации нашего сервера библиотеки позволяли делать upgrade только при использовании стандартного net/http
-сервера. Более того, ни одна (из двух) библиотек не позволяли применить все оптимизации чтения и записи, описанные выше. Для того чтобы эти оптимизации работали, нам нужно иметь достаточно низкоуровневый API для работы с WebSocket. Для переиспользования буферов нам нужно, чтобы функции по работе с соединением выглядели так:
func ReadFrame(io.Reader) (Frame, error)
func WriteFrame(io.Writer, Frame) error
Имея библиотеку с таким API, мы могли бы читать пакеты из соединения следующим образом (запись пакетов выглядела бы аналогично):
// getReadBuf, putReadBuf are intended to
// reuse *bufio.Reader (with sync.Pool for example).
func getReadBuf(io.Reader) *bufio.Reader
func putReadBuf(*bufio.Reader)
// readPacket must be called when data could be read from conn.
func readPacket(conn io.Reader) error {
buf := getReadBuf()
defer putReadBuf(buf)
buf.Reset(conn)
frame, _ := ReadFrame(buf)
parsePacket(frame.Payload)
//...
}
Короче говоря, настало время запилить свою либу.
3.4.2. github.com/gobwas/ws
Идеологически библиотека ws
была написана с мыслью, что она не должна навязывать пользователю логику работы с протоколом. Все методы чтения и записи принимают стандартные интерфейсы io.Reader
и io.Writer
, что позволяет использовать или не использовать буферизацию, равно как и любые другие обертки вокруг I/O.
Кроме upgrade-запросов от стандартного net/http
, ws
поддерживает zero-copy upgrade — обработку upgrade-запросов и переход на WebSocket без выделений памяти и копирований. ws.Upgrade()
при этом принимает io.ReadWriter
(net.Conn
реализует этот интерфейс) — т. е. мы могли бы использовать стандартный net.Listen()
и передавать полученное соединение от ln.Accept()
сразу в ws.Upgrade()
. При этом библиотека дает возможность копировать любые данные запроса для будущего использования в приложении (например, Cookie
для проверки сессии).
Ниже сравнение обработки upgrade-запроса: стандартный net/http
-сервер против net.Listen()
и zero-copy upgrade:
BenchmarkUpgradeHTTP 5156 ns/op 8576 B/op 9 allocs/op
BenchmarkUpgradeTCP 973 ns/op 0 B/op 0 allocs/op
Переход на ws
и zero-copy upgrade позволил сэкономить еще 24 Гбайт — тех самых, которые выделялись на буферы I/O при обработке запроса в хэндлере net/http
.
3.5. Всё вместе
Давайте структурируем оптимизации, о которых я рассказал.
- Горутина на чтение с буфером внутри — дорого.
Решение: netpoll (epoll, kqueue); переиспользование буферов. - Горутина на запись с буфером внутри — дорого.
Решение: стартовать горутину тогда, когда нужно; переиспользование буферов. - При лавине подключений netpoll не сработает.
Решение: переиспользовать горутины с лимитом на их количество. net/http
не самый быстрый способ обработать Upgrade на WebSocket.
Решение: использовать zero-copy upgrade на «голом» TCP-соединении.
Примерно так мог бы выглядеть код сервера:
import (
"net"
"github.com/gobwas/ws"
)
ln, _ := net.Listen("tcp", ":8080")
for {
// Try to accept incoming connection inside free pool worker.
// If there no free workers for 1ms, do not accept anything and try later.
// This will help us to prevent many self-ddos or out of resource limit cases.
err := pool.ScheduleTimeout(time.Millisecond, func() {
conn := ln.Accept()
_ = ws.Upgrade(conn)
// Wrap WebSocket connection with our Channel struct.
// This will help us to handle/send our app's packets.
ch := NewChannel(conn)
// Wait for incoming bytes from connection.
poller.Start(conn, netpoll.EventRead, func() {
// Do not cross the resource limits.
pool.Schedule(func() {
// Read and handle incoming packet(s).
ch.Recevie()
})
})
})
if err != nil {
time.Sleep(time.Millisecond)
}
}
4. Заключение
Premature optimization is the root of all evil (or at least most of it) in programming. Donald Knuth
Безусловно, оптимизации выше актуальны далеко не во всех случаях. Например, если отношение свободных ресурсов (память, CPU) к количеству живых соединений достаточно велико, наверное, нет смысла в оптимизации. Однако знание того, где и что можно улучшить, надеюсь, будет полезным.
Спасибо за внимание!