Никак не доходили руки переписать go-meter. Увеличить производительность, получить более полный контроль над процессом и довести до приближения к wrk. В идеале хочется увидеть легко и удобно расширяемую альтернативу. Да, в wrk недавно появилась поддержка Lua скриптов, которые решают многие неудобства, но и там тоже есть неприятные нюансы, например, расширенную статистику собирать не получится, так как методы вывода статистики работают только на первом потоке, и к собранным данным на других потоках доступа нет, поэтому сводится опять к тому, что-бы разбираться в исходниках и делать под себя, но это не тривиальная задача. И так, готовим нагрузочный тест на Go, c плюшками. Кому интересно, прошу под кат.
С начала разберемся что нам нужно:
— отправка GET/POST/PUT/DELETE запросов
— перебор URL, и POST body
— контроль над открытыми соединениями
— контроль над потоками
— указание продолжительности тестирования
— ограничения по максимальному количеству запросов в секунду
— возможность исключить несколько первых секунд из статистики, чтобы избежать искажения в момент прогрева HTTP сервера
— пул соединений
— простые Request/Response
— статистика
— profit
Пул соединений пишем на основе ограниченного канала. Выглядеть он будет как простой пул объектов, взяли объект из канала, поработали, положили обратно.
Request/Response тут можно почитать исходники Go, посмотреть как реализовано там, и сделать упрощенную аналогию, главным отличием будет возможность получить объем трафика каждого запроса/ответа и сэкономить драгоценное время.
Для того что бы наши потоки выключились, когда время тестирования закончится, сделаем канал для завершения работы потоков и канал, по которому каждый поток будет сообщать, что он корректно завершил свою работу
засечем время, и также будем ждать Ctr+C(SIGTERM), чтобы наше приложение могло завершить тестирование в любой момент
Теперь посмотрим на сам воркер: для ограничения по количеству запросов в секунду возьмем для каждого его долю от общего количества, 4 раза в секунду будем увеличивать счетчик и ждать либо освободившиеся соединение либо завершение работы
Как только соединение освободится, формируем следующий запрос и запускаем асинхронно отправку его, так по кругу пока не кончится время. После того как запрос отправлен, а ответ прочитан, соединение возвращаем в пул, и поток снова подхватит его.
Осталось дело за малым, собрать статистику из объектов RequestStats и оформить ее
Как парсить аргументы запуска и форматировать вывод статистики я опущу, так как это не интересно. А теперь давайте проверим, что у нас получилось. Для пробы натравим wrk на Node.js кластер
и тоже самое на go с GOMAXPROCS=1
Получаем 28106 против 34388 запросов в секунду — это примерно на 20% меньше, по сравнению с чистым Cи + event loop + nio. Довольно неплохо, при изменении GOMAXPROCS разницы практически нет, так как большую часть процессорного времени отбирает Node.js.
Минусы:
— потеря 20% производительности, можно попробовать упростить Request/Response, может дать немного производительности
— еще нет поддержи HTTPS
— еще нельзя указать пользовательские HTTP заголовки и timeout
Все исходники тут — Github
Как пользоваться
Спасибо за внимание!
Что есть и что нужно
С начала разберемся что нам нужно:
— отправка GET/POST/PUT/DELETE запросов
— перебор URL, и POST body
— контроль над открытыми соединениями
— контроль над потоками
— указание продолжительности тестирования
— ограничения по максимальному количеству запросов в секунду
— возможность исключить несколько первых секунд из статистики, чтобы избежать искажения в момент прогрева HTTP сервера
План
— пул соединений
— простые Request/Response
— статистика
— profit
мысли вслух
Раз нужно контролировать соединения, стандартный http.Client нам не подходит (да и большой он для такой задачи), умеет слишком много из-за чего страдает производительность. Так как у нас подразумевается несколько потоков воркеров для отправки запросов, то нам нужен пул соединений, которые они будут между собой делить. Воркеру ждать ответа от сервера не имеет смысла, мы просто теряем на это драгоценное время. Как оценить проходящий трафик? Стандартные http.Request, http.Respose такой информации не дают, использовать их не получится, значит нужно реализовывать простой Request/Response, который нам все неоходимое даст. Собирать сырые данные и в конце их агрегировать не получится, так как память не резиновая. Собираем стату на лету.
Поехали
Пул соединений пишем на основе ограниченного канала. Выглядеть он будет как простой пул объектов, взяли объект из канала, поработали, положили обратно.
type Connection struct {
conn net.Conn
manager *ConnectionManager
}
type ConnectionManager struct {
conns chan *Connection
config *Config
}
func NewConnectionManager(config *Config) (result *ConnectionManager) {
result = &ConnectionManager{config: config, conns: make(chan *Connection, config.Connections)}
for i := 0; i < config.Connections; i++ {
connection := &Connection{manager: result}
if connection.Dial() != nil {
ConnectionErrors++
}
result.conns <- connection
}
return
}
func (this *ConnectionManager) Get() *Connection {
return <-this.conns
}
func (this *Connection) Dial() error {
if this.IsConnected() {
this.Disconnect()
}
conn, err := net.Dial("tcp4", this.manager.config.Url.Host)
if err == nil {
this.conn = conn
}
return err
}
func (this *Connection) Disconnect() {
this.conn.Close()
this.conn = nil
}
func (this *Connection) IsConnected() bool {
return this.conn != nil
}
func (this *Connection) Return() {
this.manager.conns <- this
}
Request/Response тут можно почитать исходники Go, посмотреть как реализовано там, и сделать упрощенную аналогию, главным отличием будет возможность получить объем трафика каждого запроса/ответа и сэкономить драгоценное время.
Request
type Request struct {
Method string
URL *url.URL
Header map[string][]string
Body io.Reader
ContentLength int64
Host string
BufferSize int64
}
func (req *Request) Write(w io.Writer) error {
bw := &bytes.Buffer{}
fmt.Fprintf(bw, "%s %s HTTP/1.1\r\n", valueOrDefault(req.Method, "GET"), req.URL.RequestURI())
fmt.Fprintf(bw, "Host: %s\r\n", req.Host)
userAgent := ""
if req.Header != nil {
if ua := req.Header["User-Agent"]; len(ua) > 0 {
userAgent = ua[0]
}
}
if userAgent != "" {
fmt.Fprintf(bw, "User-Agent: %s\r\n", userAgent)
}
if req.Method == "POST" || req.Method == "PUT" {
fmt.Fprintf(bw, "Content-Length: %d\r\n", req.ContentLength)
}
if req.Header != nil {
for key, values := range req.Header {
if key == "User-Agent" || key == "Content-Length" || key == "Host" {
continue
}
for _, value := range values {
fmt.Fprintf(bw, "%s: %s\r\n", key, value)
}
}
}
io.WriteString(bw, "\r\n")
if req.Method == "POST" || req.Method == "PUT" {
bodyReader := bufio.NewReader(req.Body)
_, err := bodyReader.WriteTo(bw)
if err != nil {
return err
}
}
req.BufferSize = int64(bw.Len())
_, err := bw.WriteTo(w)
return err
}
Response
type Response struct {
Status string
StatusCode int
Header map[string][]string
ContentLength int64
BufferSize int64
}
func ReadResponse(r *bufio.Reader) (*Response, error) {
tp := textproto.NewReader(r)
resp := &Response{}
line, err := tp.ReadLine()
if err != nil {
return nil, err
}
f := strings.SplitN(line, " ", 3)
resp.BufferSize += int64(len(f) + 2)
if len(f) < 2 {
return nil, errors.New("Response Header ERROR")
}
reasonPhrase := ""
if len(f) > 2 {
reasonPhrase = f[2]
}
resp.Status = f[1] + " " + reasonPhrase
resp.StatusCode, err = strconv.Atoi(f[1])
if err != nil {
return nil, errors.New("malformed HTTP status code")
}
resp.Header = make(map[string][]string)
for {
line, err := tp.ReadLine()
if err != nil {
return nil, errors.New("Response Header ERROR")
}
resp.BufferSize += int64(len(line) + 2)
if len(line) == 0 {
break
} else {
f := strings.SplitN(line, ":", 2)
resp.Header[f[0]] = append(resp.Header[strings.TrimSpace(f[0])], strings.TrimSpace(f[1]))
}
}
if cl := resp.Header["Content-Length"]; len(cl) > 0 {
i, err := strconv.ParseInt(cl[0], 10, 0)
if err == nil {
resp.ContentLength = i
}
}
buff := make([]byte, resp.ContentLength)
r.Read(buff)
resp.BufferSize += int64(resp.ContentLength)
return resp, nil
}
Для того что бы наши потоки выключились, когда время тестирования закончится, сделаем канал для завершения работы потоков и канал, по которому каждый поток будет сообщать, что он корректно завершил свою работу
WorkerQuit := make(chan bool, *_threads)
WorkerQuited := make(chan bool, *_threads)
засечем время, и также будем ждать Ctr+C(SIGTERM), чтобы наше приложение могло завершить тестирование в любой момент
//Start Ctr+C listen
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, os.Interrupt, syscall.SIGTERM)
//Wait timers or SIGTERM
select {
case <-time.After(config.Duration):
case <-signalChan:
}
for i := 0; i < config.Threads; i++ {
config.WorkerQuit <- true
}
//Wait for threads complete
for i := 0; i < config.Threads; i++ {
<-config.WorkerQuited
}
Теперь посмотрим на сам воркер: для ограничения по количеству запросов в секунду возьмем для каждого его долю от общего количества, 4 раза в секунду будем увеличивать счетчик и ждать либо освободившиеся соединение либо завершение работы
func NewThread(config *Config) {
timerAllow := time.NewTicker(time.Duration(250) * time.Millisecond)
allow := int32(config.MRQ / 4 / config.Threads)
if config.MRQ == -1 {
allow = 2147483647
} else if allow <= 0 {
allow = 1
}
var connectionErrors int32 = 0
currentAllow := allow
for {
select {
//По таймеру выставляем счетчик на количество разрешенных запросов
case <-timerAllow.C:
currentAllow = allow
//Получаем свободное соединение
case connection := <-config.ConnectionManager.conns:
currentAllow--
//Если разрешенные запросы кончились - возвращаем соединение в пул
if currentAllow < 0 {
connection.Return()
} else {
//Формируем запрос
req := getRequest(config.Method, config.Url, config.Source.GetNext())
//Если нужно переподключаться на каждом запросе
if config.Reconnect && connection.IsConnected() {
connection.Disconnect()
}
//Если соединение разорвано, то пробуем его восстановить
if !connection.IsConnected() {
if connection.Dial() != nil {
connectionErrors++
}
}
//Отправляем запрос если есть соединение, иначе возвращаем соединение
if connection.IsConnected() {
go writeSocket(connection, req, config.RequestStats)
} else {
connection.Return()
}
}
//Ждем завершения
case <-config.WorkerQuit:
//Записываем ошибки по соединениям
atomic.AddInt32(&ConnectionErrors, connectionErrors)
//Подтверждаем завершение
config.WorkerQuited <- true
return
}
}
}
Как только соединение освободится, формируем следующий запрос и запускаем асинхронно отправку его, так по кругу пока не кончится время. После того как запрос отправлен, а ответ прочитан, соединение возвращаем в пул, и поток снова подхватит его.
Отправка запроса
func writeSocket(connection *Connection, req *http.Request, read chan *RequestStats) {
result := &RequestStats{}
//По окончанию обязательно отправляем статус и отдаем соединение в пул
defer func() {
connection.Return()
read <- result
}()
now := time.Now()
conn := connection.conn
bw := bufio.NewWriter(conn)
//Пишем запрос
err := req.Write(bw)
if err != nil {
result.WriteError = err
return
}
err = bw.Flush()
if err != nil {
result.WriteError = err
return
}
//Ждем ответа
res, err := http.ReadResponse(bufio.NewReader(conn))
if err != nil {
result.ReadError = err
return
}
//Собираем нужную информацию
result.Duration = time.Now().Sub(now)
result.NetOut = req.BufferSize
result.NetIn = res.BufferSize
result.ResponseCode = res.StatusCode
req.Body = nil
}
Осталось дело за малым, собрать статистику из объектов RequestStats и оформить ее
//Вся статистика
type StatsSource struct {
Readed int64
Writed int64
Requests int
Skiped int
Min time.Duration
Max time.Duration
Sum int64
Codes map[int]int
DurationPercent map[time.Duration]int
ReadErrors int
WriteErrors int
Work time.Duration
}
//Статистика для посекундных отчетов
type StatsSourcePerSecond struct {
Readed int64
Writed int64
Requests int
Skiped int
Sum int64
}
//Агрегатор статистики
func StartStatsAggregator(config *Config) {
allowStore := true
allowStoreTime := time.After(config.ExcludeSeconds)
if config.ExcludeSeconds.Seconds() > 0 {
allowStore = false
}
verboseTimer := time.NewTicker(time.Duration(1) * time.Second)
if config.Verbose {
fmt.Printf("%s %s %s %s %s %s\n",
newSpancesFormatRightf("Second", 10, "%s"),
newSpancesFormatRightf("Total", 10, "%s"),
newSpancesFormatRightf("Req/sec", 10, "%s"),
newSpancesFormatRightf("Avg/sec", 10, "%s"),
newSpancesFormatRightf("In/sec", 10, "%s"),
newSpancesFormatRightf("Out/sec", 10, "%s"),
)
} else {
verboseTimer.Stop()
}
source = StatsSource{
Codes: make(map[int]int),
DurationPercent: make(map[time.Duration]int),
}
perSecond := StatsSourcePerSecond{}
start := time.Now()
for {
select {
//Таймер для посекундных отчетов
case <-verboseTimer.C:
if perSecond.Requests-perSecond.Skiped > 0 && config.Verbose {
//Считаем среднее время ответа
avgMilliseconds := perSecond.Sum / int64(perSecond.Requests-perSecond.Skiped)
avg := time.Duration(avgMilliseconds) * time.Millisecond
//Пишем статистику
fmt.Printf("%s %s %s %s %s %s\n",
newSpancesFormatRightf(roundToSecondDuration(time.Now().Sub(start)), 10, "%v"),
newSpancesFormatRightf(source.Requests, 10, "%d"),
newSpancesFormatRightf(perSecond.Requests, 10, "%d"),
newSpancesFormatRightf(avg, 10, "%v"),
newSpancesFormatRightf(Bites(perSecond.Readed), 10, "%s"),
newSpancesFormatRightf(Bites(perSecond.Writed), 10, "%s"),
)
}
//Сбрасываем данные
perSecond = StatsSourcePerSecond{}
//Таймер для разрешения сбора статистики нужен для пропуска на старте
case <-allowStoreTime:
allowStore = true
//Получаем ответ от сервера
case res := <-config.RequestStats:
//Если были ошибки - просто их записываем, остальная информация нам не интересна
if res.ReadError != nil {
source.ReadErrors++
continue
} else if res.WriteError != nil {
source.WriteErrors++
continue
}
//Инкрементируем счетчики
source.Requests++
perSecond.Requests++
perSecond.Readed += res.NetIn
perSecond.Writed += res.NetOut
source.Readed += res.NetIn
source.Writed += res.NetOut
//Собираем статистику по запросам в разрезе HTTP кодов
source.Codes[res.ResponseCode]++
if !allowStore {
perSecond.Skiped++
source.Skiped++
continue
}
//Для среднего времени ответа
sum := int64(res.Duration.Seconds() * 1000)
source.Sum += sum
perSecond.Sum += sum
//Максимальное и минимальное время ответа
if source.Min > res.Duration {
source.Min = roundDuration(res.Duration)
}
if source.Max < res.Duration {
source.Max = roundDuration(res.Duration)
}
//Количество запросов в разрезе времени ответа округленная до 10 миллисекунд
duration := time.Duration(res.Duration.Nanoseconds()/10000000) * time.Millisecond * 10
source.DurationPercent[duration]++
//Завершение сбора статистики
case <-config.StatsQuit:
//Записываем общее время теста
source.Work = time.Duration(time.Now().Sub(start).Seconds()*1000) * time.Millisecond
if config.Verbose {
s := ""
for {
if len(s) >= 61 {
break
}
s += "-"
}
fmt.Println(s)
}
//Подтверждаем завершение
config.StatsQuit <- true
return
}
}
}
Подводим итоги
Как парсить аргументы запуска и форматировать вывод статистики я опущу, так как это не интересно. А теперь давайте проверим, что у нас получилось. Для пробы натравим wrk на Node.js кластер
% ./wrk -c 21 -t 7 -d 30s -L http://localhost:3001/index.html
Running 30s test @ http://localhost:3001/index.html
7 threads and 21 connections
Thread Stats Avg Stdev Max +/- Stdev
Latency 1.09ms 6.55ms 152.07ms 99.63%
Req/Sec 5.20k 3.08k 14.33k 58.75%
Latency Distribution
50% 490.00us
75% 0.89ms
90% 1.83ms
99% 5.04ms
1031636 requests in 30.00s, 153.48MB read
Requests/sec: 34388.25
Transfer/sec: 5.12MB
и тоже самое на go с GOMAXPROCS=1
% ./go-meter -t 7 -c 21 -d 30s -u http://localhost:3001/index.html
Running test threads: 7, connections: 21 in 30s GET http://localhost:3001/index.html
Stats: Min Avg Max
Latency 0 0 83ms
843183 requests in 30s, net: in 103MB, out 62MB
HTTP Codes:
200 100.00%
Latency:
0 99.99%
10ms - 80ms 0.01%
Requests: 28106.10/sec
Net In: 27MBit/sec
Net Out: 17MBit/sec
Transfer: 5.5MB/sec
Получаем 28106 против 34388 запросов в секунду — это примерно на 20% меньше, по сравнению с чистым Cи + event loop + nio. Довольно неплохо, при изменении GOMAXPROCS разницы практически нет, так как большую часть процессорного времени отбирает Node.js.
Минусы:
— потеря 20% производительности, можно попробовать упростить Request/Response, может дать немного производительности
— еще нет поддержи HTTPS
— еще нельзя указать пользовательские HTTP заголовки и timeout
Все исходники тут — Github
Как пользоваться
% go get github.com/a696385/go-meter
% $GOPATH/bin/go-meter -h
Спасибо за внимание!