Data loader
Data loader templates
Adventure starts here
- Overview
- Quickstart
- Design
- Abstractions
Concepts
- Pipelines
- Blocks
- Overview
- Types
- Dynamic blocks
- Resources
- Data integrations
- Triggers
- Pipeline runs
- Backfills
Project
- Setup
- Version control
- Docker
- Variables
- Custom libraries
- Database
- Credentials
- Code execution
- Timezones
- Settings
- Version upgrades
Contributing
- Overview
- Dev environment
- Backend
- Frontend
- Documentation
Data loader
Data loader templates
Generic
if 'data_loader' not in globals():
from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
from mage_ai.data_preparation.decorators import test
@data_loader
def load_data(*args, **kwargs):
    """
    Starter block for loading data from an arbitrary source.

    Returns:
        Whatever the loading logic produces (e.g. data frame, dictionary,
        array, int, str, etc.). The unmodified template returns an empty
        dictionary.
    """
    # Specify your data loading logic here
    loaded = {}
    return loaded
@test
def test_output(output, *args) -> None:
    """
    Check that the block's output is not None.
    """
    output_is_defined = output is not None
    assert output_is_defined, 'The output is undefined'
Local file
from mage_ai.io.file import FileIO
if 'data_loader' not in globals():
from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
from mage_ai.data_preparation.decorators import test
@data_loader
def load_data_from_file(*args, **kwargs):
    """
    Read data from the local filesystem with FileIO.

    The template loads a single file. To load from several directories
    instead, call:
        FileIO().load(file_directories=['dir_1', 'dir_2'])

    Docs: /design/data-loading#fileio
    """
    filepath = 'path/to/your/file.csv'

    file_io = FileIO()
    return file_io.load(filepath)
@test
def test_output(output, *args) -> None:
    """
    Check that the block's output is not None.
    """
    output_is_defined = output is not None
    assert output_is_defined, 'The output is undefined'
API
import io
import pandas as pd
import requests
if 'data_loader' not in globals():
from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
from mage_ai.data_preparation.decorators import test
@data_loader
def load_data_from_api(*args, **kwargs):
    """
    Template for loading CSV data from an HTTP API.

    Sends a GET request to `url` and parses the response body as
    comma-separated text.

    Returns:
        pandas.DataFrame built from the response text.

    Raises:
        requests.HTTPError: if the server answers with a 4xx/5xx status.
    """
    url = ''
    response = requests.get(url)
    # Fail loudly on an error status instead of parsing the error page
    # (HTML/JSON) as if it were CSV data.
    response.raise_for_status()

    return pd.read_csv(io.StringIO(response.text), sep=',')
@test
def test_output(output, *args) -> None:
    """
    Check that the block's output is not None.
    """
    output_is_defined = output is not None
    assert output_is_defined, 'The output is undefined'
Azure Blob Storage
from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.azure_blob_storage import AzureBlobStorage
from os import path
if 'data_loader' not in globals():
from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
from mage_ai.data_preparation.decorators import test
@data_loader
def load_from_azure_blob_storage(*args, **kwargs):
    """
    Load a blob from an Azure Blob Storage container.

    Settings are read from the 'default' profile of 'io_config.yaml',
    located in the directory returned by get_repo_path().

    Docs: /design/data-loading#azureblobstorage
    """
    container_name = 'your_container_name'
    blob_path = 'your_blob_path'

    io_config = ConfigFileLoader(
        path.join(get_repo_path(), 'io_config.yaml'),
        'default',
    )
    storage = AzureBlobStorage.with_config(io_config)
    return storage.load(container_name, blob_path)
@test
def test_output(output, *args) -> None:
    """
    Check that the block's output is not None.
    """
    output_is_defined = output is not None
    assert output_is_defined, 'The output is undefined'
Google BigQuery
from mage_ai.settings.repo import get_repo_path
from mage_ai.io.bigquery import BigQuery
from mage_ai.io.config import ConfigFileLoader
from os import path
if 'data_loader' not in globals():
from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
from mage_ai.data_preparation.decorators import test
@data_loader
def load_data_from_big_query(*args, **kwargs):
    """
    Run a query against a BigQuery warehouse and return what it loads.

    Settings are read from the 'default' profile of 'io_config.yaml',
    located in the directory returned by get_repo_path().

    Docs: /design/data-loading#bigquery
    """
    query = 'your_gbq_query'

    io_config = ConfigFileLoader(
        path.join(get_repo_path(), 'io_config.yaml'),
        'default',
    )
    warehouse = BigQuery.with_config(io_config)
    return warehouse.load(query)
@test
def test_output(output, *args) -> None:
    """
    Check that the block's output is not None.
    """
    output_is_defined = output is not None
    assert output_is_defined, 'The output is undefined'
Google Cloud Storage
from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.google_cloud_storage import GoogleCloudStorage
from os import path
if 'data_loader' not in globals():
from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
from mage_ai.data_preparation.decorators import test
@data_loader
def load_from_google_cloud_storage(*args, **kwargs):
    """
    Load an object from a Google Cloud Storage bucket.

    Settings are read from the 'default' profile of 'io_config.yaml',
    located in the directory returned by get_repo_path().

    Docs: /design/data-loading#googlecloudstorage
    """
    bucket_name = 'your_bucket_name'
    object_key = 'your_object_key'

    io_config = ConfigFileLoader(
        path.join(get_repo_path(), 'io_config.yaml'),
        'default',
    )
    storage = GoogleCloudStorage.with_config(io_config)
    return storage.load(bucket_name, object_key)
@test
def test_output(output, *args) -> None:
    """
    Check that the block's output is not None.
    """
    output_is_defined = output is not None
    assert output_is_defined, 'The output is undefined'
MySQL
from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.mysql import MySQL
from os import path
if 'data_loader' not in globals():
from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
from mage_ai.data_preparation.decorators import test
@data_loader
def load_data_from_mysql(*args, **kwargs):
    """
    Run a query against a MySQL database and return what it loads.

    Settings are read from the 'default' profile of 'io_config.yaml',
    located in the directory returned by get_repo_path().

    Docs: /design/data-loading#mysql
    """
    query = 'Your MySQL query'  # Specify your SQL query here

    io_config = ConfigFileLoader(
        path.join(get_repo_path(), 'io_config.yaml'),
        'default',
    )
    # The loader is used as a context manager for the duration of the query.
    with MySQL.with_config(io_config) as connection:
        return connection.load(query)
@test
def test_output(output, *args) -> None:
    """
    Check that the block's output is not None.
    """
    output_is_defined = output is not None
    assert output_is_defined, 'The output is undefined'
PostgreSQL
from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.postgres import Postgres
from os import path
if 'data_loader' not in globals():
from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
from mage_ai.data_preparation.decorators import test
@data_loader
def load_data_from_postgres(*args, **kwargs):
    """
    Run a query against a PostgreSQL database and return what it loads.

    Settings are read from the 'default' profile of 'io_config.yaml',
    located in the directory returned by get_repo_path().

    Docs: /design/data-loading#postgresql
    """
    query = 'your PostgreSQL query'  # Specify your SQL query here

    io_config = ConfigFileLoader(
        path.join(get_repo_path(), 'io_config.yaml'),
        'default',
    )
    # The loader is used as a context manager for the duration of the query.
    with Postgres.with_config(io_config) as connection:
        return connection.load(query)
@test
def test_output(output, *args) -> None:
    """
    Check that the block's output is not None.
    """
    output_is_defined = output is not None
    assert output_is_defined, 'The output is undefined'
Amazon Redshift
from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.redshift import Redshift
from os import path
if 'data_loader' not in globals():
from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
from mage_ai.data_preparation.decorators import test
@data_loader
def load_data_from_redshift(*args, **kwargs):
    """
    Run a query against a Redshift cluster and return what it loads.

    Settings are read from the 'default' profile of 'io_config.yaml',
    located in the directory returned by get_repo_path().

    Docs: /design/data-loading#redshift
    """
    query = 'your_redshift_selection_query'

    io_config = ConfigFileLoader(
        path.join(get_repo_path(), 'io_config.yaml'),
        'default',
    )
    # The loader is used as a context manager for the duration of the query.
    with Redshift.with_config(io_config) as connection:
        return connection.load(query)
@test
def test_output(output, *args) -> None:
    """
    Check that the block's output is not None.
    """
    output_is_defined = output is not None
    assert output_is_defined, 'The output is undefined'
Amazon S3
from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.s3 import S3
from os import path
if 'data_loader' not in globals():
from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
from mage_ai.data_preparation.decorators import test
@data_loader
def load_from_s3_bucket(*args, **kwargs):
    """
    Load an object from an S3 bucket.

    Settings are read from the 'default' profile of 'io_config.yaml',
    located in the directory returned by get_repo_path().

    Docs: /design/data-loading#s3
    """
    bucket_name = 'your_bucket_name'
    object_key = 'your_object_key'

    io_config = ConfigFileLoader(
        path.join(get_repo_path(), 'io_config.yaml'),
        'default',
    )
    bucket_client = S3.with_config(io_config)
    return bucket_client.load(bucket_name, object_key)
@test
def test_output(output, *args) -> None:
    """
    Check that the block's output is not None.
    """
    output_is_defined = output is not None
    assert output_is_defined, 'The output is undefined'
Snowflake
from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.snowflake import Snowflake
from os import path
if 'data_loader' not in globals():
from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
from mage_ai.data_preparation.decorators import test
@data_loader
def load_data_from_snowflake(*args, **kwargs):
    """
    Run a query against a Snowflake warehouse and return what it loads.

    Settings are read from the 'default' profile of 'io_config.yaml',
    located in the directory returned by get_repo_path().

    Docs: /design/data-loading#example-loading-data-from-snowflake-warehouse
    """
    query = 'your_snowflake_query'

    io_config = ConfigFileLoader(
        path.join(get_repo_path(), 'io_config.yaml'),
        'default',
    )
    # The loader is used as a context manager for the duration of the query.
    with Snowflake.with_config(io_config) as connection:
        return connection.load(query)
@test
def test_output(output, *args) -> None:
    """
    Check that the block's output is not None.
    """
    output_is_defined = output is not None
    assert output_is_defined, 'The output is undefined'
Delta Lake
Amazon S3
from deltalake import DeltaTable
if 'data_loader' not in globals():
from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
from mage_ai.data_preparation.decorators import test
@data_loader
def load_table(*args, **kwargs):
    """
    Open a Delta Table stored on Amazon S3 and return it via to_pandas().

    Fill in the credential values and the table URI before running.

    Docs: https://delta-io.github.io/delta-rs/python/usage.html#loading-a-delta-table
    """
    table_uri = 's3://[bucket]/[key]'
    aws_options = {
        'AWS_ACCESS_KEY_ID': '',
        'AWS_SECRET_ACCESS_KEY': '',
        'AWS_REGION': '',
        'AWS_S3_ALLOW_UNSAFE_RENAME': 'true',
    }

    return DeltaTable(table_uri, storage_options=aws_options).to_pandas()
@test
def test_output(output, *args) -> None:
    """
    Check that the block's output is not None.
    """
    output_is_defined = output is not None
    assert output_is_defined, 'The output is undefined'
Azure Blob Storage
from deltalake import DeltaTable
if 'data_loader' not in globals():
from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
from mage_ai.data_preparation.decorators import test
@data_loader
def load_table(*args, **kwargs):
    """
    Open a Delta Table stored on Azure Blob Storage and return it via
    to_pandas().

    Fill in the storage option values and the table URI before running.

    Docs: https://delta-io.github.io/delta-rs/python/usage.html#loading-a-delta-table
    """
    table_uri = 'az://[container]/[key]'
    azure_options = {
        'AZURE_STORAGE_ACCOUNT_NAME': '',
        'AZURE_STORAGE_ACCOUNT_KEY': '',
        'AZURE_STORAGE_ACCESS_KEY': '',
        'AZURE_STORAGE_MASTER_KEY': '',
        'AZURE_STORAGE_CLIENT_ID': '',
        'AZURE_STORAGE_CLIENT_SECRET': '',
        'AZURE_STORAGE_TENANT_ID': '',
        'AZURE_STORAGE_SAS_KEY': '',
        'AZURE_STORAGE_TOKEN': '',
        'AZURE_STORAGE_USE_EMULATOR': '',
    }

    return DeltaTable(table_uri, storage_options=azure_options).to_pandas()
@test
def test_output(output, *args) -> None:
    """
    Check that the block's output is not None.
    """
    output_is_defined = output is not None
    assert output_is_defined, 'The output is undefined'
Google Cloud Storage
from deltalake import DeltaTable
if 'data_loader' not in globals():
from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
from mage_ai.data_preparation.decorators import test
@data_loader
def load_table(*args, **kwargs):
    """
    Open a Delta Table stored on Google Cloud Storage and return it via
    to_pandas().

    Fill in the storage option values and the table URI before running.

    Docs: https://delta-io.github.io/delta-rs/python/usage.html#loading-a-delta-table
    """
    table_uri = 'gs://[bucket]/[key]'
    gcs_options = {
        'GOOGLE_SERVICE_ACCOUNT': '',
        'GOOGLE_SERVICE_ACCOUNT_PATH': '',
        'GOOGLE_SERVICE_ACCOUNT_KEY': '',
        'GOOGLE_BUCKET': '',
    }

    return DeltaTable(table_uri, storage_options=gcs_options).to_pandas()
@test
def test_output(output, *args) -> None:
    """
    Check that the block's output is not None.
    """
    output_is_defined = output is not None
    assert output_is_defined, 'The output is undefined'
Druid
from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.druid import Druid
from os import path
if 'data_loader' not in globals():
from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
from mage_ai.data_preparation.decorators import test
@data_loader
def load_data_from_druid(*args, **kwargs):
    """
    Run a query against a Druid warehouse and return what it loads.

    Settings are read from the 'default' profile of 'io_config.yaml',
    located in the directory returned by get_repo_path().

    Docs: https://docs.mage.ai/design/data-loading#druid
    """
    query = 'your Druid query'  # Specify your SQL query here

    io_config = ConfigFileLoader(
        path.join(get_repo_path(), 'io_config.yaml'),
        'default',
    )
    # The loader is used as a context manager for the duration of the query.
    with Druid.with_config(io_config) as connection:
        return connection.load(query)
@test
def test_output(output, *args) -> None:
    """
    Check that the block's output is not None.
    """
    output_is_defined = output is not None
    assert output_is_defined, 'The output is undefined'
Pinot
# get_repo_path is imported from mage_ai.settings.repo, the same module the
# other data loader templates on this page import it from.
from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.pinot import Pinot
from os import path
if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test


@data_loader
def load_data_from_pinot(*args, **kwargs):
    """
    Template for loading data from a Pinot warehouse.

    Specify your configuration settings in 'io_config.yaml'; the 'default'
    profile of that file, located in the directory returned by
    get_repo_path(), is used.

    Docs: https://docs.mage.ai/design/data-loading#pinot

    Returns:
        Whatever Pinot.load(query) returns for the configured query.
    """
    query = 'your Pinot query'  # Specify your SQL query here
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'default'

    # The loader is used as a context manager for the duration of the query.
    with Pinot.with_config(ConfigFileLoader(config_path, config_profile)) as loader:
        return loader.load(query)
@test
def test_output(output, *args) -> None:
    """
    Check that the block's output is not None.
    """
    output_is_defined = output is not None
    assert output_is_defined, 'The output is undefined'
Orchestration
Trigger pipeline
from mage_ai.orchestration.triggers.api import trigger_pipeline
if 'data_loader' not in globals():
from mage_ai.data_preparation.decorators import data_loader
@data_loader
def trigger(*args, **kwargs):
    """
    Start a run of another pipeline.

    Replace 'pipeline_uuid' with the UUID of the pipeline to trigger and
    adjust the optional settings below as needed.

    Documentation: /orchestration/triggers/trigger-pipeline
    """
    trigger_options = {
        'variables': {},  # Optional: runtime variables for the pipeline
        'check_status': False,  # Optional: poll and check the status of the triggered pipeline
        'error_on_failure': False,  # Optional: if triggered pipeline fails, raise an exception
        'poll_interval': 60,  # Optional: check the status of triggered pipeline every N seconds
        'poll_timeout': None,  # Optional: raise an exception after N seconds
        'verbose': True,  # Optional: print status of triggered pipeline run
    }

    trigger_pipeline(
        'pipeline_uuid',  # Required: enter the UUID of the pipeline to trigger
        **trigger_options,
    )