Start Mage

  1. Open Mage in your browser and then navigate to the pipelines page.
  2. Create a new pipeline by clicking the “+ New” button and then selecting “Streaming.”
  3. Enter a name for the pipeline and click “Create.”

Authentication

To authenticate with Azure Event Hub, complete the following steps:

  1. Configure your io_config.yml file with the account variables below. For more information about storing secrets in Mage, see the Mage documentation.
  AZURE_CLIENT_ID: "{{ env_var('AZURE_CLIENT_ID') }}"
  AZURE_CLIENT_SECRET: "{{ env_var('AZURE_CLIENT_SECRET') }}"
  AZURE_STORAGE_ACCOUNT_NAME: "{{ env_var('AZURE_STORAGE_ACCOUNT_NAME') }}"
  AZURE_TENANT_ID: "{{ env_var('AZURE_TENANT_ID') }}"
  1. Add a data loader block to your pipeline and then select “Azure Event Hub” as the source.
  2. Complete the data loader block with the information below.
connector_type: azure_event_hub
connection_str: 'Endpoint=sb://[namespace].servicebus.windows.net/;...'
eventhub_name: event_hub_name
consumer_group: '$Default'
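
Because io_config.yml reads its values through env_var(...), an unset environment variable is an easy mistake to miss. The following is a minimal sketch of a pre-flight check, assuming you run it in the same environment as Mage. The helper name missing_azure_env_vars is hypothetical and not part of Mage; only the four variable names come from the configuration in this guide.

```python
import os
from typing import List, Mapping, Optional

# Variable names taken from the io_config.yml snippet in this guide.
REQUIRED_VARS = (
    'AZURE_CLIENT_ID',
    'AZURE_CLIENT_SECRET',
    'AZURE_STORAGE_ACCOUNT_NAME',
    'AZURE_TENANT_ID',
)


def missing_azure_env_vars(env: Optional[Mapping[str, str]] = None) -> List[str]:
    """Return the required variables that are unset or empty.

    Hypothetical helper: it only inspects the environment and does not
    contact Azure or Mage.
    """
    env = os.environ if env is None else env
    return [name for name in REQUIRED_VARS if not env.get(name)]


if __name__ == '__main__':
    missing = missing_azure_env_vars()
    if missing:
        print('Missing environment variables: ' + ', '.join(missing))
    else:
        print('All Azure environment variables are set.')
```

Passing a plain dictionary instead of the real environment makes the check easy to try out without touching your shell settings.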

Connection string: Set the connection_str parameter in the config as shown above. You can copy the connection string from the Azure portal.
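
A connection string is a list of Key=Value pairs separated by semicolons, so a quick structural check can catch copy-and-paste mistakes before the pipeline runs. The sketch below is an illustration only: the function names parse_connection_str and check_connection_str are hypothetical, the sample values are placeholders, and the check covers only the Endpoint=sb://... prefix shown in the config above.

```python
from typing import Dict


def parse_connection_str(connection_str: str) -> Dict[str, str]:
    """Split 'Key=Value;Key=Value' pairs into a dictionary."""
    parts: Dict[str, str] = {}
    for segment in connection_str.strip().split(';'):
        if not segment:
            continue  # tolerate a trailing semicolon
        # Split on the first '=' only, because key values may contain '='.
        key, sep, value = segment.partition('=')
        if not sep or not key:
            raise ValueError(f'Malformed segment: {segment!r}')
        parts[key] = value
    return parts


def check_connection_str(connection_str: str) -> Dict[str, str]:
    """Parse the string and confirm it has an 'Endpoint=sb://...' entry."""
    parts = parse_connection_str(connection_str)
    if not parts.get('Endpoint', '').startswith('sb://'):
        raise ValueError("Expected an 'Endpoint=sb://...' entry")
    return parts


if __name__ == '__main__':
    # Placeholder values for illustration, not real credentials.
    sample = (
        'Endpoint=sb://example-namespace.servicebus.windows.net/;'
        'SharedAccessKeyName=example-policy;SharedAccessKey=placeholder='
    )
    print(sorted(check_connection_str(sample)))
```

The check is deliberately shallow: it confirms the shape of the string, not that the namespace exists or that the key is valid.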

  1. Add the template transformer block, or create a custom block, to transform the data from Azure Event Hub.
    • The template is populated with the code below. Adjust it to your specific requirements.
from typing import Dict, List

if 'transformer' not in globals():
    from mage_ai.data_preparation.decorators import transformer

@transformer
def transform(messages: List[Dict], *args, **kwargs):
    """
    Template code for a transformer block.

    Args:
        messages: List of messages from the Event Hub.

    Returns:
        Transformed messages
    """
    # Specify your transformation logic here
    print(f'Transformed messages: {messages}')
    return messages
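
As one possible way to fill in the transformation logic, the sketch below drops empty messages and stamps the rest with a processing time. It is an assumption-laden example rather than Mage's own code: the helper name tag_messages and the processed_at field are hypothetical, and the real shape of each message depends on what your Event Hub producers send.

```python
from datetime import datetime, timezone
from typing import Dict, List


def tag_messages(messages: List[Dict]) -> List[Dict]:
    """Drop empty messages and add a UTC timestamp to the rest.

    Hypothetical helper: 'processed_at' is an illustrative field name.
    The input dictionaries are copied, not modified in place.
    """
    processed_at = datetime.now(timezone.utc).isoformat()
    return [
        {**message, 'processed_at': processed_at}
        for message in messages
        if message  # skips None and empty dictionaries
    ]


if __name__ == '__main__':
    sample = [{'id': 1}, {}, {'id': 2}]
    print(tag_messages(sample))
```

Inside the template, the body of transform could then end with return tag_messages(messages) instead of returning the messages unchanged. Keeping the logic in a plain function makes it testable without a running pipeline.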
  1. Add a data exporter block and select “Dummy.”
  2. Use the default settings shown below.
connector_type: dummy
print_msg: true

Additional Configuration Options

These additional configuration options are specifically for the data loader block that uses the Azure Event Hub connector. They should be added to the YAML configuration of your data loader block.

  1. prefetch: This determines the maximum number of events to retrieve in a single batch when reading from Event Hub.
  2. max_wait_time: This sets the maximum time to wait for a batch to be filled before returning.
  3. starting_position: This optional parameter specifies where in the Event Hub stream to start reading events.
    • Common starting positions include:
      • -1 or @latest: Start from the end of the stream
      • -2 or @earliest: Start from the beginning of the stream
prefetch: <batch size> # optional, usually between 100 and 1000
max_wait_time: <wait time> # optional
starting_position: <starting position> # optional, for example '@latest'
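
To illustrate how these optional settings fit together, here is a minimal sketch that sanity-checks them once the YAML has been loaded into a dictionary. The rules are assumptions made for this example (a positive integer for prefetch, a non-negative number for max_wait_time, a string or integer for starting_position) and are not Mage's own validation; the function name check_loader_options is hypothetical.

```python
from typing import Any, Dict, List


def check_loader_options(config: Dict[str, Any]) -> List[str]:
    """Return a list of problems found in the optional settings.

    All three keys are optional, so a missing key is never a problem.
    """
    problems: List[str] = []

    prefetch = config.get('prefetch')
    if prefetch is not None and (
        isinstance(prefetch, bool)
        or not isinstance(prefetch, int)
        or prefetch <= 0
    ):
        problems.append('prefetch should be a positive integer')

    max_wait_time = config.get('max_wait_time')
    if max_wait_time is not None and (
        isinstance(max_wait_time, bool)
        or not isinstance(max_wait_time, (int, float))
        or max_wait_time < 0
    ):
        problems.append('max_wait_time should be a non-negative number')

    # An unquoted -1 in YAML loads as an integer, so accept both types.
    position = config.get('starting_position')
    if position is not None and (
        isinstance(position, bool) or not isinstance(position, (str, int))
    ):
        problems.append('starting_position should be a string or integer')

    return problems


if __name__ == '__main__':
    example = {'prefetch': 500, 'max_wait_time': 5, 'starting_position': '@latest'}
    print(check_loader_options(example) or 'No problems found')
```

Returning a list of problems instead of raising on the first one lets you report every issue in a single pass.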

By following this guide, you can set up a streaming pipeline in Mage that authenticates with Azure Event Hub, loads and transforms its events, and exports the results. From here, you can replace the dummy exporter with a real destination and tune the optional settings to match your event volume.