> ## Documentation Index
> Fetch the complete documentation index at: https://docs.mage.ai/llms.txt
> Use this file to discover all available pages before exploring further.

# Best Practices

> Learn best practices for building, maintaining, and scaling data pipelines in Mage Pro.

This guide provides comprehensive best practices for building, maintaining, and scaling data pipelines in Mage Pro. Following these practices will help you create reliable, performant, and maintainable data engineering solutions.

## Table of Contents

* [Block Design & Architecture](#block-design-architecture)
* [Pipeline Organization](#pipeline-organization)
* [Performance Optimization](#performance-optimization)
* [Testing & Validation](#testing-validation)
* [Error Handling & Reliability](#error-handling-reliability)
* [Resource Management](#resource-management)
* [Collaboration & Version Control](#collaboration-version-control)
* [Monitoring & Observability](#monitoring-observability)
* [Security & Governance](#security-governance)
* [Deployment & CI/CD](#deployment-cicd)

***

## Block Design & Architecture

### Single Responsibility Principle

Each block should have a single, well-defined purpose. This makes blocks easier to understand, test, and maintain.

**✅ Good:**

```python theme={"system"}
# Block: load_customer_data
def load_customer_data():
    """Load customer data from source database."""
    return pd.read_sql("SELECT * FROM customers", connection)

# Block: transform_customer_data
def transform_customer_data(df):
    """Clean and standardize customer data."""
    df = df.dropna(subset=['email'])
    df['email'] = df['email'].str.lower()
    return df
```

**❌ Avoid:**

```python theme={"system"}
# Block: load_and_transform_customers
def load_and_transform_customers():
    """Loads, transforms, and exports customer data."""
    # Loading, transforming, and exporting in one block
    df = pd.read_sql("SELECT * FROM customers", connection)
    df = df.dropna(subset=['email'])
    df.to_parquet('output.parquet')
```

### Modularity & Reusability

Design blocks to be reusable across pipelines and projects. Use clear, descriptive names and avoid hardcoded values.

**✅ Good:**

```python theme={"system"}
# Block: load_data_from_source
def load_data_from_source(source_table: str, connection_string: str):
    """Generic data loader that can be reused for different tables."""
    return pd.read_sql(f"SELECT * FROM {source_table}", connection_string)
```

**❌ Avoid:**

```python theme={"system"}
# Block: load_customers_jan_2024
def load_customers_jan_2024():
    """Hardcoded for specific use case."""
    return pd.read_sql("SELECT * FROM customers WHERE month='2024-01'", conn)
```

### Block Naming Conventions

Use clear, descriptive names that indicate the block's purpose:

* **Data loaders**: `load_<source>_<entity>` (e.g., `load_salesforce_contacts`)
* **Transformers**: `transform_<action>_<entity>` (e.g., `transform_clean_customer_data`)
* **Data exporters**: `export_<entity>_to_<destination>` (e.g., `export_customers_to_warehouse`)

### Organizing Blocks in Subdirectories

Organize blocks in subdirectories within their respective block type folders to improve project structure and maintainability.

**✅ Benefits:**

* Better organization for large projects
* Easier to find related blocks
* Clearer project structure
* Supports team collaboration

**How to Organize:**

1. **Create subdirectories by category:**
   * By source type: `api/`, `database/`, `file/`, `cloud/`
   * By domain: `sales/`, `marketing/`, `finance/`
   * By function: `validation/`, `cleaning/`, `aggregation/`

2. **Using the UI:**
   * When creating a block, name it with the subdirectory: `subfolder_name/block_name`
   * Example: `api/load_stripe` creates `data_loaders/api/load_stripe.py`
   * The subdirectory is automatically created if it doesn't exist

3. **Example structure:**
   ```
   data_loaders/
   ├── api/
   │   ├── load_stripe.py
   │   ├── load_salesforce.py
   │   └── load_hubspot.py
   ├── database/
   │   ├── load_postgres.py
   │   └── load_mysql.py
   └── file/
       ├── load_csv.py
       └── load_parquet.py

   transformers/
   ├── cleaning/
   │   ├── remove_duplicates.py
   │   └── standardize_format.py
   └── aggregation/
       ├── calculate_metrics.py
       └── group_by_category.py
   ```

4. **Best practices:**
   * Use consistent subdirectory naming across block types
   * Keep subdirectory depth to 1-2 levels maximum
   * Group related blocks together logically
   * Document your organization structure for your team

For more details, see the [Project Structure Guide](/design/abstractions/project-structure).

### Block Size Guidelines

Keep blocks focused and reasonably sized:

* **Ideal**: 50-200 lines of code per block
* **Maximum**: 500 lines (consider splitting if larger)
* **Minimum**: Enough code to accomplish one clear task

### Use Block Templates

Create reusable block templates for common patterns:

* Standard ETL patterns
* Data validation patterns
* Error handling templates
* Logging templates

This ensures consistency across your team and reduces duplicate code.

***

## Pipeline Organization

### Choosing the Right Pipeline Type

Mage supports three main pipeline types, each optimized for different use cases. Understanding when to use each type is crucial for building efficient and maintainable data pipelines.

#### Data Integration Pipelines

**Use data integration pipelines when:**

* You need to **sync data between systems** (external to internal, or internal to external)
* You're working with **pre-built connectors** for SaaS tools, databases, or data warehouses
* You want **low-code/no-code** data synchronization
* You need **incremental sync** capabilities with automatic state tracking
* You're doing **reverse ETL** (syncing data from your warehouse to external systems)

**Characteristics:**

* Uses Singer spec-compliant connectors
* Handles schema migrations automatically
* Supports incremental and full sync modes
* Minimal code required - mostly configuration
* Can be combined with batch pipelines for custom transformations

**Example use cases:**

* Syncing Salesforce contacts to your data warehouse
* Loading Stripe transaction data into BigQuery
* Exporting customer segments from Snowflake to HubSpot
* Syncing product data from PostgreSQL to MongoDB

**Learn more:** [Data Integration Overview](/data-integrations/overview)

#### Batch Pipelines

**Use batch pipelines when:**

* You need **scheduled data processing** (hourly, daily, weekly)
* You require **custom transformations** using Python, SQL, or R
* You're building **ETL/ELT workflows** with complex business logic
* You need to **process large volumes** of historical data
* You want to **combine multiple data sources** and apply custom logic
* You're building **analytics pipelines** for reporting and dashboards

**Characteristics:**

* Runs on a schedule, completes, and exits
* Full flexibility with custom code blocks
* Can combine data integration blocks with custom transformers
* Supports backfilling historical data
* Ideal for data warehousing and analytics workloads

**Example use cases:**

* Daily revenue reporting with complex aggregations
* Monthly financial reconciliation
* Customer segmentation analysis
* Data quality checks and validation
* Feature engineering for machine learning

**Learn more:** [Building Batch Pipelines](/getting-started/build-pro-pipeline)

#### Streaming Pipelines

**Use streaming pipelines when:**

* You need **real-time data processing** (sub-second to second latency)
* You're processing **continuous data streams** (Kafka, Kinesis, etc.)
* You require **immediate insights** or alerts
* You're building **real-time analytics** or monitoring systems
* You need **event-driven architectures**
* Low latency is critical (milliseconds to seconds)

**Characteristics:**

* Long-running processes that continuously monitor sources
* Processes data as it arrives (not in batches)
* Event-driven architecture
* Lower latency than batch processing
* Supports stateful processing with window aggregations

**Example use cases:**

* Real-time fraud detection
* Live dashboard updates
* IoT sensor data processing
* Real-time recommendation systems
* Event-driven data transformations
* Real-time data quality monitoring

**Learn more:** [Streaming Pipeline Introduction](/guides/streaming/introduction)

#### Decision Matrix

| Requirement          | Data Integration    | Batch                | Streaming               |
| -------------------- | ------------------- | -------------------- | ----------------------- |
| **Latency**          | Minutes to hours    | Minutes to hours     | Milliseconds to seconds |
| **Code Complexity**  | Low (mostly config) | High (custom code)   | Medium (transformers)   |
| **Use Case**         | Data sync           | Scheduled processing | Real-time processing    |
| **Data Volume**      | Small to large      | Large                | Continuous streams      |
| **Custom Logic**     | Limited             | Full flexibility     | Transformers only       |
| **State Management** | Automatic           | Manual               | Automatic (optional)    |

**Hybrid Approach:**
You can combine pipeline types:

* Use **data integration pipelines** to sync data into your warehouse
* Use **batch pipelines** to transform and aggregate the synced data
* Use **streaming pipelines** for real-time processing of critical events

### Pipeline Structure

Organize pipelines logically:

1. **Data Integration Pipelines**: Extract and load data from sources
2. **Transformation Pipelines**: Transform and clean data
3. **Analytics Pipelines**: Generate reports and metrics
4. **Orchestration Pipelines**: Coordinate multiple pipelines

### Pipeline Naming

Use consistent naming conventions:

* **Format**: `<domain>_<purpose>_<frequency>` (e.g., `sales_daily_revenue_report`)
* **Examples**:
  * `marketing_campaign_attribution_hourly`
  * `finance_monthly_reconciliation`
  * `product_user_behavior_realtime`

### Pipeline Tagging

Use tags to organize and group related pipelines for easier discovery and management.

**✅ Best Practices for Tagging:**

* **Use consistent tag names**: Establish a tagging convention across your team
  * Domain tags: `sales`, `marketing`, `finance`, `product`
  * Type tags: `etl`, `analytics`, `reporting`, `integration`
  * Environment tags: `production`, `staging`, `development`
  * Priority tags: `critical`, `high`, `low`

* **Apply multiple tags**: Pipelines can have multiple tags for flexible grouping
  ```python theme={"system"}
  # Example: A pipeline might have tags: ['sales', 'etl', 'production', 'critical']
  ```

* **Group by tags**: Use the pipeline list view to group and filter by tags
  * Right-click a pipeline → "Add/Remove tags"
  * Select or type tags (separated by Enter)
  * Filter and group pipelines by tag on the Pipeline page

* **Tag naming conventions:**
  * Use lowercase with underscores: `data_quality`, `customer_analytics`
  * Keep tags short and descriptive
  * Avoid special characters and spaces

**Benefits of Tagging:**

* Quickly find related pipelines
* Filter pipelines by domain, type, or environment
* Organize pipelines for different teams or projects
* Group pipelines for monitoring and alerting

For more details, see the [Pipeline Tagging Guide](/pipelines/pipeline-tagging).

### Limit Pipeline Complexity

**✅ Recommended:**

* **Block count**: 10-30 blocks per pipeline
* **Depth**: Maximum 5-7 levels of dependencies
* **Branches**: Limit parallel branches to 5-10

**⚠️ If your pipeline exceeds these limits:**

* Split into multiple pipelines
* Use orchestration pipelines to coordinate
* Extract common logic into reusable blocks

### Dependency Management

* **Minimize dependencies**: Only create dependencies where data actually flows
* **Avoid circular dependencies**: Design acyclic graphs
* **Use clear data contracts**: Document expected input/output schemas

***

## Performance Optimization

### Efficient Data Processing

**Batch Processing:**

```python theme={"system"}
# ✅ Process data in batches
def transform_large_dataset(df):
    batch_size = 10000
    results = []
    for i in range(0, len(df), batch_size):
        batch = df.iloc[i:i+batch_size]
        transformed = process_batch(batch)
        results.append(transformed)
    return pd.concat(results)
```

**Use Appropriate Data Types:**

```python theme={"system"}
# ✅ Optimize data types
df['id'] = df['id'].astype('int32')  # Instead of int64
df['category'] = df['category'].astype('category')  # Memory efficient
```

### Leverage Spark for Large Datasets

For datasets larger than memory or requiring distributed processing:

```python theme={"system"}
# ✅ Use Spark for large-scale transformations
from mage_ai.data_preparation.models.block import Block

@block
def transform_with_spark(df):
    spark_df = spark.createDataFrame(df)
    result = spark_df.groupBy('category').agg({'value': 'sum'})
    return result.toPandas()
```

### Optimize API Calls

**Batch API Requests:**

```python theme={"system"}
# ✅ Batch API calls
def load_data_from_api(ids):
    # Process in batches instead of one-by-one
    batch_size = 100
    results = []
    for i in range(0, len(ids), batch_size):
        batch = ids[i:i+batch_size]
        response = api.get_batch(batch)
        results.extend(response)
    return results
```

**Use Connection Pooling:**

```python theme={"system"}
# ✅ Reuse connections
from sqlalchemy import create_engine

# Create connection pool at module level
engine = create_engine('postgresql://...', pool_size=10)

def load_data():
    return pd.read_sql("SELECT * FROM table", engine)
```

### Memory Management

**Stream Large Files:**

```python theme={"system"}
# ✅ Stream processing for large files
def process_large_file(file_path):
    chunk_size = 10000
    for chunk in pd.read_csv(file_path, chunksize=chunk_size):
        process_chunk(chunk)
```

**Clean Up Resources:**

```python theme={"system"}
# ✅ Explicitly close connections and free memory
def load_data():
    conn = create_connection()
    try:
        df = pd.read_sql("SELECT * FROM table", conn)
        return df
    finally:
        conn.close()
        del df  # Help garbage collector
```

### Pipeline Performance

**Choose the Right Executor:**

* **`local_python` executor**: Use for faster execution when you have sufficient local resources. Blocks run in the same process or separate processes on the same machine, which reduces overhead and improves performance for smaller to medium-sized pipelines.

* **`k8s` executor**: Use for scalability when you need more resources or want to distribute block execution across multiple pods. Ideal for large-scale pipelines, resource-intensive transformations, or when you need to scale horizontally.

**Optimize Execution Mode:**

* **`run_pipeline_in_one_process: true`**: Speeds up pipeline execution by running all blocks in a single process. This reduces process creation overhead and improves performance for pipelines with many small blocks or when blocks need to share memory efficiently.

* **`run_pipeline_in_one_process: false`**: Runs blocks in separate processes (for `local_python` executor) or separate pods (for `k8s` executor). Use this when:
  * Blocks need isolation (e.g., different resource requirements)
  * You want to leverage parallel execution across multiple processes/pods
  * Memory isolation is important to prevent one block from affecting others

**Optimize Block Execution:**

* Use dynamic blocks for parallel processing
* Leverage conditional execution for optional steps
* Cache intermediate results when appropriate

**Control Concurrency:**

Configure concurrency limits to optimize resource usage and prevent system overload. You can set limits at both the global (project) and pipeline levels:

* **Global concurrency**: Limit the maximum number of concurrent block runs across all pipelines
* **Pipeline-level concurrency**: Control concurrent block runs and pipeline runs per trigger
* **Per-block concurrency**: Set specific concurrency limits for individual blocks

For more information, see the [Concurrency Control Guide](/design/data-pipeline-management#concurrency).

***

## Testing & Validation

### Write Tests in Blocks

Mage's built-in testing framework makes it easy to write tests alongside your code:

```python theme={"system"}
# ✅ Include tests in your blocks
def transform_data(df):
    # Transform logic
    df['normalized_value'] = df['value'] / df['max_value']
    return df

def test_output(output):
    """Test that transformation produces valid results."""
    assert output is not None, "Output should not be None"
    assert 'normalized_value' in output.columns, "Missing normalized_value column"
    assert (output['normalized_value'] <= 1.0).all(), "Values should be normalized"
    assert output['normalized_value'].notna().all(), "No null values allowed"
```

### Data Quality Checks

**Schema Validation:**

```python theme={"system"}
def validate_schema(df, expected_schema):
    """Validate DataFrame schema matches expected structure."""
    assert set(df.columns) == set(expected_schema.keys()), "Column mismatch"
    for col, dtype in expected_schema.items():
        assert df[col].dtype == dtype, f"Type mismatch for {col}"
```

**Data Quality Rules:**

```python theme={"system"}
def validate_data_quality(df):
    """Comprehensive data quality checks."""
    # Check for nulls in critical columns
    assert df['customer_id'].notna().all(), "No null customer IDs allowed"
    
    # Check value ranges
    assert (df['age'] >= 0).all() and (df['age'] <= 150).all(), "Invalid age range"
    
    # Check for duplicates
    assert df['email'].is_unique, "Duplicate emails found"
    
    # Check referential integrity
    assert df['status'].isin(['active', 'inactive', 'pending']).all(), "Invalid status values"
```

### Test-Driven Development

Write tests before or alongside implementation:

1. **Define expected behavior** in test functions
2. **Implement the logic** to pass tests
3. **Run tests** automatically on each block execution
4. **Refactor** while keeping tests green

***

## Error Handling & Reliability

### Graceful Error Handling

**Handle Expected Errors:**

```python theme={"system"}
# ✅ Handle errors gracefully
def load_data_from_api():
    try:
        response = api.get_data()
        return response
    except APIError as e:
        logger.error(f"API error: {e}")
        # Return empty DataFrame or cached data
        return pd.DataFrame()
    except Exception as e:
        logger.error(f"Unexpected error: {e}")
        raise
```

**Validate Inputs:**

```python theme={"system"}
# ✅ Validate inputs early
def transform_data(df):
    if df is None or df.empty:
        raise ValueError("Input DataFrame is empty")
    
    required_columns = ['id', 'name', 'value']
    missing = set(required_columns) - set(df.columns)
    if missing:
        raise ValueError(f"Missing required columns: {missing}")
    
    # Continue with transformation
    return df
```

### Retry Logic

**Implement Retry for Transient Failures:**

You can implement retry logic in your code for handling transient failures, or use Mage's built-in automatic retry feature for block runs.

**Code-level retry:**

```python theme={"system"}
from tenacity import retry, stop_after_attempt, wait_exponential

@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
def load_data_with_retry():
    """Retry on transient failures."""
    return api.get_data()
```

**Automatic retry for block runs:**

Mage provides automatic retry functionality for failed block runs. This is especially useful for handling transient network issues, temporary API rate limits, or other recoverable errors. Configure automatic retry settings to automatically retry failed blocks without manual intervention.

For more information, see the [Automatic Retry Guide](/orchestration/pipeline-runs/retrying-block-runs#automatic-retry).

### Idempotency

Design blocks to be idempotent (safe to run multiple times):

```python theme={"system"}
# ✅ Idempotent block
def export_data(df):
    """Export data, safe to run multiple times."""
    # Use upsert instead of insert
    df.to_sql('table', engine, if_exists='replace', index=False)
    # Or use merge/upsert logic
    # merge_data(df, 'table', on='id')
```

### Checkpointing

For long-running pipelines, implement checkpointing:

```python theme={"system"}
# ✅ Save intermediate state
def process_large_dataset(df):
    checkpoint_file = 'checkpoint.pkl'
    
    # Load checkpoint if exists
    if os.path.exists(checkpoint_file):
        processed = pd.read_pickle(checkpoint_file)
        start_idx = len(processed)
    else:
        processed = []
        start_idx = 0
    
    # Process remaining data
    for i in range(start_idx, len(df)):
        result = process_row(df.iloc[i])
        processed.append(result)
        
        # Save checkpoint every 1000 rows
        if i % 1000 == 0:
            pd.DataFrame(processed).to_pickle(checkpoint_file)
    
    return pd.DataFrame(processed)
```

***

## Resource Management

### Compute Resources

**Right-Size Your Resources:**

* Use appropriate compute clusters for workload size
* Monitor CPU and memory usage in Cluster Manager
* Scale up for large transformations, scale down for simple tasks

**Use Spark for Large Datasets:**

* Enable Spark for datasets > 10GB
* Configure Spark resources appropriately
* Monitor Spark job performance

### Connection Management

**Reuse Connections:**

```python theme={"system"}
# ✅ Module-level connection (reused across runs)
import os
from sqlalchemy import create_engine

DATABASE_URL = os.getenv('DATABASE_URL')
engine = create_engine(DATABASE_URL, pool_size=5, max_overflow=10)

def load_data():
    return pd.read_sql("SELECT * FROM table", engine)
```

**Close Connections Properly:**

```python theme={"system"}
# ✅ Context manager for connections
def load_data():
    with create_connection() as conn:
        df = pd.read_sql("SELECT * FROM table", conn)
        return df
    # Connection automatically closed
```

### Memory Optimization

**Process Data in Chunks:**

```python theme={"system"}
# ✅ Chunk processing for large datasets
def process_large_file(file_path):
    chunk_size = 50000
    results = []
    
    for chunk in pd.read_csv(file_path, chunksize=chunk_size):
        processed = transform_chunk(chunk)
        results.append(processed)
        # Clear memory
        del chunk, processed
    
    return pd.concat(results)
```

**Use Efficient Data Structures:**

```python theme={"system"}
# ✅ Use appropriate data types
df['id'] = df['id'].astype('int32')  # 50% memory savings vs int64
df['category'] = df['category'].astype('category')  # Efficient for repeated values
df['date'] = pd.to_datetime(df['date'])  # Proper datetime type
```

***

## Collaboration & Version Control

### Use Workspaces Effectively

**Separate Environments:**

* **Development**: For experimentation and testing
* **Staging**: For pre-production validation
* **Production**: For live data pipelines

**Workspace Best Practices:**

* Use consistent naming across workspaces
* Document workspace purposes
* Limit production workspace access

### Version Control Integration

**Commit Frequently:**

* Commit after completing each block or feature
* Use descriptive commit messages
* Tag releases and important milestones

**Branch Strategy:**

* Use feature branches for new development
* Keep main/master branch stable
* Use pull requests for code review

**Git Best Practices:**

```bash theme={"system"}
# ✅ Good commit messages
git commit -m "feat: Add customer data transformation block"
git commit -m "fix: Resolve null handling in revenue calculation"
git commit -m "refactor: Extract common validation logic to utility"
```

### Code Review

**Review Checklist:**

* [ ] Code follows team standards
* [ ] Tests are included and passing
* [ ] Error handling is appropriate
* [ ] Documentation is clear
* [ ] Performance considerations addressed
* [ ] Security concerns reviewed

### Documentation

**Document Your Blocks:**

```python theme={"system"}
def transform_customer_data(df):
    """
    Transform customer data by standardizing formats and cleaning values.
    
    Args:
        df: DataFrame with customer data containing columns:
            - customer_id: Unique customer identifier
            - email: Customer email address
            - name: Customer full name
            - created_at: Account creation timestamp
    
    Returns:
        DataFrame with transformed customer data:
            - customer_id: Same as input
            - email: Lowercase, validated email addresses
            - first_name: Extracted from name
            - last_name: Extracted from name
            - created_at: Parsed datetime object
    
    Raises:
        ValueError: If required columns are missing
    """
    # Implementation...
```

**Pipeline Documentation:**

* Document pipeline purpose and business logic
* Include data flow diagrams
* Note dependencies and requirements
* Document expected run times and resource needs

***

## Monitoring & Observability

### Logging

**Use Structured Logging:**

```python theme={"system"}
import logging

logger = logging.getLogger(__name__)

def transform_data(df):
    logger.info(f"Starting transformation for {len(df)} rows")
    
    try:
        result = perform_transformation(df)
        logger.info(f"Transformation completed successfully. Output: {len(result)} rows")
        return result
    except Exception as e:
        logger.error(f"Transformation failed: {e}", exc_info=True)
        raise
```

**Log Key Metrics:**

```python theme={"system"}
def process_data(df):
    start_time = time.time()
    logger.info(f"Processing {len(df)} records")
    
    result = transform(df)
    
    duration = time.time() - start_time
    logger.info(f"Processed {len(result)} records in {duration:.2f}s")
    logger.info(f"Processing rate: {len(result)/duration:.0f} records/sec")
    
    return result
```

### Monitoring Pipeline Health

**Set Up Alerts:**

* Pipeline failure alerts
* Performance degradation alerts
* Data quality threshold alerts
* Resource usage alerts

**Monitor Key Metrics:**

* Pipeline run duration
* Block execution times
* Data volume processed
* Error rates
* Resource utilization

### Data Quality Monitoring

**Track Data Quality Metrics:**

```python theme={"system"}
def validate_and_log_quality(df):
    """Validate data and log quality metrics."""
    metrics = {
        'row_count': len(df),
        'null_percentage': df.isnull().sum() / len(df) * 100,
        'duplicate_count': df.duplicated().sum(),
    }
    
    logger.info(f"Data quality metrics: {metrics}")
    
    # Alert if quality thresholds exceeded
    if metrics['null_percentage'].max() > 10:
        logger.warning(f"High null percentage detected: {metrics['null_percentage']}")
    
    return df
```

***

## Security & Governance

### Authentication

**User Authentication is Enabled by Default:**

* **Mage Pro**: User authentication is **always enabled by default** (`REQUIRE_USER_AUTHENTICATION=True`)
* **Mage OSS (0.9.78+)**: User authentication is **enabled by default**
* **Mage OSS (0.8.4-0.9.77)**: Can be enabled by setting `REQUIRE_USER_AUTHENTICATION=1`

Authentication ensures that only authorized users can access the Mage UI and APIs. All users must sign in before creating, managing, or running pipelines.

**IP Whitelisting (Recommended for SaaS and Self-Hosted Mage Pro):**

For additional security, especially for SaaS or self-hosted Mage Pro deployments, it's recommended to set up an IP whitelist to restrict access to your Mage instance.

**Implementation options:**

* **Load Balancer/Reverse Proxy**: Configure IP whitelisting at the load balancer or reverse proxy level (e.g., Nginx, AWS ALB, Azure Application Gateway)
* **Firewall Rules**: Use network-level firewall rules to restrict access
* **VPN/Private Network**: Deploy Mage Pro in a private network accessible only via VPN

**Best Practices:**

* Whitelist only necessary IP ranges (office networks, VPN endpoints)
* Regularly review and update the whitelist
* Use CIDR notation for network ranges
* Document all whitelisted IPs and their purposes
* Consider using a VPN for remote access instead of public IPs

For more information, see the [Authentication Guide](/production/authentication/overview).

### Secrets Management

**Use Mage Secrets:**

* Store API keys, passwords, and tokens in Mage's secrets management
* Never hardcode credentials in blocks
* Rotate secrets regularly

```python theme={"system"}
# ✅ Use secrets
import os

api_key = os.getenv('SALESFORCE_API_KEY')
database_password = os.getenv('DATABASE_PASSWORD')
```

For more information on creating and managing secrets, see the [Secrets Guide](/development/variables/secrets#secrets).

### Access Control

**Follow Principle of Least Privilege:**

* Grant minimum necessary permissions
* Use workspace-level access controls
* Review and audit access regularly

### Data Privacy

**Handle Sensitive Data Appropriately:**

* Mask PII in logs and outputs
* Encrypt sensitive data at rest and in transit
* Comply with data protection regulations (GDPR, CCPA, etc.)

```python theme={"system"}
# ✅ Mask sensitive data in logs
import hashlib

def log_customer_data(customer_id, email):
    # Hash sensitive information
    email_hash = hashlib.sha256(email.encode()).hexdigest()[:8]
    logger.info(f"Processing customer {customer_id} with email {email_hash}...")
```

### Code Security

**Security Best Practices:**

* Avoid executing user input directly
* Validate and sanitize all inputs
* Use parameterized queries for databases
* Keep dependencies updated

```python theme={"system"}
# ✅ Parameterized queries
def load_customer_data(customer_id):
    query = "SELECT * FROM customers WHERE id = :id"
    return pd.read_sql(query, engine, params={'id': customer_id})

# ❌ Avoid SQL injection
# query = f"SELECT * FROM customers WHERE id = {customer_id}"
```

***

## Deployment & CI/CD

### Use CI/CD Deployments (Mage Pro)

**Deployment Workflow:**

1. **Develop** in development workspace
2. **Test** in staging workspace
3. **Deploy** to production using CI/CD

**Best Practices:**

* Automate deployments through CI/CD
* Use deployment pipelines for consistency
* Test deployments in staging first
* Roll back quickly if issues occur

### Environment Management

**Separate Configurations:**

```python theme={"system"}
# ✅ Environment-specific configuration
import os

ENV = os.getenv('ENVIRONMENT', 'development')

if ENV == 'production':
    database_url = os.getenv('PROD_DATABASE_URL')
    api_base_url = 'https://api.production.com'
elif ENV == 'staging':
    database_url = os.getenv('STAGING_DATABASE_URL')
    api_base_url = 'https://api.staging.com'
else:
    database_url = os.getenv('DEV_DATABASE_URL')
    api_base_url = 'https://api.dev.com'
```

### Pipeline Scheduling

**Schedule Appropriately:**

* Match schedule to data freshness requirements
* Consider source system load
* Account for pipeline execution time
* Use appropriate time zones

**Error Handling in Scheduled Pipelines:**

* Set up retry policies using [automatic retry](/orchestration/pipeline-runs/retrying-block-runs#automatic-retry) for block runs
* Configure alerting for failures
* Implement dead letter queues for failed runs

### Backfilling

**Backfill Strategy:**

* Use Mage's built-in backfilling for historical data
* Process in chunks to avoid resource exhaustion
* Monitor backfill progress
* Validate backfilled data

***

## Summary

Following these best practices will help you:

✅ **Build reliable pipelines** with proper error handling and testing\
✅ **Optimize performance** through efficient data processing and resource management\
✅ **Maintain code quality** with modular, reusable blocks\
✅ **Collaborate effectively** using version control and workspaces\
✅ **Monitor and debug** with comprehensive logging and observability\
✅ **Deploy safely** using CI/CD and environment management

Remember: Start with these practices and adapt them to your team's specific needs and constraints. The goal is to build maintainable, scalable, and reliable data pipelines.
