Sensors
Sensor templates
Adventure starts here
- Overview
- Quickstart
- Design
- Abstractions
Concepts
- Pipelines
- Blocks
- Overview
- Types
- Dynamic blocks
- Resources
- Data loader
- Transformer
- Data exporter
- Sensors
- Callbacks
- Conditionals
- Data integrations
- Triggers
- Pipeline runs
- Backfills
Project
- Setup
- Version control
- Docker
- Variables
- Custom libraries
- Database
- Credentials
- Code execution
- Timezones
- Settings
- Version upgrades
Contributing
- Overview
- Dev environment
- Backend
- Frontend
- Documentation
Sensors
Sensor templates
Generic
from mage_ai.orchestration.run_status_checker import check_status
if 'sensor' not in globals():
from mage_ai.data_preparation.decorators import sensor
@sensor
def check_condition(**kwargs) -> bool:
    """
    Template sensor that waits for another pipeline (or block) run to finish.

    Reads `execution_date` from the keyword arguments and forwards it to
    `check_status`, whose result is returned unchanged.
    """
    run_date = kwargs['execution_date']
    return check_status(
        'pipeline_uuid',
        run_date,
        block_uuid='block_uuid',  # optional if you want the sensor to wait on a specific block
        hours=24,  # optional if you want to check for a specific number of hours. Default is 24
    )
Google BigQuery
from mage_ai.settings.repo import get_repo_path
from mage_ai.io.bigquery import BigQuery
from mage_ai.io.config import ConfigFileLoader
from os import path
if 'sensor' not in globals():
from mage_ai.data_preparation.decorators import sensor
@sensor
def query_bigquery_and_check_condition(**kwargs) -> bool:
    """
    Template sensor that runs a BigQuery query and inspects its result.

    Connection settings come from the 'default' profile of 'io_config.yaml',
    located under the path returned by `get_repo_path()`.

    Returns:
        True if the sensor should complete (the query result is not empty),
        False if it should keep waiting (the query result is empty).
    """
    query = 'Your BigQuery query'  # Specify your SQL query here
    config_profile = 'default'
    config_path = path.join(get_repo_path(), 'io_config.yaml')

    config = ConfigFileLoader(config_path, config_profile)
    result = BigQuery.with_config(config).load(query)

    # Add your checks here
    return not result.empty
MySQL
from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.mysql import MySQL
from os import path
if 'sensor' not in globals():
from mage_ai.data_preparation.decorators import sensor
@sensor
def query_mysql_and_check_condition(**kwargs) -> bool:
    """
    Template sensor that runs a MySQL query and inspects its result.

    Connection settings come from the 'default' profile of 'io_config.yaml',
    located under the path returned by `get_repo_path()`.

    Returns:
        True if the sensor should complete (the query result is not empty),
        False if it should keep waiting (the query result is empty).
    """
    query = 'Your MySQL query'  # Specify your SQL query here
    config_profile = 'default'
    config_path = path.join(get_repo_path(), 'io_config.yaml')

    config = ConfigFileLoader(config_path, config_profile)
    # The loader is used as a context manager, as in the original template.
    with MySQL.with_config(config) as loader:
        result = loader.load(query)

        # Add your checks here
        return not result.empty
PostgreSQL
from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.postgres import Postgres
from os import path
if 'sensor' not in globals():
from mage_ai.data_preparation.decorators import sensor
@sensor
def query_postgres_and_check_condition(**kwargs) -> bool:
    """
    Template sensor that runs a Postgres query and inspects its result.

    Connection settings come from the 'default' profile of 'io_config.yaml',
    located under the path returned by `get_repo_path()`.

    Returns:
        True if the sensor should complete (the query result is not empty),
        False if it should keep waiting (the query result is empty).
    """
    query = 'Your Postgres query'  # Specify your SQL query here
    config_profile = 'default'
    config_path = path.join(get_repo_path(), 'io_config.yaml')

    config = ConfigFileLoader(config_path, config_profile)
    # The loader is used as a context manager, as in the original template.
    with Postgres.with_config(config) as loader:
        result = loader.load(query)

        # Add your checks here
        return not result.empty
Amazon Redshift
from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.redshift import Redshift
from os import path
if 'sensor' not in globals():
from mage_ai.data_preparation.decorators import sensor
@sensor
def query_redshift_and_check_condition(**kwargs) -> bool:
    """
    Template sensor that runs a Redshift query and inspects its result.

    Connection settings come from the 'default' profile of 'io_config.yaml',
    located under the path returned by `get_repo_path()`.

    Returns:
        True if the sensor should complete (the query result is not empty),
        False if it should keep waiting (the query result is empty).
    """
    query = 'Your Redshift query'  # Specify your SQL query here
    config_profile = 'default'
    config_path = path.join(get_repo_path(), 'io_config.yaml')

    config = ConfigFileLoader(config_path, config_profile)
    # The loader is used as a context manager, as in the original template.
    with Redshift.with_config(config) as loader:
        result = loader.load(query)

        # Add your checks here
        return not result.empty
Amazon S3
from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.s3 import S3
from os import path
if 'sensor' not in globals():
from mage_ai.data_preparation.decorators import sensor
@sensor
def check_condition(**kwargs) -> bool:
    """
    Template sensor that checks whether a file or folder exists in an S3 bucket.

    The following AWS-related fields must be filled out in `io_config.yaml`:
    - AWS_ACCESS_KEY_ID
    - AWS_SECRET_ACCESS_KEY
    - AWS_REGION

    Returns the result of `S3.exists` for the configured bucket and path.
    """
    bucket_name = 'your_bucket_name'
    s3_path = 'path/to/folder/or/file'

    config_profile = 'default'
    config_path = path.join(get_repo_path(), 'io_config.yaml')

    s3_client = S3.with_config(ConfigFileLoader(config_path, config_profile))
    return s3_client.exists(bucket_name, s3_path)
Google Cloud Storage
from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.google_cloud_storage import GoogleCloudStorage
from os import path
if 'sensor' not in globals():
from mage_ai.data_preparation.decorators import sensor
@sensor
def check_condition(*args, **kwargs) -> bool:
    """
    Template sensor that checks whether a file or folder exists in a
    Google Cloud Storage bucket.

    Connection settings come from the 'default' profile of 'io_config.yaml',
    located under the path returned by `get_repo_path()`.

    Docs: https://docs.mage.ai/design/data-loading#googlecloudstorage

    Returns the result of `GoogleCloudStorage.exists` for the configured
    bucket and object key.
    """
    bucket_name = 'your_bucket_name'
    object_key = 'your_object_key'

    config_profile = 'default'
    config_path = path.join(get_repo_path(), 'io_config.yaml')

    config = ConfigFileLoader(config_path, config_profile)
    storage = GoogleCloudStorage.with_config(config)
    return storage.exists(bucket_name, object_key)
Snowflake
from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.snowflake import Snowflake
from os import path
if 'sensor' not in globals():
from mage_ai.data_preparation.decorators import sensor
@sensor
def query_snowflake_and_check_condition(**kwargs) -> bool:
    """
    Template sensor that runs a Snowflake query and inspects its result.

    Connection settings come from the 'default' profile of 'io_config.yaml',
    located under the path returned by `get_repo_path()`.

    Returns:
        True if the sensor should complete (the query result is not empty),
        False if it should keep waiting (the query result is empty).
    """
    query = 'Your Snowflake query'  # Specify your SQL query here
    config_profile = 'default'
    config_path = path.join(get_repo_path(), 'io_config.yaml')

    config = ConfigFileLoader(config_path, config_profile)
    # The loader is used as a context manager, as in the original template.
    with Snowflake.with_config(config) as loader:
        result = loader.load(query)

        # Add your checks here
        return not result.empty