Sobes.tech
Назад к вопросам
Middle
5

Как в Airflow создать много одинаковых тасков параллельно (например, загрузить много одинаковых файлов)?

Компании, где спрашивали
FOM GROUP

Ответ от нейросети

sobes.tech AI

В Apache Airflow для создания множества одинаковых тасков параллельно обычно используют динамическое создание тасков в цикле или оператор TaskGroup для группировки. Но более современный и удобный способ — использовать TaskFlow API с параметризацией, либо Dynamic Task Mapping (начиная с Airflow 2.3), который позволяет создавать множество тасков на основе списка параметров.

Пример с Dynamic Task Mapping для загрузки множества файлов:

from airflow import DAG
from airflow.decorators import task
from datetime import datetime

with DAG('load_files_dag', start_date=datetime(2024, 1, 1), schedule_interval=None) as dag:

    @task
    def load_file(file_name):
        print(f"Загружаем файл {file_name}")
        # Логика загрузки файла

    files = ['file1.csv', 'file2.csv', 'file3.csv']

    load_file.expand(file_name=files)

Здесь load_file.expand() создаст отдельный таск для каждого файла, которые будут выполняться параллельно, если позволяет конфигурация Airflow и ресурсы.

Если версия Airflow ниже 2.3, можно создавать таски в цикле:

for file_name in files:
    load_file_task = PythonOperator(
        task_id=f'load_{file_name}',
        python_callable=load_file_function,
        op_kwargs={'file_name': file_name},
        dag=dag
    )

Но Dynamic Task Mapping — более современный и удобный способ.