First, install the mage_ai library by adding mage_ai
to your
requirements.txt file. Then download the Mage pipeline code into
your Prefect directory — for example, by adding it as a git submodule of your
Prefect directory.
We provide two ways to run Mage pipelines in Prefect.
- Run the pipeline as a Prefect task or inside a ShellTask.
- Run the pipeline as a Prefect Flow.
Run pipeline as a Prefect task
from prefect import task, Flow
import mage_ai
import os

# Resolve the Mage project directory relative to this file.
CURRENT_DIR = os.path.abspath(os.path.dirname(__file__))
project_path = os.path.join(CURRENT_DIR, 'project_path')
pipeline_name = 'pipeline_name'


@task
def run_pipeline():
    """Execute the Mage pipeline synchronously inside a Prefect task."""
    mage_ai.run(pipeline_name, project_path)


with Flow('My Mage Flow') as flow:
    run_pipeline()

flow.run()
Run pipeline in a ShellTask
from prefect import task, Flow
from prefect.tasks.shell import ShellTask
from prefect.triggers import all_finished
import mage_ai
import os
import prefect

# Locate the Mage project next to this file.
BASE_DIR = os.path.abspath(os.path.dirname(__file__))
project_path = os.path.join(BASE_DIR, 'project_path')
pipeline_name = 'pipeline_name'

# With return_all=True the ShellTask returns every line of shell output.
mage_task = ShellTask(return_all=True)


@task(trigger=all_finished)
def output_print(output):
    """Forward each captured shell output line to the Prefect logger."""
    logger = prefect.context.get("logger")
    for line in output:
        logger.info(line)


with Flow('My Mage Flow') as flow:
    run_pipeline = mage_task(command=f'mage run {project_path} {pipeline_name}')
    output = output_print(run_pipeline)

flow.run()
Run pipeline as a Prefect flow
from prefect import Task, Flow
from mage_ai.data_preparation.models.pipeline import Pipeline
import asyncio
import os

# Resolve the Mage repo directory relative to this file.
ABSOLUTE_PATH = os.path.abspath(os.path.dirname(__file__))
project_path = os.path.join(ABSOLUTE_PATH, 'prefect_repo')
pipeline_name = 'default_pipeline'

# Load the pipeline definition so each Mage block can become one Prefect task.
pipeline = Pipeline(pipeline_name, repo_path=project_path)
# Maps Mage block UUID -> the Prefect task that will execute that block.
tasks = {}


class ExecuteBlock(Task):
    """Prefect task that runs a single Mage block."""

    def run(self, block):
        # block.execute(...) is awaited via asyncio.run, i.e. driven to
        # completion synchronously within this task.
        asyncio.run(
            block.execute(
                analyze_outputs=False,
                update_status=False,
            )
        )


flow = Flow('My Mage Flow')
blocks_by_uuid = pipeline.blocks_by_uuid

# First pass: create one task per block, skipping 'scratchpad' blocks
# (presumably dev-only cells that are not part of the runnable DAG — confirm
# against Mage's block-type semantics).
for uuid, b in blocks_by_uuid.items():
    if b.type == 'scratchpad':
        continue
    task = ExecuteBlock()
    tasks[uuid] = task

# Second pass: mirror the Mage block DAG as Prefect task dependencies.
# NOTE(review): assumes no executable block lists a scratchpad as upstream;
# otherwise tasks[block_uuid] would raise KeyError.
for uuid, task in tasks.items():
    flow.set_dependencies(
        task=task,
        upstream_tasks=[
            tasks[block_uuid] for block_uuid in blocks_by_uuid[uuid].upstream_block_uuids
        ],
        # Pass the Mage block object as the `block` keyword to ExecuteBlock.run.
        keyword_tasks=dict(block=blocks_by_uuid[uuid])
    )

flow.run()