# Kafka

## Basic config

```yaml theme={"system"}
connector_type: kafka
bootstrap_server: "localhost:9092"
topic: topic_name
api_version: "0.10.2"
batch_size: 100
timeout_ms: 500
```
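The producer documentation linked further down this page is for kafka-python. The sketch below therefore assumes that the basic settings end up as `KafkaProducer` keyword arguments. It shows one plausible translation. `bootstrap_server` becomes `bootstrap_servers`, and the `api_version` string is parsed into the integer tuple (for example `(0, 10, 2)`) that kafka-python's `api_version` parameter takes. The helper names are hypothetical, and the dictionary stands in for the parsed YAML. `batch_size` and `timeout_ms` are left out because this page does not say how the exporter uses them.

```python theme={"system"}
from typing import Any, Dict, Tuple


def parse_api_version(version: str) -> Tuple[int, ...]:
    """Convert a dotted version string such as "0.10.2" into (0, 10, 2)."""
    parts = version.strip().split(".")
    if not all(part.isdigit() for part in parts):
        raise ValueError(f"invalid api_version: {version!r}")
    return tuple(int(part) for part in parts)


def basic_producer_kwargs(config: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper: map the basic YAML keys to KafkaProducer kwargs."""
    if config.get("connector_type") != "kafka":
        raise ValueError("connector_type must be 'kafka'")
    kwargs: Dict[str, Any] = {"bootstrap_servers": config["bootstrap_server"]}
    if "api_version" in config:
        kwargs["api_version"] = parse_api_version(str(config["api_version"]))
    return kwargs


# Stand-in for the parsed YAML shown above.
config = {
    "connector_type": "kafka",
    "bootstrap_server": "localhost:9092",
    "topic": "topic_name",
    "api_version": "0.10.2",
    "batch_size": 100,
    "timeout_ms": 500,
}
print(basic_producer_kwargs(config))
# {'bootstrap_servers': 'localhost:9092', 'api_version': (0, 10, 2)}
```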

## SSL authentication

```yaml theme={"system"}
security_protocol: "SSL"
ssl_config:
  cafile: "CARoot.pem"
  certfile: "certificate.pem"
  keyfile: "key.pem"
  password: password
  check_hostname: true
```
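Here is a sketch, assuming the exporter passes these settings to kafka-python's `KafkaProducer`. In that case each `ssl_config` key lines up with an `ssl_`-prefixed producer argument. The `SSL_KEY_MAP` table and the `ssl_kwargs` helper are illustrative and are not Mage's actual code.

```python theme={"system"}
from typing import Any, Dict

# Assumed mapping from ssl_config keys to kafka-python KafkaProducer kwargs.
SSL_KEY_MAP = {
    "cafile": "ssl_cafile",
    "certfile": "ssl_certfile",
    "keyfile": "ssl_keyfile",
    "password": "ssl_password",
    "check_hostname": "ssl_check_hostname",
}


def ssl_kwargs(config: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical: translate security_protocol and ssl_config into kwargs."""
    kwargs: Dict[str, Any] = {}
    if config.get("security_protocol"):
        kwargs["security_protocol"] = config["security_protocol"]
    for key, value in (config.get("ssl_config") or {}).items():
        if key not in SSL_KEY_MAP:
            raise KeyError(f"unsupported ssl_config key: {key!r}")
        kwargs[SSL_KEY_MAP[key]] = value
    return kwargs


config = {
    "security_protocol": "SSL",
    "ssl_config": {
        "cafile": "CARoot.pem",
        "certfile": "certificate.pem",
        "keyfile": "key.pem",
        "password": "password",
        "check_hostname": True,
    },
}
print(ssl_kwargs(config))
```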

## SASL authentication

SASL\_SSL with PLAIN or SCRAM:

```yaml theme={"system"}
security_protocol: "SASL_SSL"
sasl_config:
  mechanism: "PLAIN"  # or "SCRAM-SHA-256", "SCRAM-SHA-512"
  username: username
  password: password
ssl_config:
  cafile: "pemfilename.pem"  # Optional but recommended
```
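kafka-python passes the username and password through `sasl_plain_username` and `sasl_plain_password` for PLAIN and for both SCRAM mechanisms. The hypothetical `sasl_kwargs` helper below sketches that translation, under the assumption that the exporter builds a kafka-python producer. It also rejects mechanisms that do not authenticate with a username and password.

```python theme={"system"}
from typing import Any, Dict

# Mechanisms that authenticate with a username and password.
PASSWORD_MECHANISMS = {"PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512"}


def sasl_kwargs(config: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical: translate sasl_config into kafka-python producer kwargs."""
    sasl = config["sasl_config"]
    mechanism = sasl.get("mechanism")
    if mechanism not in PASSWORD_MECHANISMS:
        raise ValueError(f"not a username/password mechanism: {mechanism!r}")
    kwargs: Dict[str, Any] = {
        "security_protocol": config["security_protocol"],
        "sasl_mechanism": mechanism,
        # kafka-python uses the same two arguments for PLAIN and SCRAM.
        "sasl_plain_username": sasl["username"],
        "sasl_plain_password": sasl["password"],
    }
    cafile = (config.get("ssl_config") or {}).get("cafile")
    if cafile:  # optional but recommended
        kwargs["ssl_cafile"] = cafile
    return kwargs


config = {
    "security_protocol": "SASL_SSL",
    "sasl_config": {
        "mechanism": "SCRAM-SHA-512",
        "username": "username",
        "password": "password",
    },
    "ssl_config": {"cafile": "pemfilename.pem"},
}
print(sasl_kwargs(config))
```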

SASL\_SSL with OAUTHBEARER (OAuth 2.0):

```yaml theme={"system"}
security_protocol: "SASL_SSL"
sasl_config:
  mechanism: "OAUTHBEARER"
  oauth_token_url: "https://keycloak.example.com/auth/realms/myrealm/protocol/openid-connect/token"
  oauth_client_id: "kafka-client"
  oauth_client_secret: "your-client-secret"
  oauth_scope: "kafka"  # Optional
ssl_config:
  cafile: "pemfilename.pem"
  check_hostname: true
```

<Note>
  **OAUTHBEARER Authentication:**

  * The `oauth_token_url`, `oauth_client_id`, and `oauth_client_secret` are **required** when using the `OAUTHBEARER` mechanism.
  * The `oauth_scope` parameter is optional.
  * OAuth tokens are automatically cached and refreshed before expiry (60 seconds before expiration by default).
  * It's recommended to use `SASL_SSL` (not `SASL_PLAINTEXT`) with OAUTHBEARER for secure token transmission.
  * Compatible with OAuth 2.0 providers like Keycloak, Okta, and other identity servers that support client credentials flow.
</Note>
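The token caching described in the note can be sketched as follows. The sketch assumes a standard OAuth 2.0 client credentials request (RFC 6749, section 4.4) that sends the client ID and secret in the form body. Some providers expect HTTP Basic authentication instead. `CachedTokenProvider` is a hypothetical class. It exposes a `token()` method, which is the shape kafka-python's `sasl_oauth_token_provider` expects. It refetches once fewer than 60 seconds of the token's `expires_in` lifetime remain. The fetch function and clock are injectable, so you can exercise the logic without a network.

```python theme={"system"}
import json
import time
import urllib.parse
import urllib.request
from typing import Any, Callable, Dict, Optional


def fetch_client_credentials_token(
    token_url: str,
    client_id: str,
    client_secret: str,
    scope: Optional[str] = None,
) -> Dict[str, Any]:
    """Request a token with the OAuth 2.0 client credentials grant (POST)."""
    form = {
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret,
    }
    if scope:
        form["scope"] = scope  # oauth_scope is optional
    request = urllib.request.Request(
        token_url,
        data=urllib.parse.urlencode(form).encode(),
        headers={"Content-Type": "application/x-www-form-urlencoded"},
    )
    with urllib.request.urlopen(request, timeout=10) as response:
        return json.loads(response.read().decode())


class CachedTokenProvider:
    """Caches an access token and refreshes it shortly before it expires."""

    def __init__(
        self,
        fetch: Callable[[], Dict[str, Any]],
        refresh_margin_s: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._fetch = fetch
        self._margin = refresh_margin_s
        self._clock = clock
        self._token: Optional[str] = None
        self._expires_at = 0.0

    def token(self) -> str:
        now = self._clock()
        # Refetch when there is no token yet or fewer than `margin` seconds remain.
        if self._token is None or now >= self._expires_at - self._margin:
            payload = self._fetch()
            self._token = payload["access_token"]
            # Without expires_in, the token counts as expired and is refetched next call.
            self._expires_at = now + float(payload.get("expires_in", 0))
        return self._token
```

Against a real identity server, you could pass `functools.partial(fetch_client_credentials_token, token_url, client_id, client_secret, scope)` as `fetch`. With a token lifetime of 300 seconds, the cached token is reused until 240 seconds have elapsed and is refreshed from then on.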

## Data format

The Kafka data exporter accepts either scalar values as messages:

```
message = <Any>
```

or dictionaries with a specific format as messages:

```
message = {
    'data': <Any>,
    'metadata': <Optional[Dict]>
}
```

If a message is not a dictionary, it is written to the default topic as the message value, with the current time as its timestamp and no key.
Otherwise, every dictionary message must contain at least a `data` field.
The following message is valid:

```python theme={"system"}
message = {'data': {'bees': 23, 'ants': 30}}
```
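The rule above can be expressed as a small check. `validate_message` is a hypothetical helper, not part of Mage. Any non-dictionary value passes as a scalar message. A dictionary must carry a `data` key and, if present, a dictionary `metadata` field.

```python theme={"system"}
from typing import Any


def validate_message(message: Any) -> None:
    """Raise ValueError if a message does not follow the documented format."""
    if not isinstance(message, dict):
        return  # scalar: sent as-is as the message value
    if "data" not in message:
        raise ValueError("dictionary messages need a 'data' field")
    metadata = message.get("metadata")
    if metadata is not None and not isinstance(metadata, dict):
        raise ValueError("'metadata' must be a dictionary when present")


validate_message(42)
validate_message({"data": {"bees": 23, "ants": 30}})
```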

Kafka supports [structuring and partitioning](https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html#kafka.KafkaProducer.send) your data.
The Kafka data exporter sets the destination topic, key, and timestamp of each record when a message contains a `metadata` dictionary:

```python theme={"system"}
message['metadata'] = {
    'dest_topic': str
    'key': str
    'time': int
}
```
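The following sketches a type check for the metadata fields listed above. `check_metadata` is hypothetical. It is deliberately strict and rejects field names not listed above, so typos such as `topic` instead of `dest_topic` surface early. This page does not say how the exporter itself treats such fields. `bool` is rejected explicitly because it is a subclass of `int` in Python.

```python theme={"system"}
from typing import Any, Dict

# Expected Python type for each metadata field.
METADATA_TYPES = {"dest_topic": str, "key": str, "time": int}


def check_metadata(metadata: Dict[str, Any]) -> None:
    """Hypothetical strict check of a message's metadata dictionary."""
    for name, value in metadata.items():
        expected = METADATA_TYPES.get(name)
        if expected is None:
            raise KeyError(f"unknown metadata field: {name!r}")
        # bool is a subclass of int, so reject it explicitly.
        if isinstance(value, bool) or not isinstance(value, expected):
            raise TypeError(
                f"{name!r} must be {expected.__name__}, got {type(value).__name__}"
            )


check_metadata({"dest_topic": "census", "key": "key", "time": 1693396630163})
```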

The following message sets every available data and metadata element:

```python theme={"system"}
message = {
    'data': {
        'bees': 23,
        'ants': 30,
    },
    'metadata': {
        'dest_topic': 'census',
        'key': 'key',
        'time': 1693396630163,  # timestamp with ms precision
    }
}
```

If some elements are not set, default values are used:

* The default topic is the `topic` value in the YAML config:

```yaml theme={"system"}
connector_type: kafka
bootstrap_server: "localhost:9092"
topic: topic_name
```

* The default timestamp is the current time of execution.

If you use a newer Kafka broker version, set `api_version` in the YAML config to match it.
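Putting the defaults together, the sketch below resolves a message into keyword arguments for kafka-python's `KafkaProducer.send(topic, value=..., key=..., timestamp_ms=...)`. Mapping `dest_topic`, `key`, and `time` onto those arguments is an assumption based on the linked `send()` documentation. `to_send_kwargs` is a hypothetical helper. Serialization is not shown: kafka-python expects bytes unless the producer has a `value_serializer` or `key_serializer` configured.

```python theme={"system"}
import time
from typing import Any, Dict, Optional


def to_send_kwargs(message: Any, default_topic: str, now_ms: Optional[int] = None) -> Dict[str, Any]:
    """Hypothetical: resolve one message into KafkaProducer.send() kwargs."""
    if now_ms is None:
        now_ms = int(time.time() * 1000)  # current time, ms precision
    if isinstance(message, dict):
        value = message["data"]
        metadata = message.get("metadata") or {}
    else:
        value, metadata = message, {}  # scalar: every element uses its default
    return {
        "topic": metadata.get("dest_topic", default_topic),
        "value": value,
        "key": metadata.get("key"),  # None means the record has no key
        "timestamp_ms": metadata.get("time", now_ms),
    }


print(to_send_kwargs(42, "topic_name", now_ms=1693396630163))
# {'topic': 'topic_name', 'value': 42, 'key': None, 'timestamp_ms': 1693396630163}
```

A message with a full `metadata` dictionary overrides all three defaults. A message with only `data` uses the configured `topic`, no key, and the current time.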
