A dynamic block will create multiple downstream blocks at runtime.

The number of blocks it creates is equal to the number of items in the output data of the dynamic block multiplied by the number of its downstream blocks.

For example, the following data loader block will output a list of 2 lists. The 1st item in the list contains a list of 3 dictionaries containing user information.

from typing import Dict, List


@data_loader
def load_data(*args, **kwargs) -> List[List[Dict]]:
    users = []
    metadata = []

    for i in range(3):
        i += 1
        users.append(dict(id=i, name=f'user_{i}'))
        metadata.append(dict(block_uuid=f'for_user_{i}'))

    return [
        users,
        metadata, # optional
    ]

If the above data loader has 2 downstream blocks (e.g. 2 blocks that depend on this data loader), then this dynamic block will create 6 total downstream blocks at runtime (3 users multiplied by 2 downstream blocks).


This feature saves me when I need to securely transfer multiple media files from one location to another (SCP), especially when the files are spread across different directories and across the network. Typically, one would have to loop through each file, but secure copy (SCP) operations can fail for various reasons, and dealing with exceptions can be error-prone.

An alternative approach would be to use an API trigger (like the one provided by Mage AI) to receive POST requests for each file that needs to be securely copied. However, this method can be highly resource-intensive, particularly when dealing with thousands of pipeline runs.

Dynamic block, which allows me to manage source and destination specifications in a DataFrame and then dynamically execute SCP operations without having to handle exceptions manually (thanks to the “retry incomplete block” feature), is a game-changer. Not only does it save me from the hassle of exception handling, but it also keeps computational resources in check by avoiding unnecessary pipeline triggers.

Thank you for your world-class project. Keep up the excellent work.

Data output shape of a dynamic block

A dynamic block must return a list of 2 lists of dictionaries (e.g. List[List[Dict]]).

For example, a data loader block returns the following output:

[
    [
        {
            "id": 1,
            "name": "user_1"
        },
        {
            "id": 2,
            "name": "user_2"
        },
        {
            "id": 3,
            "name": "user_3"
        }
    ],
    [
        {
            "block_uuid": "for_user_1"
        },
        {
            "block_uuid": "for_user_2"
        },
        {
            "block_uuid": "for_user_3"
        }
    ]
]

Input data for downstream blocks

The 1st item in the list is a list of dictionaries. These dictionaries contain data that will be passed down to all its downstream blocks.

[
    {
        "id": 1,
        "name": "user_1"
    },
    {
        "id": 2,
        "name": "user_2"
    },
    {
        "id": 3,
        "name": "user_3"
    }
]

The number of items in this list of dictionaries will correspond to how many downstream blocks get created at runtime.

For example, if this dynamic block has a downstream transformer block with the following code:

from typing import Dict, List


@transformer
def transform(data: Dict, *args, **kwargs) -> List[Dict]:
    data['id'] = int(data['id']) * 100
    return [data]

The return output data of this downstream transformer block will be:

[
    {
        "id": 100,
        "name": "user_1"
    }
]

Note

The transformer block is returning a list as its output data. The list of items is used as positional arguments in any downstream block of this transformer block.

Metadata for downstream blocks (optional)

The 2nd item in the list is a list of dictionaries. These dictionaries contain metadata for each downstream block.

[
    {
        "block_uuid": "for_user_1"
    },
    {
        "block_uuid": "for_user_2"
    },
    {
        "block_uuid": "for_user_3"
    }
]

This metadata is used to uniquely identify and create each dynamically created downstream block. The keys in each metadata dictionary are:

KeyDescriptionRequired
block_uuidThis value is used in combination with the downstream block’s original UUID to construct a unique UUID across all dynamic blocks. This value must be unique within the same list of metadata dictionaries.Yes

If the above transformer block UUID is anonymize_user_data and the metadata for the 1st dynamically created block is { "block_uuid": "for_user_1" }, then the dynamically created block’s UUID is anonymize_user_data:for_user_1.

The convention is [original_block_uuid]:[metadata_block_uuid].

Dynamically created blocks

Every downstream block from a dynamic block is referred to as a dynamically created block.

The number of these blocks created are determined by the return output data of a dynamic block.

For example, if a dynamic block returns the following data:

[
    [
        {
            "id": 1,
            "name": "user_1"
        },
        {
            "id": 2,
            "name": "user_2"
        },
        {
            "id": 3,
            "name": "user_3"
        }
    ],
    [
        {
            "block_uuid": "for_user_1"
        },
        {
            "block_uuid": "for_user_2"
        },
        {
            "block_uuid": "for_user_3"
        }
    ]
]

Then there will be 3 dynamically created blocks for every downstream block of that dynamic block.

If the dynamic block had 1 downstream block, then 3 blocks will be dynamically created.

For example, if the downstream block was a transformer with UUID anonymize_user_data and the following code:

from typing import Dict, List


@transformer
def transform(data: Dict, *args, **kwargs) -> List[Dict]:
    data['id'] = int(data['id']) * 100
    return [data]

There will be 3 dynamically created blocks with the following UUID and return output data:

Dynamic block UUIDReturn output data
anonymize_user_data:for_user_1[{ "id": 100, "name": "user_1" }]
anonymize_user_data:for_user_2[{ "id": 200, "name": "user_2" }]
anonymize_user_data:for_user_3[{ "id": 300, "name": "user_3" }]

Downstream blocks of dynamically created blocks

If a dynamically created block has downstream blocks, those downstream blocks will be created multiple times. The number of times those are created correspond to how many blocks were created dynamically from the dynamic block.

In the current example, 3 blocks were created dynamically (3 users, 1 downstream block from the dynamic block). If the above transformer block has 2 downstream blocks, then 6 more blocks will be created.

For example, if the transformer block anonymize_user_data has 2 downstream blocks with UUID clean_column_names and compute_engagement_score, the following blocks will also be dynamically created:

  1. anonymize_user_data:for_user_1
→ clean_column_names:for_user_1
→ compute_engagement_score:for_user_1
  1. anonymize_user_data:for_user_2
→ clean_column_names:for_user_2
→ compute_engagement_score:for_user_2
  1. anonymize_user_data:for_user_3
→ clean_column_names:for_user_3
→ compute_engagement_score:for_user_3

The data that is passed into each of these downstream blocks (e.g. clean_column_names, compute_engagement_score) will come from its upstream block (e.g. anonymize_user_data:for_user_1, anonymize_user_data:for_user_2, anonymize_user_data:for_user_3).

For example, the block clean_column_names:for_user_1 and compute_engagement_score:for_user_1 will have input data with the following shape and value:

{
    "id": 100,
    "name": "user_1"
}

Reduce output

By default, dynamically created blocks will create more dynamically created blocks from their own downstream blocks. A dynamically created block will pass its return output data to its downstream blocks and so on.

However, a dynamically created block can be configured to reduce all the outputs across each dynamically created block and combine them into a single list of items.

If we configure the above transformer block anonymize_user_data to reduce its output, then there will only be 2 downstream blocks instead of 6.

The 2 downstream blocks will be clean_column_names and compute_engagement_score. The input data to each of these 2 downstream blocks will be:

[
    {
        "id": 100,
        "name": "user_1"
    },
    {
        "id": 200,
        "name": "user_2"
    },
    {
        "id": 300,
        "name": "user_3"
    }
]

Dynamic SQL blocks

SQL blocks can be dynamic or be a child of a dynamic block.

Here is an example pipeline with code:

    [data_loader]
          ↓
 [dynamic_sql_loader]
          ↓
    [transformer]

data_loader.py: dynamic block

@data_loader
def load_data(*args, **kwargs):
    return [[dict(uuid=1), dict(uuid=2)]]

dynamic_sql_loader: dynamic block, dynamic child

SELECT * FROM {{ df_1 }}
UNION ALL
SELECT 2 AS uuid

transformer: dynamic child

@transformer
def transform(data, *args, **kwargs):
    return data

When this pipeline is triggered, the following block runs will be created:

                                [data_loader]
                                 ↓         ↓
              [dynamic_sql_loader:0]     [dynamic_sql_loader:1]
                ↓                ↓         ↓                ↓
 [transformer:0:0]  [transformer:0:1]   [transformer:1:0]  [transformer:1:1]

Tutorial guide

Follow this guide to create a simple pipeline with a dynamic block, dynamically created blocks, and a block that reduces its output.