Pull to refresh

Работа с библиотеками, которые не установлены в Airflow

Reading time3 min
Views2.2K

Данные библиотеки можно использовать при работе со SparkOperator

  • Создание виртуального окружения с необходимыми библиотеками

  • Создание задачи в даге и установка окружения с SparkSubmit

Создание виртуального окружения с необходимыми библиотеками

Для начала необходимо зайти в Jupiterlab и перейти в терминал.

После этого в терминале необходимо выполнить поочередно несколько следующих команд, при помощи которых мы создадим и активируем виртуальную среду conda. В данном примере среда будет называться kenv.

cd /path
conda create -p /path/kenv python=3.7
conda activate /path/kenv

После этого есть возможность добавить библиотеки, с которыми нам потребуется работать, в качестве примера возьмем библиотеки pyarrow и openpyxl.

Установка их происходит следующим образом.

conda install pyarrow
conda install openpyxl

После установки необходимых библиотек необходимо упаковать данное окружение в архив и положить в hdfs для дальнейшего использования.

conda install conda-pack
conda pack -o /path/kenv.tar.gz -j 4
hdfs dfs -put /path/kenv.tar.gz hdfs:///path/NAME

Создание задачи в даге и установка окружения в SparkSubmit

После того как виртуальное окружение лежит в hdfs необходимо положить создать задачу в даге, которая это окружение будет принимать и использовать.

Необходимо произвести стандартное создание задачи в airflow, используя SparkSubmitOperator и добавить в него 2 параметра

  • archives (он должен содержать путь к виртуальному окружению, а после того, без пробелов, необходимо указать #enviroment, как это показано в примере ниже)

  • env_vars (тут необходимо указать 2 элемента как в примере)

archives="hdfs:///path/kenv.tar.gz#environment",
env_vars={
    'PYSPARK_PYTHON': './environment/bin/python',
    'PYSPARK_DRIVER_PYTHON': './environment/bin/python'
}

Также необходимо добавить, что исполняемый скрипт обязательно должен быть присвоен параметру application и в случае, если все сделано правильно в предыдущих пунктах, то он уже будет выполняться в виртуальной среде.

from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.operators.python_operator import PythonOperator


# Определите DAG

with DAG(

    dag_id='name_dag',

    description=’Описание работы дага’,

    schedule_interval=None,

    start_date=datetime(2024, 8, 20),

    catchup=False,

    tags=["new_lib"]

) as dag:   

# Задача для запуска Spark

    spark_task = SparkSubmitOperator(

        task_id=’task_name’,

        application='/airflow/path/name_script.py', 

        conn_id='cbr_spark', 

        conf={

            "spark.authenticate": "true",

            "spark.executor.extraJavaOptions": "-XX:+UseParallelGC -Dfile.encoding=UTF-8 -XX:ThreadStackSize=8192 -Djdk.xml.totalEntitySizeLimit=0 -Duser.timezone=GMT",

            "spark.driver.extraJavaOptions": "-Djava.security.auth.login.config=/etc/zookeeper/conf/jaas.conf -XX:+UseParallelGC -Djdk.xml.totalEntitySizeLimit=0 -Duser.timezone=GMT",

            "spark.yarn.am.extraJavaOptions": "-XX:+UseParallelGC -Duser.timezone=GMT",

            "spark.sql.session.timeZone": "UTC",

   "spark.hadoop.dfs.user.home.dir.prefix": "/user/.user"  

        },

        files='hdfs:///path/CA.pem',

        jars='hdfs:///path/clickhouse-client-0.3.2-patch11.jar,hdfs:///path/clickhouse-http-client-0.3.2-patch11.jar,hdfs:///path/clickhouse-jdbc-0.3.2.jar',

        archives="hdfs:///path/kenv.tar.gz#environment",

        env_vars={

            'PYSPARK_PYTHON': './environment/bin/python',

            'PYSPARK_DRIVER_PYTHON': './environment/bin/python'

        }

    )

После выполнения всех этих действий вы можете использовать библиотеки в вашем скрипте и после этого запускать даг.

Tags:
Hubs:
Total votes 5: ↑4 and ↓1+3
Comments2

Articles