Basic config
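A minimal sketch of what a basic consumer config can look like. Only `include_metadata` is described on this page; the other field names (`connector_type`, `bootstrap_server`, `topic`, `consumer_group`) are assumptions based on a typical Kafka consumer setup, so check your version's reference for the exact schema:

```yaml
connector_type: kafka
bootstrap_server: "localhost:9092"     # Kafka broker address
topic: topic_name                      # topic to consume from
consumer_group: unique_consumer_group
include_metadata: false                # see the Data format section below
```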
Offset configuration
Available in versions >= 0.9.61

The `offset` config allows you to reset the offset when starting the streaming pipeline. If you use the `offset` config, the `partitions` config is required.
The `offset` config accepts one of four values:
- `beginning`
- `end`
- `int`
- `timestamp`
`beginning` and `end` set the consumer to consume data from the beginning or the end of the queue, respectively. Neither requires an `offset_value`.

`int` sets the consumer to consume data from the given offset value inside the queue. This value corresponds to the numeric position inside each partition.

`timestamp` sets the consumer to consume data from the given offset timestamp inside the queue. This value corresponds to the timestamp of the message, in milliseconds since the start of the Unix epoch.
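Putting the pieces together, a sketch of an `offset` config. The key names `offset`, `offset_value`, and `partitions` come from this section, but how they nest is an assumption; check your version's reference for the exact schema:

```yaml
offset: int          # one of: beginning, end, int, timestamp
offset_value: 1000   # numeric position for int; ms since epoch for timestamp
partitions:          # required whenever the offset config is used
  - 0
```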
SSL authentication
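A sketch of an SSL config, assuming the common pattern of a `security_protocol` field plus certificate file paths; the `ssl_config` field names are assumptions:

```yaml
security_protocol: "SSL"
ssl_config:
  cafile: "CARoot.pem"          # CA certificate
  certfile: "certificate.pem"   # client certificate
  keyfile: "key.pem"            # client private key
  password: password            # key password, if the key is encrypted
  check_hostname: true
```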
SASL authentication
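A sketch of a SASL_SSL config using the `PLAIN` mechanism; the `sasl_config` field names are assumptions:

```yaml
security_protocol: "SASL_SSL"
sasl_config:
  mechanism: "PLAIN"
  username: username
  password: password
```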
SASL_PLAINTEXT config:
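A sketch of the plaintext variant; credentials travel unencrypted, so prefer `SASL_SSL` outside local testing (again, the `sasl_config` field names are assumptions):

```yaml
security_protocol: "SASL_PLAINTEXT"
sasl_config:
  mechanism: "PLAIN"
  username: username
  password: password
```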
OAUTHBEARER Authentication:

- The `oauth_token_url`, `oauth_client_id`, and `oauth_client_secret` parameters are required when using the `OAUTHBEARER` mechanism.
- The `oauth_scope` parameter is optional.
- OAuth tokens are automatically cached and refreshed before expiry (60 seconds before expiration by default).
- It's recommended to use `SASL_SSL` (not `SASL_PLAINTEXT`) with `OAUTHBEARER` for secure token transmission.
- Compatible with OAuth 2.0 providers like Keycloak, Okta, and other identity servers that support the client credentials flow.
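A sketch combining the parameters above into one config. The `oauth_*` names come from the notes above; the surrounding structure and the token URL are assumptions:

```yaml
security_protocol: "SASL_SSL"   # recommended over SASL_PLAINTEXT for OAUTHBEARER
sasl_config:
  mechanism: "OAUTHBEARER"
  oauth_token_url: "https://auth.example.com/oauth/token"  # hypothetical token endpoint
  oauth_client_id: "client_id"
  oauth_client_secret: "client_secret"
  oauth_scope: "kafka"          # optional
```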
Data format
By default, if `include_metadata` is false, the Kafka data loader returns the data from the message's `value` field, e.g.
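The payload below is a hypothetical message value, shown only to illustrate the shape of the output:

```json
{"name": "rider_1", "distance": 12.4}
```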
If `include_metadata` is set to true, the Kafka data loader returns each element as a message with `data = {key: value}` and `metadata` containing the topic, partition, offset, and time, e.g.
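Using the same hypothetical message value; the metadata field names come from the description above, and the values shown are illustrative:

```json
{
  "data": {"name": "rider_1", "distance": 12.4},
  "metadata": {
    "topic": "topic_name",
    "partition": 0,
    "offset": 77,
    "time": 1680000000000
  }
}
```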
Deserialize messages with a protobuf schema
- Set the `serialization_method` to `PROTOBUF`.
- Set the `schema_classpath` to the path of the Python schema class.
- Test whether you have access to the schema with the code in a scratchpad, as sketched below.
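A sketch of the two settings in the consumer config; the classpath shown is a hypothetical example pointing at a generated `*_pb2` module:

```yaml
serialization_method: PROTOBUF
schema_classpath: "path.to.schema_pb2.SchemaClass"
```

And a minimal scratchpad check that the class referenced by `schema_classpath` is importable (module and class names are placeholders):

```python
import importlib

# Hypothetical classpath; replace with your generated protobuf module and class.
schema_classpath = 'path.to.schema_pb2.SchemaClass'

# Split 'package.module.Class' into the module path and the class name,
# then import the module and look the class up on it.
module_path, class_name = schema_classpath.rsplit('.', 1)
schema_class = getattr(importlib.import_module(module_path), class_name)
print(schema_class)  # should print the protobuf message class
```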