feat: Metrics to count all non-terminal execution statuses#186
morgan-wowk wants to merge 1 commit into `task-status-duration`
Conversation
Warning: This pull request is not mergeable via GitHub because a downstack PR is open. Once all requirements are satisfied, merge this PR as a stack on Graphite.
This stack of pull requests is managed by Graphite.
```python
self._poll_interval_seconds = poll_interval_seconds
self._lock = threading.Lock()
# Initialize all active statuses to 0
self._counts: dict[str, int] = {s.value: 0 for s in _ACTIVE_STATUSES}
```
Nit: used in two places, might make sense to put into a function.
Good call — extracted into _empty_status_counts() module-level helper, used in both __init__ and _poll.
```python
# Register our observe method as the gauge callback.
# The OTel SDK stores callbacks in _callbacks; we append after creation
# since create_observable_gauge is called at module load time in metrics.py.
app_metrics.execution_status_count._callbacks.append(self._observe)
```
You're accessing a private attribute here; is it possible to add a function in metrics.py that sets up execution_status_count with the callback? My reasons:
- The private attribute could change, since it is not meant to be accessed externally, and that would break this code in the future.
- Will there ever be a case where you're appending more than one callback? If so, can you elaborate on that use case? My thought is you would only ever need to "set" one callback.
I'll see what we can do!
start_local.py (outdated)
```python
run_configured_metrics_poller = lambda: run_metrics_poller(
    db_engine=db_engine,
)
```
It's better to do

```python
def run_configured_metrics_poller() -> None:
    run_metrics_poller(db_engine=db_engine)
```

instead of assigning a lambda to a variable. Two reasons: 1) troubleshooting: a stack trace will just say `<lambda>` instead of the actual function name, and 2) a lambda cannot carry type hints or a docstring, so you lose a strongly typed function for type checking.

Also, if we had a linter, some would complain about this.

Lambdas are for inline/throwaway use cases like:

```python
sorted(items, key=lambda x: x.created_at)
```
For now this is intentional, to match the existing code standards set by the orchestrator; it matches the orchestrator code exactly. I'm not sure I want to introduce an alternative format right now unless @Ark-kun would like to make the switch.
I do like all your points against using a lambda in this case.
```python
with self._lock:
    counts = self._counts.copy()
for status_value, count in counts.items():
    yield otel_metrics.Observation(count, {"execution.status": status_value})
```
Curious: when will observe be called? Will there ever be a race condition where observe is called before the poll has happened/completed? Would it make sense to put a flag here ensuring that at least 1 poll has completed?
The lock will prevent it from copying counts that are currently being modified.
As for there being at least 1 iteration, I'll add a check.
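That check could be as simple as a `threading.Event` that the poll loop sets after its first successful pass, so the gauge callback emits nothing until the counts are meaningful. A simplified sketch (class and method names mirror the discussion but the bodies are stand-ins):

```python
import threading


class PollingService:
    """Sketch of the at-least-one-poll guard, not the real implementation."""

    def __init__(self) -> None:
        self._counts: dict[str, int] = {}
        self._first_poll_done = threading.Event()

    def _poll(self) -> None:
        # The real code queries the database here; we fake a result.
        self._counts = {"RUNNING": 2}
        self._first_poll_done.set()  # counts are now meaningful

    def _observe(self):
        if not self._first_poll_done.is_set():
            return  # emit no observations before the first poll completes
        for status_value, count in self._counts.items():
            yield (count, {"execution.status": status_value})


svc = PollingService()
print(list(svc._observe()))  # [] before the first poll
svc._poll()
print(list(svc._observe()))  # [(2, {'execution.status': 'RUNNING'})]
```

Emitting nothing (rather than zeros) before the first poll avoids briefly reporting a false "0 executions" to the metrics backend.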
start_local.py (outdated)
```python
from cloud_pipelines_backend.instrumentation.opentelemetry._internal import (
    configuration as otel_configuration,
)


def run_metrics_poller(*, db_engine: sqlalchemy.Engine) -> None:
    otel_config = otel_configuration.resolve()
    if otel_config is None or otel_config.metrics is None:
        logger.info(
            f"No OTel metrics endpoint configured"
            f" (set {otel_configuration.EnvVar.METRIC_EXPORTER_ENDPOINT})"
            f" — metrics poller not starting"
        )
        return
    session_factory = orm.sessionmaker(
        autocommit=False, autoflush=False, bind=db_engine
    )
    metrics_poller.PollingService(session_factory=session_factory).run_loop()
```
Let's move this to metrics_poller
Done — moved into metrics_polling.run(*, db_engine). start_local.py now just passes db_engine and metrics_polling handles OTel config detection and session factory creation.
Style: Module names should be plural or uncountable nouns. metrics_polling?
Renamed to metrics_polling.py. Updated all import sites including oasis-backend.
```python
def _observe(
    self, _options: otel_metrics.CallbackOptions
) -> typing.Iterable[otel_metrics.Observation]:
    with self._lock:
```
Are you sure you need the lock at all? The dictionary never seems to be modified in place; it's only ever assigned a new value.
You're right — removed. self._counts = new_counts is a single STORE_ATTR bytecode — a complete new dict is assigned atomically, so _observe never sees a partially-built state. Added a comment noting the CPython GIL assumption.
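The pattern being described, as a minimal sketch (`Counts` is a stand-in class; in the real code the swap is `self._counts = new_counts` inside the polling loop):

```python
# Minimal sketch of the lock-free swap: build a complete new dict, then
# assign it in one statement. Under CPython the attribute assignment is a
# single STORE_ATTR and the GIL makes it atomic, so a concurrent reader sees
# either the old or the new mapping, never a half-built one. Other Python
# runtimes may not guarantee this and would need a lock.


class Counts:
    def __init__(self) -> None:
        self._counts: dict[str, int] = {}

    def update(self, new_counts: dict[str, int]) -> None:
        # Never mutate self._counts in place; replace it wholesale.
        self._counts = new_counts

    def snapshot(self) -> dict[str, int]:
        return self._counts


c = Counts()
c.update({"RUNNING": 5, "QUEUED": 1})
print(c.snapshot())  # {'RUNNING': 5, 'QUEUED': 1}
```

The important invariant is that `update` never does `self._counts[key] = ...`; partial in-place mutation is exactly what would reintroduce the race.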
…us count gauge

Add a PollingService daemon thread that runs COUNT GROUP BY status every 30s and emits an ObservableGauge per active (non-terminal) execution status.

- Add cloud_pipelines_backend/instrumentation/metrics_polling.py with PollingService, configure_logging(), and _ACTIVE_STATUSES
- Add execution_status_count ObservableGauge to metrics.py
- Add metrics poller region to start_local.py with OTel guard
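A self-contained illustration of that COUNT ... GROUP BY status query, using an in-memory SQLite table (the table and column names are assumptions for demonstration; the real poller goes through SQLAlchemy sessions):

```python
import sqlite3

# Demo of the poll query shape: one aggregate query returns a count per
# status, which the poller turns into gauge observations.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE execution_nodes (id INTEGER PRIMARY KEY, status TEXT)"
)
conn.executemany(
    "INSERT INTO execution_nodes (status) VALUES (?)",
    [("RUNNING",), ("RUNNING",), ("QUEUED",)],
)
counts = dict(
    conn.execute(
        "SELECT status, COUNT(*) FROM execution_nodes GROUP BY status"
    ).fetchall()
)
print(counts)  # {'QUEUED': 1, 'RUNNING': 2}
```

Statuses with zero rows simply don't appear in the result, which is why the poller starts from a zero-initialized mapping of all active statuses and overlays the query results.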

TL;DR
Added a metrics polling service that periodically queries the database to emit execution status count gauges for monitoring active (non-terminal) pipeline executions.
What changed?
- Added the EXECUTIONS metric unit to the MetricUnit enum
- Added the execution_status_count observable gauge to track execution node counts by status
- Added a PollingService class in the new metrics_poller.py module

How to test?

- Run the observability example (examples/observability/otel-jaeger-prometheus)
- Verify the execution.status.count gauge emits observations with correct counts for each status

Why make this change?
This enables real-time monitoring of pipeline execution health by providing visibility into how many executions are currently in each active state, which is essential for operational observability.