diff --git a/cloud_pipelines_backend/instrumentation/metrics.py b/cloud_pipelines_backend/instrumentation/metrics.py index 916a0ea..a597802 100644 --- a/cloud_pipelines_backend/instrumentation/metrics.py +++ b/cloud_pipelines_backend/instrumentation/metrics.py @@ -291,6 +291,94 @@ def track_cache_hit(): cache_counter.add(1) +# Container Performance Metrics +_container_launches_counter = None +_container_execution_duration_histogram = None + + +def get_container_performance_metrics(): + """Get or create container performance metrics.""" + global _container_launches_counter, _container_execution_duration_histogram + + meter = get_meter() + if meter is None: + return None, None + + if _container_launches_counter is None: + _container_launches_counter = meter.create_counter( + name="container_launches_total", + description="Total number of container launches by launcher type and success", + unit="1", + ) + + if _container_execution_duration_histogram is None: + _container_execution_duration_histogram = meter.create_histogram( + name="container_execution_duration_seconds", + description="Duration of container executions in seconds", + unit="s", + ) + + return _container_launches_counter, _container_execution_duration_histogram + + +def _get_launcher_type(launcher_class_name: str) -> str: + """Extract launcher type from launcher class name.""" + class_name_lower = launcher_class_name.lower() + if "kubernetes" in class_name_lower or "gke" in class_name_lower: + return "kubernetes" + elif "docker" in class_name_lower: + return "docker" + elif "huggingface" in class_name_lower: + return "huggingface" + else: + return "unknown" + + +def track_container_launch(launcher_class_name: str, success: bool): + """ + Track container launch attempt. + + Args: + launcher_class_name: Name of the launcher class used + success: Whether the launch was successful + """ + counter, _ = get_container_performance_metrics() + if counter: + launcher_type = _get_launcher_type(launcher_class_name) + counter.add( + 1, + { + "launcher_type": launcher_type, + "success": str(success).lower(), + }, + ) + + +def track_container_execution_duration( + launcher_class_name: str, + status: str, + duration_seconds: float | None = None, +): + """ + Track container execution duration. + + Args: + launcher_class_name: Name of the launcher class used + status: Final status (succeeded/failed) + duration_seconds: Duration from container start to end + """ + _, histogram = get_container_performance_metrics() + if histogram and duration_seconds is not None: + launcher_type = _get_launcher_type(launcher_class_name) + histogram.record( + duration_seconds, + { + "launcher_type": launcher_type, + "status": status, + }, + ) + + class HTTPMetricsMiddleware(BaseHTTPMiddleware): """ Middleware to track HTTP request metrics. diff --git a/cloud_pipelines_backend/orchestrator_sql.py b/cloud_pipelines_backend/orchestrator_sql.py index 487d5b0..843ba69 100644 --- a/cloud_pipelines_backend/orchestrator_sql.py +++ b/cloud_pipelines_backend/orchestrator_sql.py @@ -536,6 +536,7 @@ def generate_execution_log_uri( container_execution_uuid ) + launcher_class_name = self._launcher.__class__.__name__ try: launched_container: launcher_interfaces.LaunchedContainer = ( self._launcher.launch_container_task( @@ -547,6 +548,11 @@ def generate_execution_log_uri( annotations=full_annotations, ) ) + # Track successful container launch + metrics.track_container_launch( + launcher_class_name=launcher_class_name, + success=True, + ) if launched_container.status not in ( launcher_interfaces.ContainerStatus.PENDING, launcher_interfaces.ContainerStatus.RUNNING, @@ -555,6 +561,11 @@ def generate_execution_log_uri( f"Unexpected status of just launched container: {launched_container.status=}, {launched_container=}" ) except Exception as ex: + # Track failed container launch + metrics.track_container_launch( + launcher_class_name=launcher_class_name, + success=False, + ) session.rollback() with session.begin(): # Logs whole exception @@ -745,6 +756,15 @@ def internal_process_one_running_execution( container_execution.started_at = reloaded_launched_container.started_at container_execution.ended_at = reloaded_launched_container.ended_at + # Track container execution duration + if container_execution.started_at and container_execution.ended_at: + duration = container_execution.ended_at - container_execution.started_at + metrics.track_container_execution_duration( + launcher_class_name=self._launcher.__class__.__name__, + status="succeeded", + duration_seconds=duration.total_seconds(), + ) + # Don't fail the execution if log upload fails. # Logs are important, but not so important that we should fail a successfully completed container execution. try: @@ -798,6 +818,16 @@ def _maybe_preload_value( if missing_output_names: # Marking the container execution as FAILED (even though the program itself has completed successfully) container_execution.status = bts.ContainerExecutionStatus.FAILED + # Track container execution duration + if container_execution.started_at and container_execution.ended_at: + duration = ( + container_execution.ended_at - container_execution.started_at + ) + metrics.track_container_execution_duration( + launcher_class_name=self._launcher.__class__.__name__, + status="failed", + duration_seconds=duration.total_seconds(), + ) orchestration_error_message = f"Container execution is marked as FAILED due to missing outputs: {missing_output_names}." _logger.error(orchestration_error_message) _record_orchestration_error_message( @@ -897,6 +927,16 @@ def _maybe_preload_value( container_execution.exit_code = reloaded_launched_container.exit_code container_execution.started_at = reloaded_launched_container.started_at container_execution.ended_at = reloaded_launched_container.ended_at + + # Track container execution duration + if container_execution.started_at and container_execution.ended_at: + duration = container_execution.ended_at - container_execution.started_at + metrics.track_container_execution_duration( + launcher_class_name=self._launcher.__class__.__name__, + status="failed", + duration_seconds=duration.total_seconds(), + ) + launcher_error = reloaded_launched_container.launcher_error_message if launcher_error: orchestration_error_message = f"Launcher error: {launcher_error}"