-
Notifications
You must be signed in to change notification settings - Fork 816
New APIs to add/remove metric readers at run-time #4863
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,6 +18,7 @@ | |
| from threading import Lock | ||
| from time import time_ns | ||
| from typing import Iterable, List, Mapping, Optional | ||
| from weakref import WeakSet | ||
|
|
||
| # This kind of import is needed to avoid Sphinx errors. | ||
| import opentelemetry.sdk.metrics | ||
|
|
@@ -59,10 +60,10 @@ class SynchronousMeasurementConsumer(MeasurementConsumer): | |
| def __init__( | ||
| self, | ||
| sdk_config: "opentelemetry.sdk.metrics._internal.SdkConfiguration", | ||
| metric_readers: Iterable["opentelemetry.sdk.metrics.MetricReader"], | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Likewise here we could switch this to accepting Internally, we can switch |
||
| ) -> None: | ||
| self._lock = Lock() | ||
| self._sdk_config = sdk_config | ||
| # should never be mutated | ||
| self._reader_storages: Mapping[ | ||
| "opentelemetry.sdk.metrics.MetricReader", MetricReaderStorage | ||
| ] = { | ||
|
|
@@ -71,7 +72,7 @@ def __init__( | |
| reader._instrument_class_temporality, | ||
| reader._instrument_class_aggregation, | ||
| ) | ||
| for reader in sdk_config.metric_readers | ||
| for reader in metric_readers | ||
| } | ||
| self._async_instruments: List[ | ||
| "opentelemetry.sdk.metrics._internal.instrument._Asynchronous" | ||
|
|
@@ -86,7 +87,9 @@ def consume_measurement(self, measurement: Measurement) -> None: | |
| measurement.context, | ||
| ) | ||
| ) | ||
| for reader_storage in self._reader_storages.values(): | ||
| with self._lock: | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't love that this adds a global lock, even though I imagine most users will never need to add/remove consumers. Unfortunately our existing benchmark suite is not great and we don't have anything cover measurements in multiple threads. But I think I'm OK with it for now. We can always roll this back or look into copying approaches, RW locks, etc Did you measure the perf impact here at all?
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No performance testing so far but I'll check the benchmarks and add appropriate tests
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think the benchmarks cover it too well. If you're up for adding some benchmarks, it's very appreciated! |
||
| reader_storages = WeakSet(self._reader_storages.values()) | ||
JP-MY marked this conversation as resolved.
Show resolved
Hide resolved
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do we need a weak set here vs a plain old list?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
This was something I suggested to allow for reader storages to be garbage collected in case they are removed between releasing the lock and calling |
||
| for reader_storage in reader_storages: | ||
| reader_storage.consume_measurement( | ||
| measurement, should_sample_exemplar | ||
| ) | ||
|
|
@@ -143,3 +146,21 @@ def collect( | |
| result = self._reader_storages[metric_reader].collect() | ||
|
|
||
| return result | ||
|
|
||
| def add_metric_reader( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Perhaps instead of
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. regarding changing metric_readers to reader_storages: the change is simple but what is the advantage of doing this? It seems like it'd only shift the responsibility from |
||
| self, metric_reader: "opentelemetry.sdk.metrics.MetricReader" | ||
| ) -> None: | ||
| """Registers a new metric reader.""" | ||
| with self._lock: | ||
| self._reader_storages[metric_reader] = MetricReaderStorage( | ||
| self._sdk_config, | ||
| metric_reader._instrument_class_temporality, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think pylint might be complaining about these lines here. Maybe instead of referencing private attributes from the metric reader itself, you can make these arguments to the
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this would only shift the lint error to
|
||
| metric_reader._instrument_class_aggregation, | ||
| ) | ||
JP-MY marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| def remove_metric_reader( | ||
| self, metric_reader: "opentelemetry.sdk.metrics.MetricReader" | ||
| ) -> None: | ||
| """Unregisters the given metric reader.""" | ||
| with self._lock: | ||
| self._reader_storages.pop(metric_reader) | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@xrmx Any concerns on new public API here?