
# Kafka

## Basic config

```yaml theme={"system"}
connector_type: kafka
bootstrap_server: "localhost:9092"
topic: topic_name
topics: ['topic_name1', 'topic_name2']  # Optional. Used for specifying multiple topics.
consumer_group: unique_consumer_group
include_metadata: false
api_version: "0.10.2"
batch_size: 100
timeout_ms: 500
auto_offset_reset: latest # Optional
max_partition_fetch_bytes: 1048576 # Optional. Default: 5242880 (5MB) on Pro, 1048576 (1MB) on OSS
offset:  # Optional
  value: null # Optional
  type: "int"
  partitions: # Optional
  - {"topic": "test", "partition": 1}
```

### Configuration Parameters

#### Required Parameters

* **`bootstrap_server`**: Kafka broker address (e.g., `"localhost:9092"` or `"broker1:9092,broker2:9092"`)
* **`consumer_group`**: Unique consumer group identifier for this pipeline
* **`topic`** or **`topics`**: Single topic name (string) or list of topic names to consume from
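
A quick pre-flight check can catch a missing required parameter before the pipeline starts. The sketch below is illustrative only: `validate_kafka_config` is a hypothetical helper, not part of Mage, and it assumes the YAML config has already been loaded into a Python dict.

```python
REQUIRED_KEYS = ("bootstrap_server", "consumer_group")


def validate_kafka_config(config: dict) -> list:
    """Return a list of problems found in a Kafka source config dict.

    Hypothetical helper for illustration; not part of Mage.
    """
    problems = []
    for key in REQUIRED_KEYS:
        if not config.get(key):
            problems.append(f"missing required parameter: {key}")
    # Either a single `topic` or a non-empty `topics` list must be present.
    if not config.get("topic") and not config.get("topics"):
        problems.append("set either `topic` or `topics`")
    return problems


config = {
    "connector_type": "kafka",
    "bootstrap_server": "localhost:9092",
    "topic": "topic_name",
}
print(validate_kafka_config(config))
# ['missing required parameter: consumer_group']
```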

#### Performance Parameters

* **`batch_size`** (default: `100`): Maximum number of messages to process in a single batch. The consumer will accumulate messages across multiple polls to reach this target when possible.

  **Recommendations:**

  * Small messages (\< 1 KB): 500-2000
  * Medium messages (1-10 KB): 200-1000
  * Large messages (> 10 KB): 50-500

* **`max_partition_fetch_bytes`** (default: `5242880` = 5MB for Pro, `1048576` = 1MB for OSS): Maximum amount of data to fetch per partition in a single poll. This is a **per-partition limit** that can constrain your batch size.

  **Important:** This value must be:

  * Greater than your largest message size
  * Aligned with broker settings (`message.max.bytes`, `replica.fetch.max.bytes`)

  **Recommendations by message size:**

  * Small messages (\< 1 KB): 1-5 MB
  * Medium messages (1-10 KB): 5-10 MB
  * Large messages (10-100 KB): 10-50 MB
  * Very large messages (> 100 KB): 50-100 MB

  **Memory consideration:** Total in-flight data = `max_partition_fetch_bytes × number of partitions`. Ensure your consumer has sufficient heap memory.

* **`timeout_ms`** (default: `500`): Maximum time to wait for messages when polling (in milliseconds). Lower values reduce latency but may increase CPU usage.

#### Other Parameters

* **`auto_offset_reset`** (default: `latest`): What to do when there is no initial offset or the offset is out of range. Options: `earliest`, `latest`, `none`
* **`include_metadata`**: If `true`, includes Kafka metadata (`key`, `partition`, `offset`, `time`, `topic`) with each message
* **`api_version`** (default: `"0.10.2"`): Kafka API version. Use a version matching your broker version for best compatibility

### Offset Configuration

<Note>
  Available in versions `>= 0.9.61`
</Note>

The `offset` config lets you reset the consumer offset when the streaming pipeline starts. If you use the `offset` config, the `partitions` field is required.

The `type` field of the `offset` config accepts one of four values:

1. `beginning` - Start from the earliest available message
2. `end` - Start from the latest message (skip all existing messages)
3. `int` - Start from a specific numeric offset
4. `timestamp` - Start from messages at or after a specific timestamp

**Examples:**

Start from beginning:

```yaml theme={"system"}
offset:
  type: "beginning"
  partitions:
    - {"topic": "my_topic", "partition": 0}
    - {"topic": "my_topic", "partition": 1}
```

Start from specific offset:

```yaml theme={"system"}
offset:
  type: "int"
  value: 1000
  partitions:
    - {"topic": "my_topic", "partition": 0}
```

Start from specific timestamp (milliseconds since epoch):

```yaml theme={"system"}
offset:
  type: "timestamp"
  value: 1693396630163  # Wed Aug 30 2023 11:57:10 UTC
  partitions:
    - {"topic": "my_topic", "partition": 0}
```

**Note:** `beginning` and `end` do not require the `value` field.

With `int`, the consumer starts from the given numeric offset, which is the position of a message within each listed partition.

With `timestamp`, the consumer starts from the first message whose timestamp is at or after the given value. Specify the value in milliseconds since the Unix epoch.
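
Since the `timestamp` value is expressed in milliseconds since the epoch, it can be convenient to derive it from a readable date. The following is a minimal sketch using only the standard library; `to_epoch_ms` is a hypothetical helper name, not part of Mage.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_ms(dt: datetime) -> int:
    """Convert a timezone-aware datetime to milliseconds since the Unix epoch."""
    if dt.tzinfo is None:
        raise ValueError("use a timezone-aware datetime to avoid local-time surprises")
    # Dividing timedeltas with // yields an exact integer (no float rounding).
    return (dt - EPOCH) // timedelta(milliseconds=1)


print(to_epoch_ms(datetime(2023, 8, 30, 11, 57, 10, 163000, tzinfo=timezone.utc)))
# 1693396630163
```

The result can be pasted directly into the `value` field of an `offset` config with `type: "timestamp"`.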

## SSL authentication

```yaml theme={"system"}
security_protocol: "SSL"
ssl_config:
  cafile: "CARoot.pem"
  certfile: "certificate.pem"
  keyfile: "key.pem"
  password: password
  check_hostname: true
```

## SASL authentication

SASL\_PLAINTEXT config:

```yaml theme={"system"}
security_protocol: "SASL_PLAINTEXT"
sasl_config:
  mechanism: "PLAIN"
  username: username
  password: password
```

SASL\_SSL config:

```yaml theme={"system"}
security_protocol: "SASL_SSL"
sasl_config:
  mechanism: "SCRAM-SHA-512"
  username: username
  password: password
ssl_config:
  cafile: "pemfilename.pem"
```

SASL\_SSL with OAUTHBEARER (OAuth 2.0):

```yaml theme={"system"}
security_protocol: "SASL_SSL"
sasl_config:
  mechanism: "OAUTHBEARER"
  oauth_token_url: "https://keycloak.example.com/auth/realms/myrealm/protocol/openid-connect/token"
  oauth_client_id: "kafka-client"
  oauth_client_secret: "your-client-secret"
  oauth_scope: "kafka"  # Optional
ssl_config:
  cafile: "pemfilename.pem"
  check_hostname: true
```

<Note>
  **OAUTHBEARER Authentication:**

  * The `oauth_token_url`, `oauth_client_id`, and `oauth_client_secret` are **required** when using `OAUTHBEARER` mechanism.
  * The `oauth_scope` parameter is optional.
  * OAuth tokens are automatically cached and refreshed before expiry (60 seconds before expiration by default).
  * It's recommended to use `SASL_SSL` (not `SASL_PLAINTEXT`) with OAUTHBEARER for secure token transmission.
  * Compatible with OAuth 2.0 providers like Keycloak, Okta, and other identity servers that support client credentials flow.
</Note>
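
To make the "cached and refreshed before expiry" behavior concrete, here is a rough sketch of the general pattern. It is not Mage's implementation: `CachedToken`, its `fetch` callback, and the injectable `clock` are all hypothetical names, and the actual HTTP request to the token endpoint is left out.

```python
import time
from typing import Callable, Optional, Tuple


class CachedToken:
    """Cache a token and fetch a new one shortly before the old one expires.

    Illustrative only; not Mage's implementation.
    """

    def __init__(
        self,
        fetch: Callable[[], Tuple[str, float]],
        refresh_margin_s: float = 60.0,
        clock: Callable[[], float] = time.time,
    ):
        self._fetch = fetch  # returns (token, lifetime in seconds)
        self._margin = refresh_margin_s
        self._clock = clock
        self._token: Optional[str] = None
        self._expires_at = 0.0

    def get(self) -> str:
        now = self._clock()
        # Refresh when there is no token yet, or when fewer than
        # `refresh_margin_s` seconds remain before expiry.
        if self._token is None or now >= self._expires_at - self._margin:
            token, lifetime_s = self._fetch()
            self._token = token
            self._expires_at = now + lifetime_s
        return self._token
```

In a real client, `fetch` would perform the client credentials request against `oauth_token_url` and return the access token together with its lifetime.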

## Data format

By default, with `include_metadata` set to `false`, the Kafka data loader returns only the message value, e.g.

```python theme={"system"}
{"location": "klamath", "scientist": "anderson", "count": 23}
```

Kafka supports [structuring and partitioning](https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html#kafka.KafkaProducer.send) your data.
If `include_metadata` is set to `true`, the Kafka data loader returns each message as a dictionary with the message value under `data` and the Kafka fields (`key`, `partition`, `offset`, `time`, and `topic`) under `metadata`, e.g.

```python theme={"system"}
message = {
    "data": {"location": "klamath", "scientist": "anderson", "count": 23},
    "metadata": {
      "key": "bees",
      "partition": 0,
      "offset": 6,
      "time": 1693396630163,  # timestamp with ms precision (Wed Aug 30 2023 11:57:10 GMT+0)
      "topic": "census",
    }}
```
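
Downstream code has to account for which of the two shapes it receives. As a small sketch, assuming messages arrive as Python dicts shaped like the examples above, a hypothetical helper `split_message` (not part of Mage) can normalize both cases:

```python
def split_message(message: dict, include_metadata: bool):
    """Return (payload, metadata) for one message from the Kafka source.

    Hypothetical helper; `metadata` is None when include_metadata is false.
    """
    if include_metadata:
        return message["data"], message["metadata"]
    return message, None


with_meta = {
    "data": {"location": "klamath", "scientist": "anderson", "count": 23},
    "metadata": {"key": "bees", "partition": 0, "offset": 6,
                 "time": 1693396630163, "topic": "census"},
}
payload, metadata = split_message(with_meta, include_metadata=True)
print(payload["count"], metadata["topic"])
# 23 census
```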

## Deserialize message with protobuf schema

```yaml theme={"system"}
serde_config:
  serialization_method: PROTOBUF
  schema_classpath: "path.to.schema.SchemaClass"
```

* Set `serialization_method` to `PROTOBUF`.
* Set `schema_classpath` to the import path of the Python schema class. To check that you have access to the schema, run the following import
  in a scratchpad.
  ```python theme={"system"}
  from path.to.schema import SchemaClass
  ```

## Pass raw message to transformer

```yaml theme={"system"}
serde_config:
  serialization_method: RAW_VALUE
```

## Deserialize message with Avro schema in Confluent schema registry

```yaml theme={"system"}
serde_config:
  serialization_method: AVRO
  schema_registry_url: https://schema_registry_url
  schema_registry_username: username
  schema_registry_password: password
```
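
For background, messages serialized through the Confluent schema registry are framed with a 5-byte header: a magic byte `0` followed by a 4-byte big-endian schema ID, then the Avro-encoded payload. The source handles this for you when `serialization_method` is `AVRO`; the sketch below only illustrates the framing, which can help when inspecting raw messages. `parse_confluent_header` is a hypothetical helper name.

```python
import struct

MAGIC_BYTE = 0


def parse_confluent_header(raw: bytes):
    """Split a Confluent-framed message into (schema_id, avro_payload)."""
    if len(raw) < 5:
        raise ValueError("message too short for Confluent framing")
    # ">bI" = big-endian: 1 signed byte (magic) + 4-byte unsigned int (schema ID)
    magic, schema_id = struct.unpack(">bI", raw[:5])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id, raw[5:]


schema_id, payload = parse_confluent_header(b"\x00\x00\x00\x00\x2a" + b"avro-bytes")
print(schema_id, payload)
# 42 b'avro-bytes'
```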

## API version

If you are using a newer version of the Kafka brokers, consider setting `api_version` to the matching Kafka API version.

## Performance Tuning

### Understanding Batch Size vs. Fetch Bytes

The `batch_size` and `max_partition_fetch_bytes` work together:

* **`batch_size`**: Target number of messages to process together
* **`max_partition_fetch_bytes`**: Maximum data (bytes) fetched per partition per poll

**Example:** If you set `batch_size: 500` but `max_partition_fetch_bytes: 1048576` (1MB) and your messages are 5KB each:

* Each partition can fetch \~200 messages (1MB ÷ 5KB)
* With 1 partition, you'll get \~200 messages per batch
* With multiple partitions, you may get closer to 500, but each partition is still limited to 1MB

**Solution:** Increase `max_partition_fetch_bytes` to allow more messages per partition:

```yaml theme={"system"}
batch_size: 500
max_partition_fetch_bytes: 5242880  # 5MB allows ~1000 messages per partition
```
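
The arithmetic above can be checked with a few lines of Python. This is a rough estimate under simplifying assumptions (uniform message size, no per-batch protocol overhead); `max_messages_per_poll` is a hypothetical helper name.

```python
def max_messages_per_poll(max_partition_fetch_bytes: int,
                          avg_message_bytes: int,
                          partitions: int = 1) -> int:
    """Upper bound on messages one poll can return, ignoring protocol overhead."""
    per_partition = max_partition_fetch_bytes // avg_message_bytes
    return per_partition * partitions


KB = 1024
print(max_messages_per_poll(1048576, 5 * KB))  # 204  (1MB limit, 5KB messages)
print(max_messages_per_poll(5242880, 5 * KB))  # 1024 (5MB limit, 5KB messages)
```

If the result for your topic is below `batch_size`, a single poll cannot fill a batch on its own.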

### High-Throughput Configuration

For high-volume scenarios with consumer lag:

```yaml theme={"system"}
connector_type: kafka
bootstrap_server: "broker1:9092,broker2:9092"
topic: high_volume_topic
consumer_group: high_throughput_consumer
batch_size: 1000
max_partition_fetch_bytes: 10485760  # 10MB
timeout_ms: 1000
auto_offset_reset: earliest
```

### Low-Latency Configuration

For real-time processing with minimal delay:

```yaml theme={"system"}
connector_type: kafka
bootstrap_server: "localhost:9092"
topic: realtime_topic
consumer_group: low_latency_consumer
batch_size: 10
max_partition_fetch_bytes: 1048576  # 1MB
timeout_ms: 100
auto_offset_reset: latest
```

### Memory Planning

Calculate memory requirements:

```
Estimated Memory = max_partition_fetch_bytes × number_of_partitions × 1.5
```

The 1.5x multiplier accounts for overhead. Ensure your consumer process has at least this much memory available.
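
As a worked example of the formula, the sketch below computes the estimate for a 10MB fetch limit. The partition count of 12 is an arbitrary illustration, and `estimated_memory_bytes` is a hypothetical helper name.

```python
def estimated_memory_bytes(max_partition_fetch_bytes: int,
                           partitions: int,
                           overhead: float = 1.5) -> int:
    """Estimated memory = max_partition_fetch_bytes x partitions x overhead."""
    return int(max_partition_fetch_bytes * partitions * overhead)


estimate = estimated_memory_bytes(10485760, partitions=12)
print(estimate, "bytes =", estimate / (1024 * 1024), "MB")
# 188743680 bytes = 180.0 MB
```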

## Troubleshooting

### Issue: Not Receiving Expected Batch Size

**Symptom:** Set `batch_size: 500` but only getting \~200-250 messages per batch.

**Cause:** `max_partition_fetch_bytes` is too low for your message size.

**Solution:**

1. Calculate: `max_partition_fetch_bytes ÷ average_message_size = max_messages_per_partition`
2. Increase `max_partition_fetch_bytes`:
   ```yaml theme={"system"}
   max_partition_fetch_bytes: 5242880  # 5MB (or higher)
   ```
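3. Verify that the value is consistent with broker settings:
   * `max_partition_fetch_bytes` should be at least `message.max.bytes`, so that the largest message the broker accepts fits in a single fetch
   * `replica.fetch.max.bytes` should also be at least `message.max.bytes`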

### Issue: Consumer Lag Growing

**Symptom:** Consumer falling behind producer, lag increasing over time.

**Solutions:**

1. Increase `batch_size` and `max_partition_fetch_bytes`
2. Increase `timeout_ms` to allow more time for message accumulation
3. Scale horizontally: Set a higher value for [executor\_count](/guides/streaming/introduction#executor-count) to run multiple consumer instances in the same consumer group so Kafka can distribute partitions among them.
4. Optimize downstream processing speed

### Issue: Out of Memory Errors

**Symptom:** Consumer crashes with memory errors.

**Cause:** `max_partition_fetch_bytes × number_of_partitions` exceeds available memory.

**Solution:**

1. Reduce `max_partition_fetch_bytes`
2. Reduce number of partitions consumed (use topic filtering)
3. Increase consumer heap memory
4. Reduce `batch_size` to process smaller batches

### Issue: Configuration Changes Not Taking Effect

**Symptom:** Changed `batch_size` or `max_partition_fetch_bytes` but behavior unchanged.

**Cause:** The Kafka consumer is initialized when the pipeline starts, and configuration is read only during initialization.

**Solution:** Restart the streaming pipeline after changing configuration parameters.

### Issue: Messages Not Being Consumed

**Symptom:** No messages received despite messages in topic.

**Checklist:**

1. Verify `consumer_group` is unique and not being used by another consumer
2. Check `auto_offset_reset` setting (use `earliest` to read from beginning)
3. Verify topic name spelling matches exactly
4. Check network connectivity to `bootstrap_server`
5. Verify consumer has read permissions for the topic
6. If you use the `offset` config, review it: the consumer may be seeking to a specific position
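
For the connectivity item in the checklist, a plain TCP check from the machine running the pipeline is often enough to rule out network problems. The sketch below uses only the standard library and assumes `bootstrap_server` is a comma-separated list of `host:port` entries; both function names are hypothetical. A successful TCP connection does not prove that authentication or topic permissions are correct.

```python
import socket


def parse_bootstrap_servers(bootstrap_server: str):
    """Split "host1:9092,host2:9092" into [("host1", 9092), ("host2", 9092)]."""
    servers = []
    for entry in bootstrap_server.split(","):
        host, _, port = entry.strip().rpartition(":")
        servers.append((host, int(port)))
    return servers


def can_connect(host: str, port: int, timeout_s: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout_s):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    for host, port in parse_bootstrap_servers("localhost:9092"):
        status = "reachable" if can_connect(host, port) else "NOT reachable"
        print(f"{host}:{port} is {status}")
```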

## Complete Example Configurations

### Example 1: Basic Setup with Small Messages

```yaml theme={"system"}
connector_type: kafka
bootstrap_server: "localhost:9092"
topic: user_events
consumer_group: user_events_processor
batch_size: 500
max_partition_fetch_bytes: 2097152  # 2MB
timeout_ms: 500
auto_offset_reset: latest
include_metadata: false
```

### Example 2: High-Volume Processing with Large Messages

```yaml theme={"system"}
connector_type: kafka
bootstrap_server: "kafka-broker1:9092,kafka-broker2:9092"
topic: large_payload_topic
consumer_group: large_payload_consumer
batch_size: 100
max_partition_fetch_bytes: 52428800  # 50MB for large messages
timeout_ms: 2000
auto_offset_reset: earliest
include_metadata: true
```

### Example 3: Multiple Topics with Authentication

```yaml theme={"system"}
connector_type: kafka
bootstrap_server: "secure-kafka:9093"
topics: ['topic1', 'topic2', 'topic3']
consumer_group: multi_topic_consumer
batch_size: 200
max_partition_fetch_bytes: 10485760  # 10MB
timeout_ms: 1000
security_protocol: SASL_SSL
sasl_config:
  mechanism: "SCRAM-SHA-512"
  username: kafka_user
  password: kafka_password
ssl_config:
  cafile: "/path/to/ca-cert.pem"
```

### Example 4: Backfill from Specific Timestamp

```yaml theme={"system"}
connector_type: kafka
bootstrap_server: "localhost:9092"
topic: historical_data
consumer_group: backfill_consumer
batch_size: 1000
max_partition_fetch_bytes: 10485760  # 10MB
timeout_ms: 1000
offset:
  type: "timestamp"
  value: 1609459200000  # 2021-01-01 00:00:00 UTC
  partitions:
    - {"topic": "historical_data", "partition": 0}
    - {"topic": "historical_data", "partition": 1}
    - {"topic": "historical_data", "partition": 2}
```

## Quick Reference

### Configuration Quick Reference Table

| Parameter                   | Default              | Recommended Range | Notes                                       |
| --------------------------- | -------------------- | ----------------- | ------------------------------------------- |
| `batch_size`                | 100                  | 50-2000           | Higher for small messages, lower for large  |
| `max_partition_fetch_bytes` | 5MB (Pro), 1MB (OSS) | 1MB-100MB         | Must exceed max message size                |
| `timeout_ms`                | 500                  | 100-5000          | Lower = lower latency, higher CPU           |
| `auto_offset_reset`         | latest               | earliest/latest   | earliest for backfill, latest for real-time |

### Message Size to Configuration Mapping

| Message Size | Recommended `batch_size` | Recommended `max_partition_fetch_bytes` |
| ------------ | ------------------------ | --------------------------------------- |
| \< 1 KB      | 500-2000                 | 1-5 MB                                  |
| 1-10 KB      | 200-1000                 | 5-10 MB                                 |
| 10-100 KB    | 50-500                   | 10-50 MB                                |
| > 100 KB     | 10-100                   | 50-100 MB                               |
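
The mapping table can also be expressed as a small lookup, which is handy when generating configs for many topics. This is only a sketch: `recommend` is a hypothetical helper, and the handling of the exact boundary sizes (1 KB, 10 KB, 100 KB) is an assumption, since the table does not say which row they belong to.

```python
KB = 1024
MB = 1024 * KB

# (exclusive upper bound on message size, batch_size range, fetch-bytes range)
RECOMMENDATIONS = [
    (1 * KB, (500, 2000), (1 * MB, 5 * MB)),
    (10 * KB, (200, 1000), (5 * MB, 10 * MB)),
    (100 * KB, (50, 500), (10 * MB, 50 * MB)),
    (float("inf"), (10, 100), (50 * MB, 100 * MB)),
]


def recommend(avg_message_bytes: int) -> dict:
    """Return the recommended (min, max) ranges for a given average message size."""
    for upper, batch_range, fetch_range in RECOMMENDATIONS:
        if avg_message_bytes < upper:
            return {
                "batch_size": batch_range,
                "max_partition_fetch_bytes": fetch_range,
            }


print(recommend(5 * KB))
# {'batch_size': (200, 1000), 'max_partition_fetch_bytes': (5242880, 10485760)}
```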

## Best Practices

1. **Consumer Groups:** Use unique consumer group names per pipeline/environment (e.g., `pipeline_prod`, `pipeline_staging`)
2. **Batch Sizing:** Start with defaults and tune based on message size and throughput needs
3. **Monitoring:** Monitor consumer lag, batch sizes, and memory usage regularly
4. **Broker Alignment:** Ensure `max_partition_fetch_bytes` aligns with broker configuration (`message.max.bytes`, `replica.fetch.max.bytes`)
5. **Error Handling:** Implement proper error handling in your transformation logic to handle malformed messages
6. **Offset Management:**
   * Use `auto_offset_reset: earliest` for initial processing or backfills
   * Use `auto_offset_reset: latest` for real-time streaming
   * Use explicit `offset` config for precise control
7. **Multiple Topics:** When consuming from multiple topics, ensure all topics exist and are accessible
8. **Restart After Config Changes:** Always restart the pipeline after changing `batch_size` or `max_partition_fetch_bytes`
9. **Memory Planning:** Calculate memory needs: `max_partition_fetch_bytes × partitions × 1.5`
10. **Testing:** Test configuration changes in a staging environment before production
