Все началось с невинного, но такого необходимого действия — ликвидации технического долга. Мы с командой собрались реорганизовать структуру репозитория наших загрузок данных, которые мы пишем на Apache Airflow. За два с лишним года, его текущая структура потеряла актуальность и попросила себе новое платьице к лету...
Изначально все дополнительные модули у нас лежали в одном, god‑модуле util. Затем мы решили извлечь из него многие заматеревшие модули, которым уже стало тесно внутри, и они просились наверх, в корень репозитория.
Реорганизацию мы решили провести в несколько итераций, сначала самое простое — просто перенос модулей из одного места в другое. Казалось бы, что могло пойти не так. Но впереди, дорогой читатель, тебя ждет детективная история — не переключай канал.
Мы создали МР первой итерации, быстренько поревьюили его и влили в master. Через какие‑то мгновение содержимое синхронизировалось в ПОДы нашего Airflow и мы начали наблюдать что пойдет не так. Было пару DAGов которые висели с ошибками, но затем ошибки пропали. Мы облегченно выдохнули и пошли по своим делам.
Через некоторое время прозвонил первый тревожный прозвонок, вот он
# выше еще много стек-трейса
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/operators/python.py", line 217, in execute_callable
return self.python_callable(*self.op_args, **self.op_kwargs)
File "/opt/airflow/dags/gitdags/gpb_api/tasks/by_dtm.py", line 371, in get_files_list
prev_file: DirectoryObject | None = previous_ti.xcom_pull(
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/utils/session.py", line 79, in wrapper
return func(*args, session=session, **kwargs)
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/taskinstance.py", line 3090, in xcom_pull
return XCom.deserialize_value(first)
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/xcom.py", line 694, in deserialize_value
return BaseXCom._deserialize_value(result, False)
File "/home/airflow/.local/lib/python3.10/site-packages/airflow/models/xcom.py", line 684, in _deserialize_value
return pickle.loads(result.value)
ModuleNotFoundError: No module named 'util.airflow'
Все просто, нет модуля util.airflow — чего тут необычного.. Но дьявол на этот раз скрылся в деталях довольно глубоко…
Ошибка, как видно, происходит в одной из наших функций — это get_files_list, в строке 371. В этой строке мы пытаемся получить данные из XCom задачи. Так, стоп! Что??! А причем тут «у ели мы не ели карамели»?.. То есть еще раз: мы пытаемся получить данные из XCom задачи.
Ранее, в ходе предыдущего запуска DAGа, в XCom что‑то было сохранено, а теперь, когда мы пытаемся это что‑то получить, оно валится с ошибкой ModuleNotFound. Хорошо бы конечно, если бы мы в тот момент так нордически стойко рассуждали. Прозорливый читатель уже догадался в чем дело. Однако, ментальное состояние, когда на ПРОМе упала загрузка и грозит падением остальным, и состояние написания статьи, когда уже знаешь ответ — кардинально разные состояния.
Сначала мы решили пойти самым простым путем — пошли к девопс‑ам разбираться. Первая гипотеза была такая, на ТЕСТе, где автор МРа проводил испытания все работало нормально. Он проверил ровно тот же DAG, который сейчас на ПРОМе вызвал ошибку. Быстро выяснилось, что на ТЕСТе версия Airflow новее чем на ПРОМе. Источник проблемы найден! Экстренно обновляем ПРОМ! Дело ведь в этом! Попутно, мы сказали, что девопсы нам налили плохого кода в ПОДы Airflow. Ну типа, мы упирали, что там остались, какие‑то ошметки, которые приводят к появлению такой дичи. Девопс‑ы нас не поняли и выдвинули гипотезу о состоянии ТТХ наших рук. Мы немного повздорили, но быстро поняли, что нужно действовать сообща. Итого в тех‑окно было назначено обновление ПРОМа до версии которая крутилась на ТЕСТе.
И свершилось обновление!.. Но.. Да!.. Проблема не исчезла!..
Это катастрофа, шеф! (с) Гангстеры из м/ф Приключения капитана Врунгеля
Надо сказать, что функция get_files_list используется чуть более чем в 95% наших загрузок. Поэтому у нас перманентно весь ПРОМ стал находится в состоянии анабиоза. Это угнетало. А еще впереди ночь, во время которой особенно хочется спать. И кстати, это очень полезно для здоровья – спать по ночам.
Артем, участник нашей команды, стал исследовать обстоятельства работы DAGa в котором проявилась проблема. Выяснилось, что на ТЕСТе все работает нормально, в том случае, если XCom не был заполнен данными. То есть Артем, удалил все предыдущие запуски(DagRun) этого DAGа и ошибка исчезла… Для справки, удаление DagRun влечет за собой очистку всех связанных XCom всех задач DAGа.
Тут на меня снизошло внезапное прозрение! Источник проблемы сразу стал понятен… Если, мой дорогой читатель, вы ранее или сейчас, тоже догадались – то вы большой молодец! Если не догадались – то все равно молодец! Потому что наши читатели молодцы!
Источник ошибки находится в том, как Apache Airflow хранит данные XCom задач DAGа. А хранит он их в виде бинарных blob-блоков в таблице xcoms базы Airflow. Стало быть, если это бинарный блок, то что с ним нужно сделать? А я напомню, в XCom задачи DAGа мы можем сохранить любой тип данных. В том числе, совершенно кастомный, нами написанный, например, экземпляр namedtuple-объекта с полезными данными. Для сохранения в базу в виде бинарного блока, Airflow пиклит(pickle) данные которые ему подсунули для сохранения в XCom. Получает блок бинарных данных и сохраняет в базу.
При попытке получить значение из XCom в работающей задаче DAGа, происходит ровно обратный процесс. Получаем блок пикленных данных, распикливаем обратно, и!.. Падаем, Карл! Па-да-ем! Потому что, у нас сменилось местоположение модуля где хранился наш кастомный namedtuple-класс. Питон, при распикливании, пытается импортировать этот namedtuple-класс(это прописано на уровне бинарного содержимого pickle-объекта) и падает с ModuleNotFound! Каково, а!?
Теперь-то, оглядываясь назад, конечно понятно что return pickle.loads(result_value) из стек-трейса красноречиво об этом говорит, но до этого случая, у меня не было нейронной связи в голове на этот счет. Я вообще не придал значения! А оно эвона как…
Ну а дальше скучно и обыденно. Решили продублировать перенесенные модули на старые места. Цель следующая, в коде уже прописаны новые расположения, следовательно, нужно отремонтировать только один первый запуск каждого DAGа после переноса модулей. А дальше новое XCom значение уже будет с новым местоположением и мир и покой вернется в долину мумми-троллей. Что и было сделано. А кто слушал молодец!
Свежие новости, только подвезли. Это помогло.. конечно.. но! Не в полной мере. Один из наших DAGов продолжал «хандрить». А дело в том, что функция get_files_list, занимается тем, что получает список файлов доступных для обработки на внешнем ресурсе. А в этом DAGе сложилась следующая ситуация, продолжительное время новых файлов источник не выкладывает. Поэтому get_files_list, берет из XCom данные о последнем обработанном файле, мы их как раз и храним в кастомном namedtuple-объекте, затем понимает, что новых файлов нет, и DAG завершается. А записи другого, обновленного namedtuple-объекта уже не происходит. В DAGе она делается в следующем таске, но до него, понятное дело, выполнение уже не доходит.
Вай-вай-вай! Как же ты мне надоел! (с) Джинн из Приключений барона Мюнхаузена
Артем, наш коллега, и здесь оказался на высоте! Пока я собирал новый коммит, в котором решил вернуть подложенные модули, чтобы сработало получение данных из XCom, а потом уже перезаписалось, через костыль в виде
prev_file: DirectoryObject | None = previous_ti.xcom_pull(
task_ids=list_files_task_id,
key="prev_file",
)
previous_ti.xcom_push(key="prev_file", value=DirectoryObject(**prev_file.asdict()))
то есть, получаем старый обьект и тут же пересохраняем его как новый, поверх старого… Я рассчитывал что оно опять покрутится сутки, а потом мы дропнем этот коммит.
Но, Артем, нашел более изящное решение. Далее его слова
я вручную запустил даг, нажал чтобы все таски были саксесс, но при этом не выполнялись
и запустил еще раз даг, он отработал сам без проблем
Приходите к нам в Газпромбанк. У нас интересно! Спасибо за внимание