# Query trigger metadata from the database

Pipeline triggers are stored in the `pipeline_schedule` table in the database. To query
trigger metadata, you can either query that table directly or use Python code
to fetch `PipelineSchedule` models.

## Query from DB

The following query is written for Postgres. Adapt the syntax for other databases as needed, and keep only the filters you need.

```sql theme={"system"}
SELECT
    *
FROM pipeline_schedule
WHERE pipeline_uuid = 'example_pipeline' -- Filter by pipeline_uuid
AND status = 'ACTIVE' -- Filter by status
AND repo_path = '/home/src/default_repo' -- Filter by repo path
```
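
If you want to run the same filters from a script, a parameterized query avoids hand-editing string literals. The sketch below is only an illustration: it uses Python's built-in `sqlite3` module with an in-memory database, and the table it creates is a hypothetical, simplified stand-in for the real `pipeline_schedule` table (only the columns used above, plus `name`). The `fetch_triggers` helper and the sample rows are likewise assumptions made for the example, not part of Mage.

```python
import sqlite3

# Hypothetical, simplified stand-in for the real pipeline_schedule table.
# Only the columns referenced in this guide are modeled here.
conn = sqlite3.connect(':memory:')
conn.row_factory = sqlite3.Row  # lets each row be converted to a dict
conn.execute(
    'CREATE TABLE pipeline_schedule ('
    'id INTEGER PRIMARY KEY, name TEXT, pipeline_uuid TEXT, '
    'status TEXT, repo_path TEXT)'
)

# Made-up sample rows, for illustration only.
conn.executemany(
    'INSERT INTO pipeline_schedule (name, pipeline_uuid, status, repo_path) '
    'VALUES (?, ?, ?, ?)',
    [
        ('daily_trigger', 'example_pipeline', 'ACTIVE', '/home/src/default_repo'),
        ('paused_trigger', 'example_pipeline', 'INACTIVE', '/home/src/default_repo'),
        ('other_trigger', 'other_pipeline', 'ACTIVE', '/home/src/default_repo'),
    ],
)


def fetch_triggers(conn, pipeline_uuid, status, repo_path):
    """Return matching triggers as dicts (hypothetical helper)."""
    query = (
        'SELECT * FROM pipeline_schedule '
        'WHERE pipeline_uuid = ? '  # Filter by pipeline_uuid
        'AND status = ? '           # Filter by status
        'AND repo_path = ?'         # Filter by repo path
    )
    rows = conn.execute(query, (pipeline_uuid, status, repo_path)).fetchall()
    return [dict(row) for row in rows]


triggers = fetch_triggers(
    conn, 'example_pipeline', 'ACTIVE', '/home/src/default_repo'
)
for trigger in triggers:
    print(trigger['name'], trigger['status'])
```

Against a real Postgres database you would swap in a Postgres driver and its placeholder style (for example, `%s` instead of `?` with psycopg2); the query itself stays the same.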

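## Query DB models via Python

The snippet below fetches `PipelineSchedule` models and then shows several operations on them: counting, printing, activating, deactivating, cancelling associated pipeline runs, and deleting. Each commented section is an independent example, so run only the ones you need.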

```python theme={"system"}
from mage_ai.orchestration.db import db_connection
from mage_ai.orchestration.db.models.schedules import PipelineSchedule, PipelineRun
from mage_ai.data_preparation.models.triggers import ScheduleStatus
from mage_ai.orchestration.pipeline_scheduler import PipelineScheduler

db_connection.start_session()

pipeline_schedules = PipelineSchedule.query.filter(
    # Filter by pipeline uuid
    PipelineSchedule.pipeline_uuid == 'example_pipeline',
    # Filter by trigger name
    PipelineSchedule.name == 'trigger_name',
)

# Get pipeline schedule count
print(f'Number of triggers: {pipeline_schedules.count()}')

# Print the detailed info
print([p.to_dict() for p in pipeline_schedules.all()])

# Update trigger status to ACTIVE
for p in pipeline_schedules:
    p.update(status=ScheduleStatus.ACTIVE)

# Update trigger status to INACTIVE
for p in pipeline_schedules:
    p.update(status=ScheduleStatus.INACTIVE)

# Cancel pipeline runs associated with the triggers
pipeline_schedule_ids = [s.id for s in pipeline_schedules]
pipeline_runs_to_cancel = PipelineRun.query.filter(
    PipelineRun.pipeline_schedule_id.in_(pipeline_schedule_ids)
).filter(
    PipelineRun.status.in_([
        PipelineRun.PipelineRunStatus.INITIAL,
        PipelineRun.PipelineRunStatus.RUNNING,
    ])
)
for pipeline_run in pipeline_runs_to_cancel:
    PipelineScheduler(pipeline_run).stop()

# Delete triggers
for p in pipeline_schedules:
    p.delete()
```
