Как стать автором
Обновить

Комментарии 22

Спасибо, полезно
А почему «проксировать из коробки nginx не может», разве модуль stream proxy не решает эту проблему? Поидее ведь достаточно в kafka настроить advertised.listeners на хост и порты nginx для проброса траффика tcp, или так не работает?
Изначально пробовал настроить передачу сообщений с помощью этого модуля, но не смог в нем реализовать механизм распределения сообщений по топикам. Калтуровский модуль как раз предоставляет такую возможность.
да, на уровне nginx не получится в этом случае роутить сообщения, но если использовать нативный клиент kafka, то роутинг и не нужен. А вот проблему с сертификатами решит без сборки дополнительных модулей.

Я только не понял, вы пишите, что «сможете принимать сообщения kafka», а модуль ведь про отправку сообщений в kafka? Как это работает в связке с ElasticSearch? Было бы хорошо увидеть всю схему этого решения.
Заменил формулировку
проксировать в Kafka из коробки веб-сервер не может
в публикации, она была действительно неверная. Вспомнил, что дело было не в роутинге, а то, что мне нужен был SSL. Теперь там указана основная причина, почему все сделано именно так:
Однако, поскольку Kafka не использует протокол HTTP для коммуникаций — REST Proxy от Confluent не в счет из-за условий лицензии, — а SSL-терминация при проксирования TCP доступна только в платной версии Nginx, использовать доступные из коробки модули для проксирования было нельзя.

Проблема с сертификатами связана со схемой шифрования, они добавляются в конфигурацию брокеров Kafk'и, поэтому какой-либо нативный клиент эту проблему решить не сможет, поскольку это отдельная сущность.

вы можете принимать сообщения для Kafka

Если рассматривать формулировку с позиции модуля, то да, технически верно написать — отправка сообщений в Kafk'у, но это взято из контекста. В этой формулировке сделан акцент на саму Kafk'у, она принимает/получает сообщения, Nginx является лишь прослойкой, чтобы обеспечить их прием по HTTP/HTTPS.

Связка Kafka — Elasticsearch будет работать через компонент Kafka Connect. Пока проект еще в разработке, мы тестируем различные коннекторы и по готовности посвятим этому отдельную статью со всеми подробностями.

Поддержу aig на счёт tcp-стрима у Nginx. Плюс в конфиге Kafka может потребоваться выставить параметр advertised.listeners, в котором указывается адрес и порт, на котором будет слушать Nginx. Вот комент из самого конфига:


Hostname and port the broker will advertise to producers and consumers. If not set,
it uses the value for "listeners" if configured. Otherwise, it will use the value
returned from java.net.InetAddress.getCanonicalHostName().
Заменил формулировку по поводу проксирования, она действительно была неверной, основной причиной выбора не в пользу tcp-стрима было отсутсвие SSL-терминации. Мне нужно было шифровать трафик до Nginx'a и не шифровать до Kafk'и, все подрбности теперь в самой публикации в части Зачем это нужно
> Мне нужно было шифровать трафик до Nginx'a и не шифровать до Kafk'и,

М? Это же работает из коробки в бесплатной версии?
Ничего не понял.

stream {
    server {
    listen 41194 ssl ;

    ssl_certificate      fullchain.pem ;
    ssl_certificate_key  privkey.pem ;
    ssl_dhparam          ssl/dhparams.pem;
    ssl_verify_depth     1;
    ssl_session_cache    shared:SSL1:10m;
    ssl_session_timeout  10m;

    ssl_protocols TLSv1.2 TLSv1.3;
    ssl_prefer_server_ciphers on;

    ssl_ciphers ECDH+AESGCM:DH+AESGCM:ECDH+AES256:DH+AES256:ECDH+AES128:DH+AES:ECDH+3DES:DH+3DES:RSA+AESGCM:RSA+AES:RSA+3DES:!aNULL:!MD5:!DSS;

       proxy_pass 127.0.0.2:22 ;
   }
}


# openssl s_client -quiet -connect 127.0.0.1:41194 2>/dev/null
SSH-2.0-OpenSSH_7.4p1 Debian-10+deb9u3
^C

Я тоже удивился, пошёл проверил, всё работает в бесплатной версии. Скорее всего документация устарела.
Ваша правда — да, это действительно работает, не без танцев, но было действительно интересное исследование.

Что по итогу получилось: tcp-стрим с SSL-терминацией действительно настроить можно, но сразу записать что-либо в топик не удалось, при трейсинге любого клиента, я заметил интересную особенность их поведения (не важно на чем они написаны: python, go, java и какую либу внутри себя используют). В общем, при инициализации первого соединения клиентом к брокеру, он проходил через nginx и в нем он запрашивают общую инфу о брокере, хостнейм брокера у меня является так же fqdn'ом, клиент получал тем самым fqdn брокера, резолвил его и вторым запросом уже устанавливал коннект к брокеру напрямую, миную nginx, при этом в логах кафки с трейсингом, никакой инфы о том, что клиент хочет писать с SSL. Там буквально ни слова об этом, но из-за этого клиент записать в топик ничего не мог, поскольку в клиенте я указал security_protocol='SSL', а брокер у меня работает по PLAINTEXT. Xтобы обмануть клиента, я на клиенте в хостс прописал fqdn брокера с адресом самого nginx'a и запись в топики пошла. Подампил трафик между клиентом, nginx'ом и кафкой, и действительно теперь вижу шифрованный трафик от клиента до nginx'a и не шифрованный от nginx'a до кафки. Это действительно работает, благодарю за комментарии, получил интересный опыт. :)

Прикладую трейс сети до хука в хостс:
Скрытый текст
strace -s100000 -f -e trace=network python3 k_send.py
strace: Process 18077 attached
[pid 18077] +++ exited with 0 +++
--- SIGCHLD {si_signo=SIGCHLD, si_code=CLD_EXITED, si_pid=18077, si_uid=0, si_status=0, si_utime=0, si_stime=0} ---
socketpair(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, 0, [3, 4]) = 0
socket(AF_INET, SOCK_STREAM|SOCK_CLOEXEC, IPPROTO_IP) = 6
setsockopt(6, SOL_TCP, TCP_NODELAY, [1], 4) = 0
connect(6, {sa_family=AF_INET, sin_port=htons(9092), sin_addr=inet_addr("127.0.0.1")}, 16) = -1 EINPROGRESS (Operation now in progress)
connect(6, {sa_family=AF_INET, sin_port=htons(9092), sin_addr=inet_addr("127.0.0.1")}, 16) = 0
getsockname(6, {sa_family=AF_INET, sin_port=htons(60682), sin_addr=inet_addr("127.0.0.1")}, [16]) = 0
getpeername(6, {sa_family=AF_INET, sin_port=htons(9092), sin_addr=inet_addr("127.0.0.1")}, [16]) = 0
getsockopt(6, SOL_SOCKET, SO_TYPE, [1], [4]) = 0
getpeername(6, {sa_family=AF_INET, sin_port=htons(9092), sin_addr=inet_addr("127.0.0.1")}, [16]) = 0
strace: Process 18078 attached
[pid 18076] sendto(4, "x", 1, 0, NULL, 0) = 1
[pid 18076] sendto(4, "x", 1, 0, NULL, 0) = 1
[pid 18078] recvfrom(3, "xx", 1024, 0, NULL, NULL) = 2
[pid 18078] recvfrom(3, 0x7fa4d40032a0, 1024, 0, NULL, NULL) = -1 EAGAIN (Resource temporarily unavailable)
[pid 18076] sendto(4, "x", 1, 0, NULL, 0) = 1
[pid 18076] sendto(4, "x", 1, 0, NULL, 0) = 1
[pid 18078] recvfrom(3, "xx", 1024, 0, NULL, NULL) = 2
[pid 18078] recvfrom(3, 0x7fa4d4005e60, 1024, 0, NULL, NULL) = -1 EAGAIN (Resource temporarily unavailable)
[pid 18078] socket(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0) = 7
[pid 18078] connect(7, {sa_family=AF_UNIX, sun_path="/var/run/nscd/socket"}, 110) = -1 ENOENT (No such file or directory)
[pid 18078] socket(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0) = 7
[pid 18078] connect(7, {sa_family=AF_UNIX, sun_path="/var/run/nscd/socket"}, 110) = -1 ENOENT (No such file or directory)
[pid 18078] socket(AF_INET, SOCK_DGRAM|SOCK_CLOEXEC|SOCK_NONBLOCK, IPPROTO_IP) = 7
[pid 18078] setsockopt(7, SOL_IP, IP_RECVERR, [1], 4) = 0
[pid 18078] connect(7, {sa_family=AF_INET, sin_port=htons(53), sin_addr=inet_addr("172.28.0.253")}, 16) = 0
[pid 18078] sendmmsg(7, [{msg_hdr={msg_name=NULL, msg_namelen=0, msg_iov=[{iov_base="\6\364\1\0\0\1\0\0\0\0\0\0\17kafka-0-15-186\2back\2gcorelabs\3local\0\0\1\0\1", iov_len=43}], msg_iovlen=1, msg_controllen=0, msg_flags=0}, msg_len=43}, {msg_hdr={msg_name=NULL, msg_namelen=0, msg_iov=[{iov_base="\354\17\1\0\0\1\0\0\0\0\0\0\17kafka-0-15-186\2back\2gcorelabs\3local\0\0\34\0\1", iov_len=43}], msg_iovlen=1, msg_controllen=0, msg_flags=0}, msg_len=43}], 2, MSG_NOSIGNAL) = 2
[pid 18078] recvfrom(7, "\6\364\201\200\0\1\0\1\0\2\0\4\17kafka-0-15-186\2back\2gcorelabs\3local\0\0\1\0\1\300\f\0\1\0\1\0\0\1\32\0\4\n\\@\272\300\37\0\2\0\1\0\0\1\25\0\23\3ns1\tgcorelabs\3net\0\300\37\0\2\0\1\0\0\1\25\0\23\3ns2\4ns\10services\0\300f\0\1\0\1\0\0\1f\0\4\\\337M5\300G\0\1\0\1\0\0\1f\0\4\\\337d5\300f\0\34\0\1\0\09\206\0\20*\3\220\300\231\220\0\0\0\0\0\0\0\0 S\300G\0\34\0\1\0\1\213\6\0\20*\3\220\300\231\220\0\0\0\0\0\0\0\0\20S", 2048, 0, {sa_family=AF_INET, sin_port=htons(53), sin_addr=inet_addr("172.28.0.253")}, [28->16]) = 209
[pid 18078] recvfrom(7, "\354\17\201\200\0\1\0\0\0\1\0\0\17kafka-0-15-186\2back\2gcorelabs\3local\0\0\34\0\1\300\34\0\6\0\1\0\0\1,\0>\3ns1\tgcorelabs\3net\0\7support\tgcorelabs\3com\0`\267\307\16\0\0\25\30\0\0\25\30\0\22u\0\0\0\1,", 65536, 0, {sa_family=AF_INET, sin_port=htons(53), sin_addr=inet_addr("172.28.0.253")}, [28->16]) = 117
[pid 18078] socket(AF_INET, SOCK_STREAM|SOCK_CLOEXEC, IPPROTO_IP) = 7
[pid 18078] setsockopt(7, SOL_TCP, TCP_NODELAY, [1], 4) = 0
[pid 18078] connect(7, {sa_family=AF_INET, sin_port=htons(9092), sin_addr=inet_addr("10.0.15.186")}, 16) = -1 EINPROGRESS (Operation now in progress)
[pid 18078] connect(7, {sa_family=AF_INET, sin_port=htons(9092), sin_addr=inet_addr("10.0.15.186")}, 16) = -1 EALREADY (Operation already in progress)
[pid 18078] connect(7, {sa_family=AF_INET, sin_port=htons(9092), sin_addr=inet_addr("10.0.15.186")}, 16) = 0
[pid 18078] getsockname(7, {sa_family=AF_INET, sin_port=htons(42378), sin_addr=inet_addr("10.0.15.140")}, [16]) = 0
[pid 18078] getpeername(7, {sa_family=AF_INET, sin_port=htons(9092), sin_addr=inet_addr("10.0.15.186")}, [16]) = 0
[pid 18078] getsockopt(7, SOL_SOCKET, SO_TYPE, [1], [4]) = 0
[pid 18078] getpeername(7, {sa_family=AF_INET, sin_port=htons(9092), sin_addr=inet_addr("10.0.15.186")}, [16]) = 0
[pid 18076] sendto(4, "x", 1, 0, NULL, 0) = 1
[pid 18078] +++ exited with 0 +++
+++ exited with 0 +++

И после:
Скрытый текст
strace -s100000 -f -e trace=network python3 k_send.py
strace: Process 18208 attached
[pid 18208] +++ exited with 0 +++
--- SIGCHLD {si_signo=SIGCHLD, si_code=CLD_EXITED, si_pid=18208, si_uid=0, si_status=0, si_utime=0, si_stime=0} ---
socketpair(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC, 0, [3, 4]) = 0
socket(AF_INET, SOCK_STREAM|SOCK_CLOEXEC, IPPROTO_IP) = 6
setsockopt(6, SOL_TCP, TCP_NODELAY, [1], 4) = 0
connect(6, {sa_family=AF_INET, sin_port=htons(9092), sin_addr=inet_addr("127.0.0.1")}, 16) = -1 EINPROGRESS (Operation now in progress)
connect(6, {sa_family=AF_INET, sin_port=htons(9092), sin_addr=inet_addr("127.0.0.1")}, 16) = 0
getsockname(6, {sa_family=AF_INET, sin_port=htons(60744), sin_addr=inet_addr("127.0.0.1")}, [16]) = 0
getpeername(6, {sa_family=AF_INET, sin_port=htons(9092), sin_addr=inet_addr("127.0.0.1")}, [16]) = 0
getsockopt(6, SOL_SOCKET, SO_TYPE, [1], [4]) = 0
getpeername(6, {sa_family=AF_INET, sin_port=htons(9092), sin_addr=inet_addr("127.0.0.1")}, [16]) = 0
strace: Process 18209 attached
[pid 18207] sendto(4, "x", 1, 0, NULL, 0) = 1
[pid 18207] sendto(4, "x", 1, 0, NULL, 0) = 1
[pid 18209] recvfrom(3, "xx", 1024, 0, NULL, NULL) = 2
[pid 18209] recvfrom(3, 0x7ff7d40032a0, 1024, 0, NULL, NULL) = -1 EAGAIN (Resource temporarily unavailable)
[pid 18207] sendto(4, "x", 1, 0, NULL, 0) = 1
[pid 18207] sendto(4, "x", 1, 0, NULL, 0) = 1
[pid 18209] recvfrom(3, "xx", 1024, 0, NULL, NULL) = 2
[pid 18209] recvfrom(3, 0x7ff7d4005e60, 1024, 0, NULL, NULL) = -1 EAGAIN (Resource temporarily unavailable)
[pid 18209] socket(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0) = 7
[pid 18209] connect(7, {sa_family=AF_UNIX, sun_path="/var/run/nscd/socket"}, 110) = -1 ENOENT (No such file or directory)
[pid 18209] socket(AF_UNIX, SOCK_STREAM|SOCK_CLOEXEC|SOCK_NONBLOCK, 0) = 7
[pid 18209] connect(7, {sa_family=AF_UNIX, sun_path="/var/run/nscd/socket"}, 110) = -1 ENOENT (No such file or directory)
[pid 18209] socket(AF_INET, SOCK_STREAM|SOCK_CLOEXEC, IPPROTO_IP) = 7
[pid 18209] setsockopt(7, SOL_TCP, TCP_NODELAY, [1], 4) = 0
[pid 18209] connect(7, {sa_family=AF_INET, sin_port=htons(9092), sin_addr=inet_addr("10.0.15.140")}, 16) = -1 EINPROGRESS (Operation now in progress)
[pid 18209] connect(7, {sa_family=AF_INET, sin_port=htons(9092), sin_addr=inet_addr("10.0.15.140")}, 16) = 0
[pid 18209] getsockname(7, {sa_family=AF_INET, sin_port=htons(33920), sin_addr=inet_addr("10.0.15.140")}, [16]) = 0
[pid 18209] getpeername(7, {sa_family=AF_INET, sin_port=htons(9092), sin_addr=inet_addr("10.0.15.140")}, [16]) = 0
[pid 18209] getsockopt(7, SOL_SOCKET, SO_TYPE, [1], [4]) = 0
[pid 18209] getpeername(7, {sa_family=AF_INET, sin_port=htons(9092), sin_addr=inet_addr("10.0.15.140")}, [16]) = 0
[pid 18207] sendto(4, "x", 1, 0, NULL, 0) = 1
[pid 18209] +++ exited with 0 +++
+++ exited with 0 +++

10.0.15.140 — nginx, 10.0.15.186 — брокер кафки
С интереса — а зачем нужен ngx_http_kafka_log_module? Он чем-то лучше нативного системного сислога?
  • ngx_http_kafka_log_module перенаправляет запросы с Nginx'a в Kafk'у.
  • Syslog стандарт и система управления логами.

Тут не провести параллель для сравнения между ними.
— syslog нативно сливает логи в кафку, с высокой надежностью и эффективностью.
— nginx может писать любые данные в лог в любом произвольном формате нативно.

Вполне себе рабочая схема, без использования третьесторонних модулей.
Схема рабочая — бесспорно. :) Но я делаю лог-сервис для наших пользователей, они могут использовать различные лог шиперы, главное, чтобы они имели возможность писать в Кафку и для меня было важно обеспечить им шифрованный канал. А далее, как в статье написано, поскольку наши серты кафка не приняла из-за их схемы шифрования, я поработал над костылем, но мы его не используем. Договорились с СБ и конвертнули серты, прописав их в конфигурацию Кафки. Nginx нам теперь не нужен, а сама статья больше для поделиться опытом и думаю будет кому-нибудь полезной.
Ну да, валидно.
Мой поинт был больше про то, что если меньше читать SO и не хватать первый же нагугленный модуль — а подумать, как скрестить уже имеющиеся системные средства — можно строить очень гибкие и эффективные пайплайны.
Собственно, солюшины какие-либо я для этого не читал, модуль нашел не гуглением, а просмотром доступных реп в гитхабе и исследованием их.

Вся проблема лишь в том, что у меня нет углубленного опыта работы с nginx'ом, поскольку я больше ориентируюсь на проекты связанные с виртуализацией, а для проксирования ранее больше использовал haproxy. Nginx в основном ранее настраивал для простых вещей и меня ввела в заблуждение сама его документации, что ssl терминация для проксирования tcp доступна в его платной версии (ссылку я скидывал в одном из комментов и она есть в статье, там конечно явно это не написано, но в prerequisites не упоминается опенсорсный nginx).

В остальном, я видел немалое количестве проектов, чтобы предположить, что не всегда есть возможность, работая с «динозавровым» легаси «строить очень гибкие и эффективные пайплайны» и возможно в одном из таких проектов кому-то будет удобно задействовать кафку именно таким образом. Статья не истинна в последней инстанции и сама Кафка обладает достаточной гибкостью, чтобы в нее писать так как захочется желающему работать с ней и не использовать то, что я написал. Это лишь +еще один способ.

Cant make with nginx 1.24.1 can anybody help?

What the problem do you have with nginx 1.24.1?

Само решение в статье очень интересное, спасибо. Сам так хотел сделать.

Но, если я правильно понимаю, SSL не поддерживается модулем, только если кафка без авторизации можно в неё слать?
При этом вроде как и lua-resty-kafka не умеет. Кто-то имел опыт с альтернативными решениями?

Зарегистрируйтесь на Хабре, чтобы оставить комментарий