Create a new source
In this guide, we’ll build a sample source for Titanic. When you’re building your own source, you can swap out the Titanic name for the real name of your new source.
- Create a directory for the new source
- Define custom source class
- Add main function
- Add your source to the UI (optional)
- Test your source
1. Create a directory for the new source
In the mage_integrations/sources/ directory, add a new directory named after your source.
Use lowercase snake case for the directory name: mage_integrations/sources/titanic/.
In this new directory, create the following subdirectories and files:
mage_integrations/sources/titanic/schemas/passengers.json
mage_integrations/sources/titanic/templates/config.json
mage_integrations/sources/titanic/__init__.py
mage_integrations/sources/titanic/README.md
The directory structure should look like this:
mage_integrations/
| sources/
| | titanic/
| | | schemas/
| | | | passengers.json
| | | templates/
| | | | config.json
| | | __init__.py
| | | README.md
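The layout above can also be created with a short script. This is a minimal sketch: the helper name `scaffold_source` and its `stream_names` parameter are hypothetical and not part of mage_integrations, and the files it creates are empty placeholders.

```python
from pathlib import Path
from typing import Iterable


def scaffold_source(
    root: Path,
    source_name: str,
    stream_names: Iterable[str] = ('passengers',),
) -> Path:
    """Create the empty directory layout for a new source under `root`."""
    source_dir = root / 'mage_integrations' / 'sources' / source_name

    # Subdirectories for the schema files and the config template.
    (source_dir / 'schemas').mkdir(parents=True, exist_ok=True)
    (source_dir / 'templates').mkdir(parents=True, exist_ok=True)

    # One empty schema file per stream (hypothetical stream names).
    for stream_name in stream_names:
        (source_dir / 'schemas' / f'{stream_name}.json').touch(exist_ok=True)

    # Remaining placeholder files described in this guide.
    (source_dir / 'templates' / 'config.json').touch(exist_ok=True)
    (source_dir / '__init__.py').touch(exist_ok=True)
    (source_dir / 'README.md').touch(exist_ok=True)

    return source_dir
```

Calling `scaffold_source(Path('.'), 'titanic')` from the repository root would produce the tree shown above. The empty files still need the contents described in the following sections.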
Schemas folder
This folder contains all the known schemas from your source.
For sources that have dynamic schemas (e.g. database tables from MySQL), this folder may be empty since the schema is dependent on the individual’s source data.
The JSON format of these schema files follows the Singer spec.
Naming convention
Use the plural name of the object you’re referencing. This plural name will be displayed to the individual who is setting up a synchronization using this source.
Examples
mage_integrations/sources/titanic/schemas/passengers.json
{
  "properties": {
    "Survived": {
      "type": [
        "null",
        "integer"
      ]
    },
    "Name": {
      "type": [
        "null",
        "string"
      ]
    }
  },
  "type": [
    "null",
    "object"
  ]
}
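To make the meaning of the `type` lists concrete, here is a minimal sketch that checks a record against the schema above. The helper `find_type_errors` is hypothetical and is not how Mage or Singer validates records; it only illustrates that each property accepts any of the listed JSON types.

```python
import json
from typing import Any, Dict, List

# Map JSON Schema type names to the Python types that satisfy them.
# Note: bool is a subclass of int in Python, so True would pass as "integer".
JSON_TYPE_TO_PYTHON = {
    'null': type(None),
    'integer': int,
    'number': (int, float),
    'string': str,
    'boolean': bool,
    'object': dict,
    'array': list,
}

# The same schema as passengers.json, inlined so the example is self-contained.
SCHEMA = json.loads("""
{
  "properties": {
    "Survived": {"type": ["null", "integer"]},
    "Name": {"type": ["null", "string"]}
  },
  "type": ["null", "object"]
}
""")


def find_type_errors(record: Dict[str, Any], schema: Dict[str, Any]) -> List[str]:
    """Return the names of properties whose values match none of the allowed types."""
    errors = []
    for name, definition in schema['properties'].items():
        allowed = definition['type']
        if isinstance(allowed, str):
            allowed = [allowed]
        # A missing key is treated as None, which passes when "null" is allowed.
        value = record.get(name)
        if not any(isinstance(value, JSON_TYPE_TO_PYTHON[t]) for t in allowed):
            errors.append(name)
    return errors
```

For example, `find_type_errors({'Survived': '1', 'Name': None}, SCHEMA)` reports `Survived`, because the string `'1'` is neither null nor an integer.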
Templates folder
This folder contains a sample configuration JSON file that’s displayed to the user when they are setting up a synchronization using this source.
The config.json file contains keys and values that configure the behavior of the source, as well as credentials to authenticate requests to the source.
Naming convention
You must use the exact filename config.json.
Examples
mage_integrations/sources/titanic/templates/config.json
{
  "api_key": "",
  "secret_key": ""
}
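Because the template ships with empty values, it can be useful to check which keys a user has not filled in yet. The sketch below assumes a hypothetical helper, `missing_config_keys`, and a made-up user configuration; it does not describe validation that Mage itself performs.

```python
import json
from typing import Dict, List


def missing_config_keys(template: Dict[str, str], config: Dict[str, str]) -> List[str]:
    """Return template keys that are absent or left empty in `config`."""
    return [key for key in template if not config.get(key)]


# The template from templates/config.json, inlined for the example.
template = json.loads('{"api_key": "", "secret_key": ""}')

# Hypothetical values a user might have entered.
user_config = {'api_key': 'abc123', 'secret_key': ''}

print(missing_config_keys(template, user_config))  # prints ['secret_key']
```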
__init__.py
This is where the majority of the source logic lives.
Examples
mage_integrations/sources/titanic/__init__.py
README.md
Document how to configure and use your source in the README.md file.
2. Define custom source class
In mage_integrations/sources/titanic/__init__.py, create a new class named after your source and subclass the base source class.
from mage_integrations.sources.base import Source


class Titanic(Source):
    pass
Override the load_data method
The base Source class has an instance method called load_data. Here is the interface:
def load_data(
    self,
    bookmarks: Dict = None,
    query: Dict = {},
    start_date: datetime = None,
    **kwargs,
) -> Generator[List[Dict], None, None]:
    yield []
Override this method to contain the logic for fetching data that is specific to your source.
For example, here is the code for the Titanic source’s load_data method:
def load_data(
    self,
    **kwargs,
) -> Generator[List[Dict], None, None]:
    url = 'https://raw.githubusercontent.com/mage-ai/datasets/master/titanic_survival.csv'
    text = requests.get(url).text

    # Parse with the csv module (requires `import csv`) so that quoted
    # fields containing commas stay in a single column.
    rows = [dict(row) for row in csv.DictReader(text.splitlines())]

    # Yield every record as a single batch.
    yield rows
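The return type `Generator[List[Dict], None, None]` means load_data may yield more than one list of records. A minimal sketch of splitting records into several batches follows; the helper `iter_batches` and the default batch size are hypothetical, and whether smaller batches help depends on your source.

```python
from typing import Dict, Generator, Iterable, List


def iter_batches(
    rows: Iterable[Dict],
    batch_size: int = 1000,
) -> Generator[List[Dict], None, None]:
    """Yield lists of at most `batch_size` rows."""
    batch = []
    for row in rows:
        batch.append(row)
        if len(batch) >= batch_size:
            yield batch
            batch = []
    # Yield the final partial batch, if any rows remain.
    if batch:
        yield batch
```

Inside load_data, replacing `yield rows` with `yield from iter_batches(rows)` would emit the same records in smaller lists.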
Available values in the query keyword argument
The load_data method accepts a keyword argument named query, which is a dictionary.
When Mage runs a source, the following keys and values are automatically available on each run:
Key | Description | Sample value |
---|---|---|
_execution_date | The date and time (in ISO format) of when the pipeline started running. | 2022-10-21T17:24:49.443559 |
_execution_partition | An automatically formatted partition of the pipeline run, derived from the execution date. | 20221021T172557 (format: %Y%m%dT%H%M%S) |
_start_date | You can define this variable as a runtime variable in your pipeline. Otherwise, it is filled in automatically using the date and time your pipeline runs minus one schedule interval (1 hour, day, week, etc.). | 2022-10-01T00:00:00.000000 |
_end_date | You can define this variable as a runtime variable in your pipeline or it’ll be automatically filled in using the date and time your pipeline runs. | 2022-10-02T00:00:00.000000 |
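As an illustration of how these keys might be used, the sketch below keeps only the rows that fall between `_start_date` and `_end_date`. It assumes the values arrive as ISO-formatted strings like the samples in the table and treats the window as half-open (start inclusive, end exclusive). The helper names and the `updated_at` column are hypothetical.

```python
from datetime import datetime
from typing import Dict, List, Optional


def parse_query_date(query: Dict, key: str) -> Optional[datetime]:
    """Parse an ISO-formatted value from the query dictionary, if present."""
    value = query.get(key)
    return datetime.fromisoformat(value) if value else None


def filter_by_window(rows: List[Dict], query: Dict, date_column: str) -> List[Dict]:
    """Keep rows whose `date_column` falls in [_start_date, _end_date)."""
    start = parse_query_date(query, '_start_date')
    end = parse_query_date(query, '_end_date')

    kept = []
    for row in rows:
        moment = datetime.fromisoformat(row[date_column])
        if start is not None and moment < start:
            continue
        if end is not None and moment >= end:
            continue
        kept.append(row)
    return kept


# Sample values taken from the table above; the rows are made up.
query = {
    '_start_date': '2022-10-01T00:00:00.000000',
    '_end_date': '2022-10-02T00:00:00.000000',
}
rows = [
    {'id': 1, 'updated_at': '2022-09-30T23:59:59'},
    {'id': 2, 'updated_at': '2022-10-01T12:00:00'},
    {'id': 3, 'updated_at': '2022-10-02T00:00:00'},
]
print(filter_by_window(rows, query, 'updated_at'))
```

Only the row with `id` 2 falls inside the window. If either key is missing from query, that side of the window is left open.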
3. Add main function
In the file mage_integrations/sources/titanic/__init__.py, where your custom source class is defined, import this at the top of the file:
from mage_integrations.sources.base import Source, main
Then, add the following code at the bottom of the file (outside of the class definition):
if __name__ == '__main__':
    main(Titanic)
Your final file should look like this:
from mage_integrations.sources.base import Source, main
from typing import Dict, Generator, List
import csv
import requests


class Titanic(Source):
    def load_data(
        self,
        **kwargs,
    ) -> Generator[List[Dict], None, None]:
        url = 'https://raw.githubusercontent.com/mage-ai/datasets/master/titanic_survival.csv'
        text = requests.get(url).text

        # Parse with the csv module so that quoted fields containing
        # commas stay in a single column.
        rows = [dict(row) for row in csv.DictReader(text.splitlines())]

        # Yield every record as a single batch.
        yield rows


if __name__ == '__main__':
    main(Titanic)
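Before running the source against the real URL, the row-building logic can be checked offline. The sketch below parses a small in-memory CSV with Python's csv module, which keeps quoted fields that contain commas in one column; splitting each line on commas would not. The sample data and the `parse_rows` helper are hypothetical.

```python
import csv
from typing import Dict, List

# Made-up sample data; the second column value contains a quoted comma.
SAMPLE_CSV = (
    'PassengerId,Survived,Name\n'
    '1,0,"Doe, Mr. John"\n'
    '2,1,"Roe, Mrs. Jane"\n'
)


def parse_rows(text: str) -> List[Dict[str, str]]:
    """Turn CSV text into one dictionary per data row."""
    return [dict(row) for row in csv.DictReader(text.splitlines())]


rows = parse_rows(SAMPLE_CSV)
print(rows[0]['Name'])  # prints Doe, Mr. John

# For comparison, a plain split sees four values for three columns.
print(len(SAMPLE_CSV.splitlines()[1].split(',')))  # prints 4
```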
4. Add your source to the UI (optional)
Add the new source to the SOURCES list constant in this file: https://github.com/mage-ai/mage-ai/blob/master/mage_ai/data_integrations/sources/constants.py
5. Test your source
Follow this doc to test your new source.
To test the source in the UI, install your updated mage_integrations module by running the following commands in the Mage terminal:
pip uninstall -y mage_integrations
pip install "git+https://github.com/your_repo.git@your_branch#egg=mage-integrations&subdirectory=mage_integrations"