Как стать автором
Обновить
0
Cloudera
Американская компания, разработчик

Архитектура непрерывной потоковой доставки в Cloudera Flow Management

Время на прочтение7 мин
Количество просмотров1.2K
Автор оригинала: Wendell Bu

Cloudera Flow Management, основанная на Apache NiFi и являющаяся частью платформы Cloudera DataFlow, используется некоторыми из крупнейших организаций в мире для обеспечения простого в использовании, мощного и надежного способа распределения и высокоскоростной обработки данных в современной экосистеме больших данных. Клиенты все чаще используют CFM для ускорения обработки потоковых данных на предприятии от концепции до реализации. Интерфейс разработки потоков Cloudera отличается от типичных стилей структурированного кодирования, что часто создает проблему применения лучших практик непрерывного совершенствования/непрерывной доставки (CI/CD) в стиле DevOps для доставки потоков.

В этом блоге мы рассмотрим сквозной жизненный цикл процесса потока данных, который способствует непрерывной доставке с минимальным временем простоя. Мы надеемся, что вы почувствуете вдохновение, чтобы внедрить некоторые из этих идей в свои собственные процессы Cloudera Flow Management CD. Ниже представлена архитектура подобного решения:

Архитектура непрерывной потоковой доставки
Архитектура непрерывной потоковой доставки

 В этом процессе можно выделить следующие шаги.

1. Разработка

Разработчики проектируют DataFlow и тестируют его в общем многопользовательском DEV кластере. Команда может работать только в своих собственных группах процессов, контролируемых политиками Apache Ranger для NiFi. Если какие-либо группы потребуют изменить или добавить службу корневого контроллера, администратор поможет с этим изменением и скопирует его в выделенную группу процессов root_controller_services. 

2. Контроль версий

Для контроля версий разработчики вручную добавляют или обновляют протестированные потоки в DEV раздел NiFi Registry. При заданных политиках в Apache Ranger для NiFi Registry одна команда может наблюдать только за назначенным ей сегментом. Администратор обновляет версию группы процессов root_controller_services в сегменте администратора реестра. 

3. Запрос Source Change

Чтобы разработчики не переносили потоки, которые все еще проходят тестирование, в среду более высокого уровня, они должны создать файл запроса на изменение (source change) для коллегиальной проверки исходника после тестирования потоков. Этот файл находится в проекте git изменений исходника. Разработчики создают новую ветку этого проекта git, а затем обновляют файл source_change_request.json, чтобы он содержал измененные потоки и версии. Если какие-то службы корневого контроллера были обновлены, администратор обновит версию группы процессов root_controller_services в файле source_change_request.json.

Как только все они будут готовы, владелец изменения должен дать запрос git pull, чтобы выполнить Peer Review.

{
    "env": "dev",
    "nifi_api_url": "https://nifi-dev.example.cloudera.com:9091/nifi-api",
    "reg_api_url": "https://registry-dev.example.cloudera.com:61443/nifi-registry-api",
    "reg_client_name": "registry-dev.example.cloudera.com",
    "buckets": [
        {
            "name": "teama_bucket",
            "flows": [
                {"name": "flowa1_feeding", "version": "2"},
                {"name": "flowa1_digestion", "version": "2"},
            ]
        },
        {
            "name": "admin_bucket",
            "flows": [
                {"name": "root_controller_services", "version": "1"}
            ]
        }
    ]
}

Пример source_change_request.json

4. Peer Review

Члены команды рецензируют код, проверяя статус потока и тестируя в DEV NiFi.

5. Утверждение изменений Source Change

Если команда подтверждает, что изменение готово к продвижению, они могут объединить этот запрос с главной ветвью проекта изменения исходника. 

6. Продвижение на более высокий уровень

Cloudera предлагает создать задание Jenkins для запуска скрипта python Promo_source_changes.py, отслеживая главную ветвь проекта изменения исходника. Этот скрипт использует NiFi / NiFi Registry API для экспорта версии потока из DEV NiFi Registry, а затем импортирует ее в UAT NiFi Registry. Пример кода:

for bucket in source_env_conf['buckets']:
  login_env(source_env_conf)
  source_bucket = nipyapi.versioning.get_registry_bucket(bucket['name'])
  for flow in bucket['flows'][:]:
    # Check out a flow from Source NiFi Registry
    login_env(source_env_conf)
    source_versioned_flow = nipyapi.versioning.get_flow_in_bucket(source_bucket.identifier, identifier=flow['name'])
    exported_flow = nipyapi.versioning.export_flow_version(bucket_id=source_bucket.identifier, flow_id=source_versioned_flow.identifier, version=flow['version'], mode='yaml')
    # Version control this flow in Dest NiFi Registry
    login_env(dest_env_conf)
    dest_bucket = nipyapi.versioning.get_registry_bucket(dest_env_conf['buckets'][0])
    dest_flow = nipyapi.versioning.get_flow_in_bucket(bucket_id=dest_bucket.identifier, identifier=flow['name'])
    if dest_flow is None:
      imported_flow = nipyapi.versioning.import_flow_version(bucket_id=dest_bucket.identifier, encoded_flow=exported_flow, flow_name=flow['name'])
    else:
      imported_flow = nipyapi.versioning.import_flow_version(bucket_id=dest_bucket.identifier, encoded_flow=exported_flow, flow_id=dest_flow.identifier)
    log.info("Flow %s Version %s is imported from ENV %s into ENV %s.", flow['name'], flow['version'], source_env_conf['env'], dest_env_conf['env'])
    # Remove promoted flow from the json
    if flow in bucket['flows']:
      bucket['flows'].remove(flow)
  log.info("All %d flows in %s bucket %s are imported into %s bucket %s.", len(bucket['flows']), source_env_conf['env'], bucket['name'], dest_env_conf['env'], dest_env_conf['buckets'][0])

Пример promote_source_changes.py  

7. Запрос Destination Change

В реестре UAT NiFi скрипт generate_dest_change_request.sh зарегистрирован как перехватчик событий реестра.

<eventHookProvider>
  <class>org.apache.nifi.registry.provider.hook.ScriptEventHookProvider</class>
  <property name="Script Path">/var/cloudera/flow_cd/generate_dest_change_request.sh</property>
  <property name="Working Directory">/var/cloudera/flow_cd/</property>
  <property name="Whitelisted Event Type 1">CREATE_FLOW_VERSION</property>
</eventHookProvider>

Конфигурация NiFi Registry Event Hook  

CREATE_FLOW_VERSION feeb0fbe-5d7e-4363-b58b-142fa80775e1 
1a0b614c-3d0f-471a-b6b1-645e6091596d 4 flow_cd Update-Attributes

Пример события CREATE_FLOW_VERSION    

Это эквивалентно следующему:

CREATE_FLOW_VERSION [BUCKET_ID=5d81dc5e-79e1-4387-8022-79e505f5e3a0, 
FLOW_ID=a89bf6b7-41f9-4a96-86d4-0aeb3c3c25be, VERSION=4, USER=flow_cd, 
COMMENT=Update-Attributes]

Любое обновление версии отдельного потока запускает сценарий для создания новой ветви проекта git запроса на изменение цели и создания файла dest_change_request.json путем сравнения новой и основной версии. 

{
   "env":"uat",
   "nifi_api_url":"https://nifi-uat.example.cloudera.com:9091/nifi-api",
   "reg_api_url":"https://registry-uat.example.cloudera.com:61443/nifi-registry-api",
   "reg_client_name":"registry-uat.example.cloudera.com",
   "bucket":{
      "name":"uat_bucket",
      "flows":[
         {
            "name":"flowb1_digestion",
            "version":"4",
            "sensitive_parameters":[
               

            ],
            "comment":"Update Flow Attributes",
            "deployed_version":"2",
            "deployed_comment":"Update Logic"
         }
      ]
   }
}

Пример dest_change_request.json  

Для рассмотрения запроса release менеджером в git создается запрос на перенос.

8. Утверждение внедрения кода

Поскольку каждое изменение версии группы процессов запускает одно событие реестра NiFi, если диспетчер версий содержит несколько групп процессов и несколько служб корневого контроллера, то для одного запроса на изменение исходника он будет получать несколько запросов на изменение цели.

Диспетчеру версий необходимо объединить связанные запросы на изменение и просмотреть изменения. Если перечислены какие-либо чувствительные параметры, он должен подтвердить, требуются ли в целевом NiFi новые значения чувствительных параметров. После внесения этих изменений конфиденциальных параметров запрос на перенос может быть одобрен и объединен с master ветвью в git.

{
   "env":"uat",
   "nifi_api_url":"https://nifi-uat.example.cloudera.com:9091/nifi-api",
   "reg_api_url":"https://registry-uat.example.cloudera.com:61443/nifi-registry-api",
   "reg_client_name":"registry-uat.example.cloudera.com",
   "bucket":{
      "name":"uat_bucket",
      "flows":[
         {
            "name":"flowa1_digestion",
            "version":"1",
            "sensitive_parameters":[
               {
                  "name":"access_key_id"
               },
               {
                  "name":"secret_access_key"
               }
            ],
            "comment":"Initial Version"
         },
         {
            "name":"root_controller_services",
            "version":"3",
            "sensitive_parameters":[
               {
                  "name":"oracle_password"
               }
            ],
            "comment":"root_DBCPConnectionPool",
            "deployed_version":"2",
            "deployed_comment":"root_PropertiesFileLookupService"
         },
         {
            "name":"flowb1_digestion",
            "version":"4",
            "sensitive_parameters":[           
            ],
            "comment":"Update Flow Attributes",
            "deployed_version":"2",
            "deployed_comment":"Update Logic"
         }
      ]
   }
}

Пример комбинированного dest_change_request.json  

9. Развертывание новой версии

Наконец, объединенная главная ветвь запускает скрипт python deploy_dest_changes.py для автоматического развертывания новой версии потока в среде UAT - без простоев или с минимальным временем простоя.

    # Connect flowx_feeding with flowx_digestion
    if feeding_pg is not None and digestion_pg is not None:
      digestion_inputport = nipyapi.canvas.list_all_input_ports(pg_id=digestion_pg.id)
      feeding_outputport = nipyapi.canvas.list_all_output_ports(pg_id=feeding_pg.id)
      if digestion_inputport[0] is None or feeding_outputport[0] is None:
        raise SystemExit('Error: the flowx_feeding pg must have an output port, and the flowx_digestion pg must have an input port!')
      nipyapi.canvas.create_connection(feeding_outputport[0], digestion_inputport[0])
      nipyapi.canvas.schedule_process_group(digestion_pg.id, scheduled=True)
      nipyapi.canvas.schedule_process_group(feeding_pg.id, scheduled=True)

Соединение подпотоков совершенно нового потока

      # Stop the input port
      nipyapi.canvas.schedule_components(pg_id=digestion_pg.id, scheduled=False, components=input_port)
      all_connections =  nipyapi.canvas.list_all_connections(pg_id=digestion_pg.id)
      queued_count = sum(locale.atoi(connection.status.aggregate_snapshot.queued_count) for connection in all_connections)
      # Wait for Queues are empty
      while (queued_count > 0):
        log.info("There are still %d queued events, waiting for all are processed.", queued_count)
        time.sleep(10)
        all_connections =  nipyapi.canvas.list_all_connections(pg_id=digestion_pg.id)
        queued_count = sum(locale.atoi(connection.status.aggregate_snapshot.queued_count) for connection in all_connections)
      log.info("Process Group %s has no queued event, start updating new version now!", flow['name'])
      nipyapi.canvas.schedule_process_group(digestion_pg.id, scheduled=False)
      nipyapi.versioning.update_flow_ver(process_group=digestion_pg, target_version=flow['version'])
      nipyapi.canvas.schedule_process_group(digestion_pg.id, scheduled=True)

Развертывание новой версии активного подпотока

Благодаря превосходному NiPyApi (https://github.com/Chaffelson/nipyapi), веденному Дэниелом Чаффельсоном, скрипты Python для Cloudera Flow Management оказаались намного проще, чем ожидалось.

Теги:
Хабы:
Всего голосов 2: ↑2 и ↓0+2
Комментарии0

Публикации

Информация

Сайт
ru.cloudera.com
Дата регистрации
Дата основания
2008
Численность
1 001–5 000 человек
Местоположение
США
Представитель
Кирилл

Истории