Example

from pandas import DataFrame


@transformer
def transform_df(df: DataFrame, *args, **kwargs) -> DataFrame:
    return df.iloc[:10]

How data is received from upstream blocks

All blocks (except Scratchpads and Sensors) pass their data from the return statement in their decorated function to all their downstream blocks.

To access the data from an upstream block within the current block, use the positional arguments of the decorated function.

For example, in the above function named transform_df, the output data from its upstream block is accessed by the positional argument named df.
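
For reference, below is a minimal sketch of what an upstream block could look like. It assumes a data loader block using the @data_loader decorator; the function name and sample data are hypothetical. Whatever that block returns arrives as the df argument of transform_df above.

from pandas import DataFrame
import pandas as pd


@data_loader
def load_data(*args, **kwargs) -> DataFrame:
    # The returned DataFrame is passed to every downstream block,
    # e.g. as the df positional argument of transform_df above.
    return pd.DataFrame({'id': [1, 2, 3], 'clicks': [10, 20, 30]})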

Getting data from multiple upstream blocks

If a block has more than 1 upstream block, then each upstream block’s output is accessible via positional arguments.

For example, if you have 3 upstream blocks, then there will be 3 positional arguments available within the transform_df function.

See below for an example:

from pandas import DataFrame
import pandas as pd


@transformer
def transform_df(df1, df2, df3, **kwargs) -> DataFrame:
    return pd.concat([df1, df2, df3])

Handling column renames in data integration pipeline

Since the data passed to transformer blocks is a pandas DataFrame, many operations are available on each row of your data.

This flexibility can be risky, for example when a target destination needs its columns renamed to match existing schemas.

In this case, there can be edge case mismatches between the data types in the source columns and the expected data types in the destination. This is particularly important for SQL destinations, where the INSERT commands generated by the pipeline use CAST expressions to try to ensure the writes succeed. However, if a column is renamed in the transformer, the pipeline falls back to guessing what to CAST the column values to by looking at the values themselves.

For example, a TIMESTAMPTZ value of 2023-01-09T20:01:00 can look like a plain string once its column is renamed, so the generated INSERT uses CAST('2023-01-09T20:01:00' AS TEXT), which will fail for a column with data type TIMESTAMPTZ.
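
As a rough illustration of why this happens, the sketch below (standalone pandas, not pipeline code) shows that after a rename the column is just an object-dtype column of strings, with nothing in the data itself to indicate that the destination column is TIMESTAMPTZ.

import pandas as pd

df = pd.DataFrame({'event_timestamp': ['2023-01-09T20:01:00']})
# After the rename, the values look like plain strings to any type inference.
df.rename(columns={'event_timestamp': 'field_5630'}, inplace=True)
print(df['field_5630'].dtype)  # object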

In order to fix this, you can add column “aliases” in the pipeline’s data integration catalog.

  1. Have a transformer block in your integration pipeline that does some column renaming:

from pandas import DataFrame
import pandas as pd

@transformer
def transform(data, *args, **kwargs):
    # keys = source column names, values = destination column names
    COLUMN_MAP = {
        'impressions': 'field_5627', 
        'clicks': 'field_5628', 
        'new_free_signups': 'field_5629',
        'new_paid_signups': 'field_5626', 
        'event_timestamp': 'field_5630'
    }
    # rename columns in place so they match the destination schema
    data.rename(columns=COLUMN_MAP, inplace=True)
    # add a 'trashed' column at position 0 with a default value of False
    data.insert(0, 'trashed', False)
    return data

  2. Add an ‘aliases’ property to each renamed field in your pipeline’s data_integration_catalog.json file.

{
    "catalog": {
        "streams": [
            {
                "tap_stream_id": "marketing_data",
                "replication_method": "INCREMENTAL",
                "key_properties": [
                    "id"
                ],
                "schema": {
                    "properties": {
                        "id": {
                            "type": [
                                "integer"
                            ]
                        },
                        "impressions": {
                            "type": [
                                "null",
                                "number"
                            ],
                            "aliases": [
                                "field_5627"
                            ]
                        },
                        "clicks": {
                            "type": [
                                "null",
                                "number"
                            ],
                            "aliases": [
                                "field_5628"
                            ]
                        },
                        "new_free_signups": {
                            "type": [
                                "null",
                                "number"
                            ],
                            "aliases": [
                                "field_5629"
                            ]
                        },
                        "new_paid_signups": {
                            "type": [
                                "null",
                                "number"
                            ],
                            "aliases": [
                                "field_5626"
                            ]
                        },
                        "event_timestamp": {
                            "format": "date-time",
                            "type": [
                                "null",
                                "string"
                            ],
                            "aliases": [
                                "field_5630"
                            ]
                        }
                    },
                    "type": "object"
                },
                "stream": "marketing_data",
                "metadata": [...],
                ...
            }
        ]
    }
}

Now, pipeline runs will use the correct data types for INSERT commands, thanks to the column aliases defined in the catalog schema.
