This guide provides comprehensive best practices for building, maintaining, and scaling data pipelines in Mage Pro. Following these practices will help you create reliable, performant, and maintainable data engineering solutions.

Block Design & Architecture

Single Responsibility Principle

Each block should have a single, well-defined purpose. This makes blocks easier to understand, test, and maintain. ✅ Good:
# Block: load_customer_data
def load_customer_data():
    """Load customer data from source database."""
    return pd.read_sql("SELECT * FROM customers", connection)

# Block: transform_customer_data
def transform_customer_data(df):
    """Clean and standardize customer data."""
    df = df.dropna(subset=['email'])
    df['email'] = df['email'].str.lower()
    return df
❌ Avoid:
# Block: load_and_transform_customers
def load_and_transform_customers():
    """Loads, transforms, and exports customer data."""
    # Loading, transforming, and exporting in one block
    df = pd.read_sql("SELECT * FROM customers", connection)
    df = df.dropna(subset=['email'])
    df.to_parquet('output.parquet')

Modularity & Reusability

Design blocks to be reusable across pipelines and projects. Use clear, descriptive names and avoid hardcoded values. ✅ Good:
# Block: load_data_from_source
def load_data_from_source(source_table: str, connection_string: str):
    """Generic data loader that can be reused for different tables."""
    return pd.read_sql(f"SELECT * FROM {source_table}", connection_string)
❌ Avoid:
# Block: load_customers_jan_2024
def load_customers_jan_2024():
    """Hardcoded for specific use case."""
    return pd.read_sql("SELECT * FROM customers WHERE month='2024-01'", conn)

Block Naming Conventions

Use clear, descriptive names that indicate the block’s purpose:
  • Data loaders: load_<source>_<entity> (e.g., load_salesforce_contacts)
  • Transformers: transform_<action>_<entity> (e.g., transform_clean_customer_data)
  • Data exporters: export_<entity>_to_<destination> (e.g., export_customers_to_warehouse)

Organizing Blocks in Subdirectories

Organize blocks in subdirectories within their respective block type folders to improve project structure and maintainability. ✅ Benefits:
  • Better organization for large projects
  • Easier to find related blocks
  • Clearer project structure
  • Supports team collaboration
How to Organize:
  1. Create subdirectories by category:
    • By source type: api/, database/, file/, cloud/
    • By domain: sales/, marketing/, finance/
    • By function: validation/, cleaning/, aggregation/
  2. Using the UI:
    • When creating a block, name it with the subdirectory: subfolder_name/block_name
    • Example: api/load_stripe creates data_loaders/api/load_stripe.py
    • The subdirectory is automatically created if it doesn’t exist
  3. Example structure:
    data_loaders/
    ├── api/
    │   ├── load_stripe.py
    │   ├── load_salesforce.py
    │   └── load_hubspot.py
    ├── database/
    │   ├── load_postgres.py
    │   └── load_mysql.py
    └── file/
        ├── load_csv.py
        └── load_parquet.py
    
    transformers/
    ├── cleaning/
    │   ├── remove_duplicates.py
    │   └── standardize_format.py
    └── aggregation/
        ├── calculate_metrics.py
        └── group_by_category.py
    
  4. Best practices:
    • Use consistent subdirectory naming across block types
    • Keep subdirectory depth to 1-2 levels maximum
    • Group related blocks together logically
    • Document your organization structure for your team
For more details, see the Project Structure Guide.

Block Size Guidelines

Keep blocks focused and reasonably sized:
  • Ideal: 50-200 lines of code per block
  • Maximum: 500 lines (consider splitting if larger)
  • Minimum: Enough code to accomplish one clear task

Use Block Templates

Create reusable block templates for common patterns:
  • Standard ETL patterns
  • Data validation patterns
  • Error handling templates
  • Logging templates
This ensures consistency across your team and reduces duplicate code.
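To make templates concrete, the sketch below follows the scaffold Mage generates for a new data loader block (decorator imports from mage_ai.data_preparation.decorators); treat it as a starting point and adapt the logging, validation, and test sections to your own standards.
# Template: standard data loader with logging and a built-in output test
import logging

import pandas as pd

if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test

logger = logging.getLogger(__name__)

@data_loader
def load_data(*args, **kwargs):
    """Describe the source and entity this block loads."""
    logger.info('Loading data...')
    df = pd.DataFrame()  # Replace with the actual load logic
    logger.info(f'Loaded {len(df)} rows')
    return df

@test
def test_output(output, *args) -> None:
    """Baseline checks every block built from this template should keep."""
    assert output is not None, 'The output is undefined'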

Pipeline Organization

Choosing the Right Pipeline Type

Mage supports three main pipeline types, each optimized for different use cases. Understanding when to use each type is crucial for building efficient and maintainable data pipelines.

Data Integration Pipelines

Use data integration pipelines when:
  • You need to sync data between systems (external to internal, or internal to external)
  • You’re working with pre-built connectors for SaaS tools, databases, or data warehouses
  • You want low-code/no-code data synchronization
  • You need incremental sync capabilities with automatic state tracking
  • You’re doing reverse ETL (syncing data from your warehouse to external systems)
Characteristics:
  • Uses Singer spec-compliant connectors
  • Handles schema migrations automatically
  • Supports incremental and full sync modes
  • Minimal code required - mostly configuration
  • Can be combined with batch pipelines for custom transformations
Example use cases:
  • Syncing Salesforce contacts to your data warehouse
  • Loading Stripe transaction data into BigQuery
  • Exporting customer segments from Snowflake to HubSpot
  • Syncing product data from PostgreSQL to MongoDB
Learn more: Data Integration Overview

Batch Pipelines

Use batch pipelines when:
  • You need scheduled data processing (hourly, daily, weekly)
  • You require custom transformations using Python, SQL, or R
  • You’re building ETL/ELT workflows with complex business logic
  • You need to process large volumes of historical data
  • You want to combine multiple data sources and apply custom logic
  • You’re building analytics pipelines for reporting and dashboards
Characteristics:
  • Runs on a schedule, completes, and exits
  • Full flexibility with custom code blocks
  • Can combine data integration blocks with custom transformers
  • Supports backfilling historical data
  • Ideal for data warehousing and analytics workloads
Example use cases:
  • Daily revenue reporting with complex aggregations
  • Monthly financial reconciliation
  • Customer segmentation analysis
  • Data quality checks and validation
  • Feature engineering for machine learning
Learn more: Building Batch Pipelines

Streaming Pipelines

Use streaming pipelines when:
  • You need real-time data processing (sub-second to second latency)
  • You’re processing continuous data streams (Kafka, Kinesis, etc.)
  • You require immediate insights or alerts
  • You’re building real-time analytics or monitoring systems
  • You need event-driven architectures
  • Low latency is critical (milliseconds to seconds)
Characteristics:
  • Long-running processes that continuously monitor sources
  • Processes data as it arrives (not in batches)
  • Event-driven architecture
  • Lower latency than batch processing
  • Supports stateful processing with window aggregations
Example use cases:
  • Real-time fraud detection
  • Live dashboard updates
  • IoT sensor data processing
  • Real-time recommendation systems
  • Event-driven data transformations
  • Real-time data quality monitoring
Learn more: Streaming Pipeline Introduction

Decision Matrix

| Requirement | Data Integration | Batch | Streaming |
| --- | --- | --- | --- |
| Latency | Minutes to hours | Minutes to hours | Milliseconds to seconds |
| Code Complexity | Low (mostly config) | High (custom code) | Medium (transformers) |
| Use Case | Data sync | Scheduled processing | Real-time processing |
| Data Volume | Small to large | Large | Continuous streams |
| Custom Logic | Limited | Full flexibility | Transformers only |
| State Management | Automatic | Manual | Automatic (optional) |
Hybrid Approach: You can combine pipeline types:
  • Use data integration pipelines to sync data into your warehouse
  • Use batch pipelines to transform and aggregate the synced data
  • Use streaming pipelines for real-time processing of critical events

Pipeline Structure

Organize pipelines logically:
  1. Data Integration Pipelines: Extract and load data from sources
  2. Transformation Pipelines: Transform and clean data
  3. Analytics Pipelines: Generate reports and metrics
  4. Orchestration Pipelines: Coordinate multiple pipelines

Pipeline Naming

Use consistent naming conventions:
  • Format: <domain>_<purpose>_<frequency> (e.g., sales_daily_revenue_report)
  • Examples:
    • marketing_campaign_attribution_hourly
    • finance_monthly_reconciliation
    • product_user_behavior_realtime

Pipeline Tagging

Use tags to organize and group related pipelines for easier discovery and management. ✅ Best Practices for Tagging:
  • Use consistent tag names: Establish a tagging convention across your team
    • Domain tags: sales, marketing, finance, product
    • Type tags: etl, analytics, reporting, integration
    • Environment tags: production, staging, development
    • Priority tags: critical, high, low
  • Apply multiple tags: Pipelines can have multiple tags for flexible grouping
    # Example: A pipeline might have tags: ['sales', 'etl', 'production', 'critical']
    
  • Group by tags: Use the pipeline list view to group and filter by tags
    • Right-click a pipeline → “Add/Remove tags”
    • Select or type tags (separated by Enter)
    • Filter and group pipelines by tag on the Pipeline page
  • Tag naming conventions:
    • Use lowercase with underscores: data_quality, customer_analytics
    • Keep tags short and descriptive
    • Avoid special characters and spaces
Benefits of Tagging:
  • Quickly find related pipelines
  • Filter pipelines by domain, type, or environment
  • Organize pipelines for different teams or projects
  • Group pipelines for monitoring and alerting
For more details, see the Pipeline Tagging Guide.

Limit Pipeline Complexity

✅ Recommended:
  • Block count: 10-30 blocks per pipeline
  • Depth: Maximum 5-7 levels of dependencies
  • Branches: Limit parallel branches to 5-10
⚠️ If your pipeline exceeds these limits:
  • Split into multiple pipelines
  • Use orchestration pipelines to coordinate
  • Extract common logic into reusable blocks

Dependency Management

  • Minimize dependencies: Only create dependencies where data actually flows
  • Avoid circular dependencies: Design acyclic graphs
  • Use clear data contracts: Document expected input/output schemas
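A lightweight way to make such a contract explicit is a shared column-to-dtype mapping that both the producing and consuming blocks validate against; a minimal sketch (the contract dictionary and column names are illustrative):
import pandas as pd

# Illustrative contract for the output of a customer transformer block
CUSTOMER_CONTRACT = {
    'customer_id': 'int64',
    'email': 'object',
    'created_at': 'datetime64[ns]',
}

def enforce_contract(df: pd.DataFrame, contract: dict) -> pd.DataFrame:
    """Fail fast when a DataFrame does not match the agreed schema."""
    missing = set(contract) - set(df.columns)
    if missing:
        raise ValueError(f"Missing contracted columns: {missing}")
    for col, dtype in contract.items():
        if str(df[col].dtype) != dtype:
            raise TypeError(f"{col}: expected {dtype}, got {df[col].dtype}")
    return df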

Performance Optimization

Efficient Data Processing

Batch Processing:
# ✅ Process data in batches
def transform_large_dataset(df):
    batch_size = 10000
    results = []
    for i in range(0, len(df), batch_size):
        batch = df.iloc[i:i+batch_size]
        transformed = process_batch(batch)
        results.append(transformed)
    return pd.concat(results)
Use Appropriate Data Types:
# ✅ Optimize data types
df['id'] = df['id'].astype('int32')  # Instead of int64
df['category'] = df['category'].astype('category')  # Memory efficient

Leverage Spark for Large Datasets

For datasets larger than memory or requiring distributed processing:
# ✅ Use Spark for large-scale transformations
def transform_with_spark(df, **kwargs):
    # The Spark session is provided via kwargs when Spark is enabled for the pipeline
    spark = kwargs.get('spark')
    spark_df = spark.createDataFrame(df)
    result = spark_df.groupBy('category').agg({'value': 'sum'})
    return result.toPandas()

Optimize API Calls

Batch API Requests:
# ✅ Batch API calls
def load_data_from_api(ids):
    # Process in batches instead of one-by-one
    batch_size = 100
    results = []
    for i in range(0, len(ids), batch_size):
        batch = ids[i:i+batch_size]
        response = api.get_batch(batch)
        results.extend(response)
    return results
Use Connection Pooling:
# ✅ Reuse connections
from sqlalchemy import create_engine

# Create connection pool at module level
engine = create_engine('postgresql://...', pool_size=10)

def load_data():
    return pd.read_sql("SELECT * FROM table", engine)

Memory Management

Stream Large Files:
# ✅ Stream processing for large files
def process_large_file(file_path):
    chunk_size = 10000
    for chunk in pd.read_csv(file_path, chunksize=chunk_size):
        process_chunk(chunk)
Clean Up Resources:
# ✅ Explicitly close connections when done
def load_data():
    conn = create_connection()
    try:
        return pd.read_sql("SELECT * FROM table", conn)
    finally:
        conn.close()  # Always release the connection, even if the query fails

Pipeline Performance

Choose the Right Executor:
  • local_python executor: Use for faster execution when you have sufficient local resources. Blocks run in the same process or separate processes on the same machine, which reduces overhead and improves performance for smaller to medium-sized pipelines.
  • k8s executor: Use for scalability when you need more resources or want to distribute block execution across multiple pods. Ideal for large-scale pipelines, resource-intensive transformations, or when you need to scale horizontally.
Optimize Execution Mode:
  • run_pipeline_in_one_process: true: Speeds up pipeline execution by running all blocks in a single process. This reduces process creation overhead and improves performance for pipelines with many small blocks or when blocks need to share memory efficiently.
  • run_pipeline_in_one_process: false: Runs blocks in separate processes (for local_python executor) or separate pods (for k8s executor). Use this when:
    • Blocks need isolation (e.g., different resource requirements)
    • You want to leverage parallel execution across multiple processes/pods
    • Memory isolation is important to prevent one block from affecting others
Optimize Block Execution:
  • Use dynamic blocks for parallel processing
  • Leverage conditional execution for optional steps
  • Cache intermediate results when appropriate
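As an example of the last point, an intermediate result can be cached to a file and reused on reruns; a minimal sketch, assuming a local cache path and a placeholder expensive_transform step:
import os

import pandas as pd

CACHE_PATH = '/tmp/intermediate_result.parquet'  # Illustrative cache location

def transform_with_cache(df: pd.DataFrame) -> pd.DataFrame:
    """Reuse a cached intermediate result when it already exists."""
    if os.path.exists(CACHE_PATH):
        return pd.read_parquet(CACHE_PATH)
    result = expensive_transform(df)  # Placeholder for the costly step
    result.to_parquet(CACHE_PATH, index=False)
    return result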
Control Concurrency: Configure concurrency limits to optimize resource usage and prevent system overload. You can set limits at both the global (project) and pipeline levels:
  • Global concurrency: Limit the maximum number of concurrent block runs across all pipelines
  • Pipeline-level concurrency: Control concurrent block runs and pipeline runs per trigger
  • Per-block concurrency: Set specific concurrency limits for individual blocks
For more information, see the Concurrency Control Guide.

Testing & Validation

Write Tests in Blocks

Mage’s built-in testing framework makes it easy to write tests alongside your code:
# ✅ Include tests in your blocks
def transform_data(df):
    # Transform logic
    df['normalized_value'] = df['value'] / df['max_value']
    return df

def test_output(output):
    """Test that transformation produces valid results."""
    assert output is not None, "Output should not be None"
    assert 'normalized_value' in output.columns, "Missing normalized_value column"
    assert (output['normalized_value'] <= 1.0).all(), "Values should be normalized"
    assert output['normalized_value'].notna().all(), "No null values allowed"

Data Quality Checks

Schema Validation:
def validate_schema(df, expected_schema):
    """Validate DataFrame schema matches expected structure."""
    assert set(df.columns) == set(expected_schema.keys()), "Column mismatch"
    for col, dtype in expected_schema.items():
        assert df[col].dtype == dtype, f"Type mismatch for {col}"
Data Quality Rules:
def validate_data_quality(df):
    """Comprehensive data quality checks."""
    # Check for nulls in critical columns
    assert df['customer_id'].notna().all(), "No null customer IDs allowed"
    
    # Check value ranges
    assert (df['age'] >= 0).all() and (df['age'] <= 150).all(), "Invalid age range"
    
    # Check for duplicates
    assert df['email'].is_unique, "Duplicate emails found"
    
    # Check allowed values
    assert df['status'].isin(['active', 'inactive', 'pending']).all(), "Invalid status values"

Test-Driven Development

Write tests before or alongside implementation:
  1. Define expected behavior in test functions
  2. Implement the logic to pass tests
  3. Run tests automatically on each block execution
  4. Refactor while keeping tests green
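In practice, that means writing the test block first and then implementing the transformation until it passes; for example (column names are illustrative):
# Step 1: define the expected behavior
def test_output(output):
    assert 'revenue_per_user' in output.columns, "Missing revenue_per_user column"
    assert (output['revenue_per_user'] >= 0).all(), "Values must be non-negative"

# Step 2: implement the logic that satisfies the test
def transform_data(df):
    df['revenue_per_user'] = df['revenue'] / df['active_users'].clip(lower=1)
    return df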

Error Handling & Reliability

Graceful Error Handling

Handle Expected Errors:
# ✅ Handle errors gracefully
def load_data_from_api():
    try:
        response = api.get_data()
        return response
    except APIError as e:
        logger.error(f"API error: {e}")
        # Return empty DataFrame or cached data
        return pd.DataFrame()
    except Exception as e:
        logger.error(f"Unexpected error: {e}")
        raise
Validate Inputs:
# ✅ Validate inputs early
def transform_data(df):
    if df is None or df.empty:
        raise ValueError("Input DataFrame is empty")
    
    required_columns = ['id', 'name', 'value']
    missing = set(required_columns) - set(df.columns)
    if missing:
        raise ValueError(f"Missing required columns: {missing}")
    
    # Continue with transformation
    return df

Retry Logic

Implement Retry for Transient Failures: You can implement retry logic in your code or use Mage’s built-in automatic retry feature for block runs.
Code-level retry:
from tenacity import retry, stop_after_attempt, wait_exponential

@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
def load_data_with_retry():
    """Retry on transient failures."""
    return api.get_data()
Automatic retry for block runs: Mage provides automatic retry functionality for failed block runs. This is especially useful for handling transient network issues, temporary API rate limits, or other recoverable errors. Configure automatic retry settings so failed blocks are retried without manual intervention. For more information, see the Automatic Retry Guide.

Idempotency

Design blocks to be idempotent (safe to run multiple times):
# ✅ Idempotent block
def export_data(df):
    """Export data, safe to run multiple times."""
    # Replace the target table so reruns don't duplicate rows
    df.to_sql('table', engine, if_exists='replace', index=False)
    # Or use merge/upsert logic keyed on a unique id
    # merge_data(df, 'table', on='id')

Checkpointing

For long-running pipelines, implement checkpointing:
# ✅ Save intermediate state
def process_large_dataset(df):
    checkpoint_file = 'checkpoint.pkl'

    # Load checkpoint if it exists (resume from the list of processed row records)
    if os.path.exists(checkpoint_file):
        processed = pd.read_pickle(checkpoint_file).to_dict('records')
        start_idx = len(processed)
    else:
        processed = []
        start_idx = 0

    # Process remaining data
    for i in range(start_idx, len(df)):
        result = process_row(df.iloc[i])
        processed.append(result)

        # Save checkpoint every 1000 rows
        if i % 1000 == 0:
            pd.DataFrame(processed).to_pickle(checkpoint_file)

    return pd.DataFrame(processed)

Resource Management

Compute Resources

Right-Size Your Resources:
  • Use appropriate compute clusters for workload size
  • Monitor CPU and memory usage in Cluster Manager
  • Scale up for large transformations, scale down for simple tasks
Use Spark for Large Datasets:
  • Enable Spark for datasets > 10GB
  • Configure Spark resources appropriately
  • Monitor Spark job performance

Connection Management

Reuse Connections:
# ✅ Module-level connection (reused across runs)
import os
from sqlalchemy import create_engine

DATABASE_URL = os.getenv('DATABASE_URL')
engine = create_engine(DATABASE_URL, pool_size=5, max_overflow=10)

def load_data():
    return pd.read_sql("SELECT * FROM table", engine)
Close Connections Properly:
# ✅ Context manager for connections
def load_data():
    with create_connection() as conn:
        df = pd.read_sql("SELECT * FROM table", conn)
        return df
    # Connection automatically closed

Memory Optimization

Process Data in Chunks:
# ✅ Chunk processing for large datasets
def process_large_file(file_path):
    chunk_size = 50000
    results = []
    
    for chunk in pd.read_csv(file_path, chunksize=chunk_size):
        processed = transform_chunk(chunk)
        results.append(processed)
        # Clear memory
        del chunk, processed
    
    return pd.concat(results)
Use Efficient Data Structures:
# ✅ Use appropriate data types
df['id'] = df['id'].astype('int32')  # 50% memory savings vs int64
df['category'] = df['category'].astype('category')  # Efficient for repeated values
df['date'] = pd.to_datetime(df['date'])  # Proper datetime type

Collaboration & Version Control

Use Workspaces Effectively

Separate Environments:
  • Development: For experimentation and testing
  • Staging: For pre-production validation
  • Production: For live data pipelines
Workspace Best Practices:
  • Use consistent naming across workspaces
  • Document workspace purposes
  • Limit production workspace access

Version Control Integration

Commit Frequently:
  • Commit after completing each block or feature
  • Use descriptive commit messages
  • Tag releases and important milestones
Branch Strategy:
  • Use feature branches for new development
  • Keep main/master branch stable
  • Use pull requests for code review
Git Best Practices:
# ✅ Good commit messages
git commit -m "feat: Add customer data transformation block"
git commit -m "fix: Resolve null handling in revenue calculation"
git commit -m "refactor: Extract common validation logic to utility"

Code Review

Review Checklist:
  • Code follows team standards
  • Tests are included and passing
  • Error handling is appropriate
  • Documentation is clear
  • Performance considerations addressed
  • Security concerns reviewed

Documentation

Document Your Blocks:
def transform_customer_data(df):
    """
    Transform customer data by standardizing formats and cleaning values.
    
    Args:
        df: DataFrame with customer data containing columns:
            - customer_id: Unique customer identifier
            - email: Customer email address
            - name: Customer full name
            - created_at: Account creation timestamp
    
    Returns:
        DataFrame with transformed customer data:
            - customer_id: Same as input
            - email: Lowercase, validated email addresses
            - first_name: Extracted from name
            - last_name: Extracted from name
            - created_at: Parsed datetime object
    
    Raises:
        ValueError: If required columns are missing
    """
    # Implementation...
Pipeline Documentation:
  • Document pipeline purpose and business logic
  • Include data flow diagrams
  • Note dependencies and requirements
  • Document expected run times and resource needs

Monitoring & Observability

Logging

Use Structured Logging:
import logging

logger = logging.getLogger(__name__)

def transform_data(df):
    logger.info(f"Starting transformation for {len(df)} rows")
    
    try:
        result = perform_transformation(df)
        logger.info(f"Transformation completed successfully. Output: {len(result)} rows")
        return result
    except Exception as e:
        logger.error(f"Transformation failed: {e}", exc_info=True)
        raise
Log Key Metrics:
def process_data(df):
    start_time = time.time()
    logger.info(f"Processing {len(df)} records")
    
    result = transform(df)
    
    duration = time.time() - start_time
    logger.info(f"Processed {len(result)} records in {duration:.2f}s")
    logger.info(f"Processing rate: {len(result)/duration:.0f} records/sec")
    
    return result

Monitoring Pipeline Health

Set Up Alerts:
  • Pipeline failure alerts
  • Performance degradation alerts
  • Data quality threshold alerts
  • Resource usage alerts
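Mage’s alerting integrations cover most of these cases out of the box; for custom thresholds checked inside a block, a simple webhook notification is one option. A generic sketch, assuming a SLACK_WEBHOOK_URL environment variable and a placeholder transform step:
import os
import time

import requests

def send_alert(message: str) -> None:
    """Post a custom alert to a webhook (e.g., Slack) when a threshold is breached."""
    webhook_url = os.getenv('SLACK_WEBHOOK_URL')  # Illustrative environment variable
    if webhook_url:
        requests.post(webhook_url, json={'text': message}, timeout=10)

def run_with_duration_alert(df, max_seconds: float = 3600):
    """Wrap a block's logic and alert if it runs longer than expected."""
    start = time.time()
    result = transform(df)  # Placeholder for the block's actual logic
    duration = time.time() - start
    if duration > max_seconds:
        send_alert(f"Block exceeded expected runtime: {duration:.0f}s > {max_seconds:.0f}s")
    return result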
Monitor Key Metrics:
  • Pipeline run duration
  • Block execution times
  • Data volume processed
  • Error rates
  • Resource utilization

Data Quality Monitoring

Track Data Quality Metrics:
def validate_and_log_quality(df):
    """Validate data and log quality metrics."""
    metrics = {
        'row_count': len(df),
        'null_percentage': df.isnull().sum() / len(df) * 100,
        'duplicate_count': df.duplicated().sum(),
    }
    
    logger.info(f"Data quality metrics: {metrics}")
    
    # Alert if quality thresholds exceeded
    if metrics['null_percentage'].max() > 10:
        logger.warning(f"High null percentage detected: {metrics['null_percentage']}")
    
    return df

Security & Governance

Authentication

User Authentication is Enabled by Default:
  • Mage Pro: User authentication is always enabled by default (REQUIRE_USER_AUTHENTICATION=True)
  • Mage OSS (0.9.78+): User authentication is enabled by default
  • Mage OSS (0.8.4-0.9.77): Can be enabled by setting REQUIRE_USER_AUTHENTICATION=1
Authentication ensures that only authorized users can access the Mage UI and APIs. All users must sign in before creating, managing, or running pipelines.
IP Whitelisting (Recommended for SaaS and Self-Hosted Mage Pro): For additional security, especially for SaaS or self-hosted Mage Pro deployments, set up an IP whitelist to restrict access to your Mage instance. Implementation options:
  • Load Balancer/Reverse Proxy: Configure IP whitelisting at the load balancer or reverse proxy level (e.g., Nginx, AWS ALB, Azure Application Gateway)
  • Firewall Rules: Use network-level firewall rules to restrict access
  • VPN/Private Network: Deploy Mage Pro in a private network accessible only via VPN
Best Practices:
  • Whitelist only necessary IP ranges (office networks, VPN endpoints)
  • Regularly review and update the whitelist
  • Use CIDR notation for network ranges
  • Document all whitelisted IPs and their purposes
  • Consider using a VPN for remote access instead of public IPs
For more information, see the Authentication Guide.

Secrets Management

Use Mage Secrets:
  • Store API keys, passwords, and tokens in Mage’s secrets management
  • Never hardcode credentials in blocks
  • Rotate secrets regularly
# ✅ Use secrets
import os

api_key = os.getenv('SALESFORCE_API_KEY')
database_password = os.getenv('DATABASE_PASSWORD')
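If the credentials live in Mage’s secrets manager instead of environment variables, they can be read at runtime with get_secret_value (check the Secrets Guide for the exact usage in your version):
# ✅ Read a secret stored in Mage's secrets manager
from mage_ai.data_preparation.shared.secrets import get_secret_value

api_key = get_secret_value('SALESFORCE_API_KEY')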
For more information on creating and managing secrets, see the Secrets Guide.

Access Control

Follow Principle of Least Privilege:
  • Grant minimum necessary permissions
  • Use workspace-level access controls
  • Review and audit access regularly

Data Privacy

Handle Sensitive Data Appropriately:
  • Mask PII in logs and outputs
  • Encrypt sensitive data at rest and in transit
  • Comply with data protection regulations (GDPR, CCPA, etc.)
# ✅ Mask sensitive data in logs
import hashlib

def log_customer_data(customer_id, email):
    # Hash sensitive information
    email_hash = hashlib.sha256(email.encode()).hexdigest()[:8]
    logger.info(f"Processing customer {customer_id} with email {email_hash}...")

Code Security

Security Best Practices:
  • Avoid executing user input directly
  • Validate and sanitize all inputs
  • Use parameterized queries for databases
  • Keep dependencies updated
# ✅ Parameterized queries
def load_customer_data(customer_id):
    query = "SELECT * FROM customers WHERE id = :id"
    return pd.read_sql(query, engine, params={'id': customer_id})

# ❌ Avoid SQL injection
# query = f"SELECT * FROM customers WHERE id = {customer_id}"

Deployment & CI/CD

Use CI/CD Deployments (Mage Pro)

Deployment Workflow:
  1. Develop in development workspace
  2. Test in staging workspace
  3. Deploy to production using CI/CD
Best Practices:
  • Automate deployments through CI/CD
  • Use deployment pipelines for consistency
  • Test deployments in staging first
  • Roll back quickly if issues occur

Environment Management

Separate Configurations:
# ✅ Environment-specific configuration
import os

ENV = os.getenv('ENVIRONMENT', 'development')

if ENV == 'production':
    database_url = os.getenv('PROD_DATABASE_URL')
    api_base_url = 'https://api.production.com'
elif ENV == 'staging':
    database_url = os.getenv('STAGING_DATABASE_URL')
    api_base_url = 'https://api.staging.com'
else:
    database_url = os.getenv('DEV_DATABASE_URL')
    api_base_url = 'https://api.dev.com'

Pipeline Scheduling

Schedule Appropriately:
  • Match schedule to data freshness requirements
  • Consider source system load
  • Account for pipeline execution time
  • Use appropriate time zones
Error Handling in Scheduled Pipelines:
  • Set up retry policies using automatic retry for block runs
  • Configure alerting for failures
  • Implement dead letter queues for failed runs
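A dead letter queue can be as simple as a table where records that fail processing are parked for later inspection and replay; a minimal sketch (the dlq_records table and process_record function are illustrative):
import pandas as pd

def process_with_dlq(df: pd.DataFrame, engine) -> pd.DataFrame:
    """Process rows, routing failures to a dead letter table instead of failing the run."""
    succeeded, failed = [], []
    for record in df.to_dict('records'):
        try:
            succeeded.append(process_record(record))  # Placeholder for the real logic
        except Exception as e:
            record['error'] = str(e)
            failed.append(record)
    if failed:
        # Park failures so they can be inspected and replayed later
        pd.DataFrame(failed).to_sql('dlq_records', engine, if_exists='append', index=False)
    return pd.DataFrame(succeeded)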

Backfilling

Backfill Strategy:
  • Use Mage’s built-in backfilling for historical data
  • Process in chunks to avoid resource exhaustion
  • Monitor backfill progress
  • Validate backfilled data
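Each backfill run receives its own execution window, so loader blocks can fetch just that slice of history. A sketch, assuming the interval runtime variables are available in kwargs (confirm the exact names your Mage version provides):
import os

import pandas as pd
from sqlalchemy import create_engine, text

engine = create_engine(os.getenv('DATABASE_URL', ''))  # Reuse a pooled, module-level engine in practice

def load_events_for_backfill(**kwargs):
    """Load only the rows that belong to this run's backfill window."""
    # Assumed runtime variables; confirm the names in your Mage version
    start = kwargs.get('interval_start_datetime')
    end = kwargs.get('interval_end_datetime')
    query = text("SELECT * FROM events WHERE created_at >= :start AND created_at < :end")
    return pd.read_sql(query, engine, params={'start': start, 'end': end})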

Summary

Following these best practices will help you:
  • Build reliable pipelines with proper error handling and testing
  • Optimize performance through efficient data processing and resource management
  • Maintain code quality with modular, reusable blocks
  • Collaborate effectively using version control and workspaces
  • Monitor and debug with comprehensive logging and observability
  • Deploy safely using CI/CD and environment management
Remember: Start with these practices and adapt them to your team’s specific needs and constraints. The goal is to build maintainable, scalable, and reliable data pipelines.