Mage Sources are built on Singer Taps. You can read more about the Singer spec here. In this guide, we’ll copy over our sample source to get started and build from there.

If you haven’t already, start by cloning the Mage repo and navigating to the folder:

git clone https://github.com/mage-ai/mage-ai.git
cd mage-ai

For our source, we’ll need:

  • A schema that represents the source data
  • A template configuration file
  • An __init__.py file that contains the logic for fetching data from the source, including:
    • A test_connection method that tests the connection to our source.
    • A discover method that returns a Catalog of streams.
    • A load_data method that yields data from the source as a dictionary.

We’ll start from a template to make things easy.

Copy the template source

Navigate to /mage-ai/mage_integrations/mage_integrations/sources and create a copy of the titanic directory:

cd ./mage_integrations/mage_integrations/sources
cp -R titanic MY_SOURCE

Our titanic source simply reads data from a CSV file, but we’ll modify it to do much more. You’ll notice the following files and folders in the MY_SOURCE directory:

mage_integrations/sources/MY_SOURCE
├── README.md
├── __init__.py
├── schemas
│   └── passengers.json
└── templates
    └── config.json

We’ll need to update or overwrite each of these.

Update schemas

This folder contains all the known schemas from your source. For sources that have dynamic schemas (e.g. database tables from MySQL), this folder may be empty since the schema is dependent on the individual’s source data.

The JSON format of these schema files follows the Singer spec.

When naming schemas, use the plural name of the object you’re referencing. This plural name will be displayed to the individual who is setting up a synchronization using this source. Take the sample “passengers” schema for example:

{
  "properties": {
    "Survived": {
      "type": ["null", "integer"]
    },
    "Name": {
      "type": ["null", "string"]
    }
  },
  "type": ["null", "object"]
}
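As a quick sanity check while writing a schema, the sketch below compares a record against the nullable types declared in the sample "passengers" schema. It uses only the Python standard library. The helper names (record_errors, _matches) are hypothetical and not part of Mage or Singer, and the check covers only the top-level "type" of each property.

```python
import json

# The sample "passengers" schema, inlined so the sketch runs on its own.
PASSENGERS_SCHEMA = json.loads("""
{
  "properties": {
    "Survived": {"type": ["null", "integer"]},
    "Name": {"type": ["null", "string"]}
  },
  "type": ["null", "object"]
}
""")

# Map JSON Schema type names to the Python types produced by json.loads.
JSON_TYPES = {
    "null": type(None),
    "integer": int,
    "string": str,
    "number": (int, float),
    "boolean": bool,
    "object": dict,
    "array": list,
}


def _matches(value, type_name: str) -> bool:
    # bool is a subclass of int in Python, so handle it separately.
    if isinstance(value, bool):
        return type_name == "boolean"
    return isinstance(value, JSON_TYPES[type_name])


def record_errors(record: dict, schema: dict) -> list:
    """Return readable type mismatches for one record (hypothetical helper)."""
    errors = []
    for name, spec in schema["properties"].items():
        declared = spec["type"]
        allowed = declared if isinstance(declared, list) else [declared]
        value = record.get(name)  # a missing key is treated as null
        if not any(_matches(value, t) for t in allowed):
            errors.append(f"{name}: expected {allowed}, got {type(value).__name__}")
    return errors


good = record_errors({"Survived": 1, "Name": "Allen"}, PASSENGERS_SCHEMA)
bad = record_errors({"Survived": "1"}, PASSENGERS_SCHEMA)
print(good, bad)
```

Because every property in the sample allows "null", a record that omits a column still passes; only a value of the wrong type is reported.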

Add templates

This folder contains a sample configuration JSON file that’s displayed to the user when they are setting up a synchronization using this source.

The config.json file contains keys and values that are used to configure the behavior of the source as well as credentials to authenticate requests to the source. You must use the exact filename config.json, regardless of your source’s name.

The following simple example is present in the Titanic source.

{
  "api_key": "",
  "secret_key": ""
}
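Since the template ships with empty values, it can help to fail early when a user leaves a required key blank. The following is a minimal sketch, assuming the two keys from the sample above are required; missing_config_keys is a hypothetical helper, not a Mage API.

```python
import json

# Keys taken from the sample config.json; treating them as required is an assumption.
REQUIRED_KEYS = ("api_key", "secret_key")


def missing_config_keys(config: dict, required=REQUIRED_KEYS) -> list:
    """Return the required keys that are absent or left as empty strings."""
    return [key for key in required if not config.get(key)]


# The unmodified template has both values blank.
template = json.loads('{"api_key": "", "secret_key": ""}')
filled = {"api_key": "abc", "secret_key": "xyz"}

print(missing_config_keys(template))
print(missing_config_keys(filled))
```

A check like this could be called at the start of test_connection so that a blank credential produces a clear message instead of a failed request.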

Update __init__.py

A majority of source logic lives in the __init__.py file. Most of the work in adding your own source will involve creating/overwriting methods from __init__.py. Our sample contains the following:

from mage_integrations.sources.base import Source, main
from typing import Dict, Generator, List
import requests


URL = 'https://raw.githubusercontent.com/mage-ai/datasets/master/titanic_survival.csv'

class Titanic(Source):

    def load_data(
        self,
        **kwargs,
    ) -> Generator[List[Dict], None, None]:
        text = requests.get(URL).text

        rows = []
        lines = text.rstrip().split('\n')
        columns = lines[0].split(',')

        for line in lines[1:]:
            values = line.split(',')
            rows.append({col: values[idx] for idx, col in enumerate(columns)})

        yield rows

    def test_connection(self):
        request = requests.get(URL)

        if request.status_code != 200:
            raise Exception('Could not fetch titanic data')


if __name__ == '__main__':
    main(Titanic)

Rename source class

First, rename the source class to match your new source:

from mage_integrations.sources.base import Source

class MY_SOURCE(Source):
    pass

Edit methods

The load_data() method

Override this method to contain the logic for fetching data specific to your source. The Titanic source’s load_data method reads data from a CSV file and yields a list of dictionaries:

def load_data(
    self,
    **kwargs,
) -> Generator[List[Dict], None, None]:
    url = 'https://raw.githubusercontent.com/mage-ai/datasets/master/titanic_survival.csv'
    text = requests.get(url).text
    rows = []
    lines = text.rstrip().split('\n')
    columns = lines[0].split(',')
    for line in lines[1:]:
        values = line.split(',')
        rows.append({col: values[idx] for idx, col in enumerate(columns)})
    yield rows
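Splitting each line on ',' works only while no field contains a comma. If your CSV has quoted fields with embedded commas (names written as "Last, First", for instance), the standard library's csv module handles them. The sketch below is one possible variant, not the Titanic source's implementation; iter_csv_batches and batch_size are hypothetical names, and yielding in batches rather than all rows at once is a design choice for larger files.

```python
import csv
import io
from typing import Dict, Generator, List


def iter_csv_batches(
    text: str,
    batch_size: int = 1000,
) -> Generator[List[Dict], None, None]:
    """Parse CSV text and yield lists of row dictionaries, batch_size rows at a time."""
    reader = csv.DictReader(io.StringIO(text))  # first line is used as the header
    batch: List[Dict] = []
    for row in reader:
        batch.append(dict(row))
        if len(batch) >= batch_size:
            yield batch
            batch = []
    if batch:
        # Yield the final, possibly smaller, batch.
        yield batch


# Made-up sample data with commas inside quoted fields.
sample = 'Survived,Name\n0,"Braund, Mr. Owen"\n1,"Heikkinen, Miss. Laina"\n'
batches = list(iter_csv_batches(sample, batch_size=1))
print(batches)
```

Inside a source, load_data could delegate to a helper like this with `yield from iter_csv_batches(text)`, which keeps the method's shape of yielding lists of dictionaries.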

Your load_data method should also yield a list of dictionaries. The method accepts a keyword argument named query, which is a dictionary. When Mage runs a source, the following keys and values are automatically available on each run:

Key | Description | Sample value
_execution_date | The date and time (in ISO format) of when the pipeline started running. | 2022-10-21T17:24:49.443559
_execution_partition | An automatically formatted partition of the pipeline run using the execution date. | 20221021T172557 (format %Y%m%dT%H%M%S)
_start_date | You can define this variable as a runtime variable in your pipeline, or it’ll be automatically filled in using the date and time your pipeline runs minus 1 hour, day, week, etc. (based on your schedule’s interval). | 2022-10-01T00:00:00.000000
_end_date | You can define this variable as a runtime variable in your pipeline, or it’ll be automatically filled in using the date and time your pipeline runs. | 2022-10-02T00:00:00.000000
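One common use of _start_date and _end_date is to limit a run to records inside that window. The sketch below assumes the query dictionary is read from the keyword arguments (for example `kwargs.get('query', {})`), that each record carries an ISO-formatted timestamp in a field called updated_at, and that the window is half-open (start inclusive, end exclusive). The field name, the helper filter_by_window, and the interval convention are all assumptions for illustration.

```python
from datetime import datetime
from typing import Dict, List


def filter_by_window(
    rows: List[Dict],
    query: Dict,
    field: str = "updated_at",  # hypothetical timestamp field
) -> List[Dict]:
    """Keep rows whose timestamp falls in [_start_date, _end_date)."""
    start = query.get("_start_date")
    end = query.get("_end_date")
    start_dt = datetime.fromisoformat(start) if start else None
    end_dt = datetime.fromisoformat(end) if end else None

    kept = []
    for row in rows:
        ts = datetime.fromisoformat(row[field])
        if start_dt is not None and ts < start_dt:
            continue  # before the window
        if end_dt is not None and ts >= end_dt:
            continue  # at or after the end of the window
        kept.append(row)
    return kept


# Sample values in the same format as the table above.
query = {
    "_start_date": "2022-10-01T00:00:00.000000",
    "_end_date": "2022-10-02T00:00:00.000000",
}
rows = [
    {"id": 1, "updated_at": "2022-09-30T23:59:59"},
    {"id": 2, "updated_at": "2022-10-01T12:00:00"},
    {"id": 3, "updated_at": "2022-10-02T00:00:00"},
]
window = filter_by_window(rows, query)
print([row["id"] for row in window])
```

When the source is an API or database, passing the two dates to the request or SQL query is usually preferable to filtering after the fetch; the in-memory filter here only illustrates how the values can be parsed and compared.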

The discover() method

The discover method should return a Catalog of streams that define the data in your source. For example discover methods, see our Google Sheets source or our DynamoDB source.
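To make the shape of a catalog concrete, the sketch below builds the plain JSON structure described by the Singer spec (a top-level "streams" list whose entries carry tap_stream_id, stream, schema, and metadata) from the files in a schemas folder. It deliberately uses plain dictionaries rather than Mage's Catalog class, so treat it as an illustration of the data, not of the method's actual return type; build_catalog is a hypothetical helper, and marking everything as "available" is an assumption.

```python
import json
import tempfile
from pathlib import Path


def build_catalog(schemas_dir: Path) -> dict:
    """Build a Singer-style catalog dictionary, one stream per schema file."""
    streams = []
    for path in sorted(schemas_dir.glob("*.json")):
        schema = json.loads(path.read_text())
        stream_id = path.stem  # e.g. "passengers" for passengers.json

        # Stream-level metadata uses an empty breadcrumb;
        # field-level metadata uses ["properties", <field name>].
        metadata = [{"breadcrumb": [], "metadata": {"inclusion": "available"}}]
        for prop in schema.get("properties", {}):
            metadata.append({
                "breadcrumb": ["properties", prop],
                "metadata": {"inclusion": "available"},
            })

        streams.append({
            "tap_stream_id": stream_id,
            "stream": stream_id,
            "schema": schema,
            "metadata": metadata,
        })
    return {"streams": streams}


# Demonstrate with a temporary folder holding the sample schema.
sample_schema = {
    "properties": {
        "Survived": {"type": ["null", "integer"]},
        "Name": {"type": ["null", "string"]},
    },
    "type": ["null", "object"],
}
with tempfile.TemporaryDirectory() as tmp:
    (Path(tmp) / "passengers.json").write_text(json.dumps(sample_schema))
    catalog = build_catalog(Path(tmp))

print([stream["stream"] for stream in catalog["streams"]])
```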

The test_connection() method

We need a test_connection method so that the tap can be tested from the Mage UI. This is a simple method that instantiates the tap's client and closes it. Replace YourSource with your source class (e.g. GitHub) and provide the configuration arguments to the tap. Here's a good example from the SFTP tap:

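def test_connection(self) -> None:
    # Open a connection with the configured credentials, then close it.
    # Any exception raised here signals that the connection test failed.
    client = SFTPConnection(
        host=self.config['host'],
        username=self.config['username'],
        password=self.config['password'],
        private_key_file=self.config.get('private_key_file'),
        port=self.config['port'],
    )
    client.close()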
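The same open-then-close pattern can be sketched without any external service. In the example below, FakeClient is a stand-in for a real client such as the SFTP connection above, and check_connection is a hypothetical free function playing the role of test_connection; wrapping the failure in a descriptive exception is a design choice, not something the base class is known to require.

```python
class FakeClient:
    """Stand-in for a real client (hypothetical); fails when no host is given."""

    def __init__(self, host: str, port: int):
        if not host:
            raise ValueError("host is required")
        self.host = host
        self.port = port
        self.closed = False

    def close(self) -> None:
        self.closed = True


def check_connection(config: dict) -> FakeClient:
    """Open a client from the config and close it, raising a clear error on failure."""
    try:
        client = FakeClient(host=config["host"], port=config.get("port", 22))
    except (KeyError, ValueError) as err:
        raise Exception(f"Could not connect to source: {err!r}") from err
    client.close()
    return client  # returned only so the example can inspect it


ok = check_connection({"host": "example.com", "port": 22})

try:
    check_connection({"host": ""})
    failure_message = None
except Exception as err:
    failure_message = str(err)

print(ok.closed, failure_message)
```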

Update main() function

Change the source name in the main() function to match your new source:

if __name__ == '__main__':
    main(MY_SOURCE)

Add your source to the UI

The list of sources is available here. Add yours to make it accessible via the UI.

Document your source

Document how to configure and use your source in the README.md file at the source’s root directory. Documentation helps Magers and future contributors understand exactly how your source works, so be thorough and descriptive. Here’s an example of documentation done well.

Test your source

You’ll first need to configure your development environment. Once complete, follow this doc to test your new source.
