Basic config

connector_type: kafka
bootstrap_server: "localhost:9092"
topic: topic_name
topics: ['topic_name1', 'topic_name2']  # Optional. Used for specifying multiple topics.
consumer_group: unique_consumer_group
include_metadata: false
api_version: "0.10.2"
batch_size: 100
timeout_ms: 500
auto_offset_reset: latest # Optional
max_partition_fetch_bytes: 5242880 # Optional, default: 5MB (Pro), 1MB (OSS)
offset:  # Optional
  value: null # Optional
  type: "int"
  partitions: # Optional
  - {"topic": "test", "partition": 1}

Configuration Parameters

Required Parameters

  • bootstrap_server: Kafka broker address (e.g., "localhost:9092" or "broker1:9092,broker2:9092")
  • consumer_group: Unique consumer group identifier for this pipeline
  • topic or topics: Single topic name (string) or list of topic names to consume from

Performance Parameters

  • batch_size (default: 100): Maximum number of messages to process in a single batch. The consumer will accumulate messages across multiple polls to reach this target when possible. Recommendations:
    • Small messages (< 1 KB): 500-2000
    • Medium messages (1-10 KB): 200-1000
    • Large messages (> 10 KB): 50-500
  • max_partition_fetch_bytes (default: 5242880 = 5MB for Pro, 1048576 = 1MB for OSS): Maximum amount of data to fetch per partition in a single poll. This is a per-partition limit that can constrain your batch size. Important: This value must be:
    • Greater than your largest message size
    • Aligned with broker settings (message.max.bytes, replica.fetch.max.bytes)
    Recommendations by message size:
    • Small messages (< 1 KB): 1-5 MB
    • Medium messages (1-10 KB): 5-10 MB
    • Large messages (10-100 KB): 10-50 MB
    • Very large messages (> 100 KB): 50-100 MB
    Memory consideration: Total in-flight data = max_partition_fetch_bytes × number of partitions. Ensure your consumer has sufficient heap memory.
  • timeout_ms (default: 500): Maximum time to wait for messages when polling (in milliseconds). Lower values reduce latency but may increase CPU usage.
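To see how batch_size and max_partition_fetch_bytes interact, you can estimate the per-poll ceiling yourself. The helper below is an illustrative sketch (not part of the connector); the arithmetic simply follows the per-partition fetch limit described above.

```python
def estimate_batch(batch_size, max_partition_fetch_bytes, avg_message_bytes, partitions):
    """Estimate how many messages a single poll can deliver.

    Each partition is capped at max_partition_fetch_bytes per fetch, so the
    per-partition message count is the fetch limit divided by message size.
    """
    per_partition = max_partition_fetch_bytes // avg_message_bytes
    return min(batch_size, per_partition * partitions)

# With the OSS default fetch size (1 MB) and 5 KB messages on one partition,
# a batch_size of 500 is unreachable in a single poll:
print(estimate_batch(500, 1_048_576, 5_120, 1))  # 204
```

Raising the fetch limit to the Pro default (5 MB) removes the bottleneck: `estimate_batch(500, 5_242_880, 5_120, 1)` returns the full 500.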

Other Parameters

  • auto_offset_reset (default: latest): What to do when there is no initial offset or the offset is out of range. Options: earliest, latest, none
  • include_metadata: If true, includes Kafka metadata (key, partition, offset, timestamp, topic) with each message
  • api_version (default: "0.10.2"): Kafka API version. Use a version matching your broker version for best compatibility

Offset Configuration

Available in versions >= 0.9.61
The offset config lets you reset the consumer offset when starting the streaming pipeline. When using the offset config, the partitions config is required. The type field accepts 4 values:
  1. beginning - Start from the earliest available message
  2. end - Start from the latest message (skip all existing messages)
  3. int - Start from a specific numeric offset
  4. timestamp - Start from messages at or after a specific timestamp
Examples:
Start from the beginning:
offset:
  type: "beginning"
  partitions:
    - {"topic": "my_topic", "partition": 0}
    - {"topic": "my_topic", "partition": 1}
Start from specific offset:
offset:
  type: "int"
  value: 1000
  partitions:
    - {"topic": "my_topic", "partition": 0}
Start from specific timestamp (milliseconds since epoch):
offset:
  type: "timestamp"
  value: 1693396630163  # Wed Aug 30 2023 11:57:10 UTC
  partitions:
    - {"topic": "my_topic", "partition": 0}
Note: Neither beginning nor end requires the value field.
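The rules above can be captured in a small validation helper. This is an illustrative sketch, not part of the connector; it only encodes the constraints stated in this section (a partitions list is always required, and value is required only for int and timestamp).

```python
def validate_offset_config(offset):
    """Check an offset config block against the rules described above."""
    allowed = {"beginning", "end", "int", "timestamp"}
    if offset.get("type") not in allowed:
        raise ValueError(f"type must be one of {sorted(allowed)}")
    if not offset.get("partitions"):
        raise ValueError("partitions is required when using the offset config")
    if offset["type"] in ("int", "timestamp") and offset.get("value") is None:
        raise ValueError(f"value is required for type {offset['type']!r}")
    return True

validate_offset_config({
    "type": "timestamp",
    "value": 1693396630163,
    "partitions": [{"topic": "my_topic", "partition": 0}],
})
```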

SSL authentication

security_protocol: "SSL"
ssl_config:
  cafile: "CARoot.pem"
  certfile: "certificate.pem"
  keyfile: "key.pem"
  password: password
  check_hostname: true

SASL authentication

SASL_PLAINTEXT config:
security_protocol: "SASL_PLAINTEXT"
sasl_config:
  mechanism: "PLAIN"
  username: username
  password: password
SASL_SSL config:
security_protocol: "SASL_SSL"
sasl_config:
  mechanism: "SCRAM-SHA-512"
  username: username
  password: password
ssl_config:
  cafile: "pemfilename.pem"
SASL_SSL with OAUTHBEARER (OAuth 2.0):
security_protocol: "SASL_SSL"
sasl_config:
  mechanism: "OAUTHBEARER"
  oauth_token_url: "https://keycloak.example.com/auth/realms/myrealm/protocol/openid-connect/token"
  oauth_client_id: "kafka-client"
  oauth_client_secret: "your-client-secret"
  oauth_scope: "kafka"  # Optional
ssl_config:
  cafile: "pemfilename.pem"
  check_hostname: true
OAUTHBEARER Authentication:
  • The oauth_token_url, oauth_client_id, and oauth_client_secret are required when using OAUTHBEARER mechanism.
  • The oauth_scope parameter is optional.
  • OAuth tokens are automatically cached and refreshed before expiry (60 seconds before expiration by default).
  • It’s recommended to use SASL_SSL (not SASL_PLAINTEXT) with OAUTHBEARER for secure token transmission.
  • Compatible with OAuth 2.0 providers like Keycloak, Okta, and other identity servers that support client credentials flow.
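The token caching behavior described above (reuse the token until shortly before expiry, then refresh) can be sketched as follows. This is an illustrative model, not the connector's implementation; fetch_token stands in for the real client-credentials POST to oauth_token_url.

```python
import time


class TokenCache:
    """Cache an OAuth token and refresh it shortly before it expires.

    Mirrors the behavior described above: the cached token is reused until
    60 seconds before its expiry, then fetched again.
    """

    REFRESH_MARGIN_S = 60

    def __init__(self, fetch_token):
        # fetch_token() returns (access_token, expires_in_seconds),
        # e.g. from the token endpoint's access_token / expires_in fields.
        self._fetch = fetch_token
        self._token = None
        self._expires_at = 0.0

    def get(self, now=None):
        now = time.time() if now is None else now
        if self._token is None or now >= self._expires_at - self.REFRESH_MARGIN_S:
            self._token, expires_in = self._fetch()
            self._expires_at = now + expires_in
        return self._token
```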

Data format

By default, when include_metadata is false, the Kafka data loader returns only the message's value, e.g.
{"location": "klamath", "scientist": "anderson", "count": 23}
Kafka supports keying and partitioning your data. If include_metadata is set to true, the data loader returns each message with the payload under data and the Kafka metadata (key, topic, partition, offset and time) under metadata, e.g.
message = {
    "data": {"location": "klamath", "scientist": "anderson", "count": 23},
    "metadata": {
      "key": "bees",
      "partition": 0,
      "offset": 6,
      "time": 1693396630163,  # timestamp with ms precision (Wed Aug 30 2023 11:57:10 GMT+0)
      "topic": "census",
    }}
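A transformer that should work with either setting can branch on the message shape. This is a hypothetical helper, assuming the field names shown in the example above.

```python
def extract_record(message):
    """Return (payload, topic) from a consumed message.

    With include_metadata: true the payload sits under "data" and the topic
    under "metadata"; otherwise the message is the payload itself (so the
    topic is unknown). Assumes plain payloads don't use these reserved keys.
    """
    if isinstance(message, dict) and "data" in message and "metadata" in message:
        return message["data"], message["metadata"].get("topic")
    return message, None

extract_record({"location": "klamath", "scientist": "anderson", "count": 23})
```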

Deserialize message with protobuf schema

serde_config:
  serialization_method: PROTOBUF
  schema_classpath: "path.to.schema.SchemaClass"
  • Set serialization_method to PROTOBUF.
  • Set schema_classpath to the path of the Python schema class. Test whether you have access to the schema by running the following in a scratchpad:
    from path.to.schema import SchemaClass
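Equivalently, you can resolve the dotted classpath programmatically to confirm it is importable. This sketch uses a standard-library class as a stand-in for your actual schema class.

```python
import importlib


def resolve_classpath(classpath):
    """Import a class from a dotted "package.module.ClassName" path."""
    module_path, _, class_name = classpath.rpartition(".")
    return getattr(importlib.import_module(module_path), class_name)


# Stand-in for "path.to.schema.SchemaClass", using a stdlib class:
cls = resolve_classpath("collections.OrderedDict")
print(cls.__name__)  # OrderedDict
```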
    

Pass raw message to transformer

serde_config:
  serialization_method: RAW_VALUE

Deserialize message with Avro schema in Confluent schema registry

serde_config:
  serialization_method: AVRO
  schema_registry_url: https://schema_registry_url
  schema_registry_username: username
  schema_registry_password: password

API version

If you are running a newer Kafka broker version, set api_version to a matching value.

Performance Tuning

Understanding Batch Size vs. Fetch Bytes

The batch_size and max_partition_fetch_bytes work together:
  • batch_size: Target number of messages to process together
  • max_partition_fetch_bytes: Maximum data (bytes) fetched per partition per poll
Example: If you set batch_size: 500 but max_partition_fetch_bytes: 1048576 (1MB) and your messages are 5KB each:
  • Each partition can fetch ~200 messages (1MB ÷ 5KB)
  • With 1 partition, you’ll get ~200 messages per batch
  • With multiple partitions, you may get closer to 500, but each partition is still limited to 1MB
Solution: Increase max_partition_fetch_bytes to allow more messages per partition:
batch_size: 500
max_partition_fetch_bytes: 5242880  # 5MB allows ~1000 messages per partition

High-Throughput Configuration

For high-volume scenarios with consumer lag:
connector_type: kafka
bootstrap_server: "broker1:9092,broker2:9092"
topic: high_volume_topic
consumer_group: high_throughput_consumer
batch_size: 1000
max_partition_fetch_bytes: 10485760  # 10MB
timeout_ms: 1000
auto_offset_reset: earliest

Low-Latency Configuration

For real-time processing with minimal delay:
connector_type: kafka
bootstrap_server: "localhost:9092"
topic: realtime_topic
consumer_group: low_latency_consumer
batch_size: 10
max_partition_fetch_bytes: 1048576  # 1MB
timeout_ms: 100
auto_offset_reset: latest

Memory Planning

Calculate memory requirements:
Estimated Memory = max_partition_fetch_bytes × number_of_partitions × 1.5
The 1.5x multiplier accounts for overhead. Ensure your consumer process has at least this much memory available.
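The same formula in code, as a rough planning aid:

```python
def estimated_memory_bytes(max_partition_fetch_bytes, num_partitions, overhead=1.5):
    """Rough memory estimate: per-partition fetch limit x partitions x overhead."""
    return int(max_partition_fetch_bytes * num_partitions * overhead)


# 5 MB fetch limit across 12 partitions with the 1.5x overhead multiplier:
mb = estimated_memory_bytes(5_242_880, 12) / 1_048_576
print(f"{mb:.0f} MB")  # 90 MB
```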

Troubleshooting

Issue: Not Receiving Expected Batch Size

Symptom: You set batch_size: 500 but only receive ~200-250 messages per batch. Cause: max_partition_fetch_bytes is too low for your message size. Solution:
  1. Calculate: max_partition_fetch_bytes ÷ average_message_size = max_messages_per_partition
  2. Increase max_partition_fetch_bytes:
    max_partition_fetch_bytes: 5242880  # 5MB (or higher)
    
  3. Verify the value is consistent with broker settings:
    • max_partition_fetch_bytes >= message.max.bytes (the consumer's fetch size must be at least the largest message the broker accepts)
    • replica.fetch.max.bytes >= message.max.bytes

Issue: Consumer Lag Growing

Symptom: Consumer falling behind producer, lag increasing over time. Solutions:
  1. Increase batch_size and max_partition_fetch_bytes
  2. Increase timeout_ms to allow more time for message accumulation
  3. Scale horizontally: Set a higher executor_count to run multiple consumer instances in the same consumer group so Kafka distributes partitions among them.
  4. Optimize downstream processing speed

Issue: Out of Memory Errors

Symptom: Consumer crashes with memory errors. Cause: max_partition_fetch_bytes × number_of_partitions exceeds available memory. Solution:
  1. Reduce max_partition_fetch_bytes
  2. Reduce number of partitions consumed (use topic filtering)
  3. Increase consumer heap memory
  4. Reduce batch_size to process smaller batches

Issue: Configuration Changes Not Taking Effect

Symptom: Changed batch_size or max_partition_fetch_bytes but behavior unchanged. Cause: Kafka consumer is initialized at pipeline start. Configuration is read during initialization. Solution: Restart the streaming pipeline after changing configuration parameters.

Issue: Messages Not Being Consumed

Symptom: No messages received despite messages in topic. Checklist:
  1. Verify consumer_group is unique and not being used by another consumer
  2. Check auto_offset_reset setting (use earliest to read from beginning)
  3. Verify topic name spelling matches exactly
  4. Check network connectivity to bootstrap_server
  5. Verify consumer has read permissions for the topic
  6. Check if using offset config - it may be seeking to a specific position

Complete Example Configurations

Example 1: Basic Setup with Small Messages

connector_type: kafka
bootstrap_server: "localhost:9092"
topic: user_events
consumer_group: user_events_processor
batch_size: 500
max_partition_fetch_bytes: 2097152  # 2MB
timeout_ms: 500
auto_offset_reset: latest
include_metadata: false

Example 2: High-Volume Processing with Large Messages

connector_type: kafka
bootstrap_server: "kafka-broker1:9092,kafka-broker2:9092"
topic: large_payload_topic
consumer_group: large_payload_consumer
batch_size: 100
max_partition_fetch_bytes: 52428800  # 50MB for large messages
timeout_ms: 2000
auto_offset_reset: earliest
include_metadata: true

Example 3: Multiple Topics with Authentication

connector_type: kafka
bootstrap_server: "secure-kafka:9093"
topics: ['topic1', 'topic2', 'topic3']
consumer_group: multi_topic_consumer
batch_size: 200
max_partition_fetch_bytes: 10485760  # 10MB
timeout_ms: 1000
security_protocol: SASL_SSL
sasl_config:
  mechanism: "SCRAM-SHA-512"
  username: kafka_user
  password: kafka_password
ssl_config:
  cafile: "/path/to/ca-cert.pem"

Example 4: Backfill from Specific Timestamp

connector_type: kafka
bootstrap_server: "localhost:9092"
topic: historical_data
consumer_group: backfill_consumer
batch_size: 1000
max_partition_fetch_bytes: 10485760  # 10MB
timeout_ms: 1000
offset:
  type: "timestamp"
  value: 1609459200000  # 2021-01-01 00:00:00 UTC
  partitions:
    - {"topic": "historical_data", "partition": 0}
    - {"topic": "historical_data", "partition": 1}
    - {"topic": "historical_data", "partition": 2}

Quick Reference

Configuration Quick Reference Table

Parameter                 | Default              | Recommended Range | Notes
batch_size                | 100                  | 50-2000           | Higher for small messages, lower for large
max_partition_fetch_bytes | 5MB (Pro), 1MB (OSS) | 1MB-100MB         | Must exceed max message size
timeout_ms                | 500                  | 100-5000          | Lower = lower latency, higher CPU
auto_offset_reset         | latest               | earliest/latest   | earliest for backfill, latest for real-time

Message Size to Configuration Mapping

Message Size | Recommended batch_size | Recommended max_partition_fetch_bytes
< 1 KB       | 500-2000               | 1-5 MB
1-10 KB      | 200-1000               | 5-10 MB
10-100 KB    | 50-500                 | 10-50 MB
> 100 KB     | 10-100                 | 50-100 MB

Best Practices

  1. Consumer Groups: Use unique consumer group names per pipeline/environment (e.g., pipeline_prod, pipeline_staging)
  2. Batch Sizing: Start with defaults and tune based on message size and throughput needs
  3. Monitoring: Monitor consumer lag, batch sizes, and memory usage regularly
  4. Broker Alignment: Ensure max_partition_fetch_bytes aligns with broker configuration (message.max.bytes, replica.fetch.max.bytes)
  5. Error Handling: Implement proper error handling in your transformation logic to handle malformed messages
  6. Offset Management:
    • Use auto_offset_reset: earliest for initial processing or backfills
    • Use auto_offset_reset: latest for real-time streaming
    • Use explicit offset config for precise control
  7. Multiple Topics: When consuming from multiple topics, ensure all topics exist and are accessible
  8. Restart After Config Changes: Always restart the pipeline after changing batch_size or max_partition_fetch_bytes
  9. Memory Planning: Calculate memory needs: max_partition_fetch_bytes × partitions × 1.5
  10. Testing: Test configuration changes in a staging environment before production