
# PostgreSQL CDC

> Real-time Change Data Capture from PostgreSQL WAL with logical replication

export const ProOnly = ({button = 'Get started for free', description = 'Try our fully managed solution to access this advanced feature.', source = 'documentation', title = 'Only in Mage Pro.'}) => <a href={`https://cloud.mage.ai/sign-up?source=${source}`} className="block my-4 px-5 py-4 overflow-hidden rounded-xl flex gap-3 border border-emerald-500/20 bg-emerald-50/50 dark:border-emerald-500/30 dark:bg-emerald-500/10" target="_blank">
    <div style={{
  display: 'flex',
  alignItems: 'center',
  width: '100%'
}}>
      <div className="text-sm prose min-w-0 text-emerald-900 dark:text-emerald-200" style={{
  flex: 1
}}>
        {title}
        <p className="normal">{description}</p>
      </div>

      <div> </div>

      <div>
        <ProButton label={button} href={`https://cloud.mage.ai/sign-up?source=${source}`} />
      </div>
    </div>
  </a>;

export const ProButton = ({href, label = 'Get started with Mage Pro for free', source = 'documentation'}) => <div style={{
  height: 32,
  position: 'relative'
}}>
    <a target="_blank" className="group px-4 py-1.5 relative inline-flex items-center text-sm font-medium rounded-full" href={href ?? `https://cloud.mage.ai/sign-up?source=${source}`}>
      <span className="absolute inset-0 bg-primary-dark dark:bg-primary-light/10 border-primary-light/30 rounded-full dark:border group-hover:opacity-[0.9] dark:group-hover:border-primary-light/60">
      </span>

      <div className="mr-0.5 space-x-2.5 flex items-center">
        <span class="z-10 text-white dark:text-primary-light">
          {label}
        </span>

        <svg width="3" height="24" viewBox="0 -9 3 24" class="h-5 rotate-0 overflow-visible text-white/90 dark:text-primary-light">
          <path d="M0 0L3 3L0 6" fill="none" stroke="currentColor" stroke-width="1.5" stroke-linecap="round"></path>
        </svg>
      </div>
    </a>
  </div>;

<ProOnly source="postgres-cdc" />

## Overview

The PostgreSQL CDC streaming source captures changes in real time from the PostgreSQL Write-Ahead Log (WAL) using logical replication (the `pgoutput` plugin) and converts them into structured records. It tracks schema changes and detects primary key columns automatically.

<CardGroup cols={2}>
  <Card title="Real-time CDC" icon="zap">
    Captures changes as they happen in PostgreSQL
  </Card>

  <Card title="Logical Replication" icon="database">
    Uses PostgreSQL's native logical replication protocol
  </Card>

  <Card title="Primary Key Detection" icon="key">
    Automatically identifies and extracts primary key columns
  </Card>

  <Card title="LSN Tracking" icon="shield">
    Tracks Log Sequence Numbers for reliable resume capabilities
  </Card>
</CardGroup>

## Configuration

### Required Parameters

| Parameter  | Type   | Description                                     |
| ---------- | ------ | ----------------------------------------------- |
| `host`     | string | PostgreSQL server hostname or IP address        |
| `port`     | number | PostgreSQL server port (default: 5432)          |
| `database` | string | Database name (default: postgres)               |
| `user`     | string | PostgreSQL username with replication privileges |
| `password` | string | PostgreSQL password                             |

### Optional Parameters

| Parameter                    | Type    | Default    | Description                                                                                                                                                                                             |
| ---------------------------- | ------- | ---------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `connect_timeout`            | number  | 10         | Connection timeout in seconds                                                                                                                                                                           |
| `schemas`                    | array   | -          | Include only these schemas                                                                                                                                                                              |
| `tables`                     | array   | -          | Include only these tables                                                                                                                                                                               |
| `ignore_schemas`             | array   | -          | Exclude these schemas                                                                                                                                                                                   |
| `ignore_tables`              | array   | -          | Exclude these tables                                                                                                                                                                                    |
| `replication_slot`           | string  | mage\_slot | Logical replication slot name                                                                                                                                                                           |
| `publication_name`           | string  | mage\_pub  | Publication name for logical replication                                                                                                                                                                |
| `start_lsn`                  | string  | -          | Log Sequence Number to start from (e.g., '0/1234567')                                                                                                                                                   |
| `max_batch_size`             | number  | 100        | Maximum events per batch                                                                                                                                                                                |
| `flush_interval_seconds`     | number  | 1.0        | Maximum time between batch flushes                                                                                                                                                                      |
| `keep_alive_time`            | number  | 10.0       | Time between keep-alive messages in seconds                                                                                                                                                             |
| `heartbeat_table_schema`     | string  | -          | Schema name for heartbeat table (e.g., 'public')                                                                                                                                                        |
| `heartbeat_table_name`       | string  | -          | Table name for heartbeat (e.g., 'cdc\_status')                                                                                                                                                          |
| `heartbeat_interval_seconds` | number  | 60.0       | How often to write heartbeat in seconds                                                                                                                                                                 |
| `heartbeat_update_query`     | string  | -          | Custom SQL query to update heartbeat (overrides default). **WARNING**: Query is executed directly without sanitization. Only use queries from trusted sources to prevent SQL injection vulnerabilities. |
| `return_db_records_only`     | boolean | true       | Return only DB records with Mage timestamp columns                                                                                                                                                      |
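
The interaction between `max_batch_size` and `flush_interval_seconds` can be illustrated with a minimal Python sketch: a batch is emitted when either limit is reached, whichever comes first. `EventBatcher` and its methods are hypothetical names used for illustration only, and the exact flush behavior of the source is an assumption.

```python
import time
from typing import Any, Callable, List, Optional


class EventBatcher:
    """Hypothetical batcher: flushes on size or elapsed time, whichever comes first."""

    def __init__(
        self,
        max_batch_size: int = 100,
        flush_interval_seconds: float = 1.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.max_batch_size = max_batch_size
        self.flush_interval_seconds = flush_interval_seconds
        self._clock = clock
        self._events: List[Any] = []
        self._last_flush = clock()

    def add(self, event: Any) -> Optional[List[Any]]:
        """Buffer one event and return a batch if a flush condition is met."""
        self._events.append(event)
        return self.poll()

    def poll(self) -> Optional[List[Any]]:
        """Return the buffered events if the size or time limit has been reached."""
        if not self._events:
            return None
        full = len(self._events) >= self.max_batch_size
        stale = self._clock() - self._last_flush >= self.flush_interval_seconds
        if full or stale:
            batch, self._events = self._events, []
            self._last_flush = self._clock()
            return batch
        return None
```

Larger values for `max_batch_size` and `flush_interval_seconds` reduce per-batch overhead at the cost of higher latency, which is the trade-off the Production Setup example makes.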

### SSL Configuration

| Parameter         | Type   | Description                                                      |
| ----------------- | ------ | ---------------------------------------------------------------- |
| `ssl.sslmode`     | string | SSL mode (e.g., 'require', 'prefer', 'verify-ca', 'verify-full') |
| `ssl.sslcert`     | string | Client certificate file path                                     |
| `ssl.sslkey`      | string | Client private key file path                                     |
| `ssl.sslrootcert` | string | Root certificate file path                                       |
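
The connection and SSL parameters above share their names with standard libpq connection keywords. The sketch below shows one way a configuration could be mapped to those keywords, applying the documented defaults. `build_libpq_params` is a hypothetical helper, and how the source actually passes these values to its driver is an assumption.

```python
from typing import Any, Dict


def build_libpq_params(config: Dict[str, Any]) -> Dict[str, Any]:
    """Translate a source config dict into libpq-style connection keywords."""
    params: Dict[str, Any] = {
        "host": config["host"],
        "port": config.get("port", 5432),
        "dbname": config.get("database", "postgres"),
        "user": config["user"],
        "password": config["password"],
        "connect_timeout": config.get("connect_timeout", 10),
    }
    # Copy only the SSL keys that are actually set in the nested `ssl` block.
    ssl_config = config.get("ssl") or {}
    for key in ("sslmode", "sslcert", "sslkey", "sslrootcert"):
        value = ssl_config.get(key)
        if value:
            params[key] = value
    return params
```

The resulting dictionary can be passed to a libpq-based driver, for example `psycopg2.connect(**params)`.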

## Record Formats

### DB Records Only Format

When `return_db_records_only: true` (default):

```json theme={"system"}
{
  "data": {
    "id": 1,
    "name": "John",
    "email": "john@example.com",
    "_mage_created_at": "2021-12-31T23:00:00+00:00",
    "_mage_updated_at": null,
    "_mage_deleted_at": null
  },
  "metadata": {
    "schema": "public",
    "table": "users",
    "key_columns": ["id"],
    "operation": "INSERT",
    "lsn": "0/1234567"
  }
}
```

**Note**: The `_mage_*` timestamp columns are returned as datetime objects (ISO 8601 format strings when serialized) in UTC timezone, not Unix timestamps. This ensures proper type handling when writing to databases.
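
When a record has been serialized to JSON, the `_mage_*` columns arrive as ISO 8601 strings. A minimal sketch of converting them back to timezone-aware datetimes with the Python standard library follows; `parse_mage_timestamps` is a hypothetical helper name, not part of Mage.

```python
from datetime import datetime
from typing import Any, Dict

MAGE_TS_COLUMNS = ("_mage_created_at", "_mage_updated_at", "_mage_deleted_at")


def parse_mage_timestamps(record: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of the record with ISO 8601 `_mage_*` strings parsed to datetimes."""
    data = dict(record["data"])
    for column in MAGE_TS_COLUMNS:
        value = data.get(column)
        if isinstance(value, str):
            # Values such as "2021-12-31T23:00:00+00:00" keep their UTC offset.
            data[column] = datetime.fromisoformat(value)
    return {**record, "data": data}
```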

## Event Types

<AccordionGroup>
  <Accordion title="Row Events (Core CDC)">
    * **Insert** (`operation: "INSERT"`): INSERT operations
    * **Update** (`operation: "UPDATE"`): UPDATE operations with before/after data
    * **Delete** (`operation: "DELETE"`): DELETE operations

    These events contain the actual data changes.
  </Accordion>

  <Accordion title="Schema Events">
    * **Relation** (`operation: "RELATION"`): Table structure metadata
    * **Truncate** (`operation: "TRUNCATE"`): TRUNCATE TABLE operations

    The source automatically tracks table schema changes for column name mapping.
  </Accordion>

  <Accordion title="Transaction Events">
    * **Begin** (`operation: "BEGIN"`): Transaction begin events
    * **Commit** (`operation: "COMMIT"`): Transaction commit events

    These events are useful for maintaining transaction boundaries and ensuring data consistency.
  </Accordion>
</AccordionGroup>
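
As an illustration of how transaction events can be used, the Python sketch below groups row events into one list per `BEGIN`/`COMMIT` pair. It assumes every event is a dictionary that exposes its operation under `metadata.operation`, as in the record format shown earlier; the shape of non-row events is not documented here, so treat that as an assumption. `group_by_transaction` is a hypothetical helper.

```python
from typing import Any, Dict, Iterable, Iterator, List

ROW_OPERATIONS = {"INSERT", "UPDATE", "DELETE"}


def group_by_transaction(
    events: Iterable[Dict[str, Any]],
) -> Iterator[List[Dict[str, Any]]]:
    """Yield one list of row events per BEGIN..COMMIT pair."""
    current: List[Dict[str, Any]] = []
    in_transaction = False
    for event in events:
        operation = event["metadata"]["operation"]  # assumed event shape
        if operation == "BEGIN":
            current, in_transaction = [], True
        elif operation == "COMMIT":
            if in_transaction:
                yield current
            current, in_transaction = [], False
        elif operation in ROW_OPERATIONS and in_transaction:
            current.append(event)
        # RELATION and TRUNCATE events are ignored in this sketch.
```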

## Primary Key Detection

The source automatically detects and extracts primary key columns:

1. **Schema Discovery**: Queries `INFORMATION_SCHEMA` for table structure
2. **Primary Key Detection**: Identifies actual primary key columns from database constraints
3. **Caching**: Stores schema and primary key information for performance
4. **Key Extraction**: Extracts primary key column names from row events

<CodeGroup>
  ```json Example Output theme={"system"}
  {
    "metadata": {
      "key_columns": ["user_id", "tenant_id"]
    }
  }
  ```
</CodeGroup>

**Note**: `key_columns` is a list of column names (not values) that can be used for deduplication or upsert operations in downstream sinks.
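
A minimal Python sketch of using `key_columns` for deduplication: records are applied to an in-memory dictionary keyed by the primary key values, so a later event for the same key replaces the earlier one. `record_key` and `apply_record` are hypothetical helpers, and the sketch assumes that DELETE records still carry the key columns in `data`.

```python
from typing import Any, Dict, Tuple

TableState = Dict[Tuple[str, str], Dict[Tuple[Any, ...], Dict[str, Any]]]


def record_key(record: Dict[str, Any]) -> Tuple[Any, ...]:
    """Build the primary key tuple from the column names in `key_columns`."""
    columns = record["metadata"]["key_columns"]
    return tuple(record["data"][column] for column in columns)


def apply_record(state: TableState, record: Dict[str, Any]) -> None:
    """Upsert or delete one record in the in-memory state."""
    metadata = record["metadata"]
    rows = state.setdefault((metadata["schema"], metadata["table"]), {})
    key = record_key(record)
    if metadata["operation"] == "DELETE":
        rows.pop(key, None)  # assumes key columns are present on deletes
    else:
        rows[key] = record["data"]  # INSERT and UPDATE both overwrite
```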

## Examples

<CodeGroup>
  ```yaml Basic Setup theme={"system"}
  connector_type: postgres
  host: localhost
  port: 5432
  database: mydb
  user: repl_user
  password: password
  schemas: ["public"]
  return_db_records_only: true
  ```

  ```yaml Production Setup theme={"system"}
  connector_type: postgres
  host: postgres-prod.company.com
  port: 5432
  database: production_db
  user: cdc_user
  password: secure_password
  ssl:
    sslmode: require
    sslcert: "/etc/ssl/postgres/client-cert.pem"
    sslkey: "/etc/ssl/postgres/client-key.pem"
    sslrootcert: "/etc/ssl/postgres/ca.pem"
  schemas: ["public", "analytics"]
  ignore_tables: ["temp_*", "log_*"]
  replication_slot: prod_cdc_slot
  publication_name: prod_cdc_pub
  max_batch_size: 500
  flush_interval_seconds: 2.0
  return_db_records_only: true
  ```

  ```yaml Resume from LSN theme={"system"}
  connector_type: postgres
  host: localhost
  port: 5432
  database: mydb
  user: repl_user
  password: password
  start_lsn: "0/1234567"
  schemas: ["public"]
  return_db_records_only: true
  ```

  ```yaml With Heartbeat Table theme={"system"}
  connector_type: postgres
  host: aiven-postgres.company.com
  port: 5432
  database: mydb
  user: cdc_user
  password: password
  heartbeat_table_schema: public
  heartbeat_table_name: cdc_status
  heartbeat_interval_seconds: 60.0
  schemas: ["public"]
  return_db_records_only: true
  ```

  ```yaml With Custom Heartbeat Query theme={"system"}
  connector_type: postgres
  host: aiven-postgres.company.com
  port: 5432
  database: mydb
  user: cdc_user
  password: password
  heartbeat_table_schema: public
  heartbeat_table_name: cdc_status
  heartbeat_interval_seconds: 60.0
  heartbeat_update_query: "UPDATE public.cdc_status SET timestamp = NOW(), last_updated_by = 'mage_cdc' WHERE id = 1"
  schemas: ["public"]
  return_db_records_only: true
  ```
</CodeGroup>
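
The Production Setup example combines `schemas` with `ignore_tables` patterns such as `temp_*`. The Python sketch below shows one plausible way such filters could be evaluated. Both the glob-style matching and the rule that exclusions take precedence over inclusions are assumptions, and `is_table_included` is a hypothetical helper rather than the source's actual filter logic.

```python
from fnmatch import fnmatchcase
from typing import Any, Dict


def is_table_included(schema: str, table: str, config: Dict[str, Any]) -> bool:
    """Decide whether events for schema.table pass the configured filters."""
    schemas = config.get("schemas") or []
    tables = config.get("tables") or []
    ignore_schemas = config.get("ignore_schemas") or []
    ignore_tables = config.get("ignore_tables") or []

    # Exclusions win over inclusions (assumed precedence).
    if schema in ignore_schemas:
        return False
    if any(fnmatchcase(table, pattern) for pattern in ignore_tables):
        return False
    # An empty include list means "include everything".
    if schemas and schema not in schemas:
        return False
    if tables and not any(fnmatchcase(table, pattern) for pattern in tables):
        return False
    return True
```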

## Heartbeat Table

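The heartbeat table prevents WAL from accumulating on managed PostgreSQL services (such as Aiven). A replication slot retains WAL until its consumer confirms it, so a slot that sees no changes for a long time can cause WAL to build up. When configured, the source periodically writes to this table to keep the replication slot active.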

### Schema

The heartbeat table is automatically created if it doesn't exist. It has the following schema:

```sql theme={"system"}
CREATE TABLE {schema}.{table} (
    id INTEGER PRIMARY KEY DEFAULT 1,
    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
```

**Columns:**

* `id` (INTEGER PRIMARY KEY): Primary key column with default value of 1
* `timestamp` (TIMESTAMP): Timestamp column that gets updated with the current timestamp on each heartbeat write

### How It Works

1. **Automatic Creation**: If the table doesn't exist, it's automatically created when the source initializes
2. **Periodic Updates**: The source updates the `timestamp` column every `heartbeat_interval_seconds` (default: 60 seconds)
3. **Default UPSERT Pattern**: Uses `INSERT ... ON CONFLICT DO UPDATE` to update the single row with `id=1`, or falls back to `UPDATE`/`INSERT` if the database doesn't support `ON CONFLICT`
4. **Custom Query**: You can override the default heartbeat update query by providing `heartbeat_update_query` in the configuration
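
The default pattern described above can be sketched in Python as follows. The exact SQL that the source issues is not shown in this document, so the statement built here is an assumption based on the `INSERT ... ON CONFLICT DO UPDATE` description; `build_heartbeat_upsert` and `heartbeat_due` are hypothetical helpers.

```python
import re

# Accept only plain identifiers so that names can be embedded in SQL safely.
_IDENTIFIER = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


def build_heartbeat_upsert(schema: str, table: str) -> str:
    """Build an upsert that refreshes the single heartbeat row with id = 1."""
    for name in (schema, table):
        if not _IDENTIFIER.match(name):
            raise ValueError(f"unsafe identifier: {name!r}")
    return (
        f"INSERT INTO {schema}.{table} (id, timestamp) "
        "VALUES (1, CURRENT_TIMESTAMP) "
        "ON CONFLICT (id) DO UPDATE SET timestamp = EXCLUDED.timestamp"
    )


def heartbeat_due(last_write: float, now: float, interval_seconds: float = 60.0) -> bool:
    """Return True once `interval_seconds` have passed since the last write."""
    return now - last_write >= interval_seconds
```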

### Security Warning for Custom Heartbeat Query

<Warning>
  **Important Security Notice**: The `heartbeat_update_query` parameter executes SQL queries directly without sanitization or parameterization. This could introduce SQL injection vulnerabilities if the query is sourced from untrusted input.

  **Best Practices:**

  * Only use queries from trusted, validated sources
  * Never construct queries dynamically from user input or external APIs
  * Validate that the query is not empty or whitespace-only before use
  * Consider using the default heartbeat mechanism unless you have specific requirements
  * Review and test custom queries thoroughly before deploying to production

  The source performs basic validation to ensure the query is not empty or whitespace-only, but does not perform SQL injection protection. Always ensure your configuration management system properly secures and validates this parameter.
</Warning>
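
If you manage this parameter in your own tooling, a conservative pre-check along the following lines may help. This Python sketch is stricter than the basic validation described above and is an illustration only: `validate_heartbeat_query` is a hypothetical helper, the single-statement and UPDATE/INSERT restrictions are assumptions, and none of this amounts to SQL injection protection.

```python
def validate_heartbeat_query(query: str) -> str:
    """Apply conservative checks to a custom heartbeat query and return it cleaned."""
    # Drop surrounding whitespace and any trailing semicolons.
    statement = (query or "").strip().rstrip(";").strip()
    if not statement:
        raise ValueError("heartbeat_update_query must not be empty")
    # Reject anything that looks like multiple statements. This also rejects
    # semicolons inside string literals, which is intentionally conservative.
    if ";" in statement:
        raise ValueError("only a single SQL statement is allowed")
    first_word = statement.split(None, 1)[0].upper()
    if first_word not in {"UPDATE", "INSERT"}:
        raise ValueError("heartbeat query must be an UPDATE or INSERT statement")
    return statement
```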

### Manual Creation

If you prefer to create the table manually, you can use:

```sql theme={"system"}
CREATE TABLE public.cdc_status (
    id INTEGER PRIMARY KEY DEFAULT 1,
    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Insert initial row
INSERT INTO public.cdc_status (id, timestamp)
VALUES (1, CURRENT_TIMESTAMP);
```

**Note**: The table must have a primary key or unique constraint on the `id` column for the UPSERT pattern to work correctly.

## Prerequisites

### PostgreSQL Server Configuration

```sql theme={"system"}
-- Enable logical replication
ALTER SYSTEM SET wal_level = logical;
-- Restart PostgreSQL for changes to take effect

-- Verify WAL level
SHOW wal_level;
-- Should return: logical
```

### Create Replication Slot

```sql theme={"system"}
-- Create logical replication slot
SELECT pg_create_logical_replication_slot('mage_slot', 'pgoutput');
```

### Create Publication

```sql theme={"system"}
-- Create publication for all tables in the database
CREATE PUBLICATION mage_pub FOR ALL TABLES;

-- Or create publication for specific tables
CREATE PUBLICATION mage_pub FOR TABLE users, orders, products;

-- Or create publication for all tables in specific schemas (PostgreSQL 15+)
CREATE PUBLICATION mage_pub FOR TABLES IN SCHEMA public, analytics;
```

### User Permissions

```sql theme={"system"}
-- Create replication user
CREATE USER repl_user WITH PASSWORD 'password';

-- Grant replication privileges
ALTER USER repl_user WITH REPLICATION;

-- Grant connect privileges
GRANT CONNECT ON DATABASE mydb TO repl_user;

-- Grant usage on schemas
GRANT USAGE ON SCHEMA public TO repl_user;
GRANT USAGE ON SCHEMA analytics TO repl_user;

-- Grant SELECT privileges for schema queries
GRANT SELECT ON ALL TABLES IN SCHEMA public TO repl_user;
GRANT SELECT ON ALL TABLES IN SCHEMA analytics TO repl_user;

-- Grant SELECT on information_schema for metadata queries
GRANT SELECT ON ALL TABLES IN SCHEMA information_schema TO repl_user;
```

## Troubleshooting

<AccordionGroup>
  <Accordion title="Common Issues">
    1. **Replication Slot Not Found**: Create the replication slot using `pg_create_logical_replication_slot()`
    2. **Publication Not Found**: Create the publication using `CREATE PUBLICATION`
    3. **Permission Denied**: Ensure the user has the `REPLICATION` privilege and `SELECT` on the replicated tables
    4. **WAL Level Not Logical**: Set `wal_level = logical` and restart PostgreSQL
    5. **Connection Timeout**: Increase `connect_timeout` value
    6. **LSN Format Error**: Ensure LSN is in format 'X/Y' (e.g., '0/1234567')
  </Accordion>

  <Accordion title="Debugging">
    Check replication slot status:

    ```sql theme={"system"}
    SELECT * FROM pg_replication_slots WHERE slot_name = 'mage_slot';
    ```

    Check publication status:

    ```sql theme={"system"}
    SELECT * FROM pg_publication WHERE pubname = 'mage_pub';
    SELECT * FROM pg_publication_tables WHERE pubname = 'mage_pub';
    ```
  </Accordion>

  <Accordion title="Monitoring">
    Monitor key metrics:

    * **Batch Size**: Average events per batch
    * **Flush Rate**: How often batches are flushed
    * **Error Rate**: Failed events or connections
    * **Lag**: Time between event occurrence and processing
    * **LSN Progress**: Track Log Sequence Number progress

    Check replication lag:

    ```sql theme={"system"}
    SELECT 
        slot_name,
        pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)) AS replication_lag
    FROM pg_replication_slots
    WHERE slot_name = 'mage_slot';
    ```
  </Accordion>
</AccordionGroup>
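
An LSN in the `X/Y` format consists of two hexadecimal numbers: the high and low 32 bits of a 64-bit WAL position. The Python sketch below validates a `start_lsn` value and computes the byte distance between two LSNs, which is the same quantity that `pg_wal_lsn_diff` returns. The helper names `parse_lsn`, `format_lsn`, and `lsn_lag_bytes` are hypothetical.

```python
import re

_LSN_PATTERN = re.compile(r"([0-9A-Fa-f]{1,8})/([0-9A-Fa-f]{1,8})")


def parse_lsn(lsn: str) -> int:
    """Convert an 'X/Y' LSN string into a 64-bit integer WAL position."""
    match = _LSN_PATTERN.fullmatch(lsn)
    if match is None:
        raise ValueError(f"invalid LSN {lsn!r}, expected format 'X/Y' in hex")
    high, low = (int(part, 16) for part in match.groups())
    return (high << 32) | low


def format_lsn(position: int) -> str:
    """Convert a 64-bit integer WAL position back into 'X/Y' form."""
    return f"{position >> 32:X}/{position & 0xFFFFFFFF:X}"


def lsn_lag_bytes(current_lsn: str, confirmed_lsn: str) -> int:
    """Return how many bytes of WAL separate two LSNs."""
    return parse_lsn(current_lsn) - parse_lsn(confirmed_lsn)
```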

## Integration with Generic IO Sink

The PostgreSQL CDC source works seamlessly with the Generic IO sink, which provides:

* **Automatic Column Type Mapping**: Automatically maps `_mage_*` timestamp columns to appropriate database types (TIMESTAMP, DATETIME2, DateTime64, etc.) based on the target database
* **Metadata Interpolation**: Use metadata values from PostgreSQL CDC events in sink configurations for dynamic routing and table naming

### Supported Databases

Generic IO Sink supports the following databases:

* **BigQuery**
* **ClickHouse**
* **DuckDB**
* **MySQL**
* **MSSQL**
* **Postgres**

### Metadata Interpolation

You can use metadata values from PostgreSQL CDC events in your sink configuration using Python string formatting syntax:

* **`{schema}`**: Schema name from the event
* **`{table}`**: Table name from the event
* **`{key_columns}`**: List of primary key column names (e.g., `["id"]` or `["user_id", "tenant_id"]`)

When using `{key_columns}` in `unique_constraints`, it will be automatically converted from a string representation to a list. The format supports both Python-style (`"['id']"`) and JSON-style (`'["id"]'`) array strings.

### Example Configurations

<CodeGroup>
  ```yaml BigQuery Example theme={"system"}
  connector_type: bigquery
  profile: default
  config:
    table_id: "{schema}.{table}"  # Dynamic table names from metadata
    unique_conflict_method: UPDATE
    unique_constraints: "{key_columns}"  # Uses primary keys from metadata
  ```

  ```yaml Postgres Example theme={"system"}
  connector_type: postgres
  profile: default
  config:
    schema_name: "{schema}"  # Dynamic schema from metadata
    table_name: "{table}"    # Dynamic table name from metadata
    unique_conflict_method: UPDATE
    unique_constraints: "{key_columns}"  # Uses primary keys from metadata
    if_exists: append
  ```

  ```yaml MySQL Example theme={"system"}
  connector_type: mysql
  profile: default
  config:
    schema_name: "{schema}"
    table_name: "{table}"
    unique_conflict_method: UPDATE
    unique_constraints: "{key_columns}"
    if_exists: append
  ```

  ```yaml Static Configuration theme={"system"}
  connector_type: postgres
  profile: default
  config:
    schema_name: public
    table_name: cdc_events  # Static table name
    unique_constraints: ['id', 'event_id']  # Static list (string format also works: "['id', 'event_id']")
    unique_conflict_method: UPDATE
    if_exists: append
  ```
</CodeGroup>

### How Metadata Interpolation Works

1. **Message Grouping**: Messages are automatically grouped by their interpolated config values. For example, with `table_name: "{schema}_{table}"`, messages from `public.users` are grouped together and written to the `public_users` table.

2. **Key Columns Interpolation**: When using `{key_columns}` in `unique_constraints`, the sink automatically:
   * Converts the list to a string representation during interpolation
   * Parses it back to a list (supports both `"['id']"` and `'["id"]'` formats)
   * Uses it for upsert operations based on `unique_conflict_method`

**Example**: If you have a table `users` with primary key `id`, and you configure `unique_constraints: "{key_columns}"`, it will automatically use `["id"]` for upsert operations.
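
The interpolation and grouping steps can be approximated in plain Python. The sketch below uses `str.format` for interpolation and accepts both the Python-style and JSON-style list strings mentioned above. The function names `interpolate_config`, `parse_key_columns`, and `group_by_target` are hypothetical, and the sink's real implementation may differ.

```python
import ast
import json
from typing import Any, Dict, List, Tuple, Union


def interpolate_config(config: Dict[str, Any], metadata: Dict[str, Any]) -> Dict[str, Any]:
    """Fill `{schema}`, `{table}` and `{key_columns}` placeholders in string values."""
    return {
        key: value.format(**metadata) if isinstance(value, str) else value
        for key, value in config.items()
    }


def parse_key_columns(value: Union[str, List[str]]) -> List[str]:
    """Parse "['id']" or '["id"]' back into a list of column names."""
    if isinstance(value, list):
        return value
    try:
        parsed = json.loads(value)  # JSON-style: '["id"]'
    except json.JSONDecodeError:
        parsed = ast.literal_eval(value)  # Python-style: "['id']"
    if not isinstance(parsed, list):
        raise ValueError(f"expected a list of column names, got {value!r}")
    return parsed


def group_by_target(
    messages: List[Dict[str, Any]], config: Dict[str, Any]
) -> Dict[Tuple[Tuple[str, Any], ...], List[Dict[str, Any]]]:
    """Group messages by their interpolated config values."""
    groups: Dict[Tuple[Tuple[str, Any], ...], List[Dict[str, Any]]] = {}
    for message in messages:
        resolved = interpolate_config(config, message["metadata"])
        groups.setdefault(tuple(sorted(resolved.items())), []).append(message)
    return groups
```

Because a Python list formats as `['id']`, interpolating `"{key_columns}"` yields the Python-style string, which `parse_key_columns` then converts back to a list.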

## Best Practices

1. **Use Replication Slots**: Always use replication slots for reliable CDC tracking
2. **Monitor WAL Growth**: Monitor WAL size and ensure proper cleanup
3. **Filter Events**: Use schema/table filters to reduce processing overhead
4. **Monitor Resources**: Watch memory and CPU usage
5. **Test Resume**: Verify checkpoint functionality with LSN tracking
6. **Secure Connections**: Use SSL in production
7. **Regular Backups**: Back up checkpoint files
8. **Schema Validation**: Test with schema changes
9. **Use Generic IO Sink**: Leverage automatic type mapping and metadata interpolation for flexible data routing
10. **Metadata Interpolation**: Use `{schema}`, `{table}`, and `{key_columns}` for dynamic table routing and upsert configuration
11. **Heartbeat Tables**: Use heartbeat tables on managed services (like Aiven) to prevent WAL from accumulating

## Timestamp Handling

The PostgreSQL CDC source automatically adds Mage timestamp columns:

* **`_mage_created_at`**: Set to event timestamp (datetime) for INSERT operations
* **`_mage_updated_at`**: Set to event timestamp (datetime) for UPDATE operations
* **`_mage_deleted_at`**: Set to event timestamp (datetime) for DELETE operations

All timestamps are in UTC timezone and returned as datetime objects, which ensures:

* Proper type handling in downstream databases
* Automatic type conversion in Generic IO sink
* Consistent timezone handling across systems
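
The mapping from operation to timestamp column can be sketched in Python as follows. Setting the two unrelated columns to `None` matches the INSERT example in the record format section; whether UPDATE and DELETE records behave the same way is an assumption, and `add_mage_timestamps` is a hypothetical helper.

```python
from datetime import datetime, timezone
from typing import Any, Dict

_COLUMN_BY_OPERATION = {
    "INSERT": "_mage_created_at",
    "UPDATE": "_mage_updated_at",
    "DELETE": "_mage_deleted_at",
}


def add_mage_timestamps(
    row: Dict[str, Any], operation: str, event_time: datetime
) -> Dict[str, Any]:
    """Return a copy of the row with the `_mage_*` column for this operation set in UTC."""
    if operation not in _COLUMN_BY_OPERATION:
        raise ValueError(f"unsupported operation: {operation!r}")
    if event_time.tzinfo is None:
        raise ValueError("event_time must be timezone-aware")
    stamped = dict(row)
    for column in _COLUMN_BY_OPERATION.values():
        stamped[column] = None  # assumed default for the other two columns
    stamped[_COLUMN_BY_OPERATION[operation]] = event_time.astimezone(timezone.utc)
    return stamped
```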

## Limitations

* **PostgreSQL 10+**: Requires PostgreSQL 10 or later for logical replication
* **WAL Level**: Requires `wal_level = logical`
* **Network Dependency**: Requires stable network connection
* **Memory Usage**: Schema caching uses memory
* **WAL Retention**: Depends on PostgreSQL WAL retention settings
* **Timezone**: All timestamps are in UTC timezone
* **Replication Slot**: Requires a dedicated replication slot per consumer
* **Publication**: Tables must be added to a publication to be replicated
