Basic config

connector_type: kafka
bootstrap_server: "localhost:9092"
topic: topic_name
topics: ['topic_name1', 'topic_name2']  # Optional. Used for specifying multiple topics.
consumer_group: unique_consumer_group
include_metadata: false  # Set to true to wrap each message with Kafka metadata (see Data format below).
api_version: "0.10.2"
batch_size: 100
timeout_ms: 500
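
These fields map onto an ordinary Kafka consumer. Below is a minimal sketch of an equivalent consumer built with the kafka-python package (an assumption; the loader's actual client library may differ), using the placeholder values above:

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "topic_name",
    bootstrap_servers="localhost:9092",
    group_id="unique_consumer_group",
    api_version=(0, 10, 2),
    consumer_timeout_ms=500,  # timeout_ms
    max_poll_records=100,     # batch_size
)
for message in consumer:
    print(message.value)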

SSL authentication

security_protocol: "SSL"
ssl_config:
  cafile: "CARoot.pem"
  certfile: "certificate.pem"
  keyfile: "key.pem"
  password: password
  check_hostname: true
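
The ssl_config fields correspond to a Kafka client's standard SSL options. A sketch of how they might be passed through, again assuming a kafka-python consumer:

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "topic_name",
    bootstrap_servers="localhost:9092",
    security_protocol="SSL",
    ssl_cafile="CARoot.pem",
    ssl_certfile="certificate.pem",
    ssl_keyfile="key.pem",
    ssl_password="password",  # password protecting the key file
    ssl_check_hostname=True,
)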

SASL authentication

security_protocol: "SASL_SSL"
sasl_config:
  mechanism: "PLAIN"
  username: username
  password: password
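
The sasl_config fields likewise mirror a client's SASL options. A sketch, under the same kafka-python assumption, of PLAIN authentication over SASL_SSL:

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "topic_name",
    bootstrap_servers="localhost:9092",
    security_protocol="SASL_SSL",
    sasl_mechanism="PLAIN",
    sasl_plain_username="username",
    sasl_plain_password="password",
)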

Data format

By default, if include_metadata is false, the Kafka data loader returns only the contents of the message's value field, e.g.

{"location": "klamath", "scientist": "anderson", "count": 23}

Kafka messages also carry structural and partitioning information. If include_metadata is set to true, the Kafka data loader wraps each message in a dictionary with a data field holding the deserialized value and a metadata field holding the key, topic, partition, offset, and timestamp, e.g.

message = {
    "data": {"location": "klamath", "scientist": "anderson", "count": 23},
    "metadata": {
        "key": "bees",
        "partition": 0,
        "offset": 6,
        "time": 1693396630163,  # timestamp with ms precision (Wed Aug 30 2023 11:57:10 GMT+0)
        "topic": "census",
    },
}
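
Every metadata field is available on the record object a Kafka client returns. A sketch of how such a message dictionary could be assembled from a kafka-python ConsumerRecord (an illustrative assumption about the client; attribute names follow kafka-python's API):

import json

def to_message(record):
    # record: a kafka-python ConsumerRecord
    return {
        "data": json.loads(record.value),
        "metadata": {
            "key": record.key.decode() if record.key else None,
            "partition": record.partition,
            "offset": record.offset,
            "time": record.timestamp,  # epoch milliseconds
            "topic": record.topic,
        },
    }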

Deserialize message with protobuf schema

serde_config:
  serialization_method: PROTOBUF
  schema_classpath: "path.to.schema.SchemaClass"

• Set serialization_method to PROTOBUF.
• Set schema_classpath to the importable path of the Python schema class. Test whether you have access to the schema by running the import below in a scratchpad:

from path.to.schema import SchemaClass
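
Once the class imports cleanly, deserialization follows the standard protobuf API. A minimal sketch, where SchemaClass is the generated class named by schema_classpath and raw_value stands in for the bytes payload of one Kafka message:

from path.to.schema import SchemaClass  # placeholder path, as above

raw_value = b""  # stands in for the bytes payload of one Kafka message
msg = SchemaClass()
msg.ParseFromString(raw_value)
print(msg)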
    

Pass raw message to transformer

serde_config:
  serialization_method: RAW_VALUE
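
With RAW_VALUE, the loader skips deserialization and passes the untouched message payload downstream. A sketch of a transformer that decodes the bytes itself; the transform signature here is an illustrative assumption, not the loader's exact interface:

import json

def transform(messages, *args, **kwargs):
    # Each element is the untouched bytes payload of a Kafka record;
    # decode it here, e.g. as UTF-8 JSON.
    return [json.loads(raw) for raw in messages]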

Deserialize message with Avro schema in Confluent schema registry

serde_config:
  serialization_method: AVRO
  schema_registry_url: https://schema_registry_url
  schema_registry_username: username
  schema_registry_password: password
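
A hedged sketch of what this deserialization could look like with the confluent-kafka package (an assumption about the underlying client), wiring in the registry URL and credentials above:

from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import MessageField, SerializationContext

client = SchemaRegistryClient({
    "url": "https://schema_registry_url",
    "basic.auth.user.info": "username:password",  # registry username:password
})
deserializer = AvroDeserializer(client)

raw_value = b""  # stands in for the bytes payload of one Kafka message
record = deserializer(raw_value, SerializationContext("census", MessageField.VALUE))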