We support running Mage pipelines inside your existing Airflow DAGs, and we provide multiple ways to do so. Each approach is shown below.
The simplest option is to generate one Airflow DAG per Mage pipeline with `create_dags`. It scans the Mage project for pipelines, builds a DAG for each one (skipping any listed in `blacklist_pipelines`), and registers the generated DAGs in `globals_dict` so Airflow's DAG discovery can find them:

```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
from mage_ai.orchestration.airflow import create_dags
import os

# Resolve the Mage project directory relative to this DAG file.
ABSOLUTE_PATH = os.path.abspath(os.path.dirname(__file__))
project_path = os.path.join(ABSOLUTE_PATH, 'project_path')

create_dags(
    project_path,
    DAG,
    PythonOperator,
    blacklist_pipelines=[],  # pipeline UUIDs to skip
    dag_settings=dict(
        start_date=datetime(2022, 8, 2),  # forwarded to each generated DAG
    ),
    globals_dict=globals(),  # where the generated DAGs are registered
)
```
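Because the DAGs are registered in this module's globals, Airflow's scheduler picks them up as soon as the file sits in the DAGs folder. A quick sanity check at parse time could look like the sketch below (the DAG ids depend on your pipeline names):

```python
# Sketch only: list the DAG objects create_dags placed in this module's
# globals, which is how Airflow's DAG discovery finds them.
generated_dags = {
    name: obj for name, obj in globals().items() if isinstance(obj, DAG)
}
print(sorted(generated_dags))
```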
You can also run a single Mage pipeline from a `BashOperator` by invoking the `mage run` CLI:

```python
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime
import os

# Resolve the Mage project directory relative to this DAG file.
ABSOLUTE_PATH = os.path.abspath(os.path.dirname(__file__))
project_path = os.path.join(ABSOLUTE_PATH, 'project_path')
pipeline_name = 'pipeline_name'

dag = DAG(
    'test_mage_pipeline',
    start_date=datetime(2022, 7, 14),
    description='Test running mage pipeline.',
    schedule_interval='@once',
    catchup=False,
)

task = BashOperator(
    dag=dag,
    task_id='run_pipeline',
    bash_command=f'mage run {project_path} {pipeline_name}',
)
```
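Since the task just shells out to the `mage run` CLI, the same command can be exercised outside Airflow. A minimal local test, assuming the `mage` CLI is installed and on your PATH, might be:

```python
# Sketch only: run the same command the BashOperator task executes,
# using the project_path and pipeline_name defined above.
import subprocess

subprocess.run(['mage', 'run', project_path, pipeline_name], check=True)
```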
Alternatively, run the pipeline in-process from a `PythonOperator` with `mage_ai.run`:

```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
import mage_ai
import os

# Resolve the Mage project directory relative to this DAG file.
ABSOLUTE_PATH = os.path.abspath(os.path.dirname(__file__))
project_path = os.path.join(ABSOLUTE_PATH, 'project_path')
pipeline_name = 'pipeline_name'

dag = DAG(
    'test_mage_pipeline',
    start_date=datetime(2022, 7, 14),
    description='Test running mage pipeline.',
    schedule_interval='@once',
    catchup=False,
)

def build_execute_pipeline(project_path, pipeline_name):
    # Airflow passes context variables (e.g. ds, the execution date)
    # that match the callable's signature.
    def _callable(ds, **kwargs):
        mage_ai.run(pipeline_name, project_path)
    return _callable

task = PythonOperator(
    dag=dag,
    task_id='run_pipeline',
    python_callable=build_execute_pipeline(project_path, pipeline_name),
)
```
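The `build_execute_pipeline` factory also makes it easy to fan out several pipelines as separate tasks in one DAG. A sketch, with hypothetical pipeline names:

```python
# Sketch only: one task per Mage pipeline in the same DAG.
# 'pipeline_a' and 'pipeline_b' are hypothetical placeholders.
for name in ['pipeline_a', 'pipeline_b']:
    PythonOperator(
        dag=dag,
        task_id=f'run_{name}',
        python_callable=build_execute_pipeline(project_path, name),
    )
```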
Finally, for finer-grained scheduling and retries, you can map each block of a Mage pipeline to its own Airflow task and mirror the block dependencies in the DAG:

```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
from mage_ai.data_preparation.models.pipeline import Pipeline
from mage_ai.shared.hash import ignore_keys
import mage_ai
import os

# Resolve the Mage project directory relative to this DAG file.
ABSOLUTE_PATH = os.path.abspath(os.path.dirname(__file__))
project_path = os.path.join(ABSOLUTE_PATH, 'project_path')
pipeline_name = 'pipeline_name'

dag = DAG(
    'test_mage_pipeline',
    start_date=datetime(2022, 7, 14),
    description='Test running mage pipeline.',
    schedule_interval='@once',
    catchup=False,
)

# Load the pipeline definition so each block can become its own task.
pipeline = Pipeline(pipeline_name, repo_path=project_path)

tasks = []

def build_execute_block(block):
    def _callable(ds, **kwargs):
        # Run a single block; skip output analysis and Mage's own status
        # tracking, since Airflow manages task state here.
        mage_ai.run(
            pipeline_name,
            project_path=project_path,
            block_uuid=block.uuid,
            analyze_outputs=False,
            update_status=False,
        )
    return _callable

for uuid, b in pipeline.blocks_by_uuid.items():
    if b.type == 'scratchpad':
        # Scratchpad blocks are for experimentation and are not executed.
        continue
    tasks.append(dict(
        task_id=uuid,
        upstream_task_ids=b.upstream_block_uuids,
        python_callable=build_execute_block(b),
    ))

def initialize_tasks(dag, tasks):
    operators = {}

    # First pass: create one PythonOperator per block.
    for task_dict in tasks:
        task_operator = PythonOperator(dag=dag, **ignore_keys(task_dict, [
            'upstream_task_ids',
        ]))
        operators[task_operator.task_id] = task_operator

    # Second pass: recreate the block dependency graph in Airflow.
    for task_dict in tasks:
        for task_id in task_dict.get('upstream_task_ids', []):
            dag.set_dependency(task_id, task_dict['task_id'])

    return operators

initialize_tasks(dag, tasks)
```
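Here, `ignore_keys` filters the `upstream_task_ids` entry out of each task dict before it is passed to `PythonOperator`, since Airflow would reject the unknown keyword argument. A minimal stand-in (a sketch, not the library source) is:

```python
# Minimal equivalent of mage_ai.shared.hash.ignore_keys (sketch):
# return a copy of the dict without the listed keys.
def ignore_keys(d, keys):
    return {k: v for k, v in d.items() if k not in keys}
```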