Назад к вопросам
Бюро 1440
Middle
2
Как передавать большие объёмы данных (миллионы строк) между тасками в Airflow?
Компании, где спрашивали
Ответ от нейросети
sobes.tech AI
В Airflow не рекомендуется напрямую передавать большие объёмы данных между тасками через XCom, так как это может привести к проблемам с производительностью и хранением. Вместо этого лучше использовать внешние хранилища:
- Сохранять данные в базу данных, например, PostgreSQL или ClickHouse.
- Использовать распределённые файловые системы или облачные хранилища (S3, GCS).
- Передавать между тасками только ссылки или ключи к этим данным.
Пример:
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def extract(**kwargs):
# Сохраняем данные в файл или БД
data_path = '/tmp/large_data.csv'
# ... код сохранения ...
kwargs['ti'].xcom_push(key='data_path', value=data_path)
def process(**kwargs):
ti = kwargs['ti']
data_path = ti.xcom_pull(key='data_path', task_ids='extract')
# Загружаем и обрабатываем данные из data_path
with DAG('example_dag', start_date=datetime(2023,1,1), schedule_interval='@daily') as dag:
t1 = PythonOperator(task_id='extract', python_callable=extract)
t2 = PythonOperator(task_id='process', python_callable=process)
t1 >> t2
Такой подход позволяет эффективно работать с большими объёмами данных, не перегружая Airflow.