Batch read settings

On a block-by-block basis, customize how each block reads data from its upstream blocks.

Read specific chunks

Downstream blocks can control the volume of data they load into memory from an upstream block’s output.

In the example below, the downstream block loads only the chunk from indigo_mountain where power equals 5.
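Conceptually, this is a filter pushed down to the stored, chunked output rather than a full load followed by a filter. A minimal polars sketch of the effect (illustrative only: the path is a hypothetical stand-in for where indigo_mountain’s chunked Parquet output lives; in Mage, the selection is configured on the downstream block):

import polars as pl

# Hypothetical location of the upstream block's chunked output.
lazy = pl.scan_parquet('indigo_mountain/*.parquet')

# Only the chunk(s) where power == 5 are materialized into memory.
df = lazy.filter(pl.col('power') == 5).collect()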

1. Load data

import polars as pl

from mage_ai.data.tabular.mocks import create_dataframe

if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader


@data_loader
def load_data(*args, **kwargs):
    dfs = []
    for i in range(10):
        # 100,000 mock rows per chunk, tagged with the chunk index in `power`.
        df = create_dataframe(n_rows=100_000, use_pandas=False)
        df = df.with_columns(pl.lit(i).alias('power'))
        if i == 5:
            # Give chunk 5 a distinguishable col_0 (the constant 5.0).
            df = df.with_columns(pl.lit(i).cast(pl.Float64).alias('col_0'))
        dfs.append(df)
    return pl.concat(dfs)
2. Transform data

@transformer
def transform(data, *args, **kwargs):
    # The chunk filter is applied at read time, so pass data through.
    return data

Input data types

Downstream blocks can control the strategy used to load an upstream block’s output data into memory.

Batch

Read data in batches.

@transformer
def transform(data, *args, **kwargs):
    print('Batch size:', len(data[0]))
    print('Chunks:', len(data))

Output:

Batch size: 143
Chunks: 21105
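Because data arrives as a list of chunks, aggregate statistics can be computed by iterating over it. A minimal sketch (assuming each chunk supports len(), which the example above already relies on):

@transformer
def transform(data, *args, **kwargs):
    # Sum the sizes of all chunks to get the total item count.
    total_items = sum(len(chunk) for chunk in data)
    print('Total items:', total_items)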

Generator

A batch generator framework for operating on and processing 1,000+ gigabytes (GB) of data without running out of memory.

@transformer
def transform(data, *args, **kwargs):
    # Deserialize one batch at a time; the full dataset never
    # needs to fit in memory at once.
    for batch in data:
        df = batch.deserialize()
        print(df.shape)

Output:

(1337, 11)
(1337, 11)
(1337, 11)
...
1. Load data

import polars as pl

from mage_ai.data.tabular.mocks import create_dataframe

if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader


@data_loader
def load_data(*args, **kwargs):
    dfs = []
    for i in range(10):
        # 100,000 mock rows per chunk, tagged with the chunk index in `power`.
        df = create_dataframe(n_rows=100_000, use_pandas=False)
        df = df.with_columns(pl.lit(i).alias('power'))
        if i == 5:
            # Give chunk 5 a distinguishable col_0 (the constant 5.0).
            df = df.with_columns(pl.lit(i).cast(pl.Float64).alias('col_0'))
        dfs.append(df)
    return pl.concat(dfs)
2. Transform data

@transformer
def transform(data, *args, **kwargs):
    # Deserialize one batch at a time to keep memory usage bounded.
    for batch in data:
        df = batch.deserialize()
        print(df.shape)

Reader

Invoke methods on the reader object directly.

# Additional helper functions available from the tabular reader module.
from mage_ai.data.tabular.reader import (
    read_metadata,
    sample_batch_datasets,
    scan_batch_datasets_generator,
)


@transformer
def transform(data, *args, **kwargs):
    # Inspect the reader's metadata before loading anything into memory.
    print(data.chunks)
    print(data.number_of_outputs)
    print(data.resource_usages)
    print(data.variable_path)
    print(data.variable_type)
    print(data.data_source)

    # Read all batches synchronously, then deserialize each one.
    output = data.read_sync()
    for batch in output:
        df = batch.deserialize()
        print(df.shape)
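Note that read_sync() returns the same kind of batch objects as the Generator input type above, so each batch is deserialized before use, just as in the Generator example.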

Batch write settings

Customize how a block writes its output data.

import polars as pl

from mage_ai.data.tabular.mocks import create_dataframe

if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader


@data_loader
def load_data(*args, **kwargs):
    dfs = []
    for i in range(10):
        # 100,000 mock rows per chunk, tagged with the chunk index in `power`.
        df = create_dataframe(n_rows=100_000, use_pandas=False)
        df = df.with_columns(pl.lit(i).alias('power'))
        if i == 5:
            # Give chunk 5 a distinguishable col_0 (the constant 5.0).
            df = df.with_columns(pl.lit(i).cast(pl.Float64).alias('col_0'))
        dfs.append(df)
    return pl.concat(dfs)

Strategies

Explicitly set the chunking strategy when creating data partitions or batches of data.

Example: create chunks by the value in column power
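A minimal polars sketch of this strategy (illustrative only; the mock DataFrame below stands in for a block’s output, and this is not Mage’s write API itself):

import polars as pl

df = pl.DataFrame({
    'power': [i % 10 for i in range(100)],
    'col_0': list(range(100)),
})

# One chunk (partition) per distinct value of `power`.
chunks = df.partition_by('power')
print(len(chunks))  # 10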

Items per chunk

Example: each chunk can contain at most 7,777 items.
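Sketched in polars (illustrative only, not the Mage setting itself):

import polars as pl

df = pl.DataFrame({'col_0': range(100_000)})
chunk_size = 7_777

# Consecutive slices of at most `chunk_size` rows each.
chunks = [
    df.slice(offset, chunk_size)
    for offset in range(0, df.height, chunk_size)
]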

Number of chunks

Example: each chunk must have at least 1,337 items and the total number of chunks cannot exceed 143.
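One way to satisfy both constraints at once, sketched in polars (illustrative only):

import math

import polars as pl

df = pl.DataFrame({'col_0': range(1_000_000)})

min_items, max_chunks = 1_337, 143

# Choose a chunk size large enough to respect the chunk-count ceiling,
# but never smaller than the per-chunk minimum.
chunk_size = max(min_items, math.ceil(df.height / max_chunks))
chunks = [
    df.slice(offset, chunk_size)
    for offset in range(0, df.height, chunk_size)
]
print(len(chunks))  # 143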

Byte size per chunk

Example: each chunk cannot exceed 100MB in size on disk.
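A rough polars sketch (illustrative only; estimated_size() measures in-memory bytes, so the actual on-disk size after Parquet compression will usually be smaller):

import polars as pl

df = pl.DataFrame({'col_0': range(1_000_000)})

max_bytes = 100 * 1024 ** 2  # 100 MB

# Estimate rows per chunk from the average in-memory row size.
bytes_per_row = df.estimated_size() / df.height
rows_per_chunk = max(1, int(max_bytes // bytes_per_row))
chunks = [
    df.slice(offset, rows_per_chunk)
    for offset in range(0, df.height, rows_per_chunk)
]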

Modes

Control how data is written.

  1. Append: add the new output to the data that is already stored.
  2. Replace: overwrite the previously stored output with the new data.
