In your pipeline, you can add sensors that run continuously and only complete when an external pipeline or external block has run successfully for a particular partition.

Then, you can add a block in your pipeline that depends on that sensor.

If there is a block with a sensor as an upstream dependency, that block won’t start running until the sensor has evaluated its condition successfully.


Setup

  1. Add a sensor to your pipeline
  2. Configure a sensor

Example

You can add sensors to a pipeline the same way you add other types of blocks.

from mage_ai.orchestration.run_status_checker import check_status

if 'sensor' not in globals():
    from mage_ai.data_preparation.decorators import sensor


@sensor
def check_condition(*args, **kwargs) -> bool:
    return check_status(
        'pipeline_uuid',
        kwargs['execution_date'],
        block_uuid='block_uuid',
        hours=24,
    )

In the code above, replace pipeline_uuid with the UUID of the pipeline you want the sensor to monitor. The sensor keeps checking until that pipeline has finished running successfully.

The block_uuid keyword argument is optional. If you set it, the sensor checks the status of that block in the pipeline, and the sensor completes only when that specific block has finished running successfully.

The hours keyword argument is optional. If you set it, the sensor checks the status of the pipeline over that many hours before execution_date. The default value is 24 hours.
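To make the lookback concrete, the sketch below computes the time window that a given hours value would cover, assuming the window ends at execution_date and starts that many hours earlier. The helper lookback_window is hypothetical and is not part of mage_ai; check_status may treat the window boundaries differently.

```python
from datetime import datetime, timedelta
from typing import Tuple


def lookback_window(
    execution_date: datetime,
    hours: int = 24,
) -> Tuple[datetime, datetime]:
    """Return the (start, end) window covered by a lookback of `hours`.

    Hypothetical helper for illustration only; not part of mage_ai.
    """
    return execution_date - timedelta(hours=hours), execution_date


start, end = lookback_window(datetime(2023, 5, 2, 6, 0), hours=24)
print(start, end)  # 2023-05-01 06:00:00 2023-05-02 06:00:00
```

With the default of 24 hours, a sensor triggered for 06:00 on May 2 would, under this assumption, consider runs from 06:00 on May 1 onward.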

Sensors can also have upstream block dependencies. The output of the upstream blocks is passed into the sensor through the args parameter, as in other Mage block types.
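As a rough sketch of a sensor that reads upstream output, the example below treats args[0] as the output of the first upstream block and completes only when that output is non-empty. The function name upstream_has_rows is hypothetical, the position of the upstream output in args is an assumption, and the fallback decorator exists only so the sketch also runs outside Mage.

```python
if 'sensor' not in globals():
    try:
        from mage_ai.data_preparation.decorators import sensor
    except ImportError:
        # Stand-in so this sketch runs without Mage installed (assumption:
        # a pass-through decorator is enough for illustration).
        def sensor(function):
            return function


@sensor
def upstream_has_rows(*args, **kwargs) -> bool:
    # Assumption: args[0] holds the output of the first upstream block.
    if not args:
        return False

    upstream_output = args[0]

    # len() works for lists and for pandas DataFrames (row count).
    return upstream_output is not None and len(upstream_output) > 0
```

Returning False keeps the sensor waiting for its next check, so an empty upstream result delays downstream blocks instead of failing them.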

Currently, the sensor makes a check every 60 seconds.
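The polling behavior can be pictured as a loop that calls the sensor function, waits, and tries again until the function returns True. The sketch below is a generic illustration and not Mage's scheduler code; poll_until_true, interval_seconds, and max_attempts are hypothetical names.

```python
import time
from typing import Callable


def poll_until_true(
    condition: Callable[[], bool],
    interval_seconds: float = 60.0,
    max_attempts: int = 10,
) -> bool:
    """Call `condition` until it returns True or the attempts run out."""
    for attempt in range(max_attempts):
        if condition():
            return True

        # Sleep between checks, but not after the final failed attempt.
        if attempt < max_attempts - 1:
            time.sleep(interval_seconds)

    return False
```

Because each check runs the full sensor function, an expensive query inside a sensor is executed once per interval until the condition passes.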

Templates

When adding a sensor block, here are the available templates to choose from:

  1. Google BigQuery
  2. MySQL
  3. PostgreSQL
  4. Amazon Redshift
  5. Amazon S3
  6. Snowflake

Below are the code examples in those templates.

Google BigQuery

Check the results of a BigQuery query.

from mage_ai.settings.repo import get_repo_path
from mage_ai.io.bigquery import BigQuery
from mage_ai.io.config import ConfigFileLoader
from os import path

if 'sensor' not in globals():
    from mage_ai.data_preparation.decorators import sensor


@sensor
def query_bigquery_and_check_condition(*args, **kwargs) -> bool:
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'default'

    query = 'Your BigQuery query'  # Specify your SQL query here

    loader = BigQuery.with_config(ConfigFileLoader(config_path, config_profile))
    df = loader.load(query)

    # Add your checks here
    if df.empty:
        return False

    return True

MySQL

Check the results of a MySQL query.

from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.mysql import MySQL
from os import path

if 'sensor' not in globals():
    from mage_ai.data_preparation.decorators import sensor


@sensor
def query_mysql_and_check_condition(*args, **kwargs) -> bool:
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'default'

    query = 'Your MySQL query'  # Specify your SQL query here

    with MySQL.with_config(
            ConfigFileLoader(config_path, config_profile)) as loader:
        df = loader.load(query)

        # Add your checks here
        if df.empty:
            return False

    return True

PostgreSQL

Check the results of a PostgreSQL query.

from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.postgres import Postgres
from os import path

if 'sensor' not in globals():
    from mage_ai.data_preparation.decorators import sensor


@sensor
def query_postgres_and_check_condition(*args, **kwargs) -> bool:
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'default'

    query = 'Your Postgres query'  # Specify your SQL query here

    with Postgres.with_config(
            ConfigFileLoader(config_path, config_profile)) as loader:
        df = loader.load(query)

        # Add your checks here
        if df.empty:
            return False

    return True

Amazon Redshift

Check the results of a Redshift query.

from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.redshift import Redshift
from os import path

if 'sensor' not in globals():
    from mage_ai.data_preparation.decorators import sensor


@sensor
def query_redshift_and_check_condition(*args, **kwargs) -> bool:
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'default'

    query = 'Your Redshift query'  # Specify your SQL query here

    with Redshift.with_config(
            ConfigFileLoader(config_path, config_profile)) as loader:
        df = loader.load(query)

        # Add your checks here
        if df.empty:
            return False

    return True

Amazon S3

Check if a file or folder exists in an S3 bucket.

from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.s3 import S3
from os import path

if 'sensor' not in globals():
    from mage_ai.data_preparation.decorators import sensor


@sensor
def check_condition(*args, **kwargs) -> bool:
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'default'

    bucket_name = 'your_bucket_name'
    s3_path = 'path/to/folder/or/file'

    config_file_loader = ConfigFileLoader(config_path, config_profile)
    return S3.with_config(config_file_loader).exists(
        bucket_name, s3_path
    )
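When the file to wait for is written once per partition, the S3 path can be derived from the run's execution date instead of being hard-coded. The sketch below assumes kwargs['execution_date'] is a datetime and assumes a year/month/day folder layout with a _SUCCESS marker file; the layout, the marker name, and the helper partition_path are all hypothetical.

```python
from datetime import datetime


def partition_path(
    execution_date: datetime,
    prefix: str = 'path/to/folder',
) -> str:
    """Build a date-partitioned S3 key for the given execution date.

    Hypothetical layout: <prefix>/YYYY/MM/DD/_SUCCESS
    """
    return f'{prefix}/{execution_date:%Y/%m/%d}/_SUCCESS'


print(partition_path(datetime(2023, 5, 2)))  # path/to/folder/2023/05/02/_SUCCESS
```

Inside the sensor, the result could be used as the s3_path value, for example partition_path(kwargs['execution_date']).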

Snowflake

Check the results of a Snowflake query.

from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.snowflake import Snowflake
from os import path

if 'sensor' not in globals():
    from mage_ai.data_preparation.decorators import sensor


@sensor
def query_snowflake_and_check_condition(*args, **kwargs) -> bool:
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'default'

    query = 'Your Snowflake query'  # Specify your SQL query here

    with Snowflake.with_config(
            ConfigFileLoader(config_path, config_profile)) as loader:
        df = loader.load(query)

        # Add your checks here
        if df.empty:
            return False

    return True
