## Basic config
`profile` selects which credentials and connection settings to use. It is the name of a profile (section) in your project's `io_config.yaml`. The sink loads that profile via the same IO config used elsewhere in Mage (e.g. `ConfigFileLoader(repo_path / 'io_config.yaml', profile)`). Use the `default` profile unless you have multiple BigQuery connections (e.g. dev vs prod); then define separate profiles (e.g. `default`, `prod`) in `io_config.yaml` and set `profile: prod` in the sink.

`config` is a key-value map passed to the BigQuery export call. It controls where and how data is written (e.g. `table_id`, `if_exists`, `database`, `overwrite_types`). It is merged with defaults (e.g. `if_exists: append`) and supports metadata interpolation, so you can use values from each message (e.g. `table_id: "{schema}.{table}"`). `config` is separate from credentials: credentials come from the chosen profile; `config` only holds export options like table and column types.
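As a minimal sketch, the two pieces above might combine into a sink block like the following (the `connector_type` key and the surrounding pipeline YAML layout are assumptions; `profile`, `table_id`, and `if_exists` are the options described here):

```yaml
# Hypothetical streaming sink block (exact layout assumed).
destination:
  connector_type: bigquery        # assumed key name for the BigQuery sink
  profile: default                # profile (section) in io_config.yaml
  config:
    table_id: my_dataset.my_table # dataset_id.table_id; project comes from the profile
    if_exists: append             # matches the sink's default behavior
```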
## How it works
- **Storage Write API**: Streaming inserts use the BigQuery Storage Write API (not the older streaming insert API). Requires `google-cloud-bigquery-storage` and PyArrow. If the table does not exist, the pipeline will not create it; create the table beforehand.
- **Schema**: Only DataFrame columns that exist in the BigQuery table schema are written; extra columns are dropped. Column types are cast to match the table (`STRING`, `INT64`, `TIMESTAMP`, `RECORD`, etc.).
- **CDC columns**: The sink automatically maps `_mage_created_at`, `_mage_updated_at`, and `_mage_deleted_at` to the BigQuery `TIMESTAMP` type when present in the data.
- **Required (NOT NULL) fields**: If the table has `REQUIRED` columns and any row has nulls in those columns (including nested `RECORD` fields), the write fails, and failed rows can be sent to a dead-letter queue (see below).
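To pin column types explicitly before the cast described above, a sketch of the `overwrite_types` option could look like this (table and column names are hypothetical; the `_mage_*` mapping happens automatically and is shown only for illustration):

```yaml
config:
  table_id: my_dataset.events        # hypothetical table
  overwrite_types:
    _mage_created_at: TIMESTAMP      # CDC column; mapped automatically when present
    payload: STRING                  # cast a custom column before the write
```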
## Optional config

Config parameters:
| Parameter | Type | Description | Default |
|---|---|---|---|
| `table_id` | string | BigQuery table identifier. Use `project_id.dataset_id.table_id` or `dataset_id.table_id` (project from profile or `database`). Supports metadata interpolation, e.g. `"{schema}.{table}"` for CDC. | (required) |
| `database` | string | Default project when `table_id` is two-part (`dataset_id.table_id`). Overrides the profile project if set. | profile project |
| `if_exists` | string | Behavior when the table already exists: `append` (add rows), `replace` (truncate/overwrite existing rows; preserves table schema), or `fail` (throw an error). | `append` |
| `overwrite_types` | object | Map of column name → BigQuery type (e.g. `STRING`, `INT64`, `TIMESTAMP`, `RECORD`). Used to cast columns before the write. Useful for `_mage_*` or custom columns. | — |
| `unique_constraints` | list (or string) | Column names for upsert/merge. When set together with `unique_conflict_method`, exports as a MERGE instead of an append. Supports interpolation, e.g. `"{key_columns}"` from PostgreSQL CDC. | — |
| `unique_conflict_method` | string | For upsert: e.g. `UPDATE` to update existing rows on conflict. Used only when `unique_constraints` is set. | — |
Set `streaming_inserts: true` so writes use the BigQuery Storage Write API. The target table must already exist (the sink does not create it).
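Putting the upsert options together, a sketch of a MERGE-style `config` might read as follows (project, dataset, table, and key column names are hypothetical):

```yaml
config:
  table_id: my_project.my_dataset.users
  unique_constraints:
    - id                           # merge key(s); exports as MERGE, not append
  unique_conflict_method: UPDATE   # update existing rows on conflict
```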
## Dead-letter queue
When a batch fails (e.g. a required field is null, or the Storage Write API returns row errors), failed rows can be written to a dead-letter destination. `dead_letter` is a top-level key (a sibling of `config`), not nested inside `config`. Schema (`DeadLetterConfig`):
| Field | Description |
|---|---|
| `destination_type` | Where to write failed rows: `file` (default) = JSONL file; `table` = BigQuery table (same project/dataset as the main table). |
| `enabled` | If `true`, write failed rows to the DLQ (per `destination_type`); if `false`, do not write to the DLQ. Default: `true`. |
| `table_suffix` | When `destination_type` is `table`: suffix applied to the main `table_id` to form the DLQ table (e.g. `_dlq` → `project.dataset.table_dlq`). Ignored if `table_name` is set. Default: `_dlq`. |
| `table_name` | When `destination_type` is `table`: full table identifier for the DLQ table (e.g. `project_id.dataset_id.dlq_table_id`). Overrides `table_suffix` when set. |
| `file_path` | When `destination_type` is `file`: path for the JSONL file. Default: under the pipeline variables dir (e.g. `<variables_dir>/streaming_dlq.jsonl`). |
If `dead_letter` is omitted entirely, failed rows are written to the default file path (the same as `destination_type: file` with the default `file_path`).
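A sketch of a `dead_letter` block routing failed rows to a BigQuery table, using the fields above (the `connector_type` key and surrounding layout are assumptions; names are hypothetical):

```yaml
destination:
  connector_type: bigquery          # assumed key name
  config:
    table_id: my_project.my_dataset.events
  dead_letter:                      # sibling of config, not inside it
    enabled: true
    destination_type: table
    table_suffix: _dlq              # DLQ table becomes my_project.my_dataset.events_dlq
```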
DLQ record format (each failed row becomes one record):
- `original_record`: the failed row as a JSON object (file) or JSON string column (table).
- `failure_reason`: error message.
- `failure_timestamp`: ISO timestamp (UTC).
- `connector_type`: e.g. `bigquery`.
- `row_errors`: optional list of per-row error details (index, code, message) when the API provides them.
When writing to a table, the DLQ table uses these column types: `original_record` STRING/JSON, `failure_reason` STRING, `failure_timestamp` STRING/TIMESTAMP, `connector_type` STRING, `row_errors` STRING/JSON. When writing to a file, each line is one JSON object with the fields above.
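For the file destination, a single DLQ line might look like this (all values are illustrative; each record occupies exactly one line in the JSONL file):

```json
{"original_record": {"id": 42, "name": null}, "failure_reason": "Required field 'name' is NULL", "failure_timestamp": "2024-01-01T00:00:00+00:00", "connector_type": "bigquery", "row_errors": [{"index": 0, "code": "FIELD_VALUE_INVALID", "message": "null in required field"}]}
```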
If failed rows are successfully written to the dead-letter destination, the pipeline continues. If the DLQ write fails or is disabled, the pipeline raises the error so you can alert or retry.
## Metadata interpolation
You can use message metadata in the config (e.g. schema/table per source). Keys in `config` are interpolated with `{metadata_key}`. Example: `table_id: "{schema}.{table}"` uses `metadata.schema` and `metadata.table` from each message.
This pattern works particularly well with CDC sources such as PostgreSQL CDC, where each event carries schema, table, and primary key metadata. For example, you can route changes from many source tables into matching BigQuery tables by setting:
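A sketch of such a config (the placeholders are the interpolated metadata keys described above; `{key_columns}` is the primary-key metadata mentioned for PostgreSQL CDC):

```yaml
config:
  table_id: "{schema}.{table}"          # route each event to its matching table
  unique_constraints: "{key_columns}"   # primary keys from CDC metadata
  unique_conflict_method: UPDATE        # upsert changed rows
  if_exists: append
```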