Dynamic blocks
Use the output of a block to dynamically create more blocks.
A dynamic block will create multiple downstream blocks at runtime.
The number of blocks it creates equals the number of items in the 1st list of its output data multiplied by the number of its downstream blocks.
For example, the following data loader block outputs a list of 2 lists. The 1st list contains 3 dictionaries of user information. The 2nd list contains metadata for the blocks that will be created.
from typing import Dict, List


@data_loader
def load_data(*args, **kwargs) -> List[List[Dict]]:
    users = []
    metadata = []

    for i in range(1, 4):
        users.append(dict(id=i, name=f'user_{i}'))
        metadata.append(dict(block_uuid=f'for_user_{i}'))

    return [
        users,
        metadata,  # optional
    ]
If the above data loader has 2 downstream blocks (i.e. 2 blocks that depend on this data loader), then this dynamic block will create 6 total downstream blocks at runtime (3 users multiplied by 2 downstream blocks).
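The arithmetic above can be sketched in plain Python. This is only an illustration of the counting rule, not Mage's implementation; the helper name `count_dynamically_created_blocks` is hypothetical.

```python
from typing import Dict, List


def count_dynamically_created_blocks(
    output: List[List[Dict]],
    downstream_block_count: int,
) -> int:
    """Return how many blocks a dynamic block creates at runtime."""
    items = output[0]  # 1st list: the data items passed downstream
    return len(items) * downstream_block_count


users = [dict(id=i, name=f'user_{i}') for i in range(1, 4)]
metadata = [dict(block_uuid=f'for_user_{i}') for i in range(1, 4)]

total = count_dynamically_created_blocks([users, metadata], downstream_block_count=2)
print(total)  # 6
```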
Data output shape of a dynamic block
A dynamic block must return a list of 2 lists of dictionaries (e.g. List[List[Dict]]). The 2nd list, which holds metadata, is optional.
For example, a data loader block returns the following output:
[
  [
    {
      "id": 1,
      "name": "user_1"
    },
    {
      "id": 2,
      "name": "user_2"
    },
    {
      "id": 3,
      "name": "user_3"
    }
  ],
  [
    {
      "block_uuid": "for_user_1"
    },
    {
      "block_uuid": "for_user_2"
    },
    {
      "block_uuid": "for_user_3"
    }
  ]
]
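A quick way to catch shape mistakes before running a pipeline is to check the output by hand. The sketch below is a hypothetical helper (`check_dynamic_output` is not part of Mage). It encodes the rules stated on this page, plus one assumption that the page does not state explicitly: when metadata is present, there is one metadata dictionary per data item.

```python
from typing import Any, List


def check_dynamic_output(output: Any) -> List[str]:
    """Return a list of problems found; an empty list means the shape looks valid."""
    if not isinstance(output, list) or not 1 <= len(output) <= 2:
        return ['output must be a list of 1 or 2 lists']

    problems = []
    for position, inner in enumerate(output):
        if not isinstance(inner, list) or not all(isinstance(d, dict) for d in inner):
            problems.append(f'item {position} must be a list of dictionaries')
    if problems:
        return problems

    if len(output) == 2:
        items, metadata = output
        # Assumption: one metadata dictionary per data item.
        if len(items) != len(metadata):
            problems.append('metadata must have one dictionary per data item')
        uuids = [m.get('block_uuid') for m in metadata]
        if None in uuids:
            problems.append('every metadata dictionary needs a block_uuid')
        elif len(set(uuids)) != len(uuids):
            problems.append('block_uuid values must be unique')
    return problems


users = [dict(id=i, name=f'user_{i}') for i in range(1, 4)]
metadata = [dict(block_uuid=f'for_user_{i}') for i in range(1, 4)]
print(check_dynamic_output([users, metadata]))  # []
```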
Input data for downstream blocks
The 1st item in the output is a list of dictionaries. Each dictionary is the input data for one dynamically created copy of every downstream block.
[
  {
    "id": 1,
    "name": "user_1"
  },
  {
    "id": 2,
    "name": "user_2"
  },
  {
    "id": 3,
    "name": "user_3"
  }
]
The number of items in this list of dictionaries will correspond to how many downstream blocks get created at runtime.
For example, if this dynamic block has a downstream transformer block with the following code:
from typing import Dict, List


@transformer
def transform(data: Dict, *args, **kwargs) -> List[Dict]:
    data['id'] = int(data['id']) * 100
    return [data]
For the 1st dynamically created block (the one that receives user_1 as input), the return output data will be:
[
  {
    "id": 100,
    "name": "user_1"
  }
]
Note
The transformer block is returning a list as its output data. The list of items is used as positional arguments in any downstream block of this transformer block.
Metadata for downstream blocks (optional)
The 2nd item in the list is a list of dictionaries. These dictionaries contain metadata for each downstream block.
[
  {
    "block_uuid": "for_user_1"
  },
  {
    "block_uuid": "for_user_2"
  },
  {
    "block_uuid": "for_user_3"
  }
]
This metadata is used to uniquely identify and create each dynamically created downstream block. The keys in each metadata dictionary are:
Key | Description | Required |
---|---|---|
block_uuid | This value is used in combination with the downstream block’s original UUID to construct a unique UUID across all dynamic blocks. This value must be unique within the same list of metadata dictionaries. | Yes |
If the above transformer block UUID is anonymize_user_data and the metadata for the 1st dynamically created block is { "block_uuid": "for_user_1" }, then the dynamically created block’s UUID is anonymize_user_data:for_user_1.
The convention is [original_block_uuid]:[metadata_block_uuid].
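The naming convention can be reproduced with a few lines of Python. This is a minimal sketch of the convention described above, not Mage's internal code; `build_block_uuids` is a hypothetical helper name.

```python
from typing import Dict, List


def build_block_uuids(original_block_uuid: str, metadata: List[Dict]) -> List[str]:
    """Combine a downstream block's UUID with each metadata block_uuid."""
    suffixes = [m['block_uuid'] for m in metadata]
    # block_uuid must be unique within the same list of metadata dictionaries.
    if len(set(suffixes)) != len(suffixes):
        raise ValueError('block_uuid values must be unique within the metadata list')
    return [f'{original_block_uuid}:{suffix}' for suffix in suffixes]


metadata = [dict(block_uuid=f'for_user_{i}') for i in range(1, 4)]
block_uuids = build_block_uuids('anonymize_user_data', metadata)
print(block_uuids[0])  # anonymize_user_data:for_user_1
```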
Dynamically created blocks
Every downstream block of a dynamic block is referred to as a dynamically created block.
The number of these blocks is determined by the return output data of the dynamic block.
For example, if a dynamic block returns the following data:
[
  [
    {
      "id": 1,
      "name": "user_1"
    },
    {
      "id": 2,
      "name": "user_2"
    },
    {
      "id": 3,
      "name": "user_3"
    }
  ],
  [
    {
      "block_uuid": "for_user_1"
    },
    {
      "block_uuid": "for_user_2"
    },
    {
      "block_uuid": "for_user_3"
    }
  ]
]
Then there will be 3 dynamically created blocks for every downstream block of that dynamic block.
If the dynamic block has 1 downstream block, then 3 blocks will be dynamically created.
For example, if the downstream block is a transformer with UUID anonymize_user_data and the following code:
from typing import Dict, List


@transformer
def transform(data: Dict, *args, **kwargs) -> List[Dict]:
    data['id'] = int(data['id']) * 100
    return [data]
There will be 3 dynamically created blocks with the following UUID and return output data:
Dynamic block UUID | Return output data |
---|---|
anonymize_user_data:for_user_1 | [{ "id": 100, "name": "user_1" }] |
anonymize_user_data:for_user_2 | [{ "id": 200, "name": "user_2" }] |
anonymize_user_data:for_user_3 | [{ "id": 300, "name": "user_3" }] |
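To see where the table values come from, the fan-out can be simulated outside of Mage. The sketch below is an approximation under stated assumptions: `run_dynamically_created_blocks` is a hypothetical helper, the `@transformer` decorator is omitted so the function runs as plain Python, and each item is deep-copied (my addition) so the original dynamic block output is not mutated.

```python
import copy
from typing import Callable, Dict, List


def transform(data: Dict, *args, **kwargs) -> List[Dict]:
    data['id'] = int(data['id']) * 100
    return [data]


def run_dynamically_created_blocks(
    block_uuid: str,
    block_function: Callable[..., List[Dict]],
    dynamic_output: List[List[Dict]],
) -> Dict[str, List[Dict]]:
    """Run block_function once per item, keyed by the dynamically created UUID."""
    items, metadata = dynamic_output
    results = {}
    for item, meta in zip(items, metadata):
        created_uuid = f"{block_uuid}:{meta['block_uuid']}"
        results[created_uuid] = block_function(copy.deepcopy(item))
    return results


users = [dict(id=i, name=f'user_{i}') for i in range(1, 4)]
metadata = [dict(block_uuid=f'for_user_{i}') for i in range(1, 4)]

results = run_dynamically_created_blocks('anonymize_user_data', transform, [users, metadata])
for created_uuid, output in results.items():
    print(created_uuid, output)
```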
Downstream blocks of dynamically created blocks
If a dynamically created block has downstream blocks, those downstream blocks will also be created multiple times. The number of times each one is created corresponds to how many blocks were created dynamically from the dynamic block.
In the current example, 3 blocks were created dynamically (3 users, 1 downstream block from the dynamic block). If the above transformer block has 2 downstream blocks, then 6 more blocks will be created.
For example, if the transformer block anonymize_user_data has 2 downstream blocks with UUID clean_column_names and compute_engagement_score, the following blocks will also be dynamically created:
anonymize_user_data:for_user_1
  → clean_column_names:for_user_1
  → compute_engagement_score:for_user_1
anonymize_user_data:for_user_2
  → clean_column_names:for_user_2
  → compute_engagement_score:for_user_2
anonymize_user_data:for_user_3
  → clean_column_names:for_user_3
  → compute_engagement_score:for_user_3
The data that is passed into each of these downstream blocks (e.g. clean_column_names, compute_engagement_score) will come from its upstream block (e.g. anonymize_user_data:for_user_1, anonymize_user_data:for_user_2, anonymize_user_data:for_user_3).
For example, the blocks clean_column_names:for_user_1 and compute_engagement_score:for_user_1 will each receive input data with the following shape and value:
{
  "id": 100,
  "name": "user_1"
}
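The relationship between an upstream block's list output and the dictionary its downstream blocks receive can be sketched as follows. This reflects my reading of this page (the items of the returned list are passed as positional arguments), not Mage's actual scheduler; `echo_input` is a hypothetical stand-in for the real downstream block functions.

```python
from typing import Dict

# Return output data of the 3 dynamically created transformer blocks.
upstream_outputs = {
    'anonymize_user_data:for_user_1': [{'id': 100, 'name': 'user_1'}],
    'anonymize_user_data:for_user_2': [{'id': 200, 'name': 'user_2'}],
    'anonymize_user_data:for_user_3': [{'id': 300, 'name': 'user_3'}],
}


def echo_input(data: Dict, *args, **kwargs) -> Dict:
    """Hypothetical downstream block body: returns the input it received."""
    return data


downstream_blocks = {
    'clean_column_names': echo_input,
    'compute_engagement_score': echo_input,
}

inputs_received = {}
for upstream_uuid, output in upstream_outputs.items():
    suffix = upstream_uuid.split(':', 1)[1]  # e.g. for_user_1
    for downstream_uuid, block_function in downstream_blocks.items():
        # Each item of the upstream list becomes a positional argument.
        inputs_received[f'{downstream_uuid}:{suffix}'] = block_function(*output)

print(len(inputs_received))  # 6
print(inputs_received['clean_column_names:for_user_1'])
```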
Reduce output
By default, dynamically created blocks will create more dynamically created blocks from their own downstream blocks. A dynamically created block will pass its return output data to its downstream blocks and so on.
However, a dynamically created block can be configured to reduce all the outputs across each dynamically created block and combine them into a single list of items.
If we configure the above transformer block anonymize_user_data to reduce its output, then there will only be 2 downstream blocks instead of 6.
The 2 downstream blocks will be clean_column_names and compute_engagement_score.
The input data to each of these 2 downstream blocks will be:
[
  {
    "id": 100,
    "name": "user_1"
  },
  {
    "id": 200,
    "name": "user_2"
  },
  {
    "id": 300,
    "name": "user_3"
  }
]
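Conceptually, reducing output concatenates the lists returned by each dynamically created block. The sketch below illustrates that idea only; `reduce_output` is a hypothetical function name and does not show how the setting is configured in Mage.

```python
from typing import Dict, List


def reduce_output(outputs_by_block: Dict[str, List[Dict]]) -> List[Dict]:
    """Combine the outputs of all dynamically created blocks into one list."""
    combined = []
    for output in outputs_by_block.values():
        combined.extend(output)
    return combined


outputs_by_block = {
    'anonymize_user_data:for_user_1': [{'id': 100, 'name': 'user_1'}],
    'anonymize_user_data:for_user_2': [{'id': 200, 'name': 'user_2'}],
    'anonymize_user_data:for_user_3': [{'id': 300, 'name': 'user_3'}],
}

reduced = reduce_output(outputs_by_block)
print(len(reduced))  # 3
```

With the reduced list as the single input, clean_column_names and compute_engagement_score each run once instead of 3 times.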
Dynamic SQL blocks
SQL blocks can be dynamic or be a child of a dynamic block.
Here is an example pipeline with code:
[data_loader]
↓
[dynamic_sql_loader]
↓
[transformer]
data_loader.py: dynamic block
@data_loader
def load_data(*args, **kwargs):
    return [[dict(uuid=1), dict(uuid=2)]]
dynamic_sql_loader: dynamic block, dynamic child
SELECT * FROM {{ df_1 }}
UNION ALL
SELECT 2 AS uuid
transformer: dynamic child
@transformer
def transform(data, *args, **kwargs):
    return data
When this pipeline is triggered, the following block runs will be created:
                             [data_loader]
                 ↓                                   ↓
      [dynamic_sql_loader:0]              [dynamic_sql_loader:1]
        ↓                 ↓                 ↓                 ↓
[transformer:0:0] [transformer:0:1] [transformer:1:0] [transformer:1:1]
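Because the data loader in this example returns no metadata, the block runs above are suffixed with item indexes rather than a block_uuid. The naming pattern in the diagram can be reproduced with the sketch below. It is an illustration based on the diagram only: `expand_block_runs` is a hypothetical helper, and the count of 2 items per SQL run is my reading of the query (the row from df_1 plus the row added by UNION ALL).

```python
from typing import List


def expand_block_runs(parent_runs: List[str], child_uuid: str, items_per_parent: int) -> List[str]:
    """Create one child block run per item of each parent block run."""
    runs = []
    for parent in parent_runs:
        # Keep the parent's index suffix (if any) and append the child's own index.
        _, _, parent_suffix = parent.partition(':')
        for index in range(items_per_parent):
            suffix = f'{parent_suffix}:{index}' if parent_suffix else str(index)
            runs.append(f'{child_uuid}:{suffix}')
    return runs


# data_loader returns 2 items, so 2 SQL block runs are created.
sql_runs = expand_block_runs(['data_loader'], 'dynamic_sql_loader', items_per_parent=2)
# Assumption: each SQL block run outputs 2 rows, so each creates 2 transformer runs.
transformer_runs = expand_block_runs(sql_runs, 'transformer', items_per_parent=2)

print(sql_runs)
print(transformer_runs)
```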
Tutorial guide
Follow this guide to create a simple pipeline with a dynamic block, dynamically created blocks, and a block that reduces its output.