Overview

The PostgreSQL CDC streaming source captures real-time changes from the PostgreSQL Write-Ahead Log (WAL) using logical replication (the pgoutput plugin) and converts them into structured records. It tracks schema changes automatically and detects primary key columns.

Real-time CDC

Captures changes as they happen in PostgreSQL

Logical Replication

Uses PostgreSQL’s native logical replication protocol

Primary Key Detection

Automatically identifies and extracts primary key columns

LSN Tracking

Tracks Log Sequence Numbers for reliable resume capabilities

Configuration

Required Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| host | string | PostgreSQL server hostname or IP address |
| port | number | PostgreSQL server port (default: 5432) |
| database | string | Database name (default: postgres) |
| user | string | PostgreSQL username with replication privileges |
| password | string | PostgreSQL password |

Optional Parameters

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| connect_timeout | number | 10 | Connection timeout in seconds |
| schemas | array | - | Include only these schemas |
| tables | array | - | Include only these tables |
| ignore_schemas | array | - | Exclude these schemas |
| ignore_tables | array | - | Exclude these tables |
| replication_slot | string | mage_slot | Logical replication slot name |
| publication_name | string | mage_pub | Publication name for logical replication |
| start_lsn | string | - | Log Sequence Number to start from (e.g., ‘0/1234567’) |
| max_batch_size | number | 100 | Maximum events per batch |
| flush_interval_seconds | number | 1.0 | Maximum time between batch flushes in seconds |
| keep_alive_time | number | 10.0 | Time between keep-alive messages in seconds |
| heartbeat_table_schema | string | - | Schema name for heartbeat table (e.g., ‘public’) |
| heartbeat_table_name | string | - | Table name for heartbeat (e.g., ‘cdc_status’) |
| heartbeat_interval_seconds | number | 60.0 | How often to write heartbeat in seconds |
| return_db_records_only | boolean | true | Return only DB records with Mage timestamp columns |
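The interplay of max_batch_size and flush_interval_seconds can be pictured with a small sketch. The EventBatcher class below is hypothetical and is not the connector's actual implementation; it assumes a batch is flushed as soon as either limit is reached, and it takes an injectable clock only so the behavior can be exercised without waiting.

```python
import time
from typing import Any, Callable, List, Optional


class EventBatcher:
    """Hypothetical batcher: flush on size or elapsed time, whichever comes first."""

    def __init__(
        self,
        max_batch_size: int = 100,
        flush_interval_seconds: float = 1.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.max_batch_size = max_batch_size
        self.flush_interval_seconds = flush_interval_seconds
        self._clock = clock
        self._events: List[Any] = []
        self._last_flush = clock()

    def add(self, event: Any) -> Optional[List[Any]]:
        """Buffer an event; return a batch if a flush is due, else None."""
        self._events.append(event)
        return self.flush_if_due()

    def flush_if_due(self) -> Optional[List[Any]]:
        """Return and clear the buffered events when a limit has been reached."""
        size_reached = len(self._events) >= self.max_batch_size
        elapsed = self._clock() - self._last_flush
        time_reached = elapsed >= self.flush_interval_seconds
        if self._events and (size_reached or time_reached):
            batch, self._events = self._events, []
            self._last_flush = self._clock()
            return batch
        return None
```

Under these assumptions, a larger max_batch_size favors throughput on busy tables, while a smaller flush_interval_seconds bounds how long a lone event waits on a quiet one.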

SSL Configuration

| Parameter | Type | Description |
| --- | --- | --- |
| ssl.sslmode | string | SSL mode (e.g., ‘require’, ‘prefer’, ‘verify-ca’, ‘verify-full’) |
| ssl.sslcert | string | Client certificate file path |
| ssl.sslkey | string | Client private key file path |
| ssl.sslrootcert | string | Root certificate file path |
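The ssl.* keys share their names with libpq's standard connection parameters (sslmode, sslcert, sslkey, sslrootcert). As a rough sketch, assuming the SSL options are nested under an ssl key in the source config and passed to the driver unchanged, the connection parameters could be assembled as follows. build_conn_params is a hypothetical helper, not part of the connector.

```python
from typing import Any, Dict

SSL_KEYS = ("sslmode", "sslcert", "sslkey", "sslrootcert")


def build_conn_params(config: Dict[str, Any]) -> Dict[str, Any]:
    """Flatten a source config into libpq-style keyword parameters."""
    params = {
        "host": config["host"],
        "port": config.get("port", 5432),
        "dbname": config.get("database", "postgres"),  # libpq calls this dbname
        "user": config["user"],
        "password": config["password"],
        "connect_timeout": config.get("connect_timeout", 10),
    }
    # Copy only the SSL options that are actually set.
    ssl = config.get("ssl") or {}
    params.update({key: ssl[key] for key in SSL_KEYS if ssl.get(key)})
    return params
```

A dictionary of this shape is what a libpq-based driver such as psycopg2 accepts as keyword arguments to its connect function.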

Record Formats

DB Records Only Format

When return_db_records_only: true (default):
{
  "data": {
    "id": 1,
    "name": "John",
    "email": "john@example.com",
    "_mage_created_at": "2021-12-31T23:00:00+00:00",
    "_mage_updated_at": null,
    "_mage_deleted_at": null
  },
  "metadata": {
    "schema": "public",
    "table": "users",
    "key_columns": ["id"],
    "operation": "INSERT",
    "lsn": "0/1234567"
  }
}
Note: The _mage_* timestamp columns are returned as datetime objects (ISO 8601 format strings when serialized) in UTC timezone, not Unix timestamps. This ensures proper type handling when writing to databases.

Event Types

  • Insert (operation: "INSERT"): INSERT operations
  • Update (operation: "UPDATE"): UPDATE operations with before/after data
  • Delete (operation: "DELETE"): DELETE operations
These events contain the actual data changes.
  • Relation (operation: "RELATION"): Table structure metadata
  • Truncate (operation: "TRUNCATE"): TRUNCATE TABLE operations
Table schema changes are tracked automatically for column name mapping.
  • Begin (operation: "BEGIN"): Transaction begin events
  • Commit (operation: "COMMIT"): Transaction commit events
These events are useful for maintaining transaction boundaries and ensuring data consistency.
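To illustrate how Begin and Commit events can delimit transaction boundaries, here is a minimal sketch. It assumes each event is a dictionary that carries its operation under metadata.operation, as in the record format shown earlier. Whether Begin, Commit, and Relation events reach a consumer when return_db_records_only is true is not stated here, so treat the event stream as illustrative. group_by_transaction is a hypothetical helper.

```python
from typing import Any, Dict, Iterable, Iterator, List

# Operations that change table contents (TRUNCATE included by assumption).
CHANGE_OPERATIONS = {"INSERT", "UPDATE", "DELETE", "TRUNCATE"}


def group_by_transaction(
    events: Iterable[Dict[str, Any]],
) -> Iterator[List[Dict[str, Any]]]:
    """Yield the change events of each committed transaction as one list."""
    pending: List[Dict[str, Any]] = []
    for event in events:
        operation = event["metadata"]["operation"]
        if operation == "BEGIN":
            pending = []  # start a fresh transaction buffer
        elif operation == "COMMIT":
            yield pending  # emit only once the transaction is complete
            pending = []
        elif operation in CHANGE_OPERATIONS:
            pending.append(event)
        # RELATION events carry table structure only and are skipped here.
```

Buffering until Commit means a downstream writer never sees half of a transaction, at the cost of holding large transactions in memory.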

Primary Key Detection

The source automatically detects and extracts primary key columns:
  1. Schema Discovery: Queries INFORMATION_SCHEMA for table structure
  2. Primary Key Detection: Identifies actual primary key columns from database constraints
  3. Caching: Stores schema and primary key information for performance
  4. Key Extraction: Extracts primary key column names from row events
{
  "metadata": {
    "key_columns": ["user_id", "tenant_id"]
  }
}
Note: key_columns is a list of column names (not values) that can be used for deduplication or upsert operations in downstream sinks.
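As a sketch of how key_columns can drive deduplication downstream, the hypothetical helpers below keep the latest row per primary key in an in-memory dictionary. They assume the record shape shown under Record Formats, and that DELETE records still carry the key column values in data.

```python
from typing import Any, Dict, Tuple

Key = Tuple[str, str, Tuple[Any, ...]]


def record_key(record: Dict[str, Any]) -> Key:
    """Build a hashable identity from schema, table, and primary key values."""
    meta = record["metadata"]
    values = tuple(record["data"][column] for column in meta["key_columns"])
    return (meta["schema"], meta["table"], values)


def apply_record(state: Dict[Key, Dict[str, Any]], record: Dict[str, Any]) -> None:
    """Keep only the latest row per primary key; drop the row on DELETE."""
    key = record_key(record)
    if record["metadata"]["operation"] == "DELETE":
        state.pop(key, None)
    else:
        state[key] = record["data"]
```

Including the schema and table in the key keeps rows from different tables apart when they share the same primary key value.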

Examples

connector_type: postgres
host: localhost
port: 5432
database: mydb
user: repl_user
password: password
schemas: ["public"]
return_db_records_only: true
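The schemas filter in this example is one of four include/exclude options (schemas, tables, ignore_schemas, ignore_tables). How they combine is not spelled out here, so the sketch below states its assumptions: exclusions take precedence, an empty include list means "include everything", and tables are matched by unqualified name. is_captured is a hypothetical helper.

```python
from typing import Any, Dict


def is_captured(config: Dict[str, Any], schema: str, table: str) -> bool:
    """Return True if changes to schema.table pass the configured filters."""
    # Assumption: exclusions win over inclusions.
    if schema in (config.get("ignore_schemas") or []):
        return False
    if table in (config.get("ignore_tables") or []):
        return False
    # Assumption: an unset or empty include list means "include everything".
    schemas = config.get("schemas") or []
    tables = config.get("tables") or []
    if schemas and schema not in schemas:
        return False
    if tables and table not in tables:
        return False
    return True
```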

Prerequisites

PostgreSQL Server Configuration

-- Enable logical replication
ALTER SYSTEM SET wal_level = logical;
-- Restart PostgreSQL for changes to take effect

-- Verify WAL level
SHOW wal_level;
-- Should return: logical

Create Replication Slot

-- Create logical replication slot
SELECT pg_create_logical_replication_slot('mage_slot', 'pgoutput');

Create Publication

-- Create publication for all tables in the database
CREATE PUBLICATION mage_pub FOR ALL TABLES;

-- Or create publication for specific tables
CREATE PUBLICATION mage_pub FOR TABLE users, orders, products;

-- Or create publication for specific schemas (requires PostgreSQL 15 or later)
CREATE PUBLICATION mage_pub FOR TABLES IN SCHEMA public, analytics;

User Permissions

-- Create replication user
CREATE USER repl_user WITH PASSWORD 'password';

-- Grant replication privileges
ALTER USER repl_user WITH REPLICATION;

-- Grant connect privileges
GRANT CONNECT ON DATABASE mydb TO repl_user;

-- Grant usage on schemas
GRANT USAGE ON SCHEMA public TO repl_user;
GRANT USAGE ON SCHEMA analytics TO repl_user;

-- Grant SELECT privileges for schema queries
GRANT SELECT ON ALL TABLES IN SCHEMA public TO repl_user;
GRANT SELECT ON ALL TABLES IN SCHEMA analytics TO repl_user;

-- Grant SELECT on information_schema for metadata queries
-- (often redundant: these views are readable by PUBLIC by default)
GRANT SELECT ON ALL TABLES IN SCHEMA information_schema TO repl_user;

Troubleshooting

  1. Replication Slot Not Found: Create the replication slot using pg_create_logical_replication_slot()
  2. Publication Not Found: Create the publication using CREATE PUBLICATION
  3. Permission Denied: Ensure the user has the REPLICATION privilege and SELECT on the captured tables
  4. WAL Level Not Logical: Set wal_level = logical and restart PostgreSQL
  5. Connection Timeout: Increase connect_timeout value
  6. LSN Format Error: Ensure LSN is in format ‘X/Y’ (e.g., ‘0/1234567’)
Check replication slot status:
SELECT * FROM pg_replication_slots WHERE slot_name = 'mage_slot';
Check publication status:
SELECT * FROM pg_publication WHERE pubname = 'mage_pub';
SELECT * FROM pg_publication_tables WHERE pubname = 'mage_pub';
Monitor key metrics:
  • Batch Size: Average events per batch
  • Flush Rate: How often batches are flushed
  • Error Rate: Failed events or connections
  • Lag: Time between event occurrence and processing
  • LSN Progress: Track Log Sequence Number progress
Check replication lag:
SELECT 
    slot_name,
    pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)) AS replication_lag
FROM pg_replication_slots
WHERE slot_name = 'mage_slot';
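An LSN in ‘X/Y’ format is two hexadecimal numbers: the high and low 32 bits of a 64-bit byte position in the WAL. The sketch below validates that format and computes the byte distance between two LSNs, similar in spirit to pg_wal_lsn_diff. parse_lsn and lsn_diff_bytes are hypothetical helper names, not part of the connector.

```python
import re

_LSN_PATTERN = re.compile(r"([0-9A-Fa-f]{1,8})/([0-9A-Fa-f]{1,8})")


def parse_lsn(lsn: str) -> int:
    """Convert an 'X/Y' LSN string into its 64-bit integer byte position."""
    match = _LSN_PATTERN.fullmatch(lsn)
    if match is None:
        raise ValueError(f"invalid LSN {lsn!r}; expected 'X/Y' in hexadecimal")
    high, low = (int(part, 16) for part in match.groups())
    return (high << 32) | low


def lsn_diff_bytes(current: str, confirmed: str) -> int:
    """Byte distance between two LSNs (positive when current is ahead)."""
    return parse_lsn(current) - parse_lsn(confirmed)
```

Validating start_lsn this way before connecting gives a clearer failure than an LSN format error raised mid-stream.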

Integration with Generic IO Sink

The PostgreSQL CDC source can be paired with the Generic IO sink, which provides:
  • Automatic Column Type Mapping: Automatically maps _mage_* timestamp columns to appropriate database types (TIMESTAMP, DATETIME2, DateTime64, etc.) based on the target database
  • Metadata Interpolation: Use metadata values from PostgreSQL CDC events in sink configurations for dynamic routing and table naming

Supported Databases

Generic IO Sink supports the following databases:
  • BigQuery
  • ClickHouse
  • DuckDB
  • MySQL
  • MSSQL
  • Postgres

Metadata Interpolation

You can use metadata values from PostgreSQL CDC events in your sink configuration using Python string formatting syntax:
  • {schema}: Schema name from the event
  • {table}: Table name from the event
  • {key_columns}: List of primary key column names (e.g., ["id"] or ["user_id", "tenant_id"])
When using {key_columns} in unique_constraints, it will be automatically converted from a string representation to a list. The format supports both Python-style ("['id']") and JSON-style ('["id"]') array strings.
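The round trip described here can be sketched in a few lines: str.format turns the key_columns list into its string representation, and a tolerant parser turns either array style back into a list. Both functions are hypothetical illustrations, not the sink's actual code.

```python
import ast
import json
from typing import Any, Dict, List


def interpolate(template: str, metadata: Dict[str, Any]) -> str:
    """Fill {schema}, {table}, and {key_columns} using str.format."""
    return template.format(**metadata)


def parse_key_columns(value: str) -> List[str]:
    """Parse "['id']" (Python style) or '["id"]' (JSON style) into a list."""
    try:
        parsed = json.loads(value)
    except json.JSONDecodeError:
        parsed = ast.literal_eval(value)  # evaluates literals only, never code
    if not isinstance(parsed, list):
        raise ValueError(f"expected a list of column names, got {value!r}")
    return [str(column) for column in parsed]
```

Trying JSON first and falling back to ast.literal_eval covers both formats without resorting to eval.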

Example Configurations

connector_type: bigquery
profile: default
config:
  table_id: "{schema}.{table}"  # Dynamic table names from metadata
  unique_conflict_method: UPDATE
  unique_constraints: "{key_columns}"  # Uses primary keys from metadata

How Metadata Interpolation Works

  1. Message Grouping: Messages are automatically grouped by their interpolated config values. For example, if you use table_name: "{schema}_{table}", messages from public.users will be grouped together and written to public_users table.
  2. Key Columns Interpolation: When using {key_columns} in unique_constraints, the sink automatically:
    • Converts the list to a string representation during interpolation
    • Parses it back to a list (supports both "['id']" and '["id"]' formats)
    • Uses it for upsert operations based on unique_conflict_method
Example: If you have a table users with primary key id, and you configure unique_constraints: "{key_columns}", it will automatically use ["id"] for upsert operations.
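Message grouping can be pictured as bucketing each message under its interpolated table name. The sketch below assumes messages shaped like the CDC records above (data plus metadata) and uses a hypothetical group_by_target helper; the sink's real grouping logic may differ.

```python
from collections import defaultdict
from typing import Any, Dict, Iterable, List


def group_by_target(
    messages: Iterable[Dict[str, Any]], template: str
) -> Dict[str, List[Dict[str, Any]]]:
    """Group row data by the table name obtained from the interpolated template."""
    groups: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
    for message in messages:
        target = template.format(**message["metadata"])
        groups[target].append(message["data"])
    return dict(groups)
```

With the template "{schema}_{table}", rows from public.users end up under public_users, so each destination table can be written in a single batch.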

Best Practices

  1. Use Replication Slots: Always use replication slots for reliable CDC tracking
  2. Monitor WAL Growth: Monitor WAL size and ensure proper cleanup
  3. Filter Events: Use schema/table filters to reduce processing overhead
  4. Monitor Resources: Watch memory and CPU usage
  5. Test Resume: Verify checkpoint functionality with LSN tracking
  6. Secure Connections: Use SSL in production
  7. Regular Backups: Back up checkpoint files
  8. Schema Validation: Test with schema changes
  9. Use Generic IO Sink: Leverage automatic type mapping and metadata interpolation for flexible data routing
  10. Metadata Interpolation: Use {schema}, {table}, and {key_columns} for dynamic table routing and upsert configuration
  11. Heartbeat Tables: Use heartbeat tables on managed services (like Aiven) so the replication slot keeps advancing and retained WAL does not fill the disk

Timestamp Handling

The PostgreSQL CDC source automatically adds Mage timestamp columns:
  • _mage_created_at: Set to event timestamp (datetime) for INSERT operations
  • _mage_updated_at: Set to event timestamp (datetime) for UPDATE operations
  • _mage_deleted_at: Set to event timestamp (datetime) for DELETE operations
All timestamps are in UTC timezone and returned as datetime objects, which ensures:
  • Proper type handling in downstream databases
  • Automatic type conversion in Generic IO sink
  • Consistent timezone handling across systems
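The mapping from operation to timestamp column can be sketched as below. add_mage_timestamps is a hypothetical helper; it assumes the event time is a timezone-aware datetime and that the columns not tied to the current operation are left as None, as in the INSERT example under Record Formats.

```python
from datetime import datetime, timezone
from typing import Any, Dict

TIMESTAMP_COLUMN = {
    "INSERT": "_mage_created_at",
    "UPDATE": "_mage_updated_at",
    "DELETE": "_mage_deleted_at",
}


def add_mage_timestamps(
    row: Dict[str, Any], operation: str, event_time: datetime
) -> Dict[str, Any]:
    """Return a copy of row with the three _mage_* columns added."""
    result = dict(row)
    # Assumption: columns not tied to this operation are left as None.
    for column in TIMESTAMP_COLUMN.values():
        result[column] = None
    # Normalize to UTC so every sink sees the same timezone.
    result[TIMESTAMP_COLUMN[operation]] = event_time.astimezone(timezone.utc)
    return result
```

Keeping the values as datetime objects rather than strings or Unix timestamps lets the sink choose the matching column type for the target database.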

Limitations

  • PostgreSQL 10+: Requires PostgreSQL 10 or later for logical replication
  • WAL Level: Requires wal_level = logical
  • Network Dependency: Requires stable network connection
  • Memory Usage: Schema caching uses memory
  • WAL Retention: Depends on PostgreSQL WAL retention settings
  • Timezone: All timestamps are in UTC timezone
  • Replication Slot: Requires a dedicated replication slot per consumer
  • Publication: Tables must be added to a publication to be replicated