From 2b229827d6cbcd53d4756fe10d88573e8ddc12da Mon Sep 17 00:00:00 2001 From: Morgan Wowk Date: Mon, 2 Feb 2026 12:26:15 -0800 Subject: [PATCH] feat: Add pipeline run outcome metrics Tracks pipeline lifecycle from creation to completion with labels for status and user. Metrics added: - pipeline_runs_total: Counter tracking pipeline runs by status (running/succeeded/failed/cancelled) and created_by - pipeline_run_duration_seconds: Histogram tracking total pipeline duration by final status These metrics provide visibility into pipeline success rates, completion times, and usage patterns per user. Durations measure total lifecycle time from creation to terminal state (including queue and execution time). --- cloud_pipelines_backend/api_server_sql.py | 5 ++ .../instrumentation/metrics.py | 76 ++++++++++++++++++ cloud_pipelines_backend/orchestrator_sql.py | 79 +++++++++++++++++++ 3 files changed, 160 insertions(+) diff --git a/cloud_pipelines_backend/api_server_sql.py b/cloud_pipelines_backend/api_server_sql.py index e8e0624..f0406cc 100644 --- a/cloud_pipelines_backend/api_server_sql.py +++ b/cloud_pipelines_backend/api_server_sql.py @@ -30,6 +30,7 @@ def _get_current_time() -> datetime.datetime: from . import backend_types_sql as bts from . import errors from .errors import ItemNotFoundError +from .instrumentation import metrics # ==== PipelineJobService @@ -111,6 +112,10 @@ def create( session.commit() session.refresh(pipeline_run) + + # Track pipeline creation metric + metrics.track_pipeline_created(created_by=created_by) + return PipelineRunResponse.from_db(pipeline_run) def get(self, session: orm.Session, id: bts.IdType) -> PipelineRunResponse: diff --git a/cloud_pipelines_backend/instrumentation/metrics.py b/cloud_pipelines_backend/instrumentation/metrics.py index a33b61a..1974bb9 100644 --- a/cloud_pipelines_backend/instrumentation/metrics.py +++ b/cloud_pipelines_backend/instrumentation/metrics.py @@ -90,6 +90,82 @@ def get_meter(): return _meter +# Pipeline Run Metrics +_pipeline_runs_counter = None +_pipeline_run_duration_histogram = None + + +def get_pipeline_metrics(): + """Get or create pipeline run metrics.""" + global _pipeline_runs_counter, _pipeline_run_duration_histogram + + meter = get_meter() + if meter is None: + return None, None + + if _pipeline_runs_counter is None: + _pipeline_runs_counter = meter.create_counter( + name="pipeline_runs_total", + description="Total number of pipeline runs by status", + unit="1", + ) + + if _pipeline_run_duration_histogram is None: + _pipeline_run_duration_histogram = meter.create_histogram( + name="pipeline_run_duration_seconds", + description="Duration of pipeline runs in seconds", + unit="s", + ) + + return _pipeline_runs_counter, _pipeline_run_duration_histogram + + +def track_pipeline_created(created_by: str | None = None): + """Track pipeline run creation.""" + counter, _ = get_pipeline_metrics() + if counter: + counter.add( + 1, + { + "status": "running", + "created_by": created_by or "unknown", + }, + ) + + +def track_pipeline_completed( + status: str, + created_by: str | None = None, + duration_seconds: float | None = None, +): + """ + Track pipeline run completion. + + Args: + status: Terminal status (succeeded/failed/cancelled) + created_by: Username who created the pipeline + duration_seconds: Total pipeline duration from creation to completion + """ + counter, histogram = get_pipeline_metrics() + + if counter: + counter.add( + 1, + { + "status": status, + "created_by": created_by or "unknown", + }, + ) + + if histogram and duration_seconds is not None: + histogram.record( + duration_seconds, + { + "status": status, + }, + ) + + class HTTPMetricsMiddleware(BaseHTTPMiddleware): """ Middleware to track HTTP request metrics. diff --git a/cloud_pipelines_backend/orchestrator_sql.py b/cloud_pipelines_backend/orchestrator_sql.py index 1b09d6a..eb2c6f5 100644 --- a/cloud_pipelines_backend/orchestrator_sql.py +++ b/cloud_pipelines_backend/orchestrator_sql.py @@ -21,6 +21,7 @@ from .launchers import common_annotations from .launchers import interfaces as launcher_interfaces from .instrumentation import contextual_logging +from .instrumentation import metrics _logger = logging.getLogger(__name__) @@ -111,6 +112,10 @@ def internal_process_queued_executions_queue(self, session: orm.Session): record_system_error_exception( execution=queued_execution, exception=ex ) + # Track pipeline completion if this is a root execution + _track_pipeline_completion_if_root( + session=session, execution_node=queued_execution + ) session.commit() finally: duration_ms = (time.monotonic_ns() - start_timestamp) / 1_000_000 @@ -400,6 +405,10 @@ def internal_process_one_queued_execution( execution.container_execution_status = ( bts.ContainerExecutionStatus.CANCELLED ) + # Track pipeline completion if this is a root execution + _track_pipeline_completion_if_root( + session=session, execution_node=execution + ) _mark_all_downstream_executions_as_skipped( session=session, execution=execution ) @@ -526,6 +535,10 @@ def generate_execution_log_uri( bts.ContainerExecutionStatus.SYSTEM_ERROR ) record_system_error_exception(execution=execution, exception=ex) + # Track pipeline completion if this is a root execution + _track_pipeline_completion_if_root( + session=session, execution_node=execution + ) _mark_all_downstream_executions_as_skipped( session=session, execution=execution ) @@ -636,6 +649,10 @@ def internal_process_one_running_execution( execution_node.container_execution_status = ( bts.ContainerExecutionStatus.CANCELLED ) + # Track pipeline completion if this is a root execution + _track_pipeline_completion_if_root( + session=session, execution_node=execution_node + ) _mark_all_downstream_executions_as_skipped( session=session, execution=execution_node ) @@ -755,6 +772,10 @@ def _maybe_preload_value( execution_node.container_execution_status = ( bts.ContainerExecutionStatus.FAILED ) + # Track pipeline completion if this is a root execution + _track_pipeline_completion_if_root( + session=session, execution_node=execution_node + ) _mark_all_downstream_executions_as_skipped( session=session, execution=execution_node ) @@ -797,6 +818,10 @@ def _maybe_preload_value( execution_node.container_execution_status = ( bts.ContainerExecutionStatus.SUCCEEDED ) + # Track pipeline completion if this is a root execution + _track_pipeline_completion_if_root( + session=session, execution_node=execution_node + ) # TODO: Optimize for output_name, artifact_node in session.execute( sql.select(bts.OutputArtifactLink.output_name, bts.ArtifactNode) @@ -839,6 +864,10 @@ def _maybe_preload_value( execution_node.container_execution_status = ( bts.ContainerExecutionStatus.FAILED ) + # Track pipeline completion if this is a root execution + _track_pipeline_completion_if_root( + session=session, execution_node=execution_node + ) _mark_all_downstream_executions_as_skipped( session=session, execution=execution_node ) @@ -984,6 +1013,56 @@ def record_system_error_exception(execution: bts.ExecutionNode, exception: Excep ] = traceback.format_exc() +def _track_pipeline_completion_if_root( + session: orm.Session, + execution_node: bts.ExecutionNode, +): + """ + Check if execution is a root execution and track pipeline completion metrics. + + Args: + session: Database session + execution_node: Execution node that reached a terminal state + """ + # Check if this execution is a root execution of a pipeline run + pipeline_run = session.scalar( + sql.select(bts.PipelineRun).where( + bts.PipelineRun.root_execution_id == execution_node.id + ) + ) + + if pipeline_run: + # This is a root execution - track pipeline completion + status = execution_node.container_execution_status + + # Map execution status to pipeline status + status_map = { + bts.ContainerExecutionStatus.SUCCEEDED: "succeeded", + bts.ContainerExecutionStatus.FAILED: "failed", + bts.ContainerExecutionStatus.CANCELLED: "cancelled", + bts.ContainerExecutionStatus.SYSTEM_ERROR: "failed", + bts.ContainerExecutionStatus.SKIPPED: "cancelled", + } + + pipeline_status = status_map.get(status) + if pipeline_status: + # Update pipeline run updated_at timestamp + current_time = _get_current_time() + pipeline_run.updated_at = current_time + + # Calculate duration + duration_seconds = None + if pipeline_run.created_at: + duration = current_time - pipeline_run.created_at + duration_seconds = duration.total_seconds() + + metrics.track_pipeline_completed( + status=pipeline_status, + created_by=pipeline_run.created_by, + duration_seconds=duration_seconds, + ) + + def _record_orchestration_error_message( container_execution: bts.ContainerExecution, execution_nodes: list[bts.ExecutionNode],