In this guide, we’ll build a sample source for Titanic. When you’re building your own source, you can swap out the Titanic name for the real name of your new source.

  1. Create a directory for the new source
  2. Define custom source class
  3. Add main function
  4. Add your source to the UI (optional)
  5. Test your source

1. Create a directory for the new source

In the mage_integrations/sources/ directory, add a new directory named after your source. Use snake case and lowercase for your directory name: mage_integrations/sources/titanic/.

In this new directory, create the following subdirectories and files:

  • mage_integrations/sources/titanic/schemas/passengers.json
  • mage_integrations/sources/titanic/templates/config.json
  • mage_integrations/sources/titanic/__init__.py
  • mage_integrations/sources/titanic/README.md

The directory structure should look like this:

mage_integrations/
|   sources/
|   |   titanic/
|   |   |   schemas/
|   |   |   |   passengers.json
|   |   |   templates/
|   |   |   |   config.json
|   |   |   __init__.py
|   |   |   README.md
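
If you prefer to script this step, the layout above can be created with a few lines of Python. This is only a convenience sketch: the `scaffold_source` helper is hypothetical and not part of `mage_integrations`, and it creates empty placeholder files that you still need to fill in.

```python
from pathlib import Path
from typing import List


def scaffold_source(root: Path, source_name: str) -> List[Path]:
    """Create empty placeholder files for a new source and return their paths."""
    source_dir = root / 'mage_integrations' / 'sources' / source_name
    files = [
        source_dir / 'schemas' / 'passengers.json',
        source_dir / 'templates' / 'config.json',
        source_dir / '__init__.py',
        source_dir / 'README.md',
    ]
    for path in files:
        # Create any missing parent directories, then the empty file itself.
        path.parent.mkdir(parents=True, exist_ok=True)
        path.touch(exist_ok=True)
    return files
```

For example, calling `scaffold_source(Path('.'), 'titanic')` from the repository root would create the four files listed above.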

Schemas folder

This folder contains all the known schemas from your source.

For sources that have dynamic schemas (e.g. database tables from MySQL), this folder may be empty since the schema is dependent on the individual’s source data.

The JSON format of these schema files follows the Singer spec.

Naming convention

Use the plural name of the object you’re referencing. This plural name will be displayed to the individual who is setting up a synchronization using this source.

Examples

mage_integrations/sources/titanic/schemas/passengers.json

{
  "properties": {
    "Survived": {
      "type": [
        "null",
        "integer"
      ]
    },
    "Name": {
      "type": [
        "null",
        "string"
      ]
    }
  },
  "type": [
    "null",
    "object"
  ]
}
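
To make the meaning of the `type` lists concrete, here is a minimal sketch that checks a record against the example schema. The `matches_schema` helper and `TYPE_MAP` are hypothetical and cover only the types used above; this is not a full JSON Schema validator and not how Mage itself validates records.

```python
import json
from typing import Any, Dict

# Minimal mapping from schema type names to Python types. It only covers
# the types that appear in the example schema.
TYPE_MAP = {
    'null': type(None),
    'integer': int,
    'string': str,
}

SCHEMA = json.loads('''
{
  "properties": {
    "Survived": {"type": ["null", "integer"]},
    "Name": {"type": ["null", "string"]}
  },
  "type": ["null", "object"]
}
''')


def matches_schema(record: Dict[str, Any], schema: Dict[str, Any]) -> bool:
    """Return True if every declared property in the record has an allowed type."""
    for name, spec in schema['properties'].items():
        allowed = tuple(TYPE_MAP[t] for t in spec['type'])
        # A missing key is treated as None, which passes when "null" is allowed.
        if not isinstance(record.get(name), allowed):
            return False
    return True
```

Because each property lists `"null"` alongside its main type, a record with a missing or empty value still passes this check.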

Templates folder

This folder contains a sample configuration JSON file that’s displayed to the user when they are setting up a synchronization using this source.

The config.json file contains keys and values that are used to configure the behavior of the source as well as credentials to authenticate requests to the source.

Naming convention

You must use the exact filename config.json.

Examples

mage_integrations/sources/titanic/templates/config.json

{
  "api_key": "",
  "secret_key": ""
}
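
Since the template ships with empty values, it can be useful to check which keys a user has not filled in yet. The sketch below is one possible way to do that; the `missing_config_keys` helper is hypothetical and not part of `mage_integrations`.

```python
import json
from typing import Dict, Iterable, List


def missing_config_keys(config: Dict[str, str], required: Iterable[str]) -> List[str]:
    """Return the required keys that are absent or still empty in the config."""
    return [key for key in required if not config.get(key)]


# The template from above, and a copy where only one value was filled in.
template = json.loads('{"api_key": "", "secret_key": ""}')
filled = {**template, 'api_key': 'abc123'}
```

With these values, `missing_config_keys(filled, ['api_key', 'secret_key'])` reports only `secret_key` as missing.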

__init__.py

This is where the majority of the source logic will live.

Examples

mage_integrations/sources/titanic/__init__.py

README.md

Document how to configure and use your source in the README.md file.


2. Define custom source class

In mage_integrations/sources/titanic/__init__.py, create a new class named after your source that subclasses the base Source class.

from mage_integrations.sources.base import Source


class Titanic(Source):
    pass

Override the load_data method

The base Source class has an instance method called load_data. Here is the interface:

def load_data(
    self,
    bookmarks: Dict = None,
    query: Dict = {},
    start_date: datetime = None,
    **kwargs,
) -> Generator[List[Dict], None, None]:
    yield []

Override this method to contain the logic for fetching data that is specific to your source.

For example, here is the code for the Titanic source’s load_data method:

# Requires csv, io, and requests to be imported at the top of the file.
def load_data(
    self,
    **kwargs,
) -> Generator[List[Dict], None, None]:
    url = 'https://raw.githubusercontent.com/mage-ai/datasets/master/titanic_survival.csv'
    text = requests.get(url).text
    # Parse with the csv module so that quoted fields containing commas
    # are kept in a single column instead of being split apart.
    reader = csv.DictReader(io.StringIO(text))
    # Yield all rows as one list of dicts keyed by column name.
    yield [dict(row) for row in reader]
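
Because load_data returns a generator of lists, a source is not limited to yielding a single list. The following is a sketch of a batching helper with the same return type; the `batch_rows` name and the `batch_size` default are assumptions for illustration, not part of the base Source class.

```python
from typing import Dict, Generator, Iterable, List


def batch_rows(
    rows: Iterable[Dict],
    batch_size: int = 1000,  # hypothetical default, tune for your source
) -> Generator[List[Dict], None, None]:
    """Yield rows in lists of at most batch_size items."""
    batch: List[Dict] = []
    for row in rows:
        batch.append(row)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:
        # Emit the final, partially filled batch.
        yield batch
```

Inside a load_data override, this could be used as `yield from batch_rows(rows)` in place of a single `yield rows`.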

Available values in the query keyword argument

There is a keyword argument named query in the load_data method that is a dictionary.

When Mage runs a source, the following keys and values are automatically available on each run:

Key | Description | Sample value
_execution_date | The date and time (in ISO format) of when the pipeline started running. | 2022-10-21T17:24:49.443559
_execution_partition | An automatically formatted partition of the pipeline run using the execution date. | 20221021T172557 (e.g. format %Y%m%dT%H%M%S)
_start_date | You can define this variable as a runtime variable in your pipeline or it’ll be automatically filled in using the date and time your pipeline runs minus 1 hour, day, week, etc (based on your schedule’s interval). | 2022-10-01T00:00:00.000000
_end_date | You can define this variable as a runtime variable in your pipeline or it’ll be automatically filled in using the date and time your pipeline runs. | 2022-10-02T00:00:00.000000
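
As an illustration of how these keys might be used, the sketch below filters rows to the window between _start_date and _end_date and formats an execution date with the partition pattern. The helper names, the `created_at` column, and the choice of a half-open window (start included, end excluded) are assumptions for this example, not behavior defined by Mage.

```python
from datetime import datetime
from typing import Dict, List, Optional


def filter_by_query_window(
    rows: List[Dict],
    query: Optional[Dict] = None,
    date_column: str = 'created_at',  # hypothetical column name
) -> List[Dict]:
    """Keep rows dated within [_start_date, _end_date) when both keys are set."""
    query = query or {}
    start, end = query.get('_start_date'), query.get('_end_date')
    if not start or not end:
        # Without a complete window, return the rows unfiltered.
        return rows
    start_dt = datetime.fromisoformat(start)
    end_dt = datetime.fromisoformat(end)
    return [
        row for row in rows
        if start_dt <= datetime.fromisoformat(row[date_column]) < end_dt
    ]


def execution_partition(execution_date: str) -> str:
    """Format an ISO execution date using the %Y%m%dT%H%M%S pattern."""
    return datetime.fromisoformat(execution_date).strftime('%Y%m%dT%H%M%S')
```

A load_data override could accept `query` as a keyword argument and pass it to a helper like this before yielding rows.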

3. Add main function

In mage_integrations/sources/titanic/__init__.py, where your custom source class is defined, update the import at the top of the file so that it also imports main:

from mage_integrations.sources.base import Source, main

Then, add the following code at the bottom of the file (outside of the class definition):

if __name__ == '__main__':
    main(Titanic)

Your final file should look like this:

from mage_integrations.sources.base import Source, main
from typing import Dict, Generator, List
import csv
import io
import requests


class Titanic(Source):
    def load_data(
        self,
        **kwargs,
    ) -> Generator[List[Dict], None, None]:
        url = 'https://raw.githubusercontent.com/mage-ai/datasets/master/titanic_survival.csv'
        text = requests.get(url).text
        # Parse with the csv module so that quoted fields containing commas
        # are kept in a single column instead of being split apart.
        reader = csv.DictReader(io.StringIO(text))
        # Yield all rows as one list of dicts keyed by column name.
        yield [dict(row) for row in reader]


if __name__ == '__main__':
    main(Titanic)

4. Add your source to the UI (optional)

Add the new source to the SOURCES list constant in this file: https://github.com/mage-ai/mage-ai/blob/master/mage_ai/data_integrations/sources/constants.py

5. Test your source

Follow this doc to test your new source.
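
As a quick offline check that complements (and does not replace) the linked doc, the CSV parsing can be exercised without any network access. This sketch assumes the parsing is factored into a hypothetical `parse_csv` helper, and the sample rows are made up for illustration rather than taken from the dataset.

```python
import csv
import io
from typing import Dict, List


def parse_csv(text: str) -> List[Dict[str, str]]:
    """Parse CSV text into one dict per row, keyed by the header columns."""
    return [dict(row) for row in csv.DictReader(io.StringIO(text))]


# Made-up sample rows with a quoted comma in the Name column.
SAMPLE = 'Survived,Name\n0,"Doe, Mr. John"\n1,"Roe, Mrs. Jane"\n'

rows = parse_csv(SAMPLE)
# The quoted name stays in one column, and every value is read as a string.
assert rows[0] == {'Survived': '0', 'Name': 'Doe, Mr. John'}
assert len(rows) == 2
```

Note that the csv module returns every value as a string, so any type conversion has to happen separately.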

To test the source in the UI, install your updated mage_integrations module by running the following commands in the Mage terminal:

pip uninstall -y mage_integrations
pip install "git+https://github.com/your_repo.git@your_branch#egg=mage-integrations&subdirectory=mage_integrations"