Есть простая, можно сказать, типовая – задача, передать данные из системы «А» в систему «Б». А – классическая legacy-трехзвенка из 00х с IIS-MSSQL, «Б» - новая-нулевая-микросервисная с внутренней шиной на apache kafka и собственным ETL на Apache NiFi, развернута в k8s. Направление передачи – из «А» в «Б», по расписанию , в общем ничего сложного – «Работенка на 5 минут»: идем в NiFi делаем QueryDatabaseTable->PublishKafkaRecord и продолжаем спать – но тут начинаются «Нюансы»(ТМ) в виде ИБ, которая говорит, что прямая интеграция корпоративных систем – харам, архитектуры которой (дикие люди!) не нравится хождение в чужую БД (Подержи моё пиво! Я сто раз так делал!) и прочих скучных регламентов, требующих «наличия аутентификации», «направления установления соединения совпадающего с направлением передачи» и тому подобных глупостей.
И вот тут на сцену выходит корпоративная интеграционная шина – (low|no)-code решение, которое умеет в расписания, подключение к ИС по различным протоколам (в том числе и *dbc), передачу данных с помощью REST\SOAP, аутентификацию, обработку ошибок, алертинг и кучу других вещей. Оооок, шина по расписанию будет ходить в БэДэ (Или не БэДэ – там уже видно будет), забирать данные и передавать… А куда, собственно, передавать?
Первый вариант – «в kafka’у!» хорош примерно всем – кроме реализации. Собственно, бинарный протокол kafka’и шина не умеет, ИБ не умеет в инспекцию этого самого протокола, ingress-nginx контроллер не умеет (Нормально – не умеет, ssl-passthrough в данном случае не очень-то «нормально») в публикацию kafka’и, а согласовывать с ИБ публикацию брокеров через LB – удачи, пацаны. Плюс нормальная аутентификация\авторизация на kafka’е – тот еще геморрой между нами говоря. Отметаем.
Вариант «Бэ!» - делаем отдельный интеграционный сервис на каком-нибудь fastapi – пятью минутами уже не обойдешься, а когда количество интеграций переползет за первый десяток процесс может стать ув-ле-ка-тель-ным. Нет, ничего принципиально невозможного – но in-scale не дешево.
Вариант «Цэ» - используем тот же NiFi. HandleHTTPRequest-PublishKafka-HandleHTTPResponse, можно и за три минуты управиться. Правда аутентификацию «из коробки» он умеет только по сертификатам, а тот же basic уже надо делать на бизнес-слое, а стандартный корпоративный OIDC и вовсе употеешь. Опять же – http-сервер из NiFi такой себе, HA нет (NiFi-cluster он про LB, а не HA\FT), валидацию данных делать не так, чтобы удобно. Можно конечно. Но – душа просит, просит чего-то…
И тут в голову приходит kafka-rest. А почему бы и не да? Рест-адаптер для шины есть, он уже готов и делать его не надо все плюшки «варианта А» без минусов реализации (Не, ну понятно, что своих минусов там будет – лопатой ешь, начиная от stateful consuming и необходимостью работы через sticky-sessions, заканчивая… «Об этом я подумаю завтра!»(с)). Можно сделать за полчаса «из палка-и-веревка», все как мы любим.
Собственно деплой в 4 строчки (KAFKA_REST_BOOTSTRAP_SERVERS и KAFKA_REST_SCHEMA_REGISTRY_URL в переменных) сервис-ингресс… работает. Надо только аутентификацию с авторизацией прикрутить – и можно идти за кофе. Смотрим руководство (REST Proxy Security | Confluent Documentation) – а оно только в basic умеет, авторизация – вообще внутри kafka’и, что – см. выше. Лад-нень-ко, скотч вроде еще не кончился – прикрутим к этому делу oauth2-proxy для аутентификации, а авторизацию запилим на уровне ингресса – сделаем отдельный путь до каждого топика и запретим методы, отличные от POST:
apiVersion: networking.k8s.io/v1 kind: Ingress metadata: name: custom-kafka-rest namespace: <namespace> annotations: nginx.ingress.kubernetes.io/rewrite-target: /topics/custom-int-test nginx.ingress.kubernetes.io/configuration-snippet: | if ($request_method != POST) { return 403; } spec: ingressClassName: nginx rules: - host: <host> http: paths: - path: /krest/topics/custom-int-test pathType: Exact backend: service: name: custom-kafka-oauth2 port: name: http
С запретом методов косо-криво конечно, но работает. Дальше собственно oauth2-proxy – поскольку проверять мы будем по сути только наличие токена и интерактивная работа пользователей нас не интересует – то задача конфигурирования вот этого вот всего значительно упрощается.
Создаем в keycloak клиента, тип confidential, service account + авторизацию включаем, остальное можно убрать, создаем клиентскую роль добавляем её клиенту, проверяем – токен получается нормально. Ок. Идем с этим токеном в oauth2-proxy – не ок, keycloak-oidc ругается на отсутствующий audience, приходится делать еще и соответствующий mapper. Проверяем – вроде норм.
Делаем сайдкар к контейнеру с kafka-rest:
- --skip-jwt-bearer-tokens=true # Звучит страшно, переводится как «пропускаем всех с токеном» - --insecure-oidc-allow-unverified-email=false - --api-route=/topics/custom-int-test # вместо redirect’а по всем неавторизованным запросам к этому адресу отдаем 401 - --show-debug-on-error - --redirect-url=https://<host>/krest/oauth/callback # нафиг не нужно в данном случае – но без него не работае - --scope=openid - --provider=keycloak-oidc # Нам нужен контроль доступа привязанный к ролям - --oidc-issuer-url=https://<host>/auth/realms/<realm> - --http-address=0.0.0.0:4180 - --allowed-role=custom-kafka-rest:Krest #Специфично для KC, для других провайдеров не заработает - --proxy-prefix=/krest/oauth - --upstream=http://127.0.0.1:8082/ - --email-domain=<домен> - --ssl-insecure-skip-verify - --set-basic-auth=false - --client-id=custom-kafka-rest # Не нужно, но без этого не работает - --client-secret='not used' # Не нужно, но без этого не работает - --cookie-secret='not used # Не нужно, но без этого не работает
В процессе приходится подсовывать ненужное-ненужно в конфигурацию для удовлетворения «формальных требований», но в общем ничего сложного. На всякий случай - nginx.ingress.kubernetes.io/auth-signin и nginx.ingress.kubernetes.io/auth-url в аннотацию ингресса класть не нужно – «по условию задачи» мы просто проверяем предоставляемый токен, посылая всех на... 401 в случае его отсутствия.
В процессе выявляется не очень приятная штука – сделать «нормальную авторизацию» на одном oauth2-proxy похоже не получится. Указать несколько апстримов, разделив их путями – можно, указать несколько разрешенных ролей – тоже, а вот сделать так, чтобы в топик А мог писать владе��ец роли Х, а в топик Б – владелец роли У и никак иначе – видимо, нет. Конечно, всегда можно сделать для отдельных ингрессов отдельный сайдкар – но это уже «низкий класс, не чистая работа». Впрочем, пока – «пренебречь, вальсируем!» - аутентификацию с авторизацией мы, с грехом пополам, сделали – теперь надо родить контроль входных данных.
И тут на помощь приходит паркур schema-registry. В авро-фигавро и прочую бинарную сериализацию наша шина не умеет, а вот в json вполне себе. Сервисы, расположенные «за kafka’ой» ожидают получить что-то вроде:
{ "pointName": "125", "events": [ { "Value": 0.000000000000000e000, "timestamp": 1694537700000000000, "quality": 0, "annotatiton": None, }, { "Value": 0.000000000000000e000, "timestamp": 1694537700000000000, "quality": 0, "annotatiton": None, }, ]
Теоретически – типы значений могут быть разными (А conditional-схемы нуууу… эээ… всегда можно сделать несколько схем, правда?) – но у нас достаточно простой случай: что в БД есть, то и кладем – а тип данных в таблице прибит гвоздями. Нарисуем под это дело jsonschema’у:
{ "title": "JSONv3", "description": "Схема данных TSDS", "type": "object", "properties": { "pointName": { "type": "string" }, "events": { "type": "array", "items": { "properties": { "annotation": { "type": "null" }, "value": { "type": "number" }, "timestamp": { "type": "number" }, "quality": { "type": "number" } } } } } }
И запихнем её в schemaregistry.
Дальше необходимо определиться с версией API kafka-rest – их собственно две, вторая и третья (Традиционно - луДшая). Увы, нам подходит именно вторая, т.к. третья работу со схемами не поддерживает. V2 ожидает получить данные в следующем (минимальном) формате: {"records":[{"value":{<"Что":"вам угодно">}}]}. Ну, собственно заворачиваем желаемую сервисом структуру в требуемую REST-proxy обертку и пробуем:
curl -X POST -H "Content-Type: application/vnd.kafka.jsonschema.v2+json" --data '{"value_schema_id": <номер схемы>, "records":[{"value":{<данные>}}]}' "https://<host>/krest/topics/custom-int-test/" –k
#таки да:
<Response [200]> {"offsets":[{"partition":0,"offset":27,"error_code":null,"error":null},{"partition":0,"offset":28,"error_code":null,"error":null}],"key_schema_id":null,"value_schema_id":<номер схемы>}
#А в случае, если мы эти данные чуть-чуть, самую капельку, малость немножко поломаем – таки нет:
<Response [422]> {"error_code":42203,"message":"Conversion of JSON to Object failed: Failed to convert JSON using JSON Schema: #/events/0/quality: expected type: Number, found: String"}
Успех? Хм… не совсем. А что будет, если вместо данных, не соответствующих схеме мы попробуем загрузить данные _без_ схемы? А ничего. Совсем. Нет схемы – нет валидации – нет проблем.
Бида. Бида-бида даже. Нет, теоретически в конфлюэнтовской реализации есть broker-side контроль схем – но он проблемы с отсутствующей схемой не решает. Хм. А как собственно мы понимаем, используется схема или нет? Документация говорит, что по заголовку Content-Type:
application/vnd.kafka[.embedded_format].[api_version]+[serialization_format]
application/vnd.kafka.jsonschema.v2+json – т.е. есть «jsonschema» в качестве embedded format – проверяем, обычный json – не проверяем. Ну? Поняли, да? Иех, сгорел сарай – гори и хата:
Дети, не делайте так! (Взрослые – скажите, как правильно?)
nginx.ingress.kubernetes.io/configuration-snippet: | if ($request_method != POST) { return 403; } if ($content_type != "application/vnd.kafka.jsonschema.v2+json") { return 405; }
фигак!
curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" --data '{"records":[{"value":{<данные>}}]}' "https://<host>/krest/topics/custom-int-test/" –k
<Response [405]> <html>
<head><title>405 Not Allowed</title></head>
<body>
<center><h1>405 Not Allowed</h1></center>
<hr><center>nginx</center>
</body>
</html>
Тут пытливый читатель может спросить: а что будет, если схема есть и данные ей соответствуют – просто схема эта, гм, другая? На это хочется спросить: «Ты вообще за меня или за медведя?!» - а написать придется что-то вроде: «данная ситуация будет обрабатываться на сервисе обработчике данных, читающем содержимое топика ХХХ».
В общем, осталось только из таблички
create table dbo.DataHour ( Date date not null, Hour int not null, ObjectId int not null, LayerId int not null constraint FK_DataHour_Layer references dbo.Layer, ParameterId int not null constraint FK_DataHour_Parameter references dbo.Parameter, Value float, ValueEnum int, SaveDate smalldatetime, constraint PK_DataHour primary key (Date, Hour, ObjectId, LayerId, ParameterId) with (fillfactor = 80) )
Эту самую жысонину слабать. С учетом того, что этот самый (low|no)-code на шине – то еще чудушко – проще оказалось запихнуть создание json’а в хранимку
И так - тем более не надо!
create procedure getdata(@fr timestamp) as begin declare @points table ( pointName int not null ) declare @result table ( pointName varchar(10), events varchar(max) ) declare @pointName int insert into @points select distinct ObjectId from dbo.DataHour where SaveDate > @fr select @pointName = min(pointName) from @points while @pointName is not null begin insert into @pointName values (caste @pointName as varchar(10)), (select value as Value, DATEDIFF_BIG(NANOSECOND, '1970-01-01 00:00:00.0000000', SaveDate) as timestamp, 0 as quality, null as annotatiton from dbo.DataHour where ObjectId = @pointName and SaveDate > @fr for json auto, include_null_values)) select @pointName = min(pointName) from @points where pointName > @pointName end select JSON_MODIFY((select pointName as [value.pointName], JSON_QUERY(events) as [value.events] from @result for json path, ROOT('records')), 'append $.value_schema_id', <номер схемы>) end
Цена вопроса со всем research’ем и приседушками – 4 часа, на описание\согласование решения больше уйдет. Рекомендую ли я это решение к использованию? В том виде, в котором оно описано – скорее «нет», костылей все же многовато – но докрутить до вменяемого состояния можно.
