Batch read settings
Customize, on a block-by-block basis, how a block reads data from its upstream blocks.
Only in Mage Pro.
Try our fully managed solution to access this advanced feature.
Read specific chunks
Downstream blocks can control the volume of data they load into memory from an upstream block's output. In the example below, the downstream block only loads the chunk from `indigo_mountain` where `power` equals 5.
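As a rough illustration of how this chunk selection might be expressed (the key names and the `power=5` chunk identifier below are assumptions for illustration, not the exact Mage Pro configuration schema):

```python
# Hypothetical read settings for the downstream block; key names are
# illustrative assumptions, not the exact Mage Pro configuration schema.
read_settings = {
    'indigo_mountain': {        # upstream block whose output is being read
        'chunks': ['power=5'],  # only load the chunk where power == 5
    },
}
```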
Load data
```python
import polars as pl

from mage_ai.data.tabular.mocks import create_dataframe


@data_loader
def load_data(*args, **kwargs):
    dfs = []
    # Build 10 chunks of 100,000 rows each; the `power` column identifies
    # which chunk a row belongs to.
    for i in range(10):
        df = create_dataframe(n_rows=100_000, use_pandas=False)
        df = df.with_columns(pl.lit(i).alias('power'))
        if i == 5:
            # Give chunk 5 a different dtype for `col_0`.
            df = df.with_columns(pl.lit(i).cast(pl.Float64).alias('col_0'))
        dfs.append(df)
    return pl.concat(dfs)
```
Transform data
```python
@transformer
def transform(data, *args, **kwargs):
    return data
```
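A quick sanity check of the filtered input, assuming `data` arrives as a polars DataFrame:

```python
@transformer
def transform(data, *args, **kwargs):
    # With the chunk filter above, only chunk 5 should be present.
    print(data['power'].unique())  # expect a single value: 5
    return data
```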
Downstream blocks can control the strategy they use when loading an upstream block's output data into memory.
Batch
Read data in batches.
```python
@transformer
def transform(data, *args, **kwargs):
    print('Batch size:', len(data[0]))
    print('Chunks:', len(data))
```
Output:
```
Batch size: 143
Chunks: 21105
```
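If the whole output is needed as a single table, the batches can be concatenated back together (a sketch assuming each element of `data` is a polars DataFrame):

```python
import polars as pl


@transformer
def transform(data, *args, **kwargs):
    # Concatenate all batches into one DataFrame; this trades the memory
    # savings of batching for a single in-memory table.
    return pl.concat(data)
```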
Generator
A batch generator framework for operating on and processing 1,000+ gigabytes (GB) of data without running out of memory.
```python
@transformer
def transform(data, *args, **kwargs):
    for batch in data:
        df = batch.deserialize()  # materialize one batch at a time
        print(df.shape)
```
Output:
```
(1337, 11)
(1337, 11)
(1337, 11)
(1337, 11)
(1337, 11)
(1337, 11)
(1337, 11)
(1337, 11)
(1337, 11)
(1337, 11)
...
```
Load data
```python
import polars as pl

from mage_ai.data.tabular.mocks import create_dataframe


@data_loader
def load_data(*args, **kwargs):
    dfs = []
    for i in range(10):
        df = create_dataframe(n_rows=100_000, use_pandas=False)
        df = df.with_columns(pl.lit(i).alias('power'))
        if i == 5:
            df = df.with_columns(pl.lit(i).cast(pl.Float64).alias('col_0'))
        dfs.append(df)
    return pl.concat(dfs)
```
Transform data
```python
@transformer
def transform(data, *args, **kwargs):
    for batch in data:
        df = batch.deserialize()
        print(df.shape)
```
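Because only one batch is materialized at a time, aggregates can be computed incrementally. For example (a sketch using the same `deserialize` API shown above):

```python
@transformer
def transform(data, *args, **kwargs):
    total_rows = 0
    for batch in data:
        df = batch.deserialize()  # only this batch is held in memory
        total_rows += df.height   # polars DataFrame row count
    return total_rows
```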
Reader
Invoke methods on the reader object directly.
```python
from mage_ai.data.tabular.reader import (
    read_metadata,
    sample_batch_datasets,
    scan_batch_datasets_generator,
)


@transformer
def transform(data, *args, **kwargs):
    # Inspect the reader's metadata.
    print(data.chunks)
    print(data.number_of_outputs)
    print(data.resource_usages)
    print(data.variable_path)
    print(data.variable_type)
    print(data.data_source)

    # Read the upstream output synchronously, one batch at a time.
    output = data.read_sync()
    for batch in output:
        df = batch.deserialize()
        print(df.shape)
```
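To inspect a sample without materializing the full output, iteration can stop after the first batch (a sketch built on the reader calls shown above):

```python
@transformer
def transform(data, *args, **kwargs):
    output = data.read_sync()
    first_batch = next(iter(output))  # stop after a single batch
    return first_batch.deserialize()
```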
Batch write settings
Customize how a block writes its output data.
Only in Mage Pro.
Try our fully managed solution to access this advanced feature.
```python
import polars as pl

from mage_ai.data.tabular.mocks import create_dataframe


@data_loader
def load_data(*args, **kwargs):
    dfs = []
    for i in range(10):
        df = create_dataframe(n_rows=100_000, use_pandas=False)
        df = df.with_columns(pl.lit(i).alias('power'))
        if i == 5:
            df = df.with_columns(pl.lit(i).cast(pl.Float64).alias('col_0'))
        dfs.append(df)
    return pl.concat(dfs)
```
Strategies
Explicitly set the chunking strategy used when creating data partitions or batches of data (see the sketch after these examples).
Example: create chunks by the value in the `power` column.
Items per chunk
Example: each chunk can contain a maximum of 7,777 items.
Number of chunks
Example: each chunk must have at least 1,337 items, and the total number of chunks cannot exceed 143.
Byte size per chunk
Example: each chunk cannot exceed 100 MB in size on disk.
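A rough sketch of how these strategies might be expressed together; every key name below is an illustrative assumption rather than the exact Mage Pro configuration schema:

```python
# Hypothetical write settings; key names are assumptions for illustration.
write_batch_settings = {
    'partition_by': ['power'],              # chunk by the value of `power`
    'items_per_chunk': {'maximum': 7_777},  # cap items in each chunk
    'number_of_chunks': {'maximum': 143, 'minimum_items': 1_337},
    'bytes_per_chunk': {'maximum': 100 * 1024 ** 2},  # 100 MB on disk
}
```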
Modes
Control how data is written.
- Append: add the new output after any existing output data.
- Replace: overwrite any existing output data.
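For example (again with an assumed key name):

```python
write_settings = {
    'mode': 'append',  # or 'replace'; key and values are illustrative
}
```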