Adventure starts here
- Overview
- Quickstart
- Design
- Abstractions
Concepts
- Pipelines
- Blocks
- Overview
- Types
- Dynamic blocks
- Resources
- Data integrations
- Triggers
- Pipeline runs
- Backfills
Project
- Setup
- Version control
- Docker
- Variables
- Custom libraries
- Database
- Credentials
- IO Config Setup
- Code execution
- Timezones
- Settings
- Version upgrades
Contributing
- Overview
- Dev environment
- Backend
- Frontend
- Documentation
Data loader
Data loader templates
Generic
Copy
if 'data_loader' not in globals():
from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
from mage_ai.data_preparation.decorators import test
@data_loader
def load_data(*args, **kwargs):
    """
    Generic starting point for a data loader block.

    The positional and keyword arguments are accepted but not used by this
    template.

    Returns:
        An empty dict as a placeholder. Replace it with whatever the block
        should produce (e.g. data frame, dictionary, array, int, str, etc.).
    """
    # Put your data loading logic here and return its result instead
    loaded = {}
    return loaded
@test
def test_output(output, *args) -> None:
    """
    Template code for testing the output of the block.

    Args:
        output: Value under test; the only check made is that it is not None.
        *args: Accepted but not used by this template.

    Raises:
        AssertionError: If `output` is None.
    """
    assert output is not None, 'The output is undefined'
Local file
Copy
from mage_ai.io.file import FileIO
if 'data_loader' not in globals():
from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
from mage_ai.data_preparation.decorators import test
@data_loader
def load_data_from_file(*args, **kwargs):
    """
    Template for loading data from the filesystem with FileIO.

    Loads a single file by default. To load from several directories, call
    FileIO().load(file_directories=['dir_1', 'dir_2']) instead.

    The positional and keyword arguments are accepted but not used by this
    template.

    Docs: /design/data-loading#fileio

    Returns:
        The result of FileIO().load() for the configured path.
    """
    source_file = 'path/to/your/file.csv'  # Replace with the file to load
    file_io = FileIO()
    return file_io.load(source_file)
@test
def test_output(output, *args) -> None:
    """
    Template code for testing the output of the block.

    Args:
        output: Value under test; the only check made is that it is not None.
        *args: Accepted but not used by this template.

    Raises:
        AssertionError: If `output` is None.
    """
    assert output is not None, 'The output is undefined'
API
Copy
import io
import pandas as pd
import requests
if 'data_loader' not in globals():
from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
from mage_ai.data_preparation.decorators import test
@data_loader
def load_data_from_api(*args, **kwargs):
    """
    Template for loading CSV data from an API.

    Set `url` to the endpoint to fetch. The response body is read as text and
    parsed as comma-separated values.

    The positional and keyword arguments are accepted but not used by this
    template.

    Returns:
        The pandas DataFrame produced by pd.read_csv on the response text.

    Raises:
        requests.exceptions.HTTPError: If the server responds with a 4xx/5xx
            status code.
        requests.exceptions.Timeout: If the server does not respond within
            the timeout.
    """
    url = ''
    # Bound the wait so an unresponsive server cannot hang the block forever
    response = requests.get(url, timeout=60)
    # Fail loudly on HTTP errors instead of parsing an error page as CSV
    response.raise_for_status()

    return pd.read_csv(io.StringIO(response.text), sep=',')
@test
def test_output(output, *args) -> None:
    """
    Template code for testing the output of the block.

    Args:
        output: Value under test; the only check made is that it is not None.
        *args: Accepted but not used by this template.

    Raises:
        AssertionError: If `output` is None.
    """
    assert output is not None, 'The output is undefined'
Azure Blob Storage
Copy
from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.azure_blob_storage import AzureBlobStorage
from os import path
if 'data_loader' not in globals():
from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
from mage_ai.data_preparation.decorators import test
@data_loader
def load_from_azure_blob_storage(*args, **kwargs):
    """
    Template for loading data from an Azure Blob Storage container.

    Configuration is read from the 'default' profile of 'io_config.yaml'
    located in the repo path. Specify your settings in that file.

    The positional and keyword arguments are accepted but not used by this
    template.

    Docs: /design/data-loading#azureblobstorage

    Returns:
        The result of AzureBlobStorage.load() for the container and blob path.
    """
    io_config_file = path.join(get_repo_path(), 'io_config.yaml')
    profile = 'default'

    container = 'your_container_name'  # Replace with your container
    blob = 'your_blob_path'  # Replace with the blob to load

    io_config = ConfigFileLoader(io_config_file, profile)
    storage = AzureBlobStorage.with_config(io_config)
    return storage.load(container, blob)
@test
def test_output(output, *args) -> None:
    """
    Template code for testing the output of the block.

    Args:
        output: Value under test; the only check made is that it is not None.
        *args: Accepted but not used by this template.

    Raises:
        AssertionError: If `output` is None.
    """
    assert output is not None, 'The output is undefined'
Google BigQuery
Copy
from mage_ai.settings.repo import get_repo_path
from mage_ai.io.bigquery import BigQuery
from mage_ai.io.config import ConfigFileLoader
from os import path
if 'data_loader' not in globals():
from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
from mage_ai.data_preparation.decorators import test
@data_loader
def load_data_from_big_query(*args, **kwargs):
    """
    Template for loading data from a BigQuery warehouse.

    Configuration is read from the 'default' profile of 'io_config.yaml'
    located in the repo path. Specify your settings in that file.

    The positional and keyword arguments are accepted but not used by this
    template.

    Docs: /design/data-loading#bigquery

    Returns:
        The result of BigQuery.load() for the configured query.
    """
    sql = 'your_gbq_query'  # Replace with the query to run
    io_config_file = path.join(get_repo_path(), 'io_config.yaml')
    profile = 'default'

    io_config = ConfigFileLoader(io_config_file, profile)
    warehouse = BigQuery.with_config(io_config)
    return warehouse.load(sql)
@test
def test_output(output, *args) -> None:
    """
    Template code for testing the output of the block.

    Args:
        output: Value under test; the only check made is that it is not None.
        *args: Accepted but not used by this template.

    Raises:
        AssertionError: If `output` is None.
    """
    assert output is not None, 'The output is undefined'
Google Cloud Storage
Copy
from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.google_cloud_storage import GoogleCloudStorage
from os import path
if 'data_loader' not in globals():
from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
from mage_ai.data_preparation.decorators import test
@data_loader
def load_from_google_cloud_storage(*args, **kwargs):
    """
    Template for loading data from a Google Cloud Storage bucket.

    Configuration is read from the 'default' profile of 'io_config.yaml'
    located in the repo path. Specify your settings in that file.

    The positional and keyword arguments are accepted but not used by this
    template.

    Docs: /design/data-loading#googlecloudstorage

    Returns:
        The result of GoogleCloudStorage.load() for the bucket and object key.
    """
    io_config_file = path.join(get_repo_path(), 'io_config.yaml')
    profile = 'default'

    bucket = 'your_bucket_name'  # Replace with your bucket
    key = 'your_object_key'  # Replace with the object to load

    io_config = ConfigFileLoader(io_config_file, profile)
    storage = GoogleCloudStorage.with_config(io_config)
    return storage.load(bucket, key)
@test
def test_output(output, *args) -> None:
    """
    Template code for testing the output of the block.

    Args:
        output: Value under test; the only check made is that it is not None.
        *args: Accepted but not used by this template.

    Raises:
        AssertionError: If `output` is None.
    """
    assert output is not None, 'The output is undefined'
MySQL
Copy
from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.mysql import MySQL
from os import path
if 'data_loader' not in globals():
from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
from mage_ai.data_preparation.decorators import test
@data_loader
def load_data_from_mysql(*args, **kwargs):
    """
    Template for loading data from a MySQL database.

    Configuration is read from the 'default' profile of 'io_config.yaml'
    located in the repo path. Specify your settings in that file. The MySQL
    loader is used as a context manager around the query.

    The positional and keyword arguments are accepted but not used by this
    template.

    Docs: /design/data-loading#mysql

    Returns:
        The result of the loader's load() call for the configured query.
    """
    sql = 'Your MySQL query'  # Replace with the SQL statement to run
    io_config_file = path.join(get_repo_path(), 'io_config.yaml')
    profile = 'default'

    io_config = ConfigFileLoader(io_config_file, profile)
    with MySQL.with_config(io_config) as db:
        return db.load(sql)
@test
def test_output(output, *args) -> None:
    """
    Template code for testing the output of the block.

    Args:
        output: Value under test; the only check made is that it is not None.
        *args: Accepted but not used by this template.

    Raises:
        AssertionError: If `output` is None.
    """
    assert output is not None, 'The output is undefined'
PostgreSQL
Copy
from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.postgres import Postgres
from os import path
if 'data_loader' not in globals():
from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
from mage_ai.data_preparation.decorators import test
@data_loader
def load_data_from_postgres(*args, **kwargs):
    """
    Template for loading data from a PostgreSQL database.

    Configuration is read from the 'default' profile of 'io_config.yaml'
    located in the repo path. Specify your settings in that file. The
    Postgres loader is used as a context manager around the query.

    The positional and keyword arguments are accepted but not used by this
    template.

    Docs: /design/data-loading#postgresql

    Returns:
        The result of the loader's load() call for the configured query.
    """
    sql = 'your PostgreSQL query'  # Replace with the SQL statement to run
    io_config_file = path.join(get_repo_path(), 'io_config.yaml')
    profile = 'default'

    io_config = ConfigFileLoader(io_config_file, profile)
    with Postgres.with_config(io_config) as db:
        return db.load(sql)
@test
def test_output(output, *args) -> None:
    """
    Template code for testing the output of the block.

    Args:
        output: Value under test; the only check made is that it is not None.
        *args: Accepted but not used by this template.

    Raises:
        AssertionError: If `output` is None.
    """
    assert output is not None, 'The output is undefined'
Amazon Redshift
Copy
from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.redshift import Redshift
from os import path
if 'data_loader' not in globals():
from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
from mage_ai.data_preparation.decorators import test
@data_loader
def load_data_from_redshift(*args, **kwargs):
    """
    Template for loading data from a Redshift cluster.

    Configuration is read from the 'default' profile of 'io_config.yaml'
    located in the repo path. Specify your settings in that file. The
    Redshift loader is used as a context manager around the query.

    The positional and keyword arguments are accepted but not used by this
    template.

    Docs: /design/data-loading#redshift

    Returns:
        The result of the loader's load() call for the configured query.
    """
    sql = 'your_redshift_selection_query'  # Replace with the query to run
    io_config_file = path.join(get_repo_path(), 'io_config.yaml')
    profile = 'default'

    io_config = ConfigFileLoader(io_config_file, profile)
    with Redshift.with_config(io_config) as cluster:
        return cluster.load(sql)
@test
def test_output(output, *args) -> None:
    """
    Template code for testing the output of the block.

    Args:
        output: Value under test; the only check made is that it is not None.
        *args: Accepted but not used by this template.

    Raises:
        AssertionError: If `output` is None.
    """
    assert output is not None, 'The output is undefined'
Amazon S3
Copy
from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.s3 import S3
from os import path
if 'data_loader' not in globals():
from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
from mage_ai.data_preparation.decorators import test
@data_loader
def load_from_s3_bucket(*args, **kwargs):
    """
    Template for loading data from an S3 bucket.

    Configuration is read from the 'default' profile of 'io_config.yaml'
    located in the repo path. Specify your settings in that file.

    The positional and keyword arguments are accepted but not used by this
    template.

    Docs: /design/data-loading#s3

    Returns:
        The result of S3.load() for the bucket and object key.
    """
    io_config_file = path.join(get_repo_path(), 'io_config.yaml')
    profile = 'default'

    bucket = 'your_bucket_name'  # Replace with your bucket
    key = 'your_object_key'  # Replace with the object to load

    io_config = ConfigFileLoader(io_config_file, profile)
    storage = S3.with_config(io_config)
    return storage.load(bucket, key)
@test
def test_output(output, *args) -> None:
    """
    Template code for testing the output of the block.

    Args:
        output: Value under test; the only check made is that it is not None.
        *args: Accepted but not used by this template.

    Raises:
        AssertionError: If `output` is None.
    """
    assert output is not None, 'The output is undefined'
Snowflake
Copy
from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.snowflake import Snowflake
from os import path
if 'data_loader' not in globals():
from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
from mage_ai.data_preparation.decorators import test
@data_loader
def load_data_from_snowflake(*args, **kwargs):
    """
    Template for loading data from a Snowflake warehouse.

    Configuration is read from the 'default' profile of 'io_config.yaml'
    located in the repo path. Specify your settings in that file. The
    Snowflake loader is used as a context manager around the query.

    The positional and keyword arguments are accepted but not used by this
    template.

    Docs: /design/data-loading#example-loading-data-from-snowflake-warehouse

    Returns:
        The result of the loader's load() call for the configured query.
    """
    sql = 'your_snowflake_query'  # Replace with the query to run
    io_config_file = path.join(get_repo_path(), 'io_config.yaml')
    profile = 'default'

    io_config = ConfigFileLoader(io_config_file, profile)
    with Snowflake.with_config(io_config) as warehouse:
        return warehouse.load(sql)
@test
def test_output(output, *args) -> None:
    """
    Template code for testing the output of the block.

    Args:
        output: Value under test; the only check made is that it is not None.
        *args: Accepted but not used by this template.

    Raises:
        AssertionError: If `output` is None.
    """
    assert output is not None, 'The output is undefined'
Delta Lake
Amazon S3
Copy
from deltalake import DeltaTable
if 'data_loader' not in globals():
from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
from mage_ai.data_preparation.decorators import test
@data_loader
def load_table(*args, **kwargs):
    """
    Load a Delta Table stored on Amazon S3.

    Fill in the blank storage option values and the bucket/key placeholders
    in the URI before running. The positional and keyword arguments are
    accepted but not used by this template.

    Docs: https://delta-io.github.io/delta-rs/python/usage.html#loading-a-delta-table

    Returns:
        The result of calling to_pandas() on the opened DeltaTable.
    """
    table_uri = 's3://[bucket]/[key]'
    # Option values are passed to DeltaTable as given; blanks are placeholders
    s3_options = {
        'AWS_ACCESS_KEY_ID': '',
        'AWS_SECRET_ACCESS_KEY': '',
        'AWS_REGION': '',
        'AWS_S3_ALLOW_UNSAFE_RENAME': 'true',
    }

    delta_table = DeltaTable(table_uri, storage_options=s3_options)
    return delta_table.to_pandas()
@test
def test_output(output, *args) -> None:
    """
    Template code for testing the output of the block.

    Args:
        output: Value under test; the only check made is that it is not None.
        *args: Accepted but not used by this template.

    Raises:
        AssertionError: If `output` is None.
    """
    assert output is not None, 'The output is undefined'
Azure Blob Storage
Copy
from deltalake import DeltaTable
if 'data_loader' not in globals():
from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
from mage_ai.data_preparation.decorators import test
@data_loader
def load_table(*args, **kwargs):
    """
    Load a Delta Table stored on Azure Blob Storage.

    Fill in the blank storage option values and the container/key
    placeholders in the URI before running. The positional and keyword
    arguments are accepted but not used by this template.

    Docs: https://delta-io.github.io/delta-rs/python/usage.html#loading-a-delta-table

    Returns:
        The result of calling to_pandas() on the opened DeltaTable.
    """
    table_uri = 'az://[container]/[key]'
    # Option values are passed to DeltaTable as given; blanks are placeholders
    azure_options = {
        'AZURE_STORAGE_ACCOUNT_NAME': '',
        'AZURE_STORAGE_ACCOUNT_KEY': '',
        'AZURE_STORAGE_ACCESS_KEY': '',
        'AZURE_STORAGE_MASTER_KEY': '',
        'AZURE_STORAGE_CLIENT_ID': '',
        'AZURE_STORAGE_CLIENT_SECRET': '',
        'AZURE_STORAGE_TENANT_ID': '',
        'AZURE_STORAGE_SAS_KEY': '',
        'AZURE_STORAGE_TOKEN': '',
        'AZURE_STORAGE_USE_EMULATOR': '',
    }

    delta_table = DeltaTable(table_uri, storage_options=azure_options)
    return delta_table.to_pandas()
@test
def test_output(output, *args) -> None:
    """
    Template code for testing the output of the block.

    Args:
        output: Value under test; the only check made is that it is not None.
        *args: Accepted but not used by this template.

    Raises:
        AssertionError: If `output` is None.
    """
    assert output is not None, 'The output is undefined'
Google Cloud Storage
Copy
from deltalake import DeltaTable
if 'data_loader' not in globals():
from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
from mage_ai.data_preparation.decorators import test
@data_loader
def load_table(*args, **kwargs):
    """
    Load a Delta Table stored on Google Cloud Storage.

    Fill in the blank storage option values and the bucket/key placeholders
    in the URI before running. The positional and keyword arguments are
    accepted but not used by this template.

    Docs: https://delta-io.github.io/delta-rs/python/usage.html#loading-a-delta-table

    Returns:
        The result of calling to_pandas() on the opened DeltaTable.
    """
    table_uri = 'gs://[bucket]/[key]'
    # Option values are passed to DeltaTable as given; blanks are placeholders
    gcs_options = {
        'GOOGLE_SERVICE_ACCOUNT': '',
        'GOOGLE_SERVICE_ACCOUNT_PATH': '',
        'GOOGLE_SERVICE_ACCOUNT_KEY': '',
        'GOOGLE_BUCKET': '',
    }

    delta_table = DeltaTable(table_uri, storage_options=gcs_options)
    return delta_table.to_pandas()
@test
def test_output(output, *args) -> None:
    """
    Template code for testing the output of the block.

    Args:
        output: Value under test; the only check made is that it is not None.
        *args: Accepted but not used by this template.

    Raises:
        AssertionError: If `output` is None.
    """
    assert output is not None, 'The output is undefined'
Druid
Copy
from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.druid import Druid
from os import path
if 'data_loader' not in globals():
from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
from mage_ai.data_preparation.decorators import test
@data_loader
def load_data_from_druid(*args, **kwargs):
    """
    Template for loading data from a Druid warehouse.

    Configuration is read from the 'default' profile of 'io_config.yaml'
    located in the repo path. Specify your settings in that file. The Druid
    loader is used as a context manager around the query.

    The positional and keyword arguments are accepted but not used by this
    template.

    Docs: https://docs.mage.ai/design/data-loading#druid

    Returns:
        The result of the loader's load() call for the configured query.
    """
    sql = 'your Druid query'  # Replace with the SQL statement to run
    io_config_file = path.join(get_repo_path(), 'io_config.yaml')
    profile = 'default'

    io_config = ConfigFileLoader(io_config_file, profile)
    with Druid.with_config(io_config) as warehouse:
        return warehouse.load(sql)
@test
def test_output(output, *args) -> None:
    """
    Template code for testing the output of the block.

    Args:
        output: Value under test; the only check made is that it is not None.
        *args: Accepted but not used by this template.

    Raises:
        AssertionError: If `output` is None.
    """
    assert output is not None, 'The output is undefined'
Pinot
Copy
# get_repo_path is imported from mage_ai.settings.repo, the same module the
# other loader templates on this page use.
from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.pinot import Pinot
from os import path

# Import the block decorators only when they are not already present in this
# module's globals.
if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test
@data_loader
def load_data_from_pinot(*args, **kwargs):
    """
    Template for loading data from a Pinot warehouse.

    Configuration is read from the 'default' profile of 'io_config.yaml'
    located in the repo path. Specify your settings in that file. The Pinot
    loader is used as a context manager around the query.

    The positional and keyword arguments are accepted but not used by this
    template.

    Docs: https://docs.mage.ai/design/data-loading#pinot

    Returns:
        The result of the loader's load() call for the configured query.
    """
    sql = 'your Pinot query'  # Replace with the SQL statement to run
    io_config_file = path.join(get_repo_path(), 'io_config.yaml')
    profile = 'default'

    io_config = ConfigFileLoader(io_config_file, profile)
    with Pinot.with_config(io_config) as warehouse:
        return warehouse.load(sql)
@test
def test_output(output, *args) -> None:
    """
    Template code for testing the output of the block.

    Args:
        output: Value under test; the only check made is that it is not None.
        *args: Accepted but not used by this template.

    Raises:
        AssertionError: If `output` is None.
    """
    assert output is not None, 'The output is undefined'
Orchestration
Trigger pipeline
Copy
from mage_ai.orchestration.triggers.api import trigger_pipeline
if 'data_loader' not in globals():
from mage_ai.data_preparation.decorators import data_loader
@data_loader
def trigger(*args, **kwargs):
    """
    Trigger another pipeline to run.

    The positional and keyword arguments are accepted but not used by this
    template, and nothing is returned.

    Documentation: /orchestration/triggers/trigger-pipeline
    """
    target_pipeline = 'pipeline_uuid'  # Required: enter the UUID of the pipeline to trigger

    # All of the settings below are optional
    run_options = {
        'variables': {},  # runtime variables for the pipeline
        'check_status': False,  # poll and check the status of the triggered pipeline
        'error_on_failure': False,  # if triggered pipeline fails, raise an exception
        'poll_interval': 60,  # check the status of triggered pipeline every N seconds
        'poll_timeout': None,  # raise an exception after N seconds
        'verbose': True,  # print status of triggered pipeline run
    }

    trigger_pipeline(target_pipeline, **run_options)
Was this page helpful?
Assistant
Responses are generated using AI and may contain mistakes.