The `execute` method executes your queries in the connected database. The Google BigQuery client follows this structure.
Mage provides the `mage_ai.io.file.FileIO` client as a convenience wrapper.

The following code uses the `load` function of the `FileIO` client to load the Titanic survival dataset from a CSV file into a Pandas DataFrame for use in your pipeline. All data loaders can be initialized with the `verbose=True` parameter, which prints the current action the data loading client is performing. This parameter defaults to `False`.
The `export` function is used to save this data frame back to file, this time in JSON format.

Additional keyword arguments can be passed to both `load` and `export`. These arguments are forwarded to the Pandas IO procedures for the requested file format, enabling fine-grained control over how your data is loaded and exported.
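The pattern described above can be sketched as follows (the import path and method names come from this page; the filenames and the `orient` keyword are illustrative, and this assumes the `mage_ai` package and a local CSV):

```python
from mage_ai.io.file import FileIO

# Initialize the client; verbose=True prints each action the loader performs.
loader = FileIO(verbose=True)

# Load a CSV into a Pandas DataFrame. Extra keyword arguments
# are forwarded to the underlying pandas reader (read_csv here).
df = loader.load('titanic_survival.csv')

# Save the data frame back to disk, this time as JSON. Extra keyword
# arguments are forwarded to DataFrame.to_json (orient is one example).
loader.export(df, 'titanic_survival.json', orient='records')
```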
As the data loader was constructed with the `verbose` parameter set to `True`, the above operations print output describing each action the data loader performs.
Mage provides the `mage_ai.io.snowflake.Snowflake` data loading client. To authenticate access to a Snowflake warehouse, the client requires the associated Snowflake credentials.
If you used `mage init` to create your project repository, you can store these values in your `io_config.yaml` file and use `mage_ai.io.config.ConfigFileLoader` to construct the data loader client.

An example `io_config.yaml` file in this instance would be:
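A sketch of such a file (the profile name `default` and the placeholder values are illustrative; the key names are hedged against the `io_config.yaml` template that `mage init` generates):

```yaml
version: 0.1.1
default:
  SNOWFLAKE_USER: your_username
  SNOWFLAKE_PASSWORD: your_password
  SNOWFLAKE_ACCOUNT: your_account_identifier
```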
- `execute()`: executes an arbitrary query on your data warehouse. In this case, the warehouse, database, and schema to use are selected.
- `load()`: loads the results of a `SELECT` query into a Pandas DataFrame.
- `export()`: stores a data frame as a table in your data warehouse. If the table exists, the data is appended by default (other behaviors can be configured; see the Snowflake API). If the table doesn't exist, the table is created with the given schema name and table name, and the table data types are inferred from the Python data types.

The `loader` object manages a direct connection to your Snowflake data warehouse, so it is important to make sure that your connection is closed once your operations are complete. You can manually call `loader.open()` and `loader.close()` to open and close the connection to your data warehouse, or automatically manage the connection with a context manager.
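For example, a minimal sketch of the context-manager pattern (assuming an `io_config.yaml` with a `default` profile as described above; the query is illustrative):

```python
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.snowflake import Snowflake

config = ConfigFileLoader('io_config.yaml', 'default')

# The context manager opens the connection on entry and closes it on
# exit, even if an exception is raised inside the block.
with Snowflake.with_config(config) as loader:
    df = loader.load('SELECT * FROM my_schema.my_table LIMIT 10')
```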
To learn more about loading data from Snowflake, see the Snowflake API for details on member functions and usage.
mage_ai.io.redshift.Redshift
Handles data transfer between a Redshift cluster and the Mage app. Mage uses temporary credentials to authenticate access to a Redshift cluster. There are two ways to specify these credentials:
Add the following keys to the configuration settings for `Redshift` to use the temporary credentials:
Alternatively, specify an AWS IAM profile: the profile is stored in `~/.aws/` and is used with the `GetClusterCredentials` endpoint to generate temporary credentials. Add the following keys to the configuration settings for `Redshift` to generate temporary credentials:
If you haven't run `aws configure`, manually specify the AWS credentials in the configuration settings as well.
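A sketch of the relevant `io_config.yaml` entries for the IAM-profile approach (key names are hedged against the template generated by `mage init`; values are placeholders):

```yaml
default:
  REDSHIFT_DBNAME: your_database
  REDSHIFT_CLUSTER_ID: your-cluster-id
  REDSHIFT_DBUSER: your_db_user
  REDSHIFT_IAM_PROFILE: default
```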
`__init__(**kwargs)`

Initializes the Redshift data loading client.

- `verbose`: Enables verbose output. Defaults to `False`.
- (`redshift_connector.Connection`): the underlying Redshift connection object.

`with_config(config: BaseConfigLoader, **kwargs) -> Redshift`

Initializes Redshift client from configuration settings.
- `config (BaseConfigLoader)`: Configuration loader object.
- `**kwargs`: Additional parameters passed to the loader constructor.

(`Redshift`) Redshift data loading client constructed using this method.

`with_temporary_credentials(database: str, host: str, user: str, password: str, port: int = 5439, **kwargs) -> Redshift`

Creates a Redshift data loader with temporary database credentials.
- `database (str)`: Name of the database to connect to.
- `host (str)`: The hostname of the Redshift cluster to which the database belongs.
- `user (str)`: Temporary credentials username for use in authentication.
- `password (str)`: Temporary credentials password for use in authentication.
- `port (int, optional)`: Port number of the Redshift cluster. Defaults to 5439.
- `**kwargs`: Additional parameters passed to the loader constructor.

(`Redshift`) Redshift data loading client constructed using this method.

`with_iam(cluster_identifier: str, database: str, db_user: str, profile: str, **kwargs) -> Redshift`

Creates a Redshift data loader using an IAM profile from `~/.aws`.

The IAM profile settings can also be manually specified as keyword arguments to this constructor, but this is not recommended. If credentials are manually specified, the region of the Redshift cluster must also be specified.
- `cluster_identifier (str)`: Identifier of the cluster to connect to.
- `database (str)`: The database to connect to within the specified cluster.
- `db_user (str)`: Database username.
- `profile (str, optional)`: The profile to use from the stored credentials file. Defaults to 'default'.
- `**kwargs`: Additional parameters passed to the loader constructor.

(`Redshift`) Redshift data loading client constructed using this method.

`close()`

Closes connection to the Redshift cluster specified in the loader configuration.
`commit()`

Saves all changes made to the database since the last transaction.

`execute(query_string: str, **kwargs) -> None`

Sends a query to the connected Redshift cluster. Any changes made to the database will not be saved unless `commit()` is called afterward.

- `query_string (str)`: The query to execute on the Redshift cluster.
- `**kwargs`: Additional parameters to pass to the query. See the `redshift-connector` docs for configuring query parameters.

`export(df: DataFrame, table_name: str) -> None`

Exports a Pandas data frame to a Redshift cluster under the specified table. The changes made to the database will not be saved unless `commit()` is called afterward.
- `df (DataFrame)`: Data frame to export to a database in the Redshift cluster.
- `table_name (str)`: Name of the table to export the data to. The table must already exist.

`load(query_string: str, limit: int, *args, **kwargs) -> DataFrame`

Loads data from the results of a query into a Pandas data frame. This will fail if the query returns no data from the database.

This function will load at maximum 10,000,000 rows of data (this limit is configurable). To operate on more data, consider performing data transformations in the warehouse using `execute`.

- `query_string (str)`: Query to fetch a table or subset of a table.
- `limit (int, optional)`: The number of rows to limit the loaded data frame to. Defaults to 10,000,000.
- `*args, **kwargs`: Additional parameters to send to the query, including `parameters` for use with format strings. See the `redshift-connector` docs for configuring query parameters.

(`DataFrame`) Data frame containing the queried data.
`open()`

Opens a connection to the Redshift cluster specified in the loader configuration.

`rollback()`

Rolls back (deletes) all database changes made since the last transaction.

`sample(schema: str, table: str, size: int, **kwargs) -> DataFrame`

Samples data from a table in the selected database in the Redshift cluster. The sample is not guaranteed to be random.

- `schema (str)`: The schema to select the table from.
- `size (int)`: The number of rows to sample. Defaults to 10,000,000.
- `table (str)`: The table to sample from in the connected database.

(`DataFrame`) Sampled data from the table.
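The `commit()`/`rollback()` behavior above follows the standard Python DB-API transaction pattern. As a self-contained illustration of that pattern (using the stdlib `sqlite3` module as a stand-in for `redshift_connector`):

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE users (id INTEGER, name TEXT)')
conn.commit()  # persist the schema change

conn.execute("INSERT INTO users VALUES (1, 'ada')")
conn.rollback()  # discard the uncommitted insert
count_after_rollback = conn.execute('SELECT COUNT(*) FROM users').fetchone()[0]

conn.execute("INSERT INTO users VALUES (1, 'ada')")
conn.commit()  # persist the insert
count_after_commit = conn.execute('SELECT COUNT(*) FROM users').fetchone()[0]

print(count_after_rollback, count_after_commit)  # 0 1
```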
mage_ai.io.s3.S3
Handles data transfer between an S3 bucket and the Mage app. The `S3` client supports importing and exporting with the following file formats:
If an IAM profile has not been set up with `aws configure`, then AWS credentials for accessing the S3 bucket can be manually specified, either through Mage's configuration loader system or through keyword arguments in the constructor (see constructor).
If using configuration settings, specify the following keys:
`__init__(self, verbose: bool)`

Initializes the S3 data loading client.

- `verbose (bool)`: Enables verbose output printing. Defaults to `False`.

If `aws configure` has not been run and Mage's configuration loader is not used, then specify your AWS credentials through the following keyword arguments:

- `aws_access_key_id (str)`: AWS Access Key ID credential.
- `aws_secret_access_key (str)`: AWS Secret Access Key credential.
- `aws_region (str)`: Region associated with the AWS IAM profile.

`with_config(config: BaseConfigLoader, **kwargs) -> S3`

Creates S3 data loading client from configuration settings.

- `config (BaseConfigLoader)`: Configuration loader object.
- `**kwargs`: Additional parameters passed to the loader constructor.

(`S3`) The data loader constructed using this method.

`export(df: DataFrame, bucket_name: str, object_key: str, format: FileFormat | str = None, **kwargs) -> None`

Exports data frame to an S3 bucket.
If the format is HDF5, the default key under which the data frame is stored is the stem of the filename. For example, if the file to write the data frame to is 'storage/my_data_frame.hdf5', the key would be 'my_data_frame'. This can be overridden using the `key` keyword argument.

- `data (Union[DataFrame, str])`: Data frame or file path to export.
- `bucket_name (str)`: Name of the bucket to export the data frame to.
- `object_key (str)`: Object key in the S3 bucket to export the data frame to.
- `format (FileFormat | str, optional)`: Format of the file to export the data frame to. Defaults to `None`, in which case the format is inferred.
- `**kwargs`: Additional keyword arguments to pass to the file writer. See possible arguments by file format:
| Format | Pandas Writer |
| --- | --- |
| CSV | DataFrame.to_csv |
| JSON | DataFrame.to_json |
| Parquet | DataFrame.to_parquet |
| HDF5 | DataFrame.to_hdf |
| XML | DataFrame.to_xml |
| Excel | DataFrame.to_excel |
`load(bucket_name: str, object_key: str, format: FileFormat | str = None, limit: int = QUERY_ROW_LIMIT, **kwargs) -> DataFrame`

Loads data from an object in an S3 bucket into a Pandas data frame. This function will load at maximum 10,000,000 rows of data from the specified file (this limit is configurable).

- `bucket_name (str)`: Name of the bucket to load data from.
- `object_key (str)`: Key of the object in the S3 bucket to load data from.
- `format (FileFormat | str, optional)`: Format of the file to load the data frame from. Defaults to `None`, in which case the format is inferred.
- `limit (int, optional)`: The number of rows to limit the loaded data frame to. Defaults to 10,000,000.
- `**kwargs`: Additional keyword arguments to pass to the file reader. See possible arguments by file format:
| Format | Pandas Reader |
| --- | --- |
| CSV | read_csv |
| JSON | read_json |
| Parquet | read_parquet |
| HDF5 | read_hdf |
| XML | read_xml |
| Excel | read_excel |
(`DataFrame`) Data frame object loaded from data in the specified file.
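Putting the `load`/`export` pair together, a sketch of typical usage (the bucket name and object keys are illustrative; credentials are assumed to come from an `io_config.yaml` profile):

```python
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.s3 import S3

s3 = S3.with_config(ConfigFileLoader('io_config.yaml', 'default'))

# Format is inferred from the object key's extension.
df = s3.load('my-bucket', 'landing/titanic.csv')
s3.export(df, 'my-bucket', 'processed/titanic.parquet')
```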
mage_ai.io.file.FileIO
Handles data transfer between the filesystem and the Mage app. The `FileIO` client currently supports importing and exporting with the following file formats:
`__init__(self, verbose: bool)`

Initializes the FileIO data loading client.

- `verbose (bool)`: Enables verbose output printing. Defaults to `False`.

`export(df: DataFrame, filepath: os.PathLike, format: FileFormat | str = None, **kwargs) -> None`

Exports the input data frame to the specified file.

If the format is HDF5, the default key under which the data frame is stored is the stem of the filename. For example, if the file to write the data frame to is 'storage/my_data_frame.hdf5', the key would be 'my_data_frame'. This can be overridden using the `key` keyword argument.

- `df (DataFrame)`: Data frame to export.
- `filepath (os.PathLike)`: Filepath to export the data frame to.
- `format (FileFormat | str, optional)`: Format of the file to export the data frame to. Defaults to `None`, in which case the format is inferred.
- `**kwargs`: Additional keyword arguments to pass to the file writer. See possible arguments by file format:
| Format | Pandas Writer |
| --- | --- |
| CSV | DataFrame.to_csv |
| JSON | DataFrame.to_json |
| Parquet | DataFrame.to_parquet |
| HDF5 | DataFrame.to_hdf |
| XML | DataFrame.to_xml |
| Excel | DataFrame.to_excel |
`load(filepath: os.PathLike, format: FileFormat | str = None, limit: int = QUERY_ROW_LIMIT, **kwargs) -> DataFrame`

Loads the data frame from the specified filepath. This function will load at maximum 10,000,000 rows of data from the specified file (this limit is configurable).

- `filepath (os.PathLike)`: Filepath to load the data frame from.
- `format (FileFormat | str, optional)`: Format of the file to load the data frame from. Defaults to `None`, in which case the format is inferred.
- `limit (int, optional)`: The number of rows to limit the loaded data frame to. Defaults to 10,000,000.
- `**kwargs`: Additional keyword arguments to pass to the file reader. See possible arguments by file format:
| Format | Pandas Reader |
| --- | --- |
| CSV | read_csv |
| JSON | read_json |
| Parquet | read_parquet |
| HDF5 | read_hdf |
| XML | read_xml |
| Excel | read_excel |
(`DataFrame`) Data frame object loaded from data in the specified file.
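The default HDF5 key described above is just the filename stem, which can be computed with the stdlib `pathlib`:

```python
from pathlib import Path

# The stem drops the directory and the extension.
key = Path('storage/my_data_frame.hdf5').stem
print(key)  # my_data_frame
```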
mage_ai.io.google_cloud_storage.GoogleCloudStorage
Handles data transfer between a Google Cloud Storage bucket and the Mage app. Supports loading files of any of the following types:
If the `GOOGLE_APPLICATION_CREDENTIALS` environment variable is set, no further arguments are needed other than those specified below. Otherwise, use the factory method `with_config` to construct the data loader using manually specified credentials.
To authenticate (and authorize) access to Google Cloud Storage, credentials must be provided. Below are the different ways to provide those credentials:

- Set the `GOOGLE_APPLICATION_CREDENTIALS` environment variable to hold the filepath to your service account key.
- Use the `GOOGLE_SERVICE_ACC_KEY_FILEPATH` key with your configuration loader, or the `path_to_credentials` keyword argument with the client constructor, holding the filepath to your service account key.
- Use the `GOOGLE_SERVICE_ACC_KEY` key with your configuration loader, or the `credentials_mapping` keyword argument with the client constructor, holding a mapping sharing the same contents as your service key.
- Pass a `google.oauth2.service_account.Credentials` object with the keyword argument `credentials`.
`__init__(self, verbose: bool = False, **kwargs)`

Initializes the GoogleCloudStorage data loading client.

- `verbose (bool)`: Enables verbose output printing. Defaults to `False`.
- `credentials_mapping (Mapping[str, str])`: Mapping object corresponding to your service account key. See instructions above on when to use this keyword argument.
- `path_to_credentials (str)`: Filepath to service account key. See instructions above on when to use this keyword argument.

`with_config(config: BaseConfigLoader, **kwargs) -> GoogleCloudStorage`

Creates GoogleCloudStorage data loading client from configuration settings.

- `config (BaseConfigLoader)`: Configuration loader object.
- `**kwargs`: Additional parameters passed to the loader constructor.

(`GoogleCloudStorage`) The data loader constructed using this method.

`export(df: DataFrame, bucket_name: str, object_key: str, format: FileFormat | str = None, **kwargs) -> None`

Exports data frame to a Google Cloud Storage bucket.
- `data (Union[DataFrame, str])`: Data frame or file path to export.
- `bucket_name (str)`: Name of the bucket to export the data frame to.
- `object_key (str)`: Object key in the Google Cloud Storage bucket to export the data frame to.
- `format (FileFormat | str, optional)`: Format of the file to export the data frame to. Defaults to `None`, in which case the format is inferred.
- `**kwargs`: Additional keyword arguments to pass to the file writer. See possible arguments by file format:
| Format | Pandas Writer |
| --- | --- |
| CSV | DataFrame.to_csv |
| JSON | DataFrame.to_json |
| Parquet | DataFrame.to_parquet |
| HDF5 | DataFrame.to_hdf |
| XML | DataFrame.to_xml |
| Excel | DataFrame.to_excel |
`load(bucket_name: str, object_key: str, format: FileFormat | str = None, limit: int = QUERY_ROW_LIMIT, **kwargs) -> DataFrame`

Loads data from an object in a Google Cloud Storage bucket into a Pandas data frame. This function will load at maximum 10,000,000 rows of data from the specified file (this limit is configurable).

- `bucket_name (str)`: Name of the bucket to load data from.
- `object_key (str)`: Key of the object in the Google Cloud Storage bucket to load data from.
- `format (FileFormat | str, optional)`: Format of the file to load the data frame from. Defaults to `None`, in which case the format is inferred.
- `limit (int, optional)`: The number of rows to limit the loaded data frame to. Defaults to 10,000,000.
- `**kwargs`: Additional keyword arguments to pass to the file reader. See possible arguments by file format:
| Format | Pandas Reader |
| --- | --- |
| CSV | read_csv |
| JSON | read_json |
| Parquet | read_parquet |
| HDF5 | read_hdf |
| XML | read_xml |
| Excel | read_excel |
(`DataFrame`) Data frame object loaded from data in the specified file.
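Usage mirrors the S3 client; a sketch assuming credentials are provided via one of the methods above (bucket and object keys are illustrative):

```python
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.google_cloud_storage import GoogleCloudStorage

gcs = GoogleCloudStorage.with_config(ConfigFileLoader('io_config.yaml', 'default'))

# Format is inferred from the object key's extension.
df = gcs.load('my-bucket', 'landing/titanic.csv')
gcs.export(df, 'my-bucket', 'processed/titanic.json')
```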
mage_ai.io.azure_blob_storage.AzureBlobStorage
Handles data transfer between an Azure Blob Storage container and Mage. Supports loading files of the following types:
- `AZURE_CLIENT_ID`
- `AZURE_CLIENT_SECRET`
- `AZURE_TENANT_ID`
- `AZURE_STORAGE_ACCOUNT_NAME`

These values can be stored in `io_config.yaml`.
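A sketch of storing these in `io_config.yaml` (the key names come from the list above; the `env_var` interpolation is Mage's configuration templating, and the exact syntax is an assumption to verify against your generated template):

```yaml
default:
  AZURE_CLIENT_ID: "{{ env_var('AZURE_CLIENT_ID') }}"
  AZURE_CLIENT_SECRET: "{{ env_var('AZURE_CLIENT_SECRET') }}"
  AZURE_TENANT_ID: "{{ env_var('AZURE_TENANT_ID') }}"
  AZURE_STORAGE_ACCOUNT_NAME: "{{ env_var('AZURE_STORAGE_ACCOUNT_NAME') }}"
```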
- `verbose (bool)`: Enables verbose output printing. Defaults to `False`.

`with_config(config: BaseConfigLoader, **kwargs) -> AzureBlobStorage`

Creates AzureBlobStorage data loading client from configuration settings.

- `config (BaseConfigLoader)`: Configuration loader object.
- `**kwargs`: Additional parameters passed to the loader constructor.

(`AzureBlobStorage`) The data loader constructed using this method.

`export(df: DataFrame, container_name: str, blob_path: str, format: FileFormat | str = None, **kwargs) -> None`

Exports data frame to an Azure Blob Storage container.
- `df (DataFrame)`: Data frame to export.
- `container_name (str)`: Name of the Azure container to export data to.
- `blob_path (str)`: The desired output path of the data in your Azure Blob Storage container.
- `format (FileFormat | str, optional)`: Format of the file to export the data frame to. Defaults to `None`, in which case the format is inferred.
- `**kwargs`: Additional keyword arguments to pass to the file writer. See possible arguments by file format:
| Format | Pandas Writer |
| --- | --- |
| CSV | DataFrame.to_csv |
| JSON | DataFrame.to_json |
| Parquet | DataFrame.to_parquet |
| HDF5 | DataFrame.to_hdf |
| XML | DataFrame.to_xml |
| Excel | DataFrame.to_excel |
`load(container_name: str, blob_path: str, format: FileFormat | str = None, limit: int = QUERY_ROW_LIMIT, **kwargs) -> DataFrame`

Loads data from an object in Azure Blob Storage into a Pandas data frame. This function will load at maximum 10,000,000 rows of data from the specified file (this limit is configurable).
mage_ai.io.google_sheets.GoogleSheets
Handles data transfer between a Google Sheets spreadsheet and the Mage app.
Authentication works the same as for our other Google Cloud data loaders. Read more about configuration here.
The Google Sheets class is defined here.
mage_ai.io.bigquery.BigQuery
Handles data transfer between a BigQuery data warehouse and the Mage app.
Authentication with a Google BigQuery warehouse requires specifying the service account key for the service account that has access to the BigQuery warehouse. There are four ways to provide this service key:

- Set the `GOOGLE_APPLICATION_CREDENTIALS` environment variable to hold the filepath to your service account key.
- Use the `GOOGLE_SERVICE_ACC_KEY_FILEPATH` key with your configuration loader, or the `path_to_credentials` keyword argument with the client constructor, holding the filepath to your service account key.
- Use the `GOOGLE_SERVICE_ACC_KEY` key with your configuration loader, or the `credentials_mapping` keyword argument with the client constructor, holding a mapping sharing the same contents as your service key.
- Pass a `google.oauth2.service_account.Credentials` object with the keyword argument `credentials`.
`__init__(self, **kwargs)`

Initializes the BigQuery data loading client.

- `verbose (bool)`: Enables verbose output printing. Defaults to `False`.
- `credentials_mapping (Mapping[str, str])`: Mapping object corresponding to your service account key. See instructions above on when to use this keyword argument.
- `path_to_credentials (str)`: Filepath to service account key. See instructions above on when to use this keyword argument.

`with_config(config: BaseConfigLoader, **kwargs) -> BigQuery`

Creates BigQuery data loading client from configuration settings.

- `config (BaseConfigLoader)`: Configuration loader object.
- `**kwargs`: Additional parameters passed to the loader constructor.

(`BigQuery`) BigQuery data loading client constructed using this method.

`with_credentials_file(cls, path_to_credentials: str, **kwargs) -> BigQuery`

Constructs BigQuery data loader using the file containing the service account key.
- `path_to_credentials (str)`: Path to the credentials file.
- `**kwargs`: Additional parameters to pass to the BigQuery client constructor.

(`BigQuery`) BigQuery data loading client constructed using this method.

`with_credentials_object(cls, credentials: Mapping[str, str], **kwargs) -> BigQuery`

Constructs BigQuery data loader using manually specified service account key credentials.

- `credentials (Mapping[str, str])`: Mapping containing the same key-value pairs as a service account key.
- `**kwargs`: Additional parameters to pass to the BigQuery client constructor.

(`BigQuery`) BigQuery data loading client constructed using this method.

`execute(query_string: str, **kwargs) -> None`

Sends a query to the connected BigQuery warehouse.
- `query_string (str)`: Query to execute on the BigQuery warehouse.
- `**kwargs`: Additional arguments to pass to the query, such as query configurations. See the Client.query() docs for additional arguments.

`export(df: DataFrame, table_name: str, database: str, schema: str, if_exists: str, **kwargs) -> None`

Exports a data frame to a Google BigQuery warehouse. If the table doesn't exist, it is automatically created.

- `df (DataFrame)`: Data frame to export.
- `table_id (str)`: ID of the table to export the data frame to, of the format `"your-project.your_dataset.your_table_name"`. If this table exists, the table schema must match the data frame schema. If this table doesn't exist, the table schema is automatically inferred.
- `if_exists (str)`: Specifies the export policy if the table exists. One of:
  - `'fail'`: throw an error.
  - `'replace'`: drops the existing table and creates a new table of the same name.
  - `'append'`: appends the data frame to the existing table. In this case the schema must match the original table.

  Defaults to `'replace'`. If `write_disposition` is specified as a keyword argument, this parameter is ignored (as both define the same functionality).
- `**configuration_params`: Configuration parameters for the export job. See valid configuration parameters in the LoadJobConfig docs.

`load(query_string: str, limit: int, *args, **kwargs) -> DataFrame`

Loads data from the results of a query into a Pandas data frame. This will fail if the query returns no data from the database.

When a select query is provided, this function will load at maximum 10,000,000 rows of data (this limit is configurable). To operate on more data, consider performing data transformations in the warehouse.
- `query_string (str)`: Query to fetch a table or subset of a table.
- `limit (int, optional)`: The number of rows to limit the loaded data frame to. Defaults to 10,000,000.
- `**kwargs`: Additional parameters to pass to the query. See the Google BigQuery Python client docs for additional arguments.

`sample(schema: str, table: str, size: int, **kwargs) -> DataFrame`

Samples data from a table in the BigQuery warehouse. The sample is not guaranteed to be random.

- `schema (str)`: The schema to select the table from.
- `size (int)`: The number of rows to sample. Defaults to 10,000,000.
- `table (str)`: The table to sample from in the connected database.

(`DataFrame`) Sampled data from the table.
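A sketch tying the constructors and member functions together (the key filepath, project, dataset, and table names are illustrative):

```python
from mage_ai.io.bigquery import BigQuery

# Construct the client from a service account key file.
loader = BigQuery.with_credentials_file('/path/to/service_account_key.json')

df = loader.load('SELECT * FROM your_dataset.your_table LIMIT 100')

# Append to the destination table, creating it if it does not exist.
loader.export(df, 'your-project.your_dataset.your_table_name', if_exists='append')
```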
mage_ai.io.postgres.Postgres
Handles data transfer between a PostgreSQL database and the Mage app. The `Postgres` client uses the following keys to connect to the PostgreSQL database:
`__init__(self, **kwargs)`

Initializes the Postgres data loading client.

- `dbname (str)`: The name of the database to connect to.
- `user (str)`: The user with which to connect to the database.
- `password (str)`: The login password for the user.
- `host (str)`: Host address for the database.
- `port (str)`: Port on which the database is running.
- `verbose (bool)`: Enables verbose output printing. Defaults to `False`.
- `**kwargs`: Additional settings for creating the psycopg2 connection.
- (`psycopg2.connection.Connection`): the underlying psycopg2 connection object.

`with_config(config: BaseConfigLoader, **kwargs) -> Postgres`

Creates Postgres data loading client from configuration settings.
- `config (BaseConfigLoader)`: Configuration loader object.
- `**kwargs`: Additional parameters passed to the loader constructor.

(`Postgres`) The data loader constructed using this method.

`close()`

Closes connection to the PostgreSQL database.

`commit()`

Saves all changes made to the database since the previous transaction.

`execute(query_string: str, **kwargs) -> None`

Sends a query to the connected PostgreSQL database. Any changes made to the database will not be saved unless `commit()` is called afterward.

- `query_string (str)`: The query to execute on the PostgreSQL database.
- `**kwargs`: Additional parameters to pass to the query. See the `psycopg2` docs for configuring query parameters.

`export(df: DataFrame, table_name: str, database: str, schema: str, if_exists: str, index: bool, **kwargs) -> None`

Exports a Pandas data frame to the PostgreSQL database. If the table doesn't exist, it is automatically created. If the schema doesn't exist, the schema is also created.
Any changes made to the database will not be saved unless `commit()` is called afterward.

- `df (DataFrame)`: Data frame to export to the PostgreSQL database.
- `table_name (str)`: Name of the table to export data to (excluding database and schema).
- `database (str)`: Name of the database in which the table is located.
- `schema (str)`: Name of the schema in which the table is located.
- `if_exists (ExportWritePolicy)`: Specifies the export policy if the table exists. One of:
  - `'fail'`: throw an error.
  - `'replace'`: drops the existing table and creates a new table of the same name.
  - `'append'`: appends the data frame to the existing table.

  Defaults to `'replace'`.
- `index (bool)`: If `True`, the data frame index is also exported alongside the table. Defaults to `False`.
- `**kwargs`: Additional arguments to pass to the writer.
`load(query_string: str, limit: int, *args, **kwargs) -> DataFrame`

Loads data from the results of a query into a Pandas data frame. This will fail if the query returns no data from the database.

This function will load at maximum 10,000,000 rows of data (this limit is configurable). To operate on more data, consider performing data transformations in the warehouse.

- `query_string (str)`: Query to fetch a table or subset of a table.
- `limit (int, optional)`: The number of rows to limit the loaded data frame to. Defaults to 10,000,000.
- `**kwargs`: Additional parameters to pass to the query. See the `psycopg2` docs for configuring query parameters.

(`DataFrame`) Data frame containing the queried data.

`open()`

Opens a connection to the PostgreSQL database.

`rollback()`

Rolls back (deletes) all database changes made since the last transaction.

`sample(schema: str, table: str, size: int, **kwargs) -> DataFrame`

Samples data from a table in the PostgreSQL database. The sample is not guaranteed to be random.

- `schema (str)`: The schema to select the table from.
- `size (int)`: The number of rows to sample. Defaults to 10,000,000.
- `table (str)`: The table to sample from in the connected database.

(`DataFrame`) Sampled data from the table.
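A sketch of a round trip through the Postgres client (the table, schema, and database names are illustrative; note the explicit `commit()` required to persist the export):

```python
from pandas import DataFrame

from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.postgres import Postgres

config = ConfigFileLoader('io_config.yaml', 'default')

# The context manager opens and closes the connection automatically.
with Postgres.with_config(config) as loader:
    loader.export(
        DataFrame({'id': [1, 2]}),
        table_name='my_table',
        database='my_database',
        schema='public',
        if_exists='replace',
        index=False,
    )
    loader.commit()  # persist the export
    df = loader.load('SELECT * FROM public.my_table')
```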
mage_ai.io.oracledb.OracleDB

`__init__(self, **kwargs)`

Initializes the OracleDB data loading client.

- `user (str)`: The user with which to connect to the database.
- `password (str)`: The login password for the user.
- `host (str)`: Host address for the database.
- `port (int)`: Port on which the database is running. Defaults to 1521.
- `service (str)`: OracleDB service name.
- `mode (str)`: Switches between the OracleDB `thin` and `thick` modes.
- `verbose (bool)`: Enables verbose output printing. Defaults to `True`.
- `**kwargs`: Additional settings for creating the connection.

To use `thick` mode, it is required to use the customized Oracle Dockerfile in `integrations/oracle/Dockerfile`.
`with_config(config: BaseConfigLoader, **kwargs) -> OracleDB`

Creates OracleDB data loading client from configuration settings.

- `config (BaseConfigLoader)`: Configuration loader object.
- `**kwargs`: Additional parameters passed to the loader constructor.

(`OracleDB`) The data loader constructed using this method.

`execute(query_string: str, **kwargs) -> None`

Sends a query to the connected OracleDB database. Any changes made to the database will not be saved unless `commit()` is called afterward.

- `query_string (str)`: The query to execute on the OracleDB database.
- `**kwargs`: Additional parameters to pass to the query.

`export(df: DataFrame, schema_name: str, table_name: str, if_exists: str, index: bool, **kwargs) -> None`

Exports a Pandas data frame to the OracleDB database. If the table doesn't exist, it is automatically created. The `schema_name` can be set to `None` since it is not required for OracleDB loaders.

Any changes made to the database will not be saved unless `commit()` is called afterward.
- `df (DataFrame)`: Data frame to export to the OracleDB database.
- `schema_name (str)`: Not required for OracleDB loaders. Set to `None`.
- `table_name (str)`: Name of the table to export data to (excluding database).
- `if_exists (ExportWritePolicy)`: Specifies the export policy if the table exists. One of:
  - `'fail'`: throw an error.
  - `'replace'`: drops the existing table and creates a new table of the same name.
  - `'append'`: appends the data frame to the existing table.

  Defaults to `'replace'`.
- `index (bool)`: If `True`, the data frame index is also exported alongside the table. Defaults to `False`.
- `**kwargs`: Additional arguments to pass to the writer.
`load(query_string: str, limit: int, *args, **kwargs) -> DataFrame`

Loads data from the results of a query into a Pandas data frame. This will fail if the query returns no data from the database.

This function will load at maximum 10,000,000 rows of data (this limit is configurable). To operate on more data, consider performing data transformations in the warehouse.

- `query_string (str)`: Query to fetch a table or subset of a table.
- `limit (int, optional)`: The number of rows to limit the loaded data frame to. Defaults to 10,000,000.
- `verbose (bool)`: Enables verbose output printing. Defaults to `True`.

(`DataFrame`) Data frame containing the queried data.

`open()`

Opens a connection to the OracleDB database.
mage_ai.io.mysql.MySQL
Handles data transfer between a MySQL database and the Mage app. The `MySQL` client uses the following keys to connect to the MySQL database:
`__init__(self, **kwargs)`

Initializes the MySQL data loading client.

- `database (str)`: The name of the database to connect to.
- `user (str)`: The user with which to connect to the database.
- `password (str)`: The login password for the user.
- `host (str)`: Host address for the database.
- `port (int)`: Port on which the database is running. Defaults to 3306.
- `allow_local_infile (bool)`: Enables the capability to load local files. Defaults to `False`.
- `verbose (bool)`: Enables verbose output printing. Defaults to `True`.
- `**kwargs`: Additional settings for creating the connection.

`with_config(config: BaseConfigLoader, **kwargs) -> MySQL`

Creates MySQL data loading client from configuration settings.

- `config (BaseConfigLoader)`: Configuration loader object.
- `**kwargs`: Additional parameters passed to the loader constructor.

(`MySQL`) The data loader constructed using this method.

`close()`

Closes connection to the MySQL database.
`commit()`

Saves all changes made to the database since the previous transaction.

`execute(query_string: str, **kwargs) -> None`

Sends a query to the connected MySQL database. Any changes made to the database will not be saved unless `commit()` is called afterward.

- `query_string (str)`: The query to execute on the MySQL database.
- `**kwargs`: Additional parameters to pass to the query.

`export(df: DataFrame, schema_name: str, table_name: str, if_exists: str, index: bool, **kwargs) -> None`

Exports a Pandas data frame to the MySQL database. If the table doesn't exist, it is automatically created. The `schema_name` can be set to `None` since it is not required for MySQL loaders.

Any changes made to the database will not be saved unless `commit()` is called afterward.
df (DataFrame)
: Data frame to export to the MySQL database.
schema_name (str)
: Not required for MySQL loaders. Set to None
.
table_name (str)
: Name of the table to export data to (excluding database).
if_exists (ExportWritePolicy)
: Specifies the export policy if the table exists. One of
'fail'
: throws an error.
'replace'
: drops the existing table and creates a new table of the same name.
'append'
: appends the data frame to the existing table.
Defaults to 'replace'.
index (bool)
: If True, the data frame index is also exported alongside the table. Defaults to False.
**kwargs
: Additional arguments to pass to writer.
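The three if_exists policies can be illustrated with a small stdlib-only sketch. Note that this uses sqlite3 and a hypothetical export_rows helper to demonstrate the semantics; it is not the mage_ai MySQL client itself.

```python
import sqlite3

# Stdlib-only sketch of the 'fail' / 'replace' / 'append' policies described
# above, using sqlite3 and a hypothetical export_rows helper rather than the
# mage_ai MySQL client.
def export_rows(conn, table, rows, if_exists="replace"):
    cur = conn.cursor()
    exists = cur.execute(
        "SELECT name FROM sqlite_master WHERE type='table' AND name=?",
        (table,),
    ).fetchone() is not None
    if exists and if_exists == "fail":
        raise ValueError(f"table {table!r} already exists")
    if exists and if_exists == "replace":
        cur.execute(f"DROP TABLE {table}")   # drop and recreate
        exists = False
    if not exists:
        cur.execute(f"CREATE TABLE {table} (id INTEGER)")
    cur.executemany(f"INSERT INTO {table} VALUES (?)", [(r,) for r in rows])
    conn.commit()

conn = sqlite3.connect(":memory:")
export_rows(conn, "t", [1, 2])             # table created with 2 rows
export_rows(conn, "t", [3], "append")      # now 3 rows
export_rows(conn, "t", [9], "replace")     # table recreated with 1 row
count = conn.execute("SELECT COUNT(*) FROM t").fetchone()[0]
```

Passing if_exists="fail" on the second call would raise instead of writing.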
load(query_string: str, limit: int, *args, **kwargs) -> DataFrame
Loads data from the results of a query into a Pandas data frame. This will fail if the query returns no data from the database.
This function will load at maximum 10,000,000 rows of data (this limit is configurable). To operate on more data, consider performing data transformations in the warehouse.
query_string (str)
: Query to fetch a table or subset of a table.
limit (int, Optional)
: The number of rows to limit the loaded data frame to. Defaults to 10,000,000.
(DataFrame) Data frame containing the queried data.
open()
Opens a connection to the MySQL database.
sample(database: str, table: str, size: int, **kwargs) -> DataFrame
Sample data from a table in the MySQL database. Sample is not guaranteed to be random.
database (str)
: The database to select the table from.
table (str)
: The table to sample from in the connected database.
size (int)
: The number of rows to sample. Defaults to 10,000,000.
(DataFrame) Sampled data from the table.
mage_ai.io.duckdb
Handles data transfer between a DuckDB database and the Mage app.
The DuckDB client utilizes the following keys to connect:
database (str)
: The name of the database to connect to.
schema
: Schema to use.
verbose (bool)
: Enables verbose output printing. Defaults to True.
**kwargs
: Additional settings for creating the DuckDB connection.
with_config(config: BaseConfigLoader, **kwargs) -> DuckDB
Creates DuckDB data loading client from configuration settings.
config (BaseConfigLoader)
: Configuration loader object.
**kwargs
: Additional parameters passed to the loader constructor.
(DuckDB) The constructed data loader using this method.
close()
Closes the connection to the DuckDB database.
open()
Opens a connection to the DuckDB database.
table_exists()
Builds a query to check whether a table exists in the database.
schema_name
: Name of the schema to use.
table_name
: Name of the table.
upload_dataframe()
Builds a query to insert data into a table.
df
: Data to insert, as a data frame.
get_type()
Converts data from an input type into the corresponding DuckDB type.
column
: Metadata associated with the column.
dtype
: Input data type.
mage_ai.io.snowflake.Snowflake
Handles data transfer between a Snowflake data warehouse and the Mage app. The Snowflake
client utilizes the following keys to authenticate access and connect to Snowflake servers.
__init__(self, **kwargs)
Initializes settings for connecting to Snowflake data warehouse.
The following arguments must be provided to the connector, all other
arguments are optional.
Required Arguments:
user (str)
: Username for the Snowflake user.
password (str)
: Login password for the user.
account (str)
: Snowflake account identifier (including region, excluding the snowflake-computing.com suffix).
Optional Arguments:
verbose (bool)
: Specify whether to print verbose output.
database (str)
: Name of the default database to use. If unspecified, no database is selected on login.
schema (str)
: Name of the default schema to use. If unspecified, no schema is selected on login.
warehouse (str)
: Name of the default warehouse to use. If unspecified, no warehouse is selected on login.
(snowflake.connector.Connection) The underlying Snowflake Connection object.
with_config(config: BaseConfigLoader, **kwargs) -> Snowflake
Creates Snowflake data loading client from configuration settings.
config (BaseConfigLoader)
: Configuration loader object.
verbose (bool)
: Enables verbose output printing. Defaults to False.
**kwargs
: Additional parameters passed to the loader constructor.
(Snowflake) The constructed data loader using this method.
close()
Closes the connection to the Snowflake server.
commit()
Saves all changes made to the warehouse since the previous transaction.
execute(query_string: str, **kwargs) -> None
Sends query to the connected Snowflake warehouse. Any changes made to the database will not be saved unless commit()
is called afterward.
query_string (str)
: The query to execute on the Snowflake warehouse.
**kwargs
: Additional parameters to pass to the query. See the Snowflake Connector docs for additional parameters.
export(df: DataFrame, table_name: str, database: str, schema: str, if_exists: str, **kwargs) -> None
Exports a Pandas data frame to a Snowflake warehouse based on the table name. If the table doesn’t exist, it is automatically created.
Any changes made to the database will not be saved unless commit()
is called afterward.
df (DataFrame)
: Data frame to export to a Snowflake warehouse.
table_name (str)
: Name of the table to export data to (excluding database and schema).
database (str)
: Name of the database in which the table is located.
schema (str)
: Name of the schema in which the table is located.
if_exists (str, optional)
: Specifies the export policy if the table exists. One of
'fail'
: throws an error.
'replace'
: drops the existing table and creates a new table of the same name.
'append'
: appends the data frame to the existing table.
Defaults to 'append'.
**kwargs
: Additional arguments to pass to writer.
load(query_string: str, limit: int, *args, **kwargs) -> DataFrame
Loads data from Snowflake into a Pandas data frame based on the query given. This will fail unless a SELECT
query is provided.
This function will load at maximum 10,000,000 rows of data (this limit is configurable). To operate on more data, consider performing data transformations in the warehouse using execute.
query_string (str)
: Query to fetch a table or subset of a table.
limit (int, Optional)
: The number of rows to limit the loaded data frame to. Defaults to 10,000,000.
*args, **kwargs
: Additional parameters to pass to the query. See the Snowflake Connector docs for additional parameters.
(DataFrame) Data frame containing the queried data.
open()
Opens a connection to Snowflake servers.
rollback()
Rolls back (deletes) all database changes made since the last transaction.
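The execute/commit/rollback semantics described above can be sketched with sqlite3 standing in for a Snowflake connection (this is not the mage_ai client itself): changes made by execute are not saved unless commit() is called, and rollback() discards them.

```python
import sqlite3

# Stdlib sketch (sqlite3 standing in for a Snowflake connection) of the
# transaction semantics described above: uncommitted changes are discarded
# by rollback(), committed ones persist.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (x INTEGER)")
conn.commit()

conn.execute("INSERT INTO t VALUES (1)")
conn.commit()        # saved

conn.execute("INSERT INTO t VALUES (2)")
conn.rollback()      # discarded

rows = conn.execute("SELECT COUNT(*) FROM t").fetchone()[0]
```

Only the committed row survives, so the final count is 1.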
sample(schema: str, table: str, size: int, **kwargs) -> DataFrame
Sample data from a table in the Snowflake warehouse. Sample is not guaranteed to be random.
schema (str)
: The schema to select the table from.
table (str)
: The table to sample from in the connected database.
size (int)
: The number of rows to sample. Defaults to 10,000,000.
(DataFrame) Sampled data from the table.
Mage provides the following configuration loaders:
ConfigFileLoader
EnvironmentVariableLoader
AWSSecretLoader
For example, a Redshift data loading client can be constructed using secrets stored in AWS Secrets Manager.
The configuration setting keys are defined in the mage_ai.io.config.ConfigKey
enum. Not all keys need be specified at once - only use the keys related to the services you utilize.
Key Name | Service | Client Constructor Parameter | Description | Notes |
---|---|---|---|---|
AWS_ACCESS_KEY_ID | AWS General | - | AWS Access Key ID credential | Used by Redshift and S3 |
AWS_SECRET_ACCESS_KEY | AWS General | - | AWS Secret Access Key credential | Used by Redshift and S3 |
AWS_SESSION_TOKEN | AWS General | - | AWS Session Token (used to generate temporary DB credentials) | Used by Redshift |
AWS_REGION | AWS General | - | AWS Region | Used by Redshift and S3 |
REDSHIFT_DBNAME | AWS Redshift | database | Name of Redshift database to connect to | |
REDSHIFT_HOST | AWS Redshift | host | Redshift Cluster hostname | Use with temporary credentials |
REDSHIFT_PORT | AWS Redshift | port | Redshift Cluster port. Optional, defaults to 5439. | Use with temporary credentials |
REDSHIFT_TEMP_CRED_USER | AWS Redshift | user | Redshift temporary credentials username. | Use with temporary credentials |
REDSHIFT_TEMP_CRED_PASSWORD | AWS Redshift | password | Redshift temporary credentials password. | Use with temporary credentials |
REDSHIFT_DBUSER | AWS Redshift | db_user | Redshift database user to generate credentials for. | Use to generate temporary credentials |
REDSHIFT_CLUSTER_ID | AWS Redshift | cluster_identifier | Redshift cluster ID | Use to generate temporary credentials |
REDSHIFT_IAM_PROFILE | AWS Redshift | profile | Name of the IAM profile to generate temporary credentials with | Use to generate temporary credentials |
POSTGRES_DBNAME | PostgreSQL | dbname | Database name | |
POSTGRES_USER | PostgreSQL | user | Database login username | |
POSTGRES_PASSWORD | PostgreSQL | password | Database login password | |
POSTGRES_HOST | PostgreSQL | host | Database hostname | |
POSTGRES_PORT | PostgreSQL | port | PostgreSQL database port | |
SNOWFLAKE_USER | Snowflake | user | Snowflake username | |
SNOWFLAKE_PASS | Snowflake | password | Snowflake password | |
SNOWFLAKE_ACCOUNT | Snowflake | account | Snowflake account ID (including region) | |
SNOWFLAKE_DEFAULT_DB | Snowflake | database | Default database to use. Optional, no database chosen if unspecified. | |
SNOWFLAKE_DEFAULT_SCHEMA | Snowflake | schema | Default schema to use. Optional, no schema chosen if unspecified. | |
SNOWFLAKE_DEFAULT_WH | Snowflake | warehouse | Default warehouse to use. Optional, no warehouse chosen if unspecified. | |
GOOGLE_SERVICE_ACC_KEY | Google BigQuery | credentials_mapping | Service account key | |
GOOGLE_SERVICE_ACC_KEY_FILEPATH | Google BigQuery | path_to_credentials | Path to service account key |
contains
- checks if the configuration source contains the requested key. Commonly, the in
operation is used to check for setting existence (but is not always identical as contains
can accept multiple parameters while the in
keyword only accepts the key).
get
- gets the configuration setting associated with the given key. If the key doesn’t exist, returns None. Commonly, the data model overload __getitem__
is used to fetch a configuration setting (but is not always identical as get
can accept multiple parameters while __getitem__
does not).
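As a rough illustration of the contains/get pair and their in / __getitem__ data-model counterparts, here is a stdlib-only sketch with a plain dict standing in for a parsed configuration file (class name, keys, and values are hypothetical; this is not the mage_ai implementation):

```python
# Stdlib-only sketch of the contains/get behavior described above, with a
# plain dict standing in for the parsed configuration file. Names and values
# here are hypothetical; this is not the mage_ai implementation.
config_file = {
    "default": {"POSTGRES_DBNAME": "analytics", "POSTGRES_PORT": 5432},
    "production": {"POSTGRES_DBNAME": "prod_db", "POSTGRES_PORT": 5432},
}

class ProfileConfig:
    def __init__(self, data, profile="default"):
        self._settings = data[profile]

    def contains(self, key):
        return key in self._settings

    def get(self, key):
        # Returns None when the key does not exist, as described above.
        return self._settings.get(key)

    __contains__ = contains   # enables `key in cfg`
    __getitem__ = get         # enables `cfg[key]`

cfg = ProfileConfig(config_file, profile="production")
db = cfg.get("POSTGRES_DBNAME")
has_host = "POSTGRES_HOST" in cfg
```

Here db resolves from the 'production' profile, while the missing POSTGRES_HOST key makes the membership test return False.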
ConfigFileLoader
__init__(filepath: os.PathLike, profile: str)
Initializes the IO Configuration loader. The input configuration file can have two formats:
Settings stored under keys named as in ConfigKey. This
is the default and recommended format. Below is an example configuration file using this format.
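A hypothetical sketch of such a file, using key names from the ConfigKey enum under the 'default' profile (all values here are placeholders):

```yaml
default:
  POSTGRES_DBNAME: analytics
  POSTGRES_HOST: localhost
  POSTGRES_PORT: 5432
  POSTGRES_USER: "{{ env_var('PG_USER') }}"
  POSTGRES_PASSWORD: "{{ env_var('PG_PASSWORD') }}"
```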
Settings grouped under named profiles, with the default profile named 'default'. Each profile organizes a set of keys to use (for example, distinguishing production keys versus development keys). A configuration file can have multiple profiles.
You can use the env_var
syntax to reference environment variables in either configuration file format.
filepath (os.PathLike, optional)
: Path to the IO configuration file. Defaults to '[repo_path]/io_config.yaml'.
profile (str, optional)
: Profile to load configuration settings from. Defaults to 'default'.
contains(self, key: ConfigKey | str) -> bool
Checks if a configuration setting is stored under key.
key (str)
: Name of the configuration setting to check.
(bool) Returns True if the configuration setting exists, otherwise returns False.
get(self, key: ConfigKey | str) -> Any
Loads the configuration setting stored under key.
key (str)
: Key name of the configuration setting to load.
(Any) Configuration setting corresponding to the given key.
EnvironmentVariableLoader
__init__(self)
- no parameters for construction.
Methods:
contains(env_var: ConfigKey | str) -> bool
Checks if the environment variable given by env_var
exists.
env_var (ConfigKey | str)
: Name of the environment variable to check existence of.
(bool) Returns True if the environment variable exists, otherwise returns False.
get(env_var: ConfigKey | str) -> Any
Loads the config setting stored under the environment variable env_var.
env_var (str)
: Name of the environment variable to load the configuration setting from.
(Any) The configuration setting stored under env_var.
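The behavior described above can be sketched with a stdlib-only stand-in built on os.environ (the EnvLoader class and the setting value are hypothetical; this is not the mage_ai class itself):

```python
import os

# Stdlib-only sketch of the EnvironmentVariableLoader behavior described
# above (not the mage_ai class itself): contains checks that the variable
# exists, get returns its value (or None when absent).
class EnvLoader:
    def contains(self, env_var: str) -> bool:
        return env_var in os.environ

    def get(self, env_var: str):
        return os.environ.get(env_var)

os.environ["POSTGRES_DBNAME"] = "analytics"   # hypothetical setting
loader = EnvLoader()
found = loader.contains("POSTGRES_DBNAME")
value = loader.get("POSTGRES_DBNAME")
```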
AWSSecretLoader
```python
config = AWSSecretLoader(
    aws_access_key_id='your access key id',
    aws_secret_access_key='your secret key',
    region_name='your region',
)
```
__init__(self, **kwargs)
:
aws_access_key_id (str, Optional)
: AWS access key ID credential.
aws_secret_access_key (str, Optional)
: AWS secret access key credential.
region_name (str, Optional)
: AWS region in which Secrets Manager is located.
contains(secret_id: ConfigKey | str, version_id: str, version_stage_label: str) -> bool
Checks if there is a secret with ID secret_id. Can also specify the version of the secret to check:
If both version_id and version_stage_label are specified, both must agree on the secret version.
If neither version_id nor version_stage_label is specified, any version is checked.
If exactly one of version_id and version_stage_label is specified, the associated version is checked.
When using the in operator, comparisons to specific versions are not allowed.
secret_id (str)
: ID of the secret to check.
version_id (str, Optional)
: ID of the version of the secret to check. Defaults to None.
version_stage_label (str, Optional)
: Staging label of the version of the secret to check. Defaults to None.
(bool) Returns True if the secret exists, otherwise returns False.
get(secret_id: ConfigKey | str, version_id: str, version_stage_label: str) -> bytes | str
Loads the secret stored under secret_id. Can also specify the version of the secret to fetch:
If both version_id and version_stage_label are specified, both must agree on the secret version.
If neither version_id nor version_stage_label is specified, the current version is loaded.
If exactly one of version_id and version_stage_label is specified, the associated version is loaded.
When using the __getitem__ overload, comparisons to specific versions are not allowed.
secret_id (str)
: ID of the secret to load.
version_id (str, Optional)
: ID of the version of the secret to load. Defaults to None.
version_stage_label (str, Optional)
: Staging label of the version of the secret to load. Defaults to None.
(bytes | str) The secret stored under secret_id in AWS Secrets Manager. If the secret is a binary value, returns a bytes object; else returns a str object.
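The version-resolution rules above can be sketched in pure Python. The data model here (a dict of versions carrying stage labels, mirroring Secrets Manager's AWSCURRENT/AWSPREVIOUS labels) is hypothetical; this is not boto3 or the mage_ai loader.

```python
# Pure-Python sketch of the version-resolution rules above. The data model
# is hypothetical (a dict of versions with stage labels); this is not boto3
# or the mage_ai loader.
secret_versions = {
    "v1": {"stages": ["AWSPREVIOUS"], "value": "old"},
    "v2": {"stages": ["AWSCURRENT"], "value": "new"},
}

def resolve(version_id=None, version_stage_label=None):
    if version_id and version_stage_label:
        # Both specified: they must agree on the secret version.
        version = secret_versions.get(version_id)
        if version is None or version_stage_label not in version["stages"]:
            raise ValueError("version_id and version_stage_label disagree")
        return version["value"]
    if version_id:
        return secret_versions[version_id]["value"]
    if version_stage_label:
        for version in secret_versions.values():
            if version_stage_label in version["stages"]:
                return version["value"]
        raise KeyError(version_stage_label)
    # Neither specified: load the current version.
    return resolve(version_stage_label="AWSCURRENT")

current = resolve()
previous = resolve(version_stage_label="AWSPREVIOUS")
```

With neither argument given, the version labeled current is returned; passing a stage label selects the matching version, and mismatched id/label pairs raise an error.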