Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -432,11 +432,12 @@ def __init__(
)
),
resource=resource,
metric_readers=metric_readers,
views=views,
)
self._metric_readers = metric_readers
self._measurement_consumer = SynchronousMeasurementConsumer(
sdk_config=self._sdk_config
sdk_config=self._sdk_config,
metric_readers=metric_readers,
)
disabled = environ.get(OTEL_SDK_DISABLED, "")
self._disabled = disabled.lower().strip() == "true"
Expand All @@ -448,7 +449,7 @@ def __init__(
self._shutdown_once = Once()
self._shutdown = False

for metric_reader in self._sdk_config.metric_readers:
for metric_reader in self._metric_readers:
with self._all_metric_readers_lock:
if metric_reader in self._all_metric_readers:
# pylint: disable=broad-exception-raised
Expand All @@ -468,7 +469,7 @@ def force_flush(self, timeout_millis: float = 10_000) -> bool:

metric_reader_error = {}

for metric_reader in self._sdk_config.metric_readers:
for metric_reader in self._metric_readers:
current_ts = time_ns()
try:
if current_ts >= deadline_ns:
Expand Down Expand Up @@ -513,7 +514,7 @@ def _shutdown():

metric_reader_error = {}

for metric_reader in self._sdk_config.metric_readers:
for metric_reader in self._metric_readers:
current_ts = time_ns()
try:
if current_ts >= deadline_ns:
Expand Down Expand Up @@ -580,3 +581,31 @@ def get_meter(
self._measurement_consumer,
)
return self._meters[info]

def add_metric_reader(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@xrmx Any concerns on new public API here?

self, metric_reader: "opentelemetry.sdk.metrics.export.MetricReader"
) -> None:
with self._all_metric_readers_lock:
if metric_reader in self._all_metric_readers:
raise ValueError(
f"MetricReader {metric_reader} has been registered already!"
)
self._measurement_consumer.add_metric_reader(metric_reader)
metric_reader._set_collect_callback(
self._measurement_consumer.collect
)
self._all_metric_readers.add(metric_reader)

def remove_metric_reader(
self,
metric_reader: "opentelemetry.sdk.metrics.export.MetricReader",
) -> None:
with self._all_metric_readers_lock:
if metric_reader not in self._all_metric_readers:
raise ValueError(
f"MetricReader {metric_reader} has not been registered!"
)
self._measurement_consumer.remove_metric_reader(metric_reader)
metric_reader._set_collect_callback(None)
metric_reader.shutdown()
self._all_metric_readers.remove(metric_reader)
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from threading import Lock
from time import time_ns
from typing import Iterable, List, Mapping, Optional
from weakref import WeakSet

# This kind of import is needed to avoid Sphinx errors.
import opentelemetry.sdk.metrics
Expand Down Expand Up @@ -59,10 +60,10 @@ class SynchronousMeasurementConsumer(MeasurementConsumer):
def __init__(
self,
sdk_config: "opentelemetry.sdk.metrics._internal.SdkConfiguration",
metric_readers: Iterable["opentelemetry.sdk.metrics.MetricReader"],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Likewise here we could switch this to accepting reader_storages: Mapping["opentelemetry.sdk.metrics.MetricReader", MetricReaderStorage]

Internally, we can switch self._reader_storages to being a weakref.WeakKeyDictionary as an extra optimization.

) -> None:
self._lock = Lock()
self._sdk_config = sdk_config
# should never be mutated
self._reader_storages: Mapping[
"opentelemetry.sdk.metrics.MetricReader", MetricReaderStorage
] = {
Expand All @@ -71,7 +72,7 @@ def __init__(
reader._instrument_class_temporality,
reader._instrument_class_aggregation,
)
for reader in sdk_config.metric_readers
for reader in metric_readers
}
self._async_instruments: List[
"opentelemetry.sdk.metrics._internal.instrument._Asynchronous"
Expand All @@ -86,7 +87,9 @@ def consume_measurement(self, measurement: Measurement) -> None:
measurement.context,
)
)
for reader_storage in self._reader_storages.values():
with self._lock:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't love that this adds a global lock, even though I imagine most users will never need to add/remove consumers. Unfortunately our existing benchmark suite is not great and we don't have anything cover measurements in multiple threads. But I think I'm OK with it for now. We can always roll this back or look into copying approaches, RW locks, etc

Did you measure the perf impact here at all?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No performance testing so far but I'll check the benchmarks and add appropriate tests

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think the benchmarks cover it too well. If you're up for adding some benchmarks, it's very appreciated!

reader_storages = WeakSet(self._reader_storages.values())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need a weak set here vs a plain old list?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need a weak set here vs a plain old list?

This was something I suggested to allow for reader storages to be garbage collected in case they are removed between releasing the lock and calling consume_measurement

for reader_storage in reader_storages:
reader_storage.consume_measurement(
measurement, should_sample_exemplar
)
Expand Down Expand Up @@ -143,3 +146,21 @@ def collect(
result = self._reader_storages[metric_reader].collect()

return result

def add_metric_reader(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps instead of add_metric_reader and remove_metric_reader we could just have add_reader_storage and remove_reader_storage which simply accepts the metric reader and a pre-constructed MetricReaderStorage for add_metric_reader and just the metric reader for remove_reader_storage

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

regarding changing metric_readers to reader_storages: the change is simple but what is the advantage of doing this? It seems like it'd only shift the responsibility from SynchronousMeasurementConsumer to MeterProvider. can you please explain the intent a bit more?

self, metric_reader: "opentelemetry.sdk.metrics.MetricReader"
) -> None:
"""Registers a new metric reader."""
with self._lock:
self._reader_storages[metric_reader] = MetricReaderStorage(
self._sdk_config,
metric_reader._instrument_class_temporality,
Copy link
Contributor

@herin049 herin049 Feb 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think pylint might be complaining about these lines here. Maybe instead of referencing private attributes from the metric reader itself, you can make these arguments to the add_metric_reader function.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this would only shift the lint error to MeterProvider as it still has to access the private attribute of the metric reader (if I understand your suggestion correctly). The other one is the _set_collect_callback in MeterProvider which is a surprising lint error since this exact file, uses this exact function on line 462 without any special lint comments but somehow the linter doesn't complain about this.

************* Module opentelemetry.sdk.metrics._internal opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py:594:12: W0212: Access to a protected member _set_collect_callback of a client class (protected-access) opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/__init__.py:609:12: W0212: Access to a protected member _set_collect_callback of a client class (protected-access)

metric_reader._instrument_class_aggregation,
)

def remove_metric_reader(
self, metric_reader: "opentelemetry.sdk.metrics.MetricReader"
) -> None:
"""Unregisters the given metric reader."""
with self._lock:
self._reader_storages.pop(metric_reader)
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,4 @@
class SdkConfiguration:
exemplar_filter: "opentelemetry.sdk.metrics.ExemplarFilter"
resource: "opentelemetry.sdk.resources.Resource"
metric_readers: Sequence["opentelemetry.sdk.metrics.MetricReader"]
views: Sequence["opentelemetry.sdk.metrics.View"]
27 changes: 14 additions & 13 deletions opentelemetry-sdk/tests/metrics/test_measurement_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@
class TestSynchronousMeasurementConsumer(TestCase):
def test_parent(self, _):
self.assertIsInstance(
SynchronousMeasurementConsumer(MagicMock()), MeasurementConsumer
SynchronousMeasurementConsumer(MagicMock(), metric_readers=()),
MeasurementConsumer,
)

def test_creates_metric_reader_storages(self, MockMetricReaderStorage):
Expand All @@ -44,9 +45,9 @@ def test_creates_metric_reader_storages(self, MockMetricReaderStorage):
SdkConfiguration(
exemplar_filter=Mock(),
resource=Mock(),
metric_readers=reader_mocks,
views=Mock(),
)
),
metric_readers=reader_mocks,
)
self.assertEqual(len(MockMetricReaderStorage.mock_calls), 5)

Expand All @@ -61,9 +62,9 @@ def test_measurements_passed_to_each_reader_storage(
SdkConfiguration(
exemplar_filter=Mock(should_sample=Mock(return_value=False)),
resource=Mock(),
metric_readers=reader_mocks,
views=Mock(),
)
),
metric_readers=reader_mocks,
)
measurement_mock = Mock()
consumer.consume_measurement(measurement_mock)
Expand All @@ -83,9 +84,9 @@ def test_collect_passed_to_reader_stage(self, MockMetricReaderStorage):
SdkConfiguration(
exemplar_filter=Mock(),
resource=Mock(),
metric_readers=reader_mocks,
views=Mock(),
)
),
metric_readers=reader_mocks,
)
for r_mock, rs_mock in zip(reader_mocks, reader_storage_mocks):
rs_mock.collect.assert_not_called()
Expand All @@ -102,9 +103,9 @@ def test_collect_calls_async_instruments(self, MockMetricReaderStorage):
SdkConfiguration(
exemplar_filter=Mock(should_sample=Mock(return_value=False)),
resource=Mock(),
metric_readers=[reader_mock],
views=Mock(),
)
),
metric_readers=[reader_mock],
)
async_instrument_mocks = [MagicMock() for _ in range(5)]
for i_mock in async_instrument_mocks:
Expand Down Expand Up @@ -133,9 +134,9 @@ def test_collect_timeout(self, MockMetricReaderStorage):
SdkConfiguration(
exemplar_filter=Mock(),
resource=Mock(),
metric_readers=[reader_mock],
views=Mock(),
)
),
metric_readers=[reader_mock],
)

def sleep_1(*args, **kwargs):
Expand Down Expand Up @@ -166,9 +167,9 @@ def test_collect_deadline(
SdkConfiguration(
exemplar_filter=Mock(),
resource=Mock(),
metric_readers=[reader_mock],
views=Mock(),
)
),
metric_readers=[reader_mock],
)

def sleep_1(*args, **kwargs):
Expand Down
16 changes: 0 additions & 16 deletions opentelemetry-sdk/tests/metrics/test_metric_reader_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ def test_creates_view_instrument_matches(
SdkConfiguration(
exemplar_filter=Mock(),
resource=Mock(),
metric_readers=(),
views=(view1, view2),
),
MagicMock(
Expand Down Expand Up @@ -146,7 +145,6 @@ def test_forwards_calls_to_view_instrument_match(
SdkConfiguration(
exemplar_filter=Mock(),
resource=Mock(),
metric_readers=(),
views=(view1, view2),
),
MagicMock(
Expand Down Expand Up @@ -256,7 +254,6 @@ def test_race_concurrent_measurements(self, MockViewInstrumentMatch: Mock):
SdkConfiguration(
exemplar_filter=Mock(),
resource=Mock(),
metric_readers=(),
views=(view1,),
),
MagicMock(
Expand Down Expand Up @@ -291,7 +288,6 @@ def test_default_view_enabled(self, MockViewInstrumentMatch: Mock):
SdkConfiguration(
exemplar_filter=Mock(),
resource=Mock(),
metric_readers=(),
views=(),
),
MagicMock(
Expand Down Expand Up @@ -327,7 +323,6 @@ def test_drop_aggregation(self):
SdkConfiguration(
exemplar_filter=Mock(),
resource=Mock(),
metric_readers=(),
views=(
View(
instrument_name="name", aggregation=DropAggregation()
Expand Down Expand Up @@ -355,7 +350,6 @@ def test_same_collection_start(self):
SdkConfiguration(
exemplar_filter=Mock(),
resource=Mock(),
metric_readers=(),
views=(View(instrument_name="name"),),
),
MagicMock(
Expand Down Expand Up @@ -402,7 +396,6 @@ def test_conflicting_view_configuration(self):
SdkConfiguration(
exemplar_filter=Mock(),
resource=Mock(),
metric_readers=(),
views=(
View(
instrument_name="observable_counter",
Expand Down Expand Up @@ -451,7 +444,6 @@ def test_view_instrument_match_conflict_0(self):
SdkConfiguration(
exemplar_filter=Mock(),
resource=Mock(),
metric_readers=(),
views=(
View(instrument_name="observable_counter_0", name="foo"),
View(instrument_name="observable_counter_1", name="foo"),
Expand Down Expand Up @@ -509,7 +501,6 @@ def test_view_instrument_match_conflict_1(self):
SdkConfiguration(
exemplar_filter=Mock(),
resource=Mock(),
metric_readers=(),
views=(
View(instrument_name="bar", name="foo"),
View(instrument_name="baz", name="foo"),
Expand Down Expand Up @@ -578,7 +569,6 @@ def test_view_instrument_match_conflict_2(self):
SdkConfiguration(
exemplar_filter=Mock(),
resource=Mock(),
metric_readers=(),
views=(
View(instrument_name="foo"),
View(instrument_name="bar"),
Expand Down Expand Up @@ -631,7 +621,6 @@ def test_view_instrument_match_conflict_3(self):
SdkConfiguration(
exemplar_filter=Mock(),
resource=Mock(),
metric_readers=(),
views=(
View(instrument_name="bar", name="foo"),
View(instrument_name="baz", name="foo"),
Expand Down Expand Up @@ -682,7 +671,6 @@ def test_view_instrument_match_conflict_4(self):
SdkConfiguration(
exemplar_filter=Mock(),
resource=Mock(),
metric_readers=(),
views=(
View(instrument_name="bar", name="foo"),
View(instrument_name="baz", name="foo"),
Expand Down Expand Up @@ -729,7 +717,6 @@ def test_view_instrument_match_conflict_5(self):
SdkConfiguration(
exemplar_filter=Mock(),
resource=Mock(),
metric_readers=(),
views=(
View(instrument_name="observable_counter_0", name="foo"),
View(instrument_name="observable_counter_1", name="foo"),
Expand Down Expand Up @@ -784,7 +771,6 @@ def test_view_instrument_match_conflict_6(self):
SdkConfiguration(
exemplar_filter=Mock(),
resource=Mock(),
metric_readers=(),
views=(
View(instrument_name="observable_counter", name="foo"),
View(instrument_name="histogram", name="foo"),
Expand Down Expand Up @@ -839,7 +825,6 @@ def test_view_instrument_match_conflict_7(self):
SdkConfiguration(
exemplar_filter=Mock(),
resource=Mock(),
metric_readers=(),
views=(
View(instrument_name="observable_counter_0", name="foo"),
View(instrument_name="observable_counter_1", name="foo"),
Expand Down Expand Up @@ -894,7 +879,6 @@ def test_view_instrument_match_conflict_8(self):
SdkConfiguration(
exemplar_filter=Mock(),
resource=Mock(),
metric_readers=(),
views=(
View(instrument_name="up_down_counter", name="foo"),
View(
Expand Down
Loading
Loading