
# Kafka

## Basic config

```yaml theme={"system"}
connector_type: kafka
bootstrap_server: "localhost:9092"
topic: topic_name
topics: ['topic_name1', 'topic_name2']  # Optional. Used for specifying multiple topics.
consumer_group: unique_consumer_group
include_metadata: false
api_version: "0.10.2"
batch_size: 100
timeout_ms: 500
auto_offset_reset: latest # Optional
max_partition_fetch_bytes: 5242880 # Optional, default: 5MB (Pro), 1MB (OSS)
offset:  # Optional
  value: null # Optional
  type: "int"
  partitions: # Optional
  - {"topic": "test", "partition": 1}
```

### Configuration Parameters

#### Required Parameters

* **`bootstrap_server`**: Kafka broker address (e.g., `"localhost:9092"` or `"broker1:9092,broker2:9092"`)
* **`consumer_group`**: Unique consumer group identifier for this pipeline
* **`topic`** or **`topics`**: Single topic name (string) or list of topic names to consume from
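
A quick pre-flight check can express these rules as a small validator. This is a hypothetical helper, not part of Mage, and it assumes the YAML has already been loaded into a dict (for example with PyYAML's `yaml.safe_load`). Mage performs its own validation when the pipeline starts.

```python theme={"system"}
from typing import Any, Dict, List


def missing_required_params(config: Dict[str, Any]) -> List[str]:
    """Return the required Kafka source parameters missing from a loaded config.

    Hypothetical helper: `bootstrap_server` and `consumer_group` are always
    required, plus at least one of `topic` or `topics`.
    """
    missing = [key for key in ("bootstrap_server", "consumer_group") if not config.get(key)]
    if not config.get("topic") and not config.get("topics"):
        missing.append("topic or topics")
    return missing


print(missing_required_params({"bootstrap_server": "localhost:9092", "topic": "topic_name"}))
# ['consumer_group']
```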

#### Performance Parameters

* **`batch_size`** (default: `100`): Maximum number of messages to process in a single batch. The consumer will accumulate messages across multiple polls to reach this target when possible.

  **Recommendations:**

  * Small messages (\< 1 KB): 500-2000
  * Medium messages (1-10 KB): 200-1000
  * Large messages (> 10 KB): 50-500

* **`max_partition_fetch_bytes`** (default: `5242880` = 5MB for Pro, `1048576` = 1MB for OSS): Maximum amount of data to fetch per partition in a single poll. This is a **per-partition limit** that can constrain your batch size.

  **Important:** This value must be:

  * Greater than your largest message size
  * Aligned with broker settings (`message.max.bytes`, `replica.fetch.max.bytes`)

  **Recommendations by message size:**

  * Small messages (\< 1 KB): 1-5 MB
  * Medium messages (1-10 KB): 5-10 MB
  * Large messages (10-100 KB): 10-50 MB
  * Very large messages (> 100 KB): 50-100 MB

  **Memory consideration:** Worst-case in-flight data is `max_partition_fetch_bytes × number of partitions`. Make sure the consumer process has enough memory for it.

* **`timeout_ms`** (default: `500`): Maximum time to wait for messages when polling (in milliseconds). Lower values reduce latency but may increase CPU usage.

#### Other Parameters

* **`auto_offset_reset`** (default: `latest`): What to do when there is no initial offset or the offset is out of range. Options: `earliest`, `latest`, `none`
* **`include_metadata`**: If `true`, includes Kafka metadata (key, partition, offset, timestamp, topic) with each message
* **`api_version`** (default: `"0.10.2"`): Kafka API version. Use a version matching your broker version for best compatibility

### Offset Configuration

<Note>
  Available in versions `>= 0.9.61`
</Note>

The `offset` config lets you reset the consumer offset when the streaming pipeline starts. If you use the `offset` config, the `partitions` config is required.

The `offset.type` field accepts four values:

1. `beginning` - Start from the earliest available message
2. `end` - Start from the latest message (skip all existing messages)
3. `int` - Start from a specific numeric offset
4. `timestamp` - Start from messages at or after a specific timestamp

**Examples:**

Start from beginning:

```yaml theme={"system"}
offset:
  type: "beginning"
  partitions:
    - {"topic": "my_topic", "partition": 0}
    - {"topic": "my_topic", "partition": 1}
```

Start from specific offset:

```yaml theme={"system"}
offset:
  type: "int"
  value: 1000
  partitions:
    - {"topic": "my_topic", "partition": 0}
```

Start from specific timestamp (milliseconds since epoch):

```yaml theme={"system"}
offset:
  type: "timestamp"
  value: 1693396630163  # Wed Aug 30 2023 11:57:10 UTC
  partitions:
    - {"topic": "my_topic", "partition": 0}
```
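
The `timestamp` value is milliseconds since the Unix epoch. One way to compute it from a UTC datetime using only the standard library is sketched below; `to_epoch_ms` is a hypothetical helper. Floor-dividing one `timedelta` by another yields an exact integer, which avoids the float rounding that `datetime.timestamp() * 1000` can introduce.

```python theme={"system"}
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_ms(dt: datetime) -> int:
    """Convert a timezone-aware datetime to integer milliseconds since the Unix epoch."""
    if dt.tzinfo is None:
        raise ValueError("use a timezone-aware datetime to avoid local-time ambiguity")
    # timedelta // timedelta returns an exact int (no float rounding)
    return (dt - EPOCH) // timedelta(milliseconds=1)


print(to_epoch_ms(datetime(2023, 8, 30, 11, 57, 10, 163000, tzinfo=timezone.utc)))
# 1693396630163
```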

**Note:** Neither `beginning` nor `end` requires the `value` field.
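
These rules (`partitions` is required, and `beginning` and `end` need no `value`) can be checked before the pipeline starts. The sketch below also assumes that `int` and `timestamp` require an integer `value`, which the examples imply but the text does not state outright. `offset_config_errors` is a hypothetical helper, not Mage's validation code.

```python theme={"system"}
from typing import Any, Dict, List

OFFSET_TYPES = {"beginning", "end", "int", "timestamp"}


def offset_config_errors(offset: Dict[str, Any]) -> List[str]:
    """Return a list of problems found in an `offset` block (empty if it looks valid)."""
    errors = []
    offset_type = offset.get("type")
    if offset_type not in OFFSET_TYPES:
        errors.append(f"type must be one of {sorted(OFFSET_TYPES)}, got {offset_type!r}")
    if not offset.get("partitions"):
        errors.append("partitions is required when offset is set")
    # Assumption: `int` and `timestamp` need an integer value; `beginning`/`end` ignore it
    value = offset.get("value")
    if offset_type in ("int", "timestamp") and (isinstance(value, bool) or not isinstance(value, int)):
        errors.append(f"value must be an integer for type {offset_type!r}")
    for entry in offset.get("partitions") or []:
        if not isinstance(entry, dict) or "topic" not in entry or "partition" not in entry:
            errors.append(f"partition entry needs 'topic' and 'partition': {entry!r}")
    return errors
```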

## SSL authentication

```yaml theme={"system"}
security_protocol: "SSL"
ssl_config:
  cafile: "CARoot.pem"
  certfile: "certificate.pem"
  keyfile: "key.pem"
  password: password
  check_hostname: true
```

## SASL authentication

SASL\_PLAINTEXT config:

```yaml theme={"system"}
security_protocol: "SASL_PLAINTEXT"
sasl_config:
  mechanism: "PLAIN"
  username: username
  password: password
```

SASL\_SSL config:

```yaml theme={"system"}
security_protocol: "SASL_SSL"
sasl_config:
  mechanism: "SCRAM-SHA-512"
  username: username
  password: password
ssl_config:
  cafile: "pemfilename.pem"
```
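
Assuming the connector is backed by kafka-python, the SSL and SASL keys above correspond roughly to `KafkaConsumer` keyword arguments. The kafka-python argument names below are real, but the mapping function is a hypothetical illustration; Mage's own translation may differ, and OAUTHBEARER is not covered.

```python theme={"system"}
from typing import Any, Dict


def to_kafka_python_kwargs(config: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical mapping from the YAML auth settings to kafka-python KafkaConsumer kwargs."""
    kwargs: Dict[str, Any] = {"security_protocol": config.get("security_protocol", "PLAINTEXT")}
    ssl = config.get("ssl_config") or {}
    for yaml_key, kafka_key in (
        ("cafile", "ssl_cafile"),
        ("certfile", "ssl_certfile"),
        ("keyfile", "ssl_keyfile"),
        ("password", "ssl_password"),
        ("check_hostname", "ssl_check_hostname"),
    ):
        if yaml_key in ssl:
            kwargs[kafka_key] = ssl[yaml_key]
    sasl = config.get("sasl_config") or {}
    if sasl:
        kwargs["sasl_mechanism"] = sasl["mechanism"]
        # kafka-python uses the sasl_plain_* arguments for both PLAIN and SCRAM
        if "username" in sasl:
            kwargs["sasl_plain_username"] = sasl["username"]
            kwargs["sasl_plain_password"] = sasl.get("password")
    return kwargs
```

The result could then be passed along as `KafkaConsumer("topic_name", bootstrap_servers="localhost:9092", **kwargs)`.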

SASL\_SSL with OAUTHBEARER (OAuth 2.0):

```yaml theme={"system"}
security_protocol: "SASL_SSL"
sasl_config:
  mechanism: "OAUTHBEARER"
  oauth_token_url: "https://keycloak.example.com/auth/realms/myrealm/protocol/openid-connect/token"
  oauth_client_id: "kafka-client"
  oauth_client_secret: "your-client-secret"
  oauth_scope: "kafka"  # Optional
ssl_config:
  cafile: "pemfilename.pem"
  check_hostname: true
```

<Note>
  **OAUTHBEARER Authentication:**

  * The `oauth_token_url`, `oauth_client_id`, and `oauth_client_secret` are **required** when using `OAUTHBEARER` mechanism.
  * The `oauth_scope` parameter is optional.
  * OAuth tokens are automatically cached and refreshed before expiry (60 seconds before expiration by default).
  * It's recommended to use `SASL_SSL` (not `SASL_PLAINTEXT`) with OAUTHBEARER for secure token transmission.
  * Compatible with OAuth 2.0 providers like Keycloak, Okta, and other identity servers that support client credentials flow.
</Note>
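
The caching behavior described in the note can be sketched as follows. This is an illustration under stated assumptions, not Mage's implementation. `fetch_client_credentials_token` sends the client credentials in the form body of a client credentials grant, which Keycloak accepts but some providers may reject in favor of HTTP Basic auth. The 300-second fallback lifetime used when a response omits `expires_in` is an arbitrary assumption.

```python theme={"system"}
import json
import time
import urllib.parse
import urllib.request
from typing import Callable, Optional, Tuple


def fetch_client_credentials_token(token_url: str, client_id: str, client_secret: str,
                                   scope: Optional[str] = None) -> Tuple[str, float]:
    """Request a token via the OAuth 2.0 client credentials grant; return (token, lifetime_s)."""
    form = {"grant_type": "client_credentials", "client_id": client_id, "client_secret": client_secret}
    if scope:
        form["scope"] = scope
    data = urllib.parse.urlencode(form).encode()
    # Passing data makes urllib send a form-encoded POST
    with urllib.request.urlopen(urllib.request.Request(token_url, data=data)) as resp:
        body = json.load(resp)
    # Assumption: fall back to 300 s if the provider omits expires_in
    return body["access_token"], float(body.get("expires_in", 300))


class CachedToken:
    """Reuse a token until `refresh_margin` seconds before it expires, then fetch a new one."""

    def __init__(self, fetch: Callable[[], Tuple[str, float]], refresh_margin: float = 60.0,
                 clock: Callable[[], float] = time.time):
        self._fetch, self._margin, self._clock = fetch, refresh_margin, clock
        self._token: Optional[str] = None
        self._expires_at = 0.0

    def get(self) -> str:
        now = self._clock()
        if self._token is None or now >= self._expires_at - self._margin:
            self._token, lifetime = self._fetch()
            self._expires_at = now + lifetime
        return self._token
```

Injecting `fetch` and `clock` keeps the refresh logic testable without a live identity provider; in practice `fetch` would be a `functools.partial` over `fetch_client_credentials_token` with your `oauth_token_url`, `oauth_client_id`, and `oauth_client_secret`.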

## Data format

By default (`include_metadata: false`), the Kafka data loader returns only the message value, e.g.

```python theme={"system"}
{"location": "klamath", "scientist": "anderson", "count": 23}
```

Kafka lets producers attach a key to each message and [choose its partition](https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html#kafka.KafkaProducer.send) when sending.
If `include_metadata` is set to true, the Kafka data loader returns each message as a dict with `data` (the message value) and `metadata` (key, partition, offset, time, and topic), e.g.

```python theme={"system"}
message = {
    "data": {"location": "klamath", "scientist": "anderson", "count": 23},
    "metadata": {
        "key": "bees",
        "partition": 0,
        "offset": 6,
        "time": 1693396630163,  # timestamp with ms precision (Wed Aug 30 2023 11:57:10 UTC)
        "topic": "census",
    },
}
```
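
To make the two shapes concrete, here is a sketch that builds them from a kafka-python style record, meaning an object with `topic`, `partition`, `offset`, `timestamp`, `key`, and `value` attributes. It assumes UTF-8 keys and JSON-encoded values. `to_message` is a hypothetical helper rather than the loader's code, and a `SimpleNamespace` stands in for a real `ConsumerRecord`.

```python theme={"system"}
import json
from types import SimpleNamespace
from typing import Any, Dict


def to_message(record: Any, include_metadata: bool) -> Dict[str, Any]:
    """Shape a consumed record like the examples above (assumes JSON-encoded values)."""
    value = json.loads(record.value)  # json.loads accepts bytes
    if not include_metadata:
        return value
    key = record.key.decode("utf-8") if isinstance(record.key, bytes) else record.key
    return {
        "data": value,
        "metadata": {
            "key": key,
            "partition": record.partition,
            "offset": record.offset,
            "time": record.timestamp,
            "topic": record.topic,
        },
    }


record = SimpleNamespace(topic="census", partition=0, offset=6, timestamp=1693396630163,
                         key=b"bees", value=b'{"location": "klamath", "scientist": "anderson", "count": 23}')
print(to_message(record, include_metadata=True)["metadata"]["key"])  # bees
```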

## Deserialize message with protobuf schema

```yaml theme={"system"}
serde_config:
  serialization_method: PROTOBUF
  schema_classpath: "path.to.schema.SchemaClass"
```

* Set `serialization_method` to `PROTOBUF`.
* Set `schema_classpath` to the import path of the Python schema class. To verify that the schema class is accessible, run the following code
  in a scratchpad:
  ```python theme={"system"}
  from path.to.schema import SchemaClass
  ```
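
If `schema_classpath` is read as a module path followed by a class name, as the import statement above suggests, it can also be resolved programmatically to get a more specific error message. `load_schema_class` is a hypothetical helper; Mage's loader may resolve the path differently.

```python theme={"system"}
import importlib
from typing import Any


def load_schema_class(schema_classpath: str) -> Any:
    """Resolve a dotted path such as "path.to.schema.SchemaClass" to the class object."""
    module_path, _, class_name = schema_classpath.rpartition(".")
    if not module_path:
        raise ValueError(f"expected 'module.ClassName', got {schema_classpath!r}")
    module = importlib.import_module(module_path)  # ModuleNotFoundError if the module is missing
    return getattr(module, class_name)  # AttributeError if the class is missing
```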

## Pass raw message to transformer

```yaml theme={"system"}
serde_config:
  serialization_method: RAW_VALUE
```

## Deserialize message with Avro schema in Confluent schema registry

```yaml theme={"system"}
serde_config:
  serialization_method: AVRO
  schema_registry_url: https://schema_registry_url
  schema_registry_username: username
  schema_registry_password: password
```
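
Messages produced through Confluent Schema Registry use the Confluent wire format: a zero magic byte, a 4-byte big-endian schema ID, then the Avro-encoded body. The schema ID is what gets looked up at `schema_registry_url`. The sketch below only splits that header, which can help when debugging messages that fail to deserialize; decoding the Avro body itself needs an Avro library and is omitted. `split_confluent_header` is a hypothetical helper.

```python theme={"system"}
import struct
from typing import Tuple


def split_confluent_header(payload: bytes) -> Tuple[int, bytes]:
    """Split a Confluent-framed message into (schema_id, avro_body)."""
    if len(payload) < 5:
        raise ValueError("payload too short for the 5-byte Confluent header")
    # ">bI": big-endian, 1-byte magic + 4-byte unsigned schema ID
    magic, schema_id = struct.unpack(">bI", payload[:5])
    if magic != 0:
        raise ValueError(f"unexpected magic byte {magic}")
    return schema_id, payload[5:]
```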

## API version

If you are using a newer Kafka broker, set `api_version` to the version that matches your broker.
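
kafka-python expresses `api_version` as a tuple of integers. Assuming the string value is converted the same way, the sketch below shows the parsing and why versions should not be compared as strings: `"0.10.2" < "0.9"` is true lexically, while `(0, 10, 2) < (0, 9)` is false. `parse_api_version` is a hypothetical helper.

```python theme={"system"}
from typing import Tuple


def parse_api_version(version: str) -> Tuple[int, ...]:
    """Turn a string like "0.10.2" into an integer tuple such as (0, 10, 2)."""
    return tuple(int(part) for part in version.split("."))


print(parse_api_version("0.10.2") < parse_api_version("0.9"))  # False
```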

## Performance Tuning

### Understanding Batch Size vs. Fetch Bytes

The `batch_size` and `max_partition_fetch_bytes` work together:

* **`batch_size`**: Target number of messages to process together
* **`max_partition_fetch_bytes`**: Maximum data (bytes) fetched per partition per poll

**Example:** If you set `batch_size: 500` but `max_partition_fetch_bytes: 1048576` (1MB) and your messages are 5KB each:

* Each partition can fetch \~200 messages (1MB ÷ 5KB)
* With 1 partition, you'll get \~200 messages per batch
* With multiple partitions, you may get closer to 500, but each partition is still limited to 1MB

**Solution:** Increase `max_partition_fetch_bytes` to allow more messages per partition:

```yaml theme={"system"}
batch_size: 500
max_partition_fetch_bytes: 5242880  # 5MB allows ~1000 messages per partition
```
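
The arithmetic in this example can be captured in a small helper. It gives a rough upper bound that ignores record-batch overhead and compression, and it assumes 1 KB = 1024 bytes to match the `1048576` = 1MB convention used on this page. `max_messages_per_poll` is a hypothetical name.

```python theme={"system"}
def max_messages_per_poll(batch_size: int, max_partition_fetch_bytes: int,
                          avg_message_bytes: int, partitions: int = 1) -> int:
    """Rough upper bound on messages one poll can return, capped at batch_size."""
    per_partition = max_partition_fetch_bytes // avg_message_bytes
    return min(batch_size, per_partition * partitions)


print(max_messages_per_poll(500, 1_048_576, 5 * 1024))  # 204 (1MB limit)
print(max_messages_per_poll(500, 5_242_880, 5 * 1024))  # 500 (5MB limit)
```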

### High-Throughput Configuration

For high-volume scenarios with consumer lag:

```yaml theme={"system"}
connector_type: kafka
bootstrap_server: "broker1:9092,broker2:9092"
topic: high_volume_topic
consumer_group: high_throughput_consumer
batch_size: 1000
max_partition_fetch_bytes: 10485760  # 10MB
timeout_ms: 1000
auto_offset_reset: earliest
```

### Low-Latency Configuration

For real-time processing with minimal delay:

```yaml theme={"system"}
connector_type: kafka
bootstrap_server: "localhost:9092"
topic: realtime_topic
consumer_group: low_latency_consumer
batch_size: 10
max_partition_fetch_bytes: 1048576  # 1MB
timeout_ms: 100
auto_offset_reset: latest
```

### Memory Planning

Calculate memory requirements:

```
Estimated Memory = max_partition_fetch_bytes × number_of_partitions × 1.5
```

The 1.5x multiplier accounts for overhead. Ensure your consumer process has at least this much memory available.
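
The same rule of thumb as a function, with a worked example for 12 partitions at 10MB each. `estimated_memory_bytes` is a hypothetical helper, and the 1.5 default simply mirrors the multiplier above.

```python theme={"system"}
def estimated_memory_bytes(max_partition_fetch_bytes: int, partitions: int,
                           overhead: float = 1.5) -> int:
    """max_partition_fetch_bytes x partitions x overhead, in bytes."""
    return int(max_partition_fetch_bytes * partitions * overhead)


need = estimated_memory_bytes(10 * 1024 * 1024, 12)  # 10MB per partition, 12 partitions
print(f"{need / 1024 ** 2:.0f} MB")  # 180 MB
```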

## Troubleshooting

### Issue: Not Receiving Expected Batch Size

**Symptom:** Set `batch_size: 500` but only getting \~200-250 messages per batch.

**Cause:** `max_partition_fetch_bytes` is too low for your message size.

**Solution:**

1. Calculate: `max_partition_fetch_bytes ÷ average_message_size = max_messages_per_partition`
2. Increase `max_partition_fetch_bytes`:
   ```yaml theme={"system"}
   max_partition_fetch_bytes: 5242880  # 5MB (or higher)
   ```
3. Verify the value is consistent with broker settings:
   * `max_partition_fetch_bytes >= message.max.bytes`, so the largest message the broker accepts can always be fetched
   * On the broker side, `replica.fetch.max.bytes >= message.max.bytes`

### Issue: Consumer Lag Growing

**Symptom:** Consumer falling behind producer, lag increasing over time.

**Solutions:**

1. Increase `batch_size` and `max_partition_fetch_bytes`
2. Increase `timeout_ms` to allow more time for message accumulation
3. Scale horizontally: set a higher value for [executor\_count](/guides/streaming/introduction#executor-count) to run multiple consumer instances in the same consumer group, so Kafka can distribute partitions among them. Instances beyond the number of partitions stay idle.
4. Optimize downstream processing speed
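
To check whether these changes help, measure lag directly: for each partition, lag is the log-end offset minus the consumer group's committed offset. The sketch takes both as plain dicts keyed by partition. With kafka-python they could come from `KafkaConsumer.end_offsets()` and `KafkaConsumer.committed()`, but return types vary across library versions, so treat that wiring as an assumption. `total_lag` is a hypothetical helper.

```python theme={"system"}
from typing import Dict, Hashable, Optional


def total_lag(end_offsets: Dict[Hashable, int], committed: Dict[Hashable, Optional[int]]) -> int:
    """Sum per-partition lag (log-end offset minus committed offset).

    Partitions with no committed offset are treated as committed at 0.
    """
    lag = 0
    for partition, end in end_offsets.items():
        position = committed.get(partition) or 0
        lag += max(end - position, 0)
    return lag
```

Sampling this value periodically shows whether lag is shrinking after a change or still growing.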

### Issue: Out of Memory Errors

**Symptom:** Consumer crashes with memory errors.

**Cause:** `max_partition_fetch_bytes × number_of_partitions` exceeds available memory.

**Solution:**

1. Reduce `max_partition_fetch_bytes`
2. Reduce number of partitions consumed (use topic filtering)
3. Increase the memory available to the consumer process
4. Reduce `batch_size` to process smaller batches

### Issue: Configuration Changes Not Taking Effect

**Symptom:** Changed `batch_size` or `max_partition_fetch_bytes` but behavior unchanged.

**Cause:** The Kafka consumer is created when the pipeline starts, and its configuration is read only at that point.

**Solution:** Restart the streaming pipeline after changing configuration parameters.

### Issue: Messages Not Being Consumed

**Symptom:** No messages received despite messages in topic.

**Checklist:**

1. Verify `consumer_group` is unique and not being used by another consumer
2. Check `auto_offset_reset` setting (use `earliest` to read from beginning)
3. Verify topic name spelling matches exactly
4. Check network connectivity to `bootstrap_server`
5. Verify consumer has read permissions for the topic
6. Check whether an `offset` config is set; it may be seeking to a specific position
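
For item 4, a plain TCP connection attempt shows whether each broker in `bootstrap_server` is reachable at all. It does not test TLS, SASL credentials, or topic permissions, so a passing check only rules out network problems. The helper names are hypothetical, and the parser assumes plain `host:port` entries (no bracketed IPv6 addresses).

```python theme={"system"}
import socket
from typing import List, Tuple


def parse_bootstrap_servers(bootstrap_server: str) -> List[Tuple[str, int]]:
    """Split "broker1:9092,broker2:9092" into (host, port) pairs."""
    servers = []
    for entry in bootstrap_server.split(","):
        host, _, port = entry.strip().rpartition(":")
        servers.append((host, int(port)))
    return servers


def unreachable_brokers(bootstrap_server: str, timeout: float = 3.0) -> List[str]:
    """Return the brokers that refuse or time out on a plain TCP connect."""
    failed = []
    for host, port in parse_bootstrap_servers(bootstrap_server):
        try:
            with socket.create_connection((host, port), timeout=timeout):
                pass
        except OSError:  # covers refusals, DNS failures, and timeouts
            failed.append(f"{host}:{port}")
    return failed
```

For example, `print(unreachable_brokers("broker1:9092,broker2:9092"))` prints an empty list when every broker accepts a connection.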

## Complete Example Configurations

### Example 1: Basic Setup with Small Messages

```yaml theme={"system"}
connector_type: kafka
bootstrap_server: "localhost:9092"
topic: user_events
consumer_group: user_events_processor
batch_size: 500
max_partition_fetch_bytes: 2097152  # 2MB
timeout_ms: 500
auto_offset_reset: latest
include_metadata: false
```

### Example 2: High-Volume Processing with Large Messages

```yaml theme={"system"}
connector_type: kafka
bootstrap_server: "kafka-broker1:9092,kafka-broker2:9092"
topic: large_payload_topic
consumer_group: large_payload_consumer
batch_size: 100
max_partition_fetch_bytes: 52428800  # 50MB for large messages
timeout_ms: 2000
auto_offset_reset: earliest
include_metadata: true
```

### Example 3: Multiple Topics with Authentication

```yaml theme={"system"}
connector_type: kafka
bootstrap_server: "secure-kafka:9093"
topics: ['topic1', 'topic2', 'topic3']
consumer_group: multi_topic_consumer
batch_size: 200
max_partition_fetch_bytes: 10485760  # 10MB
timeout_ms: 1000
security_protocol: SASL_SSL
sasl_config:
  mechanism: "SCRAM-SHA-512"
  username: kafka_user
  password: kafka_password
ssl_config:
  cafile: "/path/to/ca-cert.pem"
```

### Example 4: Backfill from Specific Timestamp

```yaml theme={"system"}
connector_type: kafka
bootstrap_server: "localhost:9092"
topic: historical_data
consumer_group: backfill_consumer
batch_size: 1000
max_partition_fetch_bytes: 10485760  # 10MB
timeout_ms: 1000
offset:
  type: "timestamp"
  value: 1609459200000  # 2021-01-01 00:00:00 UTC
  partitions:
    - {"topic": "historical_data", "partition": 0}
    - {"topic": "historical_data", "partition": 1}
    - {"topic": "historical_data", "partition": 2}
```

## Quick Reference

### Configuration Quick Reference Table

| Parameter                   | Default              | Recommended Range | Notes                                       |
| --------------------------- | -------------------- | ----------------- | ------------------------------------------- |
| `batch_size`                | 100                  | 50-2000           | Higher for small messages, lower for large  |
| `max_partition_fetch_bytes` | 5MB (Pro), 1MB (OSS) | 1MB-100MB         | Must exceed max message size                |
| `timeout_ms`                | 500                  | 100-5000          | Lower = lower latency, higher CPU           |
| `auto_offset_reset`         | latest               | earliest/latest   | earliest for backfill, latest for real-time |

### Message Size to Configuration Mapping

| Message Size | Recommended `batch_size` | Recommended `max_partition_fetch_bytes` |
| ------------ | ------------------------ | --------------------------------------- |
| \< 1 KB      | 500-2000                 | 1-5 MB                                  |
| 1-10 KB      | 200-1000                 | 5-10 MB                                 |
| 10-100 KB    | 50-500                   | 10-50 MB                                |
| > 100 KB     | 10-100                   | 50-100 MB                               |
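
The table can also be encoded as a lookup, which is handy when generating configs for many pipelines. The table does not say which row an exact 1, 10, or 100 KB value belongs to; treating each boundary as the start of the next row is an assumption of this sketch. `recommended_settings` is a hypothetical helper.

```python theme={"system"}
from typing import Dict, Tuple

KB = 1024


def recommended_settings(avg_message_bytes: int) -> Dict[str, Tuple[int, int]]:
    """Return the recommended ranges from the table for an average message size."""
    # (exclusive upper bound in bytes, batch_size range, max_partition_fetch_bytes range in MB)
    rows = [
        (1 * KB, (500, 2000), (1, 5)),
        (10 * KB, (200, 1000), (5, 10)),
        (100 * KB, (50, 500), (10, 50)),
    ]
    for upper, batch_range, fetch_mb in rows:
        if avg_message_bytes < upper:
            return {"batch_size": batch_range, "max_partition_fetch_bytes_mb": fetch_mb}
    return {"batch_size": (10, 100), "max_partition_fetch_bytes_mb": (50, 100)}
```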

## Best Practices

1. **Consumer Groups:** Use unique consumer group names per pipeline/environment (e.g., `pipeline_prod`, `pipeline_staging`)
2. **Batch Sizing:** Start with defaults and tune based on message size and throughput needs
3. **Monitoring:** Monitor consumer lag, batch sizes, and memory usage regularly
4. **Broker Alignment:** Ensure `max_partition_fetch_bytes` aligns with broker configuration (`message.max.bytes`, `replica.fetch.max.bytes`)
5. **Error Handling:** Implement proper error handling in your transformation logic to handle malformed messages
6. **Offset Management:**
   * Use `auto_offset_reset: earliest` for initial processing or backfills
   * Use `auto_offset_reset: latest` for real-time streaming
   * Use explicit `offset` config for precise control
7. **Multiple Topics:** When consuming from multiple topics, ensure all topics exist and are accessible
8. **Restart After Config Changes:** Always restart the pipeline after changing `batch_size` or `max_partition_fetch_bytes`
9. **Memory Planning:** Calculate memory needs: `max_partition_fetch_bytes × partitions × 1.5`
10. **Testing:** Test configuration changes in a staging environment before production
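
For practice 5, one option is to separate malformed messages from valid ones so a single bad record does not fail the whole batch. The sketch assumes messages arrive as a list of dicts shaped as in the Data format section, and that a valid payload must contain the fields from that example. `REQUIRED_FIELDS` and `split_valid` are hypothetical; a transformer block could return `valid` and log or dead-letter `rejected`.

```python theme={"system"}
from typing import Any, Dict, List, Tuple

REQUIRED_FIELDS = ("location", "scientist", "count")  # hypothetical schema from the Data format example


def split_valid(messages: List[Any]) -> Tuple[List[Dict[str, Any]], List[Any]]:
    """Separate well-formed messages from malformed ones instead of failing the batch."""
    valid, rejected = [], []
    for message in messages:
        # With include_metadata: true, the payload sits under "data"
        payload = message.get("data", message) if isinstance(message, dict) else None
        if isinstance(payload, dict) and all(field in payload for field in REQUIRED_FIELDS):
            valid.append(message)
        else:
            rejected.append(message)
    return valid, rejected
```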
