import os
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

import mage_ai
from mage_ai.data_preparation.models.pipeline import Pipeline
from mage_ai.shared.hash import ignore_keys

ABSOLUTE_PATH = os.path.abspath(os.path.dirname(__file__))

# Replace these placeholders with your Mage project folder name and the UUID
# of the pipeline you want to run.
project_path = os.path.join(ABSOLUTE_PATH, 'project_path')
pipeline_name = 'pipeline_name'

# '@once' runs the DAG a single time after it is enabled; catchup=False
# prevents Airflow from backfilling past schedule intervals.
dag = DAG(
    'test_mage_pipeline',
    start_date=datetime(2022, 7, 14),
    description='Test running mage pipeline.',
    schedule_interval='@once',
    catchup=False,
)
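# Note: Airflow 2.4+ deprecates `schedule_interval` in favor of `schedule`;
# the older name is kept above for compatibility with earlier 2.x releases.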

# Load the pipeline's metadata (its blocks and their dependency graph) from
# the Mage project on disk.
pipeline = Pipeline(pipeline_name, repo_path=project_path)

tasks = []


def build_execute_block(block):
    """Return an Airflow-compatible callable that runs a single Mage block."""
    def _callable(ds, **kwargs):
        mage_ai.run(
            pipeline_name,
            project_path=project_path,
            block_uuid=block.uuid,
            analyze_outputs=False,  # Skip Mage's output analysis for speed.
            update_status=False,    # Don't write block status back to the Mage project.
        )

    return _callable
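
# Note: a factory function is used above (rather than defining the callable
# inline in the loop below) because Python closures late-bind loop variables;
# an inline definition would leave every task running the loop's final block.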

# Build one Airflow task per Mage block. Scratchpad blocks are skipped: they
# are for ad hoc experimentation and aren't part of the pipeline's execution
# graph.
for uuid, b in pipeline.blocks_by_uuid.items():
    if b.type == 'scratchpad':
        continue

    tasks.append(dict(
        task_id=uuid,
        upstream_task_ids=b.upstream_block_uuids,
        python_callable=build_execute_block(b),
    ))
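
# Each dict above is a task spec; e.g. (with a hypothetical block UUID
# 'load_data', for illustration only):
#   {'task_id': 'load_data', 'upstream_task_ids': [], 'python_callable': <fn>}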


def initialize_tasks(dag, tasks):
    operators = {}

    # First pass: create a PythonOperator for every block. upstream_task_ids
    # is stripped here because it isn't a valid operator argument; it's only
    # used for wiring dependencies below.
    for task_dict in tasks:
        task_operator = PythonOperator(dag=dag, **ignore_keys(task_dict, [
            'upstream_task_ids',
        ]))
        operators[task_operator.task_id] = task_operator

    # Second pass: recreate the Mage block graph as Airflow dependencies.
    for task_dict in tasks:
        for task_id in task_dict.get('upstream_task_ids', []):
            dag.set_dependency(task_id, task_dict['task_id'])

    return operators


initialize_tasks(dag, tasks)
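
# Optional local smoke test: a sketch, assuming Airflow 2.5+ (where
# DAG.test() exists) and that the Mage project is readable from this machine.
# Run `python <this_file>.py` to execute the whole DAG once without a
# scheduler.
if __name__ == '__main__':
    dag.test()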