Streaming
Contributing
This doc explains how to add a new source or destination (sink) to the streaming pipeline.
Prerequisites
Follow this doc to set up the development environment for Mage.
Add a new source
Example PR for adding a source: https://github.com/mage-ai/mage-ai/pull/1953
- Add the new source type to mage_ai/streaming/constants.py.
- Add the source file to the folder https://github.com/mage-ai/mage-ai/tree/master/mage_ai/streaming/sources
  - Define the source class which inherits from the BaseSource.
  - Define the source config class.
  - Implement the methods:
    - init_client: Initialize the client used to connect to the source.
    - batch_read: Batch read messages from the source and use the handler method to process the messages.
- Add the new source to the SourceFactory.
- Add a template file with example config to the folder: https://github.com/mage-ai/mage-ai/tree/master/mage_ai/data_preparation/templates/data_loaders/streaming
- Add new source type to the frontend code.
  - https://github.com/mage-ai/mage-ai/blob/master/mage_ai/frontend/components/PipelineDetail/AddNewBlocks/utils.tsx#L31
  - https://github.com/mage-ai/mage-ai/blob/master/mage_ai/frontend/interfaces/DataSourceType.ts#L3
  - https://github.com/mage-ai/mage-ai/blob/master/mage_ai/frontend/interfaces/DataSourceType.ts#L22
- Add unit tests: https://github.com/mage-ai/mage-ai/tree/master/mage_ai/tests/streaming/sources
- Add doc: https://github.com/mage-ai/mage-ai/tree/master/docs/guides/streaming/sources
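The backend steps above can be sketched roughly as follows. This is a minimal, self-contained illustration and not Mage's actual code: the BaseSource class here is a local stand-in so the snippet runs without Mage installed, and the names SourceType.IN_MEMORY, InMemoryConfig, InMemorySource, and get_source are hypothetical. Check the example PR for the real base class, constants, and factory signatures.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Callable, Dict, List


class SourceType(str, Enum):
    # Hypothetical entry; the real enum lives in mage_ai/streaming/constants.py.
    IN_MEMORY = 'in_memory'


class BaseSource:
    """Local stand-in for Mage's BaseSource (assumed shape, for illustration)."""

    config_class = None

    def __init__(self, config: Dict):
        self.config = self.config_class(**config)
        self.init_client()

    def init_client(self):
        raise NotImplementedError

    def batch_read(self, handler: Callable):
        raise NotImplementedError


@dataclass
class InMemoryConfig:
    # Source config class: the fields a user would set in the block's config.
    messages: List[Dict]
    batch_size: int = 2


class InMemorySource(BaseSource):
    config_class = InMemoryConfig

    def init_client(self):
        # A real source would open a connection here (consumer, socket, ...).
        self.client = iter(self.config.messages)

    def batch_read(self, handler: Callable):
        # Collect messages into batches and pass each batch to the handler.
        # A real streaming source would typically keep polling instead of
        # stopping when the iterator is exhausted.
        batch = []
        for message in self.client:
            batch.append(message)
            if len(batch) >= self.config.batch_size:
                handler(batch)
                batch = []
        if batch:
            handler(batch)


def get_source(config: Dict) -> BaseSource:
    # Hypothetical factory function standing in for the SourceFactory step.
    config = dict(config)
    connector_type = config.pop('connector_type')
    if connector_type == SourceType.IN_MEMORY:
        return InMemorySource(config)
    raise ValueError(f'Unsupported source type: {connector_type}')


if __name__ == '__main__':
    source = get_source({
        'connector_type': 'in_memory',
        'messages': [{'id': 1}, {'id': 2}, {'id': 3}],
    })
    source.batch_read(lambda batch: print(f'Received {len(batch)} message(s)'))
```

Keeping connection setup in init_client and the read loop in batch_read means the pipeline only needs to supply a handler, which is why the handler is a parameter here instead of being hard-coded in the source.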
Add a new destination (sink)
Example PR for adding a sink: https://github.com/mage-ai/mage-ai/pull/2121
- Add the new sink type to mage_ai/streaming/constants.py.
- Add the sink file to the folder https://github.com/mage-ai/mage-ai/tree/master/mage_ai/streaming/sinks
  - Define the sink class which inherits from the BaseSink.
  - Define the sink config class.
  - Implement the methods:
    - init_client: Initialize the client used to connect to the destination.
    - batch_write: Batch write messages to the destination.
- Add the new sink to the SinkFactory.
- Add a template file with example config to the folder: https://github.com/mage-ai/mage-ai/tree/master/mage_ai/data_preparation/templates/data_exporters/streaming
- Add new sink type to the frontend code.
  - https://github.com/mage-ai/mage-ai/blob/master/mage_ai/frontend/components/PipelineDetail/AddNewBlocks/utils.tsx#L37
  - https://github.com/mage-ai/mage-ai/blob/master/mage_ai/frontend/interfaces/DataSourceType.ts#L3
  - https://github.com/mage-ai/mage-ai/blob/master/mage_ai/frontend/interfaces/DataSourceType.ts#L22
- Add unit tests: https://github.com/mage-ai/mage-ai/tree/master/mage_ai/tests/streaming/sinks
- Add doc: https://github.com/mage-ai/mage-ai/tree/master/docs/guides/streaming/destinations
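A sink follows the same pattern with batch_write in place of batch_read. The sketch below is only an illustration under stated assumptions: BaseSink is a local stand-in so the snippet runs without Mage installed, and JsonLinesConfig and JsonLinesSink are hypothetical names, not classes from the Mage codebase. See the example PR for the real base class and config conventions.

```python
import json
from dataclasses import dataclass
from typing import Dict, List


class BaseSink:
    """Local stand-in for Mage's BaseSink (assumed shape, for illustration)."""

    config_class = None

    def __init__(self, config: Dict):
        self.config = self.config_class(**config)
        self.init_client()

    def init_client(self):
        raise NotImplementedError

    def batch_write(self, messages: List[Dict]):
        raise NotImplementedError


@dataclass
class JsonLinesConfig:
    # Sink config class: where the messages should be written.
    path: str


class JsonLinesSink(BaseSink):
    config_class = JsonLinesConfig

    def init_client(self):
        # A real sink would create a producer or database client here.
        self.client = open(self.config.path, 'a', encoding='utf-8')

    def batch_write(self, messages: List[Dict]):
        # Write the whole batch, one JSON document per line, then flush once.
        if not messages:
            return
        for message in messages:
            self.client.write(json.dumps(message) + '\n')
        self.client.flush()


if __name__ == '__main__':
    import os
    import tempfile

    path = os.path.join(tempfile.mkdtemp(), 'messages.jsonl')
    sink = JsonLinesSink({'path': path})
    sink.batch_write([{'id': 1}, {'id': 2}])
    sink.client.close()
    print(f'Wrote messages to {path}')
```

Flushing once per batch instead of once per message keeps the write path cheap, which is usually the reason a sink exposes a batch method in the first place.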