Pipeline triggers are stored in the pipeline_schedule
table in the database. If you want to query
the trigger metadata, you can either query the data directly from the database or use Python code
to fetch PipelineSchedule
models.
Query from DB
Here is an example query for Postgres. You can adapt it to the syntax of other databases as needed.
-- Fetch all columns of the pipeline_schedule rows that match one pipeline,
-- one status, and one repo path. Drop any condition you do not need.
SELECT *
FROM pipeline_schedule
WHERE pipeline_uuid = 'example_pipeline'      -- Filter by pipeline_uuid
  AND status = 'ACTIVE'                       -- Filter by status
  AND repo_path = '/home/src/default_repo'    -- Filter by repo path
Query DB models via Python
from mage_ai.orchestration.db import db_connection
from mage_ai.orchestration.db.models.schedules import PipelineSchedule, PipelineRun
from mage_ai.data_preparation.models.triggers import ScheduleStatus
from mage_ai.orchestration.pipeline_scheduler import PipelineScheduler
# Start a database session before querying the models.
db_connection.start_session()

# Build the query that selects the triggers to work with.
# Both conditions passed to filter() must match.
pipeline_schedules = PipelineSchedule.query.filter(
    # Filter by pipeline uuid
    PipelineSchedule.pipeline_uuid == 'example_pipeline',
    # Filter by trigger name
    PipelineSchedule.name == 'trigger_name',
)

# Get pipeline schedule count
print(f'Number of triggers: {pipeline_schedules.count()}')

# Print the detailed info
print([p.to_dict() for p in pipeline_schedules.all()])

# NOTE: the sections below are shown as separate examples. Running this
# script as-is applies all of them in order (activate, deactivate,
# cancel runs, delete), so keep only the ones you need.

# Update trigger status to ACTIVE
for p in pipeline_schedules:
    p.update(status=ScheduleStatus.ACTIVE)

# Update trigger status to INACTIVE
for p in pipeline_schedules:
    p.update(status=ScheduleStatus.INACTIVE)

# Cancel pipeline runs associated with the triggers.
# Only runs whose status is INITIAL or RUNNING are selected.
pipeline_schedule_ids = [s.id for s in pipeline_schedules]
pipeline_runs_to_cancel = PipelineRun.query.filter(
    PipelineRun.pipeline_schedule_id.in_(pipeline_schedule_ids)
).filter(
    PipelineRun.status.in_([
        PipelineRun.PipelineRunStatus.INITIAL,
        PipelineRun.PipelineRunStatus.RUNNING,
    ])
)
for pipeline_run in pipeline_runs_to_cancel:
    PipelineScheduler(pipeline_run).stop()

# Delete triggers
for p in pipeline_schedules:
    p.delete()