Data exporter templates
Generic
if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter


@data_exporter
def export_data(*args, **kwargs):
    """
    Exports data to some source

    Args:
        args: The input variables from upstream blocks

    Output (optional):
        Optionally return any object and it'll be logged and
        displayed when inspecting the block run.
    """
    # Specify your data exporting logic here
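As a concrete illustration, here is a minimal sketch of what that exporting logic could look like when the upstream block produces a pandas DataFrame and the target is a local JSON file; the output path and the returned summary are placeholders, not part of the template.

import os

from pandas import DataFrame

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter


@data_exporter
def export_data(df: DataFrame, *args, **kwargs):
    # Hypothetical destination; swap in your own target (file, API, queue, ...)
    output_dir = 'exports'
    output_path = os.path.join(output_dir, 'output.json')

    os.makedirs(output_dir, exist_ok=True)
    df.to_json(output_path, orient='records')

    # The returned object is logged and displayed when inspecting the block run
    return {'rows_exported': len(df), 'path': output_path}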
Local file
from mage_ai.io.file import FileIO
from pandas import DataFrame

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter


@data_exporter
def export_data_to_file(df: DataFrame, **kwargs) -> None:
    """
    Template for exporting data to filesystem.

    Docs: /design/data-loading#fileio
    """
    filepath = 'path/to/write/dataframe/to.csv'
    FileIO().export(df, filepath)
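To keep the exported file inside the project directory rather than at a hard-coded absolute path, a small variation of the same template (a sketch that reuses the get_repo_path helper seen in the other templates; the 'exports/output.csv' location is a placeholder) is:

from os import makedirs, path

from mage_ai.io.file import FileIO
from mage_ai.settings.repo import get_repo_path
from pandas import DataFrame

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter


@data_exporter
def export_data_to_file(df: DataFrame, **kwargs) -> None:
    # Build a path relative to the project root and make sure the directory exists
    filepath = path.join(get_repo_path(), 'exports', 'output.csv')
    makedirs(path.dirname(filepath), exist_ok=True)
    FileIO().export(df, filepath)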
Azure Blob Storage
from mage_ai.settings.repo import get_repo_path
from mage_ai.io.azure_blob_storage import AzureBlobStorage
from mage_ai.io.config import ConfigFileLoader
from pandas import DataFrame
from os import path

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter


@data_exporter
def export_data_to_azure_blob_storage(df: DataFrame, **kwargs) -> None:
    """
    Template for exporting data to an Azure Blob Storage container.
    Specify your configuration settings in 'io_config.yaml'.

    Docs: /design/data-loading
    """
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'default'
    container_name = 'your_container_name'
    blob_path = 'your_blob_path'

    AzureBlobStorage.with_config(ConfigFileLoader(config_path, config_profile)).export(
        df,
        container_name,
        blob_path,
    )
Google BigQuery
from mage_ai.settings.repo import get_repo_path
from mage_ai.io.bigquery import BigQuery
from mage_ai.io.config import ConfigFileLoader
from pandas import DataFrame
from os import path

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter


@data_exporter
def export_data_to_big_query(df: DataFrame, **kwargs) -> None:
    """
    Template for exporting data to a BigQuery warehouse.
    Specify your configuration settings in 'io_config.yaml'.

    Docs: /design/data-loading#bigquery
    """
    table_id = 'your-project.your_dataset.your_table_name'
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'default'

    BigQuery.with_config(ConfigFileLoader(config_path, config_profile)).export(
        df,
        table_id,
        if_exists='replace',  # Specify resolution policy if table name already exists
    )
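Hard-coding the fully qualified table name works for a single environment. A common variation, sketched below under the assumption that the pipeline defines a table_id runtime variable (Mage passes runtime variables to blocks through kwargs), reads the destination at run time and falls back to the placeholder:

from mage_ai.settings.repo import get_repo_path
from mage_ai.io.bigquery import BigQuery
from mage_ai.io.config import ConfigFileLoader
from pandas import DataFrame
from os import path

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter


@data_exporter
def export_data_to_big_query(df: DataFrame, **kwargs) -> None:
    # 'table_id' is a hypothetical runtime variable defined on the pipeline or trigger
    table_id = kwargs.get('table_id', 'your-project.your_dataset.your_table_name')
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'default'

    BigQuery.with_config(ConfigFileLoader(config_path, config_profile)).export(
        df,
        table_id,
        if_exists='replace',
    )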
Google Cloud Storage
from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.google_cloud_storage import GoogleCloudStorage
from pandas import DataFrame
from os import path

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter


@data_exporter
def export_data_to_google_cloud_storage(df: DataFrame, **kwargs) -> None:
    """
    Template for exporting data to a Google Cloud Storage bucket.
    Specify your configuration settings in 'io_config.yaml'.

    Docs: /design/data-loading#googlecloudstorage
    """
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'default'
    bucket_name = 'your_bucket_name'
    object_key = 'your_object_key'

    GoogleCloudStorage.with_config(ConfigFileLoader(config_path, config_profile)).export(
        df,
        bucket_name,
        object_key,
    )
MySQL
from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.mysql import MySQL
from pandas import DataFrame
from os import path

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter


@data_exporter
def export_data_to_mysql(df: DataFrame, **kwargs) -> None:
    """
    Template for exporting data to a MySQL database.
    Specify your configuration settings in 'io_config.yaml'.

    Docs: /design/data-loading#mysql
    """
    table_name = 'your_table_name'  # Specify the name of the table to export data to
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'default'

    with MySQL.with_config(ConfigFileLoader(config_path, config_profile)) as loader:
        loader.export(
            df,
            None,
            table_name,
            index=False,  # Specifies whether to include index in exported table
            if_exists='replace',  # Specify resolution policy if table name already exists
        )
PostgreSQL
from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.postgres import Postgres
from pandas import DataFrame
from os import path

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter


@data_exporter
def export_data_to_postgres(df: DataFrame, **kwargs) -> None:
    """
    Template for exporting data to a PostgreSQL database.
    Specify your configuration settings in 'io_config.yaml'.

    Docs: /design/data-loading#postgresql
    """
    schema_name = 'your_schema_name'  # Specify the name of the schema to export data to
    table_name = 'your_table_name'  # Specify the name of the table to export data to
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'default'

    with Postgres.with_config(ConfigFileLoader(config_path, config_profile)) as loader:
        loader.export(
            df,
            schema_name,
            table_name,
            index=False,  # Specifies whether to include index in exported table
            if_exists='replace',  # Specify resolution policy if table name already exists
        )
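The 'replace' policy drops and recreates the table on every run. For incremental loads, a variation of the same template (a sketch, assuming the exporter also accepts an 'append' write policy) keeps the existing rows and adds the new ones:

from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.postgres import Postgres
from pandas import DataFrame
from os import path

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter


@data_exporter
def export_data_to_postgres(df: DataFrame, **kwargs) -> None:
    schema_name = 'your_schema_name'
    table_name = 'your_table_name'
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'default'

    with Postgres.with_config(ConfigFileLoader(config_path, config_profile)) as loader:
        loader.export(
            df,
            schema_name,
            table_name,
            index=False,
            if_exists='append',  # Assumed write policy: add rows instead of replacing the table
        )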
Amazon Redshift
from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.redshift import Redshift
from pandas import DataFrame
from os import path

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter


@data_exporter
def export_data_to_redshift(df: DataFrame, **kwargs) -> None:
    """
    Template for exporting data to a Redshift cluster.
    Specify your configuration settings in 'io_config.yaml'.

    Docs: /design/data-loading#redshift
    """
    table_name = 'your_table_name'
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'default'

    with Redshift.with_config(ConfigFileLoader(config_path, config_profile)) as loader:
        loader.export(
            df,
            table_name,
            if_exists='replace',  # Specify resolution policy if table already exists
        )
Amazon S3
from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.s3 import S3
from pandas import DataFrame
from os import path

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter


@data_exporter
def export_data_to_s3(df: DataFrame, **kwargs) -> None:
    """
    Template for exporting data to an S3 bucket.
    Specify your configuration settings in 'io_config.yaml'.

    Docs: /design/data-loading#s3
    """
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'default'
    bucket_name = 'your_bucket_name'
    object_key = 'your_object_key'

    S3.with_config(ConfigFileLoader(config_path, config_profile)).export(
        df,
        bucket_name,
        object_key,
    )
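Writing every run to the same object key overwrites the previous export. One sketch that partitions the key by run date, assuming kwargs carries the execution_date runtime value (with a fallback to the current time), looks like this; the 'exports/.../data.csv' key is a placeholder:

from datetime import datetime
from os import path

from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.s3 import S3
from pandas import DataFrame

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter


@data_exporter
def export_data_to_s3(df: DataFrame, **kwargs) -> None:
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'default'

    bucket_name = 'your_bucket_name'
    # Assumes the runtime passes execution_date in kwargs; otherwise fall back to "now"
    execution_date = kwargs.get('execution_date') or datetime.utcnow()
    object_key = f"exports/{execution_date.strftime('%Y%m%d')}/data.csv"

    S3.with_config(ConfigFileLoader(config_path, config_profile)).export(
        df,
        bucket_name,
        object_key,
    )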
Snowflake
from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.snowflake import Snowflake
from pandas import DataFrame
from os import path

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter


@data_exporter
def export_data_to_snowflake(df: DataFrame, **kwargs) -> None:
    """
    Template for exporting data to a Snowflake warehouse.
    Specify your configuration settings in 'io_config.yaml'.

    Docs: /design/data-loading#snowflake
    """
    table_name = 'your_table_name'
    database = 'your_database_name'
    schema = 'your_schema_name'
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'default'

    with Snowflake.with_config(ConfigFileLoader(config_path, config_profile)) as loader:
        loader.export(
            df,
            table_name,
            database,
            schema,
            if_exists='replace',  # Specify resolution policy if table already exists
        )
Delta Lake
Amazon S3
from deltalake.writer import write_deltalake

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter


@data_exporter
def export_data(df, *args, **kwargs):
    """
    Export data to a Delta Table

    Docs: https://delta-io.github.io/delta-rs/python/usage.html#writing-delta-tables
    """
    storage_options = {
        'AWS_ACCESS_KEY_ID': '',
        'AWS_SECRET_ACCESS_KEY': '',
        'AWS_REGION': '',
        'AWS_S3_ALLOW_UNSAFE_RENAME': 'true',
    }

    uri = 's3://[bucket]/[key]'

    write_deltalake(
        uri,
        data=df,
        mode='append',  # append or overwrite
        overwrite_schema=False,  # set True to alter the schema when overwriting
        partition_by=[],
        storage_options=storage_options,
    )
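Leaving real credentials in the block makes them easy to leak through version control. A sketch of the same export that reads them from environment variables instead (assuming those variables are set in the runtime environment; the URI stays a placeholder):

import os

from deltalake.writer import write_deltalake

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter


@data_exporter
def export_data(df, *args, **kwargs):
    # Pull credentials from the environment rather than committing them with the pipeline
    storage_options = {
        'AWS_ACCESS_KEY_ID': os.environ.get('AWS_ACCESS_KEY_ID', ''),
        'AWS_SECRET_ACCESS_KEY': os.environ.get('AWS_SECRET_ACCESS_KEY', ''),
        'AWS_REGION': os.environ.get('AWS_REGION', ''),
        'AWS_S3_ALLOW_UNSAFE_RENAME': 'true',
    }

    uri = 's3://[bucket]/[key]'

    write_deltalake(
        uri,
        data=df,
        mode='append',
        storage_options=storage_options,
    )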
Azure Blob Storage
from deltalake.writer import write_deltalake

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter


@data_exporter
def export_data(df, *args, **kwargs):
    """
    Export data to a Delta Table

    Docs: https://delta-io.github.io/delta-rs/python/usage.html#writing-delta-tables
    """
    storage_options = {
        'AZURE_STORAGE_ACCOUNT_NAME': '',
        'AZURE_STORAGE_ACCOUNT_KEY': '',
        'AZURE_STORAGE_ACCESS_KEY': '',
        'AZURE_STORAGE_MASTER_KEY': '',
        'AZURE_STORAGE_CLIENT_ID': '',
        'AZURE_STORAGE_CLIENT_SECRET': '',
        'AZURE_STORAGE_TENANT_ID': '',
        'AZURE_STORAGE_SAS_KEY': '',
        'AZURE_STORAGE_TOKEN': '',
        'AZURE_STORAGE_USE_EMULATOR': '',
    }

    uri = 'az://[container]/[key]'

    write_deltalake(
        uri,
        data=df,
        mode='append',  # append or overwrite
        overwrite_schema=False,  # set True to alter the schema when overwriting
        partition_by=[],
        storage_options=storage_options,
    )
Google Cloud Storage
from deltalake.writer import write_deltalake

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter


@data_exporter
def export_data(df, *args, **kwargs):
    """
    Export data to a Delta Table

    Docs: https://delta-io.github.io/delta-rs/python/usage.html#writing-delta-tables
    """
    storage_options = {
        'GOOGLE_SERVICE_ACCOUNT': '',
        'GOOGLE_SERVICE_ACCOUNT_PATH': '',
        'GOOGLE_SERVICE_ACCOUNT_KEY': '',
        'GOOGLE_BUCKET': '',
    }

    uri = 'gs://[bucket]/[key]'

    write_deltalake(
        uri,
        data=df,
        mode='append',  # append or overwrite
        overwrite_schema=False,  # set True to alter the schema when overwriting
        partition_by=[],
        storage_options=storage_options,
    )
Orchestration
Trigger pipeline
from mage_ai.orchestration.triggers.api import trigger_pipeline

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter


@data_exporter
def trigger(*args, **kwargs):
    """
    Trigger another pipeline to run.

    Documentation: /orchestration/triggers/trigger-pipeline
    """
    trigger_pipeline(
        'pipeline_uuid',  # Required: enter the UUID of the pipeline to trigger
        variables={},  # Optional: runtime variables for the pipeline
        check_status=False,  # Optional: poll and check the status of the triggered pipeline
        error_on_failure=False,  # Optional: if triggered pipeline fails, raise an exception
        poll_interval=60,  # Optional: check the status of triggered pipeline every N seconds
        poll_timeout=None,  # Optional: raise an exception after N seconds
        verbose=True,  # Optional: print status of triggered pipeline run
    )
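To make the downstream run react to what this block received, one sketch forwards a value computed from the upstream output as a runtime variable and waits for the triggered pipeline to finish; the pipeline UUID and the 'row_count' variable are placeholders:

from mage_ai.orchestration.triggers.api import trigger_pipeline

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter


@data_exporter
def trigger(df, *args, **kwargs):
    trigger_pipeline(
        'downstream_pipeline_uuid',  # Placeholder UUID of the pipeline to trigger
        variables={'row_count': len(df)},  # Forward a value computed from the upstream output
        check_status=True,  # Wait for the triggered run to finish...
        error_on_failure=True,  # ...and fail this block if that run fails
        poll_interval=60,
        poll_timeout=3600,  # Give up after an hour
        verbose=True,
    )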