Sobes.tech
Back to questions
Middle
1

Как передавать большие объёмы данных (миллионы строк) между тасками в Airflow?

Companies where asked
Бюро 1440Бюро 1440

Answer from AI

sobes.tech AI

В Airflow не рекомендуется напрямую передавать большие объёмы данных между тасками через XCom, так как это может привести к проблемам с производительностью и хранением. Вместо этого лучше использовать внешние хранилища:

  • Сохранять данные в базу данных, например, PostgreSQL или ClickHouse.
  • Использовать распределённые файловые системы или облачные хранилища (S3, GCS).
  • Передавать между тасками только ссылки или ключи к этим данным.

Пример:

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def extract(**kwargs):
    # Сохраняем данные в файл или БД
    data_path = '/tmp/large_data.csv'
    # ... код сохранения ...
    kwargs['ti'].xcom_push(key='data_path', value=data_path)

def process(**kwargs):
    ti = kwargs['ti']
    data_path = ti.xcom_pull(key='data_path', task_ids='extract')
    # Загружаем и обрабатываем данные из data_path

with DAG('example_dag', start_date=datetime(2023,1,1), schedule_interval='@daily') as dag:
    t1 = PythonOperator(task_id='extract', python_callable=extract)
    t2 = PythonOperator(task_id='process', python_callable=process)
    t1 >> t2

Такой подход позволяет эффективно работать с большими объёмами данных, не перегружая Airflow.