Great Expectations

Setup

  1. Before adding expectations to your pipeline, make sure you have at least one data loader, transformer, or data exporter block. These must be Python blocks (SQL block support is coming soon).

    If you don’t have any blocks, add a data loader block and paste the following code:

    import io
    import pandas as pd
    import requests
    
    
    @data_loader
    def load_data_from_api(*args, **kwargs):
        url = 'https://raw.githubusercontent.com/mage-ai/datasets/master/restaurant_user_transactions.csv'
        response = requests.get(url)
        return pd.read_csv(io.StringIO(response.text), sep=',')
    

    For more expectations, read Great Expectations’ documentation.

  2. Add the Great Expectations power up to your pipeline.

  3. In the list of available power ups, click the option for Great Expectations.


Adding expectations to your pipeline

  1. Once you’re on the Great Expectations power up detail page, you can add extension blocks to the current pipeline by clicking the button labeled [+ Extension block].

  2. In the dropdown menu, click the template option labeled Empty template.

  3. A popup dialog may appear asking you to name the new extension block. If it does, enter test number of rows as the name and click the button labeled [Save and add block].

  4. Paste the following code in the extension block named test number of rows:

    @extension('great_expectations')
    def validate(validator, *args, **kwargs):
        validator.expect_table_row_count_to_be_between(
            min_value=1000,
            max_value=10000,
        )
    

    You can add expectations using code or from a JSON object. See the Defining expectations section below for more details.

  5. In the extension block near the bottom, click the input field that says “Select blocks to run expectations on”.

  6. Once you click that input field, a list of blocks from your pipeline will appear.

  7. Check the checkbox on the right side of the dropdown to associate that block with this extension block.

  8. Click the button labeled Save selected blocks.

  9. After you save, a button labeled after the name of the block you just selected will appear. For example, if your block is named load_api_data_demo, then a button labeled load_api_data_demo will appear.

  10. Click that button to run your extension block for the block load_api_data_demo.

  11. The output should look something like this:

    Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]
    Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]
    Expectations from extension test_number_of_rows for block load_api_data_demo succeeded.
    {
      "results": [
        {
          "expectation_config": {
            "expectation_type": "expect_table_row_count_to_be_between",
            "kwargs": {
              "min_value": 1000,
              "max_value": 10000,
              "batch_id": "9946450cf9609e633658e9c4ee38efa5"
            },
            "meta": {}
          },
          "success": true,
          "meta": {},
          "exception_info": {
            "raised_exception": false,
            "exception_traceback": null,
            "exception_message": null
          },
          "result": {
            "observed_value": 10000
          }
        }
      ],
      "success": true,
      "evaluation_parameters": {},
      "meta": {
        "great_expectations_version": "0.15.50",
        "expectation_suite_name": "expectation_suite_for_block_load_api_data_demo",
        "run_id": {
          "run_name": null,
          "run_time": "2023-03-14T01:41:09.117251+00:00"
        },
        "batch_spec": {
          "data_asset_name": "data_asset_load_api_data_demo",
          "batch_data": "PandasDataFrame"
        },
        "batch_markers": {
          "ge_load_time": "20230314T014109.103956Z",
          "pandas_data_fingerprint": "76d98f6ff10d31abaae6e9a7c47cc7c1"
        },
        "active_batch_definition": {
          "datasource_name": "datasource_name_load_api_data_demo",
          "data_connector_name": "data_connector_name_load_api_data_demo",
          "data_asset_name": "data_asset_load_api_data_demo",
          "batch_identifiers": {
            "default_identifier_name": "default_identifier"
          }
        },
        "validation_time": "20230314T014109.117222Z",
        "checkpoint_name": null
      },
      "statistics": {
        "evaluated_expectations": 1,
        "successful_expectations": 1,
        "unsuccessful_expectations": 0,
        "success_percent": 100.0
      }
    }
    
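The payload above is a plain dictionary, so you can also inspect it programmatically in downstream code. A minimal sketch, using field names taken from the output above (the sample payload here is abbreviated, and `summarize` is a hypothetical helper, not part of any library):

```python
# Abbreviated validation payload, shaped like the output above.
validation_result = {
    "success": True,
    "results": [
        {
            "expectation_config": {
                "expectation_type": "expect_table_row_count_to_be_between",
                "kwargs": {"min_value": 1000, "max_value": 10000},
            },
            "success": True,
            "result": {"observed_value": 10000},
        }
    ],
    "statistics": {
        "evaluated_expectations": 1,
        "successful_expectations": 1,
        "unsuccessful_expectations": 0,
        "success_percent": 100.0,
    },
}


def summarize(payload: dict) -> str:
    """Return a one-line summary of a validation result payload."""
    stats = payload["statistics"]
    return (
        f"{stats['successful_expectations']}/{stats['evaluated_expectations']} "
        f"expectations passed ({stats['success_percent']:.0f}%)"
    )


print(summarize(validation_result))  # → 1/1 expectations passed (100%)
```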

Other ways to run expectations

Whenever you run a block while editing your pipeline, any associated Great Expectations extension blocks will also be run.

If any expectations fail, the block output will display an error message with the failure results.


Defining expectations

Code

@extension('great_expectations')
def validate(validator, *args, **kwargs):
    validator.expect_table_row_count_to_be_between(
        min_value=1000,
        max_value=10000,
    )

JSON object

expectations_json = [
  {
    "expectation_type": "expect_table_row_count_to_be_between",
    "kwargs": {
      "min_value": 1000,
      "max_value": 10000
    }
  },
  {
    "expectation_type": "expect_column_values_to_not_be_null",
    "kwargs": {
      "column": "user ID",
      "result_format": "BASIC"
    }
  }
]


@extension('great_expectations', expectations=expectations_json)
def validate(validator, *args, **kwargs):
    pass
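Each entry in expectations_json maps an expectation_type to a validator method of the same name, with kwargs passed through as keyword arguments. One plausible way such a list could be applied, sketched with a stand-in validator (the real validator comes from Great Expectations; RecordingValidator below is purely illustrative):

```python
class RecordingValidator:
    """Stand-in for a Great Expectations validator: records each call."""

    def __init__(self):
        self.calls = []

    def __getattr__(self, name):
        # Any attribute access behaves like an expectation method.
        def expectation(**kwargs):
            self.calls.append((name, kwargs))

        return expectation


def apply_expectations(validator, expectations):
    """Invoke validator.<expectation_type>(**kwargs) for each JSON entry."""
    for expectation in expectations:
        method = getattr(validator, expectation["expectation_type"])
        method(**expectation["kwargs"])


expectations_json = [
    {
        "expectation_type": "expect_table_row_count_to_be_between",
        "kwargs": {"min_value": 1000, "max_value": 10000},
    },
]

validator = RecordingValidator()
apply_expectations(validator, expectations_json)
```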

Code and JSON object

expectations_json = [
  {
    "expectation_type": "expect_column_values_to_not_be_null",
    "kwargs": {
      "column": "user ID",
      "result_format": "BASIC"
    }
  }
]


@extension('great_expectations', expectations=expectations_json)
def validate(validator, *args, **kwargs):
    validator.expect_table_row_count_to_be_between(
        min_value=1000,
        max_value=10000,
    )

Running expectations end-to-end

When your pipeline runs, the expectations you defined in your extension blocks will be run for every associated data loader, transformer, or data exporter block in your pipeline.

Success

If all expectations for a block pass, the success message in the pipeline run logs will look something like this:

Expectations from extension test_number_of_rows for block load_api_data_demo succeeded.

Failure

If any expectation fails, the block it was associated with will also fail.

You can check the pipeline logs for the block failure. There will be an entry containing the error message with the failure results. It could look something like this:

Traceback (most recent call last):
  File "/home/src/mage_ai/data_preparation/models/block/__init__.py", line 648, in execute_sync
    output = self.execute_block(
  File "/home/src/mage_ai/data_preparation/models/block/__init__.py", line 885, in execute_block
    outputs = self._execute_block(
  File "/home/src/mage_ai/data_preparation/models/block/__init__.py", line 949, in _execute_block
    outputs = self.execute_block_function(
  File "/home/src/mage_ai/data_preparation/models/block/__init__.py", line 975, in execute_block_function
    output = block_function(*input_vars, **global_vars)
  File "/home/src/mage_ai/data_preparation/models/block/extension/block.py", line 46, in func
    raise Exception(
Exception: Expectations from extension test_number_of_rows for block load_api_data_demo failed:

{
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  },
  "meta": {},
  "success": false,
  "expectation_config": {
    "expectation_type": "expect_table_row_count_to_be_between",
    "kwargs": {
      "min_value": 1000,
      "max_value": 10000,
      "batch_id": "9946450cf9609e633658e9c4ee38efa5"
    },
    "meta": {}
  },
  "result": {
    "observed_value": 10
  }
}
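The failure payload makes the cause easy to pin down: compare result.observed_value against the bounds in expectation_config.kwargs. A small sketch using the values from the log above (the payload here is abbreviated, and explain_failure is a hypothetical helper):

```python
# Values taken from the failure payload above (abbreviated).
failed_result = {
    "success": False,
    "expectation_config": {
        "expectation_type": "expect_table_row_count_to_be_between",
        "kwargs": {"min_value": 1000, "max_value": 10000},
    },
    "result": {"observed_value": 10},
}


def explain_failure(result: dict) -> str:
    """Describe why a row-count expectation failed."""
    kwargs = result["expectation_config"]["kwargs"]
    observed = result["result"]["observed_value"]
    return (
        f"observed {observed} rows, expected between "
        f"{kwargs['min_value']} and {kwargs['max_value']}"
    )


print(explain_failure(failed_result))  # → observed 10 rows, expected between 1000 and 10000
```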