Set up RabbitMQ
If you don’t have RabbitMQ already setup, here is a quick guide on how to run and
use RabbitMQ locally
Using RabbitMQ locally
-
In your terminal, execute this command:
docker run --rm -it -p 15672:15672 -p 5672:5672 rabbitmq:3-management
.
(This should start the RabbitMQ server on port 5672
and a management UI on port 15672
)
Create a Queue (via RabbitMQ management)
-
Log into your RabbitMQ management UI ( if you just created one, default user/password is guest/guest)
-
Go to “Queues” tab
-
Click on “Add a new queue”, define your settings and click on “Add Queue”
-
Your queue should be ready to be used!
Create a Queue (via python Pika)
-
Install pika in your favorite depedency manager:
pipenv install pika
-
Execute this snippet:
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host=localhost,port=5672))
# in case of failure connection, uncomment this code
# username = guest
# password = guest
# connection_host = localhost
# connection_port = 5672
# vt_host = r'%2f'
# generated_url = f"amqp://{username}:{password}@" \
# f"{connection_host}:{connection_port}/{vt_host}"
# create_connection = pika.BlockingConnection(
# pika.URLParameters(
# generated_url
# )
# )
channel = connection.channel()
channel.queue_declare(queue='magic_queue', durable=True) # durable is optional
-
Your queue should be ready to be used!
Start Mage
Create a new pipeline
-
Open Mage in your browser.
-
Click + New pipeline
, then select Streaming
.
-
Add a data loader block, select RabbitMQ
, and paste the following:
connector_type: 'rabbitmq'
connection_host: localhost
connection_port: 5672
queue_name: default
configure_consume: False
username: 'guest'
password: 'guest'
amqp_url_virtual_host: '%2f'
-
Add a transformer block and paste the following:
from typing import Dict, List
import logging
import json
if 'transformer' not in globals():
from mage_ai.data_preparation.decorators import transformer
@transformer
def transform(messages: List[Dict], *args, **kwargs):
kwargs['channel'].basic_ack(messages[0].delivery_tag)
print(messages.body.decode())
return json.dumps({'message':messages.body.decode()})
-
Add a data exporter block, select OpenSearch
and paste the following:
connector_type: opensearch
host: https://search-something-abcdefg123456.us-west-1.es.amazonaws.com/
index_name: python-test-index
- Change the
host
to match your OpenSearch domain’s endpoint.
- Change the
index_name
to match the index you want to export data into.
Test pipeline
Open the streaming pipeline you just created, and in the right side panel near
the bottom, click the button Execute pipeline to test the pipeline.
You should see an output like this:
[RabbitMQSource] Trying to connect on {your url}
[RabbitMQSource] Connected on broker
[RabbitMQSource] Start consuming messages.
Publish messages using RabbitMQ Management UI
-
Log into your RabbitMQ management UI ( if you just created one, default user/password is guest/guest)
-
Go to “Queues” tab, and click on you desired Queue
-
Open “Publish Message” dropdown and write “Hello World!” on the payload block
-
Click on the black button called “Publish Message”. This message should be seen on both mage-ai and your opensearch index!
Publish messages using Python Pika
-
Execute this snippet:
import json
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='insert_host',port=5672))
# in case of failure connection, uncomment this code
# username = guest
# password = guest
# connection_host = localhost
# connection_port = 5672
# vt_host = r'%2f'
# generated_url = f"amqp://{username}:{password}@" \
# f"{connection_host}:{connection_port}/{vt_host}"
# create_connection = pika.BlockingConnection(
# pika.URLParameters(
# generated_url
# )
# )
channel = connection.channel()
message = "Hello World!"
channel.basic_publish(
exchange='',
routing_key='magic_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE
)
)
print(" [x] Sent ")
-
This message should be seen on both mage-ai and your OpenSearch index!
Responses are generated using AI and may contain mistakes.