Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions handwritten/pubsub/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
},
"devDependencies": {
"@grpc/proto-loader": "^0.8.0",
"@opentelemetry/context-async-hooks": "^2.6.1",
"@opentelemetry/sdk-trace-base": "^1.17.0",
"@types/duplexify": "^3.6.4",
"@types/extend": "^3.0.4",
Expand Down
8 changes: 7 additions & 1 deletion handwritten/pubsub/src/lease-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/

import {EventEmitter} from 'events';
import {context as otelContext} from '@opentelemetry/api';

import {AckError, Message, Subscriber} from './subscriber';
import {defaultOptions} from './default-options';
Expand Down Expand Up @@ -297,8 +298,13 @@ export class LeaseManager extends EventEmitter {
message.ackId,
);
message.subSpans.processingStart(this._subscriber.name);
const emitCallback = () => this._subscriber.emit('message', message);
try {
this._subscriber.emit('message', message);
if (message.parentContext) {
otelContext.with(message.parentContext, emitCallback);
} else {
emitCallback();
}
} catch (e: unknown) {
logs.callbackExceptions.error(
'message (ID %s, ackID %s) caused a user callback exception: %o',
Expand Down
10 changes: 10 additions & 0 deletions handwritten/pubsub/src/subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* limitations under the License.
*/

import {Context} from '@opentelemetry/api';
import {DateStruct, PreciseDate} from '@google-cloud/precise-date';
import {replaceProjectIdToken} from '@google-cloud/projectify';
import {promisify} from '@google-cloud/promisify';
Expand Down Expand Up @@ -289,6 +290,15 @@ export class Message implements tracing.MessageWithAttributes {
*/
parentSpan?: tracing.Span;

/**
* @private
*
* Tracks the propagation context (including baggage) extracted from the
* incoming message. Used to set the active context when dispatching to
* user callbacks so that baggage is accessible.
*/
parentContext?: Context;

/**
* We'll save the state of the subscription's exactly once delivery flag at the
* time the message was received. This is pretty much only for tracing, as we will
Expand Down
56 changes: 41 additions & 15 deletions handwritten/pubsub/src/telemetry-tracing.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*!
* Copyright 2020-2024 Google LLC
* Copyright 2020-2026 Google LLC
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
Expand All @@ -26,7 +26,11 @@ import {
Context,
Link,
} from '@opentelemetry/api';
import {W3CTraceContextPropagator} from '@opentelemetry/core';
import {
W3CTraceContextPropagator,
W3CBaggagePropagator,
CompositePropagator,
} from '@opentelemetry/core';
import {Attributes, PubsubMessage} from './publisher/pubsub-message';
import {Duration} from './temporal';

Expand Down Expand Up @@ -76,7 +80,9 @@ export enum OpenTelemetryLevel {
* @private
* @internal
*/
const w3cTraceContextPropagator = new W3CTraceContextPropagator();
const compositePropagator = new CompositePropagator({
propagators: [new W3CTraceContextPropagator(), new W3CBaggagePropagator()],
});

// True if user code elsewhere wants to enable OpenTelemetry support.
let globallyEnabled = false;
Expand Down Expand Up @@ -117,6 +123,7 @@ export function isEnabled(): OpenTelemetryLevel {
export interface MessageWithAttributes {
attributes?: Attributes | null | undefined;
parentSpan?: Span;
parentContext?: Context;
}

/**
Expand Down Expand Up @@ -225,6 +232,7 @@ export function spanContextToContext(
* @internal
*/
export const modernAttributeName = 'googclient_traceparent';
export const baggageAttributeName = 'googclient_baggage';

export interface AttributeParams {
// Fully qualified.
Expand Down Expand Up @@ -768,22 +776,36 @@ export function injectSpan(span: Span, message: MessageWithAttributes): void {
delete message.attributes[modernAttributeName];
}

// Always do propagation injection with the trace context.
const context = trace.setSpanContext(ROOT_CONTEXT, span.spanContext());
w3cTraceContextPropagator.inject(context, message, pubsubSetter);
if (message.attributes[baggageAttributeName]) {
console.warn(
`${baggageAttributeName} key set as message attribute, but will be overridden.`,
);

delete message.attributes[baggageAttributeName];
}

// Always do propagation injection with the trace and baggage context.
const propagationContext = trace.setSpanContext(
context.active(),
span.spanContext(),
);
compositePropagator.inject(propagationContext, message, pubsubSetter);

// Also put the direct reference to the Span object for while we're
// passing it around in the client library.
message.parentSpan = span;
}

/**
* Returns true if this message potentially contains a span context.
* Returns true if this message potentially contains a propagation context
* (trace context or baggage).
*
* @private
* @internal
*/
export function containsSpanContext(message: MessageWithAttributes): boolean {
export function containsPropagationContext(
message: MessageWithAttributes,
): boolean {
if (message.parentSpan) {
return true;
}
Expand All @@ -793,7 +815,9 @@ export function containsSpanContext(message: MessageWithAttributes): boolean {
}

const keys = Object.getOwnPropertyNames(message.attributes);
return !!keys.find(n => n === modernAttributeName);
return !!keys.find(
n => n === modernAttributeName || n === baggageAttributeName,
);
}

/**
Expand Down Expand Up @@ -822,12 +846,11 @@ export function extractSpan(

let context: Context | undefined;

if (keys.includes(modernAttributeName)) {
context = w3cTraceContextPropagator.extract(
ROOT_CONTEXT,
message,
pubsubGetter,
);
if (
keys.includes(modernAttributeName) ||
keys.includes(baggageAttributeName)
) {
context = compositePropagator.extract(ROOT_CONTEXT, message, pubsubGetter);
}

const span = PubsubSpans.createReceiveSpan(
Expand All @@ -837,5 +860,8 @@ export function extractSpan(
'extractSpan',
);
message.parentSpan = span;
if (context) {
message.parentContext = context;
}
return span;
}
120 changes: 116 additions & 4 deletions handwritten/pubsub/test/telemetry-tracing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import {describe, it, beforeEach} from 'mocha';
import * as trace from '@opentelemetry/sdk-trace-base';
import * as otel from '../src/telemetry-tracing';
import {exporter} from './tracing';
import {SpanKind} from '@opentelemetry/api';
import {SpanKind, context, propagation} from '@opentelemetry/api';
import sinon = require('sinon');
import {PubsubMessage} from '../src/publisher';
import {Duration} from '../src/temporal';
Expand Down Expand Up @@ -160,18 +160,25 @@ describe('OpenTelemetryTracer', () => {
}
});

it('should be able to determine if attributes are present', () => {
it('should be able to determine if propagation attributes are present', () => {
let message: otel.MessageWithAttributes;

message = {
attributes: {
[otel.modernAttributeName]: 'foobar',
},
};
assert.strictEqual(otel.containsSpanContext(message), true);
assert.strictEqual(otel.containsPropagationContext(message), true);

message = {
attributes: {
[otel.baggageAttributeName]: 'key=value',
},
};
assert.strictEqual(otel.containsPropagationContext(message), true);

message = {};
assert.strictEqual(otel.containsSpanContext(message), false);
assert.strictEqual(otel.containsPropagationContext(message), false);
});

it('extracts a trace context', () => {
Expand All @@ -191,6 +198,111 @@ describe('OpenTelemetryTracer', () => {
'd4cda95b652f4a1592b449d5929fda1b',
);
});

it('injects baggage from the active context into message attributes', () => {
const baggage = propagation.createBaggage({
'test-key': {value: 'test-value'},
});

const publishMessage: PubsubMessage = {
attributes: {},
};
const span = otel.PubsubSpans.createPublisherSpan(
publishMessage,
'projects/test/topics/topicfoo',
'tests',
);
assert.ok(span);

// Set the baggage on the active context.
const ctxWithBaggage = propagation.setBaggage(context.active(), baggage);

// Execute injectSpan within the scope of the active context.
context.with(ctxWithBaggage, () => {
otel.injectSpan(span, publishMessage);
});

// Verify baggage attribute was set on the message by the compositePropagator.
assert.strictEqual(
Object.getOwnPropertyNames(publishMessage.attributes).includes(
otel.baggageAttributeName,
),
true,
);
assert.ok(
(
publishMessage.attributes![otel.baggageAttributeName] as string
).includes('test-key=test-value'),
);
});

it('should issue a warning if baggage attribute key is set', () => {
const message: PubsubMessage = {
attributes: {
[otel.baggageAttributeName]: 'bazbar',
},
};
const span = otel.PubsubSpans.createPublisherSpan(
message,
'projects/test/topics/topicfoo',
'tests',
);
assert.ok(span);

const warnSpy = sinon.spy(console, 'warn');
try {
otel.injectSpan(span, message);
assert.strictEqual(warnSpy.callCount, 1);
} finally {
warnSpy.restore();
}
});

it('extracts baggage from message attributes', () => {
const message: otel.MessageWithAttributes = {
attributes: {
[otel.modernAttributeName]:
'00-d4cda95b652f4a1592b449d5929fda1b-553964cd9101a314-01',
[otel.baggageAttributeName]: 'test-key=test-value',
},
};

const childSpan = otel.extractSpan(
message,
'projects/test/subscriptions/subfoo',
);
assert.ok(childSpan);
assert.strictEqual(
childSpan.spanContext().traceId,
'd4cda95b652f4a1592b449d5929fda1b',
);

// Verify baggage is accessible on the extracted context.
assert.ok(message.parentContext);
const baggage = propagation.getBaggage(message.parentContext!);
assert.ok(baggage);
assert.strictEqual(baggage!.getEntry('test-key')?.value, 'test-value');
});

it('extracts span when only baggage is present', () => {
const message: otel.MessageWithAttributes = {
attributes: {
[otel.baggageAttributeName]: 'test-key=test-value',
},
};

const childSpan = otel.extractSpan(
message,
'projects/test/subscriptions/subfoo',
);
assert.ok(childSpan);

// Verify baggage is accessible even without a trace context.
assert.ok(message.parentContext);
const baggage = propagation.getBaggage(message.parentContext!);
assert.ok(baggage);
assert.strictEqual(baggage!.getEntry('test-key')?.value, 'test-value');
});
});

describe('attribute creation', () => {
Expand Down
14 changes: 14 additions & 0 deletions handwritten/pubsub/test/tracing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,13 @@ import {
InMemorySpanExporter,
SimpleSpanProcessor,
} from '@opentelemetry/sdk-trace-base';
import {context, propagation} from '@opentelemetry/api';
import {
CompositePropagator,
W3CTraceContextPropagator,
W3CBaggagePropagator,
} from '@opentelemetry/core';
import {AsyncLocalStorageContextManager} from '@opentelemetry/context-async-hooks';

/**
* This file is used to initialise a global tracing provider and span exporter
Expand All @@ -38,4 +45,11 @@ import {
export const exporter: InMemorySpanExporter = new InMemorySpanExporter();
export const provider: BasicTracerProvider = new BasicTracerProvider();
provider.addSpanProcessor(new SimpleSpanProcessor(exporter));
const contextManager = new AsyncLocalStorageContextManager();
context.setGlobalContextManager(contextManager);
provider.register();
propagation.setGlobalPropagator(
new CompositePropagator({
propagators: [new W3CTraceContextPropagator(), new W3CBaggagePropagator()],
}),
);
Loading