Никак не доходили руки переписать go-meter. Увеличить производительность, получить более полный контроль над процессом и довести до приближения к wrk. В идеале хочется увидеть легко и удобно расширяемую альтернативу. Да, в wrk недавно появилась поддержка Lua скриптов, которые решают многие неудобства, но и там тоже есть неприятные нюансы, например, расширенную статистику собирать не получится, так как методы вывода статистики работают только на первом потоке, и к собранным данным на других потоках доступа нет, поэтому сводится опять к тому, что-бы разбираться в исходниках и делать под себя, но это не тривиальная задача. И так, готовим нагрузочный тест на Go, c плюшками. Кому интересно, прошу под кат.
С начала разберемся что нам нужно:
— отправка GET/POST/PUT/DELETE запросов
— перебор URL, и POST body
— контроль над открытыми соединениями
— контроль над потоками
— указание продолжительности тестирования
— ограничения по максимальному количеству запросов в секунду
— возможность исключить несколько первых секунд из статистики, чтобы избежать искажения в момент прогрева HTTP сервера
— пул соединений
— простые Request/Response
— статистика
— profit
Пул соединений пишем на основе ограниченного канала. Выглядеть он будет как простой пул объектов, взяли объект из канала, поработали, положили обратно.
Request/Response тут можно почитать исходники Go, посмотреть как реализовано там, и сделать упрощенную аналогию, главным отличием будет возможность получить объем трафика каждого запроса/ответа и сэкономить драгоценное время.
Для того что бы наши потоки выключились, когда время тестирования закончится, сделаем канал для завершения работы потоков и канал, по которому каждый поток будет сообщать, что он ко��ректно завершил свою работу
засечем время, и также будем ждать Ctr+C(SIGTERM), чтобы наше приложение могло завершить тестирование в любой момент
Теперь посмотрим на сам воркер: для ограничения по количеству запросов в секунду возьмем для каждого его долю от общего количества, 4 раза в секунду будем увеличивать счетчик и ждать либо освободившиеся соединение либо завершение работы
Как только соединение освободится, формируем следующий запрос и запускаем асинхронно отправку его, так по кругу пока не кончится время. После того как запрос отправлен, а ответ прочитан, соединение возвращаем в пул, и поток снова подхватит его.
Осталось дело за малым, собрать статистику из объектов RequestStats и оформить ее
Как парсить аргументы запуска и форматировать вывод статистики я опущу, так как это не интересно. А теперь давайте проверим, что у нас получилось. Для пробы натравим wrk на Node.js кластер
и тоже самое на go с GOMAXPROCS=1
Получаем 28106 против 34388 запросов в секунду — это примерно на 20% меньше, по сравнению с чистым Cи + event loop + nio. Довольно неплохо, при изменении GOMAXPROCS разницы практически нет, так как большую часть процессорного времени отбирает Node.js.
Минусы:
— потеря 20% производительности, можно попробовать упростить Request/Response, может дать немного производительности
— еще нет поддержи HTTPS
— еще нельзя указать пользовательские HTTP заголовки и timeout
Все исходники тут — Github
Как пользоваться
Спасибо за внимание!
Что есть и что нужно
С начала разберемся что нам нужно:
— отправка GET/POST/PUT/DELETE запросов
— перебор URL, и POST body
— контроль над открытыми соединениями
— контроль над потоками
— указание продолжительности тестирования
— ограничения по максимальному количеству запросов в секунду
— возможность исключить несколько первых секунд из статистики, чтобы избежать искажения в момент прогрева HTTP сервера
План
— пул соединений
— простые Request/Response
— статистика
— profit
мысли вслух
Раз нужно контролировать соединения, стандартный http.Client нам не подходит (да и большой он для такой задачи), умеет слишком много из-за чего страдает производительность. Так как у нас подразумевается несколько потоков воркеров для отправки запросов, то нам нужен пул соединений, которые они будут между собой делить. Воркеру ждать ответа от сервера не имеет смысла, мы просто теряем на это драгоценное время. Как оценить проходящий трафик? Стандартные http.Request, http.Respose такой информации не дают, использовать их не получится, значит нужно реализовывать простой Request/Response, который нам все неоходимое даст. Собирать сырые данные и �� конце их агрегировать не получится, так как память не резиновая. Собираем стату на лету.
Поехали
Пул соединений пишем на основе ограниченного канала. Выглядеть он будет как простой пул объектов, взяли объект из канала, поработали, положили обратно.
type Connection struct { conn net.Conn manager *ConnectionManager } type ConnectionManager struct { conns chan *Connection config *Config } func NewConnectionManager(config *Config) (result *ConnectionManager) { result = &ConnectionManager{config: config, conns: make(chan *Connection, config.Connections)} for i := 0; i < config.Connections; i++ { connection := &Connection{manager: result} if connection.Dial() != nil { ConnectionErrors++ } result.conns <- connection } return } func (this *ConnectionManager) Get() *Connection { return <-this.conns } func (this *Connection) Dial() error { if this.IsConnected() { this.Disconnect() } conn, err := net.Dial("tcp4", this.manager.config.Url.Host) if err == nil { this.conn = conn } return err } func (this *Connection) Disconnect() { this.conn.Close() this.conn = nil } func (this *Connection) IsConnected() bool { return this.conn != nil } func (this *Connection) Return() { this.manager.conns <- this }
Request/Response тут можно почитать исходники Go, посмотреть как реализовано там, и сделать упрощенную аналогию, главным отличием будет возможность получить объем трафика каждого запроса/ответа и сэкономить драгоценное время.
Request
type Request struct { Method string URL *url.URL Header map[string][]string Body io.Reader ContentLength int64 Host string BufferSize int64 } func (req *Request) Write(w io.Writer) error { bw := &bytes.Buffer{} fmt.Fprintf(bw, "%s %s HTTP/1.1\r\n", valueOrDefault(req.Method, "GET"), req.URL.RequestURI()) fmt.Fprintf(bw, "Host: %s\r\n", req.Host) userAgent := "" if req.Header != nil { if ua := req.Header["User-Agent"]; len(ua) > 0 { userAgent = ua[0] } } if userAgent != "" { fmt.Fprintf(bw, "User-Agent: %s\r\n", userAgent) } if req.Method == "POST" || req.Method == "PUT" { fmt.Fprintf(bw, "Content-Length: %d\r\n", req.ContentLength) } if req.Header != nil { for key, values := range req.Header { if key == "User-Agent" || key == "Content-Length" || key == "Host" { continue } for _, value := range values { fmt.Fprintf(bw, "%s: %s\r\n", key, value) } } } io.WriteString(bw, "\r\n") if req.Method == "POST" || req.Method == "PUT" { bodyReader := bufio.NewReader(req.Body) _, err := bodyReader.WriteTo(bw) if err != nil { return err } } req.BufferSize = int64(bw.Len()) _, err := bw.WriteTo(w) return err }
Response
type Response struct { Status string StatusCode int Header map[string][]string ContentLength int64 BufferSize int64 } func ReadResponse(r *bufio.Reader) (*Response, error) { tp := textproto.NewReader(r) resp := &Response{} line, err := tp.ReadLine() if err != nil { return nil, err } f := strings.SplitN(line, " ", 3) resp.BufferSize += int64(len(f) + 2) if len(f) < 2 { return nil, errors.New("Response Header ERROR") } reasonPhrase := "" if len(f) > 2 { reasonPhrase = f[2] } resp.Status = f[1] + " " + reasonPhrase resp.StatusCode, err = strconv.Atoi(f[1]) if err != nil { return nil, errors.New("malformed HTTP status code") } resp.Header = make(map[string][]string) for { line, err := tp.ReadLine() if err != nil { return nil, errors.New("Response Header ERROR") } resp.BufferSize += int64(len(line) + 2) if len(line) == 0 { break } else { f := strings.SplitN(line, ":", 2) resp.Header[f[0]] = append(resp.Header[strings.TrimSpace(f[0])], strings.TrimSpace(f[1])) } } if cl := resp.Header["Content-Length"]; len(cl) > 0 { i, err := strconv.ParseInt(cl[0], 10, 0) if err == nil { resp.ContentLength = i } } buff := make([]byte, resp.ContentLength) r.Read(buff) resp.BufferSize += int64(resp.ContentLength) return resp, nil }
Для того что бы наши потоки выключились, когда время тестирования закончится, сделаем канал для завершения работы потоков и канал, по которому каждый поток будет сообщать, что он ко��ректно завершил свою работу
WorkerQuit := make(chan bool, *_threads) WorkerQuited := make(chan bool, *_threads)
засечем время, и также будем ждать Ctr+C(SIGTERM), чтобы наше приложение могло завершить тестирование в любой момент
//Start Ctr+C listen signalChan := make(chan os.Signal, 1) signal.Notify(signalChan, os.Interrupt, syscall.SIGTERM) //Wait timers or SIGTERM select { case <-time.After(config.Duration): case <-signalChan: } for i := 0; i < config.Threads; i++ { config.WorkerQuit <- true } //Wait for threads complete for i := 0; i < config.Threads; i++ { <-config.WorkerQuited }
Теперь посмотрим на сам воркер: для ограничения по количеству запросов в секунду возьмем для каждого его долю от общего количества, 4 раза в секунду будем увеличивать счетчик и ждать либо освободившиеся соединение либо завершение работы
func NewThread(config *Config) { timerAllow := time.NewTicker(time.Duration(250) * time.Millisecond) allow := int32(config.MRQ / 4 / config.Threads) if config.MRQ == -1 { allow = 2147483647 } else if allow <= 0 { allow = 1 } var connectionErrors int32 = 0 currentAllow := allow for { select { //По таймеру выставляем счетчик на количество разрешенных запросов case <-timerAllow.C: currentAllow = allow //Получаем свободное соединение case connection := <-config.ConnectionManager.conns: currentAllow-- //Если разрешенные запросы кончились - возвращаем соединение в пул if currentAllow < 0 { connection.Return() } else { //Формируем запрос req := getRequest(config.Method, config.Url, config.Source.GetNext()) //Если нужно переподключаться на каждом запросе if config.Reconnect && connection.IsConnected() { connection.Disconnect() } //Если соединение разорвано, то пробуем его восстановить if !connection.IsConnected() { if connection.Dial() != nil { connectionErrors++ } } //Отправляем запрос если есть соединение, иначе возвращаем соединение if connection.IsConnected() { go writeSocket(connection, req, config.RequestStats) } else { connection.Return() } } //Ждем завершения case <-config.WorkerQuit: //Записываем ошибки по соединениям atomic.AddInt32(&ConnectionErrors, connectionErrors) //Подтверждаем завершение config.WorkerQuited <- true return } } }
Как только соединение освободится, формируем следующий запрос и запускаем асинхронно отправку его, так по кругу пока не кончится время. После того как запрос отправлен, а ответ прочитан, соединение возвращаем в пул, и поток снова подхватит его.
Отправка запроса
func writeSocket(connection *Connection, req *http.Request, read chan *RequestStats) { result := &RequestStats{} //По окончанию обязательно отправляем статус и отдаем соединение в пул defer func() { connection.Return() read <- result }() now := time.Now() conn := connection.conn bw := bufio.NewWriter(conn) //Пишем запрос err := req.Write(bw) if err != nil { result.WriteError = err return } err = bw.Flush() if err != nil { result.WriteError = err return } //Ждем ответа res, err := http.ReadResponse(bufio.NewReader(conn)) if err != nil { result.ReadError = err return } //Собираем нужную информацию result.Duration = time.Now().Sub(now) result.NetOut = req.BufferSize result.NetIn = res.BufferSize result.ResponseCode = res.StatusCode req.Body = nil }
Осталось дело за малым, собрать статистику из объектов RequestStats и оформить ее
//Вся статистика type StatsSource struct { Readed int64 Writed int64 Requests int Skiped int Min time.Duration Max time.Duration Sum int64 Codes map[int]int DurationPercent map[time.Duration]int ReadErrors int WriteErrors int Work time.Duration } //Статистика для посекундных отчетов type StatsSourcePerSecond struct { Readed int64 Writed int64 Requests int Skiped int Sum int64 } //Агрегатор статистики func StartStatsAggregator(config *Config) { allowStore := true allowStoreTime := time.After(config.ExcludeSeconds) if config.ExcludeSeconds.Seconds() > 0 { allowStore = false } verboseTimer := time.NewTicker(time.Duration(1) * time.Second) if config.Verbose { fmt.Printf("%s %s %s %s %s %s\n", newSpancesFormatRightf("Second", 10, "%s"), newSpancesFormatRightf("Total", 10, "%s"), newSpancesFormatRightf("Req/sec", 10, "%s"), newSpancesFormatRightf("Avg/sec", 10, "%s"), newSpancesFormatRightf("In/sec", 10, "%s"), newSpancesFormatRightf("Out/sec", 10, "%s"), ) } else { verboseTimer.Stop() } source = StatsSource{ Codes: make(map[int]int), DurationPercent: make(map[time.Duration]int), } perSecond := StatsSourcePerSecond{} start := time.Now() for { select { //Таймер для посекундных отчетов case <-verboseTimer.C: if perSecond.Requests-perSecond.Skiped > 0 && config.Verbose { //Считаем среднее время ответа avgMilliseconds := perSecond.Sum / int64(perSecond.Requests-perSecond.Skiped) avg := time.Duration(avgMilliseconds) * time.Millisecond //Пишем статистику fmt.Printf("%s %s %s %s %s %s\n", newSpancesFormatRightf(roundToSecondDuration(time.Now().Sub(start)), 10, "%v"), newSpancesFormatRightf(source.Requests, 10, "%d"), newSpancesFormatRightf(perSecond.Requests, 10, "%d"), newSpancesFormatRightf(avg, 10, "%v"), newSpancesFormatRightf(Bites(perSecond.Readed), 10, "%s"), newSpancesFormatRightf(Bites(perSecond.Writed), 10, "%s"), ) } //Сбрасываем данные perSecond = StatsSourcePerSecond{} //Таймер для разрешения сбора статистики нужен для пропуска на старте case <-allowStoreTime: allowStore = true //Получаем ответ от сервера case res := <-config.RequestStats: //Если были ошибки - просто их записываем, остальная информация нам не интересна if res.ReadError != nil { source.ReadErrors++ continue } else if res.WriteError != nil { source.WriteErrors++ continue } //Инкрементируем счетчики source.Requests++ perSecond.Requests++ perSecond.Readed += res.NetIn perSecond.Writed += res.NetOut source.Readed += res.NetIn source.Writed += res.NetOut //Собираем статистику по запросам в разрезе HTTP кодов source.Codes[res.ResponseCode]++ if !allowStore { perSecond.Skiped++ source.Skiped++ continue } //Для среднего времени ответа sum := int64(res.Duration.Seconds() * 1000) source.Sum += sum perSecond.Sum += sum //Максимальное и минимальное время ответа if source.Min > res.Duration { source.Min = roundDuration(res.Duration) } if source.Max < res.Duration { source.Max = roundDuration(res.Duration) } //Количество запросов в разрезе времени ответа округленная до 10 миллисекунд duration := time.Duration(res.Duration.Nanoseconds()/10000000) * time.Millisecond * 10 source.DurationPercent[duration]++ //Завершение сбора статистики case <-config.StatsQuit: //Записываем общее время теста source.Work = time.Duration(time.Now().Sub(start).Seconds()*1000) * time.Millisecond if config.Verbose { s := "" for { if len(s) >= 61 { break } s += "-" } fmt.Println(s) } //Подтверждаем завершение config.StatsQuit <- true return } } }
Подводим итоги
Как парсить аргументы запуска и форматировать вывод статистики я опущу, так как это не интересно. А теперь давайте проверим, что у нас получилось. Для пробы натравим wrk на Node.js кластер
% ./wrk -c 21 -t 7 -d 30s -L http://localhost:3001/index.html Running 30s test @ http://localhost:3001/index.html 7 threads and 21 connections Thread Stats Avg Stdev Max +/- Stdev Latency 1.09ms 6.55ms 152.07ms 99.63% Req/Sec 5.20k 3.08k 14.33k 58.75% Latency Distribution 50% 490.00us 75% 0.89ms 90% 1.83ms 99% 5.04ms 1031636 requests in 30.00s, 153.48MB read Requests/sec: 34388.25 Transfer/sec: 5.12MB
и тоже самое на go с GOMAXPROCS=1
% ./go-meter -t 7 -c 21 -d 30s -u http://localhost:3001/index.html Running test threads: 7, connections: 21 in 30s GET http://localhost:3001/index.html Stats: Min Avg Max Latency 0 0 83ms 843183 requests in 30s, net: in 103MB, out 62MB HTTP Codes: 200 100.00% Latency: 0 99.99% 10ms - 80ms 0.01% Requests: 28106.10/sec Net In: 27MBit/sec Net Out: 17MBit/sec Transfer: 5.5MB/sec
Получаем 28106 против 34388 запросов в секунду — это примерно на 20% меньше, по сравнению с чистым Cи + event loop + nio. Довольно неплохо, при изменении GOMAXPROCS разницы практически нет, так как большую часть процессорного времени отбирает Node.js.
Минусы:
— потеря 20% производительности, можно попробовать упростить Request/Response, может дать немного производительности
— еще нет поддержи HTTPS
— еще нельзя указать пользовательские HTTP заголовки и timeout
Все исходники тут — Github
Как пользоваться
% go get github.com/a696385/go-meter % $GOPATH/bin/go-meter -h
Спасибо за внимание!
