Only in Mage Pro. Try our fully managed solution to access this advanced feature.
Overview
The PostgreSQL CDC streaming source captures real-time changes from the PostgreSQL Write-Ahead Log (WAL) using logical replication with the `pgoutput` plugin and converts them into structured records. It tracks schema changes automatically and detects primary keys.
- Real-time CDC: Captures changes as they happen in PostgreSQL
- Logical Replication: Uses PostgreSQL's native logical replication protocol
- Primary Key Detection: Automatically identifies and extracts primary key columns
- LSN Tracking: Tracks Log Sequence Numbers (LSNs) so processing can resume reliably
Configuration
Required Parameters
| Parameter | Type | Description |
|---|---|---|
| `host` | string | PostgreSQL server hostname or IP address |
| `port` | number | PostgreSQL server port (default: `5432`) |
| `database` | string | Database name (default: `postgres`) |
| `user` | string | PostgreSQL username with replication privileges |
| `password` | string | PostgreSQL password |
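To illustrate how the required parameters map onto a PostgreSQL connection, the sketch below assembles a libpq-style keyword/value connection string. It assumes a libpq-based driver, where `database` corresponds to libpq's `dbname` and `replication=database` requests a logical replication connection. The helper `build_dsn` is hypothetical and not part of Mage.

```python
from typing import Any, Dict


def _quote(value: Any) -> str:
    # libpq keyword/value syntax: wrap the value in single quotes and
    # backslash-escape any backslashes or single quotes inside it.
    escaped = str(value).replace("\\", "\\\\").replace("'", "\\'")
    return f"'{escaped}'"


def build_dsn(config: Dict[str, Any]) -> str:
    """Hypothetical helper: map the documented parameters onto a libpq DSN."""
    params = {
        "host": config["host"],
        "port": config.get("port", 5432),              # documented default
        "dbname": config.get("database", "postgres"),  # `database` -> libpq `dbname`
        "user": config["user"],
        "password": config["password"],
        "connect_timeout": config.get("connect_timeout", 10),  # optional, seconds
        "replication": "database",  # request a logical replication connection
    }
    return " ".join(f"{key}={_quote(value)}" for key, value in params.items())
```

For example, `build_dsn({"host": "db.internal", "user": "mage", "password": "secret"})` fills in the documented defaults for `port` and `database`. Quoting every value keeps passwords containing spaces or quotes from breaking the string.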
Optional Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| `connect_timeout` | number | `10` | Connection timeout, in seconds |
| `schemas` | array | - | Include only these schemas |
| `tables` | array | - | Include only these tables |
| `ignore_schemas` | array | - | Exclude these schemas |
| `ignore_tables` | array | - | Exclude these tables |
| `replication_slot` | string | `mage_slot` | Logical replication slot name |
| `publication_name` | string | `mage_pub` | Publication name for logical replication |
| `start_lsn` | string | - | Log Sequence Number to start from (e.g., `0/1234567`) |
| `max_batch_size` | number | `100` | Maximum number of events per batch |
| `flush_interval_seconds` | number | `1.0` | Maximum time between batch flushes, in seconds |
| `keep_alive_time` | number | `10.0` | Time between keep-alive messages, in seconds |
| `heartbeat_table_schema` | string | - | Schema name for the heartbeat table (e.g., `public`) |
| `heartbeat_table_name` | string | - | Table name for the heartbeat table (e.g., `cdc_status`) |
| `heartbeat_interval_seconds` | number | `60.0` | How often to write the heartbeat, in seconds |
| `return_db_records_only` | boolean | `true` | Return only DB records, with Mage timestamp columns |
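The interaction between `max_batch_size` and `flush_interval_seconds` can be sketched as a buffer that flushes when either limit is reached. This is a minimal illustration assuming size-or-time flushing; the class `EventBatcher` and its `tick()` method are hypothetical, not Mage's implementation.

```python
import time
from typing import Any, Callable, List


class EventBatcher:
    """Hypothetical sketch: flush when the batch is full or has waited long enough."""

    def __init__(self, flush: Callable[[List[Any]], None], max_batch_size: int = 100,
                 flush_interval_seconds: float = 1.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._flush = flush
        self.max_batch_size = max_batch_size
        self.flush_interval_seconds = flush_interval_seconds
        self._clock = clock
        self._buffer: List[Any] = []
        self._last_flush = clock()

    def add(self, event: Any) -> None:
        self._buffer.append(event)
        self._maybe_flush()

    def tick(self) -> None:
        # Call periodically so a quiet stream still flushes its partial batch.
        self._maybe_flush()

    def _maybe_flush(self) -> None:
        if not self._buffer:
            return
        full = len(self._buffer) >= self.max_batch_size
        stale = self._clock() - self._last_flush >= self.flush_interval_seconds
        if full or stale:
            self.flush()

    def flush(self) -> None:
        if self._buffer:
            self._flush(self._buffer)
            self._buffer = []  # hand the old list to the sink, start a new one
        self._last_flush = self._clock()
```

A larger `max_batch_size` reduces per-write overhead in the sink, while `flush_interval_seconds` caps how long a change can wait in a partially filled batch.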
SSL Configuration
| Parameter | Type | Description |
|---|---|---|
| `ssl.sslmode` | string | SSL mode (e.g., `require`, `prefer`, `verify-ca`, `verify-full`) |
| `ssl.sslcert` | string | Client certificate file path |
| `ssl.sslkey` | string | Client private key file path |
| `ssl.sslrootcert` | string | Root certificate file path |
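The nested `ssl.*` keys use the same names as libpq's SSL connection parameters. Assuming they are passed through to a libpq-based driver as keyword arguments, a small validation step could look like the sketch below; `ssl_connection_kwargs` is a hypothetical helper, and the set of allowed modes is libpq's documented list.

```python
from typing import Any, Dict

# The sslmode values accepted by libpq.
VALID_SSLMODES = {"disable", "allow", "prefer", "require", "verify-ca", "verify-full"}


def ssl_connection_kwargs(config: Dict[str, Any]) -> Dict[str, str]:
    """Hypothetical helper: flatten the nested `ssl` block into libpq keyword arguments."""
    ssl = config.get("ssl") or {}
    kwargs: Dict[str, str] = {}
    mode = ssl.get("sslmode")
    if mode is not None:
        if mode not in VALID_SSLMODES:
            raise ValueError(f"unsupported sslmode: {mode!r}")
        kwargs["sslmode"] = mode
    # Certificate and key paths are copied only when they are set.
    for key in ("sslcert", "sslkey", "sslrootcert"):
        if ssl.get(key):
            kwargs[key] = ssl[key]
    return kwargs
```

In production, `verify-full` is the strictest option because it checks both the certificate chain and the server hostname.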
Record Formats
DB Records Only Format
When `return_db_records_only` is `true` (the default), the source returns only DB records, with Mage timestamp columns.
The `_mage_*` timestamp columns are returned as `datetime` objects in UTC (ISO 8601 strings when serialized), not as Unix timestamps. This ensures their types are handled correctly when records are written to databases.
Event Types
Row Events (Core CDC)
- Insert (`operation: "INSERT"`): INSERT operations
- Update (`operation: "UPDATE"`): UPDATE operations with before/after data
- Delete (`operation: "DELETE"`): DELETE operations
These events contain the actual data changes.
Schema Events
- Relation (`operation: "RELATION"`): Table structure metadata
- Truncate (`operation: "TRUNCATE"`): TRUNCATE TABLE operations
The source uses these events to track table schema changes automatically for column name mapping.
Transaction Events
- Begin (`operation: "BEGIN"`): Transaction begin events
- Commit (`operation: "COMMIT"`): Transaction commit events
These events are useful for maintaining transaction boundaries and ensuring data consistency.
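As an illustration of how BEGIN and COMMIT events preserve transaction boundaries, the sketch below buffers row events and releases them only when their transaction commits. It assumes each event is a dict with the documented `operation` key. The function `complete_transactions` and the other event fields used in the example are hypothetical.

```python
from typing import Iterable, Iterator, List

ROW_OPERATIONS = {"INSERT", "UPDATE", "DELETE"}


def complete_transactions(events: Iterable[dict]) -> Iterator[List[dict]]:
    """Yield the row events of each transaction once its COMMIT arrives."""
    pending: List[dict] = []
    in_txn = False
    for event in events:
        op = event["operation"]
        if op == "BEGIN":
            pending, in_txn = [], True
        elif op == "COMMIT":
            if in_txn:
                yield pending
            pending, in_txn = [], False
        elif op in ROW_OPERATIONS and in_txn:
            pending.append(event)
        # RELATION and TRUNCATE events are ignored in this sketch; a real
        # consumer would use RELATION events to refresh its column mapping.
```

Row events from a transaction that has not committed yet (for example, when the stream stops mid-transaction) are never yielded, so downstream writes never reflect half of a transaction.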
Primary Key Detection
The source automatically detects and extracts primary key columns:
- Schema Discovery: Queries `INFORMATION_SCHEMA` for table structure
- Primary Key Detection: Identifies actual primary key columns from database constraints
- Caching: Stores schema and primary key information for performance
- Key Extraction: Extracts primary key column names from row events
key_columns is a list of column names (not values) that can be used for deduplication or upsert operations in downstream sinks.
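Because `key_columns` names columns rather than values, a downstream consumer reads the values from each record itself. The sketch below shows one way to deduplicate a batch with it, keeping the latest record per key. It assumes records are plain dicts; `dedupe_latest` is a hypothetical helper.

```python
from typing import Dict, Iterable, List, Tuple


def dedupe_latest(records: Iterable[dict], key_columns: List[str]) -> List[dict]:
    """Keep only the last record per primary key, in first-seen key order."""
    latest: Dict[Tuple, dict] = {}
    for record in records:
        # Build the key from the record's values for the named key columns.
        key = tuple(record[col] for col in key_columns)
        latest[key] = record  # reassigning keeps the key's original position
    return list(latest.values())
```

This works for composite keys such as `["user_id", "tenant_id"]`, and it is the same idea an upsert applies inside the target database.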
Examples
Prerequisites
PostgreSQL Server Configuration
Create Replication Slot
Create Publication
User Permissions
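The prerequisites above (server configuration, replication slot, publication, and user permissions) can be sketched as the SQL a database administrator might run before starting the source. The statements use standard PostgreSQL commands and the documented default names `mage_slot` and `mage_pub`. The helper `setup_statements`, the example user name, and the choice of grants are assumptions. Managed services may require a provider-specific setting instead of `ALTER SYSTEM`.

```python
import re
from typing import List, Optional

_IDENT = re.compile(r"[a-z_][a-z0-9_]*")


def _ident(name: str) -> str:
    # Accept only simple lower-case identifiers so no SQL quoting is needed.
    if not _IDENT.fullmatch(name):
        raise ValueError(f"unsafe identifier: {name!r}")
    return name


def _table(qualified: str) -> str:
    return ".".join(_ident(part) for part in qualified.split("."))


def setup_statements(user: str, slot: str = "mage_slot", publication: str = "mage_pub",
                     tables: Optional[List[str]] = None, schema: str = "public") -> List[str]:
    """Hypothetical helper listing setup SQL to run as a superuser."""
    statements = [
        # Server configuration: takes effect only after PostgreSQL restarts.
        "ALTER SYSTEM SET wal_level = 'logical';",
        # Logical replication slot using the pgoutput plugin.
        f"SELECT pg_create_logical_replication_slot('{_ident(slot)}', 'pgoutput');",
        f"ALTER ROLE {_ident(user)} WITH REPLICATION;",
    ]
    if tables:
        table_list = ", ".join(_table(t) for t in tables)
        statements.append(f"CREATE PUBLICATION {_ident(publication)} FOR TABLE {table_list};")
        statements.append(f"GRANT SELECT ON {table_list} TO {_ident(user)};")
    else:
        # FOR ALL TABLES requires superuser privileges.
        statements.append(f"CREATE PUBLICATION {_ident(publication)} FOR ALL TABLES;")
        statements.append(f"GRANT SELECT ON ALL TABLES IN SCHEMA {_ident(schema)} TO {_ident(user)};")
    return statements
```

You could run the returned statements with `psql` or any PostgreSQL driver. Listing tables explicitly keeps the publication limited to the tables the source is meant to replicate.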
Troubleshooting
Common Issues
- Replication Slot Not Found: Create the replication slot using `pg_create_logical_replication_slot()`
- Publication Not Found: Create the publication using `CREATE PUBLICATION`
- Permission Denied: Ensure the user has the `REPLICATION` privilege and `SELECT` on the replicated tables
- WAL Level Not Logical: Set `wal_level = logical` and restart PostgreSQL
- Connection Timeout: Increase the `connect_timeout` value
- LSN Format Error: Ensure the LSN is in `X/Y` format, where X and Y are hexadecimal numbers (e.g., `0/1234567`)
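An LSN such as `0/1234567` consists of two hexadecimal numbers: the upper and lower 32 bits of a 64-bit WAL position. The sketch below validates a `start_lsn` value before use and converts it to an integer so positions can be compared. The functions `parse_lsn` and `format_lsn` are hypothetical helpers.

```python
import re

# Two hexadecimal numbers (up to 32 bits each) separated by a slash.
_LSN = re.compile(r"([0-9A-Fa-f]{1,8})/([0-9A-Fa-f]{1,8})")


def parse_lsn(lsn: str) -> int:
    """Convert an 'X/Y' LSN into its 64-bit integer WAL position."""
    match = _LSN.fullmatch(lsn.strip())
    if not match:
        raise ValueError(f"invalid LSN {lsn!r}; expected 'X/Y', e.g. '0/1234567'")
    high, low = (int(part, 16) for part in match.groups())
    return (high << 32) | low


def format_lsn(position: int) -> str:
    """Inverse of parse_lsn, using PostgreSQL's upper-case hex style."""
    return f"{position >> 32:X}/{position & 0xFFFFFFFF:X}"
```

Comparing the integers rather than the strings avoids mistakes such as treating `0/FFFFFFFF` as greater than `1/0`.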
Debugging
Check replication slot status:
Check publication status:
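As one possible approach to these checks, the sketch below queries PostgreSQL's `pg_replication_slots` and `pg_publication_tables` system views and interprets the results. The `%s` placeholders assume a psycopg2-style driver. The helpers `slot_problems` and `missing_from_publication` are hypothetical and operate on rows already fetched as dicts.

```python
from typing import Any, Dict, List, Optional

SLOT_QUERY = (
    "SELECT slot_name, plugin, slot_type, active, restart_lsn, confirmed_flush_lsn "
    "FROM pg_replication_slots WHERE slot_name = %s"
)
PUBLICATION_QUERY = (
    "SELECT schemaname, tablename FROM pg_publication_tables WHERE pubname = %s"
)


def slot_problems(row: Optional[Dict[str, Any]], source_running: bool = False) -> List[str]:
    """Hypothetical check of a pg_replication_slots row (None means no row)."""
    if row is None:
        return ["slot not found: create it with pg_create_logical_replication_slot()"]
    problems = []
    if row.get("slot_type") != "logical":
        problems.append(f"slot_type is {row.get('slot_type')!r}, expected 'logical'")
    if row.get("plugin") != "pgoutput":
        problems.append(f"plugin is {row.get('plugin')!r}, expected 'pgoutput'")
    if row.get("active") and not source_running:
        problems.append("slot is active while the source is stopped: another consumer may hold it")
    return problems


def missing_from_publication(published: List[Dict[str, str]], expected: List[str]) -> List[str]:
    """Return expected 'schema.table' names that the publication does not include."""
    have = {f"{r['schemaname']}.{r['tablename']}" for r in published}
    return [table for table in expected if table not in have]
```

An active slot while the source is stopped is worth flagging, because each consumer needs its own dedicated replication slot.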
Monitoring
Monitor key metrics:
- Batch Size: Average events per batch
- Flush Rate: How often batches are flushed
- Error Rate: Failed events or connections
- Lag: Time between event occurrence and processing
- LSN Progress: Track Log Sequence Number progress
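The batch size, flush rate, error rate, and lag metrics above can be derived from a few counters. The sketch below is a hypothetical in-process tracker, not a Mage API. It assumes flush times come from a monotonic clock in seconds and defines error rate as failed events divided by total events.

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Optional


@dataclass
class CdcMetrics:
    """Hypothetical tracker for batch size, flush rate, error rate and lag."""
    batch_sizes: List[int] = field(default_factory=list)
    flush_times: List[float] = field(default_factory=list)  # monotonic seconds
    failed_events: int = 0
    lag_seconds: Optional[float] = None

    def record_flush(self, batch_size: int, flushed_at: float, failed: int = 0) -> None:
        self.batch_sizes.append(batch_size)
        self.flush_times.append(flushed_at)
        self.failed_events += failed

    def record_lag(self, event_time: datetime, processed_at: datetime) -> None:
        # Lag: time between the change in PostgreSQL and its processing.
        self.lag_seconds = (processed_at - event_time).total_seconds()

    def average_batch_size(self) -> float:
        return sum(self.batch_sizes) / len(self.batch_sizes) if self.batch_sizes else 0.0

    def flushes_per_second(self) -> float:
        if len(self.flush_times) < 2:
            return 0.0
        span = self.flush_times[-1] - self.flush_times[0]
        return (len(self.flush_times) - 1) / span if span > 0 else 0.0

    def error_rate(self) -> float:
        total = sum(self.batch_sizes)
        return self.failed_events / total if total else 0.0
```

If the average batch size stays well below `max_batch_size`, most flushes are triggered by `flush_interval_seconds`, which usually means the stream is lightly loaded.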
Integration with Generic IO Sink
The PostgreSQL CDC source works with the Generic IO sink, which provides:
- Automatic Column Type Mapping: Automatically maps `_mage_*` timestamp columns to appropriate database types (TIMESTAMP, DATETIME2, DateTime64, etc.) based on the target database
- Metadata Interpolation: Use metadata values from PostgreSQL CDC events in sink configurations for dynamic routing and table naming
Supported Databases
Generic IO Sink supports the following databases:
- BigQuery
- ClickHouse
- DuckDB
- MySQL
- MSSQL
- Postgres
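To illustrate the automatic column type mapping, the sketch below picks a target column type for `_mage_*` columns based on the destination database. Only TIMESTAMP, DATETIME2, and DateTime64 appear above; the exact type for each database in this table is an assumption and may differ from what the Generic IO sink actually uses. `column_type` is a hypothetical helper.

```python
# Assumed mapping, for illustration only; the sink's real choices may differ.
MAGE_TIMESTAMP_TYPES = {
    "bigquery": "TIMESTAMP",
    "clickhouse": "DateTime64",
    "duckdb": "TIMESTAMP",
    "mysql": "DATETIME",
    "mssql": "DATETIME2",
    "postgres": "TIMESTAMP",
}


def column_type(column: str, database: str, fallback: str) -> str:
    """Return a timestamp type for _mage_* columns, else the fallback type."""
    if column.startswith("_mage_"):
        try:
            return MAGE_TIMESTAMP_TYPES[database.lower()]
        except KeyError:
            raise ValueError(f"unsupported database: {database!r}") from None
    return fallback
```

Keeping the mapping in one table makes the per-database differences explicit and easy to adjust.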
Metadata Interpolation
You can use metadata values from PostgreSQL CDC events in your sink configuration using Python string formatting syntax:
- `{schema}`: Schema name from the event
- `{table}`: Table name from the event
- `{key_columns}`: List of primary key column names (e.g., `["id"]` or `["user_id", "tenant_id"]`)
When you use `{key_columns}` in `unique_constraints`, it is automatically converted from a string representation to a list. Both Python-style (`"['id']"`) and JSON-style (`'["id"]'`) array strings are supported.
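The round trip described above (format a template, then turn an interpolated `{key_columns}` string back into a list) can be sketched with `str.format`, `json`, and `ast`. This mirrors the documented behavior but is not necessarily how the sink implements it. The helpers `interpolate` and `parse_key_columns`, and the event dict shape, are hypothetical.

```python
import ast
import json
from typing import Any, List


def interpolate(template: str, event: dict) -> str:
    # str.format renders a list with str(), e.g. ['id'].
    return template.format(schema=event["schema"], table=event["table"],
                           key_columns=event["key_columns"])


def parse_key_columns(value: Any) -> List[str]:
    """Accept a list, a JSON-style string '["id"]' or a Python-style string "['id']"."""
    if isinstance(value, list):
        return value
    try:
        parsed = json.loads(value)        # JSON style: double quotes
    except json.JSONDecodeError:
        parsed = ast.literal_eval(value)  # Python style: single quotes
    if not isinstance(parsed, list) or not all(isinstance(c, str) for c in parsed):
        raise ValueError(f"key_columns must be a list of column names: {value!r}")
    return parsed
```

`ast.literal_eval` evaluates only literals, so parsing a Python-style string this way never executes arbitrary code.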
Example Configurations
How Metadata Interpolation Works
- Message Grouping: Messages are automatically grouped by their interpolated config values. For example, if you use `table_name: "{schema}_{table}"`, messages from `public.users` are grouped together and written to the `public_users` table.
- Key Columns Interpolation: When you use `{key_columns}` in `unique_constraints`, the sink automatically:
  - Converts the list to a string representation during interpolation
  - Parses it back to a list (supports both `"['id']"` and `'["id"]'` formats)
  - Uses it for upsert operations based on `unique_conflict_method`
For example, if a table `users` has primary key `id` and you configure `unique_constraints: "{key_columns}"`, the sink automatically uses `["id"]` for upsert operations.
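The message grouping step can be sketched as follows. Each event's config is interpolated, and events that produce identical interpolated values share a group (and therefore a target table and upsert key). The function `group_by_config` and the event fields are hypothetical. Non-string config values are passed through unchanged.

```python
from collections import defaultdict
from typing import Any, Dict, Iterable, List, Tuple


def group_by_config(events: Iterable[dict], config: Dict[str, Any]) -> Dict[Tuple, List[dict]]:
    """Group events whose interpolated config values are identical."""
    groups: Dict[Tuple, List[dict]] = defaultdict(list)
    for event in events:
        values = {
            key: (value.format(schema=event["schema"], table=event["table"],
                               key_columns=event["key_columns"])
                  if isinstance(value, str) else value)
            for key, value in config.items()
        }
        # A sorted tuple of (key, value) pairs is a hashable group identifier.
        groups[tuple(sorted(values.items()))].append(event)
    return dict(groups)
```

With `table_name: "{schema}_{table}"`, all `public.users` events land in one group and can be written to `public_users` in a single batch.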
Best Practices
- Use Replication Slots: Always use replication slots for reliable CDC tracking
- Monitor WAL Growth: Monitor WAL size and ensure proper cleanup
- Filter Events: Use schema/table filters to reduce processing overhead
- Monitor Resources: Watch memory and CPU usage
- Test Resume: Verify checkpoint functionality with LSN tracking
- Secure Connections: Use SSL in production
- Regular Backups: Back up checkpoint files
- Schema Validation: Test how the pipeline handles schema changes
- Use Generic IO Sink: Leverage automatic type mapping and metadata interpolation for flexible data routing
- Metadata Interpolation: Use `{schema}`, `{table}`, and `{key_columns}` for dynamic table routing and upsert configuration
- Heartbeat Tables: Use heartbeat tables on managed services (such as Aiven) to prevent WAL from accumulating and filling up storage
Timestamp Handling
The PostgreSQL CDC source automatically adds Mage timestamp columns:
- `_mage_created_at`: Set to the event timestamp (datetime) for INSERT operations
- `_mage_updated_at`: Set to the event timestamp (datetime) for UPDATE operations
- `_mage_deleted_at`: Set to the event timestamp (datetime) for DELETE operations
Because these values are UTC datetime objects rather than Unix timestamps, they provide:
- Proper type handling in downstream databases
- Automatic type conversion in the Generic IO sink
- Consistent timezone handling across systems
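A minimal sketch of the column assignment above follows. It assumes the event time is available as Unix epoch seconds; the source's internal representation is not documented here. It picks the `_mage_*` column from the operation and stores a timezone-aware UTC `datetime`. The helper `add_mage_timestamp` is hypothetical.

```python
from datetime import datetime, timezone
from typing import Dict

_TIMESTAMP_COLUMN = {
    "INSERT": "_mage_created_at",
    "UPDATE": "_mage_updated_at",
    "DELETE": "_mage_deleted_at",
}


def add_mage_timestamp(record: Dict, operation: str, event_epoch_seconds: float) -> Dict:
    """Return a copy of record with the matching _mage_* column set to a UTC datetime."""
    column = _TIMESTAMP_COLUMN[operation]  # KeyError for non-row operations
    stamped = dict(record)
    # Timezone-aware UTC datetime; isoformat() yields an ISO 8601 string.
    stamped[column] = datetime.fromtimestamp(event_epoch_seconds, tz=timezone.utc)
    return stamped
```

Using an aware `datetime` instead of a raw number lets each sink choose its native timestamp type, and serializing with `isoformat()` keeps the `+00:00` offset explicit.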
Limitations
- PostgreSQL 10+: Requires PostgreSQL 10 or later for logical replication
- WAL Level: Requires `wal_level = logical`
- Network Dependency: Requires a stable network connection
- Memory Usage: Schema caching uses memory
- WAL Retention: Depends on PostgreSQL WAL retention settings
- Timezone: All timestamps are in UTC timezone
- Replication Slot: Requires a dedicated replication slot per consumer
- Publication: Tables must be added to a publication to be replicated