Skip to content

feat: Metrics to count all non-terminal execution statuses#186

Open
morgan-wowk wants to merge 1 commit intotask-status-durationfrom
execution-status-count
Open

feat: Metrics to count all non-terminal execution statuses#186
morgan-wowk wants to merge 1 commit intotask-status-durationfrom
execution-status-count

Conversation

@morgan-wowk
Copy link
Copy Markdown
Collaborator

@morgan-wowk morgan-wowk commented Mar 24, 2026

TL;DR

Added a metrics polling service that periodically queries the database to emit execution status count gauges for monitoring active (non-terminal) pipeline executions.

Screenshot 2026-03-23 at 5.24.33 PM.png

What changed?

  • Added EXECUTIONS metric unit to MetricUnit enum
  • Created execution_status_count observable gauge to track execution node counts by status
  • Implemented PollingService class in new metrics_poller.py module that:
    • Queries the database every 30 seconds for execution status counts
    • Only tracks active (non-terminal) statuses like PENDING, RUNNING, etc.
    • Thread-safely updates gauge observations with current counts
  • Integrated metrics poller as a daemon thread in the application startup, with automatic detection of OpenTelemetry metrics configuration

How to test?

  1. Set the OpenTelemetry metrics exporter endpoint environment variable (see examples/observability/otel-jaeger-prometheus)
  2. Start the application and verify the metrics poller thread launches
  3. Create pipeline executions in various active statuses
  4. Check that the execution.status.count gauge emits observations with correct counts for each status
  5. Verify that terminal statuses (SUCCEEDED, FAILED) are not included in the gauge metrics

Why make this change?

This enables real-time monitoring of pipeline execution health by providing visibility into how many executions are currently in each active state, which is essential for operational observability.

Copy link
Copy Markdown
Collaborator Author

morgan-wowk commented Mar 24, 2026

Warning

This pull request is not mergeable via GitHub because a downstack PR is open. Once all requirements are satisfied, merge this PR as a stack on Graphite.
Learn more

This stack of pull requests is managed by Graphite. Learn more about stacking.

@morgan-wowk morgan-wowk force-pushed the execution-status-count branch 2 times, most recently from ccde576 to cfdf72a Compare March 24, 2026 00:35
@morgan-wowk morgan-wowk marked this pull request as ready for review March 24, 2026 00:40
@morgan-wowk morgan-wowk requested a review from Ark-kun as a code owner March 24, 2026 00:40
@morgan-wowk morgan-wowk force-pushed the execution-status-count branch from cfdf72a to 365b4d3 Compare March 24, 2026 00:58
@morgan-wowk morgan-wowk force-pushed the task-status-duration branch from 949e41c to c8256fc Compare March 24, 2026 01:16
@morgan-wowk morgan-wowk force-pushed the execution-status-count branch from 365b4d3 to a3c0001 Compare March 24, 2026 01:16
self._poll_interval_seconds = poll_interval_seconds
self._lock = threading.Lock()
# Initialize all active statuses to 0
self._counts: dict[str, int] = {s.value: 0 for s in _ACTIVE_STATUSES}
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: used in two places, might make sense to put into a function.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call — extracted into _empty_status_counts() module-level helper, used in both __init__ and _poll.

# Register our observe method as the gauge callback.
# The OTel SDK stores callbacks in _callbacks; we append after creation
# since create_observable_gauge is called at module load time in metrics.py.
app_metrics.execution_status_count._callbacks.append(self._observe)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're calling a private variable, is it possible to make a function set sets up execution_status_count with the callbacks in metric.py?

Reason for this is:

  • The private variable could change since this is not meant to be called, which will have issues if it does in the future.
  • Will there ever be a case you're appending more than one callback? If so, can you elaborate that use case. My thought is you would only ever need to "set" one callback.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll see what we can do!

start_local.py Outdated
Comment on lines +237 to +239
run_configured_metrics_poller = lambda: run_metrics_poller(
db_engine=db_engine,
)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's better to do

def run_configured_metrics_poller() -> None:
    run_metrics_poller(db_engine=db_engine)

Instead of lambda for variables. Reason why is 1) troubleshooting, if there was a stack trace it'll just say "lambda" instead of the actual function name and 2) cannot have type hinting/docstrings to have a strongly type function for type checking.

Also, if we had a linter, there are some that would complain about this.

Use cases for when using lambda are inline/throwaway like:

sorted(items, key=lambda x: x.created_at)

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For now this is intentional to match existing code standards set by the orchestrator. It matches the orchestrator code exactly. I'm not sure I want to introduce an alternative format right now unless @Ark-kun would like make the switch.

I do like all your points for not using a lambda in this case.

Comment on lines +82 to +85
with self._lock:
counts = self._counts.copy()
for status_value, count in counts.items():
yield otel_metrics.Observation(count, {"execution.status": status_value})
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious, when will observe be called? Will there ever be a race condition where observe is called before the poll happened/completed? Would it make sense to put a flag here that at last 1 poll completed?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The lock will prevent it from copying counts that are currently being modified.

As for there being at least 1 iteration, i'll add a check.

start_local.py Outdated
Comment on lines +217 to +236
from cloud_pipelines_backend.instrumentation.opentelemetry._internal import (
configuration as otel_configuration,
)


def run_metrics_poller(*, db_engine: sqlalchemy.Engine) -> None:
otel_config = otel_configuration.resolve()
if otel_config is None or otel_config.metrics is None:
logger.info(
f"No OTel metrics endpoint configured"
f" (set {otel_configuration.EnvVar.METRIC_EXPORTER_ENDPOINT})"
f" — metrics poller not starting"
)
return
session_factory = orm.sessionmaker(
autocommit=False, autoflush=False, bind=db_engine
)
metrics_poller.PollingService(session_factory=session_factory).run_loop()


Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's move this to metrics_poller

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done — moved into metrics_polling.run(*, db_engine). start_local.py now just passes db_engine and metrics_polling handles OTel config detection and session factory creation.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Style: Module names should be plural or uncountable nouns. metrics_polling?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renamed to metrics_polling.py. Updated all import sites including oasis-backend.

def _observe(
self, _options: otel_metrics.CallbackOptions
) -> typing.Iterable[otel_metrics.Observation]:
with self._lock:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you sure you need lock at all? The dictionary does not seem to ever be modified - it's only ever assigned to new value.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right — removed. self._counts = new_counts is a single STORE_ATTR bytecode — a complete new dict is assigned atomically, so _observe never sees a partially-built state. Added a comment noting the CPython GIL assumption.

@morgan-wowk morgan-wowk force-pushed the execution-status-count branch from a3c0001 to 7247044 Compare April 8, 2026 19:02
@morgan-wowk morgan-wowk force-pushed the task-status-duration branch from c8256fc to ba1fae1 Compare April 8, 2026 19:02
@morgan-wowk morgan-wowk requested a review from yuechao-qin April 8, 2026 19:28
@morgan-wowk morgan-wowk force-pushed the task-status-duration branch from ba1fae1 to 2fd4062 Compare April 8, 2026 19:53
@morgan-wowk morgan-wowk force-pushed the execution-status-count branch from 7247044 to 37b67a8 Compare April 8, 2026 19:53
@morgan-wowk morgan-wowk force-pushed the task-status-duration branch from 2fd4062 to 236acdb Compare April 8, 2026 20:03
@morgan-wowk morgan-wowk force-pushed the execution-status-count branch from 37b67a8 to af70874 Compare April 8, 2026 20:04
@morgan-wowk morgan-wowk force-pushed the task-status-duration branch from 236acdb to ecc6948 Compare April 8, 2026 20:22
@morgan-wowk morgan-wowk force-pushed the execution-status-count branch from af70874 to 99160c2 Compare April 8, 2026 20:22
@morgan-wowk morgan-wowk force-pushed the task-status-duration branch from ecc6948 to d51d844 Compare April 8, 2026 20:32
@morgan-wowk morgan-wowk force-pushed the execution-status-count branch from 99160c2 to de5a07b Compare April 8, 2026 20:34
@morgan-wowk morgan-wowk force-pushed the task-status-duration branch from ae305ae to 6632d71 Compare April 8, 2026 23:01
@morgan-wowk morgan-wowk force-pushed the execution-status-count branch from 241d212 to ceeae59 Compare April 8, 2026 23:01
@morgan-wowk morgan-wowk force-pushed the task-status-duration branch from 6632d71 to e7872c5 Compare April 8, 2026 23:17
@morgan-wowk morgan-wowk force-pushed the execution-status-count branch 2 times, most recently from 796be02 to 991a8f4 Compare April 8, 2026 23:59
@morgan-wowk morgan-wowk force-pushed the task-status-duration branch 2 times, most recently from 171fef8 to 7e42851 Compare April 9, 2026 00:08
@morgan-wowk morgan-wowk force-pushed the execution-status-count branch from 991a8f4 to 01f7395 Compare April 9, 2026 00:08
@morgan-wowk morgan-wowk force-pushed the task-status-duration branch from 7e42851 to 79b4de3 Compare April 9, 2026 00:12
@morgan-wowk morgan-wowk force-pushed the execution-status-count branch from 01f7395 to 6884a43 Compare April 9, 2026 00:12
@morgan-wowk morgan-wowk force-pushed the task-status-duration branch from 79b4de3 to 747ee6f Compare April 9, 2026 19:15
@morgan-wowk morgan-wowk force-pushed the execution-status-count branch 2 times, most recently from f14929b to 2a24486 Compare April 13, 2026 21:07
@morgan-wowk morgan-wowk force-pushed the task-status-duration branch from 747ee6f to e425fd8 Compare April 13, 2026 21:07
@morgan-wowk morgan-wowk force-pushed the execution-status-count branch from 2a24486 to fe6865e Compare April 13, 2026 21:21
@morgan-wowk morgan-wowk force-pushed the task-status-duration branch 2 times, most recently from 7037c83 to abca7d4 Compare April 13, 2026 21:31
@morgan-wowk morgan-wowk force-pushed the execution-status-count branch 2 times, most recently from 6d6485a to 03261de Compare April 13, 2026 21:40
@morgan-wowk morgan-wowk force-pushed the task-status-duration branch from abca7d4 to 83a6328 Compare April 13, 2026 21:40
…us count gauge

Add a PollingService daemon thread that runs COUNT GROUP BY status every
30s and emits an ObservableGauge per active (non-terminal) execution status.

- Add cloud_pipelines_backend/instrumentation/metrics_polling.py with
  PollingService, configure_logging(), and _ACTIVE_STATUSES
- Add execution_status_count ObservableGauge to metrics.py
- Add metrics poller region to start_local.py with OTel guard
@morgan-wowk morgan-wowk force-pushed the task-status-duration branch from 83a6328 to 34dc0a9 Compare April 14, 2026 20:58
@morgan-wowk morgan-wowk force-pushed the execution-status-count branch from 03261de to c63b881 Compare April 14, 2026 20:58
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants