RabbitMQ streaming pipeline

Set up RabbitMQ
If you don’t have RabbitMQ already setup, here is a quick guide on how to run and use RabbitMQ locally
Using RabbitMQ locally
-
In your terminal, execute this command:
docker run --rm -it -p 15672:15672 -p 5672:5672 rabbitmq:3-management
.(This should start the RabbitMQ server on port
5672
and a management UI on port15672
)
Create a Queue (via RabbitMQ management)
-
Log into your RabbitMQ management UI ( if you just created one, default user/password is guest/guest)
-
Go to “Queues” tab
-
Click on “Add a new queue”, define your settings and click on “Add Queue”
-
Your queue should be ready to be used!
Create a Queue (via python Pika)
-
Install pika in your favorite depedency manager:
pipenv install pika
-
Execute this snippet:
import pika connection = pika.BlockingConnection( pika.ConnectionParameters(host=localhost,port=5672)) # in case of failure connection, uncomment this code # username = guest # password = guest # connection_host = localhost # connection_port = 5672 # vt_host = r'%2f' # generated_url = f"amqp://{username}:{password}@" \ # f"{connection_host}:{connection_port}/{vt_host}" # create_connection = pika.BlockingConnection( # pika.URLParameters( # generated_url # ) # ) channel = connection.channel() channel.queue_declare(queue='magic_queue', durable=True) # durable is optional
-
Your queue should be ready to be used!
Start Mage
Create a new pipeline
-
Open Mage in your browser.
-
Click
+ New pipeline
, then selectStreaming
. -
Add a data loader block, select
RabbitMQ
, and paste the following:connector_type: 'rabbitmq' connection_host: localhost connection_port: 5672 queue_name: default configure_consume: False username: 'guest' password: 'guest' amqp_url_virtual_host: '%2f'
-
Add a transformer block and paste the following:
from typing import Dict, List import logging import json if 'transformer' not in globals(): from mage_ai.data_preparation.decorators import transformer @transformer def transform(messages: List[Dict], *args, **kwargs): kwargs['channel'].basic_ack(messages[0].delivery_tag) print(messages.body.decode()) return json.dumps({'message':messages.body.decode()})
-
Add a data exporter block, select
OpenSearch
and paste the following:connector_type: opensearch host: https://search-something-abcdefg123456.us-west-1.es.amazonaws.com/ index_name: python-test-index
- Change the
host
to match your OpenSearch domain’s endpoint. - Change the
index_name
to match the index you want to export data into.
- Change the
Test pipeline
Open the streaming pipeline you just created, and in the right side panel near the bottom, click the button Execute pipeline to test the pipeline.
You should see an output like this:
[RabbitMQSource] Trying to connect on {your url}
[RabbitMQSource] Connected on broker
[RabbitMQSource] Start consuming messages.
Publish messages using RabbitMQ Management UI
-
Log into your RabbitMQ management UI ( if you just created one, default user/password is guest/guest)
-
Go to “Queues” tab, and click on you desired Queue
-
Open “Publish Message” dropdown and write “Hello World!” on the payload block
-
Click on the black button called “Publish Message”. This message should be seen on both mage-ai and your opensearch index!
Publish messages using Python Pika
-
Execute this snippet:
import json import pika connection = pika.BlockingConnection( pika.ConnectionParameters(host='insert_host',port=5672)) # in case of failure connection, uncomment this code # username = guest # password = guest # connection_host = localhost # connection_port = 5672 # vt_host = r'%2f' # generated_url = f"amqp://{username}:{password}@" \ # f"{connection_host}:{connection_port}/{vt_host}" # create_connection = pika.BlockingConnection( # pika.URLParameters( # generated_url # ) # ) channel = connection.channel() message = "Hello World!" channel.basic_publish( exchange='', routing_key='magic_queue', body=message, properties=pika.BasicProperties( delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE ) ) print(" [x] Sent ")
-
This message should be seen on both mage-ai and your OpenSearch index!