Назад к вопросам
Middle
5
Как в Airflow создать много одинаковых тасков параллельно (например, загрузить много одинаковых файлов)?
Компании, где спрашивали
FOM GROUP
Ответ от нейросети
sobes.tech AI
В Apache Airflow для создания множества одинаковых тасков параллельно обычно используют динамическое создание тасков в цикле или оператор TaskGroup для группировки. Но более современный и удобный способ — использовать TaskFlow API с параметризацией, либо Dynamic Task Mapping (начиная с Airflow 2.3), который позволяет создавать множество тасков на основе списка параметров.
Пример с Dynamic Task Mapping для загрузки множества файлов:
from airflow import DAG
from airflow.decorators import task
from datetime import datetime
with DAG('load_files_dag', start_date=datetime(2024, 1, 1), schedule_interval=None) as dag:
@task
def load_file(file_name):
print(f"Загружаем файл {file_name}")
# Логика загрузки файла
files = ['file1.csv', 'file2.csv', 'file3.csv']
load_file.expand(file_name=files)
Здесь load_file.expand() создаст отдельный таск для каждого файла, которые будут выполняться параллельно, если позволяет конфигурация Airflow и ресурсы.
Если версия Airflow ниже 2.3, можно создавать таски в цикле:
for file_name in files:
load_file_task = PythonOperator(
task_id=f'load_{file_name}',
python_callable=load_file_function,
op_kwargs={'file_name': file_name},
dag=dag
)
Но Dynamic Task Mapping — более современный и удобный способ.