Callback blocks
A callback block is associated to another block. When the parent block succeeds or fails, the callback block functions are executed.
0.8.61
or greater.Callback blocks are part of your pipeline but don’t run as individual steps in your pipeline like data loader blocks, transformer blocks, etc. However, callback blocks are associated to other blocks within the pipeline (e.g. data loaders, transformers, data exporters, etc).
After those other blocks are executed, any associated callback blocks will also be executed depending on the status of the parent block.
Example
-
Your pipeline has the following blocks:
load_data_from_api
(data loader)clean_column_names
(transformer)save_data
(data exporter)
-
Then, you add the following callback block to your pipeline:
@callback('success') def hello_world(parent_block_data, **kwargs): # some code... @callback('failure') def alert_me(parent_block_data, **kwargs): # some code...
-
You associate the above callback block to the following 2 blocks:
load_data_from_api
(data loader)clean_column_names
(transformer)
-
When you run the pipeline, it’ll execute the
load_data_from_api
(data loader) block first. It successfully completes. Then, the callback block will run and execute the functionhello_world
. -
Next, the pipeline will execute the
clean_column_names
(transformer) block. It fails due to some error. The callback block will run and execute the functionalert_me
.
How to add callbacks to your pipeline
- Create a new pipeline or open an existing pipeline.
- Edit the pipeline.
- On the right side of the page, click the Add-ons icon in the navigation. If you don’t see it, try expanding the right area of the page.
- Click the button Callbacks.
- Click the button + Callback block.
- In the callback block’s code, add a function and decorate it with
@callback
. You can have as many functions in a callback block as you want. - Use
@callback
or@callback('success')
to decorate a function that should only run when the parent block successfully completes. - Use
@callback('failure')
to decorate a function that should only run when the parent block fails.
Supported callbacks
success
Execute callback function when block runs successfully.
@callback('success')
def only_run_this_function_on_success(parent_block_data, **kwargs):
pass
failure
Execute callback function when block fails.
@callback('failure')
def only_run_this_function_on_failure(parent_block_data, **kwargs):
pass
Positional arguments
The 1st positional argument is the data output of the callback block’s parent block.
Positional arguments are only supported in Mage versions >= 0.8.81
.
Keyword arguments
Here are the keyword arguments that are available in each callback function:
Name | Description | Sample value |
---|---|---|
block_uuid | Callback block UUID. | 'fireball_callback' |
parent_block_uuid | The block UUID of the parent block. | 'parent_block' |
ds | Date string when the parent block started executing. | '2023-12-25' |
event | A dictionary containing metadata from an event triggered pipeline. | {} |
execution_date | Python datetime object for when the parent block started executing. | datetime.datetime(2023, 4, 26, 20, 28, 17, 335254, tzinfo=datetime.timezone.utc) |
execution_partition | Partition used for the parent block when it was executed. | '207/20230426T202817' |
hr | Hour string when the parent block started executing. | '20' |
pipeline_run | Python pipeline run object associated to the current run. | PipelineRun(id=2357, pipeline_uuid=fire_etl, execution_date=2023-04-26 20:28:17.335254+00:00) |
pipeline_uuid | UUID of the current pipeline. | 'fire_etl' |
__input | For version >= 0.8.81 . See below for more details. | { 'some_block_uuid': {} } |
__input
keyword argument
>= 0.8.81
.If your callback function is defined to only accept keyword arguments and no positional arguments,
you can access the callback block’s parent block’s data output by accessing the key
labeled __input
in the callback function’s keyword arguments.
The value of the __input
key is a dictionary. The keys in that dictionary are named after the
callback block’s parent block’s UUID.
For example, if the callback block’s parent block’s UUID is load_api_data
, then this is how
you would access load_api_data
’s data output:
@callback('success')
def callback_for_load_api_data(**kwargs):
data_from_parent_block = kwargs['__input']['load_api_data']
Data integration pipelines only
The following keyword arguments are only available in callback functions with parent blocks in a data integration pipelines:
Name | Description | Sample value |
---|---|---|
destination_table | The table name that the source data is being exported to. | 'dim_users_v1' |
index | The order in which the parent block executed compared to other blocks of the same type. | 0 |
is_last_block_run | Boolean value for whether or not the parent block was the last block ran for the block type in the pipeline. | True or False |
stream | The name of the source stream (or table) that the parent block is syncing for. | 'stripe_transactions' |
Legacy
< 0.8.61
.Adding a callback to your block
- Create a block as normal.
- In the top right of the block, click the triple dot icon which should open up a menu with more options. Click “Add callback”.
- You should now see another code editor under the main editor for your block.
- You can add an
on_success
callback and/or aon_failure
callback. These callbacks will run after your block run completes or fails. When you run your block in the pipeline edit page, it will also run the callback depending on the status of the block. - The callback function will be passed your pipeline’s runtime variables as keyword
arguments, so you can use the same
kwargs.get('<variable>')
syntax in your callback.
Was this page helpful?