Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions cloud_pipelines_backend/api_server_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ def _get_current_time() -> datetime.datetime:
from . import backend_types_sql as bts
from . import errors
from .errors import ItemNotFoundError
from .instrumentation import metrics


# ==== PipelineJobService
Expand Down Expand Up @@ -111,6 +112,10 @@ def create(
session.commit()

session.refresh(pipeline_run)

# Track pipeline creation metric
metrics.track_pipeline_created(created_by=created_by)

return PipelineRunResponse.from_db(pipeline_run)

def get(self, session: orm.Session, id: bts.IdType) -> PipelineRunResponse:
Expand Down
76 changes: 76 additions & 0 deletions cloud_pipelines_backend/instrumentation/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,82 @@ def get_meter():
return _meter


# Pipeline Run Metrics
_pipeline_runs_counter = None
_pipeline_run_duration_histogram = None


def get_pipeline_metrics():
    """Return the (runs counter, duration histogram) pair, creating each lazily.

    Returns ``(None, None)`` when no meter is configured (metrics disabled).
    Instruments are cached in module globals so they are created at most once.
    """
    global _pipeline_runs_counter, _pipeline_run_duration_histogram

    meter = get_meter()
    if meter is None:
        # Instrumentation is not set up; callers treat (None, None) as "no-op".
        return None, None

    if _pipeline_runs_counter is None:
        # Counter of pipeline runs, labelled by status at record time.
        _pipeline_runs_counter = meter.create_counter(
            name="pipeline_runs_total",
            unit="1",
            description="Total number of pipeline runs by status",
        )
    if _pipeline_run_duration_histogram is None:
        # Histogram of end-to-end pipeline run durations.
        _pipeline_run_duration_histogram = meter.create_histogram(
            name="pipeline_run_duration_seconds",
            unit="s",
            description="Duration of pipeline runs in seconds",
        )

    return _pipeline_runs_counter, _pipeline_run_duration_histogram


def track_pipeline_created(created_by: str | None = None):
    """Record a newly-created pipeline run in the runs counter.

    Args:
        created_by: Username who created the pipeline; recorded as "unknown"
            when not provided.
    """
    runs_counter, _duration_histogram = get_pipeline_metrics()
    if not runs_counter:
        # Metrics are disabled; nothing to record.
        return
    attributes = {
        "status": "running",
        "created_by": created_by or "unknown",
    }
    runs_counter.add(1, attributes)


def track_pipeline_completed(
    status: str,
    created_by: str | None = None,
    duration_seconds: float | None = None,
):
    """Record a pipeline run reaching a terminal state.

    Increments the runs counter with the terminal status, and — when a
    duration is supplied — records it in the duration histogram.

    Args:
        status: Terminal status (succeeded/failed/cancelled)
        created_by: Username who created the pipeline
        duration_seconds: Total pipeline duration from creation to completion
    """
    runs_counter, duration_histogram = get_pipeline_metrics()

    if runs_counter:
        counter_attributes = {
            "status": status,
            "created_by": created_by or "unknown",
        }
        runs_counter.add(1, counter_attributes)

    # Duration is optional: it may be unknown (e.g. created_at missing).
    if duration_histogram and duration_seconds is not None:
        duration_histogram.record(duration_seconds, {"status": status})


class HTTPMetricsMiddleware(BaseHTTPMiddleware):
"""
Middleware to track HTTP request metrics.
Expand Down
79 changes: 79 additions & 0 deletions cloud_pipelines_backend/orchestrator_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from .launchers import common_annotations
from .launchers import interfaces as launcher_interfaces
from .instrumentation import contextual_logging
from .instrumentation import metrics

_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -111,6 +112,10 @@ def internal_process_queued_executions_queue(self, session: orm.Session):
record_system_error_exception(
execution=queued_execution, exception=ex
)
# Track pipeline completion if this is a root execution
_track_pipeline_completion_if_root(
session=session, execution_node=queued_execution
)
session.commit()
finally:
duration_ms = (time.monotonic_ns() - start_timestamp) / 1_000_000
Expand Down Expand Up @@ -400,6 +405,10 @@ def internal_process_one_queued_execution(
execution.container_execution_status = (
bts.ContainerExecutionStatus.CANCELLED
)
# Track pipeline completion if this is a root execution
_track_pipeline_completion_if_root(
session=session, execution_node=execution
)
_mark_all_downstream_executions_as_skipped(
session=session, execution=execution
)
Expand Down Expand Up @@ -526,6 +535,10 @@ def generate_execution_log_uri(
bts.ContainerExecutionStatus.SYSTEM_ERROR
)
record_system_error_exception(execution=execution, exception=ex)
# Track pipeline completion if this is a root execution
_track_pipeline_completion_if_root(
session=session, execution_node=execution
)
_mark_all_downstream_executions_as_skipped(
session=session, execution=execution
)
Expand Down Expand Up @@ -636,6 +649,10 @@ def internal_process_one_running_execution(
execution_node.container_execution_status = (
bts.ContainerExecutionStatus.CANCELLED
)
# Track pipeline completion if this is a root execution
_track_pipeline_completion_if_root(
session=session, execution_node=execution_node
)
_mark_all_downstream_executions_as_skipped(
session=session, execution=execution_node
)
Expand Down Expand Up @@ -755,6 +772,10 @@ def _maybe_preload_value(
execution_node.container_execution_status = (
bts.ContainerExecutionStatus.FAILED
)
# Track pipeline completion if this is a root execution
_track_pipeline_completion_if_root(
session=session, execution_node=execution_node
)
_mark_all_downstream_executions_as_skipped(
session=session, execution=execution_node
)
Expand Down Expand Up @@ -797,6 +818,10 @@ def _maybe_preload_value(
execution_node.container_execution_status = (
bts.ContainerExecutionStatus.SUCCEEDED
)
# Track pipeline completion if this is a root execution
_track_pipeline_completion_if_root(
session=session, execution_node=execution_node
)
# TODO: Optimize
for output_name, artifact_node in session.execute(
sql.select(bts.OutputArtifactLink.output_name, bts.ArtifactNode)
Expand Down Expand Up @@ -839,6 +864,10 @@ def _maybe_preload_value(
execution_node.container_execution_status = (
bts.ContainerExecutionStatus.FAILED
)
# Track pipeline completion if this is a root execution
_track_pipeline_completion_if_root(
session=session, execution_node=execution_node
)
_mark_all_downstream_executions_as_skipped(
session=session, execution=execution_node
)
Expand Down Expand Up @@ -984,6 +1013,56 @@ def record_system_error_exception(execution: bts.ExecutionNode, exception: Excep
] = traceback.format_exc()


def _track_pipeline_completion_if_root(
    session: orm.Session,
    execution_node: bts.ExecutionNode,
):
    """
    Check if execution is a root execution and track pipeline completion metrics.

    If the execution is the root of a ``PipelineRun``, refresh the run's
    ``updated_at`` timestamp and emit completion metrics (status + duration).
    Non-root executions and non-terminal statuses are ignored.

    Args:
        session: Database session
        execution_node: Execution node that reached a terminal state
    """
    # Only the root execution of a pipeline run represents the run itself.
    pipeline_run = session.scalar(
        sql.select(bts.PipelineRun).where(
            bts.PipelineRun.root_execution_id == execution_node.id
        )
    )
    if pipeline_run is None:
        return

    # Collapse fine-grained execution statuses into pipeline-level statuses.
    terminal_status_map = {
        bts.ContainerExecutionStatus.SUCCEEDED: "succeeded",
        bts.ContainerExecutionStatus.FAILED: "failed",
        bts.ContainerExecutionStatus.CANCELLED: "cancelled",
        bts.ContainerExecutionStatus.SYSTEM_ERROR: "failed",
        bts.ContainerExecutionStatus.SKIPPED: "cancelled",
    }
    pipeline_status = terminal_status_map.get(
        execution_node.container_execution_status
    )
    if not pipeline_status:
        # Not a terminal status we report on.
        return

    # Stamp the run with the completion time (caller is expected to commit).
    completion_time = _get_current_time()
    pipeline_run.updated_at = completion_time

    # Duration from run creation to completion, when created_at is available.
    duration_seconds = None
    if pipeline_run.created_at:
        duration_seconds = (
            completion_time - pipeline_run.created_at
        ).total_seconds()

    metrics.track_pipeline_completed(
        status=pipeline_status,
        created_by=pipeline_run.created_by,
        duration_seconds=duration_seconds,
    )


def _record_orchestration_error_message(
container_execution: bts.ContainerExecution,
execution_nodes: list[bts.ExecutionNode],
Expand Down