Mage supports 2 types of change data capture with PostgreSQL:

  1. Batch query
  2. Log replication

Batch query

Mage queries PostgreSQL in batches using SELECT statements with WHERE and ORDER BY clauses.
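
As a rough illustration of the batch query approach, the sketch below pages through a table using WHERE and ORDER BY on a bookmark column. It is not Mage's implementation: it uses Python's built-in sqlite3 module as a stand-in for PostgreSQL so that it runs without a database server, and the users table, the id bookmark column, the batch size, and the LIMIT clause are all assumptions made for the example.

```python
import sqlite3


def fetch_in_batches(conn, batch_size=2):
    """Yield batches of rows ordered by id, resuming after the last id seen."""
    last_id = 0  # assumes ids are positive integers
    while True:
        rows = conn.execute(
            "SELECT id, name FROM users WHERE id > ? ORDER BY id LIMIT ?",
            (last_id, batch_size),
        ).fetchall()
        if not rows:
            break
        yield rows
        last_id = rows[-1][0]  # bookmark used by the next batch


# Hypothetical source table, created in memory for the example.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
conn.executemany(
    "INSERT INTO users (id, name) VALUES (?, ?)",
    [(1, "a"), (2, "b"), (3, "c"), (4, "d"), (5, "e")],
)

batches = list(fetch_in_batches(conn))
```

Ordering by the bookmark column is what lets each batch pick up where the previous one stopped. Note that a query like this only sees rows that still exist, so it cannot detect deletes.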

Log replication

Mage will read the logs from PostgreSQL and use those as instructions to either create new rows, update existing rows, or delete rows in the destination.
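
To make the idea concrete, here is a minimal sketch of applying a stream of change events to a destination. The event format (a dict with action, key, and row) and the in-memory dict used as the destination are hypothetical simplifications chosen for the example; they are not the pgoutput wire format or Mage's internal representation.

```python
def apply_change(destination, change):
    """Apply one change event to a destination keyed by primary key."""
    action = change["action"]
    key = change["key"]
    if action == "insert":
        destination[key] = dict(change["row"])
    elif action == "update":
        # Merge the changed columns into the existing row (create it if missing).
        destination.setdefault(key, {}).update(change["row"])
    elif action == "delete":
        destination.pop(key, None)
    else:
        raise ValueError(f"unknown action: {action}")
    return destination


destination = {}
changes = [
    {"action": "insert", "key": 1, "row": {"id": 1, "name": "a"}},
    {"action": "insert", "key": 2, "row": {"id": 2, "name": "b"}},
    {"action": "update", "key": 1, "row": {"name": "z"}},
    {"action": "delete", "key": 2},
]
for change in changes:
    apply_change(destination, change)
```

After the loop, the destination holds only row 1 with its updated name, because row 2 was inserted and then deleted.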

How to set up log replication with PostgreSQL

Setup in PostgreSQL

  1. Enable logical replication in Postgres config

    1. Local Postgres
      1. Open the postgresql.conf file. Here is an example location on macOS: /Users/Mage/Library/Application Support/Postgres/var-14/postgresql.conf.

      2. Under the settings section, change the value of wal_level to logical. The line in your postgresql.conf file should look like this:

        wal_level = logical
        
      3. Restart the PostgreSQL service or database. You can do this via the PostgreSQL app or, if you’re on Linux, by running the following commands:

        sudo service postgresql stop
        sudo service postgresql start
        
      4. Run the following query in your PostgreSQL database: SHOW wal_level. The result should be:

        wal_level
        logical
    2. AWS RDS/Aurora
      1. Follow this doc to set up PostgreSQL logical replication for an Aurora PostgreSQL DB cluster.
      2. Logical replication for AWS RDS (PostgreSQL) can be set up in the same way as for an Aurora cluster.
  2. Run the following command in PostgreSQL to create a replication slot:

    SELECT pg_create_logical_replication_slot('mage_slot', 'pgoutput');
    
    The replication slot name mage_slot is used as an example in this guide. The actual replication slot name should be unique for each pipeline source.

    The result should look something like this:

    pg_create_logical_replication_slot
    (mage_slot,0/51A80778)
  3. Create a publication for all tables or for 1 specific table using the following commands:

    CREATE PUBLICATION mage_pub FOR ALL TABLES;
    
    The publication name mage_pub is the name used in Mage’s code.

    or for 1 table:

    CREATE PUBLICATION mage_pub FOR TABLE some_schema.some_table_name;
    
    Replace some_schema with the schema of the table and some_table_name with the name of the table you want to replicate.

    Run one of the following commands to add a table to the publication or remove a table from it:

    ALTER PUBLICATION mage_pub ADD TABLE table_name;
    ALTER PUBLICATION mage_pub DROP TABLE table_name;
    
  4. Verify that the publication was created successfully by running the following command in PostgreSQL:

    SELECT * FROM pg_publication_tables;
    

    The result should look something like this:

    pubname  | schemaname | tablename
    mage_pub | public     | users
  5. Grant the database user permission to read the replication slot:

    ALTER ROLE <username> WITH REPLICATION;
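
The checks from the steps above can also be scripted. The sketch below is one possible way to do it, not part of Mage: check_replication_setup is a hypothetical helper that accepts any DB-API cursor and runs the same kinds of queries shown in this guide against the pg_replication_slots and pg_publication_tables views. It assumes a driver that uses %s placeholders (psycopg2 does), and StubCursor is a stand-in with canned results so the example runs without a database.

```python
def check_replication_setup(cursor, slot_name="mage_slot", publication="mage_pub"):
    """Return a dict summarizing the logical replication setup checks."""
    cursor.execute("SHOW wal_level")
    wal_level = cursor.fetchone()[0]

    cursor.execute(
        "SELECT 1 FROM pg_replication_slots WHERE slot_name = %s", (slot_name,)
    )
    slot_exists = cursor.fetchone() is not None

    cursor.execute(
        "SELECT schemaname, tablename FROM pg_publication_tables WHERE pubname = %s",
        (publication,),
    )
    tables = cursor.fetchall()

    return {
        "wal_level_is_logical": wal_level == "logical",
        "slot_exists": slot_exists,
        "published_tables": [f"{schema}.{table}" for schema, table in tables],
    }


class StubCursor:
    """Stand-in cursor that returns canned results, one result set per execute()."""

    def __init__(self, results):
        self._results = list(results)
        self._current = []

    def execute(self, query, params=None):
        self._current = list(self._results.pop(0))

    def fetchone(self):
        return self._current[0] if self._current else None

    def fetchall(self):
        return self._current


# Canned results in the order the helper runs its three queries.
cursor = StubCursor([[("logical",)], [(1,)], [("public", "users")]])
report = check_replication_setup(cursor)
```

With a real connection, you would pass a cursor from your PostgreSQL driver instead of StubCursor. The helper does not check the REPLICATION attribute granted in step 5.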
    

Create data integration pipeline in Mage

Follow this guide to create a data integration pipeline in Mage.

When creating the pipeline, choose PostgreSQL as the source and LOG_BASED as the replication method.


Testing pipeline end-to-end

Once you’ve created the pipeline, add a few rows to the PostgreSQL table that you just set up logical replication for.

You can use the INSERT command to add rows. For example:

INSERT INTO some_schema.some_table_name
VALUES (1, 2, 3);

Replace some_schema with the schema of the table and some_table_name with the name of the table you want to replicate.

Change the VALUES to match the columns in your table.

Verify that replication logs are being created

Run the following query in PostgreSQL to check for new logs:

SELECT
  *
FROM pg_logical_slot_peek_binary_changes('mage_slot', null, null, 'proto_version', '1', 'publication_names', 'mage_pub');

Run sync

After you’ve added a few new rows, create a trigger to run your pipeline and begin syncing data.