import os
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

import mage_ai
from mage_ai.data_preparation.models.pipeline import Pipeline
from mage_ai.shared.hash import ignore_keys
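
# Assumed layout (a sketch, not a requirement): this file sits in Airflow's
# dags/ folder, with the Mage project in a subdirectory next to it:
#
#     dags/
#     ├── run_mage_pipeline.py   <- this file (hypothetical name)
#     └── project_path/          <- Mage project directory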
# Absolute path of the directory that contains this DAG file.
ABSOLUTE_PATH = os.path.abspath(os.path.dirname(__file__))
# Replace 'project_path' with the directory name of your Mage project.
project_path = os.path.join(ABSOLUTE_PATH, 'project_path')
# Replace 'pipeline_name' with the name of the Mage pipeline to run.
pipeline_name = 'pipeline_name'
dag = DAG(
    'test_mage_pipeline',
    start_date=datetime(2022, 7, 14),
    description='Test running mage pipeline.',
    schedule_interval='@once',
    catchup=False,
)
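
# Load the pipeline definition (its blocks and their dependencies) from the
# Mage project on disk; this runs at DAG parse time.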
pipeline = Pipeline(pipeline_name, repo_path=project_path)

def build_execute_block(block):
    """Return a callable that runs a single Mage block as an Airflow task.

    A factory function is used so each callable captures its own block;
    binding the loop variable directly would leave every task pointing at
    the last block iterated over.
    """
    def _callable(ds, **kwargs):
        mage_ai.run(
            pipeline_name,
            project_path=project_path,
            block_uuid=block.uuid,
            analyze_outputs=False,
            update_status=False,
        )
    return _callable


# Build one task definition per pipeline block, skipping scratchpad blocks,
# which are not part of the executable pipeline.
tasks = []
for uuid, b in pipeline.blocks_by_uuid.items():
    if b.type == 'scratchpad':
        continue
    tasks.append(dict(
        task_id=uuid,
        upstream_task_ids=b.upstream_block_uuids,
        python_callable=build_execute_block(b),
    ))

def initialize_tasks(dag, tasks):
    """Create a PythonOperator per task and wire up their dependencies."""
    operators = {}
    # First pass: create every operator. The upstream_task_ids key is
    # stripped because PythonOperator does not accept it as an argument.
    for task_dict in tasks:
        task_operator = PythonOperator(dag=dag, **ignore_keys(task_dict, [
            'upstream_task_ids',
        ]))
        operators[task_operator.task_id] = task_operator
    # Second pass: declare dependencies only after every operator exists,
    # so upstream references cannot point at operators not yet created.
    for task_dict in tasks:
        for task_id in task_dict.get('upstream_task_ids', []):
            dag.set_dependency(task_id, task_dict['task_id'])
    return operators


initialize_tasks(dag, tasks)
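
# The operators dict returned by initialize_tasks can be captured to wire in
# extra tasks. A minimal sketch; 'block_uuid' and notify_task are hypothetical
# names used for illustration only:
#
#     operators = initialize_tasks(dag, tasks)
#     operators['block_uuid'] >> notify_task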