Визуализация атак на базе ELK (elasticsearch, kibana, logstash)

    Доброго времени суток. В процессе обслуживания большого количества серверов возникает необходимость централизации управления всем зоопарком машин, а так же централизованный сбор логов и аналитика логов на предмет выявления аномалий, ошибок и общей статистики.

    В качестве централизованного сбора логов используется rsyslog, а для структурирования и визуализации elasticsearch + kibana. Все бы ничего, но когда количество подключенных машин разрастается, то данных настолько много, что уходит (уходило) большое количество времени на их обработку и анализ. Наряду с другими интересными штуками всегда хотелось организовать свой центр безопасности. Этакая мультимониторная статистика с картами, графиками и прочим.

    В данной статье хочу описать свой опыт создания монитора статистики по атакам на ссш. Не будем рассматривать в этом плане защиту, потому как программисты и прочие могут не уметь подключаться на нестандартные порты или пользоваться сертификатами.

    Так как у нас уже есть развернутый elastic с kibana, то будем на его базе и городить нашу систему.

    Итак, у нас уже есть установленный докер и docker-compose, значит будем поднимать сервисы на нем.

    elastic:

    elasticsearch:
      build: elasticsearch:2.3.4
      container_name: elastic
      command: elasticsearch -Des.network.host=0.0.0.0
      net: host
      ports:
        - "9200:9200"
        - "9300:9300"
      volumes:
        - "/srv/docker/elastic/etc:/usr/share/elasticsearch/config"
        - "/srv/docker/elastic/db:/usr/share/elasticsearch/data"
        - "/etc/localtime:/etc/localtime:ro"
      restart: always
      environment:
        - ES_HEAP_SIZE=2g
    

    /srv/docker/elastic/elasticsearch.yml:

    cluster.name: Prod
    node.name: "central-syslog"
    http.port: 9200
    network.host: _non_loopback_
    discovery.zen.ping.multicast.enabled: false
    discovery.zen.ping.unicast.hosts: [
     ]
    transport.publish_host: 0.0.0.0
    #transport.publish_port: 9300
    http.cors.enabled : true
    http.cors.allow-origin : "*"
    http.cors.allow-methods : OPTIONS, HEAD, GET, POST, PUT, DELETE
    http.cors.allow-headers : X-Requested-With,X-Auth-Token,Content-Type, Content-Length
    script.engine.groovy.inline.aggs: on
    

    /srv/docker/elastic/logging.yml:

    logger:
      action: DEBUG
      com.amazonaws: WARN
    appender:
      console:
        type: console
        layout:
          type: consolePattern
          conversionPattern: "[%d{ISO8601}][%-5p][%-25c] %m%n"
    

    kibana:

    kibana:
      image: kibana
      restart: always
      container_name: kibana
      environment:
        SERVICE_NAME: 'kibana'
        ELASTICSEARCH_URL:  "http://x.x.x.x:9200"
      ports:
        - "4009:5601"
      volumes:
        - "/etc/localtime:/etc/localtime:ro"
    

    logstash:

    logstash:
     image: logstash:latest
     restart: always
     container_name: logstash
     hostname: logstash
     ports:
      - "1025:1025"
      - "1026:1026"
     volumes:
      - "/srv/docker/logstash/logstash.conf:/etc/logstash.conf:ro"
      - "/srv/docker/logstash/ssh-map.json:/etc/ssh-map.json:ro"
     command: "logstash -f /etc/logstash.conf"
    

    Итак. Мы запустили эластик и кибану, но осталось подготовить логстеш к обработке логов от внешних серверов. Я реализовал вот такую схему:

    rsyslog → logstash → elastic → kibana.

    Можно было бы использовать встроенный в rsyslog коннектор напрямую в эластик, но нам нужны данные по полям и с geoip для статистики.

    На серверах, подключаемых к мониторингу вносим в конфиг rsyslog (обычно это /etc/rsyslog.d/50-default.conf) вот такую запись:

    auth,authpriv.*                 @@x.x.x.x:1026
    

    Этой записью мы отправляем все события об авторизации на наш удаленный сервер (logstash).

    Далее, полученные логстешем логи нам нужно обработать и оформить. Для этого создаем маппинг полей, чтобы на выходе нам было удобно работать (/srv/docker/logstash/ssh-map.json):

    {
            "template": "logstash-*",
            "mappings": {
                    "ssh": {
                            "properties": {
                                    "@timestamp": {
                                            "type": "date",
                                            "format": "strict_date_optional_time||epoch_millis"
                                    },
                                    "@version": {
                                            "type": "string"
                                    },
                                    "username": {
                                            "type": "string"
                                    },
                                    "src_ip": {
                                            "type": "string"
                                    },
                                    "port": {
                                            "type": "long"
                                    },
                            }
                    }
            }
    }
    

    В ходе создания маппинга столкнулся с одним багом логстеша, а именно присвоению полю значения geo_point (при создании своего индекса выставляется значение у geoip.location — float), по которому в дальнейшем будет строиться heatmap на карте. Баг этот зарегистрирован и в качестве workaround мне пришлось использовать шаблон стандартных индексов logstash-*.

    Итак, маппинг у нас есть. Теперь нужно подготовить конфиг logstash, чтобы он фильтровал входящие данные и в нужном формате отдавал в эластик (/srv/docker/logstash/logstash.conf):

    input {
       tcp {
            port => 1026
            type => "security"
       }
    }
    
    filter {
      grok {
        match => ["message", "Failed password for (invalid user |)%{USERNAME:username} from %{IP:src_ip} port %{BASE10NUM:port} ssh2"]
        add_tag => "ssh_brute_force_attack"
      }
      grok {
        match => ["message", "Accepted password for %{USERNAME:username} from %{IP:src_ip} port %{BASE10NUM:port} ssh2"]
        add_tag => "ssh_sucessful_login"
      }
      geoip {
        source => "src_ip"
      }
    }
    output {
       if "ssh_brute_force_attack" in [tags] {
             elasticsearch {
                    hosts => ["x.x.x.x:9200"]
                    index => "logstash-%{+YYYY.MM.dd}"
                    manage_template => true
                    template_name => "ssh"
                    template => "/etc/ssh-map.json"
                    template_overwrite => true
             } 
       }
    }
    

    Конфиг удобочитаемый, понятный, поэтому комментировать считаю излишним.

    Итак. Логи ссш попадают в логстеш, он их обрабатывает и отправляет в индексы эластика. Осталось только настроить визуализацию:

    — Открываем веб интерфейс по адресу x.x.x.x:4009/
    — Переходим в Settings и добавляем работу с нашими индексами (logstash-*)

    Далее нам нужно в kibana создать поисковые запросы, визуализацию и дашборд.

    Во вкладке Discover после добавления индексов в kibana мы видим наши записи — все настроили верно.

    В левой колонке мы видим список полей для фильтрации, с ними и будем работать.

    Первым фильтром пойдет список атакуемых серверов:

    — около поля host нажимаем add
    — сохраняем поиск как ssh-brute-servers-under-attack (имя вариативно)

    Вторым фильтром будет список атакующих стран:

    — около поля geoip.country_name нажимаем add
    — сохраняем как ssh-brute-countries (имя вариативно)

    Третьим фильтром будет общее количество атак:

    — переходим на вкладку Discovery
    — сохраняем как ssh-brute-all

    Итак, на финальном экране у нас будет отображаться четыре разных параметра:

    1. Суммарное количество атак
    2. Атакующие страны
    3. Атакуемые серверы
    4. Карта с указателями на атакующие хосты

    Суммарное количество атак:

    — переходим на вкладку Visualize
    — выбираем тип визуализации Metric
    — From saved search — ssh-brute-all
    — Открываем Metric и меняем значение поля на — Суммарное количество атак
    — Сохраняем визуализацию

    Атакующие страны:

    — переходим на вкладку Visualize
    — выбираем тип визуализации Data table
    — From saved search — ssh-brute-countries
    — Открываем Metric и меняем значение поля на — Количество атак
    — Теперь нам нужно соотнести поля и посчитать в таблице «уники». Нажимаем split rows
    — Aggregation — terms
    — Field — geoip.country_name.raw
    — Custom label — Страна

    Если все ввели верно, то загорится зеленая кнопка play, после нажатия на которую увидим примерно такую картину:

    image

    — Сохраняем визуализацию

    Атакуемые серверы:
    — переходим на вкладку Visualize
    — выбираем тип визуализации Data table
    — From saved search — ssh-brute-servers-under-attack
    — Открываем Metric и меняем значение поля на — Количество атак
    — Теперь нам нужно соотнести поля и посчитать в таблице «уники». Нажимаем split rows
    — Aggregation — terms
    — Field — host.raw
    — Custom label — Сервер

    Если все ввели верно, то загорится зеленая кнопка play, после нажатия на которую увидим примерно такую картину:

    image

    — Сохраняем визуализацию

    Карта с указателями на атакующие хосты (самое интересное)

    — переходим на вкладку Visualize
    — выбираем тип визуализации Tile map
    — From new search — Select an index pattern — logstash-*
    — Открываем Geo Coordinates. Если все шаги были верными, автоматически поле Field заполнится geoip.location
    — Переходим в Options
    — Меняем хостинг карт (так как у MapRequest изменились условия и нужно получать токен и дополнительно что-то делать). Ставим галку в — WMS compliant map server
    — Приводим все поля к параметрам:

    WMS Url — basemap.nationalmap.gov/arcgis/services/USGSTopo/MapServer/WMSServer
    WMS layers* — 0
    WMS version* — 1.3.0
    WMS format* — image/png
    WMS attribution — Maps provided by USGS
    WMS styles* — пусто

    В итоге у нас должна отобразиться карта атак:

    image

    — Сохраняем визуализацию

    Теперь у нас всё есть для того, чтобы сделать свой дашборд.

    Переходим на вкладку Dashboard, нажимаем на Add visualization (плюс в круге справа сверху) и добавляем свои сохраненные визуализации на экран и сохраняем экран. Поддерживается Drag'n'Drop. В итоге у меня получился вот такой экран:

    image

    Теперь для подключения новых хостов к системе достаточно будет указывать у них передачу логов авторизации на сервер logstash и при возникновении событий хосты и информация будут добавляться на экран и в эластик.

    При желании вы всегда сможете добавить логи брутфорса контрольных панелей, собрать более детализированную статистику и добавить на экран по аналогии с этой статьей.
    • +19
    • 17,9k
    • 6
    Поделиться публикацией

    Комментарии 6

      +3
      Как раз несколько недель назад был на DevSecCon, где делали что-то сильно похожее на воркшопе. Кому интересно все материалы тут.
        0
        Ожидал увидеть проект Amsterdam (https://github.com/StamusNetworks/Amsterdam) или аналог, а тут сбор SSH логов :)))
          0
          Хорошо, когда есть возможность слушать трафик до хоста, либо с хоста перенаправлять трафик, но бывают случаи, когда это сделать невозможно. Тем более, что сбор логов с хостов и так есть.
          0
          Спасибо за хорошее описание!
          Немного не понял по файлам Docker-a… elastik:, kibana:, logstash: это части одного файла docker-compose.yml?
          Или создаются /srv/docker/elastic/elastik.yml, /srv/docker/kibana/kibana.yml и т.д.?
            0
            Можете для удобства их кинуть в один файл docker-compose.yml и просто пускать все сервисы одной командой — docker-compose up -d
              0
              Спасибо! Еще немного не понятно с полем geoip.country_name, при discovery этого поля нет. Вы подключали какие-то модули?

          Только полноправные пользователи могут оставлять комментарии. Войдите, пожалуйста.

          Самое читаемое