# BigQuery

> Stream data to Google BigQuery using the Storage Write API, with optional dead-letter queue and metadata interpolation for CDC.

The BigQuery streaming destination uses the **Generic IO** sink. Writes go through the **BigQuery Storage Write API** (default stream, Arrow format), which offers higher throughput and lower cost than the older streaming insert API. The target table must already exist.

## Basic config

```yaml
connector_type: bigquery
profile: default    # profile in io_config.yaml
config:
  table_id: project_id.dataset_id.table_id   # or dataset_id.table_id (uses profile project)
  if_exists: "append"
# dead_letter is optional and top-level (outside config)
# dead_letter:
#   destination_type: file   # "file" or "table"
#   enabled: true
#   table_suffix: "_dlq"
#   table_name: null
#   file_path: null
```

* **`profile`** selects the credentials and connection settings to use. It names a **profile** (top-level section) in your project’s `io_config.yaml`, which the sink loads through the same [IO config](/development/io_config_setup) mechanism used elsewhere in Mage (e.g. `ConfigFileLoader(os.path.join(repo_path, 'io_config.yaml'), profile)`). Use the `default` profile unless you have multiple BigQuery connections (e.g. dev vs prod); in that case, define separate profiles (e.g. `default`, `prod`) in `io_config.yaml` and set `profile: prod` in the sink.
* **`config`** is a key-value map passed to the BigQuery **export** call. It controls *where* and *how* data is written (e.g. `table_id`, `if_exists`, `database`, `overwrite_types`). It is merged with defaults (e.g. `if_exists: append`) and supports [metadata interpolation](#metadata-interpolation) so you can use values from each message (e.g. `table_id: "{schema}.{table}"`). **`config` is separate from credentials**: credentials come from the chosen **profile**; **config** only holds export options like table and column types.
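
As a sketch of the dev/prod setup described above, `io_config.yaml` could define two profiles like this. The key names (`GOOGLE_SERVICE_ACC_KEY_FILEPATH`, `GOOGLE_LOCATION`) and the file paths are assumptions for illustration; check the [IO config](/development/io_config_setup) guide for the keys your Mage version expects.

```yaml
# io_config.yaml (illustrative; key names and paths are assumptions)
default:
  GOOGLE_SERVICE_ACC_KEY_FILEPATH: /secrets/dev-service-account.json
  GOOGLE_LOCATION: US
prod:
  GOOGLE_SERVICE_ACC_KEY_FILEPATH: /secrets/prod-service-account.json
  GOOGLE_LOCATION: US
```

The sink then selects the production credentials by name, while `config` carries only export options:

```yaml
connector_type: bigquery
profile: prod                    # credentials come from the prod profile
config:
  table_id: analytics.events     # hypothetical dataset.table; project comes from the profile
  if_exists: "append"
```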

## How it works

* **Storage Write API**: Streaming inserts use the BigQuery Storage Write API (not the older streaming insert API). This requires the `google-cloud-bigquery-storage` package and PyArrow. The pipeline does not create a missing table, so create the table beforehand.
* **Schema**: Only DataFrame columns that exist in the BigQuery table schema are written. Extra columns are dropped. Column types are cast to match the table (STRING, INT64, TIMESTAMP, RECORD, etc.).
* **CDC columns**: The sink automatically maps `_mage_created_at`, `_mage_updated_at`, and `_mage_deleted_at` to BigQuery `TIMESTAMP` type when present in the data.
* **Required (NOT NULL) fields**: If the table has REQUIRED columns and any row has a null in one of them (including nested RECORD fields), the write fails and the failed rows can be sent to a dead-letter queue (see [Dead-letter queue](#dead-letter-queue)).

## Optional config

```yaml
config:
  table_id: project_id.dataset_id.table_id
  if_exists: "append"
  database: project_id   # override project when table_id is dataset.table

  # Override column types (e.g. for _mage_* or custom columns)
  overwrite_types:
    _mage_created_at: TIMESTAMP
    my_column: STRING

# Dead-letter queue (top-level, outside config). Schema: DeadLetterConfig
dead_letter:
  destination_type: file     # "file" (default) or "table"
  enabled: true
  table_suffix: "_dlq"       # for destination_type=table: suffix appended to the main table_id
  table_name: null           # optional: full table id (overrides table_suffix)
  file_path: null            # for destination_type=file: path for JSONL file
```

## Config parameters

| Parameter                | Type             | Description                                                                                                                                                                                                                      | Default         |
| ------------------------ | ---------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | --------------- |
| `table_id`               | string           | BigQuery table identifier. Use `project_id.dataset_id.table_id` or `dataset_id.table_id` (project from profile or `database`). Supports [metadata interpolation](#metadata-interpolation), e.g. `"{schema}.{table}"` for CDC.    | *(required)*    |
| `database`               | string           | Default project when `table_id` is two-part (`dataset_id.table_id`). Overrides the profile project if set.                                                                                                                       | profile project |
| `if_exists`              | string           | Behavior when the table already exists: `append` (add rows), `replace` (truncate/overwrite existing rows; preserves table schema), or `fail` (throw an error).                                                                   | `append`        |
| `overwrite_types`        | object           | Map of column name → BigQuery type (e.g. `STRING`, `INT64`, `TIMESTAMP`, `RECORD`). Used to cast columns before write. Useful for `_mage_*` or custom columns.                                                                   | —               |
| `unique_constraints`     | list (or string) | List of column names for upsert/merge. When set with `unique_conflict_method`, exports as MERGE instead of append. Supports interpolation, e.g. `"{key_columns}"` from [PostgreSQL CDC](/guides/streaming/sources/postgres-cdc). | —               |
| `unique_conflict_method` | string           | For upsert: e.g. `UPDATE` to update existing rows on conflict. Used only when `unique_constraints` is set.                                                                                                                       | —               |

The sink automatically sets `streaming_inserts: true` so writes use the BigQuery Storage Write API. The target table must already exist (the sink does not create it).
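
As a minimal sketch of the upsert parameters in the table above, the following config exports as MERGE keyed on a single column. The column name `id` is a hypothetical example; replace it with the key columns of your table.

```yaml
connector_type: bigquery
profile: default
config:
  table_id: project_id.dataset_id.table_id
  unique_constraints:            # columns that identify a row (hypothetical: id)
    - id
  unique_conflict_method: UPDATE # update the existing row when the key already exists
```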

## Dead-letter queue

When a batch fails (e.g. a required field is null, or the Storage Write API returns row errors), failed rows can be written to a dead-letter destination. **Dead-letter is top-level** (sibling to `config`), not inside `config`. Its schema is **DeadLetterConfig**:

| Field              | Description                                                                                                                                                                                   |
| ------------------ | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `destination_type` | Where to write failed rows: `file` (default) = JSONL file; `table` = BigQuery table (same project/dataset as main table).                                                                     |
| `enabled`          | If `true`, write failed rows to the DLQ (per `destination_type`). If `false`, do not write to DLQ. Default: `true`.                                                                           |
| `table_suffix`     | When `destination_type` is `table`: suffix applied to the main `table_id` to form the DLQ table (e.g. `_dlq` → `project.dataset.table_dlq`). Ignored if `table_name` is set. Default: `_dlq`. |
| `table_name`       | When `destination_type` is `table`: full table identifier for the DLQ table (e.g. `project_id.dataset_id.dlq_table_id`). Overrides `table_suffix` when set.                                   |
| `file_path`        | When `destination_type` is `file`: path for the JSONL file. Default: under the pipeline variables dir (e.g. `<variables_dir>/streaming_dlq.jsonl`).                                           |

When `dead_letter` is omitted entirely, failed rows are written to the default file path (same as `destination_type: file` with default `file_path`).

**DLQ record format** (each failed row becomes one record):

* `original_record`: the failed row as a JSON object (file) or JSON string column (table).
* `failure_reason`: error message.
* `failure_timestamp`: ISO timestamp (UTC).
* `connector_type`: e.g. `bigquery`.
* `row_errors`: optional list of per-row error details (index, code, message) when the API provides them.

When writing to a **table**, the DLQ table must have columns compatible with these fields (e.g. `original_record` STRING/JSON, `failure_reason` STRING, `failure_timestamp` STRING/TIMESTAMP, `connector_type` STRING, `row_errors` STRING/JSON). When writing to a **file**, each line is one JSON object with the fields above.

If failed rows are successfully written to the dead-letter destination, the pipeline continues. If the DLQ write fails or is disabled, the pipeline raises the error so you can alert or retry.
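
To send failed rows to a BigQuery table instead of the default JSONL file, a config along these lines should work. It is a sketch using only the fields documented above; the table identifiers are placeholders, and the DLQ table is assumed to have columns compatible with the record format described in this section.

```yaml
connector_type: bigquery
profile: default
config:
  table_id: project_id.dataset_id.table_id
dead_letter:
  destination_type: table
  enabled: true
  table_name: project_id.dataset_id.dlq_table_id   # placeholder; overrides table_suffix
```

Omitting `table_name` would fall back to `table_suffix`, so with the default `_dlq` the failed rows would go to `project_id.dataset_id.table_id_dlq`.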

## Metadata interpolation

You can use message metadata in the config (e.g. to choose a schema and table per source). Values in `config` can contain `{metadata_key}` placeholders, which are replaced with the matching metadata value from each message. For example, `table_id: "{schema}.{table}"` is resolved from `metadata.schema` and `metadata.table` of each message.

This pattern works particularly well with **CDC** sources such as [PostgreSQL CDC](/guides/streaming/sources/postgres-cdc), where each event carries `schema`, `table`, and primary key metadata. For example, you can route changes from many source tables into matching BigQuery tables by setting:

```yaml
config:
  table_id: "{schema}.{table}"
```
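
Interpolation can be combined with the upsert parameters so that each change event is merged into its matching table. The following is a sketch that assumes the source emits `schema`, `table`, and `key_columns` metadata, as described for PostgreSQL CDC; `project_id` is a placeholder.

```yaml
connector_type: bigquery
profile: default
config:
  table_id: "{schema}.{table}"          # resolved per message from its metadata
  database: project_id                  # project for the two-part table_id
  unique_constraints: "{key_columns}"   # assumes the source provides key_columns metadata
  unique_conflict_method: UPDATE
```

Because the sink does not create tables, every BigQuery table that a message can resolve to must already exist.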
