Data loader utilities
Mage provides data loading clients that simplify loading and exporting data in your pipelines, allowing you to spend more time analyzing and transforming your data for ML tasks. Currently, Mage includes clients for the following data sources:
- AWS Redshift
- AWS S3
- Azure Blob Storage
- DuckDB
- Filesystem
- Google BigQuery
- Google Cloud Storage
- Google Sheets
- MySQL
- OracleDB
- PostgreSQL
- Snowflake
Mage’s data loading clients fall into two categories:
- File-Loading Clients - import/export data between files and your pipeline. Includes both local filesystem storage and external filesystem storage (like AWS S3)
- Database Clients - imports data from an external database into your pipeline and exports data frames back into that database. These database clients include the
execute
method which execute your queries in the connected database. The Google BigQuery client follows this structure.- A subcategory of database clients are connection-based clients, which wrap a connection to the database. This connection is used to execute transactions (sets of queries) on the database, which are either committed (saved to database) or rolled-back (deleted). Clients for PostgreSQL, Redshift, and Snowflake follow this structure.
Example: Loading data from a file
While traditional Pandas IO procedures can be utilized to load files into your pipeline, Mage provides the mage_ai.io.file.FileIO
client as a convenience wrapper.
The following code uses the load
function of the FileIO
client to load the Titanic survival dataset from a CSV file into a Pandas DataFrame for use in your pipeline. All data loaders can be initialized with the verbose = True
parameter, which will print the current action the data loading client is performing. This parameter defaults to False
.
Then the export
function is used to save this data frame back to file, this time in JSON format
Any formatting settings (such as specifying the data frame orient) can be passed as keyword arguments to load
and export
. These arguments are passed to Pandas IO procedures for the requested file format, enabling fine-grained control over how your data is loaded and exported.
As the data loader was constructed with the verbose parameter set to True
, the above operations would print the following output describing the actions of the data loader.
See the FileIO API to learn more about loading data from filesystem.
Example: Loading data from Snowflake warehouse
Loading data from a Snowflake data warehouse is made easy using the mage_ai.io.snowflake.Snowflake
data loading client. In order to authenticate access to a Snowflake warehouse, the client requires the associated Snowflake credentials:
- Account Username
- Account Password
- Account ID (including your region) (Ex: example.us-west-2)
These parameters can be manually specified as input to the data loading client (see Snowflake API). However we recommend using a Configuration Loader to handle loading these secrets. If you used mage init
to create your project repository, you can store these values in your io_config.yaml
file and use mage_ai.io.config.ConfigFileLoader
to construct the data loader client.
An example io_config.yaml
file in this instance would be:
with which a Snowflake data loading client can be constructed as:
To learn more about using configuration settings, see Configuration Settings.
Then the following code uses the functions:
execute()
- executes an arbitrary query on your data warehouse. In this case, the warehouse, database, and schema to use are selected.load()
- loads the results of aSELECT
query into a Pandas DataFrame.export()
- stores data frame as a table in your data warehouse. If the table exists, then the data is appended by default (and can be configured with other behavior, see Snowflake API). If the table doesn’t exist, then the table is created with the given schema name and table name (the table data types are inferred from the Python data type).
The loader
object manages a direct connection to your Snowflake data warehouse, so it is important to make sure that your connection is closed once your operations are completed. You can manually use loader.open()
and loader.close()
to open and close the connection to your data warehouse or automatically manage the connection with a context manager.
To learn more about loading data from Snowflake, see the Snowflake API for more details on member functions and usage.
Client APIs
This section covers the API for using the following data loaders.
Redshift
mage_ai.io.redshift.Redshift
Handles data transfer between a Redshift cluster and the Mage app. Mage uses temporary credentials to authenticate access to a Redshift cluster. There are two ways to specify these credentials:
-
Pre-generate temporary credentials and specify them in the configuration settings. Add the following keys to the configuration settings for
Redshift
to use the temporary credentials: -
Provide an IAM Profile to automatically generate temporary credentials for connection. The IAM profile is read from
~/.aws/
and is used with theGetClusterCredentials
endpoint to generate temporary credentials. Add the following keys to the configuration settings forRedshift
to generate temporary credentialsIf an IAM profile is not setup using
aws configure
, manually specify the AWS credentials in the configuration settings as well.
Constructor
__init__(**kwargs)
-
Args
verbose
: Enables verbose output. Defaults toFalse
.
See other keyword arguments in Redshift’s Python connector docs.
Attributes
- conn (
redshift_connector.Connection
) - the underlying Redshift Connection object.
Factory Methods
with_config - with_config(config: BaseConfigLoader, **kwargs) -> Redshift
Initializes Redshift client from configuration settings.
- Args:
config (BaseConfigLoader)
: Configuration loader object.**kwargs
: Additional parameters passed to the loader constructor.
- Returns: (
Redshift
) Redshift data loading client constructed using this method
with_temporary_credentials - with_temporary_credentials(database: str, host: str, user: str, password: str, port: int = 5439, **kwargs) -> Redshift
Creates a Redshift data loader with temporary database credentials.
- Args:
database (str)
: Name of the database to connect to.host (str)
: The hostname of the Redshift cluster which the database belongs to.user (str)
: Temporary credentials username for use in authentication.password (str)
: Temporary credentials password for use in authentication.port (int, optional)
: Port number of the Redshift cluster. Defaults to 5439.**kwargs
: Additional parameters passed to the loader constructor.
- Returns: (
Redshift
) Redshift data loading client constructed using this method
with_iam - with_iam(cluster_identifier: str, database: str, db_user: str, profile: str, **kwargs) -> Redshift
Creates a Redshift data loader using an IAM profile from ~/.aws
.
The IAM Profile settings can also be manually specified as keyword arguments to this constructor, but is not recommended. If credentials are manually specified, the region of the Redshift cluster must also be specified.
- Args:
cluster_identifier (str)
: Identifier of the cluster to connect to.database (str)
: The database to connect to within the specified cluster.db_user (str)
: Database usernameprofile (str, optional)
: The profile to use from stored credentials file. Defaults to ‘default’.**kwargs
: Additional parameters passed to the loader constructor
- Returns: (
Redshift
) Redshift data loading client constructed using this method
Methods
close - close()
Closes connection to the Redshift cluster specified in the loader configuration.
commit - commit()
Saves all changes made to the database since the last transaction.
execute - execute(query_string: str, **kwargs) -> None
Sends query to the connected Redshift cluster. Any changes made to the database will not be saved unless commit()
is called afterward.
- Args:
query_string (str)
: The query to execute on the Redshift cluster.**kwargs
: Additional parameters to pass to the query. Seeredshift-connector
docs for configuring query parameters.
export - export(df: DataFrame, table_name: str) -> None
Exports a Pandas data frame to a Redshift cluster under the specified table. The changes made to the database will not be saved unless commit()
is called afterward.
- Args:
df (DataFrame)
: Data frame to export to database in Redshift cluster.table_name (str)
: Name of the table to export the data to. Table must already exist.
load - load(query_string: str, limit: int, *args, **kwargs) -> DataFrame
Loads data from the results of a query into a Pandas data frame. This will fail if the query returns no data from the database.
This function will load at maximum 10,000,000 rows of data (this limit is configurable). To operate on more data, consider performing data transformations in warehouse using execute
.
-
Args:
query_string (str)
: Query to fetch a table or subset of a table.limit (int, Optional)
: The number of rows to limit the loaded data frame to. Defaults to 10,000,000.*args, **kwargs
: Additional parameters to send to query, including parameters for use with format strings. Seeredshift-connector
docs for configuring query parameters.
-
Returns: (
DataFrame
) Data frame containing the queried data.
open - open()
Opens a connection to the Redshift cluster specified in the loader configuration.
rollback - rollback()
Rolls back (deletes) all database changes made since the last transaction.
sample - sample(schema: str, table: str, size: int, **kwargs) -> DataFrame
Sample data from a table in the selected database in the Redshift cluster. Sample is not guaranteed to be random.
-
Args:
schema (str)
: The schema to select the table from.size (int)
: The number of rows to sample. Defaults to 10,000,000.table (str)
: The table to sample from in the connected database.
-
Returns: (
DataFrame
) Sampled data from the table.
S3
mage_ai.io.s3.S3
Handles data transfer between an S3 bucket and the Mage app. The S3
client supports importing and exporting with the following file formats:
- CSV
- JSON
- Parquet
- HDF5
- XML
- Excel
If an IAM profile is not set up using aws configure
, then AWS credentials for accesses the S3 bucket can be manually specified through either Mage’s configuration loader system or through keyword arguments in the constructor (see constructor).
If using configuration settings, specify the following keys:
Constructor
__init__(self, verbose: bool)
Initializes the S3 data loading client.
- Args
verbose (bool)
: Enables verbose output printing. Defaults to False.- If IAM profile is not set up using
aws configure
and Mage’s configuration loader is not used, then specify your AWS credentials through the following keyword arguments:aws_access_key_id (str)
: AWS Access Key ID credentialaws_secret_access_key (str)
: AWS Secret Access Key credentialaws_region (str)
: Region associated with AWS IAM profile
Factory Methods
with_config - with_config(config: BaseConfigLoader, **kwargs) -> S3
Creates S3 data loading client from configuration settings.
- Args:
config (BaseConfigLoader)
: Configuration loader object.**kwargs
: Additional parameters passed to the loader constructor
- Returns: (
S3
) The constructed dataloader using this method
Methods
export - export(df: DataFrame, bucket_name: str, object_key: str, format: FileFormat | str = None, **kwargs) -> None:
Exports data frame to an S3 bucket.
If the format is HDF5, the default key under which the data frame is stored is the stem of the filename. For example, if the file to write the data frame to is ‘storage/my_data_frame.hdf5’, the key would be ‘my_data_frame’. This can be overridden using the key
keyword argument.
-
Args:
-
data (Union[DataFrame, str])
: Data frame or file path to export. -
bucket_name (str)
: Name of the bucket to export data frame to. -
object_key (str)
: Object key in S3 bucket to export data frame to. -
format (FileFormat | str, Optional)
: Format of the file to export data frame to. Defaults toNone
, in which case the format is inferred. -
**kwargs
- Additional keyword arguments to pass to the file writer. See possible arguments by file formats at:Format Pandas Writer CSV DataFrame.to_csv JSON DataFrame.to_json Parquet DataFrame.to_parquet HDF5 DataFrame.to_hdf XML DataFrame.to_xml Excel DataFrame.to_excel
-
load - load(bucket_name: str, object_key: str, format: FileFormat | str = None, limit: int = QUERY_ROW_LIMIT **kwargs) -> DataFrame
Loads data from object in S3 bucket into a Pandas data frame. This function will load at maximum 10,000,000 rows of data from the specified file (this limit is configurable).
-
Args:
-
bucket_name (str)
: Name of the bucket to load data from. -
object_key (str)
: Key of the object in S3 bucket to load data from. -
format (FileFormat | str, Optional)
: Format of the file to load data frame from. Defaults to None, in which case the format is inferred. -
limit (int, optional)
: The number of rows to limit the loaded data frame to. Defaults to 10,000,000. -
**kwargs
- Additional keyword arguments to pass to the file writer. See possible arguments by file formats at:Format Pandas Reader CSV read_csv JSON read_json Parquet read_parquet HDF5 read_hdf XML read_xml Excel read_excel
-
-
Returns: (
DataFrame
) Data frame object loaded from data in the specified file.
FileIO
mage_ai.io.file.FileIO
Handles data transfer between the filesystem and the Mage app. The FileIO
client currently supports importing and exporting with the following file formats:
- CSV
- JSON
- Parquet
- HDF5
- XML
- Excel
Constructor
__init__(self, verbose: bool)
Initializes the FileIO data loading client.
- Args
verbose (bool)
: Enables verbose output printing. Defaults to False.
Methods
export - export(df: DataFrame, filepath: os.PathLike, format: FileFormat | str = None, **kwargs) -> None:
Exports the input data frame to the file specified.
If the format is HDF5, the default key under which the data frame is stored is the stem of the filename. For example, if the file to write the data frame to is ‘storage/my_data_frame.hdf5’, the key would be ‘my_data_frame’. This can be overridden using the key
keyword argument.
-
Args:
-
df (DataFrame)
: Data frame to export. -
filepath (os.PathLike)
: Filepath to export data frame to. -
format (FileFormat | str, Optional)
: Format of the file to export data frame to. Defaults to None, in which case the format is inferred. -
**kwargs
- Additional keyword arguments to pass to the file writer. See possible arguments by file formats at:Format Pandas Writer CSV DataFrame.to_csv JSON DataFrame.to_json Parquet DataFrame.to_parquet HDF5 DataFrame.to_hdf XML DataFrame.to_xml Excel DataFrame.to_excel
-
load - load(filepath: os.PathLike, format: FileFormat | str = None, limit: int = QUERY_ROW_LIMIT **kwargs) -> DataFrame
Loads the data frame from the filepath specified. This function will load at maximum 10,000,000 rows of data from the specified file (this limit is configurable).
-
Args:
-
filepath (os.PathLike)
: Filepath to load data frame from. -
format (FileFormat | str, Optional)
: Format of the file to load data frame from. Defaults to None, in which case the format is inferred. -
limit (int, optional)
: The number of rows to limit the loaded data frame to. Defaults to 10,000,000. -
**kwargs
- Additional keyword arguments to pass to the file writer. See possible arguments by file formats at:Format Pandas Reader CSV read_csv JSON read_json Parquet read_parquet HDF5 read_hdf XML read_xml Excel read_excel
-
-
Returns: (
DataFrame
) Data frame object loaded from data in the specified file.
GoogleCloudStorage
mage_ai.io.google_cloud_storage.GoogleCloudStorage
Handles data transfer between a Google Cloud Storage bucket and the Mage app. Supports loading files of any of the following types:
- CSV
- JSON
- Parquet
- HDF5
- XML
- Excel
If GOOGLE_APPLICATION_CREDENTIALS
environment is set, no further arguments are needed other than those specified below. Otherwise, use the factory method with_config
to construct the data loader using manually specified credentials.
To authenticate (and authorize) access to Google Cloud Storage, credentials must be provided. Below are the different ways to access those credentials:
- Define the
GOOGLE_APPLICATION_CREDENTIALS
environment variable holding the filepath to your service account key - Define the
GOOGLE_SERVICE_ACC_KEY_FILEPATH
key with your configuration loader or thepath_to_credentials
keyword argument with the client constructor holding the filepath to your service account key - Define the
GOOGLE_SERVICE_ACC_KEY
key with your configuration loader or thecredentials_mapping
keyword argument with the client constructor holding a mapping sharing the same contents as your service key- if using a configuration file, be careful to wrap your service key values in quotes so the YAML parser reads the settings correctly
- Manually pass the
google.oauth2.service_account.Credentials
object with the keyword argumentcredentials
Constructor
__init__(self, verbose: bool = False, **kwargs)
Initializes the GoogleCloudStorage data loading client.
-
Args
verbose (bool)
: Enables verbose output printing. Defaults toFalse
.credentials_mapping (Mapping[str, str])
- Mapping object corresponding to your service account key. See instructions above on when to use this keyword argumentpath_to_credentials (str)
- Filepath to service account key. See instructions above on when to use this keyword argument.
Factory Methods
with_config - with_config(config: BaseConfigLoader, **kwargs) -> GoogleCloudStorage
Creates GoogleCloudStorage data loading client from configuration settings.
- Args:
config (BaseConfigLoader)
: Configuration loader object.**kwargs
: Additional parameters passed to the loader constructor
- Returns: (
GoogleCloudStorage
) The constructed dataloader using this method
Methods
export - export(df: DataFrame, bucket_name: str, object_key: str, format: FileFormat | str = None, **kwargs) -> None:
Exports data frame to a GoogleCloudStorage bucket.
-
Args:
-
data (Union[DataFrame, str])
: Data frame or file path to export. -
bucket_name (str)
: Name of the bucket to export data frame to. -
object_key (str)
: Object key in GoogleCloudStorage bucket to export data frame to. -
format (FileFormat | str, Optional)
: Format of the file to export data frame to. Defaults toNone
, in which case the format is inferred. -
**kwargs
- Additional keyword arguments to pass to the file writer. See possible arguments by file formats at:Format Pandas Writer CSV DataFrame.to_csv JSON DataFrame.to_json Parquet DataFrame.to_parquet HDF5 DataFrame.to_hdf XML DataFrame.to_xml Excel DataFrame.to_excel
-
load - load(bucket_name: str, object_key: str, format: FileFormat | str = None, limit: int = QUERY_ROW_LIMIT, **kwargs) -> DataFrame
Loads data from object in GoogleCloudStorage bucket into a Pandas data frame. This function will load at maximum 10,000,000 rows of data from the specified file (this limit is configurable).
-
Args:
-
bucket_name (str)
: Name of the bucket to load data from. -
object_key (str)
: Key of the object in GoogleCloudStorage bucket to load data from. -
format (FileFormat | str, Optional)
: Format of the file to load data frame from. Defaults to None, in which case the format is inferred. -
limit (int, optional)
: The number of rows to limit the loaded data frame to. Defaults to 10,000,000. -
**kwargs
- Additional keyword arguments to pass to the file writer. See possible arguments by file formats at:Format Pandas Reader CSV read_csv JSON read_json Parquet read_parquet HDF5 read_hdf XML read_xml Excel read_excel
-
-
Returns: (
DataFrame
) Data frame object loaded from data in the specified file.
AzureBlobStorage
mage_ai.io.azure_blob_storage.AzureBlobStorage
Handles data transfer between an Azure Blob Storage conatiner and Mage. Supports loading files of the following types:
- CSV
- JSON
- Parquet
- HDF5
- XML
- Excel
To authenticate access to Azure Blob Storage, the following environment variables must be set:
AZURE_CLIENT_ID
AZURE_CLIENT_SECRET
AZURE_TENANT_ID
AZURE_STORAGE_ACCOUNT_NAME
By default, these correspond to variables of the same name in io_config.yaml
.
Constructor
Initializes data loader from an Azure Blob Storage container.
-
Args
verbose (bool)
: Enables verbose output printing. Defaults toFalse
.
Factory Methods
with_config - with_config(config: BaseConfigLoader, **kwargs) -> AzureBlobStorage
Creates AzureBlobStorage data loading client from configuration settings.
- Args:
config (BaseConfigLoader)
: Configuration loader object.**kwargs
: Additional parameters passed to the loader constructor
- Returns: (
AzureBlobStorage
) The constructed dataloader using this method
Methods
export - export(df: DataFrame, bucket_name: str, object_key: str, format: FileFormat | str = None, **kwargs) -> None:
Exports data frame to an Azure Blob Storage container.
-
Args:
-
df (DataFrame)
: Data frame to export. -
container_name (str)
: Name of the Azure container to export data to. -
blob_path (str)
: The desired output path of the data in your Azure Blob -
format (FileFormat | str, Optional)
: Format of the file to export data frame to. Defaults toNone
, in which case the format is inferred. -
**kwargs
- Additional keyword arguments to pass to the file writer. See possible arguments by file formats at:Format Pandas Writer CSV DataFrame.to_csv JSON DataFrame.to_json Parquet DataFrame.to_parquet HDF5 DataFrame.to_hdf XML DataFrame.to_xml Excel DataFrame.to_excel
-
load - load(container_name: str, blob_path: str, format: FileFormat | str = None, limit: int = QUERY_ROW_LIMIT, **kwargs) -> DataFrame
Loads data from object in Azure Blob Storage into a Pandas data frame. This function will load at maximum 10,000,000 rows of data from the specified file (this limit is configurable).
GoogleSheets
mage_ai.io.google_sheets.GoogleSheets
Handles data transfer between a Google Sheets spreadsheet and the Mage app.
Authentication is the same for our other Google Cloud data loaders. Read more about configuration here.
The Google Sheets class is defined here.
BigQuery
mage_ai.io.bigquery.BigQuery
Handles data transfer between a BigQuery data warehouse and the Mage app.
Authentication with a Google BigQuery warehouse requires specifying the service account key for the service account that has access to the BigQuery warehouse. There are four ways to provide this service key:
- Define the
GOOGLE_APPLICATION_CREDENTIALS
environment variable holding the filepath to your service account key - Define the
GOOGLE_SERVICE_ACC_KEY_FILEPATH
key with your configuration loader or thepath_to_credentials
keyword argument with the client constructor holding the filepath to your service account key - Define the
GOOGLE_SERVICE_ACC_KEY
key with your configuration loader or thecredentials_mapping
keyword argument with the client constructor holding a mapping sharing the same contents as your service key- if using a configuration file, be careful to wrap your service key values in quotes so the YAML parser reads the settings correctly
- Manually pass the
google.oauth2.service_account.Credentials
object with the keyword argumentcredentials
Constructor
__init__(self, **kwargs)
Initializes the BigQuery data loading client.
-
Args
verbose (bool)
: Enables verbose output printing. Defaults toFalse
.credentials_mapping (Mapping[str, str])
- Mapping object corresponding to your service account key. See instructions above on when to use this keyword argumentpath_to_credentials (str)
- Filepath to service account key. See instructions above on when to use this keyword argument.
All other keyword arguments can be found in the Google BigQuery Python Client docs
Factory Methods
with_config - with_config(config: BaseConfigLoader, **kwargs) -> BigQuery
Creates BigQuery data loading client from configuration settings.
- Args:
config (BaseConfigLoader)
: Configuration loader object.**kwargs
: Additional parameters passed to the loader constructor
- Returns: (
BigQuery
) BigQuery data loading client constructed using this method
with_credentials_file - with_credentials_file(cls, path_to_credentials: str, **kwargs) -> BigQuery
Constructs BigQuery data loader using the file containing the service account key.
- Args:
path_to_credentials (str)
: Path to the credentials file.**kwargs
: Additional parameters to pass to BigQuery client constructor.
- Returns: (
BigQuery
) BigQuery data loading client constructed using this method
with_credentials_object - with_credentials_object(cls, credentials: Mapping[str, str], **kwargs) -> BigQuery
Constructs BigQuery data loader using manually specified service account key credentials.
- Args:
credentials (Mapping[str, str])
: Mapping containing same key-value pairs as a service account key.**kwargs
: Additional parameters to pass to BigQuery client constructor.
- Returns: (
BigQuery
) BigQuery data loading client constructed using this method
Methods
execute - execute(query_string: str, **kwargs) -> None
Sends query to the connected BigQuery warehouse.
- Args:
query_string (str)
: Query to execute on the BigQuery warehouse.**kwargs
: Additional arguments to pass to query, such as query configurations. See Client.query() docs for additional arguments.
export - export(df: DataFrame, table_name: str, database: str, schema: str, if_exists: str, **kwargs) -> None
Exports a data frame to a Google BigQuery warehouse. If table doesn’t exist, the table is automatically created.
- Args:
df (DataFrame)
: Data frame to exporttable_id (str)
: ID of the table to export the data frame to. If of the format"your-project.your_dataset.your_table_name"
. If this table exists, the table schema must match the data frame schema. If this table doesn’t exist, the table schema is automatically inferred.if_exists (str)
: Specifies export policy if table exists. Either -'fail'
: throw an error. -'replace'
: drops existing table and creates new table of same name. -'append'
: appends data frame to existing table. In this case the schema must match the original table. Defaults to'replace'
. Ifwrite_disposition
is specified as a keyword argument, this parameter is ignored (as both define the same functionality).**configuration_params
: Configuration parameters for export job. See valid configuration parameters at LoadJobConfig docs.
load - load(query_string: str, limit: int, *args, **kwargs) -> DataFrame
Loads data from the results of a query into a Pandas data frame. This will fail if the query returns no data from the database.
When a select query is provided, this function will load at maximum 10,000,000 rows of data (this limit is configurable). To operate on more data, consider performing data transformations in warehouse.
- Args:
query_string (str)
: Query to fetch a table or subset of a table.limit (int, Optional)
: The number of rows to limit the loaded data frame to. Defaults to 10,000,000.**kwargs
: Additional parameters to pass to the query. See Google BigQuery Python client docs for additional arguments.
sample - sample(schema: str, table: str, size: int, **kwargs) -> DataFrame
Sample data from a table in the BigQuery warehouse. Sample is not guaranteed to be random.
-
Args:
schema (str)
: The schema to select the table from.size (int)
: The number of rows to sample. Defaults to 10,000,000.table (str)
: The table to sample from in the connected database.
-
Returns: (
DataFrame
) Sampled data from the table.
PostgreSQL
mage_ai.io.postgres.Postgres
Handles data transfer between a PostgreSQL database and the Mage app. The Postgres
client utilizes the following keys to connect the PostgreSQL database.
Constructor
__init__(self, **kwargs)
Initializes the Postgres data loading client.
- Args:
dbname (str)
: The name of the database to connect to.user (str)
: The user with which to connect to the database with.password (str)
: The login password for the user.host (str)
: Host address for database.port (str)
: Port on which the database is running.verbose (bool)
: Enables verbose output printing. Defaults toFalse
.**kwargs
: Additional settings for creating psycopg2 connection
Attributes
- conn (
psycopg2.connection.Connection
) - underlying psycopg2 Connection object
Factory Methods
with_config - with_config(config: BaseConfigLoader, **kwargs) -> Postgres
Creates Postgres data loading client from configuration settings.
- Args:
config (BaseConfigLoader)
: Configuration loader object.**kwargs
: Additional parameters passed to the loader constructor
- Returns: (
Postgres
) The constructed dataloader using this method
Methods
close - close()
Closes connection to PostgreSQL database.
commit - commit()
Saves all changes made to the database since the previous transaction.
execute - execute(query_string: str, **kwargs) -> None
Sends query to the connected PostgreSQL database. Any changes made to the database will not be saved unless commit()
is called afterward.
- Args:
query_string (str)
: The query to execute on the PostgreSQL database.**kwargs
: Additional parameters to pass to the query. Seepsycopg2
docs for configuring query parameters.
export - export(df: DataFrame, table_name: str, database: str, schema: str, if_exists: str, index: bool, **kwargs) -> None
Exports data frame to the PostgreSQL database from a Pandas data frame. If table doesn’t exist, the table is automatically created. If the schema doesn’t exist, the schema is also created.
Any changes made to the database will not be saved unless commit()
is called afterward.
-
Args:
-
df (DataFrame)
: Data frame to export to the PostgreSQL database. -
table_name (str)
: Name of the table to export data to (excluding database and schema). -
database (str)
: Name of the database in which the table is located. -
schema (str)
: Name of the schema in which the table is located. -
if_exists (ExportWritePolicy)
: Specifies export policy if table exists. Either'fail'
: throw an error.'replace'
: drops existing table and creates new table of same name.'append'
: appends data frame to existing table.
Defaults to
'replace'
. -
index (bool)
: IfTrue
, the data frame index is also exported alongside the table. Defaults toFalse
. -
**kwargs
: Additional arguments to pass to writer.
-
load - load(query_string: str, limit: int, *args, **kwargs) -> DataFrame
Loads data from the results of a query into a Pandas data frame. This will fail if the query returns no data from the database.
This function will load at maximum 10,000,000 rows of data (this limit is configurable). To operate on more data, consider performing data transformations in warehouse.
- Args:
query_string (str)
: Query to fetch a table or subset of a table.limit (int, Optional)
: The number of rows to limit the loaded data frame to. Defaults to 10,000,000.**kwargs
: Additional parameters to pass to the query. Seepsycopg2
docs for configuring query parameters.
- Returns: (
DataFrame
) Data frame containing the queried data.
open - open()
Opens a connection to PostgreSQL database.
rollback - rollback()
Rolls back (deletes) all database changes made since the last transaction.
sample - sample(schema: str, table: str, size: int, **kwargs) -> DataFrame
Sample data from a table in the PostgreSQL database. Sample is not guaranteed to be random.
-
Args:
schema (str)
: The schema to select the table from.size (int)
: The number of rows to sample. Defaults to 10,000,000.table (str)
: The table to sample from in the connected database.
-
Returns: (
DataFrame
) Sampled data from the table.
OracleDB
Constructor
__init__(self, **kwargs)
Initializes the OracleDB data loading client.
- Args:
user (str)
: The user for connecting to the database with.password (str)
: The login password for the user.host (str)
: Host address for database.port (int)
: Port on which the database is running Defaults to 3306.service (str)
: OracleDB servicemode (str)
: switch between OracleDB mode ofthin
orthick
verbose (bool)
: Enables verbose output printing. Defaults toTrue
.**kwargs
: Additional settings for creating psycopg2 connection
To switch OracleDB mode to thick
, it is required to use the customized oracle Dockerfile in integrations/oracle/Dockerfile
.
Factory Methods
with_config - with_config(config: BaseConfigLoader, **kwargs) -> Postgres
Creates OracleDB data loading client from configuration settings.
- Args:
config (BaseConfigLoader)
: Configuration loader object.**kwargs
: Additional parameters passed to the loader constructor
- Returns: (
OracleDB
) The constructed dataloader using this method
Methods
execute - execute(query_string: str, **kwargs) -> None
Sends query to the connected OracleDB database. Any changes made to the database will not be saved unless commit()
is called afterward.
- Args:
query_string (str)
: The query to execute on the OracleDB database.**kwargs
: Additional parameters to pass to the query.
export - export(df: DataFrame, schema_name: str, table_name: str, if_exists: str, index: bool, **kwargs) -> None
Exports data frame to the OracleDB database from a Pandas data frame. If table doesn’t exist, the table is automatically created. The schema_name
can be set as None
since it is not required for OracleDB loaders.
Any changes made to the database will not be saved unless commit()
is called afterward.
-
Args:
-
df (DataFrame)
: Data frame to export to the OracleDB database. -
schema_name (str)
: Not required for OracleDB loaders. Set toNone
. -
table_name (str)
: Name of the table to export data to (excluding database). -
if_exists (ExportWritePolicy)
: Specifies export policy if table exists. Either'fail'
: throw an error.'replace'
: drops existing table and creates new table of same name.'append'
: appends data frame to existing table.
Defaults to
'replace'
. -
index (bool)
: IfTrue
, the data frame index is also exported alongside the table. Defaults toFalse
. -
**kwargs
: Additional arguments to pass to writer.
-
load - load(query_string: str, limit: int, *args, **kwargs) -> DataFrame
Loads data from the results of a query into a Pandas data frame. This will fail if the query returns no data from the database.
This function will load at maximum 10,000,000 rows of data (this limit is configurable). To operate on more data, consider performing data transformations in warehouse.
- Args:
query_string (str)
: Query to fetch a table or subset of a table.limit (int, Optional)
: The number of rows to limit the loaded data frame to. Defaults to 10,000,000.verbose (bool)
: Enables verbose output printing. Defaults toTrue
.
- Returns: (
DataFrame
) Data frame containing the queried data.
open - open()
Opens a connection to OracleDB database.
MySQL
mage_ai.io.mysql.MySQL
Handles data transfer between a MySQL database and the Mage app. The MySQL
client utilizes the following keys to connect the MySQL database.
Constructor
__init__(self, **kwargs)
Initializes the MySQL data loading client.
- Args:
database (str)
: The name of the database to connect to.user (str)
: The user for connecting to the database with.password (str)
: The login password for the user.host (str)
: Host address for database.port (int)
: Port on which the database is running Defaults to 3306.verbose (bool)
: Enables verbose output printing. Defaults toTrue
.**kwargs
: Additional settings for creating psycopg2 connection
Factory Methods
with_config - with_config(config: BaseConfigLoader, **kwargs) -> MySQL
Creates MySQL data loading client from configuration settings.
- Args:
config (BaseConfigLoader)
: Configuration loader object.**kwargs
: Additional parameters passed to the loader constructor
- Returns: (
MySQL
) The constructed dataloader using this method
Methods
close - close()
Closes connection to MySQL database.
commit - commit()
Saves all changes made to the database since the previous transaction.
execute - execute(query_string: str, **kwargs) -> None
Sends query to the connected MySQL database. Any changes made to the database will not be saved unless commit()
is called afterward.
- Args:
query_string (str)
: The query to execute on the MySQL database.**kwargs
: Additional parameters to pass to the query.
export - export(df: DataFrame, schema_name: str, table_name: str, if_exists: str, index: bool, **kwargs) -> None
Exports data frame to the MySQL database from a Pandas data frame. If table doesn’t exist, the table is automatically created. The schema_name
can be set as None
since it is not required for MySQL loaders.
Any changes made to the database will not be saved unless commit()
is called afterward.
-
Args:
-
df (DataFrame)
: Data frame to export to the MySQL database. -
schema_name (str)
: Not required for MySQL loaders. Set toNone
. -
table_name (str)
: Name of the table to export data to (excluding database). -
if_exists (ExportWritePolicy)
: Specifies export policy if table exists. Either'fail'
: throw an error.'replace'
: drops existing table and creates new table of same name.'append'
: appends data frame to existing table.
Defaults to
'replace'
. -
index (bool)
: IfTrue
, the data frame index is also exported alongside the table. Defaults toFalse
. -
**kwargs
: Additional arguments to pass to writer.
-
load - load(query_string: str, limit: int, *args, **kwargs) -> DataFrame
Loads data from the results of a query into a Pandas data frame. This will fail if the query returns no data from the database.
This function will load at maximum 10,000,000 rows of data (this limit is configurable). To operate on more data, consider performing data transformations in warehouse.
- Args:
query_string (str)
: Query to fetch a table or subset of a table.limit (int, Optional)
: The number of rows to limit the loaded data frame to. Defaults to 10,000,000.
- Returns: (
DataFrame
) Data frame containing the queried data.
open - open()
Opens a connection to MySQL database.
sample - sample(database: str, table: str, size: int, **kwargs) -> DataFrame
Sample data from a table in the MySQL database. Sample is not guaranteed to be random.
-
Args:
database (str)
: The database to select the table from.table (str)
: The table to sample from in the connected database.size (int)
: The number of rows to sample. Defaults to 10,000,000.
-
Returns: (
DataFrame
) Sampled data from the table.
DuckDB
mage_ai.io.duckdb
Handles data transfer between DuckDB database and the Mage app. DuckDB client utilizes the following keys to connect:
Constructor
Initializes the DuckDB data loading client.
- Args:
database (str)
: The name of the database to connect to.schema
: Schema to use.verbose (bool)
: Enables verbose output printing. Defaults toTrue
.**kwargs
: Additional settings for creating psycopg2 connection
Factory Methods
with_config - with_config(config: BaseConfigLoader, **kwargs) -> DuckDB
Creates DuckDB data loading client from configuration settings.
- Args:
config (BaseConfigLoader)
: Configuration loader object.**kwargs
: Additional parameters passed to the loader constructor
- Returns: (
DuckDB
) The constructed dataloader using this method
Methods
close - close()
Closes connection to DuckDB database.
open - open()
Opens a connection to DuckDB database.
table_exists - table_exists()
Build query to check if a table existing in the DB.
- Args:
schema_name
: Name of the schema to use.table_name
: Name of the table.
- Returns: true or false.
upload_dataframe - upload_dataframe()
Build query to insert data into table.
- Args:
df
: Data in dataframe type.
get_type - get_type()
Convert data from input type into DuckDB type.
- Args:
column
: Metadata associated with the column.dtype
: Input data type.
- Returns: DuckDB data type in string format.
Snowflake
mage_ai.io.snowflake.Snowflake
Handles data transfer between a Snowflake data warehouse and the Mage app. The Snowflake
client utilizes the following keys to authenticate access and connect to Snowflake servers.
Constructor
__init__(self, **kwargs)
Initializes settings for connecting to Snowflake data warehouse. The following arguments must be provided to the connector, all other arguments are optional.
Required Arguments:
user (str)
: Username for the Snowflake user.password (str)
: Login Password for the user.account (str)
: Snowflake account identifier (including region, excludingsnowflake-computing.com
suffix).
Optional Arguments:
verbose (bool)
: Specify whether to print verbose output.database (str)
: Name of the default database to use. If unspecified no database is selected on login.schema (str)
: Name of the default schema to use. If unspecified no schema is selected on login.warehouse (str)
: Name of the default warehouse to use. If unspecified no warehouse is selected on login.
Attributes
- conn (
snowflake.connector.Connection
) - underlying Snowflake Connection object
Factory Methods
with_config - with_config(config: BaseConfigLoader, **kwargs) -> Snowflake
Creates Snowflake data loading client from configuration settings.
- Args:
config (BaseConfigLoader)
: Configuration loader object.verbose (bool)
: Enables verbose output printing. Defaults to False.**kwargs
: Additional parameters passed to the loader constructor
- Returns: (
Snowflake
) The constructed dataloader using this method
Methods
close - close()
Closes connection to Snowflake server.
commit - commit()
Saves all changes made to the warehouse since the previous transaction.
execute - execute(query_string: str, **kwargs) -> None
Sends query to the connected Snowflake warehouse. Any changes made to the database will not be saved unless commit()
is called afterward.
- Args:
query_string (str)
: The query to execute on the Snowflake warehouse.**kwargs
: Additional parameters to pass to the query. See Snowflake Connector Docs for additional parameters.
export - export(df: DataFrame, table_name: str, database: str, schema: str, if_exists: str, **kwargs) -> None
Exports a Pandas data frame to a Snowflake warehouse based on the table name. If table doesn’t exist, the table is automatically created.
Any changes made to the database will not be saved unless commit()
is called afterward.
-
Args:
-
df (DataFrame)
: Data frame to export to a Snowflake warehouse. -
table_name (str)
: Name of the table to export data to (excluding database and schema). -
database (str)
: Name of the database in which the table is located. -
schema (str)
: Name of the schema in which the table is located. -
if_exists (str, optional)
: Specifies export policy if table exists. Either'fail'
: throw an error.'replace'
: drops existing table and creates new table of same name.'append'
: appends data frame to existing table.
Defaults to
'append'
. -
**kwargs
: Additional arguments to pass to writer
-
load - load(query_string: str, limit: int, *args, **kwargs) -> DataFrame
Loads data from Snowflake into a Pandas data frame based on the query given. This will fail unless a SELECT
query is provided.
This function will load at maximum 10,000,000 rows of data (this limit is configurable). To operate on more data, consider performing data transformations in warehouse using execute
.
- Args:
query_string (str)
: Query to fetch a table or subset of a table.limit (int, Optional)
: The number of rows to limit the loaded data frame to. Defaults to 10,000,000.*args, **kwargs
: Additional parameters to pass to the query. See Snowflake Connector Docs for additional parameters.
- Returns: (
DataFrame
) Data frame containing the queried data
open - open()
Opens a connection to Snowflake servers.
rollback - rollback()
Rolls back (deletes) all database changes made since the last transaction.
sample - sample(schema: str, table: str, size: int, **kwargs) -> DataFrame
Sample data from a table in the Snowflake warehouse. Sample is not guaranteed to be random.
-
Args:
schema (str)
: The schema to select the table from.size (int)
: The number of rows to sample. Defaults to 10,000,000.table (str)
: The table to sample from in the connected database.
-
Returns: (
DataFrame
) Sampled data from the data frame.
Configuration Settings
Connections to third-party data storage require you to specify confidential information such as login information or access keys. While you can manually specify this information code while constructing data loading clients, it is recommended to not store the secrets directly in code.
Instead, Mage provides configuration loaders which allow data loading clients to use your secrets without explicitly writing them in code.
Currently Mage only supports AWS Secrets Manager as a configuration loader. If you need to use secrets from Azure Key Vault or GCP Secret Manager, you can reference the following docs:
Currently, the following sources (and their corresponding configuration loader) can be used to load configuration settings:
- Configuration File -
ConfigFileLoader
- Environment Variables -
EnvironmentVariableLoader
- AWS Secrets Manager -
AWSSecretLoader
s For example, the code below constructs a Redshift data loading client using secrets stored in AWS Secrets Manager
The following are the set of allowed key names that you must name your secrets with in order for Mage’s configuration loaders to recognize your secrets. In code you can refer to these keys by their string name or using the mage_ai.io.config.ConfigKey
enum. Not all keys need be specified at once - only use the keys related to the services you utilize.
Key Name | Service | Client Constructor Parameter | Description | Notes |
---|---|---|---|---|
AWS_ACCESS_KEY_ID | AWS General | - | AWS Access Key ID credential | Used by Redshift and S3 |
AWS_SECRET_ACCESS_KEY | AWS General | - | AWS Secret Access Key credential | Used by Redshift and S3 |
AWS_SESSION_TOKEN | AWS General | - | AWS Session Token (used to generate temporary DB credentials) | Used by Redshift |
AWS_REGION | AWS General | - | AWS Region | Used by Redshift and S3 |
REDSHIFT_DBNAME | AWS Redshift | database | Name of Redshift database to connect to | |
REDSHIFT_HOST | AWS Redshift | host | Redshift Cluster hostname | Use with temporary credentials |
REDSHIFT_PORT | AWS Redshift | port | Redshift Cluster port. Optional, defaults to 5439. | Use with temporary credentials |
REDSHIFT_TEMP_CRED_USER | AWS Redshift | user | Redshift temporary credentials username. | Use with temporary credentials |
REDSHIFT_TEMP_CRED_PASSWORD | AWS Redshift | password | Redshift temporary credentials password. | Use with temporary credentials |
REDSHIFT_DBUSER | AWS Redshift | db_user | Redshift database user to generate credentials for. | Use to generate temporary credentials |
REDSHIFT_CLUSTER_ID | AWS Redshift | cluster_identifier | Redshift cluster ID | Use to generate temporary credentials |
REDSHIFT_IAM_PROFILE | AWS Redshift | profile | Name of the IAM profile to generate temporary credentials with | Use to generate temporary credentials |
POSTGRES_DBNAME | PostgreSQL | dbname | Database name | |
POSTGRES_USER | PostgreSQL | user | Database login username | |
POSTGRES_PASSWORD | PostgreSQL | password | Database login password | |
POSTGRES_HOST | PostgreSQL | host | Database hostname | |
POSTGRES_PORT | PostgreSQL | port | PostgreSQL database port | |
SNOWFLAKE_USER | Snowflake | user | Snowflake username | |
SNOWFLAKE_PASS | Snowflake | password | Snowflake password | |
SNOWFLAKE_ACCOUNT | Snowflake | account | Snowflake account ID (including region) | |
SNOWFLAKE_DEFAULT_DB | Snowflake | database | Default database to use. Optional, no database chosen if unspecified. | |
SNOWFLAKE_DEFAULT_SCHEMA | Snowflake | schema | Default schema to use. Optional, no schema chosen if unspecified. | |
SNOWFLAKE_DEFAULT_WH | Snowflake | warehouse | Default warehouse to use. Optional, no warehouse chosen if unspecified. | |
GOOGLE_SERVICE_ACC_KEY | Google BigQuery | credentials_mapping | Service account key | |
GOOGLE_SERVICE_ACC_KEY_FILEPATH | Google BigQuery | path_to_credentials | Path to service account key |
Configuration Loader APIs
This section contains the exact APIs and more detailed information on the configuration loaders. Every configuration loader has two functions:
-
contains
- checks if the configuration source contains the requested key. Commonly, thein
operation is used to check for setting existence (but is not always identical ascontains
can accept multiple parameters while thein
keyword only accepts the key). -
get
- gets the configuration setting associated with the given key. If the key doesn’t exist, returns None. Commonly, the data model overload__getitem__
is used to fetch a configuration setting (but is not always identical asget
can accept multiple parameters while__getitem__
does not).
These functions are shared among all configuration loaders, but depending on the source some function signatures may differ.
Configuration File
Loads configuration settings from a configuration file.
Example:
Constructor: __init__(filepath: os.PathLike, profile: str)
Initializes IO Configuration loader. Input configuration file can have two formats:
- Standard: contains a subset of the configuration keys specified in
ConfigKey
. This is the default and recommended format. Below is an example configuration file using this format.The above configuration file has a single profile named'default'
. Each profile organizes a set of keys to use (for example, distinguishing production keys versus development keys). A configuration file can have multiple profiles. - Verbose: Instead of configuration keys, each profile stores an object of settings associated with
each data migration client. This format was used in previous versions of this tool, and exists
for backwards compatibility. Below is an example configuration file using this format.
Use handlebars and env_var
syntax to reference environment variables in either configuration file format.
Args:
filepath (os.PathLike, optional)
: Path to IO configuration file. Defaults to'[repo_path]/io_config.yaml'
profile (str, optional)
: Profile to load configuration settings from. Defaults to'default'
.
Methods
contains - contains(self, key: ConfigKey | str) -> Any
Checks if the configuration setting stored under key
is contained.
- Args:
key (str)
: Name of the configuration setting to check.
- Returns (
bool
) ReturnsTrue
if configuration setting exists, otherwise returnsFalse
.
get - get(self, key: ConfigKey | str) -> Any
Loads the configuration setting stored under key
.
- Args:
key (str)
: Key name of the configuration setting to load
- Returns: (
Any
) Configuration setting corresponding to the given key
Environment Variables
Loads configuration settings from environment variables in your current environment.
Example:
Constructor : __init__(self)
- no parameters for construction.
Methods:
contains - contains(env_var: ConfigKey | str) -> bool
Checks if the environment variable given by env_var
exists.
-
Args:
key (ConfigKey | str)
: Name of the configuration setting to check existence of.
-
Returns (
bool
) ReturnsTrue
if configuration setting exists, otherwise returnsFalse
.
get - get(env_var: ConfigKey | str) -> Any
Loads the config setting stored under the environment variable env_var
.
-
Args:
env_var (str)
: Name of the environment variable to load configuration setting from
-
Returns: (
Any
) The configuration setting stored underenv_var
AWS Secret Loader
Loads secrets from AWS Secrets Manager. To authenticate access to AWS Secrets Manager, either
- Configure your AWS profile using the AWS CLI
- Manually specify your AWS Credentials when constructing the configuration loader
python config = AWSSecretLoader( aws_access_key_id = 'your access key id', aws_secret_access_key = 'your secret key', region_name = 'your region' )
Example:
Constructor : __init__(self, **kwargs)
:
- Keyword Arguments:
aws_access_key_id (str, Optional)
: AWS access key ID credential.aws_secret_access_key (str, Optional)
: AWS secret access key credential.region_name (str, Optional)
: AWS region which Secrets Manager is created in.
Methods:
contains - contains( secret_id: ConfigKey | str, version_id: str, version_stage_label : str) -> bool
Check if there is a secret with ID secret_id
contained. Can also specify the version of the
secret to check. If
- both
version_id
andversion_stage_label
are specified, both must agree on the secret version. - neither of
version_id
orversion_stage_label
are specified, any version is checked. - one of
version_id
andversion_stage_label
are specified, the associated version is checked.
When using the in
operator, comparisons to specific versions are not allowed.
- Args:
secret_id (str)
: ID of the secret to loadversion_id (str, Optional)
: ID of the version of the secret to load. Defaults toNone
.version_stage_label (str, Optional)
: Staging label of the version of the secret to load. Defaults toNone
.
- Returns: (
bool
) Returns true if secret exists, otherwise returns false.
get - get(secret_id: ConfigKey | str, version_id: str, version_stage_label : str) -> bytes | str
Loads the secret stored under secret_id
. Can also specify the version of the
secret to fetch. If
- both
version_id
andversion_stage_label
are specified, both must agree on the secret version. - neither of
version_id
orversion_stage_label
are specified, the current version is loaded. - one of
version_id
andversion_stage_label
are specified, the associated version is loaded.
When using the __getitem__
overload, comparisons to specific versions are not allowed.
-
Args:
secret_id (str)
: ID of the secret to loadversion_id (str, Optional)
: ID of the version of the secret to load. Defaults toNone
.version_stage_label (str, Optional)
: Staging label of the version of the secret to load. Defaults toNone
.
-
Returns: (
bytes | str
) The secret stored undersecret_id
in AWS secret manager. If secret is a binary value, returns abytes
object; else returns astring
object
Was this page helpful?