Stateful store is now supported in streaming pipelines. You can access it through the state_store object in the kwargs parameter. Redis is currently the only supported state store.

Configuration

To enable the stateful store in a streaming pipeline, add the state store configuration to metadata.yml:

state_store_config:
  type: 'REDIS'
  redis:
    redis_url: 'redis_url'

This configuration ensures that the pipeline can access and utilize Redis as its state store.
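
For illustration, the state store can be thought of as a thin key-value wrapper around a Redis client. The sketch below is an assumption-based illustration only (using the redis-py client and JSON serialization), not the actual implementation:

    import json

    import redis  # redis-py client (assumed dependency for this sketch)


    class RedisStateStore:
        """Illustrative Redis-backed key-value store; not the actual implementation."""

        def __init__(self, redis_url):
            self.client = redis.Redis.from_url(redis_url)

        def set_state(self, key, value):
            # JSON serialization lets strings, ints, lists, and dicts all be stored.
            self.client.set(key, json.dumps(value))

        def get_state(self, key):
            raw = self.client.get(key)
            return json.loads(raw) if raw is not None else None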

Usage

Example code for accessing the state store in your streaming pipeline:

    # Retrieve the state store from kwargs.
    state_store = kwargs['state_store']

    # Store values of different types.
    state_store.set_state('string_key', 'Transformer Hello, Redis!')
    state_store.set_state('int_key', 123)
    state_store.set_state('list_key', [1, 2, 3, 4, 5])
    state_store.set_state('dict_key', {'key1': 'value1', 'key2': 'value2'})

    # Read the values back.
    state_store.get_state('string_key')
    state_store.get_state('int_key')
    state_store.get_state('list_key')
    state_store.get_state('dict_key')
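
In a streaming pipeline, this code typically lives inside a transformer block. The sketch below is a minimal example under the assumption that the block scaffold provides the transformer decorator and passes the incoming messages along with kwargs; the running-count logic and key name are illustrative:

    @transformer
    def transform(messages, *args, **kwargs):
        # Keep a running count of processed messages across batches (illustrative).
        state_store = kwargs['state_store']
        total = state_store.get_state('processed_count') or 0
        total += len(messages)
        state_store.set_state('processed_count', total)
        return messages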

Window Aggregation

The Window Aggregation block is now supported in streaming pipelines, allowing input data to be aggregated over a specified time window and sent to downstream blocks.

Usage

When you create a transformer block and select the Window Aggregation type, a window aggregation block is created with the following configuration:

transformer_type: window_aggregation
method: time
type: tumbling
interval_window: 60 # seconds
advance_window: 10

You can update interval_window with the desired interval. You can also add blocks after the window aggregation block; these blocks will be triggered periodically based on the configuration. For example, you can add a transformer block that reports the aggregated data to a monitoring system and alerts on your pipeline, as sketched below.
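
As a sketch of such a downstream block, the transformer below logs an alert when a window arrives empty. It assumes the window block passes the aggregated messages to the transformer and that a log-based alert is sufficient; the logger and alerting logic are illustrative:

    import logging

    logger = logging.getLogger(__name__)


    @transformer
    def transform(messages, *args, **kwargs):
        # Triggered periodically by the upstream window aggregation block.
        count = len(messages)
        if count == 0:
            # No data arrived during the window; this may indicate an upstream issue.
            logger.warning('No messages received in the last window')
        else:
            logger.info('Received %d messages in the last window', count)
        return messages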

How It Works

The window block is registered with the specified configuration during streaming pipeline execution. An independent thread is created to manage each window block and trigger it periodically based on that configuration. When the window is triggered, its downstream blocks are executed with the aggregated data.
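
Conceptually, the trigger loop resembles the simplified sketch below (an illustration only, not the actual internal code): a dedicated thread buffers incoming messages and flushes them to a downstream callback every interval_window seconds.

    import threading
    import time


    class TumblingWindow:
        """Simplified illustration of a time-based tumbling window trigger."""

        def __init__(self, interval_window, on_trigger):
            self.interval_window = interval_window  # window length in seconds
            self.on_trigger = on_trigger            # callback that runs the downstream blocks
            self.buffer = []
            self.lock = threading.Lock()
            # Independent daemon thread that fires the window periodically.
            threading.Thread(target=self._run, daemon=True).start()

        def add(self, message):
            with self.lock:
                self.buffer.append(message)

        def _run(self):
            while True:
                time.sleep(self.interval_window)
                with self.lock:
                    batch, self.buffer = self.buffer, []
                self.on_trigger(batch)  # execute downstream blocks with the windowed batch


    # Example: flush buffered messages to a simple print callback every 60 seconds.
    window = TumblingWindow(interval_window=60, on_trigger=lambda batch: print(f'{len(batch)} messages'))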


🚀 With the stateful store and window aggregation support, your streaming pipeline can now efficiently manage and process stateful data.