«Дружим» redis с nginx

    Не секрет, что для защиты от HTTP-DDoS зачастую используют связку nginx в качестве фронтенда и некий другой web-сервер в качестве бакенда. При этом ввиду большой нагрузки возникает проблема хранения логов для дальнейшего их анализа. Можно хранить в текстовом файле, но, естественно, анализировать/ротировать его весьма неудобно. Можно гнать данные напрямую в, например, mysql через пайп, но выигрывая в удобстве анализа мы проигрываем в производительности, особенно это заметно при фрагментации. Золотой серединой, пожалуй, будет no-sql решение.
    Для себя я выбрал redis.

    Для хранения логов напишем небольшой модуль для nginx. Изначально модуль был реализован c использованием официального Си-шного клиента hiredis, но случайно наткнулся на аналогичный модуль от Валерия Холодкова, который уже обладал большим функционалом, и взаимодействие реализовано без использования сторонних клиентов и решил позаимствовать его идеи (на тестах модули показали практически равные значения производительности).
    В модуле от Валерия исправлена работа EVALSHA, устранены небольшие баги и добавлена поддержка SETEX.
    ngx_http_redislog_module.c
    /*
     * Copyright (C)    2012 Valery Kholodkov
     *                  2014 Alexander V Makkoveev
     */
    
    #include <ngx_config.h>
    #include <ngx_core.h>
    #include <ngx_http.h>
    #include <nginx.h>
    
    #define NGX_SOCKETLOG_FACILITY_LOCAL7      23
    #define NGX_SOCKETLOG_SEVERITY_INFO        6
    
    #define NGX_REDIS_APPEND                   "*3" CRLF "$6" CRLF "APPEND" CRLF
    #define NGX_REDIS_AUTH                     "*2" CRLF "$4" CRLF "AUTH" CRLF
    
    //#define NGX_DEF_FORMAT    "combined"
    #define NGX_DEF_FORMAT      "main"
    #define IF_DEBUG            0  
    #define IF_DEBUG_2          0
    //*****************************************************************************
    
    typedef struct ngx_http_log_op_s  ngx_http_log_op_t;
    
    typedef u_char *(*ngx_http_log_op_run_pt) (ngx_http_request_t *r, u_char *buf,
        ngx_http_log_op_t *op);
    
    typedef size_t (*ngx_http_log_op_getlen_pt) (ngx_http_request_t *r,
        uintptr_t data);
    
    struct ngx_redislog_peer;
    
    typedef void (*ngx_redislog_send_handler_pt)(struct ngx_redislog_peer*);
    
    struct ngx_http_log_op_s {
        size_t                      len;
        ngx_http_log_op_getlen_pt   getlen;
        ngx_http_log_op_run_pt      run;
        uintptr_t                   data;
    };
    
    typedef struct {
        ngx_str_t                   name;
    #if defined nginx_version && nginx_version >= 7018
        ngx_array_t                *flushes;
    #endif
        ngx_array_t                *ops;        /* array of ngx_http_log_op_t */
    } ngx_http_log_fmt_t;
    
    typedef struct {
        ngx_array_t                 formats;    /* array of ngx_http_log_fmt_t */
        ngx_uint_t                  combined_used; /* unsigned  combined_used:1 */
    } ngx_http_log_main_conf_t;
    
    typedef struct {
        ngx_str_t                           name;
        struct sockaddr                     *sockaddr;
        socklen_t                           socklen;
        ngx_msec_t                          write_timeout;
        ngx_msec_t                          read_timeout;
        ngx_msec_t                          connect_timeout;
        ngx_msec_t                          reconnect_timeout;
        ngx_msec_t                          flush_timeout;
        ngx_msec_t                          ping_timeout;
    
        ngx_bufs_t                          bufs;
        size_t                              recv_buf_size;
    
        ngx_str_t                           password;
    
        unsigned                            authenticate:1;
    } ngx_redislog_peer_conf_t;
    
    typedef struct {
        ngx_array_t                         *peers;
    } ngx_redislog_conf_t;
    
    typedef struct ngx_redislog_peer {
        ngx_redislog_peer_conf_t           *conf;
        ngx_peer_connection_t               conn;
        ngx_event_t                         reconnect_timer;
        ngx_event_t                         flush_timer;
        ngx_event_t                         ping_timer;
        ngx_log_t                           *log;
        ngx_pool_t                          *pool;
    
        ngx_chain_t                         *busy;
        ngx_chain_t                         *free;
    
        ngx_buf_t                           *recv_buf;
    
        ngx_uint_t                          discarded;
        ngx_uint_t                          reconnect_timeout;
    
        ngx_uint_t                          num_queued;
        ngx_uint_t                          state;
        u_char                              *password_pos;
    
        ngx_redislog_send_handler_pt        send_handler;
    
        unsigned                            connecting:1;
        unsigned                            authenticated:1;
        unsigned                            flush_timer_set:1;
    } ngx_redislog_peer_t;
    
    typedef struct {
        ngx_str_t                           peer_name;
        ngx_uint_t                          peer_idx;
        ngx_http_log_fmt_t                  *format;
        ngx_http_complex_value_t            *key;
        ngx_str_t                           command;
        ngx_str_t                           arg1;
    //***
        //ngx_str_t                           arg_num;
    //***
        ngx_http_complex_value_t            *_if;
        ngx_http_complex_value_t            *ifnot;
    
        unsigned                            has_arg1;
    } ngx_http_redislog_t;
    
    typedef struct {
        ngx_array_t                  *logs;       /* array of ngx_http_redislog_t */
        unsigned                     off;
    } ngx_http_redislog_conf_t;
    
    static ngx_array_t ngx_redislog_peers;
    
    static void ngx_redislog_reconnect_peer(ngx_redislog_peer_t *p);
    static void ngx_http_redislog_append(ngx_redislog_peer_t *p, u_char *buf, size_t len);
    static void ngx_http_redislog_send(ngx_redislog_peer_t *p);
    static void ngx_redislog_flush_handler(ngx_event_t*);
    static u_char *ngx_redislog_size(u_char*, u_char*, size_t);
    static size_t ngx_redislog_size_len(size_t);
    
    static ngx_int_t ngx_redislog_process_buf(ngx_redislog_peer_t*, ngx_buf_t*);
    
    static void ngx_redislog_read_handler(ngx_event_t *rev);
    static void ngx_redislog_idle_read_handler(ngx_event_t *rev);
    
    static char *ngx_http_redislog_set_log(ngx_conf_t *cf, ngx_command_t *cmd, void *conf);
    static char *ngx_http_redislog_command(ngx_conf_t *cf, ngx_command_t *cmd, void *conf);
    
    static void *ngx_http_redislog_create_loc_conf(ngx_conf_t *cf);
    static char *ngx_http_redislog_merge_loc_conf(ngx_conf_t *cf, void *parent,
        void *child);
    
    static ngx_int_t ngx_http_redislog_yyyy_variable(ngx_http_request_t *r,
        ngx_http_variable_value_t *v, uintptr_t data);
    static ngx_int_t ngx_http_redislog_yyyymm_variable(ngx_http_request_t *r,
        ngx_http_variable_value_t *v, uintptr_t data);
    static ngx_int_t ngx_http_redislog_yyyymmdd_variable(ngx_http_request_t *r,
        ngx_http_variable_value_t *v, uintptr_t data);
    static ngx_int_t ngx_http_redislog_yyyymmddhh_variable(ngx_http_request_t *r,
        ngx_http_variable_value_t *v, uintptr_t data);
    
    static void *ngx_redislog_create_conf(ngx_cycle_t *cycle);
    static ngx_int_t ngx_http_redislog_add_variables(ngx_conf_t *cf);
    static ngx_int_t ngx_http_redislog_init(ngx_conf_t *cf);
    static ngx_int_t ngx_redislog_init_process(ngx_cycle_t *cycle);
    
    static ngx_command_t ngx_http_redislog_commands[] = {
    
        { ngx_string("access_redislog"),
          NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_HTTP_SIF_CONF|NGX_HTTP_LIF_CONF
                            |NGX_HTTP_LMT_CONF|NGX_CONF_TAKE1234,
          ngx_http_redislog_set_log,
          NGX_HTTP_LOC_CONF_OFFSET,
          0,
          NULL },
    
          ngx_null_command
    };
    
    static ngx_http_module_t  ngx_http_redislog_module_ctx = {
        ngx_http_redislog_add_variables,      /* preconfiguration */
        ngx_http_redislog_init,               /* postconfiguration */
    
        NULL,                                  /* create main configuration */
        NULL,                                  /* init main configuration */
    
        NULL,                                  /* create server configuration */
        NULL,                                  /* merge server configuration */
    
        ngx_http_redislog_create_loc_conf,    /* create location configration */
        ngx_http_redislog_merge_loc_conf      /* merge location configration */
    };
    
    extern ngx_module_t ngx_http_log_module;
    
    ngx_module_t  ngx_http_redislog_module = {
        NGX_MODULE_V1,
        &ngx_http_redislog_module_ctx,        /* module context */
        ngx_http_redislog_commands,           /* module directives */
        NGX_HTTP_MODULE,                       /* module type */
        NULL,                                  /* init master */
        NULL,                                  /* init module */
        NULL,                                  /* init process */
        NULL,                                  /* init thread */
        NULL,                                  /* exit thread */
        NULL,                                  /* exit process */
        NULL,                                  /* exit master */
        NGX_MODULE_V1_PADDING
    };
    
    static ngx_command_t  ngx_redislog_commands[] = {
    
        { ngx_string("redislog"),
          NGX_MAIN_CONF|NGX_CONF_TAKE23,
          ngx_http_redislog_command,
          0,
          0,
          NULL },
    
          ngx_null_command
    };
    
    static ngx_core_module_t  ngx_redislog_module_ctx = {
        ngx_string("redislog"),
        ngx_redislog_create_conf,
        NULL
    };
    
    ngx_module_t  ngx_core_redislog_module = {
        NGX_MODULE_V1,
        &ngx_redislog_module_ctx,             /* module context */
        ngx_redislog_commands,                /* module directives */
        NGX_CORE_MODULE,                       /* module type */
        NULL,                                  /* init master */
        NULL,                                  /* init module */
        ngx_redislog_init_process,            /* init process */
        NULL,                                  /* init thread */
        NULL,                                  /* exit thread */
        NULL,                                  /* exit process */
        NULL,                                  /* exit master */
        NGX_MODULE_V1_PADDING
    };
    
    static ngx_http_variable_t ngx_http_redislog_variables[] = {
    
        { ngx_string("redislog_yyyy"), NULL, ngx_http_redislog_yyyy_variable, 0,
          0, 0 },
    
        { ngx_string("redislog_yyyymm"), NULL, ngx_http_redislog_yyyymm_variable, 0,
          0, 0 },
    
        { ngx_string("redislog_yyyymmdd"), NULL, ngx_http_redislog_yyyymmdd_variable, 0,
          0, 0 },
    
        { ngx_string("redislog_yyyymmddhh"), NULL, ngx_http_redislog_yyyymmddhh_variable, 0,
          0, 0 },
    
        { ngx_null_string, NULL, NULL, 0, 0, 0 }
    };
    //-----------------------------------------------------------------------------
    ngx_int_t
    ngx_http_redislog_handler(ngx_http_request_t *r)
    {
        
        u_char                    *line, *p;
        size_t                    len, command_size_len, arg1_size_len, record_len, key_size_len, record_size_len;
        ngx_uint_t                i, l;
        ngx_str_t                 key, _if, ifnot;
        ngx_http_redislog_t      *log;
        ngx_http_log_op_t         *op;
        ngx_http_redislog_conf_t *slcf;
        time_t                    time;
        ngx_tm_t                  tm;
        ngx_redislog_peer_t      **peer;
    
        ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                       "http redislog handler");
    
        slcf = ngx_http_get_module_loc_conf(r, ngx_http_redislog_module);
    
        ngx_log_debug3(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
               "redislog conf=%p, off=%ud, logs=%p", slcf, slcf->off, slcf->logs);
    
        if(slcf->off || slcf->logs == NULL) {
            return NGX_OK;
        }
    
        time = ngx_time();
        ngx_gmtime(time, &tm);
    
        log = slcf->logs->elts;
    
        for (l = 0; l < slcf->logs->nelts; l++) {
    #if defined nginx_version && nginx_version >= 7018
            ngx_http_script_flush_no_cacheable_variables(r, log[l].format->flushes);
    #endif
    
            len = 0;
            op = log[l].format->ops->elts;
            for (i = 0; i < log[l].format->ops->nelts; i++) {
                if (op[i].len == 0) {
                    len += op[i].getlen(r, op[i].data);
    
                } else {
                    len += op[i].len;
                }
            }
    
            if(log[l].ifnot != NULL) {
                if(ngx_http_complex_value(r, log[l].ifnot, &ifnot) != NGX_OK) {
                    return NGX_ERROR;
                }
    
                if(ifnot.len && (ifnot.len != 1 || ifnot.data[0] != '0')) {
                    continue;
                }
            }
    
            if(log[l]._if != NULL) {
                if(ngx_http_complex_value(r, log[l]._if, &_if) != NGX_OK) {
                    return NGX_ERROR;
                }
    
                if(!_if.len || (_if.len == 1 && _if.data[0] == '0')) {
                    continue;
                }
            }
    
            if(ngx_http_complex_value(r, log[l].key, &key) != NGX_OK) {
                return NGX_ERROR;
            }
            
            command_size_len = ngx_redislog_size_len(log[l].command.len);
            key_size_len = ngx_redislog_size_len(key.len);
    
            if(log[l].arg1.len) {
                arg1_size_len = ngx_redislog_size_len(log[l].arg1.len);
            }
            else {
                arg1_size_len = 0;
            }
    
            len += 2 + sizeof(CRLF) - 1 + 1 + command_size_len + 1 + sizeof(CRLF) - 1
                + command_size_len + sizeof(CRLF) - 1 + log[l].command.len + sizeof(CRLF) - 1
                + key_size_len + sizeof(CRLF) - 1 + key.len + sizeof(CRLF) - 1
                + 1 + NGX_OFF_T_LEN + sizeof(CRLF) - 1 + sizeof(CRLF) - 1;
    
            if(ngx_strncmp(log[l].command.data, "APPEND", 6) == 0) {
                len++;
            }
    
            if(log[l].has_arg1) {
                len += arg1_size_len + sizeof(CRLF) - 1 + log[l].arg1.len + sizeof(CRLF) - 1;
            }
    
    #if defined nginx_version && nginx_version >= 7003
            line = ngx_pnalloc(r->pool, len);
    #else
            line = ngx_palloc(r->pool, len);
    #endif
            if (line == NULL) {
                return NGX_ERROR;
            }
    
            p = line;
    
            for(i = 0; i < log[l].format->ops->nelts; i++) {
                p = op[i].run(r, p, &op[i]);
            }
    
            if(ngx_strncmp(log[l].command.data, "APPEND", 6) == 0) {
                *p++ = LF;
            }
    
            record_len = p - line;
    
            record_size_len = ngx_redislog_size_len(record_len);
    
            p = line;
    
            /*
             * Redis append to time series command
                *3
                $6
                APPEND
                $nnn
                key
                $nnn
                log record
            */
            *p++ = '*';
    
            //*p++ = log[l].has_arg1 ? '4' : '3';
            /* SETEX */
            if(ngx_strncmp(log[l].command.data, "SETEX", 5) == 0)
            *p++ = '4';
            else
            *p++ = log[l].has_arg1 ? '5' : '3';
    
            p = ngx_copy(p, CRLF, sizeof(CRLF) - 1);
    
            *p++ = '$';
            //*p++ = '0';
    
            p = ngx_redislog_size(p, p + command_size_len, log[l].command.len);
    
            p = ngx_copy(p, CRLF, sizeof(CRLF) - 1);
    
            p = ngx_copy(p, log[l].command.data, log[l].command.len);
    
            p = ngx_copy(p, CRLF, sizeof(CRLF) - 1);
    
            if(log[l].has_arg1 && ngx_strncmp(log[l].command.data, "EVALSHA", 7) == 0) {
                *p++ = '$';
                p = ngx_redislog_size(p, p + arg1_size_len, log[l].arg1.len);
                p = ngx_copy(p, CRLF, sizeof(CRLF) - 1);
                p = ngx_copy(p, log[l].arg1.data, log[l].arg1.len);
                p = ngx_copy(p, CRLF, sizeof(CRLF) - 1);
    
                *p++ = '$';
                p = ngx_redislog_size(p, p + 1, 1);
                p = ngx_copy(p, CRLF, sizeof(CRLF) - 1);
                p = ngx_copy(p, "1", 1);
                p = ngx_copy(p, CRLF, sizeof(CRLF) - 1);
            }
    /*
    *p++ = '$';
    *p++ = '1';
    *p++ = LF;
    *p++ = '0';
    *p++ = LF;
    */
    
            *p++ = '$';
    
            p = ngx_redislog_size(p, p + key_size_len, key.len);
    
            p = ngx_copy(p, CRLF, sizeof(CRLF) - 1);
    
            p = ngx_copy(p, key.data, key.len);
    
            p = ngx_copy(p, CRLF, sizeof(CRLF) - 1);
    
                    if(log[l].has_arg1 && ngx_strncmp(log[l].command.data, "SETEX", 5) == 0) {
                *p++ = '$';
                p = ngx_redislog_size(p, p + arg1_size_len, log[l].arg1.len);
                p = ngx_copy(p, CRLF, sizeof(CRLF) - 1);
                p = ngx_copy(p, log[l].arg1.data, log[l].arg1.len);
                p = ngx_copy(p, CRLF, sizeof(CRLF) - 1);
    
            }
    
            *p++ = '$';
    
            p = ngx_redislog_size(p, p + record_size_len, record_len);
    
            p = ngx_copy(p, CRLF, sizeof(CRLF) - 1);
    
            for(i = 0; i < log[l].format->ops->nelts; i++) {
                p = op[i].run(r, p, &op[i]);
            }
    
            if(ngx_strncmp(log[l].command.data, "APPEND", 6) == 0) {
                *p++ = LF;
            }
    
            p = ngx_copy(p, CRLF, sizeof(CRLF) - 1);
    
            peer = ngx_redislog_peers.elts;
    
            peer += log[l].peer_idx;
    
            ngx_http_redislog_append(*peer, line, p - line);
            //ngx_http_redislog_append(*peer, line, p - line + 5);
        }
    
        return NGX_OK;
    }
    
    static u_char *ngx_redislog_size(u_char *p, u_char *q, size_t sz)
    {
        u_char *end = q;
    
        while(p != q) {
            *--q = (sz % 10 + '0');
            sz /= 10;
        }
    
        return end;
    }
    
    static size_t ngx_redislog_size_len(size_t sz)
    {
        size_t len = 0;
        
        while(sz != 0) {
            sz /= 10;
            len++;
        }
    
        return len;
    }
    
    static u_char*
    ngx_redislog_buf_append(ngx_buf_t *buf, u_char *p, size_t *len)
    {
        size_t remaining = buf->end - buf->last;
    
        if(remaining > *len) {
            remaining = *len;
        }
    
        buf->last = ngx_copy(buf->last, p, remaining);
        *len -= remaining;
    
        return p + remaining;
    }
    
    static void
    ngx_http_redislog_append(ngx_redislog_peer_t *peer, u_char *buf, size_t len)
    {
        u_char *p;
        ngx_chain_t *last, *q;
        size_t remaining;
        ngx_uint_t num_busy = 0;
    
        /*
         * Find last busy buffer
         */
        last = peer->busy;
        
        while(last != NULL && last->next != NULL) {
            last = last->next;
        }
    
        /*
         * See if message fits into remaining space
         */
        remaining = (last != NULL ? last->buf->end - last->buf->last : 0);
    
        q = peer->free;
    
        while(remaining <= len && q != NULL) {
            remaining += (q->buf->end - q->buf->last);
            q = q->next;
        }
    
        /*
         * No memory for this message, discard it
         */
        if(remaining < len) {
            peer->discarded++;
            return;
        }
    
        /*
         * Append message to the buffers
         */
        if(last != NULL) {
            p = ngx_redislog_buf_append(last->buf, buf, &len);
        }
        else {
            p = buf;
        }
    
        while(peer->free != NULL && len != 0) {
            q = peer->free;
    
            p = ngx_redislog_buf_append(q->buf, p, &len);
    
            peer->free = peer->free->next;
    
            q->next = NULL;
    
            if(last == NULL) {
                peer->busy = q;
            }
            else {
                last->next = q;
            }
    
            last = q;
        }
    
        peer->num_queued++;
    
        q = peer->busy;
    
        while(q != NULL) {
            num_busy++;
            q = q->next;
        }
    
        if(!peer->flush_timer_set) {
            peer->flush_timer.handler = ngx_redislog_flush_handler;
            peer->flush_timer.data = peer;
            peer->flush_timer.log = peer->conn.log;
    
            ngx_add_timer(&peer->flush_timer, peer->conf->flush_timeout);
    
            peer->flush_timer_set = 1;
        }
    
        if(num_busy >= 2) {
            ngx_log_debug1(NGX_LOG_DEBUG_HTTP, peer->conn.connection->log, 0,
                           "redislog num queued is now %ud, set read handler", peer->num_queued);
    
            peer->conn.connection->read->handler = ngx_redislog_read_handler;
            /*
             * Send it
             */
            ngx_http_redislog_send(peer);
        }
    }
    
    static void
    ngx_http_redislog_send(ngx_redislog_peer_t *p)
    {
        ngx_chain_t                         *written;
        ngx_connection_t                    *c;
        ngx_chain_t                         *dummy = NULL;
    
        c = p->conn.connection;
    
        if(c == NULL || c->fd == -1) {
            return;
        }
    
        if(!c->write->ready) {
            return;
        }
    
        if(p->flush_timer_set) {
            ngx_del_timer(&p->flush_timer);
            p->flush_timer_set = 0;
        }
    
        ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0,
                       "redislog send handler");
    
        if(p->busy != NULL) {
            written = c->send_chain(c, p->busy, 0);
    
            if(written == NGX_CHAIN_ERROR) {
                ngx_log_error(NGX_LOG_ERR, c->log, 0,
                              "redislog write error");
                ngx_close_connection(c);
                ngx_redislog_reconnect_peer(p);
                return;
            }
    
            ngx_chain_update_chains(p->pool, &p->free, &p->busy, &dummy, 0);
    
            if(written != NULL) {
    
                if(!c->write->ready && !c->write->timer_set) {
                    ngx_add_timer(c->write, p->conf->write_timeout);
                }
    
                if(ngx_handle_write_event(c->write, 0) != NGX_OK) {
                    ngx_close_connection(c);
                    ngx_redislog_reconnect_peer(p);
                }
    
                return;
            }
        }
    }
    
    static void ngx_redislog_auth_send(ngx_redislog_peer_t *peer)
    {
        ngx_connection_t                    *c;
        ngx_str_t                           *password;
        ssize_t                             n;
    
        c = peer->conn.connection;
    
        if(c == NULL || c->fd == -1) {
            return;
        }
    
        password = &peer->conf->password;
    
        ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0,
                       "redislog auth send handler");
    
        n = c->send(c, peer->password_pos, password->len - (peer->password_pos - password->data));
    
        if(n > 0) {
            peer->password_pos += n;
    
            if(peer->password_pos >= (password->data + password->len)) {
                peer->send_handler = ngx_http_redislog_send;
                ngx_http_redislog_send(peer);
            }
    
            return;
        }
    
        if(n == NGX_ERROR) {
            ngx_close_connection(c);
            ngx_redislog_reconnect_peer(peer);
            return;
        }
    
        if(!c->write->timer_set) {
            ngx_add_timer(c->write, peer->conf->write_timeout);
        }
    
        if(ngx_handle_write_event(c->write, 0) != NGX_OK) {
            ngx_close_connection(c);
            ngx_redislog_reconnect_peer(peer);
            return;
        }
    }
    
    static void ngx_redislog_flush_handler(ngx_event_t *ev)
    {
        ngx_redislog_peer_t    *peer = ev->data;
    
        ngx_log_debug0(NGX_LOG_DEBUG_HTTP, peer->log, 0,
                       "redislog flush handler, set read handler");
    
        peer->flush_timer_set = 0;
        peer->conn.connection->read->handler = ngx_redislog_read_handler;
    
        ngx_http_redislog_send(peer);
    }
    
    static void ngx_redislog_connected_handler(ngx_redislog_peer_t *peer)
    {
        ngx_connection_t                    *c;
    
        c = peer->conn.connection;
    
        ngx_del_timer(c->read);
    
        /*
         * Once the connection has been established, we need to
         * reset the reconnect timeout to it's initial value
         */
        peer->reconnect_timeout = peer->conf->reconnect_timeout;
    
        if(peer->discarded != 0) {
            ngx_log_error(NGX_LOG_ERR, peer->log, 0,
                "redislog peer \"%V\" discarded %ui messages",
                &peer->conf->name, peer->discarded);
    
            peer->discarded = 0;
        }
    }
    
    static ngx_int_t ngx_redislog_process_buf(ngx_redislog_peer_t *peer, ngx_buf_t *buf)
    {
        u_char                             *p, *q;
    
        p = buf->pos;
        q = buf->last;
    
        while(p != q) {
            if(!peer->state) {
                if(*p == '+' || *p == '-' || *p == ':') {
                    if(peer->conf->authenticate && !peer->authenticated) {
                        if(*p == '-') {
                            ngx_log_error(NGX_LOG_ERR, peer->log, 0,
                                "redis authentication failure");
                            return NGX_ERROR;
                        }
                        peer->authenticated = 1;
                    }
                    else {
                        if(peer->num_queued) {
                            peer->num_queued--;
                        }
                        else {
                            ngx_log_error(NGX_LOG_ERR, peer->log, 0,
                                "too many responses from redis");
                            return NGX_ERROR;
                        }
                    }
                }
                if(*p == '-') {
                    ngx_log_error(NGX_LOG_ERR, peer->log, 0,
                        "redislog error");
                }
                peer->state++;
            }
            else {
                if(*p == LF) {
                    peer->state = 0;
                }
                else if(peer->state == 1) {
                    if(*p == CR) {
                        peer->state++;
                    }
                }
            }
    
            p++;
        }
    
        buf->pos = p;
    
        return NGX_OK;
    }
    
    static void ngx_redislog_read_handler(ngx_event_t *rev)
    {
        ngx_connection_t                   *c;
        ngx_redislog_peer_t                *peer;
        ngx_buf_t                          *buf;
        ssize_t                            n, size;
        ngx_int_t                          rc;
    
        c = rev->data;
        peer = c->data;
    
        ngx_log_debug0(NGX_LOG_DEBUG_HTTP, rev->log, 0,
                       "redislog read handler");
    
        if(c->read->timer_set) {
            ngx_del_timer(c->read);
        }
    
        if(rev->timedout || c->error || c->close) {
            if(rev->timedout) {
                ngx_log_error(NGX_LOG_ERR, rev->log, NGX_ETIMEDOUT,
                              "redislog peer timed out");
            }
    
            if(rev->error) {
                ngx_log_error(NGX_LOG_ERR, rev->log, 0,
                              "redislog peer connection error");
            }
    
            ngx_close_connection(c);
    
            if(!c->close) {
                ngx_redislog_reconnect_peer(peer);
            }
            return;
        }
    
        buf = peer->recv_buf;
    
        for( ;; ) {
            for( ;; ) {
                if(buf->last == buf->end) {
                    break;
                }
    
                size = buf->end - buf->last;
    
                n = c->recv(c, buf->last, size);
    
                ngx_log_debug1(NGX_LOG_DEBUG_HTTP, c->log, 0,
                               "redislog peer recv %z", n);
    
                if(n == NGX_AGAIN) {
                    break;
                }
    
                if(n == 0) {
                    if(peer->num_queued != 0) {
                        ngx_log_error(NGX_LOG_INFO, c->log, 0,
                                      "redis closed the connection prematurely");
                    }
                }
    
                if(n == 0 || n == NGX_ERROR) {
                    c->error = 1;
                    goto reconnect;
                }
    
                buf->last += n;
            }
    
            rc = ngx_redislog_process_buf(peer, buf);
    
            if(rc != NGX_OK) {
                goto reconnect;
            }
    
            buf->pos = buf->last = buf->start;
    
            ngx_log_debug1(NGX_LOG_DEBUG_HTTP, c->log, 0,
                           "redislog num queued is now %ud", peer->num_queued);
    
            if(peer->num_queued == 0) {
                break;
            }
    
            if (!c->read->ready) {
                if(ngx_handle_read_event(c->read, 0) != NGX_OK) {
                    goto reconnect;
                }
    
                if(!c->read->timer_set) {
                    ngx_add_timer(c->read, peer->conf->read_timeout);
                }
    
                return;
            }
        }
    
        ngx_log_debug0(NGX_LOG_DEBUG_HTTP, rev->log, 0,
                       "redislog set idle read handler");
    
        c->read->handler = ngx_redislog_idle_read_handler;
    
        return;
    
    reconnect:
        ngx_close_connection(c);
        ngx_redislog_reconnect_peer(peer);
    }
    
    static void ngx_redislog_idle_read_handler(ngx_event_t *rev)
    {
        ngx_connection_t                    *c;
        ngx_redislog_peer_t                *peer;
    
        int                                 n;
        char                                buf[1];
        ngx_err_t                           err;
    
        c = rev->data;
        peer = c->data;
    
        ngx_log_debug0(NGX_LOG_DEBUG_HTTP, rev->log, 0,
                       "redislog idle read handler");
    
        if(rev->timedout || c->error || c->close) {
            if(rev->timedout) {
                ngx_log_error(NGX_LOG_ERR, rev->log, NGX_ETIMEDOUT,
                              "redislog peer timed out");
            }
    
            if(rev->error) {
                ngx_log_error(NGX_LOG_ERR, rev->log, 0,
                              "redislog peer connection error");
            }
    
            ngx_close_connection(c);
    
            if(!c->close) {
                ngx_redislog_reconnect_peer(peer);
            }
            return;
        }
    
    #if (NGX_HAVE_KQUEUE)
        if (ngx_event_flags & NGX_USE_KQUEUE_EVENT) {
    
            if(!rev->pending_eof) {
                goto no_error;
            }
    
            rev->eof = 1;
            c->error = 1;
    
            if(rev->kq_errno) {
                rev->error = 1;
            }
    
            goto reconnect;
        }
    #endif
    
        n = recv(c->fd, buf, 1, MSG_PEEK);
    
        err = ngx_socket_errno;
    
        ngx_log_debug1(NGX_LOG_DEBUG_HTTP, rev->log, err,
                       "redislog recv(): %d", n);
    
        if(n > 0) {
            goto no_error;
        }
    
        if(n == -1) {
            if(err == NGX_EAGAIN) {
                goto no_error;
            }
    
            rev->error = 1;
    
        }
        else {
            err = 0;
        }
    
        rev->eof = 1;
        c->error = 1;
    
        ngx_log_error(NGX_LOG_ERR, rev->log, err,
                      "redislog connection error");
    
    #if (NGX_HAVE_KQUEUE)
    reconnect:
    #endif
        ngx_close_connection(c);
        ngx_redislog_reconnect_peer(peer);
        return;
    
    no_error:
        if(peer->connecting) {
            ngx_redislog_connected_handler(peer);
            peer->connecting = 0;
        }
    }
    
    static void ngx_redislog_write_handler(ngx_event_t *wev)
    {
        ngx_connection_t                    *c;
        ngx_redislog_peer_t                *peer;
    
        c = wev->data;
        peer = c->data;
    
        ngx_log_debug0(NGX_LOG_DEBUG_HTTP, wev->log, 0,
                       "redislog write handler");
    
        if(wev->timedout || c->error || c->close) {
            if(wev->timedout) {
                ngx_log_error(NGX_LOG_ERR, wev->log, NGX_ETIMEDOUT,
                              "redislog peer timed out");
            }
    
            if(wev->error) {
                ngx_log_error(NGX_LOG_ERR, wev->log, 0,
                              "redislog peer connection error");
            }
    
            ngx_close_connection(c);
    
            if(!c->close) {
                ngx_redislog_reconnect_peer(peer);
            }
            return;
        }
    
        if(peer->connecting) {
            ngx_redislog_connected_handler(peer);
            peer->connecting = 0;
        }
    
        if(c->write->timer_set) {
            ngx_del_timer(c->write);
        }
    
        peer->send_handler(peer);
    }
    
    static ngx_int_t ngx_redislog_connect_peer(ngx_redislog_peer_t *peer)
    {
        ngx_int_t               rc;
    
        ngx_log_error(NGX_LOG_INFO, peer->log, 0,
                      "redislog connect peer \"%V\"", &peer->conf->name);
    
        peer->conn.sockaddr = peer->conf->sockaddr;
        peer->conn.socklen = peer->conf->socklen;
        peer->conn.name = &peer->conf->name;
        peer->conn.get = ngx_event_get_peer;
        peer->conn.log = peer->log;
        peer->conn.log_error = NGX_ERROR_ERR;
    
        rc = ngx_event_connect_peer(&peer->conn);
    
        if (rc == NGX_ERROR || rc == NGX_BUSY || rc == NGX_DECLINED) {
            if(peer->conn.connection) {
                ngx_close_connection(peer->conn.connection);
            }
    
            return NGX_ERROR;
        }
    
        peer->conn.connection->data = peer;
        peer->conn.connection->pool = peer->pool;
        
        peer->password_pos = peer->conf->password.data;
        peer->authenticated = 0;
    
        peer->conn.connection->read->handler = ngx_redislog_read_handler;
        peer->conn.connection->write->handler = ngx_redislog_write_handler;
    
        peer->send_handler = peer->conf->authenticate ? ngx_redislog_auth_send
            : ngx_http_redislog_send;
    
        ngx_add_timer(peer->conn.connection->read, peer->conf->connect_timeout);
    
        peer->connecting = 1;
    
        return NGX_OK;
    }
    
    static void ngx_redislog_connect_handler(ngx_event_t *ev)
    {
        ngx_int_t               rc;
        ngx_redislog_peer_t    *peer = ev->data;
    
        rc = ngx_redislog_connect_peer(peer);
    
        if(rc != NGX_OK) {
            ngx_redislog_reconnect_peer(peer);
        }
    }
    
    static void ngx_redislog_reconnect_peer(ngx_redislog_peer_t *p)
    {
        p->conn.connection = NULL;
    
        p->reconnect_timer.handler = ngx_redislog_connect_handler;
        p->reconnect_timer.data = p;
        p->reconnect_timer.log = p->conn.log;
    
        ngx_add_timer(&p->reconnect_timer, p->reconnect_timeout);
    
        p->reconnect_timeout *= 2;
    
        if(p->discarded != 0) {
            ngx_log_error(NGX_LOG_ERR, p->log, 0,
                "redislog peer \"%V\" discarded %ui messages",
                &p->conf->name, p->discarded);
    
            p->discarded = 0;
        }
    }
    
    static ngx_int_t
    ngx_http_redislog_yyyy_variable(ngx_http_request_t *r,
        ngx_http_variable_value_t *v, uintptr_t data)
    {
        u_char *line;
    
        line = ngx_palloc(r->pool, sizeof("yyyy")-1);
    
        if(line == NULL) {
            return NGX_ERROR;    
        }
    
        (void) ngx_copy(line, ngx_cached_http_log_iso8601.data, sizeof("yyyy")-1);
    
        v->valid = 1;
        v->no_cacheable = 1;
        v->not_found = 0;
    
        v->data = line;
        v->len = sizeof("yyyy")-1;
    
        return NGX_OK;
    }
    
    static ngx_int_t
    ngx_http_redislog_yyyymm_variable(ngx_http_request_t *r,
        ngx_http_variable_value_t *v, uintptr_t data)
    {
        u_char *line, *p;
    
        line = ngx_palloc(r->pool, sizeof("yyyymm")-1);
    
        if(line == NULL) {
            return NGX_ERROR;    
        }
    
        p = ngx_copy(line, ngx_cached_http_log_iso8601.data, sizeof("yyyy")-1);
        (void) ngx_copy(p, ngx_cached_http_log_iso8601.data + 5, sizeof("mm")-1);
    
        v->valid = 1;
        v->no_cacheable = 1;
        v->not_found = 0;
    
        v->data = line;
        v->len = sizeof("yyyymm")-1;
    
        return NGX_OK;
    }
    
    static ngx_int_t
    ngx_http_redislog_yyyymmdd_variable(ngx_http_request_t *r,
        ngx_http_variable_value_t *v, uintptr_t data)
    {
        u_char *line, *p;
    
        line = ngx_palloc(r->pool, sizeof("yyyymmdd")-1);
    
        if(line == NULL) {
            return NGX_ERROR;    
        }
    
        p = ngx_copy(line, ngx_cached_http_log_iso8601.data, sizeof("yyyy")-1);
        p = ngx_copy(p, ngx_cached_http_log_iso8601.data + 5, sizeof("mm")-1);
        (void) ngx_copy(p, ngx_cached_http_log_iso8601.data + 8, sizeof("dd")-1);
    
        v->valid = 1;
        v->no_cacheable = 1;
        v->not_found = 0;
    
        v->data = line;
        v->len = sizeof("yyyymmdd")-1;
    
        return NGX_OK;
    }
    
    static ngx_int_t
    ngx_http_redislog_yyyymmddhh_variable(ngx_http_request_t *r,
        ngx_http_variable_value_t *v, uintptr_t data)
    {
        u_char *line, *p;
    
        line = ngx_palloc(r->pool, sizeof("yyyymmddhh")-1);
    
        if(line == NULL) {
            return NGX_ERROR;    
        }
    
        p = ngx_copy(line, ngx_cached_http_log_iso8601.data, sizeof("yyyy")-1);
        p = ngx_copy(p, ngx_cached_http_log_iso8601.data + 5, sizeof("mm")-1);
        p = ngx_copy(p, ngx_cached_http_log_iso8601.data + 8, sizeof("dd")-1);
        (void) ngx_copy(p, ngx_cached_http_log_iso8601.data + 11, sizeof("hh")-1);
    
        v->valid = 1;
        v->no_cacheable = 1;
        v->not_found = 0;
    
        v->data = line;
        v->len = sizeof("yyyymmddhh")-1;
    
        return NGX_OK;
    }
    
    static void *
    ngx_http_redislog_create_loc_conf(ngx_conf_t *cf)
    {
        ngx_http_redislog_conf_t  *conf;
    
        conf = ngx_pcalloc(cf->pool, sizeof(ngx_http_redislog_conf_t));
        if (conf == NULL) {
            return NGX_CONF_ERROR;
        }
    
        return conf;
    }
    
    static char *
    ngx_http_redislog_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
    {
        ngx_http_redislog_conf_t *prev = parent;
        ngx_http_redislog_conf_t *conf = child;
    
        if(conf->logs || conf->off) {
            return NGX_CONF_OK;
        }
    
        conf->logs = prev->logs;
        conf->off = prev->off;
    
        return NGX_CONF_OK;
    }
    
    static void *
    ngx_redislog_create_conf(ngx_cycle_t *cycle)
    {
        ngx_redislog_conf_t  *slcf;
    
        slcf = ngx_pcalloc(cycle->pool, sizeof(ngx_redislog_conf_t));
        if(slcf == NULL) {
            return NULL;
        }
    
        return slcf;
    }
    
    static ngx_int_t
    ngx_http_redislog_add_variables(ngx_conf_t *cf)
    {
        ngx_http_variable_t  *var, *v;
    
        for (v = ngx_http_redislog_variables; v->name.len; v++) {
    
            var = ngx_http_add_variable(cf, &v->name, v->flags);
            if (var == NULL) {
                return NGX_ERROR;
            }
    
            var->get_handler = v->get_handler;
            var->data = v->data;
        }
    
        return NGX_OK;
    }
    
    static ngx_int_t
    ngx_http_redislog_find_peer_by_name(ngx_conf_t *cf, ngx_str_t *name)
    {
        ngx_redislog_conf_t         *slcf;
        ngx_redislog_peer_conf_t    *pc;
        ngx_uint_t                   i;
    
        slcf = (ngx_redislog_conf_t *) ngx_get_conf(cf->cycle->conf_ctx, ngx_core_redislog_module);
    
        pc = slcf->peers->elts;
    
        for(i = 0; i < slcf->peers->nelts; i++) {
            if(pc[i].name.len == name->len
                && ngx_strncmp(pc[i].name.data, name->data, name->len) == 0)
            {
                return i;
            }
        }
    
        return NGX_DECLINED;
    }
    
    static char *
    ngx_http_redislog_set_log(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
    {
        ngx_http_redislog_conf_t            *slcf = conf;
    
        ngx_uint_t                          i;
        ngx_str_t                           *value, name, command, arg1, _if;
        //ngx_str_t                           arg_num;
    
        ngx_http_redislog_t                 *log;
        ngx_http_log_fmt_t                  *fmt;
        ngx_http_log_main_conf_t            *lmcf;
        ngx_int_t                           rc;
        ngx_http_compile_complex_value_t    ccv;
        unsigned                            format_set;
    
        format_set = 0;
        value = cf->args->elts;
    
        if (ngx_strcmp(value[1].data, "off") == 0) {
            slcf->off = 1;
            return NGX_CONF_OK;
        }
        slcf->off = 0;
    
        if (slcf->logs == NULL) {
            slcf->logs = ngx_array_create(cf->pool, 2, sizeof(ngx_http_redislog_t));
            if (slcf->logs == NULL) {
                return NGX_CONF_ERROR;
            }
        }
    
        lmcf = ngx_http_conf_get_module_main_conf(cf, ngx_http_log_module);
    
        if(lmcf == NULL) {
            ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
                               "redislog module requires log module to be compiled in");
            return NGX_CONF_ERROR;
        }
    
        log = ngx_array_push(slcf->logs);
        if (log == NULL) {
            return NGX_CONF_ERROR;
        }
    
        ngx_memzero(log, sizeof(ngx_http_redislog_t));
    
        log->peer_name = value[1];
    
        rc = ngx_http_redislog_find_peer_by_name(cf, &log->peer_name);
    
        if(rc == NGX_DECLINED) {
            ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
                               "redislog peer %V is not defined", &log->peer_name);
            return NGX_CONF_ERROR;
        }
    
        log->peer_idx = rc;
    
        /*
         * Create and compile key
         */
        log->key = ngx_palloc(cf->pool, sizeof(ngx_http_complex_value_t));
        if(log->key == NULL) {
            return NGX_CONF_ERROR;
        }
    
        ngx_memzero(&ccv, sizeof(ngx_http_compile_complex_value_t));
    
        ccv.cf = cf;
        ccv.value = &value[2];
        ccv.complex_value = log->key;
    
        if(ngx_http_compile_complex_value(&ccv) != NGX_OK) {
            return NGX_CONF_ERROR;
        }
    
        ngx_str_set(&command, "APPEND");
        //ngx_str_set(&arg1, "");
        ngx_str_set(&arg1, "test1");
        //ngx_str_set(&arg_num, "0");
        ngx_str_set(&name, "main");
    
    
        if (cf->args->nelts >= 4) {
            for (i = 3; i < cf->args->nelts; i++) {
    
                if (ngx_strncmp(value[i].data, "format=", 7) == 0) {
    
                    format_set = 1;
    
                    name = value[i];
    
                    name.len -= 7;
                    name.data += 7;
    
                    if (ngx_strcmp(name.data, "combined") == 0) {
                        lmcf->combined_used = 1;
                    }
                    continue;
                }
    
                if (ngx_strncmp(value[i].data, "command=", 8) == 0) {
    
                    command = value[i];
    
                    command.len -= 8;
                    command.data += 8;
    
                    continue;
                }
    
                if (ngx_strncmp(value[i].data, "arg1=", 5) == 0) {
    
                    arg1 = value[i];
    
                    arg1.len -= 5;
                    arg1.data += 5;
    
                    log->has_arg1 = 1;
    
                    continue;
                }
    
                if (ngx_strncmp(value[i].data, "if=", 3) == 0) {
                    if(log->_if != NULL) {
                        continue;
                    }
    
                    _if = value[i];
    
                    _if.len -= 3;
                    _if.data += 3;
    
                    /*
                     * Create and compile if script
                     */
                    log->_if = ngx_palloc(cf->pool, sizeof(ngx_http_complex_value_t));
                    if(log->_if == NULL) {
                        return NGX_CONF_ERROR;
                    }
    
                    ngx_memzero(&ccv, sizeof(ngx_http_compile_complex_value_t));
    
                    ccv.cf = cf;
                    ccv.value = &_if;
                    ccv.complex_value = log->_if;
    
                    if(ngx_http_compile_complex_value(&ccv) != NGX_OK) {
                        return NGX_CONF_ERROR;
                    }
    
                    continue;
                }
    
                if (ngx_strncmp(value[i].data, "ifnot=", 6) == 0) {
                    if(log->ifnot != NULL) {
                        continue;
                    }
    
                    _if = value[i];
    
                    _if.len -= 6;
                    _if.data += 6;
    
                    /*
                     * Create and compile if script
                     */
                    log->ifnot = ngx_palloc(cf->pool, sizeof(ngx_http_complex_value_t));
                    if(log->ifnot == NULL) {
                        return NGX_CONF_ERROR;
                    }
    
                    ngx_memzero(&ccv, sizeof(ngx_http_compile_complex_value_t));
    
                    ccv.cf = cf;
                    ccv.value = &_if;
                    ccv.complex_value = log->ifnot;
    
                    if(ngx_http_compile_complex_value(&ccv) != NGX_OK) {
                        return NGX_CONF_ERROR;
                    }
    
                    continue;
                }
    
                ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
                                   "invalid parameter \"%V\"", &value[i]);
                return NGX_CONF_ERROR;
            }
        }
    
        if(!format_set) {
            name.len = sizeof(NGX_DEF_FORMAT) - 1;
            name.data = (u_char *) NGX_DEF_FORMAT;
            lmcf->combined_used = 1;
            
        }
    
        log->command = command;
    
        if(log->has_arg1) {
            log->arg1 = arg1;
        }
    
        fmt = lmcf->formats.elts;
        for (i = 0; i < lmcf->formats.nelts; i++) {
            if (fmt[i].name.len == name.len
                && ngx_strcasecmp(fmt[i].name.data, name.data) == 0)
            {
                log->format = &fmt[i];
                goto done;
            }
        }
    
        ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
                           "unknown log format \"%V\"", &name);
        return NGX_CONF_ERROR;
    
    done:
    
        return NGX_CONF_OK;
    }
    
    static char *
    ngx_http_redislog_command(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
    {
        ngx_str_t                           *value;
        ngx_redislog_conf_t                 *slcf;
        ngx_url_t                           u;
        ngx_redislog_peer_conf_t            *peer;
        u_char                              *p;
        size_t                              pass_size_len;
    
        slcf = (ngx_redislog_conf_t *) ngx_get_conf(cf->cycle->conf_ctx, ngx_core_redislog_module);
    
        value = cf->args->elts;
    
        ngx_memzero(&u, sizeof(ngx_url_t));
    
        u.url = value[2];
        u.default_port = 6379;
        u.no_resolve = 0;
    
        if(ngx_parse_url(cf->pool, &u) != NGX_OK) {
            ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, "%V: %s", &u.host, u.err);
            return NGX_CONF_ERROR;
        }
    
        if(slcf->peers == NULL) {
            slcf->peers = ngx_array_create(cf->pool, 2, sizeof(ngx_redislog_peer_conf_t));
            if (slcf->peers == NULL) {
                return NGX_CONF_ERROR;
            }
        }
    
        peer = ngx_array_push(slcf->peers);
        if(peer == NULL) {
            return NGX_CONF_ERROR;
        }
    
        peer->name = value[1];
        peer->sockaddr = u.addrs[0].sockaddr;
        peer->socklen = u.addrs[0].socklen;
    
        if(cf->args->nelts >= 4) {
            /*
             * Alloc space for authentication packet and create it
             */
            pass_size_len = ngx_redislog_size_len(value[3].len);
    
            peer->password.len = sizeof(NGX_REDIS_AUTH)-1 + 1 + pass_size_len
                + sizeof(CRLF)-1 + value[3].len + sizeof(CRLF)-1;
    
            peer->password.data = ngx_palloc(cf->pool, peer->password.len);
            if(peer->password.data == NULL) {
                return NGX_CONF_ERROR;
            }
    
            p = ngx_copy(peer->password.data, NGX_REDIS_AUTH, sizeof(NGX_REDIS_AUTH)-1);    
            
            *p++ = '$';
    
            p = ngx_redislog_size(p, p + pass_size_len, value[3].len);
    
            p = ngx_copy(p, CRLF, sizeof(CRLF)-1);
    
            p = ngx_copy(p, value[3].data, value[3].len);
    
            p = ngx_copy(p, CRLF, sizeof(CRLF)-1);
    
            peer->authenticate = 1;
        }
    
        peer->write_timeout     = 30000;
        peer->read_timeout      = 30000;
        peer->connect_timeout   = 30000;
        peer->reconnect_timeout = 5000;
        peer->flush_timeout     = 2000;
        peer->ping_timeout      = 30000;
    
        peer->bufs.num          = 200;
        peer->bufs.size         = 2048;
        peer->recv_buf_size     = 1024;
    
        return NGX_CONF_OK;
    }
    
    static ngx_int_t
    ngx_http_redislog_init(ngx_conf_t *cf)
    {
        ngx_http_core_main_conf_t    *cmcf;
        ngx_http_handler_pt          *h;
    
        cmcf = ngx_http_conf_get_module_main_conf(cf, ngx_http_core_module);
    
        h = ngx_array_push(&cmcf->phases[NGX_HTTP_LOG_PHASE].handlers);
        if (h == NULL) {
            return NGX_ERROR;
        }
    
        *h = ngx_http_redislog_handler;
    
        return NGX_OK;
    }
    
    static ngx_int_t
    ngx_redislog_init_process(ngx_cycle_t *cycle)
    {
        ngx_int_t                           rc;
        ngx_redislog_conf_t                *slcf;
        ngx_uint_t                          i;
        ngx_redislog_peer_conf_t           *pc;
        ngx_redislog_peer_t                *peer, **ppeer;
    
        slcf = (ngx_redislog_conf_t *) ngx_get_conf(cycle->conf_ctx, ngx_core_redislog_module);
    
        if(slcf->peers == NULL || slcf->peers->nelts == 0) {
            return NGX_OK;
        }
    
        rc = ngx_array_init(&ngx_redislog_peers, cycle->pool,
            slcf->peers->nelts, sizeof(ngx_redislog_peer_t*));
    
        if(rc != NGX_OK) {
            return rc;
        }
    
        pc = slcf->peers->elts;
    
        for(i = 0; i < slcf->peers->nelts; i++) {
            ppeer = ngx_array_push(&ngx_redislog_peers);
    
            if(ppeer == NULL) {
                return NGX_ERROR;
            }
    
            peer = ngx_pcalloc(cycle->pool, sizeof(ngx_redislog_peer_t));
    
            if(peer == NULL) {
                return NGX_ERROR;
            }
    
            peer->free = ngx_create_chain_of_bufs(cycle->pool, &pc[i].bufs);
    
            if(peer->free == NULL) {
                return NGX_ERROR;
            }
    
            peer->recv_buf = ngx_create_temp_buf(cycle->pool, pc[i].recv_buf_size);
    
            if(peer->recv_buf == NULL) {
                return NGX_HTTP_INTERNAL_SERVER_ERROR;
            }
    
            *ppeer = peer;
    
            peer->pool = cycle->pool;
            peer->conf = &pc[i];
            peer->log = cycle->log;
    
            peer->reconnect_timeout = pc[i].reconnect_timeout;
    
            ngx_redislog_connect_peer(peer);
        }
        
        return NGX_OK;
    }
    
    



    Пример конфигурации (пишем с ключом «$remote_addr^$status^$connection^$connection_requests^$msec» значение «$http_user_agent», которое будет «жить» 20 секунд):
    root@redis:~ # cat /usr/local/etc/nginx/nginx.conf | egrep "(access_redislog|log_format)" | grep -v "#"
    log_format  main  '$http_user_agent';
    access_redislog test $remote_addr^$status^$connection^$connection_requests^$msec command=SETEX arg1=20;
    

    Мониторим в redis:
    root@redis:~ # redis-cli
    redis 127.0.0.1:6379> monitor
    OK
    1389288187.176047 [0 78.72.78.56:31865] "SETEX" "78.72.78.209^200^9^1^1389288185.175" "20" "Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/31.0.1650.63 Safari/537.36" 
    


    Решаем аналогичную задачу с помощью EVALSHA (просто для примера, конечно, так делать не надо):
    Создаём ключ:
    # sha1 -s "return {redis.call('SET',KEYS[1],ARGV[1]); redis.call('EXPIRE',KEYS[1],20);}"
    SHA1 ("return {redis.call('SET',KEYS[1],ARGV[1]); redis.call('EXPIRE',KEYS[1],20);}") = c82db723c86778baa89099e1b65ebf21cb48ce34
    # cat /usr/local/etc/nginx/nginx.conf | egrep "(access_redislog|log_format)" | grep -v "#"
    log_format  main  '$http_user_agent';
    access_redislog test $remote_addr^$status^$connection^$connection_requests^$msec command=EVALSHA arg1=c82db723c86778baa89099e1b65ebf21cb48ce34;
    


    Мониторим:
    1389290907.814024 [0 78.72.78.56:21435] "EVALSHA" "c82db723c86778baa89099e1b65ebf21cb48ce34" "1" "78.72.78.209^200^9^2^1389290905.813" "Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/31.0.1650.63 Safari/537.36"
    1389290907.814091 [0 lua] "SET" "78.72.78.209^200^9^2^1389290905.813" "Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/31.0.1650.63 Safari/537.36"
    1389290907.814132 [0 lua] "EXPIRE" "78.72.78.209^200^9^2^1389290905.813" "20"
    


    Обе операции (SETEX и EVALSHA) атомарны, но, естественно, при наличии нативного SETEX дёргать LUA для таких задач неразумно.
    Для наглядности пару стресс-тестов:

    С использованием нативного SETEX
    ab
    # ab -n 100000 -c 4 -v 0 http://127.0.0.1:80/
    This is ApacheBench, Version 2.3 <$Revision: 655654 $>
    Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
    Licensed to The Apache Software Foundation, http://www.apache.org/
    
    Benchmarking 127.0.0.1 (be patient)
    Completed 10000 requests
    Completed 20000 requests
    Completed 30000 requests
    Completed 40000 requests
    Completed 50000 requests
    Completed 60000 requests
    Completed 70000 requests
    Completed 80000 requests
    Completed 90000 requests
    Completed 100000 requests
    Finished 100000 requests
    
    
    Server Software:        nginx/1.4.3
    Server Hostname:        127.0.0.1
    Server Port:            80
    
    Document Path:          /
    Document Length:        612 bytes
    
    Concurrency Level:      4
    Time taken for tests:   4.338 seconds
    Complete requests:      100000
    Failed requests:        0
    Write errors:           0
    Total transferred:      84400000 bytes
    HTML transferred:       61200000 bytes
    Requests per second:    23050.90 [#/sec] (mean)
    Time per request:       0.174 [ms] (mean)
    Time per request:       0.043 [ms] (mean, across all concurrent requests)
    Transfer rate:          18998.99 [Kbytes/sec] received
    
    Connection Times (ms)
                  min  mean[+/-sd] median   max
    Connect:        0    0   0.0      0       0
    Processing:     0    0   0.1      0       1
    Waiting:        0    0   0.1      0       1
    Total:          0    0   0.1      0       1
    
    Percentage of the requests served within a certain time (ms)
      50%      0
      66%      0
      75%      0
      80%      0
      90%      0
      95%      0
      98%      0
      99%      0
     100%      1 (longest request)
    



    С использованием SETEX реализованного через EVAL
    ab
    # ab -n 100000 -c 4 -v 0 http://127.0.0.1:80/
    This is ApacheBench, Version 2.3 <$Revision: 655654 $>
    Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
    Licensed to The Apache Software Foundation, http://www.apache.org/
    
    Benchmarking 127.0.0.1 (be patient)
    Completed 10000 requests
    Completed 20000 requests
    Completed 30000 requests
    Completed 40000 requests
    Completed 50000 requests
    Completed 60000 requests
    Completed 70000 requests
    Completed 80000 requests
    Completed 90000 requests
    Completed 100000 requests
    Finished 100000 requests
    
    
    Server Software:        nginx/1.4.3
    Server Hostname:        127.0.0.1
    Server Port:            80
    
    Document Path:          /
    Document Length:        612 bytes
    
    Concurrency Level:      4
    Time taken for tests:   59.520 seconds
    Complete requests:      100000
    Failed requests:        1413
       (Connect: 0, Receive: 0, Length: 1413, Exceptions: 0)
    Write errors:           0
    Total transferred:      83207428 bytes
    HTML transferred:       60335244 bytes
    Requests per second:    1680.12 [#/sec] (mean)
    Time per request:       2.381 [ms] (mean)
    Time per request:       0.595 [ms] (mean, across all concurrent requests)
    Transfer rate:          1365.22 [Kbytes/sec] received
    
    Connection Times (ms)
                  min  mean[+/-sd] median   max
    Connect:        0    0   1.1      0     251
    Processing:     0    2  13.0      0     251
    Waiting:        0    2  12.9      0     251
    Total:          0    2  13.0      0     253
    
    Percentage of the requests served within a certain time (ms)
      50%      0
      66%      0
      75%      1
      80%      1
      90%      1
      95%      1
      98%     44
      99%     77
     100%    253 (longest request)
    



    Ну и с записью просто в файл (в память)
    ab
    # ab -n 100000 -c 4 -v 0 http://127.0.0.1:80/
    This is ApacheBench, Version 2.3 <$Revision: 655654 $>
    Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
    Licensed to The Apache Software Foundation, http://www.apache.org/
    
    Benchmarking 127.0.0.1 (be patient)
    Completed 10000 requests
    Completed 20000 requests
    Completed 30000 requests
    Completed 40000 requests
    Completed 50000 requests
    Completed 60000 requests
    Completed 70000 requests
    Completed 80000 requests
    Completed 90000 requests
    Completed 100000 requests
    Finished 100000 requests
    
    
    Server Software:        nginx/1.4.3
    Server Hostname:        127.0.0.1
    Server Port:            80
    
    Document Path:          /
    Document Length:        612 bytes
    
    Concurrency Level:      4
    Time taken for tests:   4.544 seconds
    Complete requests:      100000
    Failed requests:        0
    Write errors:           0
    Total transferred:      84400000 bytes
    HTML transferred:       61200000 bytes
    Requests per second:    22007.62 [#/sec] (mean)
    Time per request:       0.182 [ms] (mean)
    Time per request:       0.045 [ms] (mean, across all concurrent requests)
    Transfer rate:          18139.09 [Kbytes/sec] received
    
    Connection Times (ms)
                  min  mean[+/-sd] median   max
    Connect:        0    0   0.0      0       0
    Processing:     0    0   0.0      0       1
    Waiting:        0    0   0.0      0       1
    Total:          0    0   0.0      0       1
    
    Percentage of the requests served within a certain time (ms)
      50%      0
      66%      0
      75%      0
      80%      0
      90%      0
      95%      0
      98%      0
      99%      0
     100%      1 (longest request)
    



    Тем самым вышеописанная реализация хранения с SETEX позволила добиться равных результатов с хранением в файле плюс мы получили авторотацию логов без каких-либо особенных затрат.

    Похожие публикации

    AdBlock похитил этот баннер, но баннеры не зубы — отрастут

    Подробнее
    Реклама

    Комментарии 40

      +2
      Не секрет, что для защиты от HTTP-DDoS зачастую используют связку nginx в качестве фронтенда и некий другой web-сервер в качестве бакенда.

      А зачем нужен другой web-сервер. Приведите пример, пожалуйста, как вы это делаете, почему недостаточно только nginx?
        0
        наверное имелось ввиду что-то типа mod_php + Apache, но тогда достаточно смешно говорить о высокой производительности и ддос-е, так как такая связка или очень древняя, или вряд ли это кому-либо актуальное приложение настолько, чтобы его досить
          –1
          И чем вам Apache + mod_php не угодил? [sarcasm mode on]
            +1
            Вообще из mod_php + Apache можно выжать очень много, если нормально приготовить. Если поставить поверх nginx для отдачи статики и кеша, выходит вполне себе нормально. Оно, конечно, не особо надо т.к. есть fpm, но возможно.
              0
              fpm ни разу не быстрее mod_php, возможно ресурсов жрет поменьше, но не быстрее
                0
                Я, в общем-то, про то же. Они примерно одинаково отрабатывают.
                  +1
                  Оно «быстрее» в плане того, что может переварить большое коннектов при прочих равных. Именно потому что кушает меньше памяти => больше воркеров можно запустить. Отсюда и легенда про быстроту (ибо народ обычно тестит ab с флагом -c и видит крутые цифры).
              0
              Да, собственно, ничего не мешает использовать такое логирование только на нгинкс, но у меня, обычно, всё в связке (apache, WEBrick или тот-же nginx на бакенде) и nginx на высокопроизводительном фронтенде, который их «прикрывает».
                +1
                Какая аргументация использования второго web-сервера? И какой язык программирования, чтобы понять ситуацию?
                  –1
                  apache + mod_rewrite. Когда не хочется переписывать .htaccess у сайтов.
                    +2
                    Странное решение, замедлить работу сайта из-за десятка правил в htaccess? И погуглите nginx htaccess converter.
                      +1
                      Когда сайтов более 50 + есть куча вложенных .htaccess в разных папках — тогда стоит задуматься о том, чтобы переписывать для каждого правила и искать .htaccess'ы.
                    0
                    Прокси — это типичная реализация nginx (http://nginx.org/en/docs/http/ngx_http_proxy_module.html), что используется в качестве бакенда не принципиально. У меня в качестве бакендов есть и webrick(ruby) и apache(php) и IIS и даже небольшой самописанный. Также десяток-другой VPS'ок, которые «упадут» задолго до преславутых C10k.
                      0
                      Можно вопрос? WEBrick у вас же не в продакшене?
                        0
                        Он не совсем у меня, клиентский. Ставили временно, но нет ничего более постоянного, чем временное…
                          0
                          Ну это как-то не серьезно совсем.
                  0
                  Ну, например, есть еще томкат контейнер, или jetty. На pho свет клином не сошелся. За nginx может быть и несколько разношерстных серверов.
                  +1
                  Тем самым вышеописанная реализация хранения с SETEX позволила добиться равных результатов с хранением в файле
                  Или вы в обоих случаях протестировали производительность ab.
                    0
                    Вы имеете ввиду, что я не «довёл» сохранялку до неработоспособного состояния, чтобы понять в каком случае раньше начнутся проблемы? Да, возможно, стоило попытаться, но задача сохранения логов стоит не только в самом хранении, а ещё и в анализе/выборке/парсинге и ротации этих логов.
                    Если же сравнивать голое сохранение, то здесь, пожалуй надо использовать SET, а не SETEX.
                      0
                      Я имею в виду, что обычно один поток ab не способен полностью нагрузить даже одного рабочего процесса nginx в hello world тестах. Поэтому на основе представленных данных я лично не могу сделать тот вывод, что был процитирован.
                        0
                        Да, согласен, вывод несколько преждевремен.
                        Изначально меня интересовало сравнение SETEX/EVALSHA, где этого теста оказалось достаточно.
                        Попытаюсь завтра нагрузить сильнее.
                          0
                          ab запустил с 9 PC непрерывно и с локального снимаю статистику
                          redis
                          # ab -n 100000 -c 16 -v 0 127.0.0.1:80/
                          Time taken for tests: 20.768 seconds
                          Complete requests: 100000
                          Failed requests: 0
                          Write errors: 0
                          Total transferred: 84400000 bytes
                          HTML transferred: 61200000 bytes
                          Requests per second: 4815.09 [#/sec] (mean)
                          Time per request: 3.323 [ms] (mean)
                          Time per request: 0.208 [ms] (mean, across all concurrent requests)
                          Transfer rate: 3968.69 [Kbytes/sec] received

                          Connection Times (ms)
                          min mean[±sd] median max
                          Connect: 0 2 0.3 2 5
                          Processing: 1 2 0.3 2 6
                          Waiting: 1 2 0.3 2 5
                          Total: 2 3 0.4 3 8

                          Percentage of the requests served within a certain time (ms)
                          50% 3
                          66% 3
                          75% 4
                          80% 4
                          90% 4
                          95% 4
                          98% 4
                          99% 4
                          100% 8 (longest request)


                          файл
                          # ab -n 100000 -c 16 -v 0 127.0.0.1:80/
                          Concurrency Level: 16
                          Time taken for tests: 19.616 seconds
                          Complete requests: 100000
                          Failed requests: 0
                          Write errors: 0
                          Total transferred: 84400000 bytes
                          HTML transferred: 61200000 bytes
                          Requests per second: 5097.93 [#/sec] (mean)
                          Time per request: 3.139 [ms] (mean)
                          Time per request: 0.196 [ms] (mean, across all concurrent requests)
                          Transfer rate: 4201.81 [Kbytes/sec] received

                          Connection Times (ms)
                          min mean[±sd] median max
                          Connect: 0 2 0.2 2 5
                          Processing: 1 2 0.2 2 5
                          Waiting: 1 2 0.2 2 5
                          Total: 2 3 0.4 3 7

                          Percentage of the requests served within a certain time (ms)
                          50% 3
                          66% 3
                          75% 3
                          80% 3
                          90% 4
                          95% 4
                          98% 4
                          99% 4
                          100% 7 (longest request)


                          оттетстил раз 10, разница в пределах статистической погрешности, большей нагрузки создать нечем
                            0
                            Гляньте github.com/wg/wrk
                            Но думаю результат не сильно изменится.
                              0
                              А при этом ещё сравнивали сколько было залогировано? Как я написал ниже, если я не ошибся, то модуль начинает просто дропать логи, когда не справляется с нагрузкой.
                                0
                                Локально сделанные запросы были залогированы все, с других машин не успевал проверить. На неделе оттестирую ещё.
                                  0
                                  Достаточно сопоставить количество залогированых запросов с суммарным количеством запросов сделанных со всех машин.
                                    0
                                    Да это понятно, они у меня просто лились непрерывно просто для нагрузки + тестировал-таки с SETEX 20.
                                  0
                                  «не справляется с нагрузкой» не совсем корректный термин. Модуль отбрасывает записи, когда переполняется временный буфер отправки (400k). Однако предполагается, что сеть, через которую выполняется логгирование, достаточно эффективна, чтобы вовремя относить поступающие записи.

                                  Подобное снижение качества сервиса происходит с логгированием в файл, когда заканчиваются дисковые буферы.
                                    0
                                    Помимо пропускной способности сети, есть ещё на другом конце сам redis, который должен успевать читать.
                          +3
                          Кто-то уже пробовал такую связку в бою?

                          У меня есть ничем не обоснованное подозрение, что всё это может сломаться в самый неподходящий момент. Раз Redis хранит всю свою базу в памяти, значит эта система будет работать до тех пор, пока на сервере с редисом она не закончилась. В случае DDoS-атаки количество запросов резко возрастает. Предположим, что количество залогированных запросов стало таким, что Redis увел сервер в swap и стал тормозить, замедляя работу и самого nginx-а. Наверное, такое возможно?

                          Та же ситуация произойдет, если мы забудем настроить ротацию логов в Redis. В какой-то момент без видимых причин nginx начнет отдавать запросы заметно медленее. А поскольку причина проблемы неочевидна, то разработчик, скорее всего, сначала перероет свое приложение в поисках регрессии, а потом уже вспомнит про логи в Redis.

                          Кроме того, интересно, что произойдет, если Redis отвалится. Насколько я вижу, модуль пытается переустановить соединение. Мне кажется, это катастрофически должно сказаться на производительности сервера (кстати, этот вариант легко проверяется с помощью бенчмарков).

                          Если бы я настраивал что-то подобное, то скорее всего, я бы предпочел, чтобы nginx скидывал логи в обычные предсказуемые файлы, а потом по крону парсил бы их содержимое и запихивал в любую базу (и скорее всего, это был бы не Redis).
                            0
                            В таком виде эта связка родилась сегодня утром )
                            Выложил на хабр именно для того чтобы услышать мнения о возможных доработках.
                            Использовать её планирую только в варианте с SETEX, т.е. ротация априори включена.
                            Вариант с падением redis оттестирую и отпишусь.
                              0
                              Насколько я вижу, модуль просто начинает дропать логи если не справляется с нагрузкой или не удается установить соединение до того, как у него заканчивается буфер.
                              0
                              Если сравнивается SETEX и EVALSHA, то надо было еще добавить MULTI / EXEC.
                                0
                                Не представляю какое преимущество может дать redis для анализа логов. Если какие-то счётчики держать, то ещё можно понять, но целиком логи очевидно удобнее и надёжнее в файл или какую-то систему аггрегации логов (syslog / logstash etc)
                                  0
                                  Поддерживаю ваше мнение. Logstash + ElasticSearch + Kibana = хорошая альтернатива.
                                    0
                                    Частично Вы сами ответили на свой вопрос — можно держать счётчики.
                                    Ну или, например, сделайте выборку из файла всех запросов сделанных с ip 1.2.3.4 к локейшину /test/ за последние 20 сек
                                    А если хранить логи просто «чтобы было», то в redis, конечно, необходимости нет.
                                      +3
                                      Не вижу сложностей в том, чтобы сделать такую выборку: пара грепов, либо один Perl/AWK ну или PyPy + простенький парсер.

                                      grep -E "\[18/Sep/2013:06:50:(0|1)" access.log | grep "GET /api/" | wc -l
                                      
                                      Выборка с 06:50:00 по 06:50:19 (20 секунд) GET запросов к "/api/*"

                                      А со счётчиками проблема в том, что нет гибкости. Захотели вы сделать выборку не за 20 секунд, а за 120 и уже нужно лезть в конфиги nginx, делать релоад, ждать накопления данных.
                                        0
                                        grep -E "\[18/Sep/2013:06:50:(0|1)" access.log | grep «GET /api/» | wc -l
                                        Выборка с 06:50:00 по 06:50:19 (20 секунд) GET запросов к "/api/*"

                                        А если секунды не уложились в одну минуту/день/год? :)
                                        И какова производительность этого комбайна?
                                          +2
                                          Если секунды не уложились, просто пишем более подходящую регулярку.
                                          Учитывая, что там все в конвеере, то фигачиться очень быстро. Не раз приходилось анализировать логи в несколько гигов и более сложными однострочниками, всегда это был вопрос менее минуты. В такой ситуации логи в redis дадут только минусы — отдать несколько гигов ОЗУ ради того, что бы получить результат не за, условно, минуту, а секунду, видится абсолютно нецелесообразно. Дисковый кэш ОСи вещь достаточно умная что бы на неё можно было полагаться.

                                          В общем решение интересно чисто академически. Для продакшена не пригодное, при атаке тем более.
                                    –3
                                    По-моему, это гениально. Правда, очень поможет для фильтрации.

                                    Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

                                    Самое читаемое