Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public final class GeneralConfig {
"trace.tracer.metrics.buffering.enabled";
public static final String TRACER_METRICS_MAX_AGGREGATES = "trace.tracer.metrics.max.aggregates";
public static final String TRACER_METRICS_MAX_PENDING = "trace.tracer.metrics.max.pending";
public static final String APM_ADDITIONAL_METRIC_TAGS = "apm.additional.metric.tags";
public static final String TRACER_METRICS_IGNORED_RESOURCES =
"trace.tracer.metrics.ignored.resources";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -80,6 +81,16 @@ public final class ConflatingMetricsAggregator implements MetricsAggregator, Eve
Pair.of(
DDCaches.newFixedSizeCache(512),
value -> UTF8BytesString.create(key + ":" + value));
private static final DDCache<
String, Pair<DDCache<String, UTF8BytesString>, Function<String, UTF8BytesString>>>
SPAN_DERIVED_PRIMARY_TAGS_CACHE = DDCaches.newFixedSizeCache(64);
private static final Function<
String, Pair<DDCache<String, UTF8BytesString>, Function<String, UTF8BytesString>>>
SPAN_DERIVED_PRIMARY_TAGS_CACHE_ADDER =
key ->
Pair.of(
DDCaches.newFixedSizeCache(512),
value -> UTF8BytesString.create(key + ":" + value));
private static final CharSequence SYNTHETICS_ORIGIN = "synthetics";

private static final Set<String> ELIGIBLE_SPAN_KINDS_FOR_METRICS =
Expand All @@ -103,18 +114,21 @@ public final class ConflatingMetricsAggregator implements MetricsAggregator, Eve
private final long reportingInterval;
private final TimeUnit reportingIntervalTimeUnit;
private final DDAgentFeaturesDiscovery features;
private final Set<String> additionalMetricTags;
private final HealthMetrics healthMetrics;
private final boolean includeEndpointInMetrics;

private volatile AgentTaskScheduler.Scheduled<?> cancellation;

// TODO: Refactor to one / fewer constructors?
public ConflatingMetricsAggregator(
Config config,
SharedCommunicationObjects sharedCommunicationObjects,
HealthMetrics healthMetrics) {
this(
config.getWellKnownTags(),
config.getMetricsIgnoredResources(),
config.getAdditionalMetricTags(),
sharedCommunicationObjects.featuresDiscovery(config),
healthMetrics,
new OkHttpSink(
Expand All @@ -141,6 +155,7 @@ public ConflatingMetricsAggregator(
this(
wellKnownTags,
ignoredResources,
Collections.emptySet(),
features,
healthMetric,
sink,
Expand All @@ -163,7 +178,58 @@ public ConflatingMetricsAggregator(
TimeUnit timeUnit,
boolean includeEndpointInMetrics) {
this(
wellKnownTags,
ignoredResources,
Collections.emptySet(),
features,
healthMetric,
sink,
maxAggregates,
queueSize,
reportingInterval,
timeUnit,
includeEndpointInMetrics);
}

ConflatingMetricsAggregator(
WellKnownTags wellKnownTags,
Set<String> ignoredResources,
Set<String> additionalMetricTags,
DDAgentFeaturesDiscovery features,
HealthMetrics healthMetric,
Sink sink,
int maxAggregates,
int queueSize,
boolean includeEndpointInMetrics) {
this(
wellKnownTags,
ignoredResources,
additionalMetricTags,
features,
healthMetric,
sink,
maxAggregates,
queueSize,
10,
SECONDS,
includeEndpointInMetrics);
}

ConflatingMetricsAggregator(
WellKnownTags wellKnownTags,
Set<String> ignoredResources,
Set<String> additionalMetricTags,
DDAgentFeaturesDiscovery features,
HealthMetrics healthMetric,
Sink sink,
int maxAggregates,
int queueSize,
long reportingInterval,
TimeUnit timeUnit,
boolean includeEndpointInMetrics) {
this(
ignoredResources,
additionalMetricTags,
features,
healthMetric,
sink,
Expand All @@ -186,7 +252,35 @@ public ConflatingMetricsAggregator(
long reportingInterval,
TimeUnit timeUnit,
boolean includeEndpointInMetrics) {
this(
ignoredResources,
Collections.emptySet(),
features,
healthMetric,
sink,
metricWriter,
maxAggregates,
queueSize,
reportingInterval,
timeUnit,
includeEndpointInMetrics);
}

ConflatingMetricsAggregator(
Set<String> ignoredResources,
Set<String> additionalMetricTags,
DDAgentFeaturesDiscovery features,
HealthMetrics healthMetric,
Sink sink,
MetricWriter metricWriter,
int maxAggregates,
int queueSize,
long reportingInterval,
TimeUnit timeUnit,
boolean includeEndpointInMetrics) {
this.ignoredResources = ignoredResources;
this.additionalMetricTags =
additionalMetricTags == null ? Collections.<String>emptySet() : additionalMetricTags;
this.includeEndpointInMetrics = includeEndpointInMetrics;
this.inbox = Queues.mpscArrayQueue(queueSize);
this.batchPool = Queues.spmcArrayQueue(maxAggregates);
Expand Down Expand Up @@ -347,6 +441,7 @@ private boolean publish(CoreSpan<?> span, boolean isTopLevel, CharSequence spanK
SPAN_KINDS.computeIfAbsent(
spanKind, UTF8BytesString::create), // save repeated utf8 conversions
getPeerTags(span, spanKind.toString()),
getAdditionalMetricTags(span),
httpMethod,
httpEndpoint,
grpcStatusCode);
Expand Down Expand Up @@ -412,6 +507,34 @@ private List<UTF8BytesString> getPeerTags(CoreSpan<?> span, String spanKind) {
return Collections.emptyList();
}

// TODO: This method is very similar to getPeerTags. We can probably consolidate to a helper.
private List<UTF8BytesString> getAdditionalMetricTags(CoreSpan<?> span) {
if (additionalMetricTags == null || additionalMetricTags.isEmpty()) {
return Collections.emptyList();
}
List<UTF8BytesString> tagValues = new ArrayList<>(additionalMetricTags.size());
for (String tagKey : additionalMetricTags) {
Object value = span.unsafeGetTag(tagKey);
if (value != null) {
final Pair<DDCache<String, UTF8BytesString>, Function<String, UTF8BytesString>>
cacheAndCreator =
SPAN_DERIVED_PRIMARY_TAGS_CACHE.computeIfAbsent(
tagKey, SPAN_DERIVED_PRIMARY_TAGS_CACHE_ADDER);
tagValues.add(
cacheAndCreator
.getLeft()
.computeIfAbsent(value.toString(), cacheAndCreator.getRight()));
}
}
if (tagValues.isEmpty()) {
return Collections.emptyList();
}
if (tagValues.size() > 1) {
tagValues.sort(Comparator.comparing(UTF8BytesString::toString));
}
return tagValues;
}

private static boolean isSynthetic(CoreSpan<?> span) {
return span.getOrigin() != null && SYNTHETICS_ORIGIN.equals(span.getOrigin().toString());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public final class MetricKey {
private final boolean isTraceRoot;
private final UTF8BytesString spanKind;
private final List<UTF8BytesString> peerTags;
private final List<UTF8BytesString> additionalMetricTags;
private final UTF8BytesString httpMethod;
private final UTF8BytesString httpEndpoint;
private final UTF8BytesString grpcStatusCode;
Expand All @@ -54,6 +55,39 @@ public MetricKey(
CharSequence httpMethod,
CharSequence httpEndpoint,
CharSequence grpcStatusCode) {
this(
resource,
service,
operationName,
serviceSource,
type,
httpStatusCode,
synthetics,
isTraceRoot,
spanKind,
peerTags,
Collections.emptyList(),
httpMethod,
httpEndpoint,
grpcStatusCode);
}

// TODO: Should we keep one constructor? We'd need to refactor all of the old calls.
public MetricKey(
CharSequence resource,
CharSequence service,
CharSequence operationName,
CharSequence serviceSource,
CharSequence type,
int httpStatusCode,
boolean synthetics,
boolean isTraceRoot,
CharSequence spanKind,
List<UTF8BytesString> peerTags,
List<UTF8BytesString> additionalMetricTags,
CharSequence httpMethod,
CharSequence httpEndpoint,
CharSequence grpcStatusCode) {
this.resource = null == resource ? EMPTY : utf8(RESOURCE_CACHE, resource);
this.service = null == service ? EMPTY : utf8(SERVICE_CACHE, service);
this.serviceSource = null == serviceSource ? null : utf8(SERVICE_SOURCE_CACHE, serviceSource);
Expand All @@ -64,6 +98,8 @@ public MetricKey(
this.isTraceRoot = isTraceRoot;
this.spanKind = null == spanKind ? EMPTY : utf8(KIND_CACHE, spanKind);
this.peerTags = peerTags == null ? Collections.emptyList() : peerTags;
this.additionalMetricTags =
additionalMetricTags == null ? Collections.emptyList() : additionalMetricTags;
this.httpMethod = httpMethod == null ? null : utf8(HTTP_METHOD_CACHE, httpMethod);
this.httpEndpoint = httpEndpoint == null ? null : utf8(HTTP_ENDPOINT_CACHE, httpEndpoint);
this.grpcStatusCode =
Expand All @@ -73,6 +109,7 @@ public MetricKey(
tmpHash = HashingUtils.addToHash(tmpHash, this.isTraceRoot);
tmpHash = HashingUtils.addToHash(tmpHash, this.spanKind);
tmpHash = HashingUtils.addToHash(tmpHash, this.peerTags);
tmpHash = HashingUtils.addToHash(tmpHash, this.additionalMetricTags);
tmpHash = HashingUtils.addToHash(tmpHash, this.resource);
tmpHash = HashingUtils.addToHash(tmpHash, this.service);
tmpHash = HashingUtils.addToHash(tmpHash, this.operationName);
Expand Down Expand Up @@ -134,6 +171,10 @@ public List<UTF8BytesString> getPeerTags() {
return peerTags;
}

public List<UTF8BytesString> getAdditionalMetricTags() {
return additionalMetricTags;
}

public UTF8BytesString getHttpMethod() {
return httpMethod;
}
Expand Down Expand Up @@ -163,6 +204,7 @@ public boolean equals(Object o) {
&& isTraceRoot == metricKey.isTraceRoot
&& spanKind.equals(metricKey.spanKind)
&& peerTags.equals(metricKey.peerTags)
&& additionalMetricTags.equals(metricKey.additionalMetricTags)
&& Objects.equals(serviceSource, metricKey.serviceSource)
&& Objects.equals(httpMethod, metricKey.httpMethod)
&& Objects.equals(httpEndpoint, metricKey.httpEndpoint)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ public final class SerializingMetricWriter implements MetricWriter {
private static final byte[] IS_TRACE_ROOT = "IsTraceRoot".getBytes(ISO_8859_1);
private static final byte[] SPAN_KIND = "SpanKind".getBytes(ISO_8859_1);
private static final byte[] PEER_TAGS = "PeerTags".getBytes(ISO_8859_1);
private static final byte[] SPAN_DERIVED_PRIMARY_TAGS =
"SpanDerivedPrimaryTags".getBytes(ISO_8859_1);
private static final byte[] HTTP_METHOD = "HTTPMethod".getBytes(ISO_8859_1);
private static final byte[] HTTP_ENDPOINT = "HTTPEndpoint".getBytes(ISO_8859_1);
private static final byte[] GRPC_STATUS_CODE = "GRPCStatusCode".getBytes(ISO_8859_1);
Expand Down Expand Up @@ -149,7 +151,7 @@ public void add(MetricKey key, AggregateMetric aggregate) {
final boolean hasServiceSource = key.getServiceSource() != null;
final boolean hasGrpcStatusCode = key.getGrpcStatusCode() != null;
final int mapSize =
15
16
+ (hasServiceSource ? 1 : 0)
+ (hasHttpMethod ? 1 : 0)
+ (hasHttpEndpoint ? 1 : 0)
Expand Down Expand Up @@ -189,6 +191,14 @@ public void add(MetricKey key, AggregateMetric aggregate) {
writer.writeUTF8(peerTag);
}

writer.writeUTF8(SPAN_DERIVED_PRIMARY_TAGS);
final List<UTF8BytesString> additionalMetricTags = key.getAdditionalMetricTags();
writer.startArray(additionalMetricTags.size());

for (UTF8BytesString additionalMetricTag : additionalMetricTags) {
writer.writeUTF8(additionalMetricTag);
}

if (hasServiceSource) {
writer.writeUTF8(SERVICE_SOURCE);
writer.writeUTF8(key.getServiceSource());
Expand Down
Loading
Loading