
# Kafka streaming pipeline

> Build pipelines that ingest data from event streaming sources like Kafka.

<img alt="Streaming pipeline" src="https://c.tenor.com/sezVCSw-n7sAAAAC/waterfall-stream.gif" />

<br />

## Set up Kafka

If you don’t have Kafka set up already, here is a quick guide to running and
using Kafka locally:

### Using Kafka locally

1. In your terminal, clone this repository:
   `git clone https://github.com/wurstmeister/kafka-docker.git`.
2. Change directory into that repository: `cd kafka-docker`.
3. Edit the `docker-compose.yml` file to match this:
   ```yaml theme={"system"}
   version: "2"
   services:
     zookeeper:
       image: wurstmeister/zookeeper
       ports:
         - "2181:2181"
     kafka:
       build: .
       container_name: docker_kafka
       ports:
         - "9092:9092"
       expose:
         - "9093"
       environment:
         KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://localhost:9092
         KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
         KAFKA_LISTENERS: INSIDE://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
         KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
         KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
       volumes:
         - /var/run/docker.sock:/var/run/docker.sock
   ```
4. Start the containers:
   ```bash theme={"system"}
   docker-compose up
   ```
   If you encounter the error `command not found: docker-compose`,
   try running the following command instead:
   ```bash theme={"system"}
   docker compose up
   ```
5. Start a terminal session in the running container:
   ```bash theme={"system"}
   docker exec -i -t -u root $(docker ps | grep docker_kafka | cut -d' ' -f1) /bin/bash
   ```
6. Create a topic:
   ```bash theme={"system"}
   $KAFKA_HOME/bin/kafka-topics.sh --create --partitions 4 --bootstrap-server kafka:9092 --topic test
   ```
7. List all available topics in the Kafka instance:
   ```bash theme={"system"}
   $KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server kafka:9092 --list
   ```
8. Start a producer on topic named `test`:
   ```bash theme={"system"}
   $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list kafka:9092 --topic=test
   ```
9. Send messages to the topic named `test` by typing the following JSON strings in the
   terminal (note that Mage assumes Kafka messages are in JSON format):
   ```bash theme={"system"}
   >{ "hello": 1 }
   >{ "this is a test": 1 }
   >{ "test": 1 }
   >{ "test": 2 }
   >{ "test": 3 }
   ```
10. Open another terminal session in the running container (repeat step 5) and
    start a consumer on the topic named `test`:
    ```bash theme={"system"}
    $KAFKA_HOME/bin/kafka-console-consumer.sh --from-beginning --bootstrap-server kafka:9092 --topic=test
    ```
11. The output should look something like this:
    ```bash theme={"system"}
    { "hello": 1 }
    { "this is a test": 1 }
    { "test": 1 }
    { "test": 2 }
    { "test": 3 }
    ```

<sub>
  Original
  [source](https://towardsdatascience.com/introduction-to-kafka-stream-processing-in-python-e30d34bf3a12)
  of instructions.
</sub>
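
Because Mage assumes Kafka messages are JSON, it can help to check candidate messages before typing them into the console producer. The following is a minimal sketch using only the standard library; `split_valid_json` is a hypothetical helper name, not part of Kafka or Mage.

```python
import json
from typing import Any, List, Tuple


def split_valid_json(lines: List[str]) -> Tuple[List[Any], List[str]]:
    """Separate lines that parse as JSON from lines that do not."""
    valid: List[Any] = []
    invalid: List[str] = []
    for line in lines:
        try:
            valid.append(json.loads(line))
        except json.JSONDecodeError:
            # Keep the raw text so the bad line can be inspected or fixed.
            invalid.append(line)
    return valid, invalid


candidates = ['{ "hello": 1 }', '{ "test": 2 }', 'not json']
valid, invalid = split_valid_json(candidates)
print(valid)    # parsed messages
print(invalid)  # lines that would not be valid JSON messages
```

Lines that end up in `invalid` are not valid JSON and should be corrected before they are sent to the topic.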

***

## Build streaming pipeline

### Start Mage

#### Using Kafka locally in a Docker container

Start Mage using Docker. Run the following command to start the Mage container
on the same Docker network as Kafka:

```bash theme={"system"}
docker run -it -p 6789:6789 -v $(pwd):/home/src \
  --env AWS_ACCESS_KEY_ID=your_access_key_id \
  --env AWS_SECRET_ACCESS_KEY=your_secret_access_key \
  --env AWS_REGION=your_region \
  --network kafka-docker_default \
  mageai/mageai /app/run_app.sh mage start default_repo
```

<Note>
  Change the environment variable arguments to match your cloud provider.
</Note>

If the network named `kafka-docker_default` doesn’t exist, create a new network:

```bash theme={"system"}
docker network create -d bridge kafka-docker_default
```

Check that it exists:

```bash theme={"system"}
docker network ls
```
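
Once the container is on the network, a plain TCP check is one way to see whether a broker address is reachable from where Mage runs. The sketch below uses only the standard library; `can_reach` is a hypothetical helper name. A successful connection only shows that the port is open. It does not confirm that Kafka's advertised listeners are configured correctly.

```python
import socket


def can_reach(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers DNS failures, refused connections, and timeouts.
        return False
```

For example, `can_reach('kafka', 9093)` could be run from a Python shell inside the Mage container, and `can_reach('localhost', 9092)` from the host machine.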

***

If Mage running in a Docker container can’t connect to Kafka running in a local
Docker container, do the following:

1. Clone Mage: `git clone https://github.com/mage-ai/mage-ai.git`.
2. Change directory into Mage: `cd mage-ai`.
3. Edit the `docker-compose.yml` file to match this:
   ```yaml theme={"system"}
   version: '3'
   services:
     server:
       ... (original config)
       networks:
         - kafka
     app:
       ... (original config)
   networks:
     kafka:
       name: kafka-docker_default
       external: true
   ```
4. Run the following script in your terminal: `./scripts/dev.sh`.

This runs Mage in development mode, which starts it in a Docker container
using `docker compose` instead of `docker run`.

#### Using Kafka without a Docker container

Start Mage using Docker. If you haven’t done this before, refer to the
[setup guide](/getting-started/setup).

### Create a new pipeline

1. Open Mage in your browser.

2. Click <b>`+ New pipeline`</b>, then select `Streaming`.

3. Add a data loader block, select `Kafka`, and paste the following:
   ```yaml theme={"system"}
   connector_type: kafka
   bootstrap_server: "localhost:9092"
   topic: test
   consumer_group: unique_consumer_group
   include_metadata: false
   api_version: 0.10.2
   batch_size: 100
   ```
   1. By default, the `bootstrap_server` is set to `localhost:9092`. If you’re
      running Mage in a Docker container, set the `bootstrap_server` to
      `kafka:9093`.
   2. Messages are consumed from the source in micro-batches for better efficiency.
      The default batch size is 100. You can adjust it with `batch_size` in the source config.

4. Add a transformer block and paste the following:

   ```python theme={"system"}
   from typing import Dict, List

   if 'transformer' not in globals():
       from mage_ai.data_preparation.decorators import transformer


   @transformer
   def transform(messages: List[Dict], *args, **kwargs):
       for msg in messages:
           print(msg)

       return messages
   ```

5. Add a data exporter block, select `OpenSearch` and paste the following:
   ```yaml theme={"system"}
   connector_type: opensearch
   host: https://search-something-abcdefg123456.us-west-1.es.amazonaws.com/
   index_name: python-test-index
   ```
   1. Change the `host` to match your OpenSearch domain’s endpoint.
   2. Change the `index_name` to match the index you want to export data into.
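
The transformer in step 4 only prints each message and passes the batch through. As a rough sketch of what a transformation could look like, the helper below drops messages that lack an expected field and tags the rest with a timestamp. The function name `enrich_messages`, the `required_key` parameter, and the `processed_at` field are all hypothetical and chosen only for illustration.

```python
import time
from typing import Dict, List, Optional


def enrich_messages(
    messages: List[Dict],
    required_key: str,
    now: Optional[float] = None,
) -> List[Dict]:
    """Keep messages that contain required_key and tag them with a timestamp."""
    processed_at = time.time() if now is None else now
    enriched = []
    for msg in messages:
        if required_key not in msg:
            continue  # skip messages without the expected field
        # Build a new dict so the incoming batch is left unchanged.
        enriched.append({**msg, 'processed_at': processed_at})
    return enriched


batch = [{'test': 1}, {'hello': 1}, {'test': 2}]
print(enrich_messages(batch, required_key='test', now=0.0))
```

Inside the transformer block, the `transform` function could then end with `return enrich_messages(messages, required_key='test')` instead of returning the batch unchanged.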

***

## Test pipeline

Open the streaming pipeline you just created, and in the right side panel near
the bottom, click the button <b>`Execute pipeline`</b> to test the pipeline.

You should see output like this:

```
[streaming_pipeline_test] [KafkaSource] Start initializing consumer.
[streaming_pipeline_test] [KafkaSource] Finish initializing consumer.
[streaming_pipeline_test] [KafkaSource] Test connection successfully.
[streaming_pipeline_test] [KafkaSource] Start consuming messages in batches.
```

### Publish messages using Python

1. Open a terminal on your local workstation.
2. Install `kafka-python`:
   ```bash theme={"system"}
   pip install kafka-python
   ```
3. Open a Python shell and write the following code to publish messages:

   ```python theme={"system"}
   from kafka import KafkaProducer
   from random import random
   import json


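   topic = 'test'
   # 'kafka:9093' resolves only inside the Kafka Docker network. When running
   # this on the host machine, use 'localhost:9092' instead.
   producer = KafkaProducer(
       bootstrap_servers='kafka:9093',
   )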


   def publish_messages(limit):
       for _ in range(limit):
           data = {
               'title': 'test_title',
               'director': 'Bennett Miller',
               'year': '2011',
               'rating': random(),
           }
           producer.send(topic, json.dumps(data).encode('utf-8'))
       # send() is asynchronous; flush() blocks until buffered messages are sent.
       producer.flush()

   publish_messages(5)
   ```

Once you run the code snippet above, go back to your streaming pipeline in Mage
and the output should look like this:

```
[streaming_pipeline_test] Start initializing kafka consumer.
[streaming_pipeline_test] Finish initializing kafka consumer.
[streaming_pipeline_test] Start consuming messages from kafka.
[streaming_pipeline_test] [Kafka] Receive message 2:16: v=b'{"title": "test_title", "director": "Bennett Miller", "year": "2011", "rating": 0.7010424523477785}', time=1665618592.226788
[streaming_pipeline_test] [Kafka] Receive message 0:16: v=b'{"title": "test_title", "director": "Bennett Miller", "year": "2011", "rating": 0.7886308380991354}', time=1665618592.2268753
[streaming_pipeline_test] [Kafka] Receive message 0:17: v=b'{"title": "test_title", "director": "Bennett Miller", "year": "2011", "rating": 0.0673276352704153}', time=1665618592.2268832
[streaming_pipeline_test] [Kafka] Receive message 3:10: v=b'{"title": "test_title", "director": "Bennett Miller", "year": "2011", "rating": 0.37935417366095525}', time=1665618592.2268872
[streaming_pipeline_test] [Kafka] Receive message 3:11: v=b'{"title": "test_title", "director": "Bennett Miller", "year": "2011", "rating": 0.21110511524126563}', time=1665618592.2268918
[streaming_pipeline_test] {'title': 'test_title', 'director': 'Bennett Miller', 'year': '2011', 'rating': 0.7010424523477785}
[streaming_pipeline_test] {'title': 'test_title', 'director': 'Bennett Miller', 'year': '2011', 'rating': 0.7886308380991354}
[streaming_pipeline_test] {'title': 'test_title', 'director': 'Bennett Miller', 'year': '2011', 'rating': 0.0673276352704153}
[streaming_pipeline_test] {'title': 'test_title', 'director': 'Bennett Miller', 'year': '2011', 'rating': 0.37935417366095525}
[streaming_pipeline_test] {'title': 'test_title', 'director': 'Bennett Miller', 'year': '2011', 'rating': 0.21110511524126563}
[streaming_pipeline_test] [Opensearch] Batch ingest data [{'title': 'test_title', 'director': 'Bennett Miller', 'year': '2011', 'rating': 0.7010424523477785}, {'title': 'test_title', 'director': 'Bennett Miller', 'year': '2011', 'rating': 0.7886308380991354}, {'title': 'test_title', 'director': 'Bennett Miller', 'year': '2011', 'rating': 0.0673276352704153}, {'title': 'test_title', 'director': 'Bennett Miller', 'year': '2011', 'rating': 0.37935417366095525}, {'title': 'test_title', 'director': 'Bennett Miller', 'year': '2011', 'rating': 0.21110511524126563}], time=1665618592.2294626
```
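
The publishing snippet encodes each message by hand before calling `send`. The encoding step can also be written as a small function. The sketch below uses only the standard library; `to_kafka_value` and `from_kafka_value` are hypothetical names.

```python
import json
from typing import Any, Dict


def to_kafka_value(record: Dict[str, Any]) -> bytes:
    """Encode a dict as UTF-8 JSON bytes."""
    return json.dumps(record).encode('utf-8')


def from_kafka_value(raw: bytes) -> Dict[str, Any]:
    """Decode UTF-8 JSON bytes back into a dict."""
    return json.loads(raw.decode('utf-8'))


encoded = to_kafka_value({'title': 'test_title', 'year': '2011'})
print(encoded)                    # b'{"title": "test_title", "year": "2011"}'
print(from_kafka_value(encoded))  # {'title': 'test_title', 'year': '2011'}
```

With `kafka-python`, a function like `to_kafka_value` could be passed as the `value_serializer` argument of `KafkaProducer`, so that `producer.send(topic, data)` accepts the dict directly.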

### Consume messages using Python

If you want to programmatically consume messages from a Kafka topic, here is a
code snippet:

```python theme={"system"}
from kafka import KafkaConsumer
import time


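topic = 'test'
# 'kafka:9093' resolves only inside the Kafka Docker network. When running
# this on the host machine, use 'localhost:9092' instead.
consumer = KafkaConsumer(
    topic,
    group_id='test',
    bootstrap_servers='kafka:9093',
)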

for message in consumer:
    print(f"{message.partition}:{message.offset}: v={message.value}, time={time.time()}")
```
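
The loop above handles one message at a time. To read in micro-batches, similar in spirit to the `batch_size` setting of the Kafka source, one option is the consumer's `poll` method. The following is a sketch only and does not show how Mage implements batching internally; `poll_json_batch` is a hypothetical helper name, and it assumes every message value is UTF-8 encoded JSON.

```python
import json
from typing import Any, Dict, List


def poll_json_batch(
    consumer: Any,
    batch_size: int = 100,
    timeout_ms: int = 1000,
) -> List[Dict]:
    """Poll once and return up to batch_size decoded JSON messages."""
    # poll() returns a dict mapping each topic partition to a list of records.
    records_by_partition = consumer.poll(
        timeout_ms=timeout_ms,
        max_records=batch_size,
    )
    batch = []
    for records in records_by_partition.values():
        for record in records:
            # Assumes each value is UTF-8 JSON, as in the producer snippet.
            batch.append(json.loads(record.value.decode('utf-8')))
    return batch
```

With the `KafkaConsumer` from the snippet above, `poll_json_batch(consumer, batch_size=100)` could be called in a loop. An empty list means no messages arrived within the timeout.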

***

## Run in production

1. [Create a trigger](/design/data-pipeline-management#create-trigger).
2. Once the trigger is created, click the <b>`Start trigger`</b> button at the top
   of the page to make the streaming pipeline active.
