How to build a data integration pipeline
Here are the high-level steps to build a data integration pipeline:
Requirements
You must run Mage using Docker because data integrations require many dependencies that can be challenging to install using pip or conda.
Add new data integration pipeline
- Open Mage in your browser and click the [+ New pipeline] button.
- Click the dropdown menu option Data integration.
Configure source
- Click the dropdown menu under Select source and choose the option you want to load data from (e.g. Amplitude).
- Depending on the chosen source, you’ll need to enter credentials and options into the section labeled Configuration. For example, if you chose Amplitude, you’ll need to enter credentials like this:
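As a rough sketch, an Amplitude source configuration usually provides an API key and secret key; treat the key names below as illustrative rather than the exact schema:

```yaml
# Illustrative Amplitude source configuration.
# The key names are assumptions; check the source's documentation for the exact schema.
api_key: your_amplitude_api_key
secret_key: your_amplitude_secret_key
```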
Best practices: you can interpolate values in the configuration using the following syntax:
- "{{ env_var('SECRET_KEY') }}": this will extract the value from the SECRET_KEY key in your environment variables.
- "{{ variables('SECRET_KEY') }}": this will extract the value from the SECRET_KEY key in your runtime variables.
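For example, a source configuration could combine both styles like this (the key and variable names are placeholders):

```yaml
# Pulling secrets into the configuration instead of hard-coding them.
api_key: "{{ variables('API_KEY') }}"      # resolved from runtime variables
secret_key: "{{ env_var('SECRET_KEY') }}"  # resolved from environment variables
```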
You can also add a prefix to output tables by configuring prefix variables in the source configuration YAML.
- After you enter all the credentials, click the [Fetch list of streams] button under the section labeled Select stream.
- Shortly after clicking the button above, click the new dropdown menu under the section labeled Select stream. Then, choose the stream (aka table) you want to load data from.
Configure stream
After selecting a stream (aka table), you’ll need to configure the schema.
Configuring the schema tells your pipeline which fields to synchronize, how to determine whether a record is unique, and what to do if there are conflicts (aka duplicate records).
Here are the steps you can optionally go through (a rough sketch of how these settings come together follows the list):
- Selected field(s):
- Check the box next to the field name to include the field in your synchronization.
- Uncheck the ones you don’t want to sync.
- Field type(s)
- Each field will have a default field type.
- Add additional field types or remove them if they don’t fit your needs.
- Unique field(s)
- On the right of the field names, there is a box you can check that will determine which field(s) need to have unique values.
- If the box is un-checkable, that means you cannot use that field as a unique field.
- Bookmark field(s)
- Under the column labeled Bookmark, check the box to use the field as a way to keep track of progress during synchronization.
- Upon every synchronization, these columns are used to pick up from where the previous synchronization left off. In addition, if a synchronization fails midway, these bookmark columns are used to track the record that was most recently successful.
- Replication method
  - FULL_TABLE: synchronize the entire set of records from the source.
  - INCREMENTAL: synchronize the records starting after the most recent bookmarked record from the previous synchronization run.
- Unique conflict method: choose how to handle duplicate records.
  - IGNORE: skip the new record if it’s a duplicate of an existing record.
  - UPDATE: update the existing record with the new record’s properties.
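Conceptually, the selections above become per-stream settings in the pipeline’s configuration. The snippet below is only an illustrative sketch with assumed names, loosely modeled on the catalog formats commonly used by data integration tools; the exact structure Mage stores may differ.

```yaml
# Illustrative per-stream settings (names are assumptions, not Mage's exact schema).
streams:
  - stream: users                      # the selected stream (aka table)
    selected_fields:                   # fields checked for synchronization
      - id
      - email
      - updated_at
    key_properties:                    # unique field(s)
      - id
    bookmark_properties:               # bookmark field(s) used to resume syncs
      - updated_at
    replication_method: INCREMENTAL    # or FULL_TABLE
    unique_conflict_method: UPDATE     # or IGNORE
```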
Once you’ve configured the schema for the stream, you can optionally choose to run the stream in parallel. Streams marked as parallel will be synchronized concurrently when enough worker resources are available, which can speed up the sync process if you have a large number of streams.
Editing bookmark property values
- You can edit the bookmark values for your next sync by editing them in the Bookmark property values table under the Manually edit bookmark property values setting in the Settings section.
- The Manually edit bookmark property values setting will not appear until at least one column is selected as a bookmark field.
- In order to enable the toggle that displays the Bookmark property values table, you need to select a destination first.
- Click the toggle, enter the new value for your bookmark property, and then click Save. You MUST click the Save button in order for the bookmark value to be updated.
Configure destination
- Click the dropdown menu under Select destination and choose the option you want to export data to (e.g. Snowflake).
- Depending on the chosen destination, you’ll need to enter credentials and options into the section labeled Configuration. For example, if you chose Snowflake, you’ll need to enter credentials like this:
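As a rough sketch, a Snowflake destination configuration typically covers the connection details and the target location for the data; treat the key names below as illustrative rather than the exact schema:

```yaml
# Illustrative Snowflake destination configuration.
# Key names are assumptions; check the destination's documentation for the exact schema.
account: your_account_identifier
warehouse: YOUR_WAREHOUSE
database: YOUR_DATABASE
schema: YOUR_SCHEMA
table: YOUR_TABLE
username: your_username
password: "{{ env_var('PASSWORD') }}"  # interpolated from an environment variable (see below)
```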
Best practices: you can interpolate values in the configuration using the following syntax:
- "{{ env_var('PASSWORD') }}": this will extract the value from the PASSWORD key in your environment variables.
- "{{ variables('PASSWORD') }}": this will extract the value from the PASSWORD key in your runtime variables.
Run pipeline and start sync
Once you’re done configuring your pipeline, go back to the pipeline’s trigger page by clicking the name of your pipeline in your header.
The breadcrumbs in your header could look like this: Pipelines / pipeline name / Edit.
Once you’re on the pipeline triggers page, create a new scheduled trigger and choose the @once interval. For more schedules, read the other options here.
Monitoring pipeline
After you create a scheduled trigger, click the [Start trigger] button at the top of the page. You’ll see a new pipeline run appear shortly on the screen.
You can click the logs for that pipeline run to view the progress of your synchronization. Any string config values over 8 characters long in the logs will be redacted for security purposes. You’ll see these values hidden as ******** in the logs.
Support
If you get stuck, run into problems, or just want someone to walk you through these steps, please join our Slack.