Airflow
Run Mage pipelines in Airflow
Need help integrating Mage into an existing Airflow project?
Check out this tutorial or get instant help from us in Slack.
We support running Mage pipelines in Airflow DAGs.
- First, install the mage_ai library by adding mage_ai to your requirements.txt file.
- Then, download the Mage pipeline code into your Airflow directory. You can achieve this with a git submodule in your Airflow directory (see the sketch after this list).
- In your Mage project’s metadata.yaml file, specify a variables_dir where the output of each block execution will be stored (you need write permission to the variables_dir). Example: variables_dir: /tmp.
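For example, assuming the Mage project lives in its own git repository (the repository URL and the project_path directory name below are placeholders), adding it as a submodule could look like this:
# Run inside your Airflow directory; the URL is a placeholder.
git submodule add https://github.com/your-org/your-mage-project project_path
# Fetch the submodule contents.
git submodule update --init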
We provide multiple ways to run Mage pipelines in Airflow.
- Create DAGs for all pipelines in a Mage project
- Run one pipeline in a BashOperator
- Run one pipeline in a PythonOperator
- Run one pipeline as an Airflow DAG
Create DAGs for all pipelines in a Mage project
Example code:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
from mage_ai.orchestration.airflow import create_dags
import os

ABSOLUTE_PATH = os.path.abspath(os.path.dirname(__file__))
project_path = os.path.join(ABSOLUTE_PATH, 'project_path')

create_dags(
    project_path,
    DAG,
    PythonOperator,
    blacklist_pipelines=[],  # Blacklisted pipeline uuids
    dag_settings=dict(
        start_date=datetime(2022, 8, 2),
    ),
    globals_dict=globals(),
)
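Passing globals_dict=globals() allows create_dags to expose the generated DAG objects in the DAG file’s module globals, which is where the Airflow scheduler looks when discovering DAGs; pipelines listed in blacklist_pipelines (by uuid) are excluded.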
Run pipeline in a BashOperator
Example code:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime
import os

ABSOLUTE_PATH = os.path.abspath(os.path.dirname(__file__))
project_path = os.path.join(ABSOLUTE_PATH, 'project_path')
pipeline_name = 'pipeline_name'

dag = DAG(
    'test_mage_pipeline',
    start_date=datetime(2022, 7, 14),
    description='Test running mage pipeline.',
    schedule_interval='@once',
    catchup=False,
)

task = BashOperator(
    dag=dag,
    task_id='run_pipeline',
    bash_command=f'mage run {project_path} {pipeline_name}',
)
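Because the BashOperator shells out to the mage CLI, the mage_ai package must be installed (via requirements.txt, as described above) in the environment where the Airflow worker executes the task.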
Run pipeline in a PythonOperator
Example code:
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
import mage_ai
import os

ABSOLUTE_PATH = os.path.abspath(os.path.dirname(__file__))
project_path = os.path.join(ABSOLUTE_PATH, 'project_path')
pipeline_name = 'pipeline_name'

dag = DAG(
    'test_mage_pipeline',
    start_date=datetime(2022, 7, 14),
    description='Test running mage pipeline.',
    schedule_interval='@once',
    catchup=False,
)

def build_execute_pipeline(project_path, pipeline_name):
    def _callable(ds, **kwargs):
        mage_ai.run(pipeline_name, project_path)
    return _callable

task = PythonOperator(
    dag=dag,
    task_id='run_pipeline',
    python_callable=build_execute_pipeline(project_path, pipeline_name),
)
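Here the whole pipeline runs inside a single Airflow task: mage_ai.run executes the pipeline in the worker’s Python process, so retries and logs apply to the pipeline as a whole rather than to individual blocks.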
Run pipeline as an Airflow DAG
Example code:
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
from mage_ai.data_preparation.models.pipeline import Pipeline
from mage_ai.shared.hash import ignore_keys
import mage_ai
import os

ABSOLUTE_PATH = os.path.abspath(os.path.dirname(__file__))
project_path = os.path.join(ABSOLUTE_PATH, 'project_path')
pipeline_name = 'pipeline_name'

dag = DAG(
    'test_mage_pipeline',
    start_date=datetime(2022, 7, 14),
    description='Test running mage pipeline.',
    schedule_interval='@once',
    catchup=False,
)

pipeline = Pipeline(pipeline_name, repo_path=project_path)

tasks = []

def build_execute_block(block):
    def _callable(ds, **kwargs):
        mage_ai.run(
            pipeline_name,
            project_path=project_path,
            block_uuid=block.uuid,
            analyze_outputs=False,
            update_status=False,
        )
    return _callable

# Build one task per block, skipping scratchpad blocks.
for uuid, b in pipeline.blocks_by_uuid.items():
    if b.type == 'scratchpad':
        continue
    tasks.append(dict(
        task_id=uuid,
        upstream_task_ids=b.upstream_block_uuids,
        python_callable=build_execute_block(b),
    ))

def initialize_tasks(dag, tasks):
    operators = {}
    # Create a PythonOperator per task, dropping upstream_task_ids,
    # which is not an operator argument.
    for task_dict in tasks:
        task_operator = PythonOperator(dag=dag, **ignore_keys(task_dict, [
            'upstream_task_ids',
        ]))
        operators[task_operator.task_id] = task_operator
    # Mirror block dependencies as task dependencies.
    for task_dict in tasks:
        for task_id in task_dict.get('upstream_task_ids', []):
            dag.set_dependency(task_id, task_dict['task_id'])
    return operators

initialize_tasks(dag, tasks)
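In this approach, every Mage block (except scratchpad blocks) becomes its own Airflow task, and block dependencies are mirrored as task dependencies, so retries, logs, and parallelism are managed per block in the Airflow UI.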