Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 88 additions & 0 deletions cloud_pipelines_backend/instrumentation/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,94 @@ def track_cache_hit():
cache_counter.add(1)


# Container Performance Metrics
_container_launches_counter = None
_container_execution_duration_histogram = None


def get_container_performance_metrics():
"""Get or create container performance metrics."""
global _container_launches_counter, _container_execution_duration_histogram

meter = get_meter()
if meter is None:
return None, None

if _container_launches_counter is None:
_container_launches_counter = meter.create_counter(
name="container_launches_total",
description="Total number of container launches by launcher type and success",
unit="1",
)

if _container_execution_duration_histogram is None:
_container_execution_duration_histogram = meter.create_histogram(
name="container_execution_duration_seconds",
description="Duration of container executions in seconds",
unit="s",
)

return _container_launches_counter, _container_execution_duration_histogram


def _get_launcher_type(launcher_class_name: str) -> str:
"""Extract launcher type from launcher class name."""
class_name_lower = launcher_class_name.lower()
if "kubernetes" in class_name_lower or "gke" in class_name_lower:
return "kubernetes"
elif "docker" in class_name_lower:
return "docker"
elif "huggingface" in class_name_lower:
return "huggingface"
else:
return "unknown"


def track_container_launch(launcher_class_name: str, success: bool):
"""
Track container launch attempt.

Args:
launcher_class_name: Name of the launcher class used
success: Whether the launch was successful
"""
counter, _ = get_container_performance_metrics()
if counter:
launcher_type = _get_launcher_type(launcher_class_name)
counter.add(
1,
{
"launcher_type": launcher_type,
"success": str(success).lower(),
},
)


def track_container_execution_duration(
launcher_class_name: str,
status: str,
duration_seconds: float | None = None,
):
"""
Track container execution duration.

Args:
launcher_class_name: Name of the launcher class used
status: Final status (succeeded/failed)
duration_seconds: Duration from container start to end
"""
_, histogram = get_container_performance_metrics()
if histogram and duration_seconds is not None:
launcher_type = _get_launcher_type(launcher_class_name)
histogram.record(
duration_seconds,
{
"launcher_type": launcher_type,
"status": status,
},
)


class HTTPMetricsMiddleware(BaseHTTPMiddleware):
"""
Middleware to track HTTP request metrics.
Expand Down
40 changes: 40 additions & 0 deletions cloud_pipelines_backend/orchestrator_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,7 @@ def generate_execution_log_uri(
container_execution_uuid
)

launcher_class_name = self._launcher.__class__.__name__
try:
launched_container: launcher_interfaces.LaunchedContainer = (
self._launcher.launch_container_task(
Expand All @@ -547,6 +548,11 @@ def generate_execution_log_uri(
annotations=full_annotations,
)
)
# Track successful container launch
metrics.track_container_launch(
launcher_class_name=launcher_class_name,
success=True,
)
if launched_container.status not in (
launcher_interfaces.ContainerStatus.PENDING,
launcher_interfaces.ContainerStatus.RUNNING,
Expand All @@ -555,6 +561,11 @@ def generate_execution_log_uri(
f"Unexpected status of just launched container: {launched_container.status=}, {launched_container=}"
)
except Exception as ex:
# Track failed container launch
metrics.track_container_launch(
launcher_class_name=launcher_class_name,
success=False,
)
session.rollback()
with session.begin():
# Logs whole exception
Expand Down Expand Up @@ -745,6 +756,15 @@ def internal_process_one_running_execution(
container_execution.started_at = reloaded_launched_container.started_at
container_execution.ended_at = reloaded_launched_container.ended_at

# Track container execution duration
if container_execution.started_at and container_execution.ended_at:
duration = container_execution.ended_at - container_execution.started_at
metrics.track_container_execution_duration(
launcher_class_name=self._launcher.__class__.__name__,
status="succeeded",
duration_seconds=duration.total_seconds(),
)

# Don't fail the execution if log upload fails.
# Logs are important, but not so important that we should fail a successfully completed container execution.
try:
Expand Down Expand Up @@ -798,6 +818,16 @@ def _maybe_preload_value(
if missing_output_names:
# Marking the container execution as FAILED (even though the program itself has completed successfully)
container_execution.status = bts.ContainerExecutionStatus.FAILED
# Track container execution duration
if container_execution.started_at and container_execution.ended_at:
duration = (
container_execution.ended_at - container_execution.started_at
)
metrics.track_container_execution_duration(
launcher_class_name=self._launcher.__class__.__name__,
status="failed",
duration_seconds=duration.total_seconds(),
)
orchestration_error_message = f"Container execution is marked as FAILED due to missing outputs: {missing_output_names}."
_logger.error(orchestration_error_message)
_record_orchestration_error_message(
Expand Down Expand Up @@ -897,6 +927,16 @@ def _maybe_preload_value(
container_execution.exit_code = reloaded_launched_container.exit_code
container_execution.started_at = reloaded_launched_container.started_at
container_execution.ended_at = reloaded_launched_container.ended_at

# Track container execution duration
if container_execution.started_at and container_execution.ended_at:
duration = container_execution.ended_at - container_execution.started_at
metrics.track_container_execution_duration(
launcher_class_name=self._launcher.__class__.__name__,
status="failed",
duration_seconds=duration.total_seconds(),
)

launcher_error = reloaded_launched_container.launcher_error_message
if launcher_error:
orchestration_error_message = f"Launcher error: {launcher_error}"
Expand Down