Mage Pro streaming pipelines support OracleDB CDC as a source. This document explains how to set up OracleDB CDC and integrate it into Mage’s streaming pipeline using LogMiner.

Basic Config

connector_type: oracledb
host: "oracledb.example.com"
port: 1521
service: "xepdb1"
password: "password"
user: "user"
mode: "thin"        # Value can be `thin` or `thick`
table_names: []     # Filter table names
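
For orientation, the `host`, `port`, and `service` fields correspond to the parts of an Oracle Easy Connect string (`host:port/service_name`). The sketch below only illustrates that mapping and checks that the required fields are present. `build_dsn` is a hypothetical helper, not part of Mage, and the dictionary simply mirrors the sample values above.

```python
def build_dsn(config: dict) -> str:
    """Build an Easy Connect string (host:port/service) from a config dict.

    Hypothetical helper for illustration; Mage may assemble its
    connection differently.
    """
    missing = [key for key in ("host", "port", "service") if not config.get(key)]
    if missing:
        raise ValueError(f"missing config keys: {', '.join(missing)}")
    return f"{config['host']}:{int(config['port'])}/{config['service']}"


# Sample values copied from the config above.
sample_config = {
    "connector_type": "oracledb",
    "host": "oracledb.example.com",
    "port": 1521,
    "service": "xepdb1",
    "user": "user",
    "password": "password",
    "mode": "thin",
    "table_names": [],
}

print(build_dsn(sample_config))
```

Validating the fields up front, as this sketch does, surfaces a missing `service` or `host` before any connection attempt is made.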

Setting Up OracleDB for CDC

Step 1: Enable Supplemental Logging

Run the following SQL commands to enable supplemental logging, which LogMiner needs in order to reconstruct row-level changes for CDC:

ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
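
To confirm that the two statements above took effect, the `SUPPLEMENTAL_LOG_DATA_MIN` and `SUPPLEMENTAL_LOG_DATA_ALL` columns of `V$DATABASE` can be inspected. The following is a minimal sketch of how the result could be interpreted in Python. The function name and the way the row is fetched are assumptions; the row is expected to be a two-element tuple returned by whatever database client you use.

```python
# Query to run against the database with any Oracle client.
CHECK_SUPPLEMENTAL_LOGGING_SQL = """
SELECT SUPPLEMENTAL_LOG_DATA_MIN, SUPPLEMENTAL_LOG_DATA_ALL
FROM V$DATABASE
"""


def supplemental_logging_enabled(row) -> bool:
    """Interpret one (log_min, log_all) row from the query above.

    Hypothetical helper. Minimal logging counts as enabled when it is
    'YES' or 'IMPLICIT'; all-column logging must be 'YES'.
    """
    log_min, log_all = row
    return log_min in ("YES", "IMPLICIT") and log_all == "YES"


# Example rows as they might come back from a cursor fetch.
print(supplemental_logging_enabled(("YES", "YES")))
print(supplemental_logging_enabled(("NO", "NO")))
```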

Step 2: Create a User for CDC

CREATE USER cdc_user IDENTIFIED BY YourPassword;
GRANT CONNECT, RESOURCE TO cdc_user;
GRANT EXECUTE ON DBMS_LOGMNR TO cdc_user;
GRANT EXECUTE ON DBMS_LOGMNR_D TO cdc_user;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO cdc_user;
GRANT SELECT ON V_$DATABASE TO cdc_user;
GRANT SELECT ON V_$ARCHIVED_LOG TO cdc_user;
GRANT SELECT ON V_$LOGMNR_LOGS TO cdc_user;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO cdc_user;
GRANT SELECT ANY TABLE TO cdc_user;

Step 3: Start LogMiner Session

BEGIN
  DBMS_LOGMNR.START_LOGMNR(
    OPTIONS => DBMS_LOGMNR.DICT_FROM_ONLINE_CATALOG + DBMS_LOGMNR.NEW
  );
END;
/

Testing LogMiner for CDC

LogMiner reads the redo logs to track INSERT, UPDATE, and DELETE operations.
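
As a quick sanity check while testing, change records read from LogMiner can be tallied by operation type. This is a rough sketch under the assumption that each record is available as a dictionary with `SCN` and `OPERATION` keys; `summarize_changes` is a hypothetical helper and the sample rows are made up for illustration.

```python
from collections import Counter

TRACKED_OPERATIONS = {"INSERT", "UPDATE", "DELETE"}


def summarize_changes(rows):
    """Count tracked operations and return the highest SCN seen.

    rows: iterable of dicts with 'SCN' and 'OPERATION' keys (assumed shape).
    Returns (counts_by_operation, max_scn); max_scn is None if no tracked
    operation was found.
    """
    counts = Counter()
    max_scn = None
    for row in rows:
        if row["OPERATION"] not in TRACKED_OPERATIONS:
            continue  # skip COMMIT, START, DDL, and other record types
        counts[row["OPERATION"]] += 1
        if max_scn is None or row["SCN"] > max_scn:
            max_scn = row["SCN"]
    return dict(counts), max_scn


# Made-up sample records for illustration only.
sample_rows = [
    {"SCN": 101, "OPERATION": "INSERT"},
    {"SCN": 102, "OPERATION": "COMMIT"},
    {"SCN": 105, "OPERATION": "UPDATE"},
    {"SCN": 107, "OPERATION": "INSERT"},
]

counts, last_scn = summarize_changes(sample_rows)
print(counts, last_scn)
```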

Mage Data Pipeline for LogMiner-Based CDC

Create a data loader in Mage to pull real-time CDC data:

from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.oracledb import OracleDB
from os import path
if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader


@data_loader
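def load_data_from_oracledb(*args, **kwargs):
    """Load INSERT, UPDATE, and DELETE change records for one table from LogMiner."""
    # Replace YOUR_TABLE with the name of the table to track.
    query = """
    SELECT SCN, OPERATION, TIMESTAMP, SQL_REDO
    FROM V$LOGMNR_CONTENTS
    WHERE SEG_NAME = 'YOUR_TABLE'
      AND OPERATION IN ('INSERT', 'UPDATE', 'DELETE')
    """
    # Connection settings are read from the `default` profile in io_config.yaml.
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'default'

    with OracleDB.with_config(ConfigFileLoader(config_path, config_profile)) as loader:
        return loader.load(query)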
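
The query above returns every matching record on each run. One possible refinement, sketched here as an assumption rather than as Mage's own behavior, is to build the query from a table name and the last SCN already processed, so that later runs only fetch newer changes. `build_logminer_query` and its `last_scn` parameter are hypothetical; how the checkpoint is stored between runs is left open.

```python
import re

# Conservative check for an unquoted Oracle identifier, used to avoid
# interpolating arbitrary text into the SQL string.
_IDENTIFIER = re.compile(r"^[A-Za-z][A-Za-z0-9_$#]*$")


def build_logminer_query(table_name: str, last_scn: int = 0) -> str:
    """Build a V$LOGMNR_CONTENTS query for changes after last_scn.

    Hypothetical helper for illustration only.
    """
    if not _IDENTIFIER.match(table_name):
        raise ValueError(f"invalid table name: {table_name!r}")
    return (
        "SELECT SCN, OPERATION, TIMESTAMP, SQL_REDO\n"
        "FROM V$LOGMNR_CONTENTS\n"
        f"WHERE SEG_NAME = '{table_name.upper()}'\n"
        "  AND OPERATION IN ('INSERT', 'UPDATE', 'DELETE')\n"
        f"  AND SCN > {int(last_scn)}\n"
        "ORDER BY SCN"
    )


print(build_logminer_query("orders", last_scn=100))
```

The resulting string could be passed to `loader.load(...)` in place of the fixed query, with the highest SCN from each batch saved as the next `last_scn`.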