# RabbitMQ streaming pipeline

> Build pipelines that ingest data from event streaming sources like RabbitMQ.

<img alt="Streaming pipeline" src="https://i.gifer.com/4Ze9.gif" />

## Set up RabbitMQ

If you don't already have RabbitMQ set up, here is a quick guide to running and
using it locally.

### Using RabbitMQ locally

1. In your terminal, run this command:

   `docker run --rm -it -p 15672:15672 -p 5672:5672 rabbitmq:3-management`

   This should start the RabbitMQ server on port `5672` and the management UI on port `15672`.
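
Before moving on, it can help to confirm that both ports are actually reachable. The following is a minimal sketch using only the Python standard library; the helper name `is_port_open` is hypothetical and not part of Mage or RabbitMQ.

```python
import socket


def is_port_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        # create_connection raises OSError (including timeouts) on failure
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```

For example, `is_port_open("localhost", 5672)` and `is_port_open("localhost", 15672)` should both return `True` once the container has finished starting. A successful TCP connection only shows that something is listening, not that the broker is fully ready.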

### Create a Queue (via RabbitMQ management)

1. Log in to your RabbitMQ management UI (if you just created the server, the default username/password is `guest`/`guest`).

2. Go to the "Queues" tab.

3. Click "Add a new queue", define your settings, and click "Add Queue".

4. Your queue should be ready to be used!
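
The same queue can also be created without clicking through the UI. This sketch assumes the management plugin's HTTP API accepts `PUT /api/queues/{vhost}/{name}` with a JSON body and basic authentication; the helper name `build_queue_request` is hypothetical. It only builds the request, so nothing is sent until you pass it to `urllib.request.urlopen`.

```python
import base64
import json
import urllib.parse
import urllib.request


def build_queue_request(base_url, vhost, queue, username, password, durable=True):
    """Build (but do not send) a PUT request that declares a queue."""
    # The default virtual host "/" must be URL-encoded as %2F in the path
    url = "{}/api/queues/{}/{}".format(
        base_url.rstrip("/"),
        urllib.parse.quote(vhost, safe=""),
        urllib.parse.quote(queue, safe=""),
    )
    body = json.dumps(
        {"durable": durable, "auto_delete": False, "arguments": {}}
    ).encode("utf-8")
    token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
    return urllib.request.Request(
        url,
        data=body,
        method="PUT",
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Basic {token}",
        },
    )
```

With the local container from above, a call might look like `urllib.request.urlopen(build_queue_request("http://localhost:15672", "/", "magic_queue", "guest", "guest"))`.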

### Create a Queue (via Python Pika)

1. Install `pika` with your preferred dependency manager, for example:
   `pipenv install pika`

2. Execute this snippet:

   ```python theme={"system"}
   import pika

   connection = pika.BlockingConnection(
       pika.ConnectionParameters(host='localhost', port=5672))

   # If the connection fails, connect with an explicit AMQP URL instead:
   # username = 'guest'
   # password = 'guest'
   # connection_host = 'localhost'
   # connection_port = 5672
   # vt_host = '%2f'  # URL-encoded default virtual host "/"
   # generated_url = f"amqp://{username}:{password}@" \
   #                 f"{connection_host}:{connection_port}/{vt_host}"
   # connection = pika.BlockingConnection(
   #     pika.URLParameters(generated_url))

   channel = connection.channel()

   # durable=True keeps the queue across broker restarts; it is optional
   channel.queue_declare(queue='magic_queue', durable=True)

   connection.close()
   ```

3. Your queue should be ready to be used!
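
The fallback in the snippet above connects through an AMQP URL in which the default virtual host `/` appears as `%2f`. As a rough illustration of where that value comes from, this sketch builds such a URL with the standard library; the function name `build_amqp_url` is hypothetical.

```python
from urllib.parse import quote


def build_amqp_url(username, password, host, port, virtual_host="/"):
    """Build an AMQP URL, percent-encoding the credentials and virtual host."""
    # safe="" forces "/" to be encoded, so the default vhost becomes %2F
    user = quote(username, safe="")
    secret = quote(password, safe="")
    vhost = quote(virtual_host, safe="")
    return f"amqp://{user}:{secret}@{host}:{port}/{vhost}"
```

`build_amqp_url("guest", "guest", "localhost", 5672)` yields `amqp://guest:guest@localhost:5672/%2F`. Percent-encoding is case-insensitive, so `%2F` and `%2f` refer to the same virtual host, and the result can be passed to `pika.URLParameters`.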

## Start Mage

### Create a new pipeline

1. Open Mage in your browser.

2. Click <b>`+ New pipeline`</b>, then select `Streaming`.

3. Add a data loader block, select `RabbitMQ`, and paste the following:

   ```yaml theme={"system"}
   connector_type: 'rabbitmq'
   connection_host: localhost
   connection_port: 5672
   queue_name: default
   configure_consume: False
   username: 'guest'
   password: 'guest'
   amqp_url_virtual_host: '%2f'
   ```

   Set `queue_name` to the queue you created earlier (for example, `magic_queue`).

4. Add a transformer block and paste the following:

   ```python theme={"system"}
   from typing import Dict, List
   import logging
   import json

   if 'transformer' not in globals():
       from mage_ai.data_preparation.decorators import transformer


   @transformer
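   def transform(messages: List[Dict], *args, **kwargs):
       # Acknowledge the delivery so RabbitMQ can remove it from the queue
       kwargs['channel'].basic_ack(messages[0].delivery_tag)
       print(messages.body.decode())
       # Wrap the decoded body in a JSON document for the exporter
       return json.dumps({'message': messages.body.decode()})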
   ```

5. Add a data exporter block, select `OpenSearch`, and paste the following:

   ```yaml theme={"system"}
   connector_type: opensearch
   host: https://search-something-abcdefg123456.us-west-1.es.amazonaws.com/
   index_name: python-test-index
   ```

   1. Change the `host` to match your OpenSearch domain’s endpoint.
   2. Change the `index_name` to match the index you want to export data into.
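
The transformer in step 4 decodes the raw message body and wraps it in a JSON document before it reaches the exporter. To try that conversion outside Mage, a minimal standalone sketch could look like the following; the function name `body_to_document` and the choice to replace undecodable bytes are assumptions made for illustration.

```python
import json


def body_to_document(body: bytes, encoding: str = "utf-8") -> str:
    """Decode a raw message body and wrap it in a JSON document."""
    # errors="replace" keeps malformed bytes from raising UnicodeDecodeError
    text = body.decode(encoding, errors="replace")
    return json.dumps({"message": text})
```

For the "Hello World!" payload used later in this guide, `body_to_document(b"Hello World!")` returns `{"message": "Hello World!"}`.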

***

## Test pipeline

Open the streaming pipeline you just created, and in the right side panel near
the bottom, click the button <b>Execute pipeline</b> to test the pipeline.

You should see an output like this:

```
[RabbitMQSource] Trying to connect on {your url}
[RabbitMQSource] Connected on broker
[RabbitMQSource] Start consuming messages.
```

### Publish messages using RabbitMQ Management UI

1. Log in to your RabbitMQ management UI (if you just created the server, the default username/password is `guest`/`guest`).

2. Go to the "Queues" tab and click your desired queue.

3. Open the "Publish Message" dropdown and write "Hello World!" in the payload field.

4. Click the black "Publish Message" button. The message should appear in both Mage and your OpenSearch index.
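
The UI steps above can also be scripted. This sketch assumes the management plugin exposes `POST /api/exchanges/{vhost}/{exchange}/publish` and that the default exchange is addressed as `amq.default`; check both against the API reference for your RabbitMQ version. The helper name `build_publish_call` is hypothetical, and it only prepares the URL and body without sending anything.

```python
import json
import urllib.parse


def build_publish_call(base_url, vhost, queue, message):
    """Return the URL and JSON body for publishing via the default exchange."""
    url = "{}/api/exchanges/{}/amq.default/publish".format(
        base_url.rstrip("/"),
        urllib.parse.quote(vhost, safe=""),
    )
    payload = {
        "properties": {},
        "routing_key": queue,  # the default exchange routes by queue name
        "payload": message,
        "payload_encoding": "string",
    }
    return url, json.dumps(payload).encode("utf-8")
```

The returned pair can be sent as a `POST` request with the same `guest`/`guest` basic authentication used to log in to the UI.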

### Publish messages using Python Pika

1. Execute this snippet:

   ```python theme={"system"}
   import pika

   connection = pika.BlockingConnection(
       pika.ConnectionParameters(host='insert_host', port=5672))

   # If the connection fails, connect with an explicit AMQP URL instead:
   # username = 'guest'
   # password = 'guest'
   # connection_host = 'localhost'
   # connection_port = 5672
   # vt_host = '%2f'  # URL-encoded default virtual host "/"
   # generated_url = f"amqp://{username}:{password}@" \
   #                 f"{connection_host}:{connection_port}/{vt_host}"
   # connection = pika.BlockingConnection(
   #     pika.URLParameters(generated_url))

   channel = connection.channel()

   message = "Hello World!"

   # Publish through the default exchange, which routes by queue name
   channel.basic_publish(
       exchange='',
       routing_key='magic_queue',
       body=message,
       properties=pika.BasicProperties(
           # Mark the message as persistent
           delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE
       )
   )

   print(" [x] Sent ")

   connection.close()
   ```

2. The message should appear in both Mage and your OpenSearch index.
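
Real pipelines usually carry structured events rather than a plain string. As a small sketch, the helper below serializes a dictionary to JSON bytes that can be passed as the `body` argument of `basic_publish`; the name `encode_event` and the example fields are hypothetical.

```python
import json


def encode_event(event: dict) -> bytes:
    """Serialize an event to UTF-8 JSON bytes suitable for a message body."""
    # sort_keys gives a stable byte representation for identical events
    return json.dumps(event, sort_keys=True).encode("utf-8")
```

For instance, `encode_event({"user_id": 1, "action": "login"})` could replace `message` in the snippet above. The transformer would then need to parse the decoded body with `json.loads` instead of treating it as plain text.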
