Basic config

connector_type: kafka
bootstrap_server: "localhost:9092"
topic: topic_name
topics: ['topic_name1', 'topic_name2']  # Optional. Used for specifying multiple topics.
consumer_group: unique_consumer_group
include_metadata: false
api_version: "0.10.2"
batch_size: 100
timeout_ms: 500
auto_offset_reset: latest # Optional
max_partition_fetch_bytes: 1048576 # Optional
offset:  # Optional
  value: null # Optional
  type: "int"
  partitions: # Optional
  - {"topic": "test", "partition": 1}

Offset Configuration

Available in versions >= 0.9.61

The offset config lets you reset the consumer offset when the streaming pipeline starts. When you set offset, the partitions config is required.

The offset type accepts one of four values:

  1. beginning
  2. end
  3. int
  4. timestamp

beginning and end set the consumer to read from the beginning and the end of the queue, respectively. Neither requires the offset value to be set.

int sets the consumer to read from the given offset value. This value corresponds to the numeric position inside each partition.

timestamp sets the consumer to read from the given timestamp. This value corresponds to the timestamp of the message, in milliseconds since the Unix epoch.
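The rules above can be sketched as a small pre-flight check, together with a helper that produces the millisecond value the timestamp type expects. This is only an illustration: check_offset_config and to_epoch_ms are hypothetical helpers, not part of the connector, and the rule that int and timestamp need an integer value is inferred from the text above rather than taken from the connector's source.

```python
from datetime import datetime, timedelta, timezone

# Offset types listed above. The rules are read from the prose,
# not from the connector's source code.
OFFSET_TYPES = {"beginning", "end", "int", "timestamp"}
TYPES_NEEDING_VALUE = {"int", "timestamp"}


def to_epoch_ms(moment: datetime) -> int:
    """Convert a timezone-aware datetime to milliseconds since the Unix epoch."""
    epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
    # Integer timedelta division avoids float rounding on the millisecond part.
    return (moment - epoch) // timedelta(milliseconds=1)


def check_offset_config(offset: dict) -> list:
    """Return a list of problems found in an `offset` mapping (empty if none)."""
    problems = []
    offset_type = offset.get("type")
    if offset_type not in OFFSET_TYPES:
        problems.append(f"unknown type: {offset_type!r}")
    if not offset.get("partitions"):
        problems.append("partitions is required when offset is set")
    if offset_type in TYPES_NEEDING_VALUE and not isinstance(offset.get("value"), int):
        problems.append(f"type {offset_type!r} needs an integer value")
    return problems


start = datetime(2023, 8, 30, 11, 57, 10, 163000, tzinfo=timezone.utc)
offset = {
    "type": "timestamp",
    "value": to_epoch_ms(start),
    "partitions": [{"topic": "test", "partition": 1}],
}
print(offset["value"])              # 1693396630163
print(check_offset_config(offset))  # []
```

Running a check like this before starting the pipeline surfaces a missing partitions list or a missing value early, instead of at consumer start-up.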

SSL authentication

security_protocol: "SSL"
ssl_config:
  cafile: "CARoot.pem"
  certfile: "certificate.pem"
  keyfile: "key.pem"
  password: password
  check_hostname: true

SASL authentication

SASL_PLAINTEXT config:

security_protocol: "SASL_PLAINTEXT"
sasl_config:
  mechanism: "PLAIN"
  username: username
  password: password

SASL_SSL config:

security_protocol: "SASL_SSL"
sasl_config:
  mechanism: "SCRAM-SHA-512"
  username: username
  password: password
ssl_config:
  cafile: "pemfilename.pem"

Data format

By default (include_metadata is false), the Kafka data loader returns only the data from the message's value field, e.g.

{"location": "klamath", "scientist": "anderson", "count": 23}

Kafka supports structuring and partitioning your data. If include_metadata is set to true, the Kafka data loader returns each message with the value under data and a metadata object containing the key, topic, partition, offset and time, e.g.

message = {
    "data": {"location": "klamath", "scientist": "anderson", "count": 23},
    "metadata": {
      "key": "bees",
      "partition": 0,
      "offset": 6,
      "time": 1693396630163,  # timestamp with ms precision (Wed Aug 30 2023 11:57:10 GMT+0)
      "topic": "census",
    }}
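Downstream code may need to handle both message shapes. The sketch below shows one way to do that and to turn the millisecond time field into a datetime. unpack_message and message_time are hypothetical helpers, and detecting the metadata shape by the presence of the data and metadata keys is an assumption that would misfire if a payload itself used those key names.

```python
from datetime import datetime, timedelta, timezone


def unpack_message(message: dict) -> tuple:
    """Return (data, metadata) for either message shape.

    With include_metadata set to true the payload sits under "data";
    otherwise the message is the payload itself and metadata is empty.
    """
    if "data" in message and "metadata" in message:
        return message["data"], message["metadata"]
    return message, {}


def message_time(metadata: dict) -> datetime:
    """Convert the millisecond `time` field to a timezone-aware UTC datetime."""
    epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
    return epoch + timedelta(milliseconds=metadata["time"])


message = {
    "data": {"location": "klamath", "scientist": "anderson", "count": 23},
    "metadata": {
        "key": "bees",
        "partition": 0,
        "offset": 6,
        "time": 1693396630163,
        "topic": "census",
    },
}
data, metadata = unpack_message(message)
print(data["count"], metadata["topic"])    # 23 census
print(message_time(metadata).isoformat())  # 2023-08-30T11:57:10.163000+00:00
```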

Deserialize message with protobuf schema

serde_config:
  serialization_method: PROTOBUF
  schema_classpath: "path.to.schema.SchemaClass"
  • Set serialization_method to PROTOBUF.
  • Set schema_classpath to the import path of the Python schema class. To check that you have access to the schema, run the following import in a scratchpad:
    from path.to.schema import SchemaClass
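The same check can be run against the dotted string exactly as it appears in schema_classpath. This is a minimal sketch: load_schema_class is a hypothetical helper, and splitting on the last dot is an assumption about how the classpath is resolved. A standard-library class stands in for the schema so the snippet runs anywhere.

```python
import importlib


def load_schema_class(classpath: str):
    """Import `package.module.ClassName` and return the class object."""
    module_path, _, class_name = classpath.rpartition(".")
    if not module_path:
        raise ValueError(f"expected 'module.ClassName', got {classpath!r}")
    # Raises ImportError if the module is missing and AttributeError
    # if the module does not define the class.
    module = importlib.import_module(module_path)
    return getattr(module, class_name)


# Stand-in classpath from the standard library; replace it with your own
# schema_classpath value, e.g. "path.to.schema.SchemaClass".
schema_class = load_schema_class("collections.OrderedDict")
print(schema_class.__name__)  # OrderedDict
```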
    

Pass raw message to transformer

serde_config:
  serialization_method: RAW_VALUE

Deserialize message with Avro schema in Confluent schema registry

serde_config:
  serialization_method: AVRO
  schema_registry_url: https://schema_registry_url
  schema_registry_username: username
  schema_registry_password: password

API version

If you are using a newer version of the Kafka brokers, consider setting api_version to the corresponding Kafka version.
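The api_version in the basic config is written as a dotted string, whereas kafka-python's KafkaConsumer takes api_version as a tuple of integers. Assuming the connector is backed by kafka-python, the conversion could look like the sketch below; parse_api_version is a hypothetical helper, not part of the connector.

```python
def parse_api_version(version: str) -> tuple:
    """Turn a dotted version string such as "0.10.2" into a tuple of ints."""
    return tuple(int(part) for part in version.split("."))


print(parse_api_version("0.10.2"))  # (0, 10, 2)
```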
