
Basic config

connector_type: kafka
bootstrap_server: "localhost:9092"
topic: topic_name
topics: ['topic_name1', 'topic_name2']  # Optional. Used for specifying multiple topics.
consumer_group: unique_consumer_group
include_metadata: false
api_version: "0.10.2"
batch_size: 100
timeout_ms: 500
auto_offset_reset: latest # Optional
max_partition_fetch_bytes: 5242880 # Optional, default: 5MB (Pro), 1MB (OSS)
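offset:  # Optional
  value: null  # Required for type int and timestamp; omit for beginning and end
  type: "int"  # One of: beginning, end, int, timestamp
  partitions:  # Required when offset is set
    - {"topic": "test", "partition": 1}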

Configuration Parameters

Required Parameters

  • bootstrap_server: Kafka broker address (e.g., "localhost:9092" or "broker1:9092,broker2:9092")
  • consumer_group: Unique consumer group identifier for this pipeline
  • topic or topics: Single topic name (string) or list of topic names to consume from
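The required-parameter rules above can be checked before starting a pipeline. Below is a minimal sketch that assumes the config has already been loaded into a Python dict, for example by a YAML parser. The helper name `check_required` is hypothetical and is not part of the connector.

```python
from typing import Any


def check_required(config: dict[str, Any]) -> list[str]:
    """Return a list of problems with the required Kafka source parameters."""
    problems = []
    # bootstrap_server may list several brokers separated by commas.
    raw = config.get("bootstrap_server") or ""
    servers = [s.strip() for s in str(raw).split(",") if s.strip()]
    if not servers:
        problems.append("bootstrap_server is missing")
    for server in servers:
        host, sep, port = server.rpartition(":")
        if not sep or not host or not port.isdigit():
            problems.append(f"bootstrap_server entry {server!r} is not host:port")
    if not config.get("consumer_group"):
        problems.append("consumer_group is missing")
    # Either a single topic string or a non-empty list of topics is needed.
    if not config.get("topic") and not config.get("topics"):
        problems.append("set either topic or topics")
    return problems
```

For example, `check_required({"bootstrap_server": "broker1:9092,broker2:9092", "consumer_group": "g", "topics": ["a", "b"]})` returns an empty list.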

Performance Parameters

  • batch_size (default: 100): Maximum number of messages to process in a single batch. The consumer will accumulate messages across multiple polls to reach this target when possible. Recommendations:
    • Small messages (< 1 KB): 500-2000
    • Medium messages (1-10 KB): 200-1000
    • Large messages (> 10 KB): 50-500
  • max_partition_fetch_bytes (default: 5242880 = 5MB for Pro, 1048576 = 1MB for OSS): Maximum amount of data to fetch per partition in a single poll. This is a per-partition limit that can constrain your batch size. Important: This value must be:
    • Greater than your largest message size
    • At least as large as the broker's message.max.bytes (or the topic-level max.message.bytes override), so the consumer can always fetch the largest message the broker accepts
    Recommendations by message size:
    • Small messages (< 1 KB): 1-5 MB
    • Medium messages (1-10 KB): 5-10 MB
    • Large messages (10-100 KB): 10-50 MB
    • Very large messages (> 100 KB): 50-100 MB
    Memory consideration: Maximum in-flight data per fetch = max_partition_fetch_bytes × number of assigned partitions. Ensure your consumer process has enough memory for this.
  • timeout_ms (default: 500): Maximum time to wait for messages when polling (in milliseconds). Lower values reduce latency but may increase CPU usage.

Other Parameters

  • auto_offset_reset (default: latest): What to do when there is no initial offset or the offset is out of range. Options: earliest, latest, none
  • include_metadata (default: false): If true, includes Kafka metadata (key, partition, offset, timestamp, topic) with each message
  • api_version (default: "0.10.2"): Kafka API version. Use a version matching your broker version for best compatibility
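The defaults listed above can be combined with a user config as sketched below. This is only an illustration of the documented values, not the connector's own code. The helper `apply_defaults` and its `edition` argument are hypothetical; the Pro/OSS split for max_partition_fetch_bytes comes from the Performance Parameters list.

```python
from typing import Any

# Defaults as documented in the parameter lists above.
DEFAULTS: dict[str, Any] = {
    "batch_size": 100,
    "timeout_ms": 500,
    "auto_offset_reset": "latest",
    "include_metadata": False,
    "api_version": "0.10.2",
}
# max_partition_fetch_bytes differs by edition: 5 MB (Pro) vs 1 MB (OSS).
FETCH_BYTES_DEFAULT = {"pro": 5 * 1024 * 1024, "oss": 1024 * 1024}
OFFSET_RESET_OPTIONS = {"earliest", "latest", "none"}


def apply_defaults(config: dict[str, Any], edition: str = "oss") -> dict[str, Any]:
    """Return a copy of config with the documented defaults filled in."""
    # Values set explicitly in config take precedence over defaults.
    merged = {**DEFAULTS, "max_partition_fetch_bytes": FETCH_BYTES_DEFAULT[edition], **config}
    if merged["auto_offset_reset"] not in OFFSET_RESET_OPTIONS:
        raise ValueError(f"auto_offset_reset must be one of {sorted(OFFSET_RESET_OPTIONS)}")
    return merged
```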

Offset Configuration

Available in versions >= 0.9.61
The offset config lets you reset the consumer's offset when the streaming pipeline starts. If you use the offset config, the partitions config is required. The type field accepts four values:
  1. beginning - Start from the earliest available message
  2. end - Start from the latest message (skip all existing messages)
  3. int - Start from a specific numeric offset
  4. timestamp - Start from messages at or after a specific timestamp
Examples: Start from beginning:
offset:
  type: "beginning"
  partitions:
    - {"topic": "my_topic", "partition": 0}
    - {"topic": "my_topic", "partition": 1}
Start from specific offset:
offset:
  type: "int"
  value: 1000
  partitions:
    - {"topic": "my_topic", "partition": 0}
Start from specific timestamp (milliseconds since epoch):
offset:
  type: "timestamp"
  value: 1693396630163  # Wed Aug 30 2023 11:57:10 UTC
  partitions:
    - {"topic": "my_topic", "partition": 0}
Note: beginning and end do not require the value field.
  • beginning and end make the consumer read from the beginning and the end of each listed partition, respectively.
  • int makes the consumer read from the given offset value, i.e. the numeric position of a message within each partition.
  • timestamp makes the consumer read from the first message whose timestamp is at or after the given value, in milliseconds since the Unix epoch.
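The rules above can be checked before a pipeline starts. The sketch below assumes the offset block has been parsed into a Python dict; the helper `validate_offset` is hypothetical and only encodes the rules stated in this section.

```python
from typing import Any

OFFSET_TYPES = {"beginning", "end", "int", "timestamp"}


def validate_offset(offset: dict[str, Any]) -> list[str]:
    """Check an offset block against the documented rules."""
    problems = []
    kind = offset.get("type")
    if kind not in OFFSET_TYPES:
        problems.append(f"type must be one of {sorted(OFFSET_TYPES)}")
    # int and timestamp need a numeric value; beginning and end do not.
    if kind in ("int", "timestamp"):
        value = offset.get("value")
        if not isinstance(value, int) or isinstance(value, bool) or value < 0:
            problems.append(f"type {kind!r} needs a non-negative integer value")
    # partitions is required whenever offset is set.
    partitions = offset.get("partitions") or []
    if not partitions:
        problems.append("partitions is required when offset is set")
    for entry in partitions:
        if not isinstance(entry, dict) or "topic" not in entry or "partition" not in entry:
            problems.append(f"partition entry {entry!r} needs topic and partition")
    return problems
```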

SSL authentication

security_protocol: "SSL"
ssl_config:
  cafile: "CARoot.pem"
  certfile: "certificate.pem"
  keyfile: "key.pem"
  password: password
  check_hostname: true
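The ssl_config field names match parameters of Python's standard ssl module: cafile for create_default_context, certfile/keyfile/password for load_cert_chain, and the check_hostname attribute. The sketch below assumes that mapping to show what each field controls. The connector's actual handling may differ, and `build_ssl_context` is a hypothetical helper.

```python
import ssl
from typing import Any, Optional


def build_ssl_context(ssl_config: dict[str, Any]) -> ssl.SSLContext:
    """Build an SSLContext from an ssl_config block (assumed field mapping)."""
    # cafile: CA bundle used to verify the broker certificate (system CAs if absent).
    context = ssl.create_default_context(cafile=ssl_config.get("cafile"))
    # check_hostname: verify that the broker hostname matches its certificate.
    context.check_hostname = bool(ssl_config.get("check_hostname", True))
    # certfile/keyfile/password: client certificate for mutual TLS, if provided.
    certfile: Optional[str] = ssl_config.get("certfile")
    if certfile:
        context.load_cert_chain(
            certfile=certfile,
            keyfile=ssl_config.get("keyfile"),
            password=ssl_config.get("password"),
        )
    return context
```

With only cafile set, the context verifies the broker but does not present a client certificate. Setting certfile and keyfile enables mutual TLS.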

SASL authentication

SASL_PLAINTEXT config:
security_protocol: "SASL_PLAINTEXT"
sasl_config:
  mechanism: "PLAIN"
  username: username
  password: password
SASL_SSL config:
security_protocol: "SASL_SSL"
sasl_config:
  mechanism: "SCRAM-SHA-512"
  username: username
  password: password
ssl_config:
  cafile: "pemfilename.pem"
SASL_SSL with OAUTHBEARER (OAuth 2.0):
security_protocol: "SASL_SSL"
sasl_config:
  mechanism: "OAUTHBEARER"
  oauth_token_url: "https://keycloak.example.com/auth/realms/myrealm/protocol/openid-connect/token"
  oauth_client_id: "kafka-client"
  oauth_client_secret: "your-client-secret"
  oauth_scope: "kafka"  # Optional
ssl_config:
  cafile: "pemfilename.pem"
  check_hostname: true
OAUTHBEARER Authentication:
  • The oauth_token_url, oauth_client_id, and oauth_client_secret are required when using OAUTHBEARER mechanism.
  • The oauth_scope parameter is optional.
  • OAuth tokens are automatically cached and refreshed before expiry (60 seconds before expiration by default).
  • It’s recommended to use SASL_SSL (not SASL_PLAINTEXT) with OAUTHBEARER for secure token transmission.
  • Compatible with OAuth 2.0 providers like Keycloak, Okta, and other identity servers that support client credentials flow.
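The caching behavior described above works like this: fetch a token, then reuse it until the refresh margin before expiry. The sketch shows that pattern together with the standard OAuth 2.0 client credentials request body (RFC 6749, section 4.4). It is an illustration, not the connector's code. `CachedToken`, `client_credentials_body`, and the injected `fetch` function are hypothetical, and no HTTP request is sent.

```python
import time
from typing import Callable, Optional
from urllib.parse import urlencode


def client_credentials_body(client_id: str, client_secret: str, scope: Optional[str] = None) -> str:
    """Form-encoded body for an OAuth 2.0 client credentials token request."""
    fields = {"grant_type": "client_credentials", "client_id": client_id, "client_secret": client_secret}
    if scope:  # oauth_scope is optional
        fields["scope"] = scope
    return urlencode(fields)


class CachedToken:
    """Cache an access token and refresh it a fixed margin before it expires."""

    def __init__(self, fetch: Callable[[], tuple[str, float]], refresh_margin_s: float = 60.0,
                 clock: Callable[[], float] = time.monotonic):
        self._fetch = fetch  # returns (access_token, expires_in_seconds)
        self._margin = refresh_margin_s
        self._clock = clock
        self._token: Optional[str] = None
        self._expires_at = 0.0

    def get(self) -> str:
        now = self._clock()
        # Refresh when no token is cached or expiry falls within the margin.
        if self._token is None or now >= self._expires_at - self._margin:
            token, expires_in = self._fetch()
            self._token, self._expires_at = token, now + expires_in
        return self._token
```

With a token that lives 300 seconds and the default 60-second margin, `get()` returns the cached token until 240 seconds have passed, then fetches a new one.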

Data format

By default (include_metadata: false), the Kafka data loader returns only the message value, e.g.
{"location": "klamath", "scientist": "anderson", "count": 23}
Kafka records also carry a key, partition, offset and timestamp. If include_metadata is set to true, the Kafka data loader returns each message as a dict with two fields: data, holding the message value, and metadata, holding the key, partition, offset, time (the message timestamp) and topic, e.g.
message = {
    "data": {"location": "klamath", "scientist": "anderson", "count": 23},
    "metadata": {
      "key": "bees",
      "partition": 0,
      "offset": 6,
      "time": 1693396630163,  # timestamp with ms precision (Wed Aug 30 2023 11:57:10 GMT+0)
      "topic": "census",
    }}
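The two output shapes above can be sketched as follows. This assumes JSON-encoded message values and UTF-8 keys. The helper `format_message` is hypothetical, and the connector's internal deserialization may differ.

```python
import json
from typing import Any, Optional


def format_message(value: bytes, key: Optional[bytes], topic: str, partition: int,
                   offset: int, timestamp_ms: int, include_metadata: bool = False) -> Any:
    """Shape one Kafka record like the examples above (assumes JSON values)."""
    data = json.loads(value)  # the deserialized message value
    if not include_metadata:
        return data
    return {
        "data": data,
        "metadata": {
            "key": key.decode("utf-8") if key is not None else None,
            "partition": partition,
            "offset": offset,
            "time": timestamp_ms,  # milliseconds since the Unix epoch
            "topic": topic,
        },
    }
```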

Deserialize message with protobuf schema

serde_config:
  serialization_method: PROTOBUF
  schema_classpath: "path.to.schema.SchemaClass"
  • Set serialization_method to PROTOBUF.
  • Set schema_classpath to the import path of the Python schema class. To check that the schema is importable, run the following in a scratchpad:
    from path.to.schema import SchemaClass
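The same check can be run on the exact dotted string you put in schema_classpath. The sketch below resolves the string with importlib and reports a clear error if the module or class is missing. `load_schema_class` is a hypothetical helper; the connector may load the class differently.

```python
import importlib


def load_schema_class(schema_classpath: str) -> type:
    """Import the class named by a dotted path such as "path.to.schema.SchemaClass"."""
    module_path, _, class_name = schema_classpath.rpartition(".")
    if not module_path:
        raise ValueError(f"expected module.ClassName, got {schema_classpath!r}")
    module = importlib.import_module(module_path)  # raises ImportError if not importable
    try:
        return getattr(module, class_name)
    except AttributeError as exc:
        raise ImportError(f"{module_path} has no attribute {class_name!r}") from exc
```

Once the class resolves, a generated protobuf message class can parse raw bytes with its standard FromString class method.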
    

Pass raw message to transformer

serde_config:
  serialization_method: RAW_VALUE

Deserialize message with Avro schema in Confluent schema registry

serde_config:
  serialization_method: AVRO
  schema_registry_url: https://schema_registry_url
  schema_registry_username: username
  schema_registry_password: password
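Messages produced with Confluent's Avro serializers use a 5-byte header: a magic byte of 0, followed by the 4-byte big-endian schema ID, then the Avro-encoded payload. The sketch below splits that header, which can help when debugging messages that fail to deserialize. It assumes the connector handles this framing itself. `split_confluent_header` is a hypothetical helper, and decoding the payload still needs the schema and an Avro library.

```python
import struct


def split_confluent_header(raw: bytes) -> tuple[int, bytes]:
    """Split a Confluent-framed message into (schema_id, avro_payload)."""
    # Byte 0 is the magic byte (0); bytes 1-4 hold the schema ID, big-endian.
    if len(raw) < 5 or raw[0] != 0:
        raise ValueError("not in Confluent Schema Registry wire format")
    (schema_id,) = struct.unpack(">I", raw[1:5])
    return schema_id, raw[5:]
```

You can fetch the schema for a given ID from the registry's GET /schemas/ids/{id} endpoint.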

API version

If you are using a newer Kafka broker, set api_version to match your broker version.

Performance Tuning

Understanding Batch Size vs. Fetch Bytes

The batch_size and max_partition_fetch_bytes work together:
  • batch_size: Target number of messages to process together
  • max_partition_fetch_bytes: Maximum data (bytes) fetched per partition per poll
Example: If you set batch_size: 500 but max_partition_fetch_bytes: 1048576 (1MB) and your messages are 5KB each:
  • Each partition can fetch ~200 messages (1MB ÷ 5KB)
  • With 1 partition, you’ll get ~200 messages per batch
  • With multiple partitions, you may get closer to 500, but each partition is still limited to 1MB
Solution: Increase max_partition_fetch_bytes to allow more messages per partition:
batch_size: 500
max_partition_fetch_bytes: 5242880  # 5MB allows ~1000 messages per partition
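The arithmetic above follows a simple single-poll model. In the sketch below, each partition returns at most max_partition_fetch_bytes ÷ average message size messages, and the batch is capped at batch_size. It ignores record overhead and the cross-poll accumulation mentioned under batch_size, so treat the result as a rough estimate. `estimate_batch` is a hypothetical helper.

```python
def estimate_batch(batch_size: int, max_partition_fetch_bytes: int,
                   avg_message_bytes: int, partitions: int = 1) -> int:
    """Rough number of messages one poll can return under the model above."""
    # Fetch limit per partition, in whole messages.
    per_partition = max_partition_fetch_bytes // avg_message_bytes
    # All partitions together, capped by the batch_size target.
    return min(batch_size, per_partition * partitions)
```

With 5 KB messages, `estimate_batch(500, 1048576, 5120)` gives 204, matching the "~200 messages" above, and raising the fetch size to 5242880 lets one partition fill the full 500.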

High-Throughput Configuration

For high-volume scenarios with consumer lag:
connector_type: kafka
bootstrap_server: "broker1:9092,broker2:9092"
topic: high_volume_topic
consumer_group: high_throughput_consumer
batch_size: 1000
max_partition_fetch_bytes: 10485760  # 10MB
timeout_ms: 1000
auto_offset_reset: earliest

Low-Latency Configuration

For real-time processing with minimal delay:
connector_type: kafka
bootstrap_server: "localhost:9092"
topic: realtime_topic
consumer_group: low_latency_consumer
batch_size: 10
max_partition_fetch_bytes: 1048576  # 1MB
timeout_ms: 100
auto_offset_reset: latest

Memory Planning

Calculate memory requirements:
Estimated Memory = max_partition_fetch_bytes × number_of_partitions × 1.5
The 1.5x multiplier accounts for overhead. Ensure your consumer process has at least this much memory available.
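The formula translates directly into code. Count partitions across all consumed topics. For example, 12 partitions at 5 MB each give 5242880 × 12 × 1.5 = 94371840 bytes (90 MB). The helper names below are hypothetical.

```python
def estimate_memory_bytes(max_partition_fetch_bytes: int, partitions: int,
                          overhead: float = 1.5) -> int:
    """Estimated memory = max_partition_fetch_bytes x partitions x overhead."""
    return int(max_partition_fetch_bytes * partitions * overhead)


def fits(max_partition_fetch_bytes: int, partitions: int, available_bytes: int) -> bool:
    """True if the estimate stays within the memory available to the consumer."""
    return estimate_memory_bytes(max_partition_fetch_bytes, partitions) <= available_bytes
```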

Troubleshooting

Issue: Not Receiving Expected Batch Size

Symptom: Set batch_size: 500 but only getting ~200-250 messages per batch. Cause: max_partition_fetch_bytes is too low for your message size. Solution:
  1. Calculate: max_partition_fetch_bytes ÷ average_message_size = max_messages_per_partition
  2. Increase max_partition_fetch_bytes:
    max_partition_fetch_bytes: 5242880  # 5MB (or higher)
    
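  3. Verify the value is compatible with your broker settings:
    • max_partition_fetch_bytes >= message.max.bytes (broker) or max.message.bytes (topic override), so the largest message the broker accepts can always be fetched
    • replica.fetch.max.bytes is a broker-side replication setting and does not limit this consumer value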
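Steps 1 and 2 can be reversed to estimate how large max_partition_fetch_bytes must be for a single poll to fill batch_size. The sketch assumes messages are spread evenly across partitions and rounds up to a whole MiB. `required_fetch_bytes` is a hypothetical helper.

```python
import math

MIB = 1024 * 1024


def required_fetch_bytes(batch_size: int, avg_message_bytes: int, partitions: int = 1,
                         largest_message_bytes: int = 0) -> int:
    """Rough max_partition_fetch_bytes needed for one poll to fill batch_size."""
    # Each partition must be able to return its share of the batch.
    per_partition = math.ceil(batch_size / partitions) * avg_message_bytes
    # The value must also be at least the largest single message.
    needed = max(per_partition, largest_message_bytes)
    # Round up to a whole MiB for a readable config value.
    return math.ceil(needed / MIB) * MIB
```

For the symptom above (batch_size 500, 5 KB messages, one partition), this returns 3145728 (3 MB), so the suggested 5 MB leaves headroom.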

Issue: Consumer Lag Growing

Symptom: Consumer falling behind producer, lag increasing over time. Solutions:
  1. Increase batch_size and max_partition_fetch_bytes
  2. Increase timeout_ms to allow more time for message accumulation
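  3. Scale horizontally: Set a higher executor_count to run multiple consumer instances in the same consumer group so Kafka can distribute partitions among them. Each partition is consumed by at most one instance in a group, so instances beyond the topic's partition count stay idle.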
  4. Optimize downstream processing speed
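Horizontal scaling is limited by partition count because Kafka gives each partition to at most one consumer in a group. The sketch below uses a simplified round-robin assignment to show this. Kafka's real assignors (range, round-robin, sticky) differ in detail. `assign_partitions` is a hypothetical helper.

```python
def assign_partitions(num_partitions: int, num_consumers: int) -> list[list[int]]:
    """Round-robin partitions over consumers in one group (simplified assignor)."""
    assignment: list[list[int]] = [[] for _ in range(num_consumers)]
    for partition in range(num_partitions):
        # Each partition goes to exactly one consumer in the group.
        assignment[partition % num_consumers].append(partition)
    return assignment
```

With 2 partitions and an executor_count of 4, `assign_partitions(2, 4)` leaves two consumers with no partitions. To scale further, add partitions to the topic.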

Issue: Out of Memory Errors

Symptom: Consumer crashes with memory errors. Cause: max_partition_fetch_bytes × number_of_partitions exceeds available memory. Solution:
  1. Reduce max_partition_fetch_bytes
  2. Reduce number of partitions consumed (use topic filtering)
  3. Increase consumer heap memory
  4. Reduce batch_size to process smaller batches

Issue: Configuration Changes Not Taking Effect

Symptom: Changed batch_size or max_partition_fetch_bytes but behavior unchanged. Cause: Kafka consumer is initialized at pipeline start. Configuration is read during initialization. Solution: Restart the streaming pipeline after changing configuration parameters.

Issue: Messages Not Being Consumed

Symptom: No messages received despite messages in topic. Checklist:
  1. Verify consumer_group is unique and not being used by another consumer
  2. Check auto_offset_reset setting (use earliest to read from beginning)
  3. Verify topic name spelling matches exactly
  4. Check network connectivity to bootstrap_server
  5. Verify consumer has read permissions for the topic
  6. Check if using offset config - it may be seeking to a specific position
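Checklist item 4 can be tested from the machine that runs the pipeline. The sketch tries a plain TCP connection to each broker in bootstrap_server. It only shows that the port is reachable; it does not check the Kafka protocol, authentication, or topic permissions. `check_bootstrap` is a hypothetical helper.

```python
import socket


def check_bootstrap(bootstrap_server: str, timeout_s: float = 3.0) -> dict[str, bool]:
    """Try a TCP connection to each host:port listed in bootstrap_server."""
    results = {}
    for server in (s.strip() for s in bootstrap_server.split(",") if s.strip()):
        host, _, port = server.rpartition(":")
        try:
            # Reachability only: no Kafka handshake, auth, or ACL checks.
            with socket.create_connection((host, int(port)), timeout=timeout_s):
                results[server] = True
        except (OSError, ValueError):
            results[server] = False
    return results
```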

Complete Example Configurations

Example 1: Basic Setup with Small Messages

connector_type: kafka
bootstrap_server: "localhost:9092"
topic: user_events
consumer_group: user_events_processor
batch_size: 500
max_partition_fetch_bytes: 2097152  # 2MB
timeout_ms: 500
auto_offset_reset: latest
include_metadata: false

Example 2: High-Volume Processing with Large Messages

connector_type: kafka
bootstrap_server: "kafka-broker1:9092,kafka-broker2:9092"
topic: large_payload_topic
consumer_group: large_payload_consumer
batch_size: 100
max_partition_fetch_bytes: 52428800  # 50MB for large messages
timeout_ms: 2000
auto_offset_reset: earliest
include_metadata: true

Example 3: Multiple Topics with Authentication

connector_type: kafka
bootstrap_server: "secure-kafka:9093"
topics: ['topic1', 'topic2', 'topic3']
consumer_group: multi_topic_consumer
batch_size: 200
max_partition_fetch_bytes: 10485760  # 10MB
timeout_ms: 1000
security_protocol: SASL_SSL
sasl_config:
  mechanism: "SCRAM-SHA-512"
  username: kafka_user
  password: kafka_password
ssl_config:
  cafile: "/path/to/ca-cert.pem"

Example 4: Backfill from Specific Timestamp

connector_type: kafka
bootstrap_server: "localhost:9092"
topic: historical_data
consumer_group: backfill_consumer
batch_size: 1000
max_partition_fetch_bytes: 10485760  # 10MB
timeout_ms: 1000
offset:
  type: "timestamp"
  value: 1609459200000  # 2021-01-01 00:00:00 UTC
  partitions:
    - {"topic": "historical_data", "partition": 0}
    - {"topic": "historical_data", "partition": 1}
    - {"topic": "historical_data", "partition": 2}
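The timestamp value must be in milliseconds since the Unix epoch. The sketch below converts between timezone-aware datetimes and that format using exact integer arithmetic. The helper names are hypothetical.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_ms(dt: datetime) -> int:
    """Convert a timezone-aware datetime to milliseconds since the Unix epoch."""
    if dt.tzinfo is None:
        raise ValueError("use a timezone-aware datetime to avoid local-time surprises")
    return (dt - EPOCH) // timedelta(milliseconds=1)


def from_epoch_ms(ms: int) -> datetime:
    """Convert milliseconds since the epoch back to a UTC datetime."""
    return EPOCH + timedelta(milliseconds=ms)
```

`to_epoch_ms(datetime(2021, 1, 1, tzinfo=timezone.utc))` gives the 1609459200000 used in this example.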

Quick Reference

Configuration Quick Reference Table

| Parameter | Default | Recommended Range | Notes |
|---|---|---|---|
| batch_size | 100 | 50-2000 | Higher for small messages, lower for large |
| max_partition_fetch_bytes | 5MB (Pro), 1MB (OSS) | 1MB-100MB | Must exceed max message size |
| timeout_ms | 500 | 100-5000 | Lower = lower latency, higher CPU |
| auto_offset_reset | latest | earliest/latest | earliest for backfill, latest for real-time |

Message Size to Configuration Mapping

| Message Size | Recommended batch_size | Recommended max_partition_fetch_bytes |
|---|---|---|
| < 1 KB | 500-2000 | 1-5 MB |
| 1-10 KB | 200-1000 | 5-10 MB |
| 10-100 KB | 50-500 | 10-50 MB |
| > 100 KB | 10-100 | 50-100 MB |
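The mapping table can be encoded as a lookup that returns starting ranges for a measured average message size. Sizes use 1 KB = 1024 bytes and 1 MB = 1048576 bytes, matching the byte values used elsewhere on this page. Each row's size bound is treated as exclusive. `recommend` is a hypothetical helper.

```python
from typing import Optional

KB = 1024
MB = 1024 * 1024

# Rows of the table above: (exclusive upper bound on average message size in bytes,
# batch_size range, max_partition_fetch_bytes range). None marks the open-ended last row.
SIZE_TABLE: list[tuple[Optional[int], tuple[int, int], tuple[int, int]]] = [
    (1 * KB, (500, 2000), (1 * MB, 5 * MB)),
    (10 * KB, (200, 1000), (5 * MB, 10 * MB)),
    (100 * KB, (50, 500), (10 * MB, 50 * MB)),
    (None, (10, 100), (50 * MB, 100 * MB)),
]


def recommend(avg_message_bytes: int) -> tuple[tuple[int, int], tuple[int, int]]:
    """Return (batch_size range, max_partition_fetch_bytes range) for a message size."""
    for upper, batch_range, fetch_range in SIZE_TABLE:
        if upper is None or avg_message_bytes < upper:
            return batch_range, fetch_range
    raise AssertionError("unreachable: the last row has no upper bound")
```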

Best Practices

  1. Consumer Groups: Use unique consumer group names per pipeline/environment (e.g., pipeline_prod, pipeline_staging)
  2. Batch Sizing: Start with defaults and tune based on message size and throughput needs
  3. Monitoring: Monitor consumer lag, batch sizes, and memory usage regularly
  4. Broker Alignment: Ensure max_partition_fetch_bytes is at least the broker's message.max.bytes (or the topic-level max.message.bytes override)
  5. Error Handling: Implement proper error handling in your transformation logic to handle malformed messages
  6. Offset Management:
    • Use auto_offset_reset: earliest for initial processing or backfills
    • Use auto_offset_reset: latest for real-time streaming
    • Use explicit offset config for precise control
  7. Multiple Topics: When consuming from multiple topics, ensure all topics exist and are accessible
  8. Restart After Config Changes: Always restart the pipeline after changing batch_size or max_partition_fetch_bytes
  9. Memory Planning: Calculate memory needs: max_partition_fetch_bytes × partitions × 1.5
  10. Testing: Test configuration changes in a staging environment before production