In your pipeline, you can add sensors that run continuously and only complete
when an external pipeline or an external block has run successfully for a particular
partition.
Then, you can add a block in your pipeline that depends on that sensor.
If a block has a sensor as an upstream dependency, that block won’t start
running until the sensor has successfully evaluated its condition.
Setup
- Add a sensor to your pipeline
- Configure a sensor
Example
You can add sensors to a pipeline the same way you add other types of blocks.
from mage_ai.orchestration.run_status_checker import check_status

if 'sensor' not in globals():
    from mage_ai.data_preparation.decorators import sensor


@sensor
def check_condition(*args, **kwargs) -> bool:
    return check_status(
        'pipeline_uuid',
        kwargs['execution_date'],
        block_uuid='block_uuid',
        hours=24,
    )
In the above code, change pipeline_uuid
to the UUID of the pipeline whose run status you want this sensor to keep checking.
The block_uuid
keyword argument is optional. If you pass a value for the block_uuid
keyword argument, then the sensor will check the status of that block in the pipeline,
and the sensor will only complete once that specific block has finished running successfully.
The hours keyword argument is optional. If you pass a value for the hours keyword argument,
then the sensor will check the status of the pipeline for that many hours in the past from the execution_date.
The default value is 24 hours.
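For example, here is a minimal sketch of a sensor that waits on the pipeline as a whole, omitting the optional block_uuid and relying on the default 24-hour window (the pipeline UUID below is a placeholder):

from mage_ai.orchestration.run_status_checker import check_status

if 'sensor' not in globals():
    from mage_ai.data_preparation.decorators import sensor


@sensor
def wait_for_pipeline(*args, **kwargs) -> bool:
    # Completes once any run of the referenced pipeline within the
    # default 24-hour window has finished successfully.
    return check_status(
        'upstream_pipeline_uuid',  # placeholder: replace with your pipeline UUID
        kwargs['execution_date'],
    )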
Sensors can also have upstream block dependencies. The output of the upstream blocks will be passed
into the sensor through the args
parameter, as with other Mage block types.
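For instance, here is a minimal sketch of a sensor that inspects its upstream block’s output; the assumption that the upstream block returns a list of records is hypothetical:

if 'sensor' not in globals():
    from mage_ai.data_preparation.decorators import sensor


@sensor
def check_upstream_output(*args, **kwargs) -> bool:
    # args holds the outputs of the upstream blocks.
    # Here we assume the first upstream block returns a list of records.
    records = args[0] if args else []
    return len(records) > 0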
Currently, the sensor checks its condition every 60 seconds.
Templates
When adding a sensor block, here are the available templates to choose from:
- Google BigQuery
- MySQL
- PostgreSQL
- Amazon Redshift
- Amazon S3
- Snowflake
Below are the code examples for those templates.
Google BigQuery
Check the results of a BigQuery query.
from mage_ai.settings.repo import get_repo_path
from mage_ai.io.bigquery import BigQuery
from mage_ai.io.config import ConfigFileLoader
from os import path

if 'sensor' not in globals():
    from mage_ai.data_preparation.decorators import sensor


@sensor
def query_bigquery_and_check_condition(*args, **kwargs) -> bool:
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'default'

    query = 'Your BigQuery query'

    loader = BigQuery.with_config(ConfigFileLoader(config_path, config_profile))
    df = loader.load(query)

    # Complete only when the query returns at least one row.
    if df.empty:
        return False

    return True
MySQL
Check the results of a MySQL query.
from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.mysql import MySQL
from os import path

if 'sensor' not in globals():
    from mage_ai.data_preparation.decorators import sensor


@sensor
def query_mysql_and_check_condition(*args, **kwargs) -> bool:
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'default'

    query = 'Your MySQL query'

    with MySQL.with_config(
            ConfigFileLoader(config_path, config_profile)) as loader:
        df = loader.load(query)

        # Complete only when the query returns at least one row.
        if df.empty:
            return False

    return True
PostgreSQL
Check the results of a PostgreSQL query.
from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.postgres import Postgres
from os import path

if 'sensor' not in globals():
    from mage_ai.data_preparation.decorators import sensor


@sensor
def query_postgres_and_check_condition(*args, **kwargs) -> bool:
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'default'

    query = 'Your Postgres query'

    with Postgres.with_config(
            ConfigFileLoader(config_path, config_profile)) as loader:
        df = loader.load(query)

        # Complete only when the query returns at least one row.
        if df.empty:
            return False

    return True
Amazon Redshift
Check the results of a Redshift query.
from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.redshift import Redshift
from os import path

if 'sensor' not in globals():
    from mage_ai.data_preparation.decorators import sensor


@sensor
def query_redshift_and_check_condition(*args, **kwargs) -> bool:
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'default'

    query = 'Your Redshift query'

    with Redshift.with_config(
            ConfigFileLoader(config_path, config_profile)) as loader:
        df = loader.load(query)

        # Complete only when the query returns at least one row.
        if df.empty:
            return False

    return True
Amazon S3
Check if a file or folder exists in an S3 bucket.
from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.s3 import S3
from os import path

if 'sensor' not in globals():
    from mage_ai.data_preparation.decorators import sensor


@sensor
def check_condition(*args, **kwargs) -> bool:
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'default'

    bucket_name = 'your_bucket_name'
    s3_path = 'path/to/folder/or/file'

    config_file_loader = ConfigFileLoader(config_path, config_profile)

    # Complete only when the file or folder exists in the bucket.
    return S3.with_config(config_file_loader).exists(
        bucket_name, s3_path
    )
Snowflake
Check the results of a Snowflake query.
from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.snowflake import Snowflake
from os import path

if 'sensor' not in globals():
    from mage_ai.data_preparation.decorators import sensor


@sensor
def query_snowflake_and_check_condition(*args, **kwargs) -> bool:
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'default'

    query = 'Your Snowflake query'

    with Snowflake.with_config(
            ConfigFileLoader(config_path, config_profile)) as loader:
        df = loader.load(query)

        # Complete only when the query returns at least one row.
        if df.empty:
            return False

    return True