# Kafka

## Basic config
```yaml
connector_type: kafka
bootstrap_server: "localhost:9092"
topic: topic_name
topics: ['topic_name1', 'topic_name2'] # Optional. Used for specifying multiple topics.
consumer_group: unique_consumer_group
include_metadata: false
api_version: "0.10.2"
batch_size: 100
timeout_ms: 500
auto_offset_reset: latest # Optional
max_partition_fetch_bytes: 1048576 # Optional
offset: # Optional
  value: null # Optional
  type: "int"
partitions: # Optional
  - {"topic": "test", "partition": 1}
```
## Offset Configuration

Available in versions >= 0.9.61.

The `offset` config allows you to reset the consumer offset when starting the streaming pipeline. If you use the `offset` config, the `partitions` config is required.

The `offset` config's `type` accepts one of four values:
- `beginning`
- `end`
- `int`
- `timestamp`
Both `beginning` and `end` set the consumer to consume data from the beginning or the end of the queue, respectively, and neither requires an offset `value`.

The `int` type sets the consumer to consume data from the given offset value inside the queue. This value corresponds to the numeric position inside each partition.

The `timestamp` type sets the consumer to consume data from the given offset timestamp inside the queue. This value corresponds to the timestamp of the message, in milliseconds since the beginning of the Unix epoch.
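For example, a pipeline that resumes every listed partition from a given point in time might use a config like the following sketch (the topic name and timestamp are illustrative):

```yaml
offset:
  type: "timestamp"
  value: 1693396630163 # ms since the Unix epoch
partitions:
  - {"topic": "test", "partition": 0}
  - {"topic": "test", "partition": 1}
```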
## SSL authentication

```yaml
security_protocol: "SSL"
ssl_config:
  cafile: "CARoot.pem"
  certfile: "certificate.pem"
  keyfile: "key.pem"
  password: password
  check_hostname: true
```
## SASL authentication

`SASL_PLAINTEXT` config:

```yaml
security_protocol: "SASL_PLAINTEXT"
sasl_config:
  mechanism: "PLAIN"
  username: username
  password: password
```

`SASL_SSL` config:

```yaml
security_protocol: "SASL_SSL"
sasl_config:
  mechanism: "SCRAM-SHA-512"
  username: username
  password: password
ssl_config:
  cafile: "pemfilename.pem"
```
## Data format

By default, if `include_metadata` is false, the Kafka data loader returns the data from the message's value field, e.g.

```json
{"location": "klamath", "scientist": "anderson", "count": 23}
```
Kafka supports structuring and partitioning your data. If `include_metadata` is set to true, the Kafka data loader returns each message with `data` (the deserialized value) and `metadata` (key, topic, partition, offset, and time), e.g.

```python
message = {
    "data": {"location": "klamath", "scientist": "anderson", "count": 23},
    "metadata": {
        "key": "bees",
        "partition": 0,
        "offset": 6,
        "time": 1693396630163,  # timestamp with ms precision (Wed Aug 30 2023 11:57:10 GMT+0)
        "topic": "census",
    },
}
```
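Downstream code can use the metadata for routing or debugging. A minimal sketch of a consumer-side function (the function name and print format are illustrative, not part of the loader's API):

```python
from typing import Dict, List


def summarize(messages: List[Dict]) -> None:
    # Each message carries "data" (the deserialized value) and
    # "metadata" (key, topic, partition, offset, time in ms).
    for message in messages:
        data = message["data"]
        meta = message["metadata"]
        print(
            f'{meta["topic"]}[{meta["partition"]}] offset={meta["offset"]} '
            f'key={meta["key"]} -> count={data["count"]}'
        )
```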
## Deserialize message with protobuf schema

```yaml
serde_config:
  serialization_method: PROTOBUF
  schema_classpath: "path.to.schema.SchemaClass"
```
- Specify the `serialization_method` as `PROTOBUF`.
- Set `schema_classpath` to the path of the Python schema class. Test whether you have access to the schema by running the following import in a scratchpad:

```python
from path.to.schema import SchemaClass
```
## Pass raw message to transformer

```yaml
serde_config:
  serialization_method: RAW_VALUE
```
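With `RAW_VALUE`, deserialization is left to your own code. A minimal sketch, assuming each message arrives as the unparsed value (the function name and the JSON payload format are assumptions, not part of the loader's API):

```python
import json


def transform(messages):
    # Assumption: with RAW_VALUE the loader hands over the message value
    # as-is (e.g. bytes), so the transformer deserializes it itself.
    return [json.loads(raw) for raw in messages]
```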
## Deserialize message with Avro schema in Confluent schema registry

```yaml
serde_config:
  serialization_method: AVRO
  schema_registry_url: https://schema_registry_url
  schema_registry_username: username
  schema_registry_password: password
```
## API version

If you are running newer versions of the Kafka brokers, consider setting `api_version` to the corresponding broker version so the client negotiates a matching protocol.
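For example, a sketch for brokers running Kafka 2.8 (the version string is illustrative; match it to your cluster):

```yaml
api_version: "2.8.0"
```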