
Basic config

connector_type: kafka
bootstrap_server: "localhost:9092"
topic: topic_name
api_version: "0.10.2"
batch_size: 100
timeout_ms: 500
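These settings live in the connector's yaml file. As a quick sanity check, here is a minimal sketch that loads and validates such a file with PyYAML; the file name kafka.yaml and the helper are illustrative, not part of the exporter:

import yaml

REQUIRED_KEYS = {"connector_type", "bootstrap_server", "topic"}

def load_config(path: str) -> dict:
    # Read the connector yaml and fail early on missing required keys.
    with open(path) as f:
        config = yaml.safe_load(f)
    missing = REQUIRED_KEYS - config.keys()
    if missing:
        raise ValueError(f"missing config keys: {sorted(missing)}")
    # Optional tuning knobs fall back to the values shown above.
    config.setdefault("batch_size", 100)
    config.setdefault("timeout_ms", 500)
    return config

config = load_config("kafka.yaml")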

SSL authentication

security_protocol: "SSL"
ssl_config:
  cafile: "CARoot.pem"
  certfile: "certificate.pem"
  keyfile: "key.pem"
  password: password
  check_hostname: true
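For reference, these fields correspond to a standard TLS client setup. A minimal sketch with Python's built-in ssl module (the helper name is illustrative; the exporter does the equivalent internally):

import ssl

def build_ssl_context(ssl_config: dict) -> ssl.SSLContext:
    # Trust the CA that signed the broker certificates.
    context = ssl.create_default_context(cafile=ssl_config["cafile"])
    # Present the client certificate and key for mutual TLS.
    context.load_cert_chain(
        certfile=ssl_config["certfile"],
        keyfile=ssl_config["keyfile"],
        password=ssl_config.get("password"),
    )
    context.check_hostname = ssl_config.get("check_hostname", True)
    return context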

SASL authentication

SASL_SSL with PLAIN or SCRAM:
security_protocol: "SASL_SSL"
sasl_config:
  mechanism: "PLAIN"  # or "SCRAM-SHA-256", "SCRAM-SHA-512"
  username: username
  password: password
ssl_config:
  cafile: "pemfilename.pem"  # Optional but recommended
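For comparison, a sketch of the equivalent settings on a kafka-python producer; this assumes that library and is not necessarily what the exporter uses internally:

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    security_protocol="SASL_SSL",
    sasl_mechanism="PLAIN",  # or "SCRAM-SHA-256", "SCRAM-SHA-512"
    sasl_plain_username="username",
    sasl_plain_password="password",
    ssl_cafile="pemfilename.pem",  # optional but recommended
)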
SASL_SSL with OAUTHBEARER (OAuth 2.0):
security_protocol: "SASL_SSL"
sasl_config:
  mechanism: "OAUTHBEARER"
  oauth_token_url: "https://keycloak.example.com/auth/realms/myrealm/protocol/openid-connect/token"
  oauth_client_id: "kafka-client"
  oauth_client_secret: "your-client-secret"
  oauth_scope: "kafka"  # Optional
ssl_config:
  cafile: "pemfilename.pem"
  check_hostname: true
OAUTHBEARER Authentication:
  • The oauth_token_url, oauth_client_id, and oauth_client_secret parameters are required when using the OAUTHBEARER mechanism.
  • The oauth_scope parameter is optional.
  • OAuth tokens are automatically cached and refreshed before expiry (60 seconds before expiration by default); see the sketch after this list.
  • It’s recommended to use SASL_SSL (not SASL_PLAINTEXT) with OAUTHBEARER for secure token transmission.
  • Compatible with OAuth 2.0 providers like Keycloak, Okta, and other identity servers that support client credentials flow.
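The token handling described above is the standard OAuth 2.0 client credentials flow. A minimal sketch with the requests library; the helper and cache are illustrative, not the exporter's internals:

import time
import requests

_token_cache = {"token": None, "expires_at": 0.0}

def get_oauth_token(sasl_config: dict) -> str:
    # Reuse the cached token until shortly before it expires.
    if time.time() < _token_cache["expires_at"]:
        return _token_cache["token"]
    payload = {
        "grant_type": "client_credentials",
        "client_id": sasl_config["oauth_client_id"],
        "client_secret": sasl_config["oauth_client_secret"],
    }
    if "oauth_scope" in sasl_config:
        payload["scope"] = sasl_config["oauth_scope"]
    response = requests.post(sasl_config["oauth_token_url"], data=payload)
    response.raise_for_status()
    body = response.json()
    _token_cache["token"] = body["access_token"]
    # Refresh 60 seconds before expiration, matching the default noted above.
    _token_cache["expires_at"] = time.time() + body["expires_in"] - 60
    return _token_cache["token"]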

Data format

The Kafka data exporter accepts either scalar values
message = <Any>
or dictionaries with a specific format as messages:
message = {
    'data': <Any>,
    'metadata': <Optional[Dict]>
}
If the message is not a dictionary, it is written to the default topic as a value with the current time and no key. Otherwise, every dictionary message must contain at least a data field. The following messages are valid:
message = {'data': {'bees': 23, 'ants': 30}}
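message = 23  # a plain scalar: written with the current time, no key, default topic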
Kafka supports structuring and partitioning your data. The Kafka data exporter can set these elements if a received message contains a metadata dictionary:
message['metadata'] = {
    'dest_topic': str,
    'key': str,
    'time': int,
}
The following message configures all possible elements, data and metadata:
message = {
    'data': {
        'bees': 23,
        'ants': 30,
    },
    'metadata': {
        'dest_topic': 'census',
        'key': 'key',
        'time': 1693396630163,  # timestamp with ms precision
    }
}
If not all elements are configured, default values are assumed (see the sketch after this list).
  • The default topic is the one configured in the yaml file:
connector_type: kafka
bootstrap_server: "localhost:9092"
topic: topic_name
  • The default time is the timestamp at the moment of execution.
  • If you are running a newer Kafka broker version, set api_version to match it.
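Putting the defaults together, a sketch of how metadata could be resolved for an outgoing message (an illustrative helper, not the exporter's code):

import time

def resolve_metadata(message: dict, default_topic: str) -> dict:
    metadata = message.get("metadata", {})
    return {
        "dest_topic": metadata.get("dest_topic", default_topic),  # topic from the yaml file
        "key": metadata.get("key"),  # no key by default
        "time": metadata.get("time", int(time.time() * 1000)),  # current time, ms precision
    }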