Load data from data integration sources and export to data integration destinations as blocks inside batch pipelines.
Available in version 0.9.35 and greater.
You can use a data integration source (e.g. Stripe) and/or a data integration destination (e.g. Trino) as a block in batch pipelines.
Combine the low-code capabilities of data integrations with the flexibility and dynamic features of batch pipelines.
The image below shows a different set of blocks than the example above.
Coming soon:

- Support log-based sync.
- Replicate source and destination blocks.
- Support source and destination blocks as dynamic blocks.

Go to the project preferences and enable the feature labeled Data integration in batch pipeline.
This feature requires the Add new block v2 feature to be enabled.
1. Create a new batch pipeline.
2. Add a new Data loader block by selecting a data integration source.
3. Configure the source.
4. Add a new Data exporter block by selecting a data integration destination.
5. Configure the destination.
For more information on how to configure and set up a data integration source or destination, how to configure settings for a stream, or how to configure a stream’s schema properties, refer back to the original data integration guide.
When adding a data integration source or destination block to a batch pipeline, you can choose either YAML or Python as the block language (YAML is the default).
If you choose Python, you programmatically define the following settings of the data integration:
Configuration | Example | Decorator | Description |
---|---|---|---|
Source | `stripe` | `@data_integration_source` | The source to load data from. |
Destination | `salesforce` | `@data_integration_destination` | The destination to export data to. |
Config | `database='...'` | `@data_integration_config` | A dictionary containing the credentials and other settings specific to the source or destination. |
Selected streams | `['account']` | `@data_integration_selected_streams` | A list of stream names to be selected from the catalog. |
Catalog | `{...}` | `@data_integration_catalog` | A dictionary containing a list of streams and each stream’s settings, schema properties, and metadata. |
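For example, a Python source block defines each of these settings with a decorated function. The following is a minimal sketch, not a definitive implementation: the decorators are provided by Mage in the block’s scope, and the Stripe config keys (`account_id`, `start_date`) are illustrative placeholders.

```python
from typing import Dict, List


@data_integration_source
def source(*args, **kwargs) -> str:
    # The name of the data integration source to load from.
    return 'stripe'


@data_integration_config
def config(*args, **kwargs) -> Dict:
    # Placeholder credentials; replace with your source's real settings.
    return {
        'account_id': '...',
        'start_date': '2023-01-01',
    }


@data_integration_selected_streams(discover_streams=False)
def selected_streams(*args, **kwargs) -> List[str]:
    # A fixed list of stream names to select from the catalog.
    return ['account']


@data_integration_catalog(discover=False, select_all=True)
def catalog(*args, **kwargs) -> Dict:
    # Returning streams here overrides the catalog set up through the UI.
    return {
        'streams': [],
    }
```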
`@data_integration_source`

This decorated function is required.

`@data_integration_destination`

This decorated function is required.

`@data_integration_config`

This decorated function is required.
The following keyword arguments are only available in the decorated function’s body:
Keyword | Sample code | Return value type | Sample return value | Description | Available in |
---|---|---|---|---|---|
`source` | `kwargs['source']` | `str` | `'postgresql'` | The name of the source. | Sources only |
`destination` | `kwargs['destination']` | `str` | `'postgresql'` | The name of the destination. | Destinations only |
`@data_integration_selected_streams`

This decorated function is optional.
The following arguments to the `@data_integration_selected_streams` decorator are only available for sources:
Argument | Argument values | Description | Available in |
---|---|---|---|
`discover_streams` | `False`, `True` | If `True`, a list of stream names is available in the function’s body via the keyword argument `selected_streams` (e.g. `kwargs['selected_streams']`). This list of stream names is retrieved by the source’s underlying code in the Mage Integrations library. | Sources only |
The following keyword arguments are only available in the decorated function’s body.
Keyword | Sample code | Return value type | Sample return value | Description | Available in |
---|---|---|---|---|---|
`source` | `kwargs['source']` | `str` | `'postgresql'` | The name of the source. | Sources only |
`destination` | `kwargs['destination']` | `str` | `'postgresql'` | The name of the destination. | Destinations only |
`config` | `kwargs['config']` | `dict` | `{ 'database': '...' }` | The dictionary containing the credentials and other configurations. | Sources and destinations |
`selected_streams` | `kwargs['selected_streams']` | `List[str]` | `['account', 'users']` | A list of all the available stream names for the source. Only available if `discover_streams` is `True`. | Sources only |
`discover_streams_func` | `kwargs['discover_streams_func']` | `Callable` | If invoked, the return value is the same as `selected_streams` above. | A function that retrieves the list of stream names via the source’s underlying code in the Mage Integrations library. | Sources only |
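As a sketch of how `discover_streams` and `discover_streams_func` might be used together (the filtering logic is purely illustrative):

```python
from typing import List


@data_integration_selected_streams(discover_streams=True)
def selected_streams(*args, **kwargs) -> List[str]:
    # All stream names discovered by the source, since discover_streams=True.
    all_streams = kwargs['selected_streams']

    # Illustrative filter: only sync streams whose names start with 'account'.
    return [stream for stream in all_streams if stream.startswith('account')]
```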
`@data_integration_catalog`

The returned dictionary will override the catalog set up through the user interface.
The following arguments to the `@data_integration_catalog` decorator are only available for sources:
Argument | Argument values | Description | Available in |
---|---|---|---|
`discover` | `False`, `True` | If `True`, the source will fetch the catalog and all stream schema properties, then pass the returned value to the decorated function’s keyword arguments. | Sources only |
`select_all` | `True`, `False` | If `False`, the streams in the catalog won’t be selected by default and you’ll have to manually select each stream via the stream’s metadata. | Sources only |
The following keyword arguments are only available in the decorated function’s body.
Keyword | Sample code | Return value type | Sample return value | Description | Available in |
---|---|---|---|---|---|
`source` | `kwargs['source']` | `str` | `'postgresql'` | The name of the source. | Sources only |
`destination` | `kwargs['destination']` | `str` | `'postgresql'` | The name of the destination. | Destinations only |
`config` | `kwargs['config']` | `dict` | `{ 'database': '...' }` | The dictionary containing the credentials and other configurations. | Sources and destinations |
`selected_streams` | `kwargs['selected_streams']` | `List[str]` | `['account', 'users']` | A list of the available stream names returned from the decorated function under `@data_integration_selected_streams`. | Sources and destinations |
`catalog` | `kwargs['catalog']` | `dict` | `{ 'streams': [] }` | A dictionary with a key named `streams` that is a list of dictionaries containing the stream schema properties and metadata. Only available if `discover` is `True`. | Sources only |
`discover_func` | `kwargs['discover_func']` | `Callable` | If invoked, the return value is the same as `catalog` above. | If invoked, the source fetches the catalog and all stream schema properties. | Sources only |
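A sketch of how `discover` and the resulting `catalog` keyword argument might be used; the stream filtering shown is illustrative and assumes Singer-style `tap_stream_id` keys on each stream:

```python
from typing import Dict


@data_integration_catalog(discover=True, select_all=True)
def catalog(*args, **kwargs) -> Dict:
    # The discovered catalog, available because discover=True.
    catalog = kwargs['catalog']

    # Illustrative adjustment: keep only the streams selected upstream.
    selected = set(kwargs.get('selected_streams') or [])
    catalog['streams'] = [
        stream for stream in catalog['streams']
        if stream.get('tap_stream_id') in selected
    ]

    # The returned dictionary overrides the catalog set up through the UI.
    return catalog
```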
The data integration source and destination blocks can depend on upstream blocks. Those upstream blocks can pass their output data into the data integration source or destination block, where it can be used as an input (and, for destination blocks, as data to ingest and export).
Follow these steps to use the output of an upstream block as an input:
1. Open the data integration block’s configuration and click the tab labeled Upstream block settings.
2. Toggle the name of the block you want to use as an input.
3. Check the box labeled Use the block’s output data as an input.
Inputs are available as positional arguments for all decorated functions in Python blocks.
- Only use this block’s output data as an input: if checked, the upstream block’s output data is only used as an input and won’t be ingested and exported to the destination. This option only applies to destination blocks.
- Use catalog as an input: if checked, the upstream block’s catalog is included as part of the input argument(s) for the data integration block.
If you have an upstream block named pg_credentials that has the following code:
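A minimal sketch of what that block might contain, assuming it returns a dictionary of PostgreSQL credentials (in practice, load these from a secret store rather than hardcoding them):

```python
from typing import Dict

if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader


@data_loader
def load_data(*args, **kwargs) -> Dict:
    # Credentials consumed by the downstream source/destination block.
    return {
        'database': 'my_database',
        'host': 'my_host.example.com',
        'password': 'my_password',
        'port': 5432,
        'username': 'my_username',
    }
```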
Here are examples of how to use the upstream block’s output to configure credentials at runtime:
YAML blocks
If you have a data integration source or destination block that depends on the above upstream block, you can interpolate the output of the upstream block in the source or destination block’s credentials like this:
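A sketch of that interpolation, assuming a PostgreSQL-style config and Mage’s `block_output` template function; the exact interpolation syntax is an assumption here, so check the variable-interpolation docs for your version:

```yaml
database: "{{ block_output('pg_credentials', parse='data[\"database\"]') }}"
host: "{{ block_output('pg_credentials', parse='data[\"host\"]') }}"
password: "{{ block_output('pg_credentials', parse='data[\"password\"]') }}"
port: "{{ block_output('pg_credentials', parse='data[\"port\"]') }}"
username: "{{ block_output('pg_credentials', parse='data[\"username\"]') }}"
```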
Python blocks
The number of upstream blocks your source or destination block uses as inputs determines how many positional arguments are available in your decorated functions.
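For example, if pg_credentials is the only upstream block used as an input, its output arrives as the 1st positional argument. A sketch, where the argument name is just a convention:

```python
from typing import Dict


@data_integration_config
def config(pg_credentials_output, *args, **kwargs) -> Dict:
    # pg_credentials_output is the dictionary returned by the upstream block.
    return {
        'database': pg_credentials_output['database'],
        'host': pg_credentials_output['host'],
        'password': pg_credentials_output['password'],
        'port': pg_credentials_output['port'],
        'username': pg_credentials_output['username'],
    }
```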
If you have an upstream block named titanic_survival that has the following code:
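A plausible sketch of that block, loading the Titanic dataset from a CSV; the URL is illustrative, and any CSV with the columns below would work:

```python
import pandas as pd

if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader


@data_loader
def load_data(*args, **kwargs) -> pd.DataFrame:
    # Illustrative dataset URL; replace with your own data source.
    url = 'https://raw.githubusercontent.com/mage-ai/datasets/master/titanic_survival.csv'
    return pd.read_csv(url)
```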
And its output data has the following Pandas data types:
Column | Data type | Sample data |
---|---|---|
PassengerId | int64 | 2 |
Survived | int64 | 1 |
Pclass | int64 | 1 |
Name | object | Mrs. John Bradley (Florence Briggs Thayer) |
Sex | object | female |
Age | float64 | 38 |
SibSp | int64 | 1 |
Parch | int64 | 0 |
Ticket | object | PC 17599 |
Fare | float64 | 71.2833 |
Cabin | object | C85 |
Embarked | object | C |
Here are examples of how to use the upstream block’s catalog as an input to the data integration’s decorated functions:
Python blocks
The number of upstream blocks your source or destination block uses as inputs determines how many positional arguments are available in your decorated functions.
The positional argument correlated to the upstream block that you’re using as an input will be either a single object or a tuple. It’s a tuple if you checked the box labeled Use catalog as an input in the data integration block’s configuration on the tab labeled Upstream block settings.
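A sketch, assuming titanic_survival is the only upstream input and the Use catalog as an input box is checked (the argument name is a convention, not a requirement):

```python
from typing import Dict


@data_integration_config
def config(titanic_survival_data_and_catalog, *args, **kwargs) -> Dict:
    # With "Use catalog as an input" checked, the positional argument is a
    # tuple whose last item is the upstream block's catalog.
    data, catalog = titanic_survival_data_and_catalog

    # ... use data and catalog to build the config dictionary ...
    return {}
```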
In this example, the last item in the tuple from the 1st positional argument (e.g. titanic_survival_data_and_catalog) contains a Python dictionary that represents the catalog from the upstream block named titanic_survival. This catalog contains a key named streams that is a list of dictionaries containing the stream settings, metadata, and schema properties of the output data from the upstream block titanic_survival.
Here is an example of that catalog (a lot of the metadata and schema properties are omitted from this example so that it’s not too lengthy):
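A trimmed sketch of what that catalog might look like; the keys follow the Singer-style catalogs used by the Mage Integrations library, and the exact fields may vary:

```python
{
    'streams': [
        {
            'stream': 'titanic_survival',
            'tap_stream_id': 'titanic_survival',
            'replication_method': 'FULL_TABLE',
            'schema': {
                'properties': {
                    'PassengerId': {'type': ['null', 'integer']},
                    'Survived': {'type': ['null', 'integer']},
                    'Name': {'type': ['null', 'string']},
                    # ... remaining columns omitted ...
                },
                'type': 'object',
            },
            'metadata': [
                # ... stream and column metadata omitted ...
            ],
        },
    ],
}
```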
Outputs from upstream blocks, as data to be exported, can only be used by destination blocks.
The outputs from upstream blocks aren’t only available as inputs; they’re also used as data streams when ingesting and exporting to the destination.
You can control which upstream block’s data is ingested and exported by configuring the destination block’s streams.
Destination blocks don’t output any data to downstream blocks.
A data integration source block will output the data fetched from the source and pass that data to its downstream blocks.
A downstream data integration destination block will automatically know how to ingest the data and process it for exporting.
All other types of blocks will receive the data integration source block’s output data either as a dictionary or as a list containing 2 or more dictionaries.
The output data will be a dictionary if the source block is only fetching 1 stream.
The output data will be a list of dictionaries if the source block is fetching 2 or more streams. The number of items in the list corresponds to the number of streams that the source block is configured to sync.
Each item in the list is a dictionary with the following keys:
Key | Value data type | Sample value | Description |
---|---|---|---|
stream | dict | { 'stream': 'accounts' } | The dictionary for key stream contains information about the stream’s name (via the stream key), settings (e.g. replication method), metadata, and schema properties. |
columns | List[str] | ['id', 'power', 'created_at'] | A list of column names from the data that was fetched for the stream. |
rows | List[List[Any]] | [[1, 3.5, '3000-01-01']] | A list of lists where each item in the outer list is a row that was fetched from the stream. Each item in the inner list is a value from a column. The column that the value corresponds to is determined by the index of that value in its list and the column name at that same index in the columns list. |
JSON format
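For a source block syncing a single stream, the output dictionary might look like this (using the sample values from the table above; real stream entries also carry settings, metadata, and schema properties):

```json
{
  "stream": {
    "stream": "accounts"
  },
  "columns": ["id", "power", "created_at"],
  "rows": [
    [1, 3.5, "3000-01-01"]
  ]
}
```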
For each stream in a data integration source block, you can enable that stream to have its data fetched in parallel while the source’s other streams are being fetched.
For each stream in a data integration destination block, you can enable that stream to have its data exported in parallel while the destination’s other streams are being exported.
This option can be turned on by editing a specific stream’s settings.
It can also be turned on or off for multiple streams at once by bulk editing the streams in the Overview section.
Synchronize your data incrementally after the 1st sync.
When you choose 1 or more bookmark values in a stream, the value from that property will be saved after each successful sync. The next sync will use that value to compare to the next batch of records that need to be synced.
If you want to change the last saved bookmark value for the next sync, you can do this in 2 ways:

1. Trigger the pipeline to run once.
2. Create a new trigger or edit an existing trigger.