Basic config
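Here is a minimal sketch of a source config using the required parameters described below; the connector_type line and all values are placeholders/assumptions, so adapt them to your pipeline.

```yaml
# Minimal Kafka source config (sketch; adjust values to your environment).
connector_type: kafka                 # assumed discriminator for the source type
bootstrap_server: "localhost:9092"
topic: my_topic                       # or use topics for a list of topic names
consumer_group: my_pipeline_consumer_group
include_metadata: false
api_version: "0.10.2"
```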
Configuration Parameters
Required Parameters
- bootstrap_server: Kafka broker address (e.g., "localhost:9092" or "broker1:9092,broker2:9092")
- consumer_group: Unique consumer group identifier for this pipeline
- topic or topics: Single topic name (string) or list of topic names to consume from
Performance Parameters
- batch_size (default: 100): Maximum number of messages to process in a single batch. The consumer will accumulate messages across multiple polls to reach this target when possible. Recommendations:
  - Small messages (< 1 KB): 500-2000
  - Medium messages (1-10 KB): 200-1000
  - Large messages (> 10 KB): 50-500
- max_partition_fetch_bytes (default: 5242880 = 5 MB for Pro, 1048576 = 1 MB for OSS): Maximum amount of data to fetch per partition in a single poll. This is a per-partition limit that can constrain your batch size. Important: This value must be:
  - Greater than your largest message size
  - Aligned with broker settings (message.max.bytes, replica.fetch.max.bytes)

  Recommendations:
  - Small messages (< 1 KB): 1-5 MB
  - Medium messages (1-10 KB): 5-10 MB
  - Large messages (10-100 KB): 10-50 MB
  - Very large messages (> 100 KB): 50-100 MB

  Memory impact: worst-case memory use is roughly max_partition_fetch_bytes × number of partitions. Ensure your consumer has sufficient heap memory.
- timeout_ms (default: 500): Maximum time to wait for messages when polling (in milliseconds). Lower values reduce latency but may increase CPU usage.
Other Parameters
- auto_offset_reset (default: latest): What to do when there is no initial offset or the offset is out of range. Options: earliest, latest, none
- include_metadata: If true, includes Kafka metadata (key, partition, offset, timestamp, topic) with each message
- api_version (default: "0.10.2"): Kafka API version. Use a version matching your broker version for best compatibility
Offset Configuration
Available in versions >= 0.9.61.

The offset config allows you to reset the offset when starting the streaming pipeline. If you use the offset config, the partitions config is required.
The offset config supports 4 values:

- beginning: Start from the earliest available message
- end: Start from the latest message (skip all existing messages)
- int: Start from a specific numeric offset
- timestamp: Start from messages at or after a specific timestamp
The beginning and end options do not require a value field.
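A hedged sketch of how the offset and partitions configs might look together; the exact nesting and key names here are assumptions based on the options listed above.

```yaml
# Start consuming from a specific timestamp on selected partitions (sketch;
# nesting and key names are assumptions).
offset:
  type: timestamp          # one of: beginning, end, int, timestamp
  value: 1700000000000     # epoch milliseconds; not needed for beginning/end
partitions:                # required whenever the offset config is used
  - 0
  - 1
```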
SSL authentication
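A hedged sketch of an SSL setup; the ssl_config nesting and key names are assumptions (typical kafka-python style options), so verify them against your version.

```yaml
# SSL authentication (sketch; field names are assumptions).
security_protocol: SSL
ssl_config:
  cafile: /path/to/CARoot.pem
  certfile: /path/to/certificate.pem
  keyfile: /path/to/key.pem
  password: keyfile_password
  check_hostname: true
```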
SASL authentication
SASL_PLAINTEXT config (see the sketch after this section):

OAUTHBEARER Authentication:

- The oauth_token_url, oauth_client_id, and oauth_client_secret parameters are required when using the OAUTHBEARER mechanism.
- The oauth_scope parameter is optional.
- OAuth tokens are automatically cached and refreshed before expiry (60 seconds before expiration by default).
- It's recommended to use SASL_SSL (not SASL_PLAINTEXT) with OAUTHBEARER for secure token transmission.
- Compatible with OAuth 2.0 providers like Keycloak, Okta, and other identity servers that support the client credentials flow.
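Hedged sketches of both SASL variants; the sasl_config nesting and the placement of the oauth_* parameters are assumptions based on the parameters described above.

```yaml
# SASL_PLAINTEXT with the PLAIN mechanism (sketch).
security_protocol: SASL_PLAINTEXT
sasl_config:
  mechanism: PLAIN
  username: kafka_user
  password: kafka_password

# OAUTHBEARER over SASL_SSL (recommended for secure token transmission).
# The oauth_* field names follow the parameters listed above; their placement
# under sasl_config is an assumption.
# security_protocol: SASL_SSL
# sasl_config:
#   mechanism: OAUTHBEARER
#   oauth_token_url: https://auth.example.com/oauth/token
#   oauth_client_id: my_client_id
#   oauth_client_secret: my_client_secret
#   oauth_scope: kafka            # optional
```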
Data format
By default, if include_metadata is false, the Kafka data loader returns only the message's value field as the data.

If include_metadata is set to true, the Kafka data loader returns each element as a message with data = {key: value} and metadata containing the topic, partition, offset, and time.
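For example, one message might look roughly like this (an illustration only; exact field names and types may differ):

```yaml
# Shape of a single message when include_metadata is true (illustrative).
data:
  key1: value1          # the original message value
metadata:
  key: message_key
  topic: my_topic
  partition: 0
  offset: 123
  time: 1700000000000
```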
Deserialize message with protobuf schema
- Specify the serialization_method as PROTOBUF (see the sketch below).
- Set the schema_classpath to the path of the Python schema class.
- Test whether you have access to the schema by importing it in a scratchpad.
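A hedged sketch of the resulting config; the serde_config nesting is an assumption, and the schema class path is a placeholder.

```yaml
# Deserialize messages with a protobuf schema (sketch).
serde_config:
  serialization_method: PROTOBUF
  schema_classpath: "path.to.schema.SchemaClass"   # Python import path to the generated class
```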
Pass raw message to transformer
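To hand the untouched message value straight to the transformer, the serialization method can be switched to a raw mode. A hedged sketch, where RAW_VALUE is an assumed method name:

```yaml
# Pass the raw message value through without deserialization (sketch;
# RAW_VALUE is an assumed value for serialization_method).
serde_config:
  serialization_method: RAW_VALUE
```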
Deserialize message with Avro schema in Confluent schema registry
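A hedged sketch for Avro messages registered in a Confluent schema registry; every field name below is an assumption, so verify it against your version.

```yaml
# Avro deserialization via a Confluent schema registry (sketch; field names are assumptions).
serde_config:
  serialization_method: AVRO
  schema_registry_url: https://schema-registry.example.com
  schema_registry_username: registry_user       # if the registry requires auth
  schema_registry_password: registry_password
```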
API version
If you are using a newer Kafka broker version, consider setting the corresponding Kafka api_version.
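For example (the version string is illustrative; match it to your broker release):

```yaml
# Match api_version to your broker, e.g. for a 2.8.x broker (sketch).
api_version: "2.8.1"
```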
Performance Tuning

Understanding Batch Size vs. Fetch Bytes
The batch_size and max_partition_fetch_bytes settings work together:

- batch_size: Target number of messages to process together
- max_partition_fetch_bytes: Maximum data (bytes) fetched per partition per poll

For example, with batch_size: 500 but max_partition_fetch_bytes: 1048576 (1 MB) and messages of 5 KB each:
- Each partition can fetch ~200 messages (1MB ÷ 5KB)
- With 1 partition, you’ll get ~200 messages per batch
- With multiple partitions, you may get closer to 500, but each partition is still limited to 1MB
To get closer to the target batch size, increase max_partition_fetch_bytes to allow more messages per partition:
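A hedged sketch for the 5 KB example above; 5 MB per partition allows roughly 1000 such messages per poll, so the fetch limit no longer caps the batch.

```yaml
# Raise the per-partition fetch limit so it no longer constrains the batch (sketch).
batch_size: 500
max_partition_fetch_bytes: 5242880   # 5 MB ≈ 1000 messages of 5 KB per partition
```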
High-Throughput Configuration
For high-volume scenarios with consumer lag:
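A hedged tuning sketch; the numbers are starting points, not prescriptions.

```yaml
# High-throughput tuning (sketch): large batches and a generous per-partition fetch.
batch_size: 1000
max_partition_fetch_bytes: 20971520   # 20 MB
timeout_ms: 1000                      # allow more time to fill each batch
```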
Low-Latency Configuration

For real-time processing with minimal delay:
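A hedged sketch trading throughput for latency.

```yaml
# Low-latency tuning (sketch): small batches and short polls.
batch_size: 50
max_partition_fetch_bytes: 1048576    # 1 MB
timeout_ms: 100                       # poll frequently; lower latency, higher CPU
```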
Memory Planning

Calculate memory requirements:
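A small worked example using max_partition_fetch_bytes × partitions × 1.5 (the 1.5 safety factor and the partition count are illustrative assumptions).

```yaml
# Memory planning (sketch): 10 MB per-partition fetch across 8 partitions.
#   10 MB × 8 partitions × 1.5 safety factor ≈ 120 MB of consumer heap headroom
#   for in-flight message data alone.
max_partition_fetch_bytes: 10485760   # 10 MB
```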
Troubleshooting

Issue: Not Receiving Expected Batch Size
Symptom: You set batch_size: 500 but only get ~200-250 messages per batch.
Cause: max_partition_fetch_bytes is too low for your message size.
Solution:
- Calculate: max_partition_fetch_bytes ÷ average_message_size = max_messages_per_partition
- Increase max_partition_fetch_bytes accordingly
- Verify broker settings allow this value:
  - message.max.bytes >= max_partition_fetch_bytes
  - replica.fetch.max.bytes >= max_partition_fetch_bytes
Issue: Consumer Lag Growing
Symptom: Consumer falling behind the producer, with lag increasing over time.

Solutions:

- Increase batch_size and max_partition_fetch_bytes
- Increase timeout_ms to allow more time for message accumulation
- Scale horizontally: set a higher executor_count to run multiple consumer instances in the same consumer group so Kafka can distribute partitions among them
- Optimize downstream processing speed
Issue: Out of Memory Errors
Symptom: Consumer crashes with memory errors.

Cause: max_partition_fetch_bytes × number_of_partitions exceeds available memory.
Solution:
- Reduce max_partition_fetch_bytes
- Reduce the number of partitions consumed (use topic filtering)
- Increase consumer heap memory
- Reduce batch_size to process smaller batches
Issue: Configuration Changes Not Taking Effect
Symptom: You changed batch_size or max_partition_fetch_bytes, but the behavior is unchanged.
Cause: Kafka consumer is initialized at pipeline start. Configuration is read during initialization.
Solution: Restart the streaming pipeline after changing configuration parameters.
Issue: Messages Not Being Consumed
Symptom: No messages received despite messages being present in the topic.

Checklist:

- Verify consumer_group is unique and not being used by another consumer
- Check the auto_offset_reset setting (use earliest to read from the beginning)
- Verify the topic name spelling matches exactly
- Check network connectivity to bootstrap_server
- Verify the consumer has read permissions for the topic
- Check whether the offset config is in use - it may be seeking to a specific position
Complete Example Configurations
Example 1: Basic Setup with Small Messages
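A hedged sketch tuned for messages under 1 KB (connector_type and all values are placeholders).

```yaml
# Example 1 (sketch): basic setup for small messages.
connector_type: kafka
bootstrap_server: "localhost:9092"
topic: events_small
consumer_group: events_pipeline_prod
batch_size: 1000
max_partition_fetch_bytes: 2097152   # 2 MB
timeout_ms: 500
auto_offset_reset: latest
```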
Example 2: High-Volume Processing with Large Messages
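A hedged sketch for a high-volume topic with messages around 50-100 KB.

```yaml
# Example 2 (sketch): high-volume processing with large messages.
connector_type: kafka
bootstrap_server: "broker1:9092,broker2:9092"
topic: large_payloads
consumer_group: large_payloads_pipeline_prod
batch_size: 200
max_partition_fetch_bytes: 52428800  # 50 MB; must exceed the largest message
timeout_ms: 1000
auto_offset_reset: latest
```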
Example 3: Multiple Topics with Authentication
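A hedged sketch for multiple topics over SASL_SSL; the sasl_config nesting is an assumption, as in the SASL section above.

```yaml
# Example 3 (sketch): multiple topics with SASL_SSL authentication.
connector_type: kafka
bootstrap_server: "broker1:9093,broker2:9093"
topics:
  - orders
  - payments
consumer_group: billing_pipeline_prod
security_protocol: SASL_SSL
sasl_config:
  mechanism: PLAIN
  username: kafka_user
  password: kafka_password
batch_size: 500
```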
Example 4: Backfill from Specific Timestamp
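A hedged sketch combining auto_offset_reset: earliest with the offset config from above; the offset/partitions nesting is an assumption.

```yaml
# Example 4 (sketch): backfill starting from a specific timestamp.
connector_type: kafka
bootstrap_server: "localhost:9092"
topic: events_small
consumer_group: events_backfill_consumer_group
auto_offset_reset: earliest          # fallback when no stored offset exists
offset:
  type: timestamp
  value: 1717200000000               # epoch milliseconds to start from
partitions: [0, 1, 2]                # required when offset is set
batch_size: 1000
```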
Quick Reference
Configuration Quick Reference Table
| Parameter | Default | Recommended Range | Notes |
|---|---|---|---|
| batch_size | 100 | 50-2000 | Higher for small messages, lower for large |
| max_partition_fetch_bytes | 5 MB (Pro), 1 MB (OSS) | 1 MB-100 MB | Must exceed max message size |
| timeout_ms | 500 | 100-5000 | Lower = lower latency, higher CPU |
| auto_offset_reset | latest | earliest/latest | earliest for backfill, latest for real-time |
Message Size to Configuration Mapping
| Message Size | Recommended batch_size | Recommended max_partition_fetch_bytes |
|---|---|---|
| < 1 KB | 500-2000 | 1-5 MB |
| 1-10 KB | 200-1000 | 5-10 MB |
| 10-100 KB | 50-500 | 10-50 MB |
| > 100 KB | 10-100 | 50-100 MB |
Best Practices
- Consumer Groups: Use unique consumer group names per pipeline/environment (e.g., pipeline_prod, pipeline_staging)
- Batch Sizing: Start with defaults and tune based on message size and throughput needs
- Monitoring: Monitor consumer lag, batch sizes, and memory usage regularly
- Broker Alignment: Ensure max_partition_fetch_bytes aligns with broker configuration (message.max.bytes, replica.fetch.max.bytes)
- Error Handling: Implement proper error handling in your transformation logic to handle malformed messages
- Offset Management:
  - Use auto_offset_reset: earliest for initial processing or backfills
  - Use auto_offset_reset: latest for real-time streaming
  - Use the explicit offset config for precise control
- Multiple Topics: When consuming from multiple topics, ensure all topics exist and are accessible
- Restart After Config Changes: Always restart the pipeline after changing batch_size or max_partition_fetch_bytes
- Memory Planning: Calculate memory needs as max_partition_fetch_bytes × partitions × 1.5
- Testing: Test configuration changes in a staging environment before production