RabbitMQ streaming pipeline
Build pipelines that ingest data from event streaming sources like RabbitMQ.
Set up RabbitMQ
If you don’t already have RabbitMQ set up, here is a quick guide on how to run and use RabbitMQ locally.
Using RabbitMQ locally
-
In your terminal, execute this command:
docker run --rm -it -p 15672:15672 -p 5672:5672 rabbitmq:3-management
This should start the RabbitMQ server on port 5672 and a management UI on port 15672.
Create a Queue (via RabbitMQ management)
-
Log into your RabbitMQ management UI (if you just created one, the default user/password is guest/guest).
-
Go to the “Queues” tab.
-
Click on “Add a new queue”, define your settings, and click on “Add Queue”.
-
Your queue should be ready to be used!
Create a Queue (via Python Pika)
-
Install pika with your favorite dependency manager:
pipenv install pika
-
Execute this snippet:
-
Your queue should be ready to be used!
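The snippet for this step is not reproduced above, so the following is a minimal sketch of declaring a queue with pika, not necessarily the original code. It assumes the local broker started earlier (localhost, port 5672, guest/guest); the queue name `my_queue` and the helper names `declare_queue` and `create_queue` are hypothetical.

```python
# Sketch only: assumes a local broker on localhost:5672 with the default
# guest/guest credentials. "my_queue" is a hypothetical queue name.


def declare_queue(channel, queue_name, durable=True):
    """Declare a queue on an already-open channel.

    Declaring is idempotent as long as the settings match an existing queue.
    """
    channel.queue_declare(queue=queue_name, durable=durable)
    return queue_name


def create_queue(queue_name="my_queue", host="localhost", port=5672):
    """Open a connection, declare the queue, then close the connection."""
    import pika  # imported lazily so declare_queue has no hard dependency

    credentials = pika.PlainCredentials("guest", "guest")
    params = pika.ConnectionParameters(
        host=host, port=port, credentials=credentials
    )
    connection = pika.BlockingConnection(params)
    try:
        declare_queue(connection.channel(), queue_name)
    finally:
        connection.close()
```

Calling `create_queue()` with the broker running should create the queue. If a queue with the same name already exists with different settings (for example, non-durable), the broker rejects the declaration, so keep `durable` consistent with how the queue was first created.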
Start Mage
Create a new pipeline
-
Open Mage in your browser.
-
Click + New pipeline, then select Streaming.
-
Add a data loader block, select RabbitMQ, and paste the following:
-
Add a transformer block and paste the following:
-
Add a data exporter block, select OpenSearch, and paste the following:
- Change the host to match your OpenSearch domain’s endpoint.
- Change the index_name to match the index you want to export data into.
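The transformer code for the steps above is not reproduced here. As an illustration of what a transformer might do before the data reaches OpenSearch, the sketch below normalizes each incoming message into a dictionary. The shape of the incoming messages (dicts, JSON strings, or raw bytes) is an assumption, and the function names `normalize_message` and `transform_messages` are hypothetical; inside Mage, this logic would go in the function generated by the transformer block template.

```python
import json
from typing import Any, Dict, List


def normalize_message(message: Any) -> Dict:
    """Turn one raw message into a dict suitable for indexing.

    Assumption: messages arrive as dicts, JSON strings, or raw bytes.
    """
    if isinstance(message, dict):
        return message
    if isinstance(message, (bytes, bytearray)):
        message = message.decode("utf-8")
    try:
        parsed = json.loads(message)
    except (TypeError, ValueError):
        # Not JSON (for example, plain text): wrap the text as-is.
        return {"message": str(message)}
    # Valid JSON that is not an object (number, list, ...) is wrapped too.
    return parsed if isinstance(parsed, dict) else {"message": parsed}


def transform_messages(messages: List[Any]) -> List[Dict]:
    """Normalize a batch of messages, preserving their order."""
    return [normalize_message(m) for m in messages]
```

Wrapping plain-text payloads in a dictionary keeps every exported document a JSON object, which is what a document index expects.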
Test pipeline
Open the streaming pipeline you just created, and in the right side panel near the bottom, click the button Execute pipeline to test the pipeline.
You should see an output like this:
Publish messages using RabbitMQ Management UI
-
Log into your RabbitMQ management UI (if you just created one, the default user/password is guest/guest).
-
Go to the “Queues” tab and click on your desired queue.
-
Open the “Publish Message” dropdown and write “Hello World!” in the payload field.
-
Click on the black button called “Publish Message”. The message should appear in both Mage and your OpenSearch index!
Publish messages using Python Pika
-
Execute this snippet:
-
The message should appear in both Mage and your OpenSearch index!
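The snippet for this step is not reproduced above, so here is a minimal sketch of publishing a message with pika, not necessarily the original code. It assumes the local broker from the setup section (localhost, port 5672, guest/guest) and an existing queue; the queue name `my_queue` and the helper names `publish_message` and `send` are hypothetical.

```python
# Sketch only: assumes a local broker on localhost:5672 with the default
# guest/guest credentials and an existing queue named "my_queue".


def publish_message(channel, queue_name, body="Hello World!"):
    """Publish one message through the default exchange.

    With the default exchange (""), the routing key is the queue name.
    """
    channel.basic_publish(
        exchange="",
        routing_key=queue_name,
        body=body.encode("utf-8"),
    )


def send(queue_name="my_queue", body="Hello World!",
         host="localhost", port=5672):
    """Open a connection, publish one message, then close the connection."""
    import pika  # imported lazily so publish_message has no hard dependency

    credentials = pika.PlainCredentials("guest", "guest")
    params = pika.ConnectionParameters(
        host=host, port=port, credentials=credentials
    )
    connection = pika.BlockingConnection(params)
    try:
        publish_message(connection.channel(), queue_name, body)
    finally:
        connection.close()
```

Calling `send()` while the pipeline is running should deliver one “Hello World!” message to the queue the data loader is consuming from, provided the queue name matches the one configured in the loader.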