Streaming pipeline

Set up RabbitMQ

If you don’t have RabbitMQ already setup, here is a quick guide on how to run and use RabbitMQ locally

Using RabbitMQ locally

  1. In your terminal, execute this command:

    docker run --rm -it -p 15672:15672 -p 5672:5672 rabbitmq:3-management.

    (This should start the RabbitMQ server on port 5672 and a management UI on port 15672)

Create a Queue (via RabbitMQ management)

  1. Log into your RabbitMQ management UI ( if you just created one, default user/password is guest/guest)

  2. Go to “Queues” tab

  3. Click on “Add a new queue”, define your settings and click on “Add Queue”

  4. Your queue should be ready to be used!

Create a Queue (via python Pika)

  1. Install pika in your favorite depedency manager: pipenv install pika

  2. Execute this snippet:

    import pika
    
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host=localhost,port=5672))
    
    # in case of failure connection, uncomment this code
    # username = guest
    # password = guest
    # connection_host = localhost
    # connection_port = 5672
    # vt_host = r'%2f'
    # generated_url = f"amqp://{username}:{password}@" \
    #                 f"{connection_host}:{connection_port}/{vt_host}"
    # create_connection = pika.BlockingConnection(
    #                 pika.URLParameters(
    #                     generated_url
    #                 )
    #             )
    
    channel = connection.channel()
    
    channel.queue_declare(queue='magic_queue', durable=True) # durable is optional
    
    
  3. Your queue should be ready to be used!

Start Mage

Create a new pipeline

  1. Open Mage in your browser.

  2. Click + New pipeline, then select Streaming.

  3. Add a data loader block, select RabbitMQ, and paste the following:

    connector_type: 'rabbitmq'
    connection_host: localhost
    connection_port: 5672
    queue_name: default
    configure_consume: False
    username: 'guest'
    password: 'guest'
    amqp_url_virtual_host: '%2f'
    
  4. Add a transformer block and paste the following:

    from typing import Dict, List
    import logging
    import json
    
    if 'transformer' not in globals():
        from mage_ai.data_preparation.decorators import transformer
    
    
    @transformer
    def transform(messages: List[Dict], *args, **kwargs):
        kwargs['channel'].basic_ack(messages[0].delivery_tag)
        print(messages.body.decode())
        return json.dumps({'message':messages.body.decode()})
    
  5. Add a data exporter block, select OpenSearch and paste the following:

    connector_type: opensearch
    host: https://search-something-abcdefg123456.us-west-1.es.amazonaws.com/
    index_name: python-test-index
    
    1. Change the host to match your OpenSearch domain’s endpoint.
    2. Change the index_name to match the index you want to export data into.

Test pipeline

Open the streaming pipeline you just created, and in the right side panel near the bottom, click the button Execute pipeline to test the pipeline.

You should see an output like this:

[RabbitMQSource] Trying to connect on {your url}
[RabbitMQSource] Connected on broker
[RabbitMQSource] Start consuming messages.

Publish messages using RabbitMQ Management UI

  1. Log into your RabbitMQ management UI ( if you just created one, default user/password is guest/guest)

  2. Go to “Queues” tab, and click on you desired Queue

  3. Open “Publish Message” dropdown and write “Hello World!” on the payload block

  4. Click on the black button called “Publish Message”. This message should be seen on both mage-ai and your opensearch index!

Publish messages using Python Pika

  1. Execute this snippet:

    import json
    
    import pika
    
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host='insert_host',port=5672))
    
    # in case of failure connection, uncomment this code
    # username = guest
    # password = guest
    # connection_host = localhost
    # connection_port = 5672
    # vt_host = r'%2f'
    # generated_url = f"amqp://{username}:{password}@" \
    #                 f"{connection_host}:{connection_port}/{vt_host}"
    # create_connection = pika.BlockingConnection(
    #                 pika.URLParameters(
    #                     generated_url
    #                 )
    #             )
    
    channel = connection.channel()
    
    message = "Hello World!"
    
    channel.basic_publish(
      exchange='',
      routing_key='magic_queue',
      body=message,
      properties=pika.BasicProperties(
          delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE
      )
    )
    
    print(" [x] Sent ")
    
  2. This message should be seen on both mage-ai and your OpenSearch index!