Basic config

connector_type: kafka
bootstrap_server: "localhost:9092"
topic: topic_name
api_version: "0.10.2"
batch_size: 100
timeout_ms: 500

SSL authentication

security_protocol: "SSL"
ssl_config:
  cafile: "CARoot.pem"
  certfile: "certificate.pem"
  keyfile: "key.pem"
  password: password
  check_hostname: true

SASL authentication

security_protocol: "SASL_SSL"
sasl_config:
  mechanism: "PLAIN"
  username: username
  password: password

Data format

The Kafka data exporter scalar values

message = <Any>

or dictionaries with a specific format as messages:

message = {
    'data': <Any>,
    'metadata': <Optional[Dict]>
}

If the data contains no dictionary it will be written to the database as value with the current time value and no key. Otherwise, every dictionary message contains at least some data in a data field. The following messages are valid:

message = {'data': {'bees': 23, 'ants': 30}}

Kafka supports structuring and partitioning your data. The Kafka data exporter can create these elements if the messages received contain a metadata dictionary:

message['metadata'] = {
    'dest_topic': str
    'key': str
    'time': int
}

The following message shows the configuration of all possible elements data and metadata:

message = {
    'data': {
        'bees': 23,
        'ants': 30,
    },
    'metadata': {
        'dest_topic': 'census',
        'key': 'key',
        'time': 1693396630163,  # timestamp with ms precision
    }
}

If not all elements are configured, default values are assumed.

  • The default topic has to be configured in the yaml file.
connector_type: kafka
bootstrap_server: "localhost:9092"
topic: topic_name
  • The default time timestamp is the current time of execution.

  • In case you are using newer versions of Kafka brokers, you should consider using corresponding Kafka api_version.

Was this page helpful?