diff --git a/cloud_pipelines_backend/api_server_sql.py b/cloud_pipelines_backend/api_server_sql.py
index e8e0624..f0406cc 100644
--- a/cloud_pipelines_backend/api_server_sql.py
+++ b/cloud_pipelines_backend/api_server_sql.py
@@ -30,6 +30,7 @@ def _get_current_time() -> datetime.datetime:
 from . import backend_types_sql as bts
 from . import errors
 from .errors import ItemNotFoundError
+from .instrumentation import metrics
 
 
 # ==== PipelineJobService
@@ -111,6 +112,10 @@ def create(
 
         session.commit()
         session.refresh(pipeline_run)
+
+        # Track pipeline creation metric
+        metrics.track_pipeline_created(created_by=created_by)
+
         return PipelineRunResponse.from_db(pipeline_run)
 
     def get(self, session: orm.Session, id: bts.IdType) -> PipelineRunResponse:
diff --git a/cloud_pipelines_backend/instrumentation/metrics.py b/cloud_pipelines_backend/instrumentation/metrics.py
index a33b61a..1974bb9 100644
--- a/cloud_pipelines_backend/instrumentation/metrics.py
+++ b/cloud_pipelines_backend/instrumentation/metrics.py
@@ -90,6 +90,82 @@ def get_meter():
     return _meter
 
 
+# Pipeline Run Metrics
+_pipeline_runs_counter = None
+_pipeline_run_duration_histogram = None
+
+
+def get_pipeline_metrics():
+    """Get or create pipeline run metrics."""
+    global _pipeline_runs_counter, _pipeline_run_duration_histogram
+
+    meter = get_meter()
+    if meter is None:
+        return None, None
+
+    if _pipeline_runs_counter is None:
+        _pipeline_runs_counter = meter.create_counter(
+            name="pipeline_runs_total",
+            description="Total number of pipeline runs by status",
+            unit="1",
+        )
+
+    if _pipeline_run_duration_histogram is None:
+        _pipeline_run_duration_histogram = meter.create_histogram(
+            name="pipeline_run_duration_seconds",
+            description="Duration of pipeline runs in seconds",
+            unit="s",
+        )
+
+    return _pipeline_runs_counter, _pipeline_run_duration_histogram
+
+
+def track_pipeline_created(created_by: str | None = None):
+    """Track pipeline run creation."""
+    counter, _ = get_pipeline_metrics()
+    if counter:
+        counter.add(
+            1,
+            {
+                "status": "running",
+                "created_by": created_by or "unknown",
+            },
+        )
+
+
+def track_pipeline_completed(
+    status: str,
+    created_by: str | None = None,
+    duration_seconds: float | None = None,
+):
+    """
+    Track pipeline run completion.
+
+    Args:
+        status: Terminal status (succeeded/failed/cancelled)
+        created_by: Username who created the pipeline
+        duration_seconds: Total pipeline duration from creation to completion
+    """
+    counter, histogram = get_pipeline_metrics()
+
+    if counter:
+        counter.add(
+            1,
+            {
+                "status": status,
+                "created_by": created_by or "unknown",
+            },
+        )
+
+    if histogram and duration_seconds is not None:
+        histogram.record(
+            duration_seconds,
+            {
+                "status": status,
+            },
+        )
+
+
 class HTTPMetricsMiddleware(BaseHTTPMiddleware):
     """
     Middleware to track HTTP request metrics.
diff --git a/cloud_pipelines_backend/orchestrator_sql.py b/cloud_pipelines_backend/orchestrator_sql.py
index 1b09d6a..eb2c6f5 100644
--- a/cloud_pipelines_backend/orchestrator_sql.py
+++ b/cloud_pipelines_backend/orchestrator_sql.py
@@ -21,6 +21,7 @@
 from .launchers import common_annotations
 from .launchers import interfaces as launcher_interfaces
 from .instrumentation import contextual_logging
+from .instrumentation import metrics
 
 _logger = logging.getLogger(__name__)
 
@@ -111,6 +112,10 @@ def internal_process_queued_executions_queue(self, session: orm.Session):
                     record_system_error_exception(
                         execution=queued_execution, exception=ex
                     )
+                    # Track pipeline completion if this is a root execution
+                    _track_pipeline_completion_if_root(
+                        session=session, execution_node=queued_execution
+                    )
                 session.commit()
         finally:
             duration_ms = (time.monotonic_ns() - start_timestamp) / 1_000_000
@@ -400,6 +405,10 @@ def internal_process_one_queued_execution(
                 execution.container_execution_status = (
                     bts.ContainerExecutionStatus.CANCELLED
                 )
+                # Track pipeline completion if this is a root execution
+                _track_pipeline_completion_if_root(
+                    session=session, execution_node=execution
+                )
                 _mark_all_downstream_executions_as_skipped(
                     session=session, execution=execution
                 )
@@ -526,6 +535,10 @@ def generate_execution_log_uri(
                 bts.ContainerExecutionStatus.SYSTEM_ERROR
             )
             record_system_error_exception(execution=execution, exception=ex)
+            # Track pipeline completion if this is a root execution
+            _track_pipeline_completion_if_root(
+                session=session, execution_node=execution
+            )
             _mark_all_downstream_executions_as_skipped(
                 session=session, execution=execution
             )
@@ -636,6 +649,10 @@ def internal_process_one_running_execution(
                 execution_node.container_execution_status = (
                     bts.ContainerExecutionStatus.CANCELLED
                 )
+                # Track pipeline completion if this is a root execution
+                _track_pipeline_completion_if_root(
+                    session=session, execution_node=execution_node
+                )
                 _mark_all_downstream_executions_as_skipped(
                     session=session, execution=execution_node
                 )
@@ -755,6 +772,10 @@ def _maybe_preload_value(
                 execution_node.container_execution_status = (
                     bts.ContainerExecutionStatus.FAILED
                 )
+                # Track pipeline completion if this is a root execution
+                _track_pipeline_completion_if_root(
+                    session=session, execution_node=execution_node
+                )
                 _mark_all_downstream_executions_as_skipped(
                     session=session, execution=execution_node
                 )
@@ -797,6 +818,10 @@ def _maybe_preload_value(
             execution_node.container_execution_status = (
                 bts.ContainerExecutionStatus.SUCCEEDED
             )
+            # Track pipeline completion if this is a root execution
+            _track_pipeline_completion_if_root(
+                session=session, execution_node=execution_node
+            )
             # TODO: Optimize
             for output_name, artifact_node in session.execute(
                 sql.select(bts.OutputArtifactLink.output_name, bts.ArtifactNode)
@@ -839,6 +864,10 @@ def _maybe_preload_value(
             execution_node.container_execution_status = (
                 bts.ContainerExecutionStatus.FAILED
             )
+            # Track pipeline completion if this is a root execution
+            _track_pipeline_completion_if_root(
+                session=session, execution_node=execution_node
+            )
             _mark_all_downstream_executions_as_skipped(
                 session=session, execution=execution_node
             )
@@ -984,6 +1013,56 @@ def record_system_error_exception(execution: bts.ExecutionNode, exception: Excep
     ] = traceback.format_exc()
 
 
+def _track_pipeline_completion_if_root(
+    session: orm.Session,
+    execution_node: bts.ExecutionNode,
+):
+    """
+    Check if execution is a root execution and track pipeline completion metrics.
+
+    Args:
+        session: Database session
+        execution_node: Execution node that reached a terminal state
+    """
+    # Check if this execution is a root execution of a pipeline run
+    pipeline_run = session.scalar(
+        sql.select(bts.PipelineRun).where(
+            bts.PipelineRun.root_execution_id == execution_node.id
+        )
+    )
+
+    if pipeline_run:
+        # This is a root execution - track pipeline completion
+        status = execution_node.container_execution_status
+
+        # Map execution status to pipeline status
+        status_map = {
+            bts.ContainerExecutionStatus.SUCCEEDED: "succeeded",
+            bts.ContainerExecutionStatus.FAILED: "failed",
+            bts.ContainerExecutionStatus.CANCELLED: "cancelled",
+            bts.ContainerExecutionStatus.SYSTEM_ERROR: "failed",
+            bts.ContainerExecutionStatus.SKIPPED: "cancelled",
+        }
+
+        pipeline_status = status_map.get(status)
+        if pipeline_status:
+            # Update pipeline run updated_at timestamp
+            current_time = _get_current_time()
+            pipeline_run.updated_at = current_time
+
+            # Calculate duration
+            duration_seconds = None
+            if pipeline_run.created_at:
+                duration = current_time - pipeline_run.created_at
+                duration_seconds = duration.total_seconds()
+
+            metrics.track_pipeline_completed(
+                status=pipeline_status,
+                created_by=pipeline_run.created_by,
+                duration_seconds=duration_seconds,
+            )
+
+
 def _record_orchestration_error_message(
     container_execution: bts.ContainerExecution,
     execution_nodes: list[bts.ExecutionNode],