Sources
Kafka
Basic config
connector_type: kafka
bootstrap_server: "localhost:9092"
topic: topic_name
topics: ['topic_name1', 'topic_name2'] # Optional. Used for specifying multiple topics.
consumer_group: unique_consumer_group
include_metadata: false # Set to true to attach topic/partition/offset metadata to each message (see "Data format" below)
api_version: "0.10.2"
batch_size: 100
timeout_ms: 500
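The loader consumes messages for you, but when broker settings are in doubt it can help to test them in isolation. A minimal connectivity sketch using the kafka-python package (an assumption, not necessarily what the loader uses internally; the values mirror the config above):

from kafka import KafkaConsumer

# Hypothetical standalone check that the broker, topic, and consumer group are reachable.
consumer = KafkaConsumer(
    "topic_name",
    bootstrap_servers="localhost:9092",
    group_id="unique_consumer_group",
    api_version=(0, 10, 2),
    consumer_timeout_ms=500,  # stop iterating if no message arrives within 500 ms
)
for message in consumer:
    print(message.topic, message.partition, message.offset, message.value)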
SSL authentication
security_protocol: "SSL"
ssl_config:
cafile: "CARoot.pem"
certfile: "certificate.pem"
keyfile: "key.pem"
password: password
check_hostname: true
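These fields correspond to the standard Kafka client TLS options. As a sketch of how the same settings look in kafka-python (again an assumption, not the loader's internals):

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "topic_name",
    bootstrap_servers="localhost:9092",
    security_protocol="SSL",
    ssl_cafile="CARoot.pem",         # CA certificate used to verify the broker
    ssl_certfile="certificate.pem",  # client certificate
    ssl_keyfile="key.pem",           # client private key
    ssl_password="password",         # passphrase for the key, if encrypted
    ssl_check_hostname=True,
)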
SASL authentication
security_protocol: "SASL_SSL"
sasl_config:
mechanism: "PLAIN"
username: username
password: password
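Likewise, the SASL fields map onto the client's SASL options; in kafka-python terms (a sketch under the same assumption):

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "topic_name",
    bootstrap_servers="localhost:9092",
    security_protocol="SASL_SSL",
    sasl_mechanism="PLAIN",
    sasl_plain_username="username",
    sasl_plain_password="password",
)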
Data format
By default, if include_metadata is false, the Kafka data loader returns only the data from the message's value field, e.g.
{"location": "klamath", "scientist": "anderson", "count": 23}
Kafka supports structuring and partitioning your data.
If include_metadata is set to true, the Kafka data loader returns each element as a message with data = {key: value} and metadata containing the topic, partition, offset, and time, e.g.
message = {
    "data": {"location": "klamath", "scientist": "anderson", "count": 23},
    "metadata": {
        "key": "bees",
        "partition": 0,
        "offset": 6,
        "time": 1693396630163,  # timestamp with ms precision (Wed Aug 30 2023 11:57:10 GMT+0)
        "topic": "census",
    },
}
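In a downstream transformer, both parts are plain dictionary fields. For example, converting the millisecond-precision time value to a UTC datetime (standard-library Python, nothing loader-specific):

from datetime import datetime, timezone

event_ms = message["metadata"]["time"]  # e.g. 1693396630163
event_dt = datetime.fromtimestamp(event_ms / 1000, tz=timezone.utc)
print(event_dt)  # 2023-08-30 11:57:10.163000+00:00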
Deserialize message with protobuf schema
serde_config:
serialization_method: PROTOBUF
schema_classpath: "path.to.schema.SchemaClass"
- Specify the serialization_method as PROTOBUF.
- Set schema_classpath to the path of the Python schema class. Test whether you have access to the schema by running the following code in a scratchpad (see the parsing sketch below):

from path.to.schema import SchemaClass
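If the import succeeds, the generated class should parse raw message bytes like any protobuf message. A minimal sketch, where raw_bytes is a hypothetical payload standing in for one Kafka message value:

from path.to.schema import SchemaClass

# Hypothetical check: parse one raw message value with the generated protobuf class.
msg = SchemaClass()
msg.ParseFromString(raw_bytes)  # raw_bytes: bytes, assumed to be a serialized SchemaClass
print(msg)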
Pass raw message to transformer
serde_config:
serialization_method: RAW_VALUE
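With RAW_VALUE, the loader passes the message value through undecoded, so the transformer must deserialize it itself. A minimal sketch, assuming the producer wrote UTF-8 encoded JSON (msg stands in for one raw value):

import json

# Decode one raw message value; assumes a UTF-8 JSON payload.
record = json.loads(msg.decode("utf-8"))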
Deserialize message with Avro schema in Confluent schema registry
serde_config:
serialization_method: AVRO
schema_registry_url: https://schema_registry_url
schema_registry_username: username
schema_registry_password: password
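To check the registry URL and credentials independently of the loader, you can probe the Confluent Schema Registry REST API directly. A minimal sketch using the requests library (the /subjects endpoint lists registered subjects; a 200 response confirms connectivity and basic auth):

import requests

resp = requests.get("https://schema_registry_url/subjects", auth=("username", "password"))
print(resp.status_code, resp.json())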