Есть простая, можно сказать, типовая – задача, передать данные из системы «А» в систему «Б». А – классическая legacy-трехзвенка из 00х с IIS-MSSQL, «Б» - новая-нулевая-микросервисная с внутренней шиной на apache kafka и собственным ETL на Apache NiFi, развернута в k8s. Направление передачи – из «А» в «Б», по расписанию , в общем ничего сложного – «Работенка на 5 минут»: идем в NiFi делаем QueryDatabaseTable->PublishKafkaRecord и продолжаем спать – но тут начинаются «Нюансы»(ТМ) в виде ИБ, которая говорит, что прямая интеграция корпоративных систем – харам, архитектуры которой (дикие люди!) не нравится хождение в чужую БД (Подержи моё пиво! Я сто раз так делал!) и прочих скучных регламентов, требующих «наличия аутентификации», «направления установления соединения совпадающего с направлением передачи» и тому подобных глупостей.
И вот тут на сцену выходит корпоративная интеграционная шина – (low|no)-code решение, которое умеет в расписания, подключение к ИС по различным протоколам (в том числе и *dbc), передачу данных с помощью REST\SOAP, аутентификацию, обработку ошибок, алертинг и кучу других вещей. Оооок, шина по расписанию будет ходить в БэДэ (Или не БэДэ – там уже видно будет), забирать данные и передавать… А куда, собственно, передавать?
Первый вариант – «в kafka’у!» хорош примерно всем – кроме реализации. Собственно, бинарный протокол kafka’и шина не умеет, ИБ не умеет в инспекцию этого самого протокола, ingress-nginx контроллер не умеет (Нормально – не умеет, ssl-passthrough в данном случае не очень-то «нормально») в публикацию kafka’и, а согласовывать с ИБ публикацию брокеров через LB – удачи, пацаны. Плюс нормальная аутентификация\авторизация на kafka’е – тот еще геморрой между нами говоря. Отметаем.
Вариант «Бэ!» - делаем отдельный интеграционный сервис на каком-нибудь fastapi – пятью минутами уже не обойдешься, а когда количество интеграций переползет за первый десяток процесс может стать ув-ле-ка-тель-ным. Нет, ничего принципиально невозможного – но in-scale не дешево.
Вариант «Цэ» - используем тот же NiFi. HandleHTTPRequest-PublishKafka-HandleHTTPResponse, можно и за три минуты управиться. Правда аутентификацию «из коробки» он умеет только по сертификатам, а тот же basic уже надо делать на бизнес-слое, а стандартный корпоративный OIDC и вовсе употеешь. Опять же – http-сервер из NiFi такой себе, HA нет (NiFi-cluster он про LB, а не HA\FT), валидацию данных делать не так, чтобы удобно. Можно конечно. Но – душа просит, просит чего-то…
И тут в голову приходит kafka-rest. А почему бы и не да? Рест-адаптер для шины есть, он уже готов и делать его не надо все плюшки «варианта А» без минусов реализации (Не, ну понятно, что своих минусов там будет – лопатой ешь, начиная от stateful consuming и необходимостью работы через sticky-sessions, заканчивая… «Об этом я подумаю завтра!»(с)). Можно сделать за полчаса «из палка-и-веревка», все как мы любим.
Собственно деплой в 4 строчки (KAFKA_REST_BOOTSTRAP_SERVERS и KAFKA_REST_SCHEMA_REGISTRY_URL в переменных) сервис-ингресс… работает. Надо только аутентификацию с авторизацией прикрутить – и можно идти за кофе. Смотрим руководство (REST Proxy Security | Confluent Documentation) – а оно только в basic умеет, авторизация – вообще внутри kafka’и, что – см. выше. Лад-нень-ко, скотч вроде еще не кончился – прикрутим к этому делу oauth2-proxy для аутентификации, а авторизацию запилим на уровне ингресса – сделаем отдельный путь до каждого топика и запретим методы, отличные от POST:
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: custom-kafka-rest
namespace: <namespace>
annotations:
nginx.ingress.kubernetes.io/rewrite-target: /topics/custom-int-test
nginx.ingress.kubernetes.io/configuration-snippet: |
if ($request_method != POST) {
return 403;
}
spec:
ingressClassName: nginx
rules:
- host: <host>
http:
paths:
- path: /krest/topics/custom-int-test
pathType: Exact
backend:
service:
name: custom-kafka-oauth2
port:
name: http
С запретом методов косо-криво конечно, но работает. Дальше собственно oauth2-proxy – поскольку проверять мы будем по сути только наличие токена и интерактивная работа пользователей нас не интересует – то задача конфигурирования вот этого вот всего значительно упрощается.
Создаем в keycloak клиента, тип confidential, service account + авторизацию включаем, остальное можно убрать, создаем клиентскую роль добавляем её клиенту, проверяем – токен получается нормально. Ок. Идем с этим токеном в oauth2-proxy – не ок, keycloak-oidc ругается на отсутствующий audience, приходится делать еще и соответствующий mapper. Проверяем – вроде норм.
Делаем сайдкар к контейнеру с kafka-rest:
- --skip-jwt-bearer-tokens=true # Звучит страшно, переводится как «пропускаем всех с токеном»
- --insecure-oidc-allow-unverified-email=false
- --api-route=/topics/custom-int-test # вместо redirect’а по всем неавторизованным запросам к этому адресу отдаем 401
- --show-debug-on-error
- --redirect-url=https://<host>/krest/oauth/callback # нафиг не нужно в данном случае – но без него не работае
- --scope=openid
- --provider=keycloak-oidc # Нам нужен контроль доступа привязанный к ролям
- --oidc-issuer-url=https://<host>/auth/realms/<realm>
- --http-address=0.0.0.0:4180
- --allowed-role=custom-kafka-rest:Krest #Специфично для KC, для других провайдеров не заработает
- --proxy-prefix=/krest/oauth
- --upstream=http://127.0.0.1:8082/
- --email-domain=<домен>
- --ssl-insecure-skip-verify
- --set-basic-auth=false
- --client-id=custom-kafka-rest # Не нужно, но без этого не работает
- --client-secret='not used' # Не нужно, но без этого не работает
- --cookie-secret='not used # Не нужно, но без этого не работает
В процессе приходится подсовывать ненужное-ненужно в конфигурацию для удовлетворения «формальных требований», но в общем ничего сложного. На всякий случай - nginx.ingress.kubernetes.io/auth-signin и nginx.ingress.kubernetes.io/auth-url в аннотацию ингресса класть не нужно – «по условию задачи» мы просто проверяем предоставляемый токен, посылая всех на... 401 в случае его отсутствия.
В процессе выявляется не очень приятная штука – сделать «нормальную авторизацию» на одном oauth2-proxy похоже не получится. Указать несколько апстримов, разделив их путями – можно, указать несколько разрешенных ролей – тоже, а вот сделать так, чтобы в топик А мог писать владелец роли Х, а в топик Б – владелец роли У и никак иначе – видимо, нет. Конечно, всегда можно сделать для отдельных ингрессов отдельный сайдкар – но это уже «низкий класс, не чистая работа». Впрочем, пока – «пренебречь, вальсируем!» - аутентификацию с авторизацией мы, с грехом пополам, сделали – теперь надо родить контроль входных данных.
И тут на помощь приходит паркур schema-registry. В авро-фигавро и прочую бинарную сериализацию наша шина не умеет, а вот в json вполне себе. Сервисы, расположенные «за kafka’ой» ожидают получить что-то вроде:
{
"pointName": "125",
"events": [
{
"Value": 0.000000000000000e000,
"timestamp": 1694537700000000000,
"quality": 0,
"annotatiton": None,
},
{
"Value": 0.000000000000000e000,
"timestamp": 1694537700000000000,
"quality": 0,
"annotatiton": None,
},
]
Теоретически – типы значений могут быть разными (А conditional-схемы нуууу… эээ… всегда можно сделать несколько схем, правда?) – но у нас достаточно простой случай: что в БД есть, то и кладем – а тип данных в таблице прибит гвоздями. Нарисуем под это дело jsonschema’у:
{
"title": "JSONv3",
"description": "Схема данных TSDS",
"type": "object",
"properties": {
"pointName": {
"type": "string"
},
"events": {
"type": "array",
"items": {
"properties": {
"annotation": {
"type": "null"
},
"value": {
"type": "number"
},
"timestamp": {
"type": "number"
},
"quality": {
"type": "number"
}
}
}
}
}
}
И запихнем её в schemaregistry.
Дальше необходимо определиться с версией API kafka-rest – их собственно две, вторая и третья (Традиционно - луДшая). Увы, нам подходит именно вторая, т.к. третья работу со схемами не поддерживает. V2 ожидает получить данные в следующем (минимальном) формате: {"records":[{"value":{<"Что":"вам угодно">}}]}. Ну, собственно заворачиваем желаемую сервисом структуру в требуемую REST-proxy обертку и пробуем:
curl -X POST -H "Content-Type: application/vnd.kafka.jsonschema.v2+json" --data '{"value_schema_id": <номер схемы>, "records":[{"value":{<данные>}}]}' "https://<host>/krest/topics/custom-int-test/" –k
#таки да:
<Response [200]> {"offsets":[{"partition":0,"offset":27,"error_code":null,"error":null},{"partition":0,"offset":28,"error_code":null,"error":null}],"key_schema_id":null,"value_schema_id":<номер схемы>}
#А в случае, если мы эти данные чуть-чуть, самую капельку, малость немножко поломаем – таки нет:
<Response [422]> {"error_code":42203,"message":"Conversion of JSON to Object failed: Failed to convert JSON using JSON Schema: #/events/0/quality: expected type: Number, found: String"}
Успех? Хм… не совсем. А что будет, если вместо данных, не соответствующих схеме мы попробуем загрузить данные _без_ схемы? А ничего. Совсем. Нет схемы – нет валидации – нет проблем.
Бида. Бида-бида даже. Нет, теоретически в конфлюэнтовской реализации есть broker-side контроль схем – но он проблемы с отсутствующей схемой не решает. Хм. А как собственно мы понимаем, используется схема или нет? Документация говорит, что по заголовку Content-Type:
application/vnd.kafka[.embedded_format].[api_version]+[serialization_format]
application/vnd.kafka.jsonschema.v2+json – т.е. есть «jsonschema» в качестве embedded format – проверяем, обычный json – не проверяем. Ну? Поняли, да? Иех, сгорел сарай – гори и хата:
Дети, не делайте так! (Взрослые – скажите, как правильно?)
nginx.ingress.kubernetes.io/configuration-snippet: | if ($request_method != POST) { return 403; } if ($content_type != "application/vnd.kafka.jsonschema.v2+json") { return 405; }
фигак!
curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" --data '{"records":[{"value":{<данные>}}]}' "https://<host>/krest/topics/custom-int-test/" –k
<Response [405]> <html>
<head><title>405 Not Allowed</title></head>
<body>
<center><h1>405 Not Allowed</h1></center>
<hr><center>nginx</center>
</body>
</html>
Тут пытливый читатель может спросить: а что будет, если схема есть и данные ей соответствуют – просто схема эта, гм, другая? На это хочется спросить: «Ты вообще за меня или за медведя?!» - а написать придется что-то вроде: «данная ситуация будет обрабатываться на сервисе обработчике данных, читающем содержимое топика ХХХ».
В общем, осталось только из таблички
create table dbo.DataHour
(
Date date not null,
Hour int not null,
ObjectId int not null,
LayerId int not null
constraint FK_DataHour_Layer
references dbo.Layer,
ParameterId int not null
constraint FK_DataHour_Parameter
references dbo.Parameter,
Value float,
ValueEnum int,
SaveDate smalldatetime,
constraint PK_DataHour
primary key (Date, Hour, ObjectId, LayerId, ParameterId)
with (fillfactor = 80)
)
Эту самую жысонину слабать. С учетом того, что этот самый (low|no)-code на шине – то еще чудушко – проще оказалось запихнуть создание json’а в хранимку
И так - тем более не надо!
create procedure getdata(@fr timestamp)
as
begin
declare
@points table
(
pointName int not null
)
declare
@result table
(
pointName varchar(10),
events varchar(max)
)
declare @pointName int
insert into @points
select distinct ObjectId
from dbo.DataHour
where SaveDate > @fr
select @pointName = min(pointName)
from @points
while @pointName is not null
begin
insert into @pointName
values (caste @pointName as varchar(10)),
(select
value
as Value,
DATEDIFF_BIG(NANOSECOND, '1970-01-01 00:00:00.0000000', SaveDate) as timestamp,
0
as quality, null
as annotatiton
from dbo.DataHour
where ObjectId = @pointName
and SaveDate > @fr
for json auto, include_null_values))
select @pointName = min(pointName)
from @points
where pointName > @pointName
end select JSON_MODIFY((select pointName as [value.pointName],
JSON_QUERY(events) as [value.events]
from @result
for json path, ROOT('records')), 'append $.value_schema_id', <номер схемы>)
end
Цена вопроса со всем research’ем и приседушками – 4 часа, на описание\согласование решения больше уйдет. Рекомендую ли я это решение к использованию? В том виде, в котором оно описано – скорее «нет», костылей все же многовато – но докрутить до вменяемого состояния можно.