
Данные библиотеки можно использовать при работе со SparkOperator
Создание виртуального окружения с необходимыми библиотеками
Создание задачи в даге и установка окружения с SparkSubmit
Создание виртуального окружения с необходимыми библиотеками
Для начала необходимо зайти в Jupiterlab и перейти в терминал.
После этого в терминале необходимо выполнить поочередно несколько следующих команд, при помощи которых мы создадим и активируем виртуальную среду conda. В данном примере среда будет называться kenv.
cd /path
conda create -p /path/kenv python=3.7
conda activate /path/kenv
После этого есть возможность добавить библиотеки, с которыми нам потребуется работать, в качестве примера возьмем библиотеки pyarrow и openpyxl.
Установка их происходит следующим образом.
conda install pyarrow
conda install openpyxl
После установки необходимых библиотек необходимо упаковать данное окружение в архив и положить в hdfs для дальнейшего использования.
conda install conda-pack
conda pack -o /path/kenv.tar.gz -j 4
hdfs dfs -put /path/kenv.tar.gz hdfs:///path/NAME
Создание задачи в даге и установка окружения в SparkSubmit
После того как виртуальное окружение лежит в hdfs необходимо положить создать задачу в даге, которая это окружение будет принимать и использовать.
Необходимо произвести стандартное создание задачи в airflow, используя SparkSubmitOperator и добавить в него 2 параметра
archives (он должен содержать путь к виртуальному окружению, а после того, без пробелов, необходимо указать #enviroment, как это показано в примере ниже)
env_vars (тут необходимо указать 2 элемента как в примере)
archives="hdfs:///path/kenv.tar.gz#environment",
env_vars={
'PYSPARK_PYTHON': './environment/bin/python',
'PYSPARK_DRIVER_PYTHON': './environment/bin/python'
}
Также необходимо добавить, что исполняемый скрипт обязательно должен быть присвоен параметру application и в случае, если все сделано правильно в предыдущих пунктах, то он уже будет выполняться в виртуальной среде.
from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.operators.python_operator import PythonOperator
# Определите DAG
with DAG(
dag_id='name_dag',
description=’Описание работы дага’,
schedule_interval=None,
start_date=datetime(2024, 8, 20),
catchup=False,
tags=["new_lib"]
) as dag:
# Задача для запуска Spark
spark_task = SparkSubmitOperator(
task_id=’task_name’,
application='/airflow/path/name_script.py',
conn_id='cbr_spark',
conf={
"spark.authenticate": "true",
"spark.executor.extraJavaOptions": "-XX:+UseParallelGC -Dfile.encoding=UTF-8 -XX:ThreadStackSize=8192 -Djdk.xml.totalEntitySizeLimit=0 -Duser.timezone=GMT",
"spark.driver.extraJavaOptions": "-Djava.security.auth.login.config=/etc/zookeeper/conf/jaas.conf -XX:+UseParallelGC -Djdk.xml.totalEntitySizeLimit=0 -Duser.timezone=GMT",
"spark.yarn.am.extraJavaOptions": "-XX:+UseParallelGC -Duser.timezone=GMT",
"spark.sql.session.timeZone": "UTC",
"spark.hadoop.dfs.user.home.dir.prefix": "/user/.user"
},
files='hdfs:///path/CA.pem',
jars='hdfs:///path/clickhouse-client-0.3.2-patch11.jar,hdfs:///path/clickhouse-http-client-0.3.2-patch11.jar,hdfs:///path/clickhouse-jdbc-0.3.2.jar',
archives="hdfs:///path/kenv.tar.gz#environment",
env_vars={
'PYSPARK_PYTHON': './environment/bin/python',
'PYSPARK_DRIVER_PYTHON': './environment/bin/python'
}
)
После выполнения всех этих действий вы можете использовать библиотеки в вашем скрипте и после этого запускать даг.