diff --git a/handwritten/pubsub/package.json b/handwritten/pubsub/package.json index 617dabc9f63e..bfe42f13bdd8 100644 --- a/handwritten/pubsub/package.json +++ b/handwritten/pubsub/package.json @@ -72,6 +72,7 @@ }, "devDependencies": { "@grpc/proto-loader": "^0.8.0", + "@opentelemetry/context-async-hooks": "^2.6.1", "@opentelemetry/sdk-trace-base": "^1.17.0", "@types/duplexify": "^3.6.4", "@types/extend": "^3.0.4", diff --git a/handwritten/pubsub/src/lease-manager.ts b/handwritten/pubsub/src/lease-manager.ts index d8afdb11a359..76f5f56cb6d9 100644 --- a/handwritten/pubsub/src/lease-manager.ts +++ b/handwritten/pubsub/src/lease-manager.ts @@ -15,6 +15,7 @@ */ import {EventEmitter} from 'events'; +import {context as otelContext} from '@opentelemetry/api'; import {AckError, Message, Subscriber} from './subscriber'; import {defaultOptions} from './default-options'; @@ -297,8 +298,13 @@ export class LeaseManager extends EventEmitter { message.ackId, ); message.subSpans.processingStart(this._subscriber.name); + const emitCallback = () => this._subscriber.emit('message', message); try { - this._subscriber.emit('message', message); + if (message.parentContext) { + otelContext.with(message.parentContext, emitCallback); + } else { + emitCallback(); + } } catch (e: unknown) { logs.callbackExceptions.error( 'message (ID %s, ackID %s) caused a user callback exception: %o', diff --git a/handwritten/pubsub/src/subscriber.ts b/handwritten/pubsub/src/subscriber.ts index 46834636e1b7..604f505f54ef 100644 --- a/handwritten/pubsub/src/subscriber.ts +++ b/handwritten/pubsub/src/subscriber.ts @@ -14,6 +14,7 @@ * limitations under the License. */ +import {Context} from '@opentelemetry/api'; import {DateStruct, PreciseDate} from '@google-cloud/precise-date'; import {replaceProjectIdToken} from '@google-cloud/projectify'; import {promisify} from '@google-cloud/promisify'; @@ -289,6 +290,15 @@ export class Message implements tracing.MessageWithAttributes { */ parentSpan?: tracing.Span; + /** + * @private + * + * Tracks the propagation context (including baggage) extracted from the + * incoming message. Used to set the active context when dispatching to + * user callbacks so that baggage is accessible. + */ + parentContext?: Context; + /** * We'll save the state of the subscription's exactly once delivery flag at the * time the message was received. This is pretty much only for tracing, as we will diff --git a/handwritten/pubsub/src/telemetry-tracing.ts b/handwritten/pubsub/src/telemetry-tracing.ts index 54a516d1b6b4..2ee65b0ac734 100644 --- a/handwritten/pubsub/src/telemetry-tracing.ts +++ b/handwritten/pubsub/src/telemetry-tracing.ts @@ -1,5 +1,5 @@ /*! - * Copyright 2020-2024 Google LLC + * Copyright 2020-2026 Google LLC * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at @@ -26,7 +26,11 @@ import { Context, Link, } from '@opentelemetry/api'; -import {W3CTraceContextPropagator} from '@opentelemetry/core'; +import { + W3CTraceContextPropagator, + W3CBaggagePropagator, + CompositePropagator, +} from '@opentelemetry/core'; import {Attributes, PubsubMessage} from './publisher/pubsub-message'; import {Duration} from './temporal'; @@ -76,7 +80,9 @@ export enum OpenTelemetryLevel { * @private * @internal */ -const w3cTraceContextPropagator = new W3CTraceContextPropagator(); +const compositePropagator = new CompositePropagator({ + propagators: [new W3CTraceContextPropagator(), new W3CBaggagePropagator()], +}); // True if user code elsewhere wants to enable OpenTelemetry support. let globallyEnabled = false; @@ -117,6 +123,7 @@ export function isEnabled(): OpenTelemetryLevel { export interface MessageWithAttributes { attributes?: Attributes | null | undefined; parentSpan?: Span; + parentContext?: Context; } /** @@ -225,6 +232,7 @@ export function spanContextToContext( * @internal */ export const modernAttributeName = 'googclient_traceparent'; +export const baggageAttributeName = 'googclient_baggage'; export interface AttributeParams { // Fully qualified. @@ -768,9 +776,20 @@ export function injectSpan(span: Span, message: MessageWithAttributes): void { delete message.attributes[modernAttributeName]; } - // Always do propagation injection with the trace context. - const context = trace.setSpanContext(ROOT_CONTEXT, span.spanContext()); - w3cTraceContextPropagator.inject(context, message, pubsubSetter); + if (message.attributes[baggageAttributeName]) { + console.warn( + `${baggageAttributeName} key set as message attribute, but will be overridden.`, + ); + + delete message.attributes[baggageAttributeName]; + } + + // Always do propagation injection with the trace and baggage context. + const propagationContext = trace.setSpanContext( + context.active(), + span.spanContext(), + ); + compositePropagator.inject(propagationContext, message, pubsubSetter); // Also put the direct reference to the Span object for while we're // passing it around in the client library. @@ -778,12 +797,15 @@ export function injectSpan(span: Span, message: MessageWithAttributes): void { } /** - * Returns true if this message potentially contains a span context. + * Returns true if this message potentially contains a propagation context + * (trace context or baggage). * * @private * @internal */ -export function containsSpanContext(message: MessageWithAttributes): boolean { +export function containsPropagationContext( + message: MessageWithAttributes, +): boolean { if (message.parentSpan) { return true; } @@ -793,7 +815,9 @@ export function containsSpanContext(message: MessageWithAttributes): boolean { } const keys = Object.getOwnPropertyNames(message.attributes); - return !!keys.find(n => n === modernAttributeName); + return !!keys.find( + n => n === modernAttributeName || n === baggageAttributeName, + ); } /** @@ -822,12 +846,11 @@ export function extractSpan( let context: Context | undefined; - if (keys.includes(modernAttributeName)) { - context = w3cTraceContextPropagator.extract( - ROOT_CONTEXT, - message, - pubsubGetter, - ); + if ( + keys.includes(modernAttributeName) || + keys.includes(baggageAttributeName) + ) { + context = compositePropagator.extract(ROOT_CONTEXT, message, pubsubGetter); } const span = PubsubSpans.createReceiveSpan( @@ -837,5 +860,8 @@ export function extractSpan( 'extractSpan', ); message.parentSpan = span; + if (context) { + message.parentContext = context; + } return span; } diff --git a/handwritten/pubsub/test/telemetry-tracing.ts b/handwritten/pubsub/test/telemetry-tracing.ts index 4ef42f105575..b82732223089 100644 --- a/handwritten/pubsub/test/telemetry-tracing.ts +++ b/handwritten/pubsub/test/telemetry-tracing.ts @@ -20,7 +20,7 @@ import {describe, it, beforeEach} from 'mocha'; import * as trace from '@opentelemetry/sdk-trace-base'; import * as otel from '../src/telemetry-tracing'; import {exporter} from './tracing'; -import {SpanKind} from '@opentelemetry/api'; +import {SpanKind, context, propagation} from '@opentelemetry/api'; import sinon = require('sinon'); import {PubsubMessage} from '../src/publisher'; import {Duration} from '../src/temporal'; @@ -160,7 +160,7 @@ describe('OpenTelemetryTracer', () => { } }); - it('should be able to determine if attributes are present', () => { + it('should be able to determine if propagation attributes are present', () => { let message: otel.MessageWithAttributes; message = { @@ -168,10 +168,17 @@ describe('OpenTelemetryTracer', () => { [otel.modernAttributeName]: 'foobar', }, }; - assert.strictEqual(otel.containsSpanContext(message), true); + assert.strictEqual(otel.containsPropagationContext(message), true); + + message = { + attributes: { + [otel.baggageAttributeName]: 'key=value', + }, + }; + assert.strictEqual(otel.containsPropagationContext(message), true); message = {}; - assert.strictEqual(otel.containsSpanContext(message), false); + assert.strictEqual(otel.containsPropagationContext(message), false); }); it('extracts a trace context', () => { @@ -191,6 +198,111 @@ describe('OpenTelemetryTracer', () => { 'd4cda95b652f4a1592b449d5929fda1b', ); }); + + it('injects baggage from the active context into message attributes', () => { + const baggage = propagation.createBaggage({ + 'test-key': {value: 'test-value'}, + }); + + const publishMessage: PubsubMessage = { + attributes: {}, + }; + const span = otel.PubsubSpans.createPublisherSpan( + publishMessage, + 'projects/test/topics/topicfoo', + 'tests', + ); + assert.ok(span); + + // Set the baggage on the active context. + const ctxWithBaggage = propagation.setBaggage(context.active(), baggage); + + // Execute injectSpan within the scope of the active context. + context.with(ctxWithBaggage, () => { + otel.injectSpan(span, publishMessage); + }); + + // Verify baggage attribute was set on the message by the compositePropagator. + assert.strictEqual( + Object.getOwnPropertyNames(publishMessage.attributes).includes( + otel.baggageAttributeName, + ), + true, + ); + assert.ok( + ( + publishMessage.attributes![otel.baggageAttributeName] as string + ).includes('test-key=test-value'), + ); + }); + + it('should issue a warning if baggage attribute key is set', () => { + const message: PubsubMessage = { + attributes: { + [otel.baggageAttributeName]: 'bazbar', + }, + }; + const span = otel.PubsubSpans.createPublisherSpan( + message, + 'projects/test/topics/topicfoo', + 'tests', + ); + assert.ok(span); + + const warnSpy = sinon.spy(console, 'warn'); + try { + otel.injectSpan(span, message); + assert.strictEqual(warnSpy.callCount, 1); + } finally { + warnSpy.restore(); + } + }); + + it('extracts baggage from message attributes', () => { + const message: otel.MessageWithAttributes = { + attributes: { + [otel.modernAttributeName]: + '00-d4cda95b652f4a1592b449d5929fda1b-553964cd9101a314-01', + [otel.baggageAttributeName]: 'test-key=test-value', + }, + }; + + const childSpan = otel.extractSpan( + message, + 'projects/test/subscriptions/subfoo', + ); + assert.ok(childSpan); + assert.strictEqual( + childSpan.spanContext().traceId, + 'd4cda95b652f4a1592b449d5929fda1b', + ); + + // Verify baggage is accessible on the extracted context. + assert.ok(message.parentContext); + const baggage = propagation.getBaggage(message.parentContext!); + assert.ok(baggage); + assert.strictEqual(baggage!.getEntry('test-key')?.value, 'test-value'); + }); + + it('extracts span when only baggage is present', () => { + const message: otel.MessageWithAttributes = { + attributes: { + [otel.baggageAttributeName]: 'test-key=test-value', + }, + }; + + const childSpan = otel.extractSpan( + message, + 'projects/test/subscriptions/subfoo', + ); + assert.ok(childSpan); + + // Verify baggage is accessible even without a trace context. + assert.ok(message.parentContext); + const baggage = propagation.getBaggage(message.parentContext!); + assert.ok(baggage); + assert.strictEqual(baggage!.getEntry('test-key')?.value, 'test-value'); + }); }); describe('attribute creation', () => { diff --git a/handwritten/pubsub/test/tracing.ts b/handwritten/pubsub/test/tracing.ts index 7689253ad437..81053b738a17 100644 --- a/handwritten/pubsub/test/tracing.ts +++ b/handwritten/pubsub/test/tracing.ts @@ -19,6 +19,13 @@ import { InMemorySpanExporter, SimpleSpanProcessor, } from '@opentelemetry/sdk-trace-base'; +import {context, propagation} from '@opentelemetry/api'; +import { + CompositePropagator, + W3CTraceContextPropagator, + W3CBaggagePropagator, +} from '@opentelemetry/core'; +import {AsyncLocalStorageContextManager} from '@opentelemetry/context-async-hooks'; /** * This file is used to initialise a global tracing provider and span exporter @@ -38,4 +45,11 @@ import { export const exporter: InMemorySpanExporter = new InMemorySpanExporter(); export const provider: BasicTracerProvider = new BasicTracerProvider(); provider.addSpanProcessor(new SimpleSpanProcessor(exporter)); +const contextManager = new AsyncLocalStorageContextManager(); +context.setGlobalContextManager(contextManager); provider.register(); +propagation.setGlobalPropagator( + new CompositePropagator({ + propagators: [new W3CTraceContextPropagator(), new W3CBaggagePropagator()], + }), +);