Overview

The MySQL CDC streaming source reads row-level changes from the MySQL binary log in real time and converts them into structured records. It tracks table schema changes and detects primary key columns automatically.

  • Real-time CDC: Captures changes as they happen in MySQL
  • Schema Evolution: Tracks table structure changes automatically
  • Primary Key Detection: Automatically identifies and extracts primary key columns
  • Error Recovery: Handles server ID conflicts and binlog file issues

Configuration

Required Parameters

| Parameter | Type | Description |
|---|---|---|
| host | string | MySQL server hostname or IP address |
| port | number | MySQL server port (default: 3306) |
| user | string | MySQL username with replication privileges |
| password | string | MySQL password |

Optional Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| connect_timeout | number | 10 | Connection timeout in seconds |
| charset | string | utf8mb4 | Character set for connection |
| databases | array | - | Include only these databases |
| tables | array | - | Include only these tables |
| ignore_databases | array | - | Exclude these databases |
| ignore_tables | array | - | Exclude these tables |
| server_id | number | 54001 | Unique server ID for replication |
| auto_generate_server_id | boolean | true | Auto-generate unique server_id if conflict |
| resume_from_gtids | string | - | Resume from specific GTID set |
| start_log_file | string | - | Resume from specific binlog file |
| start_log_pos | number | - | Resume from specific position in binlog file |
| start_timestamp | number | - | Resume from specific timestamp (Unix timestamp) |
| include_ddl | boolean | false | Include DDL events (CREATE, ALTER, DROP, etc.) |
| include_heartbeat | boolean | true | Include heartbeat events |
| include_transaction_events | boolean | true | Include transaction commit events |
| include_gtid_events | boolean | true | Include GTID events |
| include_rotate_events | boolean | false | Include binlog rotation events |
| include_intvar_events | boolean | true | Include integer variable events |
| include_load_query_events | boolean | true | Include LOAD DATA INFILE events |
| include_table_map_events | boolean | true | Include table map events (required for column mapping) |
| heartbeat_seconds | number | 1.0 | Heartbeat interval in seconds |
| blocking | boolean | true | Use blocking mode for event reading |
| max_batch_size | number | 100 | Maximum events per batch |
| flush_interval_seconds | number | 1.0 | Maximum time between batch flushes |
| return_db_records_only | boolean | true | Return only DB records with Mage timestamp columns |
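
The way max_batch_size and flush_interval_seconds interact can be pictured as a buffer that flushes when either limit is reached. The sketch below illustrates that idea only; it is not the source's actual implementation. The EventBatcher class and its method names are hypothetical, and the clock is injectable purely so the behavior is easy to exercise.

```python
import time
from typing import Any, Callable, Dict, List, Optional


class EventBatcher:
    """Hypothetical buffer: flushes on size or elapsed time, whichever comes first."""

    def __init__(
        self,
        max_batch_size: int = 100,
        flush_interval_seconds: float = 1.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.max_batch_size = max_batch_size
        self.flush_interval_seconds = flush_interval_seconds
        self._clock = clock
        self._buffer: List[Dict[str, Any]] = []
        self._last_flush = clock()

    def add(self, event: Dict[str, Any]) -> Optional[List[Dict[str, Any]]]:
        """Buffer one event; return a batch if a limit was reached, else None."""
        self._buffer.append(event)
        full = len(self._buffer) >= self.max_batch_size
        stale = self._clock() - self._last_flush >= self.flush_interval_seconds
        return self.flush() if (full or stale) else None

    def flush(self) -> List[Dict[str, Any]]:
        """Return everything buffered so far and reset the timer."""
        batch, self._buffer = self._buffer, []
        self._last_flush = self._clock()
        return batch
```

With the defaults from the table, a quiet table still produces a batch roughly once per second, while a busy one produces a batch every 100 events.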

SSL Configuration

| Parameter | Type | Description |
|---|---|---|
| ssl.ca | string | Certificate Authority file path |
| ssl.cert | string | Client certificate file path |
| ssl.key | string | Client private key file path |

Record Formats

Standard Event Format

When return_db_records_only: false:
{
  "op": "c",
  "schema": "mydb",
  "table": "users",
  "ts": 1640995200,
  "tx": {
    "gtid": "uuid:1-100",
    "xid": 12345
  },
  "before": {
    "id": 1,
    "name": "John",
    "email": "john@example.com"
  },
  "after": {
    "id": 1,
    "name": "John",
    "email": "john@example.com"
  },
  "offset": {
    "file": "mysql-bin.000001",
    "pos": 1234,
    "gtid": "uuid:1-100"
  }
}
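
The offset block in each standard event carries the position needed to restart the stream later. As a rough sketch, the function below maps an offset to the resume-related parameters from the Configuration section. The function name is hypothetical. Two things are assumptions rather than documented behavior: that the GTID is preferred when present, and that the offset's gtid value can be passed unchanged as resume_from_gtids.

```python
from typing import Any, Dict


def resume_params_from_offset(offset: Dict[str, Any]) -> Dict[str, Any]:
    """Map an event's "offset" block to resume-related config parameters.

    Assumption: GTID is preferred when present, otherwise file/position.
    """
    gtid = offset.get("gtid")
    if gtid:
        return {"resume_from_gtids": gtid}
    if offset.get("file") and offset.get("pos") is not None:
        return {"start_log_file": offset["file"], "start_log_pos": offset["pos"]}
    return {}  # nothing usable; the caller decides where to start
```

Persisting the result after each successfully processed batch gives a simple checkpoint to merge into the configuration on restart.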

DB Records Only Format

When return_db_records_only: true:
{
  "data": {
    "id": 1,
    "name": "John",
    "email": "john@example.com",
    "_mage_created_at": 1640995200,
    "_mage_updated_at": null,
    "_mage_deleted_at": null
  },
  "metadata": {
    "schema": "mydb",
    "table": "users",
    "key_columns": {
      "id": 1
    },
    "operation": "INSERT"
  }
}
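
A downstream consumer can use metadata.key_columns to keep a keyed copy of each table. The following is a minimal sketch under stated assumptions: the helper names are hypothetical, and "DELETE" is assumed to be the operation value for deletes, since only "INSERT" appears in the sample above.

```python
from typing import Any, Dict, Tuple

Row = Dict[str, Any]
Key = Tuple[Tuple[str, Any], ...]


def record_key(record: Dict[str, Any]) -> Key:
    """Build a hashable key from metadata.key_columns, sorted by column name."""
    return tuple(sorted(record["metadata"]["key_columns"].items()))


def apply_record(table: Dict[Key, Row], record: Dict[str, Any]) -> None:
    """Apply one record to an in-memory copy of the table.

    Assumption: "DELETE" is the operation name for deletes; only "INSERT"
    appears in the documented sample.
    """
    key = record_key(record)
    if record["metadata"]["operation"] == "DELETE":
        table.pop(key, None)
    else:
        table[key] = record["data"]  # insert or overwrite with the latest image
```

Sorting the key columns by name keeps composite keys stable regardless of the order in which the columns arrive.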

Event Types

  • WriteRowsEvent (op: "c"): INSERT operations
  • UpdateRowsEvent (op: "u"): UPDATE operations with before/after data
  • DeleteRowsEvent (op: "d"): DELETE operations
  • QueryEvent (op: "ddl"): DDL statements (CREATE, ALTER, DROP)
      • Automatically clears table schema cache when DDL occurs
      • Extracts table name from SQL for targeted cache clearing
  • XidEvent (op: "transaction"): Transaction commit events
      • Useful for maintaining transaction boundaries
  • GtidEvent (op: "gtid"): Global Transaction Identifier events
  • RotateEvent (op: "rotate"): Binlog file rotation events
  • HeartbeatLogEvent (op: "heartbeat"): Replication heartbeat events
  • TableMapEvent (op: "table_map"): Table structure metadata
  • IntvarEvent (op: "intvar"): Integer variable changes
  • LoadQueryEvent (op: "begin_load_query"/op: "execute_load_query"): LOAD DATA INFILE events
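
To make the relationship between op codes and the include_* options concrete, here is a small sketch of a filter. The pairing of each op code with an option is inferred from the names and is an assumption, not documented behavior. The should_emit function is hypothetical, and the defaults are copied from the Optional Parameters table.

```python
from typing import Any, Dict

ROW_OPS = {"c", "u", "d"}  # insert, update, delete

# Assumed pairing of op codes with include_* options, inferred from their names.
OP_TO_FLAG = {
    "ddl": "include_ddl",
    "transaction": "include_transaction_events",
    "gtid": "include_gtid_events",
    "rotate": "include_rotate_events",
    "heartbeat": "include_heartbeat",
    "table_map": "include_table_map_events",
    "intvar": "include_intvar_events",
    "begin_load_query": "include_load_query_events",
    "execute_load_query": "include_load_query_events",
}

# Defaults as listed in the Optional Parameters table.
DEFAULTS = {
    "include_ddl": False,
    "include_heartbeat": True,
    "include_transaction_events": True,
    "include_gtid_events": True,
    "include_rotate_events": False,
    "include_intvar_events": True,
    "include_load_query_events": True,
    "include_table_map_events": True,
}


def should_emit(event: Dict[str, Any], config: Dict[str, Any]) -> bool:
    """Return True for row changes and for event types enabled in config."""
    op = event.get("op")
    if op in ROW_OPS:
        return True
    flag = OP_TO_FLAG.get(op)
    if flag is None:
        return False  # unknown op code
    return bool(config.get(flag, DEFAULTS[flag]))
```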

Primary Key Detection

The source automatically detects and extracts primary key columns:
  1. Schema Discovery: Queries INFORMATION_SCHEMA for table structure
  2. Primary Key Detection: Identifies actual primary key columns from database constraints
  3. Caching: Stores schema and primary key information for performance
  4. Key Extraction: Extracts primary key values from row events

For a table with a composite primary key, key_columns contains one entry per key column:
{
  "metadata": {
    "key_columns": {
      "user_id": 123,
      "tenant_id": 456
    }
  }
}
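
Steps 3 and 4 above can be sketched as a small cache plus an extraction helper. This is an illustration under assumptions, not the source's internal code: PrimaryKeyCache and extract_key_columns are hypothetical names, and the loader argument stands in for the INFORMATION_SCHEMA lookup of steps 1 and 2.

```python
from typing import Any, Callable, Dict, List, Tuple

TableId = Tuple[str, str]  # (schema, table)


class PrimaryKeyCache:
    """Hypothetical cache of primary key column names per table."""

    def __init__(self, loader: Callable[[str, str], List[str]]) -> None:
        # loader stands in for the INFORMATION_SCHEMA lookup (steps 1 and 2).
        self._loader = loader
        self._cache: Dict[TableId, List[str]] = {}

    def key_names(self, schema: str, table: str) -> List[str]:
        """Return cached key column names, loading them on first use."""
        table_id = (schema, table)
        if table_id not in self._cache:
            self._cache[table_id] = self._loader(schema, table)
        return self._cache[table_id]

    def invalidate(self, schema: str, table: str) -> None:
        """Drop a cached entry, e.g. after a DDL event touches the table."""
        self._cache.pop((schema, table), None)


def extract_key_columns(row: Dict[str, Any], key_names: List[str]) -> Dict[str, Any]:
    """Step 4: pick the primary key values out of a row image."""
    return {name: row[name] for name in key_names}
```

Invalidating a single table on DDL, rather than clearing the whole cache, avoids re-querying schemas that did not change.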

Examples

connector_type: mysql
host: localhost
port: 3306
user: repl_user
password: password
databases: ["mydb"]
return_db_records_only: true

Prerequisites

MySQL Server Configuration

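-- Binary logging cannot be enabled at runtime: log_bin is read-only.
-- Set log_bin (and a unique server_id) in my.cnf and restart, then verify:
SHOW VARIABLES LIKE 'log_bin';

-- Set binlog format to ROW (SET GLOBAL affects only sessions started afterwards)
SET GLOBAL binlog_format = 'ROW';

-- Enable GTID (recommended). Consistency must be enforced first, and
-- gtid_mode must be stepped through each intermediate value in order.
SET GLOBAL enforce_gtid_consistency = ON;
SET GLOBAL gtid_mode = OFF_PERMISSIVE;
SET GLOBAL gtid_mode = ON_PERMISSIVE;
-- Wait until the Ongoing_anonymous_transaction_count status variable is 0
SET GLOBAL gtid_mode = ON;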
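
Before starting the source, it can help to confirm that the server settings above are actually in effect. The sketch below checks a dictionary of server variables, for example the rows returned by SHOW GLOBAL VARIABLES. Fetching them is left out, and the function name is hypothetical.

```python
from typing import Dict, List


def check_cdc_prerequisites(variables: Dict[str, str]) -> List[str]:
    """Return human-readable problems; an empty list means the checks passed."""

    def value(name: str) -> str:
        return str(variables.get(name, "")).upper()

    problems = []
    if value("log_bin") != "ON":
        problems.append("log_bin is not ON: binary logging is disabled")
    if value("binlog_format") != "ROW":
        problems.append("binlog_format is not ROW")
    if value("gtid_mode") != "ON":
        problems.append("gtid_mode is not ON (recommended, not required)")
    return problems
```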

User Permissions

-- Create replication user
CREATE USER 'repl_user'@'%' IDENTIFIED BY 'password';

-- Grant replication privileges
GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'repl_user'@'%';

-- Grant SELECT privileges for schema queries
-- (information_schema needs no grant: every user can read it, and it
-- shows only the objects that user already has privileges on)
GRANT SELECT ON mydb.* TO 'repl_user'@'%';

Troubleshooting

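  1. Server ID Conflict: Set auto_generate_server_id: true, or assign a server_id that no other replica or CDC client uses
  2. Binlog File Not Found: Confirm that the file named in start_log_file still exists on the server (SHOW BINARY LOGS); it may have been purged
  3. Permission Denied: Ensure the user has the REPLICATION SLAVE and REPLICATION CLIENT privileges
  4. Connection Timeout: Increase connect_timeout (default: 10 seconds)
  5. Schema Not Found: Set include_table_map_events: true

Enable debug logging to see detailed event processing: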
import logging
logging.basicConfig(level=logging.DEBUG)

Monitor key metrics:
  • Batch Size: Average events per batch
  • Flush Rate: How often batches are flushed
  • Error Rate: Failed events or connections
  • Lag: Time between event occurrence and processing
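
Batch size and lag can be derived directly from the events themselves. The following is a minimal sketch for standard-format events. The function name is hypothetical, and it assumes ts is a Unix timestamp in seconds, as in the sample event.

```python
import time
from typing import Any, Dict, List, Optional


def batch_metrics(
    batch: List[Dict[str, Any]], now: Optional[float] = None
) -> Dict[str, float]:
    """Summarize one batch of standard-format events.

    Assumption: "ts" is a Unix timestamp in seconds, as in the sample event.
    """
    now = time.time() if now is None else now
    lags = [now - e["ts"] for e in batch if e.get("ts") is not None]
    return {
        "batch_size": len(batch),
        "max_lag_seconds": max(lags) if lags else 0.0,
    }
```

Reporting the maximum lag rather than the mean makes a stalled stream visible sooner, since a single old event is enough to raise the number.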

Best Practices

  1. Use GTID: Enable GTID for reliable resume capabilities
  2. Filter Events: Disable unnecessary event types for performance
  3. Monitor Resources: Watch memory and CPU usage
  4. Test Resume: Verify checkpoint functionality
  5. Secure Connections: Use SSL in production
  6. Regular Backups: Backup checkpoint files
  7. Schema Validation: Test with schema changes

Limitations

  • MySQL 5.7+: Requires MySQL 5.7 or later
  • ROW Format: Requires binlog_format = ROW
  • Network Dependency: Requires stable network connection
  • Memory Usage: Schema caching uses memory
  • Binlog Retention: Depends on MySQL binlog retention settings