
# Microsoft SQL Server CDC

> Real-time Change Data Capture from SQL Server CDC change tables with automatic schema tracking

export const ProOnly = ({button = 'Get started for free', description = 'Try our fully managed solution to access this advanced feature.', source = 'documentation', title = 'Only in Mage Pro.'}) => <a href={`https://cloud.mage.ai/sign-up?source=${source}`} className="block my-4 px-5 py-4 overflow-hidden rounded-xl flex gap-3 border border-emerald-500/20 bg-emerald-50/50 dark:border-emerald-500/30 dark:bg-emerald-500/10" target="_blank">
    <div style={{
  display: 'flex',
  alignItems: 'center',
  width: '100%'
}}>
      <div className="text-sm prose min-w-0 text-emerald-900 dark:text-emerald-200" style={{
  flex: 1
}}>
        {title}
        <p className="normal">{description}</p>
      </div>

      <div> </div>

      <div>
        <ProButton label={button} href={`https://cloud.mage.ai/sign-up?source=${source}`} />
      </div>
    </div>
  </a>;

export const ProButton = ({href, label = 'Get started with Mage Pro for free', source = 'documentation'}) => <div style={{
  height: 32,
  position: 'relative'
}}>
    <a target="_blank" className="group px-4 py-1.5 relative inline-flex items-center text-sm font-medium rounded-full" href={href ?? `https://cloud.mage.ai/sign-up?source=${source}`}>
      <span className="absolute inset-0 bg-primary-dark dark:bg-primary-light/10 border-primary-light/30 rounded-full dark:border group-hover:opacity-[0.9] dark:group-hover:border-primary-light/60">
      </span>

      <div className="mr-0.5 space-x-2.5 flex items-center">
        <span class="z-10 text-white dark:text-primary-light">
          {label}
        </span>

        <svg width="3" height="24" viewBox="0 -9 3 24" class="h-5 rotate-0 overflow-visible text-white/90 dark:text-primary-light">
          <path d="M0 0L3 3L0 6" fill="none" stroke="currentColor" stroke-width="1.5" stroke-linecap="round"></path>
        </svg>
      </div>
    </a>
  </div>;

<ProOnly source="mssql-cdc" />

## Overview

The Microsoft SQL Server CDC streaming source reads changes from SQL Server Change Data Capture (CDC) change tables and emits them as structured records. It polls SQL Server's built-in CDC feature for new changes, tracks each table's schema, and detects primary key columns automatically.

<CardGroup cols={2}>
  <Card title="Real-time CDC" icon="zap">
    Captures changes as they happen in SQL Server
  </Card>

  <Card title="Native CDC" icon="database">
    Uses SQL Server's built-in Change Data Capture feature
  </Card>

  <Card title="Primary Key Detection" icon="key">
    Automatically identifies and extracts primary key columns
  </Card>

  <Card title="LSN Tracking" icon="shield">
    Tracks Log Sequence Numbers for reliable resume capabilities
  </Card>
</CardGroup>

## Configuration

### Required Parameters

| Parameter  | Type   | Description                         |
| ---------- | ------ | ----------------------------------- |
| `host`     | string | SQL Server hostname or IP address   |
| `port`     | number | SQL Server port (default: 1433)     |
| `database` | string | Database name                       |
| `user`     | string | SQL Server username with CDC access |
| `password` | string | SQL Server password                 |

### Optional Parameters

| Parameter                | Type    | Default                         | Description                                                                  |
| ------------------------ | ------- | ------------------------------- | ---------------------------------------------------------------------------- |
| `connect_timeout`        | number  | 10                              | Connection timeout in seconds                                                |
| `driver`                 | string  | "ODBC Driver 18 for SQL Server" | ODBC driver name                                                             |
| `authentication`         | string  | -                               | Authentication method (e.g., "ActiveDirectoryPassword")                      |
| `schema`                 | string  | dbo                             | Default schema name                                                          |
| `schemas`                | array   | -                               | Include only these schemas                                                   |
| `tables`                 | array   | -                               | Include only these tables                                                    |
| `ignore_schemas`         | array   | -                               | Exclude these schemas (supports wildcards like "sys\*")                      |
| `ignore_tables`          | array   | -                               | Exclude these tables (supports wildcards like "temp\_\*")                    |
| `start_lsn`              | string  | -                               | Start LSN to begin reading from (hex string, e.g., '0x00000000000000000000') |
| `capture_instance`       | string  | -                               | Capture instance name (auto-detected if not provided)                        |
| `max_batch_size`         | number  | 100                             | Maximum events per batch                                                     |
| `flush_interval_seconds` | number  | 1.0                             | Maximum time between batch flushes                                           |
| `poll_interval_seconds`  | number  | 1.0                             | How often to poll for new changes                                            |
| `return_db_records_only` | boolean | true                            | Return only DB records with Mage timestamp columns                           |
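
As a rough illustration of how the include and exclude filters could interact, the sketch below applies `schemas`, `tables`, `ignore_schemas`, and `ignore_tables` to a single table. The function name `is_table_selected` and the precedence (exclusions win over inclusions) are assumptions made for this example, not documented connector behavior.

```python
from fnmatch import fnmatchcase
from typing import Optional, Sequence


def is_table_selected(
    schema: str,
    table: str,
    schemas: Optional[Sequence[str]] = None,
    tables: Optional[Sequence[str]] = None,
    ignore_schemas: Sequence[str] = (),
    ignore_tables: Sequence[str] = (),
) -> bool:
    """Hypothetical filter: exclusions are checked first, then inclusions."""
    # Exclude patterns support wildcards such as "sys*" or "temp_*".
    if any(fnmatchcase(schema, pattern) for pattern in ignore_schemas):
        return False
    if any(fnmatchcase(table, pattern) for pattern in ignore_tables):
        return False
    # An unset include list is treated as "include everything" (assumption).
    if schemas is not None and schema not in schemas:
        return False
    if tables is not None and table not in tables:
        return False
    return True
```

With `schemas=["dbo"]` and `ignore_tables=["temp_*"]`, this sketch selects `dbo.users` and rejects `dbo.temp_import`.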

## Record Formats

### DB Records Only Format

When `return_db_records_only: true` (default):

```json theme={"system"}
{
  "data": {
    "id": 1,
    "name": "John",
    "email": "john@example.com",
    "_mage_created_at": "2021-12-31T23:00:00+00:00",
    "_mage_updated_at": null,
    "_mage_deleted_at": null
  },
  "metadata": {
    "schema": "dbo",
    "table": "users",
    "key_columns": ["id"],
    "operation": "INSERT",
    "lsn": "0x00000000000000000000"
  }
}
```

**Note**: The `_mage_*` timestamp columns are returned as datetime objects (ISO 8601 format strings when serialized) in UTC timezone, not Unix timestamps. This ensures proper type handling when writing to databases.
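
If a consumer receives the serialized form, the ISO 8601 strings can be turned back into timezone-aware datetimes. The following is a minimal sketch using only the standard library; the helper name `parse_mage_timestamps` is hypothetical and the sample row mirrors the record shown above.

```python
from datetime import datetime, timezone

record = {
    "id": 1,
    "_mage_created_at": "2021-12-31T23:00:00+00:00",
    "_mage_updated_at": None,
    "_mage_deleted_at": None,
}


def parse_mage_timestamps(data: dict) -> dict:
    """Return a copy with serialized _mage_* strings parsed into aware datetimes."""
    parsed = dict(data)
    for column in ("_mage_created_at", "_mage_updated_at", "_mage_deleted_at"):
        value = parsed.get(column)
        if isinstance(value, str):
            # fromisoformat understands the "+00:00" offset used in the sample.
            parsed[column] = datetime.fromisoformat(value).astimezone(timezone.utc)
    return parsed


parsed = parse_mage_timestamps(record)
```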

## Event Types

<AccordionGroup>
  <Accordion title="Row Events (Core CDC)">
    * **Insert** (`operation: "INSERT"`): INSERT operations (CDC operation code 2)
    * **Update** (`operation: "UPDATE"`): UPDATE operations (CDC operation code 4, after image only)
    * **Delete** (`operation: "DELETE"`): DELETE operations (CDC operation code 1)

    These events contain the actual data changes. UPDATE operations only include the after image (the new values), not the before image.
  </Accordion>
</AccordionGroup>
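
The operation codes listed above come from the `__$operation` column of SQL Server change tables. A small sketch of the mapping is shown below; the names `CDC_OPERATIONS` and `operation_name` are illustrative and not part of the connector's API.

```python
from typing import Optional

# Values of the __$operation column in SQL Server change tables.
CDC_OPERATIONS = {
    1: "DELETE",
    2: "INSERT",
    4: "UPDATE",  # after image of an update
}


def operation_name(code: int) -> Optional[str]:
    """Map a CDC operation code to an event name; None means 'skip this row'."""
    # Code 3 is the before image of an update, which this source does not emit.
    return CDC_OPERATIONS.get(code)
```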

## Primary Key Detection

The source automatically detects and extracts primary key columns:

1. **Schema Discovery**: Queries `INFORMATION_SCHEMA` for table structure
2. **Primary Key Detection**: Identifies actual primary key columns from database constraints
3. **Caching**: Stores schema and primary key information for performance
4. **Key Extraction**: Extracts primary key column names from row events

<CodeGroup>
  ```json Example Output theme={"system"}
  {
    "metadata": {
      "key_columns": ["user_id", "tenant_id"]
    }
  }
  ```
</CodeGroup>

**Note**: `key_columns` is a list of column names (not values) that can be used for deduplication or upsert operations in downstream sinks.
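
To show how `key_columns` might be used for deduplication, here is a minimal sketch that keeps only the most recent event per primary key. It assumes events are supplied in the order they occurred; the function name `latest_per_key` is hypothetical.

```python
from typing import Any, Dict, Iterable, List, Tuple


def latest_per_key(events: Iterable[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Keep only the last event seen for each (schema, table, primary key)."""
    latest: Dict[Tuple[Any, ...], Dict[str, Any]] = {}
    for event in events:
        meta = event["metadata"]
        # key_columns holds column names; look the values up in the row data.
        key_values = tuple(event["data"][col] for col in meta["key_columns"])
        latest[(meta["schema"], meta["table"], key_values)] = event
    return list(latest.values())
```

A downstream sink could apply the same key tuple when building an upsert condition.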

## Examples

<CodeGroup>
  ```yaml Basic Setup theme={"system"}
  connector_type: mssql
  host: localhost
  port: 1433
  database: mydb
  user: cdc_user
  password: password
  schemas: ["dbo"]
  return_db_records_only: true
  ```

  ```yaml Production Setup theme={"system"}
  connector_type: mssql
  host: sqlserver-prod.company.com
  port: 1433
  database: production_db
  user: cdc_user
  password: secure_password
  driver: "ODBC Driver 18 for SQL Server"
  schemas: ["dbo", "sales"]
  ignore_tables: ["temp_*", "log_*"]
  max_batch_size: 500
  flush_interval_seconds: 2.0
  poll_interval_seconds: 0.5
  return_db_records_only: true
  ```

  ```yaml Resume from LSN theme={"system"}
  connector_type: mssql
  host: localhost
  port: 1433
  database: mydb
  user: cdc_user
  password: password
  start_lsn: "0x00000000000000000000"
  schemas: ["dbo"]
  return_db_records_only: true
  ```

  ```yaml With Azure AD Authentication theme={"system"}
  connector_type: mssql
  host: sqlserver.database.windows.net
  port: 1433
  database: mydb
  user: user@domain.com
  password: password
  authentication: "ActiveDirectoryPassword"
  driver: "ODBC Driver 18 for SQL Server"
  schemas: ["dbo"]
  return_db_records_only: true
  ```

  ```yaml Filter Specific Tables theme={"system"}
  connector_type: mssql
  host: localhost
  port: 1433
  database: mydb
  user: cdc_user
  password: password
  tables: ["users", "orders", "products"]
  return_db_records_only: true
  ```
</CodeGroup>

## Prerequisites

### Enable CDC on Database

CDC must be enabled at the database level before it can be used:

```sql theme={"system"}
USE your_database_name;
GO
EXEC sys.sp_cdc_enable_db;
GO
```

Verify CDC is enabled:

```sql theme={"system"}
SELECT is_cdc_enabled FROM sys.databases WHERE name = 'your_database_name';
-- Should return: 1
```

### Enable CDC on Tables

After enabling CDC on the database, enable it for each table you want to monitor:

```sql theme={"system"}
USE your_database_name;
GO
EXEC sys.sp_cdc_enable_table
    @source_schema = N'dbo',
    @source_name = N'YourTableName',
    @role_name = NULL,
    @supports_net_changes = 0;
GO
```

**Parameters:**

* `@source_schema`: Schema name (e.g., 'dbo')
* `@source_name`: Table name
* `@role_name`: Role name for CDC access control (NULL = no role restriction)
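* `@supports_net_changes`: 0 = generate only the all-changes query function; 1 = also generate the net-changes query function (requires a primary key or unique index on the source table)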

### Verify CDC on Tables

Check which tables have CDC enabled:

```sql theme={"system"}
SELECT
    s.name AS schema_name,
    t.name AS table_name,
    c.capture_instance
FROM cdc.change_tables c
INNER JOIN sys.tables t ON c.source_object_id = t.object_id
INNER JOIN sys.schemas s ON t.schema_id = s.schema_id
ORDER BY s.name, t.name;
```

### SQL Server Agent

CDC requires SQL Server Agent to be running. Verify it's running:

```sql theme={"system"}
EXEC xp_servicecontrol 'QueryState', N'SQLServerAGENT';
```

If not running, start the SQL Server Agent service.

### User Permissions

The user needs the following permissions:

```sql theme={"system"}
-- Switch to the CDC-enabled database
USE your_database_name;
GO

-- Grant SELECT on tables
GRANT SELECT ON SCHEMA::dbo TO cdc_user;

-- Grant SELECT on CDC tables
GRANT SELECT ON SCHEMA::cdc TO cdc_user;

-- Grant SELECT on system tables for metadata queries
GRANT SELECT ON sys.tables TO cdc_user;
GRANT SELECT ON sys.schemas TO cdc_user;
GRANT SELECT ON sys.databases TO cdc_user;
GRANT SELECT ON INFORMATION_SCHEMA.TABLES TO cdc_user;
GRANT SELECT ON INFORMATION_SCHEMA.COLUMNS TO cdc_user;
GRANT SELECT ON INFORMATION_SCHEMA.TABLE_CONSTRAINTS TO cdc_user;
GRANT SELECT ON INFORMATION_SCHEMA.KEY_COLUMN_USAGE TO cdc_user;
```

## Troubleshooting

<AccordionGroup>
  <Accordion title="Common Issues">
    1. **CDC Not Enabled on Database**: Run `EXEC sys.sp_cdc_enable_db;` on the database
    2. **CDC Not Enabled on Table**: Run `sys.sp_cdc_enable_table` for each table
    3. **SQL Server Agent Not Running**: Start SQL Server Agent service
    4. **Permission Denied**: Ensure user has SELECT permissions on tables and CDC schema
    5. **Connection Timeout**: Increase `connect_timeout` value
    6. **LSN Format Error**: Ensure LSN is in hex format (e.g., '0x00000000000000000000')
    7. **ODBC Driver Not Found**: Install appropriate ODBC driver (e.g., "ODBC Driver 18 for SQL Server")
    8. **Max LSN Returns NULL**: This usually means CDC is not enabled on the database, or the capture job has not processed any changes yet (check that SQL Server Agent is running)
  </Accordion>

  <Accordion title="Debugging">
    Check CDC status on database:

    ```sql theme={"system"}
    SELECT is_cdc_enabled FROM sys.databases WHERE name = 'your_database_name';
    ```

    Check CDC enabled tables:

    ```sql theme={"system"}
    SELECT
        s.name AS schema_name,
        t.name AS table_name,
        c.capture_instance
    FROM cdc.change_tables c
    INNER JOIN sys.tables t ON c.source_object_id = t.object_id
    INNER JOIN sys.schemas s ON t.schema_id = s.schema_id;
    ```

    Check current max LSN:

    ```sql theme={"system"}
    SELECT sys.fn_cdc_get_max_lsn() AS max_lsn;
    ```

    Check min LSN for a capture instance:

    ```sql theme={"system"}
    -- Pass the capture instance name (default: <schema>_<table>), not the _CT change table name.
    SELECT sys.fn_cdc_get_min_lsn('dbo_YourTableName') AS min_lsn;
    ```

    View recent CDC changes:

    ```sql theme={"system"}
    -- Use the capture instance name (default: <schema>_<table>), not the _CT change table name.
    DECLARE @from_lsn binary(10) = sys.fn_cdc_get_min_lsn('dbo_YourTableName');
    DECLARE @to_lsn binary(10) = sys.fn_cdc_get_max_lsn();

    SELECT * FROM cdc.fn_cdc_get_all_changes_dbo_YourTableName(
        @from_lsn, @to_lsn, N'all'
    );
    ```
  </Accordion>

  <Accordion title="Monitoring">
    Monitor key metrics:

    * **Batch Size**: Average events per batch
    * **Flush Rate**: How often batches are flushed
    * **Error Rate**: Failed events or connections
    * **Lag**: Time between event occurrence and processing
    * **LSN Progress**: Track Log Sequence Number progress

    Check CDC capture instance status:

    ```sql theme={"system"}
    SELECT
        capture_instance,
        OBJECT_SCHEMA_NAME(source_object_id) AS source_schema,
        OBJECT_NAME(source_object_id) AS source_table,
        start_lsn,
        supports_net_changes
    FROM cdc.change_tables;
    ```

    Monitor CDC job status:

    ```sql theme={"system"}
    SELECT
        job_id,
        name,
        enabled,
        date_modified
    FROM msdb.dbo.sysjobs
    WHERE name LIKE 'cdc.%';
    ```
  </Accordion>
</AccordionGroup>

## Integration with Generic IO Sink

The SQL Server CDC source is designed to pair with the Generic IO sink, which provides:

* **Automatic Column Type Mapping**: Automatically maps `_mage_*` timestamp columns to appropriate database types (TIMESTAMP, DATETIME2, DateTime64, etc.) based on the target database
* **Metadata Interpolation**: Use metadata values from SQL Server CDC events in sink configurations for dynamic routing and table naming

### Supported Databases

Generic IO Sink supports the following databases:

* **BigQuery**
* **ClickHouse**
* **DuckDB**
* **MySQL**
* **MSSQL**
* **Postgres**

### Metadata Interpolation

You can use metadata values from SQL Server CDC events in your sink configuration using Python string formatting syntax:

* **`{schema}`**: Schema name from the event
* **`{table}`**: Table name from the event
* **`{key_columns}`**: List of primary key column names (e.g., `["id"]` or `["user_id", "tenant_id"]`)

When using `{key_columns}` in `unique_constraints`, it will be automatically converted from a string representation to a list. The format supports both Python-style (`"['id']"`) and JSON-style (`'["id"]'`) array strings.

### Example Configurations

<CodeGroup>
  ```yaml BigQuery Example theme={"system"}
  connector_type: bigquery
  profile: default
  config:
    table_id: "{schema}.{table}"  # Dynamic table names from metadata
    unique_conflict_method: UPDATE
    unique_constraints: "{key_columns}"  # Uses primary keys from metadata
  ```

  ```yaml Postgres Example theme={"system"}
  connector_type: postgres
  profile: default
  config:
    schema_name: "{schema}"  # Dynamic schema from metadata
    table_name: "{table}"    # Dynamic table name from metadata
    unique_conflict_method: UPDATE
    unique_constraints: "{key_columns}"  # Uses primary keys from metadata
    if_exists: append
  ```

  ```yaml MSSQL Example theme={"system"}
  connector_type: mssql
  profile: default
  config:
    schema_name: "{schema}"
    table_name: "{table}"
    unique_conflict_method: UPDATE
    unique_constraints: "{key_columns}"
    if_exists: append
  ```

  ```yaml Static Configuration theme={"system"}
  connector_type: postgres
  profile: default
  config:
    schema_name: public
    table_name: cdc_events  # Static table name
    unique_constraints: ['id', 'event_id']  # Static list (string format also works: "['id', 'event_id']")
    unique_conflict_method: UPDATE
    if_exists: append
  ```
</CodeGroup>

### How Metadata Interpolation Works

1. **Message Grouping**: Messages are automatically grouped by their interpolated config values. For example, if you use `table_name: "{schema}_{table}"`, messages from `dbo.users` are grouped together and written to the `dbo_users` table.

2. **Key Columns Interpolation**: When using `{key_columns}` in `unique_constraints`, the sink automatically:
   * Converts the list to a string representation during interpolation
   * Parses it back to a list (supports both `"['id']"` and `'["id"]'` formats)
   * Uses it for upsert operations based on `unique_conflict_method`

**Example**: If you have a table `users` with primary key `id`, and you configure `unique_constraints: "{key_columns}"`, it will automatically use `["id"]` for upsert operations.
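
The interpolation and round trip described above can be sketched with Python's `str.format`, `json`, and `ast.literal_eval`. This is an illustration of the idea under the assumption that plain string formatting is used; the helper names `interpolate` and `parse_key_columns` are hypothetical and not the sink's actual functions.

```python
import ast
import json
from typing import Any, Dict, List


def interpolate(template: str, metadata: Dict[str, Any]) -> str:
    """Fill {schema}, {table}, and {key_columns} placeholders from event metadata."""
    return template.format(**metadata)


def parse_key_columns(value: str) -> List[str]:
    """Parse '["id"]' (JSON style) or "['id']" (Python style) back into a list."""
    try:
        return json.loads(value)
    except json.JSONDecodeError:
        # Python-style single quotes are not valid JSON; fall back to literal_eval.
        return list(ast.literal_eval(value))


metadata = {"schema": "dbo", "table": "users", "key_columns": ["id"]}
table_name = interpolate("{schema}_{table}", metadata)
unique_constraints = parse_key_columns(interpolate("{key_columns}", metadata))
```

Here the list is first rendered as the string `['id']` by formatting and then parsed back into a list, which is why both quoting styles need to be accepted.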

## Best Practices

1. **Enable CDC on Database First**: Always enable CDC at the database level before enabling on tables
2. **Monitor SQL Server Agent**: Ensure SQL Server Agent is running for CDC to work
3. **Filter Events**: Use schema/table filters to reduce processing overhead
4. **Monitor Resources**: Watch memory and CPU usage, especially with high-volume tables
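5. **Test Resume**: Stop and restart the pipeline to verify that it resumes from the last checkpointed LSN
6. **Secure Connections**: Use encrypted connections in production
7. **Regular Backups**: Back up checkpoint files
8. **Schema Validation**: Test how the pipeline handles schema changes (for example, added or dropped columns) before relying on it in production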
9. **Use Generic IO Sink**: Leverage automatic type mapping and metadata interpolation for flexible data routing
10. **Metadata Interpolation**: Use `{schema}`, `{table}`, and `{key_columns}` for dynamic table routing and upsert configuration
11. **Poll Interval**: Adjust `poll_interval_seconds` based on your latency requirements (lower = more frequent polling, higher CPU usage)
12. **Batch Size**: Adjust `max_batch_size` based on your throughput needs (larger = fewer handler calls, more memory usage)
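
To make the trade-off between `max_batch_size` and `flush_interval_seconds` concrete, the sketch below buffers events and flushes when either limit is reached. The `Batcher` class is a hypothetical illustration of the general technique, not the connector's implementation; the injectable `clock` argument exists only to make the example easy to test.

```python
import time
from typing import Any, Callable, List


class Batcher:
    """Hypothetical buffer that flushes on size or elapsed time, whichever comes first."""

    def __init__(
        self,
        handler: Callable[[List[Any]], None],
        max_batch_size: int = 100,
        flush_interval_seconds: float = 1.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.handler = handler
        self.max_batch_size = max_batch_size
        self.flush_interval_seconds = flush_interval_seconds
        self.clock = clock
        self.buffer: List[Any] = []
        self.last_flush = clock()

    def add(self, event: Any) -> None:
        self.buffer.append(event)
        self.maybe_flush()

    def maybe_flush(self) -> None:
        """Flush if the batch is full or the flush interval has elapsed."""
        full = len(self.buffer) >= self.max_batch_size
        stale = self.clock() - self.last_flush >= self.flush_interval_seconds
        if self.buffer and (full or stale):
            self.handler(self.buffer)
            self.buffer = []
            self.last_flush = self.clock()
```

A larger `max_batch_size` means fewer handler calls but a bigger in-memory buffer, while a shorter flush interval bounds how long a partially filled batch can wait.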

## Timestamp Handling

The SQL Server CDC source automatically adds Mage timestamp columns:

* **`_mage_created_at`**: Set to event timestamp (datetime) for INSERT operations
* **`_mage_updated_at`**: Set to event timestamp (datetime) for UPDATE operations
* **`_mage_deleted_at`**: Set to event timestamp (datetime) for DELETE operations

All timestamps are in UTC timezone and returned as datetime objects, which ensures:

* Proper type handling in downstream databases
* Automatic type conversion in Generic IO sink
* Consistent timezone handling across systems
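
The rule above can be sketched as a small helper that sets exactly one `_mage_*` column per event. This is an assumption-based illustration: the name `add_mage_timestamps` is hypothetical, and leaving the other two columns as `None` follows the sample INSERT record rather than a documented guarantee for every operation.

```python
from datetime import datetime, timezone
from typing import Any, Dict

TIMESTAMP_COLUMN_BY_OPERATION = {
    "INSERT": "_mage_created_at",
    "UPDATE": "_mage_updated_at",
    "DELETE": "_mage_deleted_at",
}


def add_mage_timestamps(
    row: Dict[str, Any], operation: str, event_time: datetime
) -> Dict[str, Any]:
    """Return a copy of row with the three _mage_* columns; only one is set."""
    enriched = dict(row)
    for column in TIMESTAMP_COLUMN_BY_OPERATION.values():
        enriched[column] = None
    # Normalize to UTC so downstream sinks see one consistent timezone.
    column = TIMESTAMP_COLUMN_BY_OPERATION[operation]
    enriched[column] = event_time.astimezone(timezone.utc)
    return enriched
```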

## Limitations

* **SQL Server 2008+**: Requires SQL Server 2008 or later. CDC is available in Enterprise and Developer editions; Standard edition supports it starting with SQL Server 2016 SP1
* **CDC Feature**: Requires CDC to be enabled on database and tables
* **SQL Server Agent**: Requires SQL Server Agent to be running
* **Network Dependency**: Requires stable network connection
* **Memory Usage**: Schema caching uses memory
* **CDC Retention**: Depends on SQL Server CDC retention settings (default: 3 days)
* **Timezone**: All timestamps are in UTC timezone
* **Polling**: Uses polling mechanism (not push-based like logical replication)
* **UPDATE Events**: Only includes after image, not before image
* **ODBC Driver**: Requires appropriate ODBC driver to be installed

## LSN Format

SQL Server CDC uses Log Sequence Numbers (LSN) in binary format, which are converted to hex strings for storage and checkpointing:

* **Format**: `0x` followed by hexadecimal digits (e.g., `0x00000000000000000000`)
* **Length**: Typically 10 bytes (20 hex characters)
* **Comparison**: LSNs can be compared as integers for range queries
* **Checkpoint**: Last processed LSN is saved in checkpoint file for resume capability
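
The conversions described above can be sketched in a few lines of Python. The helper names are hypothetical, and the strict 10-byte check is an assumption based on SQL Server's `binary(10)` LSN type.

```python
def lsn_to_bytes(lsn: str) -> bytes:
    """Convert '0x...' hex text to the 10-byte binary(10) value SQL Server uses."""
    if not lsn.lower().startswith("0x"):
        raise ValueError(f"LSN must start with 0x: {lsn!r}")
    raw = bytes.fromhex(lsn[2:])
    if len(raw) != 10:
        raise ValueError(f"LSN must be 10 bytes (20 hex digits): {lsn!r}")
    return raw


def lsn_to_hex(raw: bytes) -> str:
    """Format binary LSN bytes as an uppercase hex string for checkpointing."""
    return "0x" + raw.hex().upper()


def lsn_to_int(lsn: str) -> int:
    """Interpret the LSN as a big-endian integer so two LSNs can be ordered."""
    return int.from_bytes(lsn_to_bytes(lsn), "big")
```

Comparing `lsn_to_int` values gives the same ordering as comparing the fixed-length binary values byte by byte.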

## Capture Instance Names

When CDC is enabled on a table, SQL Server creates a capture instance and a change table for it. With SQL Server's defaults, the names are:

* **Capture instance**: `{schema}_{table}` (e.g., `dbo_users` for table `users` in schema `dbo`)
* **Change table**: `cdc.{schema}_{table}_CT` (e.g., `cdc.dbo_users_CT`)

The source automatically discovers capture instances, but you can also specify `capture_instance` in the configuration if needed.
