Load data from data integration sources and export to data integration destinations as blocks inside batch pipelines.
Version 0.9.35 and greater.

- [Coming soon] Support log-based sync.
- [Coming soon] Replicate source and destination blocks.
- [Coming soon] Support source and destination blocks as dynamic blocks.

Data integration in batch pipeline.

Requires the Add new block v2 feature to be enabled.

Configuration | Example | Decorator | Description |
---|---|---|---|
Source | stripe | @data_integration_source | The source to load data from. |
Destination | salesforce | @data_integration_destination | The destination to export data to. |
Config | database='...' | @data_integration_config | A dictionary containing the credentials and other settings specific to the source or destination. |
Selected streams | ['account'] | @data_integration_selected_streams | A list of stream names to be selected from the catalog. |
Catalog | {...} | @data_integration_catalog | A dictionary containing a list of streams and the stream’s settings, schema properties, and metadata. |
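
The decorators in the table above might be combined in a single Python source block roughly as follows. This is a minimal sketch rather than Mage's block template: the stand-in decorator definitions exist only so the snippet runs outside Mage (inside a block, the runtime is assumed to provide the real ones), and the function names and returned values are placeholders.

```python
from typing import Any, Callable, Dict, List


# Stand-ins so the sketch runs outside Mage. Assumption: inside a Mage block
# the runtime supplies the real decorators and these branches are skipped.
def _passthrough(func: Callable) -> Callable:
    return func


def _passthrough_with_args(**_options: Any) -> Callable:
    return _passthrough


if 'data_integration_source' not in globals():
    data_integration_source = _passthrough
if 'data_integration_config' not in globals():
    data_integration_config = _passthrough
if 'data_integration_selected_streams' not in globals():
    data_integration_selected_streams = _passthrough_with_args
if 'data_integration_catalog' not in globals():
    data_integration_catalog = _passthrough_with_args


@data_integration_source
def source(*args, **kwargs) -> str:
    # The source to load data from.
    return 'postgresql'


@data_integration_config
def config(*args, **kwargs) -> Dict:
    # Credentials and other settings specific to the source (placeholder).
    return {'database': '...'}


@data_integration_selected_streams(discover_streams=False)
def selected_streams(*args, **kwargs) -> List[str]:
    # Stream names to select from the catalog.
    return ['account']


@data_integration_catalog(discover=False, select_all=True)
def catalog(*args, **kwargs) -> Dict:
    # A catalog with no streams; a real block would return stream settings,
    # schema properties, and metadata here.
    return {'streams': []}
```

A destination block would use @data_integration_destination in place of @data_integration_source.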
@data_integration_source
@data_integration_destination
@data_integration_config
Keyword arguments:

Keyword | Sample code | Return value type | Sample return value | Description | Available in |
---|---|---|---|---|---|
source | kwargs['source'] | str | 'postgresql' | The name of the source. | Sources only |
destination | kwargs['destination'] | str | 'postgresql' | The name of the destination. | Destinations only |
@data_integration_selected_streams
Decorator arguments of @data_integration_selected_streams for sources:

Argument | Argument values | Description | Available in |
---|---|---|---|
discover_streams | False, True | If True, a list of stream names is available in the function’s body via the keyword argument selected_streams; e.g. kwargs['selected_streams']. This list of stream names is retrieved by the source’s underlying code in the Mage Integrations library. | Sources only |

Keyword arguments:

Keyword | Sample code | Return value type | Sample return value | Description | Available in |
---|---|---|---|---|---|
source | kwargs['source'] | str | 'postgresql' | The name of the source. | Sources only |
destination | kwargs['destination'] | str | 'postgresql' | The name of the destination. | Destinations only |
config | kwargs['config'] | dict | { 'database': '...' } | The dictionary containing the credentials and other configurations. | Sources and destinations |
selected_streams | kwargs['selected_streams'] | List[str] | ['account', 'users'] | A list of strings containing all the available stream names for the source. This is only available if discover_streams is True. | Sources only |
discover_streams_func | kwargs['discover_streams_func'] | Callable | If the function is invoked, the return value is the same as for selected_streams. | When invoked, the list of stream names is retrieved by the source’s underlying code in the Mage Integrations library. | Sources only |
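
As an illustration of the discover_streams argument and the selected_streams keyword argument described above, a decorated function could narrow the discovered stream names down to the ones a pipeline needs. This is a sketch under assumptions: the stand-in decorator only exists so the snippet runs outside Mage, and the wanted set is a hypothetical filter.

```python
from typing import Any, Callable, List

# Stand-in so the sketch runs outside Mage. Assumption: inside a Mage block
# the runtime supplies the real decorator and this branch is skipped.
if 'data_integration_selected_streams' not in globals():
    def data_integration_selected_streams(**_options: Any) -> Callable:
        def wrap(func: Callable) -> Callable:
            return func
        return wrap


@data_integration_selected_streams(discover_streams=True)
def selected_streams(*args, **kwargs) -> List[str]:
    # With discover_streams=True, the discovered stream names are passed in
    # through the selected_streams keyword argument.
    available = kwargs.get('selected_streams') or []

    # Hypothetical filter: keep only the streams this pipeline needs.
    wanted = {'account', 'users'}
    return [name for name in available if name in wanted]
```

With discover_streams set to False, the same names could instead be fetched on demand by invoking kwargs['discover_streams_func'].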
@data_integration_catalog
Decorator arguments of @data_integration_catalog for sources:

Argument | Argument values | Description | Available in |
---|---|---|---|
discover | False, True | If True, the source will fetch the catalog and all stream schema properties, then pass the returned value to the decorated function’s keyword arguments. | Sources only |
select_all | True, False | If False, the streams in the catalog won’t be selected by default and you’ll have to manually select each stream via the stream’s metadata. | Sources only |

Keyword arguments:

Keyword | Sample code | Return value type | Sample return value | Description | Available in |
---|---|---|---|---|---|
source | kwargs['source'] | str | 'postgresql' | The name of the source. | Sources only |
destination | kwargs['destination'] | str | 'postgresql' | The name of the destination. | Destinations only |
config | kwargs['config'] | dict | { 'database': '...' } | The dictionary containing the credentials and other configurations. | Sources and destinations |
selected_streams | kwargs['selected_streams'] | List[str] | ['account', 'users'] | A list of strings containing the available stream names returned from the decorated function under @data_integration_selected_streams. | Sources and destinations |
catalog | kwargs['catalog'] | dict | { 'streams': [] } | A dictionary with a key named streams that is a list of dictionaries containing the stream schema properties and metadata. This is only available if discover is True. | Sources only |
discover_func | kwargs['discover_func'] | Callable | If the function is invoked, the return value is the same as for catalog. | If the function is invoked, the source will fetch the catalog and all stream schema properties. | Sources only |
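
When select_all is False, streams have to be selected manually through their metadata. The sketch below shows one way that might look. It assumes a Singer-style catalog layout (a tap_stream_id per stream, plus a metadata list whose stream-level entry has an empty breadcrumb and a selected flag); that layout is an assumption, not something stated in this document, and the stand-in decorator only exists so the snippet runs outside Mage.

```python
from typing import Any, Callable, Dict

# Stand-in so the sketch runs outside Mage. Assumption: inside a Mage block
# the runtime supplies the real decorator and this branch is skipped.
if 'data_integration_catalog' not in globals():
    def data_integration_catalog(**_options: Any) -> Callable:
        def wrap(func: Callable) -> Callable:
            return func
        return wrap


@data_integration_catalog(discover=True, select_all=False)
def catalog(*args, **kwargs) -> Dict:
    # With discover=True, the fetched catalog arrives as a keyword argument.
    discovered = kwargs.get('catalog') or {'streams': []}
    wanted = set(kwargs.get('selected_streams') or [])

    for stream in discovered['streams']:
        # Assumed Singer-style key holding the stream name.
        if stream.get('tap_stream_id') not in wanted:
            continue
        for entry in stream.get('metadata', []):
            # Assumed convention: an empty breadcrumb marks the
            # stream-level metadata entry.
            if entry.get('breadcrumb') == []:
                entry.setdefault('metadata', {})['selected'] = True

    return discovered
```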
Only use this block’s output data as an input
Use catalog as an input
pg_credentials, that has the following code:
YAML blocks
Python blocks
titanic_survival, that has the following code:
Column | Data type | Sample data |
---|---|---|
PassengerId | int64 | 2 |
Survived | int64 | 1 |
Pclass | int64 | 1 |
Name | object | Mrs. John Bradley (Florence Briggs Thayer) |
Sex | object | female |
Age | float64 | 38 |
SibSp | int64 | 1 |
Parch | int64 | 0 |
Ticket | object | PC 17599 |
Fare | float64 | 71.2833 |
Cabin | object | C85 |
Embarked | object | C |
Python blocks
titanic_survival_data_and_catalog contains a Python dictionary that represents the catalog from the upstream block named titanic_survival.

This catalog contains a key named streams that is a list of dictionaries containing stream settings, metadata, and schema properties of the output data from the upstream block titanic_survival.

Here is an example of that catalog (a lot of the metadata and schema properties are omitted from this example so that it’s not too lengthy):

Key | Value data type | Sample value | Description |
---|---|---|---|
stream | dict | { 'stream': 'accounts' } | The dictionary for key stream contains information about the stream’s name (via the stream key), settings (e.g. replication method), metadata, and schema properties. |
columns | List[str] | ['id', 'power', 'created_at'] | A list of column names from the data that was fetched for the stream. |
rows | List[List[Any]] | [[1, 3.5, '3000-01-01']] | A list of lists where each item in the outer list is a row that was fetched from the stream. Each item in the inner list is a value from a column. The column that the value corresponds to is determined by the index of that value in its list and the column name at that same index in the columns list. |
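
The positional pairing between columns and rows described in the table above can be made concrete by turning each row into a dictionary keyed by column name. This is a small sketch in plain Python; the helper name rows_to_records is hypothetical, and the sample dictionary reuses the sample values from the table.

```python
from typing import Any, Dict, List


def rows_to_records(data: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Pair each value in a row with the column name at the same index."""
    columns = data['columns']
    return [dict(zip(columns, row)) for row in data['rows']]


sample = {
    'stream': {'stream': 'accounts'},
    'columns': ['id', 'power', 'created_at'],
    'rows': [[1, 3.5, '3000-01-01']],
}

records = rows_to_records(sample)
print(records)
# [{'id': 1, 'power': 3.5, 'created_at': '3000-01-01'}]
```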
JSON format
Trigger pipeline to run once
New trigger or edit an existing trigger