Callback blocks are part of your pipeline but don’t run as individual steps in your pipeline like data loader blocks, transformer blocks, etc. However, callback blocks are associated to other blocks within the pipeline (e.g. data loaders, transformers, data exporters, etc).
After those other blocks are executed, any associated callback blocks will also be executed depending on the status of the parent block.
Your pipeline has the following blocks:
Then, you add the following callback block to your pipeline:
@callback('success') def hello_world(parent_block_data, **kwargs): # some code... @callback('failure') def alert_me(parent_block_data, **kwargs): # some code...
You associate the above callback block to the following 2 blocks:
When you run the pipeline, it’ll execute the
load_data_from_api(data loader) block first. It successfully completes. Then, the callback block will run and execute the function
Next, the pipeline will execute the
clean_column_names(transformer) block. It fails due to some error. The callback block will run and execute the function
How to add callbacks to your pipeline
- Create a new pipeline or open an existing pipeline.
- Edit the pipeline.
- On the right side of the page, click the Callbacks icon in the navigation. If you don’t see it, try expanding the right area of the page.
- Click the button + Callback block.
- In the callback block’s code, add a function and decorate it with
@callback. You can have as many functions in a callback block as you want.
@callback('success')to decorate a function that should only run when the parent block successfully completes.
@callback('failure')to decorate a function that should only run when the parent block fails.
Execute callback function when block runs successfully.
@callback('success') def only_run_this_function_on_success(parent_block_data, **kwargs): pass
Execute callback function when block fails.
@callback('failure') def only_run_this_function_on_failure(parent_block_data, **kwargs): pass
The 1st positional argument is the data output of the callback block’s parent block.
Positional arguments are only supported in Mage versions
Here are the keyword arguments that are available in each callback function:
|Callback block UUID.|
|Date string when the parent block started executing.|
|A dictionary containing metadata from an event triggered pipeline.|
|Python datetime object for when the parent block started executing.|
|Partition used for the parent block when it was executed.|
|Hour string when the parent block started executing.|
|Python pipeline run object associated to the current run.|
|UUID of the current pipeline.|
|For version |
__input keyword argument
If your callback function is defined to only accept keyword arguments and no positional arguments,
you can access the callback block’s parent block’s data output by accessing the key
__input in the callback function’s keyword arguments.
The value of the
__input key is a dictionary. The keys in that dictionary are named after the
callback block’s parent block’s UUID.
For example, if the callback block’s parent block’s UUID is
load_api_data, then this is how
you would access
load_api_data’s data output:
@callback('success') def callback_for_load_api_data(**kwargs): data_from_parent_block = kwargs['__input']['load_api_data']
Data integration pipelines only
The following keyword arguments are only available in callback functions with parent blocks in a data integration pipelines:
|The table name that the source data is being exported to.|
|The order in which the parent block executed compared to other blocks of the same type.|
|Boolean value for whether or not the parent block was the last block ran for the block type in the pipeline.|
|The name of the source stream (or table) that the parent block is syncing for.|
Adding a callback to your block
- Create a block as normal.
- In the top right of the block, click the triple dot icon which should open up a menu with more options. Click “Add callback”.
- You should now see another code editor under the main editor for your block.
- You can add an
on_successcallback and/or a
on_failurecallback. These callbacks will run after your block run completes or fails. When you run your block in the pipeline edit page, it will also run the callback depending on the status of the block.
- The callback function will be passed your pipeline’s runtime variables as keyword
arguments, so you can use the same
kwargs.get('<variable>')syntax in your callback.