Sources
Change Data Capture (CDC) with PostgreSQL
Mage supports 2 types of change data capture with PostgreSQL:
- Batch query
- Log replication
Batch query
Mage will query PostgreSQL in batches using SELECT, WHERE, and ORDER BY statements.
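The batching idea can be pictured with a minimal sketch. This is not Mage's implementation: it uses Python's built-in sqlite3 module as a stand-in for PostgreSQL so that it runs anywhere, and the fetch_batches helper, the id bookmark column, the LIMIT clause, and the batch size are all assumptions made for illustration.

```python
import sqlite3


def fetch_batches(conn, table, bookmark_column, batch_size, last_value=None):
    """Yield lists of rows fetched with SELECT ... WHERE ... ORDER BY ... LIMIT.

    Hypothetical helper: `table` and `bookmark_column` are interpolated into
    the SQL text, so they must come from trusted configuration.
    """
    while True:
        if last_value is None:
            # First batch: no bookmark yet, so there is no WHERE clause.
            query = f"SELECT * FROM {table} ORDER BY {bookmark_column} LIMIT ?"
            params = (batch_size,)
        else:
            # Later batches: only rows past the last bookmark value seen.
            query = (
                f"SELECT * FROM {table} WHERE {bookmark_column} > ? "
                f"ORDER BY {bookmark_column} LIMIT ?"
            )
            params = (last_value, batch_size)
        cursor = conn.execute(query, params)
        columns = [description[0] for description in cursor.description]
        rows = cursor.fetchall()
        if not rows:
            return
        yield rows
        # Remember the highest bookmark value so the next query resumes after it.
        last_value = rows[-1][columns.index(bookmark_column)]


# Stand-in database: in-memory SQLite instead of PostgreSQL.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
conn.executemany(
    "INSERT INTO users (id, name) VALUES (?, ?)",
    [(i, f"user_{i}") for i in range(1, 6)],
)

for batch in fetch_batches(conn, "users", "id", batch_size=2):
    print(batch)
```

Because each query only asks for rows past the last bookmark value, a batch query can pick up new and changed rows if the bookmark column advances, but it never sees deleted rows. Log replication covers that case.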
Log replication
Mage will read the logs from PostgreSQL and use those as instructions to either create new rows, update existing rows, or delete rows in the destination.
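As a rough illustration of using log entries as instructions, the sketch below replays a list of changes against an in-memory destination. The change format (a dict with "action" and "row" keys), the apply_changes helper, and the id key are hypothetical. They are not Mage's internal format or PostgreSQL's wire format.

```python
def apply_changes(destination, changes, key="id"):
    """Apply insert/update/delete changes, in order, to a dict keyed by primary key."""
    for change in changes:
        action = change["action"]
        row = change["row"]
        primary_key = row[key]
        if action == "insert":
            # Create a new row in the destination.
            destination[primary_key] = dict(row)
        elif action == "update":
            # Merge the changed columns into the existing row.
            destination.setdefault(primary_key, {}).update(row)
        elif action == "delete":
            # Remove the row; ignore it if it is already gone.
            destination.pop(primary_key, None)
        else:
            raise ValueError(f"unknown action: {action!r}")
    return destination


# Hand-written example changes, not real replication output.
changes = [
    {"action": "insert", "row": {"id": 1, "name": "ada"}},
    {"action": "insert", "row": {"id": 2, "name": "bob"}},
    {"action": "update", "row": {"id": 1, "name": "ada lovelace"}},
    {"action": "delete", "row": {"id": 2}},
]
print(apply_changes({}, changes))
```

The order of the changes matters: replaying the same log in the same order always produces the same destination state.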
How to set up log replication with PostgreSQL
Setup in PostgreSQL
- Open the postgresql.conf file. Here is an example location on macOS:
  /Users/Mage/Library/Application Support/Postgres/var-14/postgresql.conf
- Under the settings section, change the value of wal_level to logical. The line in your postgresql.conf file should look like this:
  wal_level = logical
- Restart the PostgreSQL service or database. You can do this via the PostgreSQL app or, if you’re on Linux, by running the following commands:
  sudo service postgresql stop
  sudo service postgresql start
- Run the following query in your PostgreSQL database:
  SHOW wal_level;
  The result should be:
  wal_level
  logical
- Run the following command in PostgreSQL to create a replication slot:
  SELECT pg_create_logical_replication_slot('mage_slot', 'pgoutput');
  The replication slot name mage_slot is used as an example in this guide. The actual replication slot name should be unique for each pipeline source.
  The result should look something like this:
  pg_create_logical_replication_slot
  (mage_slot,0/51A80778)
- Create a publication for all tables or for 1 specific table using the following commands:
  CREATE PUBLICATION mage_pub FOR ALL TABLES;
  The publication name mage_pub is the one used in Mage’s code.
  Or, for 1 table:
  CREATE PUBLICATION mage_pub FOR TABLE some_schema.some_table_name;
  Replace some_schema with the schema of the table and some_table_name with the name of the table you want to replicate.
- Verify that the publication was created successfully by running the following command in PostgreSQL:
  SELECT * FROM pg_publication_tables;
  The result should look something like this:
  pubname  | schemaname | tablename
  ---------+------------+-----------
  mage_pub | public     | users
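Before restarting PostgreSQL, it can help to check that the wal_level edit described above is the one that will take effect. The sketch below is a simplified, hypothetical reader for postgresql.conf text, not a PostgreSQL or Mage tool. It assumes that "#" always starts a comment, so it would mis-read a quoted value containing "#", and it ignores include files and settings made with ALTER SYSTEM. The SHOW wal_level query remains the authoritative check.

```python
def read_setting(conf_text, name):
    """Return the effective value of `name` in postgresql.conf text, or None.

    Simplified sketch: comments start at '#', the '=' sign is optional, names
    are case-insensitive, and the last entry for a parameter wins.
    """
    value = None
    for raw_line in conf_text.splitlines():
        line = raw_line.split("#", 1)[0].strip()  # drop comments and padding
        if not line:
            continue
        if "=" in line:
            key, val = line.split("=", 1)
        else:
            parts = line.split(None, 1)
            key, val = parts[0], (parts[1] if len(parts) > 1 else "")
        if key.strip().lower() == name.lower():
            # Later entries override earlier ones, so keep overwriting.
            value = val.strip().strip("'")
    return value


# Hand-written example file contents.
example_conf = """
#wal_level = replica        # commented-out default
wal_level = logical         # needed for log replication
max_wal_senders = 10
"""
print(read_setting(example_conf, "wal_level"))
```

To check a real file, read its contents and pass them in, for example read_setting(open(path).read(), "wal_level"), where path is the location of your postgresql.conf.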
Create data integration pipeline in Mage
Follow the Mage guide for creating a data integration pipeline, but choose PostgreSQL as the source and LOG_BASED as the replication method.
Testing pipeline end-to-end
Once you’ve created the pipeline, add a few rows to the PostgreSQL table that you just set up logical replication for.
You can use the INSERT command to add rows. For example:
INSERT INTO some_schema.some_table_name
VALUES (1, 2, 3);
Replace some_schema with the schema of the table and some_table_name with the name of the table you want to replicate.
Change the VALUES to match the columns in your table.
Verify that replication logs are being created
Run the following command in PostgreSQL to check for new logs:
SELECT *
FROM pg_logical_slot_peek_binary_changes('mage_slot', null, null, 'proto_version', '1', 'publication_names', 'mage_pub');
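The query returns one row per change with the columns lsn, xid, and data, where data is a binary pgoutput message whose first byte identifies the message type. The sketch below shows one way to summarize such rows once they have been fetched into Python. The message_type and summarize helpers and the sample rows are hypothetical, and the sample payloads are truncated to the type byte, so they are not real replication output.

```python
from collections import Counter

# First byte of a pgoutput message (protocol version 1) and its meaning.
PGOUTPUT_MESSAGE_TYPES = {
    b"B": "begin",
    b"C": "commit",
    b"O": "origin",
    b"R": "relation",
    b"Y": "type",
    b"I": "insert",
    b"U": "update",
    b"D": "delete",
    b"T": "truncate",
}


def message_type(data):
    """Return a readable name for a pgoutput message from its first byte."""
    if not data:
        raise ValueError("empty message")
    # bytes(...) also accepts a memoryview, which some drivers return for bytea.
    return PGOUTPUT_MESSAGE_TYPES.get(bytes(data[:1]), "unknown")


def summarize(rows):
    """Count message types across (lsn, xid, data) rows."""
    return dict(Counter(message_type(data) for _lsn, _xid, data in rows))


# Made-up rows shaped like the query result: (lsn, xid, data).
sample_rows = [
    ("0/51A80800", 740, b"B"),
    ("0/51A80800", 740, b"R"),
    ("0/51A80800", 740, b"I"),
    ("0/51A80848", 740, b"C"),
]
print(summarize(sample_rows))
```

If the rows you inserted earlier were captured, the summary should include at least one insert. The peek function does not consume changes from the slot, so running the query repeatedly does not affect what the pipeline later reads.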
Run sync
After you’ve added a few new rows, create a trigger to start running your pipeline and begin syncing data.