ETL pipeline tutorial
In this tutorial, we’ll create a data pipeline that does the following:
- Load data from an online endpoint
- Visualize the data using charts
- Transform the data by adding a new column and cleaning the column names
- Write the transformed data to PostgreSQL
Setup
If you haven’t created a Mage project before, follow the setup guide before starting this tutorial.
1. Create new pipeline

- Go to the pipelines list page (`/pipelines`). This is the default page when navigating to Mage in your web browser.
- In the top left corner of the page, click the button labeled + New, then select the option labeled Standard (batch) to create a new pipeline.
- In the left vertical navigation, click the last link labeled Pipeline settings.
- Change the pipeline’s name to `ETL demo`.
- Click the button labeled Save pipeline settings.

2. Load data from an API

- In the left vertical navigation, click the 1st link labeled Edit pipeline.
- Click the button labeled + Data loader, then hover over Python, and click the option labeled API.
- A dialog menu will appear. Change the block name to `load data`.
- Click the button labeled Save and add block.
- Paste the following code in the data loader block:
```python
import io
import pandas as pd
import requests


@data_loader
def load_data_from_api(*args, **kwargs):
    url = 'https://raw.githubusercontent.com/mage-ai/datasets/master/restaurant_user_transactions.csv'
    response = requests.get(url)
    return pd.read_csv(io.StringIO(response.text), sep=',')


@test
def test_row_count(df, *args) -> None:
    assert len(df.index) >= 1000, 'The data does not have enough rows.'
```
- Run the block by clicking the play icon button in the top right corner of the data loader block, or press 1 of the following keyboard shortcuts:
  - ⌘ + Enter
  - Control + Enter
  - Shift + Enter
- After you run the block (⌘ + Enter), you’ll see a sample of the data that was loaded.
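Tip: Mage runs any function decorated with `@test` automatically after the block executes, so you can guard the pipeline with extra assertions in the same style. As a sketch, here’s a hypothetical extra test you could append to the block above (the `rating` column is the one we chart in the next step):

```python
# Hypothetical extra test: runs after load_data_from_api and receives the
# returned DataFrame as `df`, just like test_row_count above.
@test
def test_has_rating_column(df, *args) -> None:
    assert 'rating' in df.columns, 'Expected a rating column in the data.'
```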
3. Visualize data

3a. Distribution of ratings
We’ll add a chart to visualize how frequently people give 1-star, 2-star, 3-star, 4-star, or 5-star ratings.
- In the top right corner of the data loader block, click the charts icon.
- In the dropdown menu, select the option labeled Histogram. This will add a new chart block in the right side of the page (aka the sidekick).
- Click the pencil icon in the top right corner of the chart block to edit the chart.
- In the dropdown menu labeled Number column for chart, select the option for column `rating`.
- Click the play button icon in the top right corner of the chart block to run the chart.
- The chart should look like this:
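Mage renders the histogram for you, but if you’d like to reproduce it in plain Python, here’s a minimal sketch, assuming matplotlib is installed and `df` is the DataFrame returned by the data loader:

```python
# Minimal sketch of the same chart outside Mage (assumes matplotlib).
import matplotlib.pyplot as plt

df['rating'].plot.hist(bins=5)  # ratings range from 1 to 5 stars
plt.xlabel('rating')
plt.ylabel('frequency')
plt.show()
```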
3b. Number of meals per user
Let’s add another chart to see how many meals each user has.
- In the top right corner of the data loader block, click the charts icon.
- In the dropdown menu, select the option labeled Bar chart. This will add a new chart block in the right side of the page (aka the sidekick).
- Click the pencil icon in the top right corner of the chart block to edit the chart.
- In the dropdown menu labeled Group by columns, select the option for column `user ID`.
- Under the Metrics section:
  - In the dropdown menu labeled aggregation, select the option for `count_distinct`.
  - In the dropdown menu labeled column, select the option for column `meal transaction ID`.
- Click the play button icon in the top right corner of the chart block to run the chart.
- The chart should look like this:
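Under the hood, this chart is a distinct count of meal transactions grouped by user. As a sketch, the equivalent pandas expression, using the column names from the loaded data:

```python
# Sketch of what the bar chart computes: distinct meal transaction IDs
# per user, using the DataFrame `df` from the data loader.
meals_per_user = df.groupby('user ID')['meal transaction ID'].nunique()
print(meals_per_user.head())
```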
4. Transform data
Let’s transform the data in 2 ways:
- Add a column that counts the number of meals for each user.
- Clean the column names to properly store in a PostgreSQL database.
Follow these steps:
- Click the button labeled + Transformer, then hover over Python, and click the option labeled Generic (no template).
- A dialog menu will appear. Change the block name to `transform data`.
- Click the button labeled Save and add block.
- Paste the following code in the transformer block:
```python
def number_of_rows_per_key(df, key, column_name):
    data = df.groupby(key)[key].agg(['count'])
    data.columns = [column_name]
    return data


def clean_column(column_name):
    return column_name.lower().replace(' ', '_')


@transformer
def transform(df, *args, **kwargs):
    # Add number of meals for each user
    df_new_column = number_of_rows_per_key(df, 'user ID', 'number of meals')
    df = df.join(df_new_column, on='user ID')

    # Clean column names
    df.columns = [clean_column(col) for col in df.columns]

    return df.iloc[:100]


@test
def test_number_of_columns(df, *args) -> None:
    assert len(df.columns) >= 11, 'There needs to be at least 11 columns.'
```
- Run the block by clicking the play icon button in the top right corner of the transformer block, or press 1 of the following keyboard shortcuts:
  - ⌘ + Enter
  - Control + Enter
  - Shift + Enter
- After you run the block (⌘ + Enter), you’ll see a sample of the data that was transformed.
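To make the join logic concrete, here’s a tiny self-contained sketch of what `number_of_rows_per_key` produces and how the join attaches the count back to each row (the toy data below is invented purely for illustration):

```python
import pandas as pd

# Toy data, invented for illustration only.
df = pd.DataFrame({'user ID': [1, 1, 2], 'meal transaction ID': [10, 11, 12]})

# Count rows per user, exactly as number_of_rows_per_key does.
counts = df.groupby('user ID')['user ID'].agg(['count'])
counts.columns = ['number of meals']

# Joining on 'user ID' attaches each user’s count to every one of their rows.
print(df.join(counts, on='user ID'))
#    user ID  meal transaction ID  number of meals
# 0        1                   10                2
# 1        1                   11                2
# 2        2                   12                1
```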
5. Export data to PostgreSQL

5a. Add credentials
- On the left side of the screen in the file browser, click on the file named `io_config.yaml`.
- Then, paste the following credentials:
```yaml
version: 0.1.1
default:
  POSTGRES_PORT: 5432
  POSTGRES_DBNAME: xyleviup
  POSTGRES_USER: xyleviup
  POSTGRES_PASSWORD: edSrMWH7Mww-lTKpp-jPHX9sYSNLy7LG
  POSTGRES_HOST: queenie.db.elephantsql.com
  POSTGRES_SCHEMA: elt_demo
```
- Save the file by clicking the button labeled Save file content, or by pressing 1 of the following keyboard shortcuts:
  - ⌘ + Enter
  - Control + Enter
- Close the file by pressing the X button on the right of the file name at the top of the screen, or click the button labeled View pipeline to return to the `ETL demo` pipeline.
5b. Export data using SQL
- Click the button labeled + Data exporter and then click the option labeled SQL.
- A dialog menu will appear. Change the block name to `export data`.
- Click the button labeled Save and add block.
- At the top of the block, in the 1st dropdown menu labeled Connection, select the option labeled PostgreSQL.
- At the top of the block, in the 2nd dropdown menu labeled Profile, select the option labeled default.
- At the top of the block, in the last dropdown menu labeled Write policy, select the option labeled Replace.
- Paste the following SQL statement in the data exporter block:

```sql
SELECT * FROM {{ df_1 }}
```

The variable `{{ df_1 }}` refers to the DataFrame returned by the block’s 1st upstream block (here, the `transform data` block); Mage interpolates it so you can query it like a table.
- Run the block by clicking the play icon button in the top right corner of the data exporter block, or press 1 of the following keyboard shortcuts:
  - ⌘ + Enter
  - Control + Enter
  - Shift + Enter
- You should see output statements like this:
```
Postgres initialized
└─ Opening connection to PostgreSQL database... DONE
Exporting data from upstream block transform_data to etl_demo.dev_etl_demo_transform_data_v1.
├─ Exporting data to 'etl_demo.dev_etl_demo_export_data_v1'...
└─ Loading data... DONE
```
- After you run the block (⌘ + Enter), you’ll see a sample of the data that was exported.
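If you want to double-check the export outside Mage, here’s a minimal sketch that reads the table back with pandas and SQLAlchemy (both are assumptions; any PostgreSQL client works). The credentials mirror `io_config.yaml` above, and the table name comes from the run output, so yours may differ:

```python
# Minimal sketch: read the exported table back (assumes sqlalchemy and a
# PostgreSQL driver such as psycopg2 are installed).
import pandas as pd
from sqlalchemy import create_engine

# Credentials mirror io_config.yaml above; swap in your own.
engine = create_engine(
    'postgresql://xyleviup:edSrMWH7Mww-lTKpp-jPHX9sYSNLy7LG'
    '@queenie.db.elephantsql.com:5432/xyleviup'
)

# Table name taken from the output statements above; adjust it to match
# your pipeline and block names.
df = pd.read_sql(
    "SELECT * FROM etl_demo.dev_etl_demo_export_data_v1 LIMIT 5", engine
)
print(df)
```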
🎉 Congratulations!
You’ve successfully built an end-to-end ETL pipeline that loaded data, transformed it, and exported it to a database.
Now you’re ready to raid the dungeons and find magical treasures with your new powers!
If you have more questions or ideas, get real-time help in our live support Slack channel.