Set up Kafka
If you don’t have Kafka set up already, here is a quick guide to running and
using Kafka locally:
Using Kafka locally
- In your terminal, clone this repository:
git clone https://github.com/wurstmeister/kafka-docker.git
- Change directory into that repository:
cd kafka-docker
- Edit the docker-compose.yml file to match this:
version: "2"
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    build: .
    container_name: docker_kafka
    ports:
      - "9092:9092"
    expose:
      - "9093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: INSIDE://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
- Start Docker:
docker-compose up
If you encounter the error command not found: docker-compose, try running the following command instead:
docker compose up
- Start a terminal session in the running container:
docker exec -i -t -u root $(docker ps | grep docker_kafka | cut -d' ' -f1) /bin/bash
- Create a topic:
$KAFKA_HOME/bin/kafka-topics.sh --create --partitions 4 --bootstrap-server kafka:9092 --topic test
- List all available topics in the Kafka instance:
$KAFKA_HOME/bin/kafka-topics.sh --bootstrap-server kafka:9092 --list
- Start a producer on the topic named test:
$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list kafka:9092 --topic=test
- Send messages to the topic named test by typing the following JSON strings in the
terminal (note that Mage assumes Kafka messages are in JSON format):
>{ "hello": 1 }
>{ "this is a test": 1 }
>{ "test": 1 }
>{ "test": 2 }
>{ "test": 3 }
- Open another terminal and start a consumer on the topic named test:
$KAFKA_HOME/bin/kafka-console-consumer.sh --from-beginning --bootstrap-server kafka:9092 --topic=test
- The output should look something like this:
{ "hello": 1 }
{ "this is a test": 1 }
{ "test": 1 }
{ "test": 2 }
{ "test": 3 }
Original source of instructions.
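The listener settings in the docker-compose.yml above determine which address a client has to use: clients inside the Docker network reach the broker through the INSIDE listener, while clients running on the host use the OUTSIDE listener. As a rough illustration only, the sketch below parses the KAFKA_ADVERTISED_LISTENERS value into a lookup table. The helper name parse_listeners is hypothetical and is not part of Kafka or Mage.

```python
def parse_listeners(value: str) -> dict:
    """Map each listener name to its advertised host:port address."""
    listeners = {}
    for entry in value.split(","):
        # Each entry has the form NAME://host:port.
        name, _, address = entry.strip().partition("://")
        listeners[name] = address
    return listeners


# Value copied from the docker-compose.yml above.
advertised = parse_listeners("INSIDE://kafka:9093,OUTSIDE://localhost:9092")
print(advertised["INSIDE"])   # address for clients inside the Docker network
print(advertised["OUTSIDE"])  # address for clients running on the host
```

This is why the same broker appears under two different addresses in this guide, depending on where the client runs.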
Build streaming pipeline
Start Mage
Using Kafka locally in a Docker container
Start Mage using Docker. Run the following command to start the Mage container
on the same Docker network as Kafka:
docker run -it -p 6789:6789 -v $(pwd):/home/src \
--env AWS_ACCESS_KEY_ID=your_access_key_id \
--env AWS_SECRET_ACCESS_KEY=your_secret_access_key \
--env AWS_REGION=your_region \
--network kafka-docker_default \
mageai/mageai /app/run_app.sh mage start default_repo
Change the environment variable arguments to match your cloud provider.
If the network named kafka-docker_default doesn’t exist, create a new network:
docker network create -d bridge kafka-docker_default
Check that it exists:
docker network ls
If Mage running in a Docker container can’t connect to Kafka running locally in
a Docker container, do the following:
- Clone Mage:
git clone https://github.com/mage-ai/mage-ai.git
- Change directory into Mage:
cd mage-ai
- Edit the docker-compose.yml file to match this:
version: '3'
services:
  server:
    ... (original config)
    networks:
      - kafka
  app:
    ... (original config)
networks:
  kafka:
    name: kafka-docker_default
    external: true
- Run the following script in your terminal:
./scripts/dev.sh
This runs Mage in development mode, which runs it in a Docker container
using docker compose instead of docker run.
Using Kafka without a Docker container
Start Mage using Docker. If you haven’t done this before, refer to the
setup guide.
Create a new pipeline
- Open Mage in your browser.
- Click + New pipeline, then select Streaming.
- Add a data loader block, select Kafka, and paste the following:
connector_type: kafka
bootstrap_server: "localhost:9092"
topic: test
consumer_group: unique_consumer_group
include_metadata: false
api_version: 0.10.2
batch_size: 100
- By default, the bootstrap_server is set to localhost:9092. If you’re
running Mage in a Docker container, the bootstrap_server should be kafka:9093.
- Messages are consumed from the source in micro-batch mode for better efficiency.
The default batch size is 100. You can adjust the batch size in the source config.
- Add a transformer block and paste the following:
from typing import Dict, List

if 'transformer' not in globals():
    from mage_ai.data_preparation.decorators import transformer


@transformer
def transform(messages: List[Dict], *args, **kwargs):
    # Print each message in the batch, then pass the batch on unchanged.
    for msg in messages:
        print(msg)

    return messages
- Add a data exporter block, select OpenSearch, and paste the following:
connector_type: opensearch
host: https://search-something-abcdefg123456.us-west-1.es.amazonaws.com/
index_name: python-test-index
- Change the host to match your OpenSearch domain’s endpoint.
- Change the index_name to match the index you want to export data into.
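To make the micro-batch behavior described above more concrete, here is a minimal sketch of grouping a stream of messages into lists of at most batch_size items and handing each list to a transform function. It is not Mage's internal implementation. The micro_batches helper and the filtering rule in transform are assumptions chosen for illustration, and the transform here omits the Mage decorator so the snippet runs on its own.

```python
from itertools import islice
from typing import Dict, Iterable, Iterator, List


def micro_batches(messages: Iterable[Dict], batch_size: int = 100) -> Iterator[List[Dict]]:
    """Yield lists of at most batch_size messages, preserving order."""
    iterator = iter(messages)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


def transform(messages: List[Dict]) -> List[Dict]:
    """Stand-in for a transformer block: it receives one whole batch at a time."""
    # Hypothetical rule: keep only messages that carry a "test" key.
    return [msg for msg in messages if "test" in msg]


stream = [{"test": i} for i in range(250)]
for batch in micro_batches(stream, batch_size=100):
    print(len(transform(batch)))
```

The point to take away is that a transformer in a streaming pipeline works on a list of messages, so per-message logic belongs inside a loop or comprehension.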
Test pipeline
Open the streaming pipeline you just created. In the right side panel near
the bottom, click the Execute pipeline button to test the pipeline.
You should see output like this:
[streaming_pipeline_test] [KafkaSource] Start initializing consumer.
[streaming_pipeline_test] [KafkaSource] Finish initializing consumer.
[streaming_pipeline_test] [KafkaSource] Test connection successfully.
[streaming_pipeline_test] [KafkaSource] Start consuming messages in batches.
Publish messages using Python
- Open a terminal on your local workstation.
- Install kafka-python:
pip install kafka-python
- Open a Python shell and write the following code to publish messages:
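from kafka import KafkaProducer
from random import random
import json

topic = 'test'
# 'kafka:9093' resolves only inside the Docker network; with the
# docker-compose.yml above, use 'localhost:9092' when running on the host.
producer = KafkaProducer(
    bootstrap_servers='kafka:9093',
)


def publish_messages(limit):
    for i in range(limit):
        data = {
            'title': 'test_title',
            'director': 'Bennett Miller',
            'year': '2011',
            'rating': random(),
        }
        producer.send(topic, json.dumps(data).encode('utf-8'))
    # send() is asynchronous; flush so buffered messages are delivered.
    producer.flush()


publish_messages(5)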
Once you run the code snippet above, go back to your streaming pipeline in Mage
and the output should look like this:
[streaming_pipeline_test] Start initializing kafka consumer.
[streaming_pipeline_test] Finish initializing kafka consumer.
[streaming_pipeline_test] Start consuming messages from kafka.
[streaming_pipeline_test] [Kafka] Receive message 2:16: v=b'{"title": "test_title", "director": "Bennett Miller", "year": "2011", "rating": 0.7010424523477785}', time=1665618592.226788
[streaming_pipeline_test] [Kafka] Receive message 0:16: v=b'{"title": "test_title", "director": "Bennett Miller", "year": "2011", "rating": 0.7886308380991354}', time=1665618592.2268753
[streaming_pipeline_test] [Kafka] Receive message 0:17: v=b'{"title": "test_title", "director": "Bennett Miller", "year": "2011", "rating": 0.0673276352704153}', time=1665618592.2268832
[streaming_pipeline_test] [Kafka] Receive message 3:10: v=b'{"title": "test_title", "director": "Bennett Miller", "year": "2011", "rating": 0.37935417366095525}', time=1665618592.2268872
[streaming_pipeline_test] [Kafka] Receive message 3:11: v=b'{"title": "test_title", "director": "Bennett Miller", "year": "2011", "rating": 0.21110511524126563}', time=1665618592.2268918
[streaming_pipeline_test] {'title': 'test_title', 'director': 'Bennett Miller', 'year': '2011', 'rating': 0.7010424523477785}
[streaming_pipeline_test] {'title': 'test_title', 'director': 'Bennett Miller', 'year': '2011', 'rating': 0.7886308380991354}
[streaming_pipeline_test] {'title': 'test_title', 'director': 'Bennett Miller', 'year': '2011', 'rating': 0.0673276352704153}
[streaming_pipeline_test] {'title': 'test_title', 'director': 'Bennett Miller', 'year': '2011', 'rating': 0.37935417366095525}
[streaming_pipeline_test] {'title': 'test_title', 'director': 'Bennett Miller', 'year': '2011', 'rating': 0.21110511524126563}
[streaming_pipeline_test] [Opensearch] Batch ingest data [{'title': 'test_title', 'director': 'Bennett Miller', 'year': '2011', 'rating': 0.7010424523477785}, {'title': 'test_title', 'director': 'Bennett Miller', 'year': '2011', 'rating': 0.7886308380991354}, {'title': 'test_title', 'director': 'Bennett Miller', 'year': '2011', 'rating': 0.0673276352704153}, {'title': 'test_title', 'director': 'Bennett Miller', 'year': '2011', 'rating': 0.37935417366095525}, {'title': 'test_title', 'director': 'Bennett Miller', 'year': '2011', 'rating': 0.21110511524126563}], time=1665618592.2294626
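The log lines above show each message value as JSON-encoded bytes. One way to keep that encoding in a single place is a small serializer function; kafka-python's KafkaProducer accepts a callable through its value_serializer argument, so a helper like the hypothetical serialize below could be passed there instead of encoding at every send call. The sketch only exercises the helpers and does not connect to a broker. Both function names are assumptions made for illustration.

```python
import json
from random import random


def serialize(data: dict) -> bytes:
    """Encode a message as UTF-8 JSON bytes."""
    return json.dumps(data).encode("utf-8")


def build_message() -> dict:
    """Build one sample record with the same fields as the publishing example."""
    return {
        "title": "test_title",
        "director": "Bennett Miller",
        "year": "2011",
        "rating": random(),
    }


payload = serialize(build_message())
print(payload)

# With a reachable broker, the helper could be wired in like this:
# producer = KafkaProducer(bootstrap_servers=..., value_serializer=serialize)
# producer.send("test", build_message())
```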
Consume messages using Python
If you want to programmatically consume messages from a Kafka topic, here is a
code snippet:
from kafka import KafkaConsumer
import time

topic = 'test'
consumer = KafkaConsumer(
    topic,
    group_id='test',
    bootstrap_servers='kafka:9093',
)

# Iterating over the consumer blocks and yields messages as they arrive.
for message in consumer:
    print(f"{message.partition}:{message.offset}: v={message.value}, time={time.time()}")
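The consumer above prints message.value as raw bytes. Since this guide assumes JSON messages, a natural next step is to decode each value into a dictionary and skip anything that is not valid JSON. The following is a minimal sketch of such a step. The decode_value helper is hypothetical and runs without a Kafka connection.

```python
import json
from typing import Optional


def decode_value(raw: bytes) -> Optional[dict]:
    """Return the decoded JSON object, or None if the payload is not a JSON object."""
    try:
        value = json.loads(raw.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return None
    # Only JSON objects map to dictionaries; reject arrays, numbers, and strings.
    return value if isinstance(value, dict) else None


print(decode_value(b'{"test": 1}'))
print(decode_value(b"not json"))
```

Inside the consumer loop, this could be called as decode_value(message.value), skipping the message when the result is None.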
Run in production
- Create a trigger.
- Once the trigger is created, click the Start trigger button at the top
of the page to make the streaming pipeline active.