Create a new source
This guide details adding a new source to Mage. If your source already exists as a Singer tap, check out our guide for adapting an existing source instead.
Mage Sources are built on Singer Taps. You can read more about the Singer spec here. In this guide, we’ll copy over our sample source to get started and build from there.
If you haven’t already, start by cloning the Mage repo and navigating to the folder:
git clone https://github.com/mage-ai/mage-ai.git
cd mage-ai
For our source, we’ll need:
- A schema that represents the source data
- A template configuration file
- An __init__.py file that contains the logic for fetching data from the source:
  - A test_connection method that tests the connection to our source.
  - A discover method that returns a Catalog of streams.
  - A load_data method that yields data from the source as a list of dictionaries.
We’ll start from a template to make things easy.
Copy the template source
Navigate to /mage-ai/mage_integrations/mage_integrations/sources and create a copy of the titanic directory:
cd ./mage_integrations/mage_integrations/sources
cp -R titanic MY_SOURCE
Our titanic source simply reads data from a CSV file, but we’ll modify it to do much more. You’ll notice the following files and folders in the MY_SOURCE directory:
mage_integrations/sources/MY_SOURCE
├── README.md
├── __init__.py
├── schemas
│   └── passengers.json
└── templates
    └── config.json
So we’ll need to update or overwrite each of these.
Update schemas
This folder contains all the known schemas from your source. For sources that have dynamic schemas (e.g. database tables from MySQL), this folder may be empty since the schema is dependent on the individual’s source data.
The JSON format of these schema files follows the Singer spec.
When naming schemas, use the plural name of the object you’re referencing. This plural name will be displayed to the individual who is setting up a synchronization using this source. Take the sample “passengers” schema for example:
{
  "properties": {
    "Survived": {
      "type": ["null", "integer"]
    },
    "Name": {
      "type": ["null", "string"]
    }
  },
  "type": ["null", "object"]
}
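If your source has many fields, writing these files by hand gets tedious. Here is a minimal sketch of generating a schema in the same shape as the sample above. The build_schema helper is hypothetical (it is not part of Mage), and it assumes every property should be nullable, as in the passengers example.

```python
import json
from typing import Dict


def build_schema(columns: Dict[str, str]) -> dict:
    """Build a Singer-style JSON schema in which every property is nullable.

    `columns` maps a property name to a JSON Schema type such as
    "integer" or "string". This helper is hypothetical, not part of Mage.
    """
    return {
        'properties': {
            name: {'type': ['null', json_type]}
            for name, json_type in columns.items()
        },
        'type': ['null', 'object'],
    }


schema = build_schema({'Survived': 'integer', 'Name': 'string'})

# Serialize it the way it could be saved to schemas/passengers.json.
schema_json = json.dumps(schema, indent=2)
```

The resulting dictionary matches the sample “passengers” schema, so the output can be written straight into the schemas folder.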
Add templates
This folder contains a sample configuration JSON file that’s displayed to the user when they are setting up a synchronization using this source.
The config.json file contains keys and values that are used to configure the behavior of the source as well as credentials to authenticate requests to the source. You must use the exact filename config.json, regardless of your source’s name.
The following simple example is present in the Titanic source.
{
  "api_key": "",
  "secret_key": ""
}
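Because the template ships with empty values, a source may want to check that the user actually filled them in before making any requests. The sketch below shows one way to do that. The missing_config_keys helper and the 'abc123' value are made up for illustration and are not part of Mage.

```python
import json
from typing import List


def missing_config_keys(config: dict, required: List[str]) -> List[str]:
    """Return the required keys that are absent or empty in `config`."""
    return [key for key in required if not config.get(key)]


# The template ships with empty values for the user to fill in.
template = json.loads('{"api_key": "", "secret_key": ""}')

# Simulate a user who only filled in one of the two values.
filled = {**template, 'api_key': 'abc123'}

missing = missing_config_keys(filled, ['api_key', 'secret_key'])
```

A check like this could run at the start of test_connection so that a missing credential produces a clear error message.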
Update __init__.py
The majority of the source logic lives in the __init__.py file. Most of the work in adding your own source involves creating or overriding methods in __init__.py. Our sample contains the following:
from mage_integrations.sources.base import Source, main
from typing import Dict, Generator, List
import requests

URL = 'https://raw.githubusercontent.com/mage-ai/datasets/master/titanic_survival.csv'


class Titanic(Source):
    def load_data(
        self,
        **kwargs,
    ) -> Generator[List[Dict], None, None]:
        text = requests.get(URL).text

        rows = []
        lines = text.rstrip().split('\n')
        columns = lines[0].split(',')

        for line in lines[1:]:
            values = line.split(',')
            rows.append({col: values[idx] for idx, col in enumerate(columns)})

        yield rows

    def test_connection(self):
        request = requests.get(URL)

        if request.status_code != 200:
            raise Exception('Could not fetch titanic data')


if __name__ == '__main__':
    main(Titanic)
Rename source class
First, rename the source class to match your new source:
from mage_integrations.sources.base import Source


class MY_SOURCE(Source):
    pass
Edit methods
The load_data() method
Override this method to contain the logic for fetching data specific to your source. The Titanic source’s load_data method reads data from a CSV file and yields a list of dictionaries:
def load_data(
    self,
    **kwargs,
) -> Generator[List[Dict], None, None]:
    url = 'https://raw.githubusercontent.com/mage-ai/datasets/master/titanic_survival.csv'
    text = requests.get(url).text

    rows = []
    lines = text.rstrip().split('\n')
    columns = lines[0].split(',')

    for line in lines[1:]:
        values = line.split(',')
        rows.append({col: values[idx] for idx, col in enumerate(columns)})

    yield rows
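The sample splits each line on commas, which works only when no field contains a comma of its own. If your source returns CSV with quoted fields, a sketch along these lines, using Python’s standard csv module, may be more robust. The parse_csv_rows helper and the sample text are illustrative only and are not taken from the Titanic source.

```python
import csv
import io
from typing import Dict, List


def parse_csv_rows(text: str) -> List[Dict[str, str]]:
    """Parse CSV text into one dictionary per row, honoring quoted fields."""
    return list(csv.DictReader(io.StringIO(text)))


# Illustrative text in which the Name field contains a comma inside quotes.
sample = 'Survived,Name\n0,"Doe, Mr. John"\n1,"Roe, Miss. Jane"\n'
rows = parse_csv_rows(sample)
```

The result is still a list of dictionaries keyed by column name, so it can be yielded from load_data in the same way as the rows in the sample.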
Your load_data method should also yield a list of dictionaries. The load_data method receives a keyword argument named query, which is a dictionary. When Mage runs a source, the following keys and values are automatically available on each run:
Key | Description | Sample value |
---|---|---|
_execution_date | The date and time (in ISO format) of when the pipeline started running. | 2022-10-21T17:24:49.443559 |
_execution_partition | An automatically formatted partition of the pipeline run using the execution date. | 20221021T172557 (format: %Y%m%dT%H%M%S) |
_start_date | You can define this variable as a runtime variable in your pipeline, or it’ll be automatically filled in using the date and time your pipeline runs minus 1 hour, day, week, etc. (based on your schedule’s interval). | 2022-10-01T00:00:00.000000 |
_end_date | You can define this variable as a runtime variable in your pipeline or it’ll be automatically filled in using the date and time your pipeline runs. | 2022-10-02T00:00:00.000000 |
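As a rough sketch of how these values might be used, the helper below keeps only the rows whose timestamp falls between _start_date and _end_date. The filter_by_window function, the updated_at field, and the choice of a half-open window (start inclusive, end exclusive) are all assumptions made for illustration. Inside load_data, the dictionary would come from kwargs.get('query').

```python
from datetime import datetime
from typing import Dict, List, Optional


def filter_by_window(
    rows: List[Dict],
    query: Optional[Dict],
    date_key: str,
) -> List[Dict]:
    """Keep rows whose `date_key` falls in [_start_date, _end_date)."""
    query = query or {}
    start = query.get('_start_date')
    end = query.get('_end_date')
    start_dt = datetime.fromisoformat(start) if start else None
    end_dt = datetime.fromisoformat(end) if end else None

    kept = []
    for row in rows:
        value = datetime.fromisoformat(row[date_key])
        if start_dt and value < start_dt:
            continue  # Before the window opens.
        if end_dt and value >= end_dt:
            continue  # At or after the window closes.
        kept.append(row)
    return kept


# Sample values copied from the table above.
query = {
    '_start_date': '2022-10-01T00:00:00.000000',
    '_end_date': '2022-10-02T00:00:00.000000',
}
rows = [
    {'id': 1, 'updated_at': '2022-09-30T23:59:59'},
    {'id': 2, 'updated_at': '2022-10-01T12:00:00'},
    {'id': 3, 'updated_at': '2022-10-02T00:00:00'},
]
recent = filter_by_window(rows, query, 'updated_at')
```

When the source is an API or a database, it is usually better to pass the window to the remote query rather than filter after fetching. The in-memory filter here just keeps the sketch self-contained.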
The discover() method
The discover method should return a Catalog of streams that define the data in your source. For example discover methods, see our Google Sheets source or our DynamoDB source.
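To give a feel for what a catalog contains, here is a sketch that builds a Singer-style catalog as plain dictionaries, with one stream per file in a schemas folder. It does not use Mage’s Catalog class, whose constructor is not shown in this guide. The build_catalog helper is hypothetical, and the field names follow the Singer catalog format.

```python
import json
import tempfile
from pathlib import Path
from typing import Dict, List


def build_catalog(schemas_dir: Path) -> Dict[str, List[dict]]:
    """Build a Singer-style catalog dict with one stream per schema file."""
    streams = []
    for schema_path in sorted(schemas_dir.glob('*.json')):
        schema = json.loads(schema_path.read_text())
        stream_id = schema_path.stem  # e.g. "passengers"
        streams.append({
            'tap_stream_id': stream_id,
            'stream': stream_id,
            'schema': schema,
            # One metadata entry per property, marking it as selectable.
            'metadata': [
                {
                    'breadcrumb': ['properties', name],
                    'metadata': {'inclusion': 'available'},
                }
                for name in schema.get('properties', {})
            ],
        })
    return {'streams': streams}


# Demonstrate with a temporary schemas folder holding one schema file.
with tempfile.TemporaryDirectory() as tmp:
    schema = {
        'properties': {'Name': {'type': ['null', 'string']}},
        'type': ['null', 'object'],
    }
    (Path(tmp) / 'passengers.json').write_text(json.dumps(schema))
    catalog = build_catalog(Path(tmp))
```

For sources with dynamic schemas, the same structure would be filled from whatever the source reports at run time instead of from files on disk.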
The test_connection() method
We need a test_connection method to enable testing the tap in the Mage UI. This is a simple method that instantiates the tap and closes it. Just replace YourSource with the source class, e.g. GitHub, and provide the configuration arguments to the tap. Here’s a good example from the SFTP tap:
def test_connection(self) -> None:
    client = SFTPConnection(
        host=self.config['host'],
        username=self.config['username'],
        password=self.config['password'],
        private_key_file=self.config.get('private_key_file'),
        port=self.config['port'],
    )
    client.close()
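For an HTTP-based source, the same idea might look like the sketch below, which raises when an authenticated request does not return a 200 status, much as the Titanic sample does. Everything here is hypothetical: the check_connection helper, the Bearer header scheme, and the example URL are assumptions. The request function is passed in so the sketch runs offline, and in a real source you would pass requests.get.

```python
from typing import Callable


def check_connection(url: str, api_key: str, http_get: Callable) -> None:
    """Raise if an authenticated GET to `url` does not return HTTP 200."""
    response = http_get(url, headers={'Authorization': f'Bearer {api_key}'})
    if response.status_code != 200:
        raise Exception(f'Could not connect: HTTP {response.status_code}')


class FakeResponse:
    """Stand-in for requests.Response so the sketch runs offline."""

    def __init__(self, status_code: int):
        self.status_code = status_code


def fake_get(url, headers=None):
    # Pretend the API accepts only one specific key.
    ok = headers == {'Authorization': 'Bearer good-key'}
    return FakeResponse(200 if ok else 401)


# Passes silently with the accepted key; a wrong key would raise.
check_connection('https://api.example.com/ping', 'good-key', fake_get)
```

Inside a source class, test_connection would read the URL and key from self.config and call a helper like this one.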
Update main() function
Change the source name in the main() function to match your new source:
if __name__ == '__main__':
    main(MY_SOURCE)
Add your source to the UI
The list of sources is available here. Add yours to make it accessible via the UI.
Document your source
Document how to configure and use your source in the README.md file at the source’s root directory. Documentation helps Magers and future contributors understand exactly how your source works! Be sure to be thorough and descriptive. Here’s an example of documentation done well.
Test your source
You’ll first need to configure your development environment. Once complete, follow this doc to test your new source.