Only in Mage Pro.
Try our fully managed solution to access this advanced feature.
Mage Pro streaming pipelines support OracleDB change data capture (CDC) as a source. This document explains how to set up OracleDB CDC with LogMiner and integrate it into a Mage streaming pipeline.
Basic Config
connector_type: oracledb
host: "oracledb.example.com"
port: 1521
service: "xepdb1"
password: "password"
user: "user"
mode: "thin" # Value can be `thin` or `thick`
table_names: [] # Filter table names
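As a rough illustration of how these fields fit together, the sketch below checks a config dictionary and builds an Oracle Easy Connect string of the form host:port/service. The helper name `build_dsn`, the list of required keys, and the default of `thin` for `mode` are assumptions made for this example; they are not part of Mage's connector, which reads the YAML itself.

```python
# Hypothetical helper, not part of Mage: validates the fields shown above
# and builds an Oracle Easy Connect string from them.
REQUIRED_KEYS = ("host", "port", "service", "user", "password")
VALID_MODES = ("thin", "thick")


def build_dsn(config: dict) -> str:
    """Return 'host:port/service' after basic validation of the config."""
    missing = [key for key in REQUIRED_KEYS if not config.get(key)]
    if missing:
        raise ValueError(f"missing config keys: {', '.join(missing)}")
    mode = config.get("mode", "thin")  # assumed default for this sketch
    if mode not in VALID_MODES:
        raise ValueError(f"mode must be one of {VALID_MODES}, got {mode!r}")
    return f"{config['host']}:{int(config['port'])}/{config['service']}"


config = {
    "connector_type": "oracledb",
    "host": "oracledb.example.com",
    "port": 1521,
    "service": "xepdb1",
    "user": "user",
    "password": "password",
    "mode": "thin",
    "table_names": [],
}
dsn = build_dsn(config)
print(dsn)
```

Checking the values before the pipeline starts surfaces a typo in `mode` or a missing `service` as a clear error rather than a connection failure later on.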
Setting Up OracleDB for CDC
Step 1: Enable Supplemental Logging
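Run the following SQL commands to enable supplemental logging, which makes Oracle record enough column data in the redo logs for LogMiner to reconstruct row changes: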
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
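To confirm that the two statements took effect, the V$DATABASE view exposes the SUPPLEMENTAL_LOG_DATA_MIN and SUPPLEMENTAL_LOG_DATA_ALL columns. The sketch below assumes a DB-API style cursor (for example, one obtained from the python-oracledb driver) connected as a user who can read V$DATABASE; the function name is made up for this example.

```python
CHECK_SQL = (
    "SELECT SUPPLEMENTAL_LOG_DATA_MIN, SUPPLEMENTAL_LOG_DATA_ALL "
    "FROM V$DATABASE"
)


def supplemental_logging_status(cursor) -> dict:
    """Report whether minimal and all-column supplemental logging are on.

    `cursor` is assumed to be a DB-API cursor on an open Oracle connection.
    """
    cursor.execute(CHECK_SQL)
    log_min, log_all = cursor.fetchone()
    return {
        # SUPPLEMENTAL_LOG_DATA_MIN reports YES, IMPLICIT, or NO.
        "minimal": log_min in ("YES", "IMPLICIT"),
        "all_columns": log_all == "YES",
    }
```

With an open connection, a call such as `supplemental_logging_status(connection.cursor())` should return `True` for both keys once Step 1 has been applied.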
Step 2: Create a User for CDC
CREATE USER cdc_user IDENTIFIED BY YourPassword;
GRANT CONNECT, RESOURCE TO cdc_user;
GRANT EXECUTE ON DBMS_LOGMNR TO cdc_user;
GRANT EXECUTE ON DBMS_LOGMNR_D TO cdc_user;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO cdc_user;
GRANT SELECT ON V_$DATABASE TO cdc_user;
GRANT SELECT ON V_$ARCHIVED_LOG TO cdc_user;
GRANT SELECT ON V_$LOGMNR_LOGS TO cdc_user;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO cdc_user;
GRANT SELECT ANY TABLE TO cdc_user;
Step 3: Start LogMiner Session
BEGIN
  DBMS_LOGMNR.START_LOGMNR(
    OPTIONS => DBMS_LOGMNR.DICT_FROM_ONLINE_CATALOG
  );
END;
/
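When the session is started from application code rather than a SQL client, the redo or archived log files to mine are typically registered with DBMS_LOGMNR.ADD_LOGFILE before START_LOGMNR is called. In that procedure, DBMS_LOGMNR.NEW starts a new file list and DBMS_LOGMNR.ADDFILE appends to it. A minimal sketch follows, assuming a cursor from a driver such as python-oracledb that accepts named bind variables; the function name and any log file paths passed to it are placeholders, not something Mage provides.

```python
ADD_LOGFILE_SQL = """
BEGIN
  DBMS_LOGMNR.ADD_LOGFILE(LOGFILENAME => :name, OPTIONS => {option});
END;"""

START_SQL = """
BEGIN
  DBMS_LOGMNR.START_LOGMNR(
    OPTIONS => DBMS_LOGMNR.DICT_FROM_ONLINE_CATALOG
  );
END;"""


def start_logminer(cursor, log_files: list) -> None:
    """Register log files, then start LogMiner on the same session.

    Hypothetical helper: `cursor` is assumed to be a DB-API cursor and
    `log_files` a list of redo/archived log paths on the database host.
    """
    if not log_files:
        raise ValueError("at least one log file is required")
    for index, name in enumerate(log_files):
        # The first file starts a new list; later files are appended to it.
        option = "DBMS_LOGMNR.NEW" if index == 0 else "DBMS_LOGMNR.ADDFILE"
        cursor.execute(ADD_LOGFILE_SQL.format(option=option), {"name": name})
    cursor.execute(START_SQL)
```

Only the option constant is formatted into the statement; the file name is passed as a bind variable so that paths containing quotes do not break the PL/SQL block.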
Test using LogMiner for CDC
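LogMiner reads the redo logs and exposes each INSERT, UPDATE, and DELETE operation as a row in the V$LOGMNR_CONTENTS view. The view only returns rows in the database session that called DBMS_LOGMNR.START_LOGMNR.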
Mage Data Pipeline for LogMiner-Based CDC
Create a data loader in Mage to pull real-time CDC data:
from os import path

from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.oracledb import OracleDB
from mage_ai.settings.repo import get_repo_path

if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader


@data_loader
def load_data_from_oracledb(*args, **kwargs):
    # Read change records for one table from the LogMiner view.
    # Replace YOUR_TABLE with the name of the table to track.
    query = """
        SELECT SCN, OPERATION, TIMESTAMP, SQL_REDO
        FROM V$LOGMNR_CONTENTS
        WHERE SEG_NAME = 'YOUR_TABLE'
          AND OPERATION IN ('INSERT', 'UPDATE', 'DELETE')
    """
    # Connection settings come from the `default` profile in io_config.yaml.
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'default'

    with OracleDB.with_config(ConfigFileLoader(config_path, config_profile)) as loader:
        return loader.load(query)
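Because each row in V$LOGMNR_CONTENTS carries an SCN, one way to avoid re-reading the same changes on repeated runs is to remember the highest SCN already processed and filter on it. The sketch below is an assumption about how such a checkpoint could work, not a description of Mage's built-in behavior. `ChangeEvent`, `to_events`, and `next_checkpoint` are hypothetical names, the sample rows are invented for illustration, and rows are assumed to arrive as (SCN, OPERATION, TIMESTAMP, SQL_REDO) tuples in the column order of the query above.

```python
from dataclasses import dataclass
from datetime import datetime

# Same columns as the data loader query, restricted to unseen SCNs.
INCREMENTAL_QUERY = """
SELECT SCN, OPERATION, TIMESTAMP, SQL_REDO
FROM V$LOGMNR_CONTENTS
WHERE SEG_NAME = :table_name
  AND OPERATION IN ('INSERT', 'UPDATE', 'DELETE')
  AND SCN > :last_scn
ORDER BY SCN
"""


@dataclass(frozen=True)
class ChangeEvent:
    scn: int
    operation: str
    timestamp: datetime
    sql_redo: str


def to_events(rows):
    """Convert raw result tuples into ChangeEvent objects."""
    return [ChangeEvent(int(scn), op, ts, sql) for scn, op, ts, sql in rows]


def next_checkpoint(events, last_scn: int) -> int:
    """Return the highest SCN seen so far; unchanged if there are no events."""
    return max([last_scn, *(event.scn for event in events)])


# Invented sample rows standing in for a query result.
rows = [
    (1001, "INSERT", datetime(2024, 1, 1, 12, 0, 0), "insert into ..."),
    (1002, "UPDATE", datetime(2024, 1, 1, 12, 0, 5), "update ..."),
]
events = to_events(rows)
last_scn = next_checkpoint(events, last_scn=1000)
print(last_scn)
```

Several rows can share one SCN, so in this scheme a batch should be processed completely before the checkpoint is advanced.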