# MySQL CDC

> Real-time Change Data Capture from MySQL binary logs with schema evolution tracking

export const ProOnly = ({button = 'Get started for free', description = 'Try our fully managed solution to access this advanced feature.', source = 'documentation', title = 'Only in Mage Pro.'}) => <a href={`https://cloud.mage.ai/sign-up?source=${source}`} className="block my-4 px-5 py-4 overflow-hidden rounded-xl flex gap-3 border border-emerald-500/20 bg-emerald-50/50 dark:border-emerald-500/30 dark:bg-emerald-500/10" target="_blank">
    <div style={{
  display: 'flex',
  alignItems: 'center',
  width: '100%'
}}>
      <div className="text-sm prose min-w-0 text-emerald-900 dark:text-emerald-200" style={{
  flex: 1
}}>
        {title}
        <p className="normal">{description}</p>
      </div>

      <div> </div>

      <div>
        <ProButton label={button} href={`https://cloud.mage.ai/sign-up?source=${source}`} />
      </div>
    </div>
  </a>;

export const ProButton = ({href, label = 'Get started with Mage Pro for free', source = 'documentation'}) => <div style={{
  height: 32,
  position: 'relative'
}}>
    <a target="_blank" className="group px-4 py-1.5 relative inline-flex items-center text-sm font-medium rounded-full" href={href ?? `https://cloud.mage.ai/sign-up?source=${source}`}>
      <span className="absolute inset-0 bg-primary-dark dark:bg-primary-light/10 border-primary-light/30 rounded-full dark:border group-hover:opacity-[0.9] dark:group-hover:border-primary-light/60">
      </span>

      <div className="mr-0.5 space-x-2.5 flex items-center">
        <span class="z-10 text-white dark:text-primary-light">
          {label}
        </span>

        <svg width="3" height="24" viewBox="0 -9 3 24" class="h-5 rotate-0 overflow-visible text-white/90 dark:text-primary-light">
          <path d="M0 0L3 3L0 6" fill="none" stroke="currentColor" stroke-width="1.5" stroke-linecap="round"></path>
        </svg>
      </div>
    </a>
  </div>;

<ProOnly source="mysql-cdc" />

## Overview

The MySQL CDC streaming source reads row-level changes from the MySQL binary log in real time and emits them as structured records. It tracks schema changes automatically and detects each table's primary key columns.

<CardGroup cols={2}>
  <Card title="Real-time CDC" icon="zap">
    Captures changes as they happen in MySQL
  </Card>

  <Card title="Schema Evolution" icon="database">
    Tracks table structure changes automatically
  </Card>

  <Card title="Primary Key Detection" icon="key">
    Automatically identifies and extracts primary key columns
  </Card>

  <Card title="Error Recovery" icon="shield">
    Handles server ID conflicts and binlog file issues
  </Card>
</CardGroup>

## Configuration

### Required Parameters

| Parameter  | Type   | Description                                |
| ---------- | ------ | ------------------------------------------ |
| `host`     | string | MySQL server hostname or IP address        |
| `port`     | number | MySQL server port (default: 3306)          |
| `user`     | string | MySQL username with replication privileges |
| `password` | string | MySQL password                             |

### Optional Parameters

| Parameter                    | Type    | Default | Description                                            |
| ---------------------------- | ------- | ------- | ------------------------------------------------------ |
| `connect_timeout`            | number  | 10      | Connection timeout in seconds                          |
| `charset`                    | string  | utf8mb4 | Character set for connection                           |
| `databases`                  | array   | -       | Include only these databases                           |
| `tables`                     | array   | -       | Include only these tables                              |
| `ignore_databases`           | array   | -       | Exclude these databases                                |
| `ignore_tables`              | array   | -       | Exclude these tables                                   |
| `server_id`                  | number  | 54001   | Unique server ID for replication                       |
| `auto_generate_server_id`    | boolean | true    | Auto-generate a unique `server_id` on conflict         |
| `resume_from_gtids`          | string  | -       | Resume from specific GTID set                          |
| `start_log_file`             | string  | -       | Resume from specific binlog file                       |
| `start_log_pos`              | number  | -       | Resume from specific position in binlog file           |
| `start_timestamp`            | number  | -       | Resume from specific timestamp (Unix timestamp)        |
| `include_ddl`                | boolean | false   | Include DDL events (CREATE, ALTER, DROP, etc.)         |
| `include_heartbeat`          | boolean | true    | Include heartbeat events                               |
| `include_transaction_events` | boolean | true    | Include transaction commit events                      |
| `include_gtid_events`        | boolean | true    | Include GTID events                                    |
| `include_rotate_events`      | boolean | false   | Include binlog rotation events                         |
| `include_intvar_events`      | boolean | true    | Include integer variable events                        |
| `include_load_query_events`  | boolean | true    | Include LOAD DATA INFILE events                        |
| `include_table_map_events`   | boolean | true    | Include table map events (required for column mapping) |
| `heartbeat_seconds`          | number  | 1.0     | Heartbeat interval in seconds                          |
| `blocking`                   | boolean | true    | Use blocking mode for event reading                    |
| `max_batch_size`             | number  | 100     | Maximum events per batch                               |
| `flush_interval_seconds`     | number  | 1.0     | Maximum time between batch flushes                     |
| `return_db_records_only`     | boolean | true    | Return only DB records with Mage timestamp columns     |
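
To picture how `max_batch_size` and `flush_interval_seconds` interact, here is a minimal sketch of size-or-time batching. It illustrates the general technique only and is not Mage's implementation; the `Batcher` class, its methods, and the injectable `clock` argument are hypothetical names introduced for this example.

```python
import time
from typing import Any, Callable, List, Optional


class Batcher:
    """Hypothetical size-or-time batcher; not Mage's actual implementation."""

    def __init__(
        self,
        max_batch_size: int = 100,
        flush_interval_seconds: float = 1.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.max_batch_size = max_batch_size
        self.flush_interval_seconds = flush_interval_seconds
        self._clock = clock
        self._events: List[Any] = []
        self._last_flush = clock()

    def add(self, event: Any) -> Optional[List[Any]]:
        """Buffer an event; return a batch if a flush condition is met."""
        self._events.append(event)
        return self.maybe_flush()

    def maybe_flush(self) -> Optional[List[Any]]:
        """Flush when the batch is full or the flush interval has elapsed."""
        full = len(self._events) >= self.max_batch_size
        stale = self._clock() - self._last_flush >= self.flush_interval_seconds
        if self._events and (full or stale):
            batch, self._events = self._events, []
            self._last_flush = self._clock()
            return batch
        return None
```

Under this model, a larger `max_batch_size` mainly helps busy streams, while `flush_interval_seconds` bounds how long a record can wait in a quiet one.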

### SSL Configuration

| Parameter  | Type   | Description                     |
| ---------- | ------ | ------------------------------- |
| `ssl.ca`   | string | Certificate Authority file path |
| `ssl.cert` | string | Client certificate file path    |
| `ssl.key`  | string | Client private key file path    |

## Record Formats

### Standard Event Format

When `return_db_records_only: false`:

```json theme={"system"}
{
  "op": "u",
  "schema": "mydb",
  "table": "users",
  "ts": 1640995200,
  "tx": {
    "gtid": "uuid:1-100",
    "xid": 12345
  },
  "before": {
    "id": 1,
    "name": "John",
    "email": "john@example.com"
  },
  "after": {
    "id": 1,
    "name": "John",
    "email": "john@example.com"
  },
  "offset": {
    "file": "mysql-bin.000001",
    "pos": 1234,
    "gtid": "uuid:1-100"
  }
}
```

### DB Records Only Format

When `return_db_records_only: true`:

```json theme={"system"}
{
  "data": {
    "id": 1,
    "name": "John",
    "email": "john@example.com",
    "_mage_created_at": "2022-01-01T00:00:00+00:00",
    "_mage_updated_at": null,
    "_mage_deleted_at": null
  },
  "metadata": {
    "schema": "mydb",
    "table": "users",
    "key_columns": ["id"],
    "operation": "INSERT"
  }
}
```

**Note**: The `_mage_*` timestamp columns are returned as datetime objects (ISO 8601 format strings when serialized) in UTC timezone, not Unix timestamps. This ensures proper type handling when writing to databases.
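
If a consumer receives the serialized form, the ISO 8601 strings can be parsed back into timezone-aware datetimes with the standard library. The sketch below is one way to do that; `parse_mage_timestamps` is a hypothetical helper, the sample record is made up for illustration, and treating a non-null `_mage_deleted_at` as a deletion marker is an assumption about how you might use the column.

```python
from datetime import datetime
from typing import Any, Dict, Optional

MAGE_TIMESTAMP_COLUMNS = ("_mage_created_at", "_mage_updated_at", "_mage_deleted_at")


def parse_mage_timestamps(data: Dict[str, Any]) -> Dict[str, Optional[datetime]]:
    """Parse serialized `_mage_*` columns into aware datetimes (None stays None)."""
    parsed: Dict[str, Optional[datetime]] = {}
    for column in MAGE_TIMESTAMP_COLUMNS:
        value = data.get(column)
        if isinstance(value, str):
            # fromisoformat keeps the "+00:00" offset, so the result is UTC-aware.
            value = datetime.fromisoformat(value)
        parsed[column] = value
    return parsed


record = {
    "id": 1,
    "_mage_created_at": "2022-01-01T00:00:00+00:00",
    "_mage_updated_at": None,
    "_mage_deleted_at": None,
}
timestamps = parse_mage_timestamps(record)
# Assumption: a populated `_mage_deleted_at` marks the row as deleted.
is_deleted = timestamps["_mage_deleted_at"] is not None
```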

## Event Types

<AccordionGroup>
  <Accordion title="Row Events (Core CDC)">
    * **WriteRowsEvent** (`op: "c"`): INSERT operations
    * **UpdateRowsEvent** (`op: "u"`): UPDATE operations with before/after data
    * **DeleteRowsEvent** (`op: "d"`): DELETE operations
  </Accordion>

  <Accordion title="DDL Events">
    * **QueryEvent** (`op: "ddl"`): DDL statements (CREATE, ALTER, DROP)
    * Automatically clears table schema cache when DDL occurs
    * Extracts table name from SQL for targeted cache clearing
  </Accordion>

  <Accordion title="Transaction Events">
    * **XidEvent** (`op: "transaction"`): Transaction commit events
    * Useful for maintaining transaction boundaries
  </Accordion>

  <Accordion title="Replication Events">
    * **GtidEvent** (`op: "gtid"`): Global Transaction Identifier events
    * **RotateEvent** (`op: "rotate"`): Binlog file rotation events
    * **HeartbeatLogEvent** (`op: "heartbeat"`): Replication heartbeat events
  </Accordion>

  <Accordion title="Metadata Events">
    * **TableMapEvent** (`op: "table_map"`): Table structure metadata
    * **IntvarEvent** (`op: "intvar"`): Integer variable changes
    * **LoadQueryEvent** (`op: "begin_load_query"`/`op: "execute_load_query"`): LOAD DATA INFILE events
  </Accordion>
</AccordionGroup>
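
When `return_db_records_only` is `false`, a consumer will usually branch on the `op` field. The sketch below groups the `op` codes listed above into a few categories. The `classify_event` function and the category names (`"control"`, `"unknown"`) are hypothetical and chosen only for illustration.

```python
from typing import Any, Dict

# `op` codes as documented in the accordions above.
ROW_OPS = {"c": "insert", "u": "update", "d": "delete"}
NON_ROW_OPS = {
    "ddl", "transaction", "gtid", "rotate", "heartbeat",
    "table_map", "intvar", "begin_load_query", "execute_load_query",
}


def classify_event(event: Dict[str, Any]) -> str:
    """Return 'insert', 'update', 'delete', 'control', or 'unknown'."""
    op = event.get("op")
    if op in ROW_OPS:
        return ROW_OPS[op]
    if op in NON_ROW_OPS:
        # DDL, transaction, replication, and metadata events carry no row image.
        return "control"
    return "unknown"
```

Returning `"unknown"` instead of raising keeps a consumer running if an unexpected event type shows up; whether that is the right trade-off depends on the pipeline.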

## Primary Key Detection

The source automatically detects and extracts primary key columns:

1. **Schema Discovery**: Queries `INFORMATION_SCHEMA` for table structure
2. **Primary Key Detection**: Identifies actual primary key columns from database constraints
3. **Caching**: Stores schema and primary key information for performance
4. **Key Extraction**: Extracts primary key column names from row events

<CodeGroup>
  ```json Example Output theme={"system"}
  {
    "metadata": {
      "key_columns": ["user_id", "tenant_id"]
    }
  }
  ```
</CodeGroup>

**Note**: `key_columns` is a list of column names (not values) that can be used for deduplication or upsert operations in downstream sinks.
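
As an illustration of the deduplication use case, the following sketch keeps only the most recent record per primary key within a batch. It assumes records in the DB Records Only format; `dedupe_latest` is a hypothetical helper, and treating records without key columns as distinct is a design choice of this example, not documented behavior.

```python
from typing import Any, Dict, Iterable, List, Tuple


def dedupe_latest(records: Iterable[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Keep only the last record seen for each (schema, table, primary key)."""
    latest: Dict[Tuple[Any, ...], Dict[str, Any]] = {}
    for index, record in enumerate(records):
        meta = record["metadata"]
        key_columns = meta.get("key_columns") or []
        if key_columns:
            # key_columns holds column names; look the values up in `data`.
            identity = tuple(record["data"][col] for col in key_columns)
        else:
            # No primary key: treat every record as distinct.
            identity = ("__row__", index)
        latest[(meta["schema"], meta["table"], identity)] = record
    return list(latest.values())


meta = {"schema": "mydb", "table": "users", "key_columns": ["user_id", "tenant_id"]}
sample = [
    {"data": {"user_id": 1, "tenant_id": 1, "name": "old"}, "metadata": meta},
    {"data": {"user_id": 2, "tenant_id": 1, "name": "other"}, "metadata": meta},
    {"data": {"user_id": 1, "tenant_id": 1, "name": "new"}, "metadata": meta},
]
deduped = dedupe_latest(sample)
```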

## Examples

<CodeGroup>
  ```yaml Basic Setup theme={"system"}
  connector_type: mysql
  host: localhost
  port: 3306
  user: repl_user
  password: password
  databases: ["mydb"]
  return_db_records_only: true
  ```

  ```yaml Production Setup theme={"system"}
  connector_type: mysql
  host: mysql-prod.company.com
  port: 3306
  user: cdc_user
  password: secure_password
  ssl:
    ca: "/etc/ssl/mysql/ca.pem"
    cert: "/etc/ssl/mysql/client-cert.pem"
    key: "/etc/ssl/mysql/client-key.pem"
  databases: ["production_db"]
  ignore_tables: ["temp_*", "log_*"]
  server_id: 1001
  auto_generate_server_id: false
  max_batch_size: 500
  flush_interval_seconds: 2.0
  include_ddl: true
  return_db_records_only: true
  ```

  ```yaml Resume from Position theme={"system"}
  connector_type: mysql
  host: localhost
  port: 3306
  user: repl_user
  password: password
  start_log_file: "mysql-bin.000123"
  start_log_pos: 456789
  databases: ["mydb"]
  return_db_records_only: true
  ```
</CodeGroup>

## Prerequisites

### MySQL Server Configuration

```sql theme={"system"}
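-- Binary logging cannot be switched on at runtime (log_bin is read-only).
-- Set log_bin in my.cnf and restart; it is enabled by default in MySQL 8.0.
SHOW VARIABLES LIKE 'log_bin';

-- Set binlog format to ROW (applies to sessions opened after the change)
SET GLOBAL binlog_format = 'ROW';

-- Enable GTID (recommended). Enforce consistency first, then raise
-- gtid_mode one step at a time; before the final step, wait until
-- Ongoing_anonymous_transaction_count is 0.
SET GLOBAL enforce_gtid_consistency = ON;
SET GLOBAL gtid_mode = OFF_PERMISSIVE;
SET GLOBAL gtid_mode = ON_PERMISSIVE;
SET GLOBAL gtid_mode = ON;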
```
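
Before starting a pipeline it can help to check these settings programmatically. The sketch below compares a name-to-value mapping, such as the rows returned by `SHOW GLOBAL VARIABLES` through any MySQL client, against the settings above. Fetching the variables is left out, and `check_server_variables` is a hypothetical helper rather than part of Mage.

```python
from typing import Dict, List

# Expected values; the GTID settings are recommended rather than required.
REQUIRED = {"log_bin": "ON", "binlog_format": "ROW"}
RECOMMENDED = {"gtid_mode": "ON", "enforce_gtid_consistency": "ON"}


def check_server_variables(variables: Dict[str, str]) -> Dict[str, List[str]]:
    """Report required settings as errors and recommended ones as warnings."""

    def mismatches(expected: Dict[str, str]) -> List[str]:
        return [
            f"{name}={variables.get(name, '<unset>')} (expected {want})"
            for name, want in expected.items()
            if variables.get(name, "").upper() != want
        ]

    return {"errors": mismatches(REQUIRED), "warnings": mismatches(RECOMMENDED)}
```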

### User Permissions

```sql theme={"system"}
-- Create replication user
CREATE USER 'repl_user'@'%' IDENTIFIED BY 'password';

-- Grant replication privileges
GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'repl_user'@'%';

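-- Grant SELECT on the captured databases for schema queries.
-- INFORMATION_SCHEMA needs no grant (MySQL rejects grants on it); what a
-- user sees there follows from privileges on the underlying tables.
GRANT SELECT ON mydb.* TO 'repl_user'@'%';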
```

## Troubleshooting

<AccordionGroup>
  <Accordion title="Common Issues">
    1. **Server ID Conflict**: Enable `auto_generate_server_id: true`
    2. **Binlog File Not Found**: The file named in `start_log_file` may have been purged; run `SHOW BINARY LOGS` on the server to list the files that are still available
    3. **Permission Denied**: Ensure user has replication privileges
    4. **Connection Timeout**: Increase `connect_timeout` value
    5. **Schema Not Found**: Ensure `include_table_map_events: true`
  </Accordion>

  <Accordion title="Debugging">
    Enable debug logging to see detailed event processing:

    ```python theme={"system"}
    import logging
    logging.basicConfig(level=logging.DEBUG)
    ```
  </Accordion>

  <Accordion title="Monitoring">
    Monitor key metrics:

    * **Batch Size**: Average events per batch
    * **Flush Rate**: How often batches are flushed
    * **Error Rate**: Failed events or connections
    * **Lag**: Time between event occurrence and processing
  </Accordion>
</AccordionGroup>
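
The lag metric can be derived from the event itself. This sketch assumes the Standard Event Format, where `ts` appears to be a Unix timestamp in seconds (as in the example record), and that the consumer's clock is roughly in sync with the MySQL server. `event_lag_seconds` is a hypothetical helper.

```python
import time
from typing import Any, Dict, Optional


def event_lag_seconds(event: Dict[str, Any], now: Optional[float] = None) -> float:
    """Seconds between the event's `ts` (Unix seconds) and processing time."""
    if now is None:
        now = time.time()
    # Clamp at zero to tolerate small clock skew between MySQL and the consumer.
    return max(0.0, now - float(event["ts"]))
```

Since binlog event timestamps have one-second resolution, sub-second lag readings from a calculation like this are best treated as noise.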

## Integration with Generic IO Sink

The MySQL CDC source is designed to pair with the Generic IO sink, which provides:

* **Automatic Column Type Mapping**: Automatically maps `_mage_*` timestamp columns to appropriate database types (TIMESTAMP, DATETIME2, DateTime64, etc.) based on the target database
* **Metadata Interpolation**: Use metadata values from MySQL CDC events in sink configurations for dynamic routing and table naming

### Supported Databases

Generic IO Sink supports the following databases:

* **BigQuery**
* **ClickHouse**
* **DuckDB**
* **MySQL**
* **MSSQL**
* **Postgres**

### Metadata Interpolation

You can use metadata values from MySQL CDC events in your sink configuration using Python string formatting syntax:

* **`{schema}`**: Database/schema name from the event
* **`{table}`**: Table name from the event
* **`{key_columns}`**: List of primary key column names (e.g., `["id"]` or `["user_id", "tenant_id"]`)

When using `{key_columns}` in `unique_constraints`, it will be automatically converted from a string representation to a list. The format supports both Python-style (`"['id']"`) and JSON-style (`'["id"]'`) array strings.
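
To make the two accepted string formats concrete, here is a minimal sketch of how such a value could be parsed back into a list. It is not the sink's actual code; `parse_key_columns` is a hypothetical name, and the choice of `json.loads` with an `ast.literal_eval` fallback is an assumption of this example.

```python
import ast
import json
from typing import List, Union


def parse_key_columns(value: Union[str, List[str]]) -> List[str]:
    """Accept a list, a JSON-style string, or a Python-style string."""
    if isinstance(value, list):
        return value
    try:
        parsed = json.loads(value)
    except json.JSONDecodeError:
        # Python-style quoting is not valid JSON; literal_eval parses it
        # without executing arbitrary code.
        parsed = ast.literal_eval(value)
    if not isinstance(parsed, list) or not all(isinstance(c, str) for c in parsed):
        raise ValueError(f"expected a list of column names, got {value!r}")
    return parsed
```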

### Example Configurations

<CodeGroup>
  ```yaml BigQuery Example theme={"system"}
  connector_type: bigquery
  profile: default
  config:
    table_id: "{schema}.{table}"  # Dynamic table names from metadata
    unique_conflict_method: UPDATE
    unique_constraints: "{key_columns}"  # Uses primary keys from metadata
  ```

  ```yaml Postgres Example theme={"system"}
  connector_type: postgres
  profile: default
  config:
    schema_name: "{schema}"  # Dynamic schema from metadata
    table_name: "{table}"    # Dynamic table name from metadata
    unique_conflict_method: UPDATE
    unique_constraints: "{key_columns}"  # Uses primary keys from metadata
    if_exists: append
  ```

  ```yaml MySQL Example theme={"system"}
  connector_type: mysql
  profile: default
  config:
    schema_name: "{schema}"
    table_name: "{table}"
    unique_conflict_method: UPDATE
    unique_constraints: "{key_columns}"
    if_exists: append
  ```

  ```yaml Static Configuration theme={"system"}
  connector_type: postgres
  profile: default
  config:
    schema_name: public
    table_name: cdc_events  # Static table name
    unique_constraints: ['id', 'event_id']  # Static list (string format also works: "['id', 'event_id']")
    unique_conflict_method: UPDATE
    if_exists: append
  ```
</CodeGroup>

### How Metadata Interpolation Works

1. **Message Grouping**: Messages are automatically grouped by their interpolated config values. For example, if you use `table_name: "{schema}_{table}"`, messages from `mydb.users` are grouped together and written to the `mydb_users` table.

2. **Key Columns Interpolation**: When using `{key_columns}` in `unique_constraints`, the sink automatically:
   * Converts the list to a string representation during interpolation
   * Parses it back to a list (supports both `"['id']"` and `'["id"]'` formats)
   * Uses it for upsert operations based on `unique_conflict_method`

**Example**: If you have a table `users` with primary key `id`, and you configure `unique_constraints: "{key_columns}"`, it will automatically use `["id"]` for upsert operations.
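
The grouping step can be sketched with plain `str.format`. This is an illustration under assumptions, not the sink's implementation: `interpolate` and `group_by_config` are hypothetical helpers, and using the JSON-serialized config as the group key is a choice made for this example.

```python
import json
from collections import defaultdict
from typing import Any, Dict, List


def interpolate(config: Dict[str, Any], metadata: Dict[str, Any]) -> Dict[str, Any]:
    """Fill `{schema}`, `{table}`, `{key_columns}` placeholders in string values."""
    return {
        key: value.format(**metadata) if isinstance(value, str) else value
        for key, value in config.items()
    }


def group_by_config(
    config: Dict[str, Any], messages: List[Dict[str, Any]]
) -> Dict[str, List[Dict[str, Any]]]:
    """Group messages whose interpolated config is identical."""
    groups: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
    for message in messages:
        resolved = interpolate(config, message["metadata"])
        # A sorted JSON dump gives a stable, hashable key per resolved config.
        groups[json.dumps(resolved, sort_keys=True)].append(message)
    return dict(groups)


sink_config = {"table_name": "{schema}_{table}", "unique_constraints": "{key_columns}"}
messages = [
    {"data": {"id": 1}, "metadata": {"schema": "mydb", "table": "users", "key_columns": ["id"]}},
    {"data": {"id": 7}, "metadata": {"schema": "mydb", "table": "orders", "key_columns": ["id"]}},
    {"data": {"id": 2}, "metadata": {"schema": "mydb", "table": "users", "key_columns": ["id"]}},
]
groups = group_by_config(sink_config, messages)
```

In this sketch, formatting a list with `str.format` produces its Python-style representation, which is why a parser on the sink side needs to accept that form as well as JSON.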

## Best Practices

1. **Use GTID**: Enable GTID for reliable resume capabilities
2. **Filter Events**: Disable unnecessary event types for performance
3. **Monitor Resources**: Watch memory and CPU usage
4. **Test Resume**: Verify checkpoint functionality
5. **Secure Connections**: Use SSL in production
6. **Regular Backups**: Back up checkpoint files
7. **Schema Validation**: Test with schema changes
8. **Use Generic IO Sink**: Leverage automatic type mapping and metadata interpolation for flexible data routing
9. **Metadata Interpolation**: Use `{schema}`, `{table}`, and `{key_columns}` for dynamic table routing and upsert configuration

## Timestamp Handling

The MySQL CDC source automatically converts Unix timestamps to datetime objects:

* **`_mage_created_at`**: Set to event timestamp (datetime) for INSERT operations
* **`_mage_updated_at`**: Set to event timestamp (datetime) for UPDATE operations
* **`_mage_deleted_at`**: Set to event timestamp (datetime) for DELETE operations

All timestamps are in UTC timezone and returned as datetime objects, which ensures:

* Proper type handling in downstream databases
* Automatic type conversion in Generic IO sink
* Consistent timezone handling across systems
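
The conversion described above can be sketched as follows. The mapping of operations to columns comes from the list in this section; the operation names `UPDATE` and `DELETE` are assumed by analogy with the documented `INSERT`, and leaving the other two columns as `None` follows the INSERT example record and may not hold for every operation. `mage_timestamp_columns` is a hypothetical helper.

```python
from datetime import datetime, timezone
from typing import Dict, Optional

# Which `_mage_*` column each operation populates, per the list above.
COLUMN_FOR_OPERATION = {
    "INSERT": "_mage_created_at",
    "UPDATE": "_mage_updated_at",
    "DELETE": "_mage_deleted_at",
}


def mage_timestamp_columns(operation: str, ts: int) -> Dict[str, Optional[datetime]]:
    """Build the three `_mage_*` columns for one event from a Unix timestamp."""
    # Assumption: columns not tied to this operation are left as None.
    columns: Dict[str, Optional[datetime]] = dict.fromkeys(COLUMN_FOR_OPERATION.values())
    columns[COLUMN_FOR_OPERATION[operation]] = datetime.fromtimestamp(ts, tz=timezone.utc)
    return columns
```

Passing `tz=timezone.utc` explicitly avoids the local-timezone conversion that `datetime.fromtimestamp` applies by default.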

## Limitations

* **MySQL 5.7+**: Requires MySQL 5.7 or later
* **ROW Format**: Requires `binlog_format = ROW`
* **Network Dependency**: Requires stable network connection
* **Memory Usage**: Schema caching uses memory
* **Binlog Retention**: Depends on MySQL binlog retention settings
* **Timezone**: All timestamps are in UTC timezone
