1717import { promisify , promisifyAll } from '@google-cloud/promisify' ;
1818import * as extend from 'extend' ;
1919import { CallOptions } from 'google-gax' ;
20+ import { Span } from '@opentelemetry/api' ;
2021
2122import { BatchPublishOptions } from './message-batch' ;
2223import { Queue , OrderedQueue } from './message-queues' ;
2324import { Topic } from '../topic' ;
2425import { RequestCallback , EmptyCallback } from '../pubsub' ;
2526import { google } from '../../protos/protos' ;
2627import { defaultOptions } from '../default-options' ;
28+ import { OpenTelemetryTracer } from '../opentelemetry-tracing' ;
2729
2830export type PubsubMessage = google . pubsub . v1 . IPubsubMessage ;
2931
@@ -37,6 +39,7 @@ export interface PublishOptions {
3739 batching ?: BatchPublishOptions ;
3840 gaxOpts ?: CallOptions ;
3941 messageOrdering ?: boolean ;
42+ enableOpenTelemetryTracing ?: boolean ;
4043}
4144
4245/**
@@ -72,11 +75,16 @@ export class Publisher {
7275 settings ! : PublishOptions ;
7376 queue : Queue ;
7477 orderedQueues : Map < string , OrderedQueue > ;
78+ tracing : OpenTelemetryTracer | undefined ;
7579 constructor ( topic : Topic , options ?: PublishOptions ) {
7680 this . setOptions ( options ) ;
7781 this . topic = topic ;
7882 this . queue = new Queue ( this ) ;
7983 this . orderedQueues = new Map ( ) ;
84+ this . tracing =
85+ this . settings && this . settings . enableOpenTelemetryTracing
86+ ? new OpenTelemetryTracer ( )
87+ : undefined ;
8088 }
8189
8290 flush ( ) : Promise < void > ;
@@ -162,8 +170,13 @@ export class Publisher {
162170 }
163171 }
164172
173+ const span : Span | undefined = this . constructSpan ( message ) ;
174+
165175 if ( ! message . orderingKey ) {
166176 this . queue . add ( message , callback ) ;
177+ if ( span ) {
178+ span . end ( ) ;
179+ }
167180 return ;
168181 }
169182
@@ -177,6 +190,10 @@ export class Publisher {
177190
178191 const queue = this . orderedQueues . get ( key ) ! ;
179192 queue . add ( message , callback ) ;
193+
194+ if ( span ) {
195+ span . end ( ) ;
196+ }
180197 }
181198 /**
182199 * Indicates to the publisher that it is safe to continue publishing for the
@@ -211,13 +228,19 @@ export class Publisher {
211228 gaxOpts : {
212229 isBundling : false ,
213230 } ,
231+ enableOpenTelemetryTracing : false ,
214232 } ;
215233
216- const { batching, gaxOpts, messageOrdering} = extend (
217- true ,
218- defaults ,
219- options
220- ) ;
234+ const {
235+ batching,
236+ gaxOpts,
237+ messageOrdering,
238+ enableOpenTelemetryTracing,
239+ } = extend ( true , defaults , options ) ;
240+
241+ this . tracing = enableOpenTelemetryTracing
242+ ? new OpenTelemetryTracer ( )
243+ : undefined ;
221244
222245 this . settings = {
223246 batching : {
@@ -227,11 +250,45 @@ export class Publisher {
227250 } ,
228251 gaxOpts,
229252 messageOrdering,
253+ enableOpenTelemetryTracing,
230254 } ;
231255 }
256+
257+ /**
258+ * Constructs an OpenTelemetry span
259+ *
260+ * @private
261+ *
262+ * @param {PubsubMessage } message The message to create a span for
263+ */
264+ constructSpan ( message : PubsubMessage ) : Span | undefined {
265+ const spanAttributes = {
266+ data : message . data ,
267+ } ;
268+ const span : Span | undefined = this . tracing
269+ ? this . tracing . createSpan ( `${ this . topic . name } publisher` , spanAttributes )
270+ : undefined ;
271+ if ( span ) {
272+ if (
273+ message . attributes &&
274+ message . attributes [ 'googclient_OpenTelemetrySpanContext' ]
275+ ) {
276+ console . warn (
277+ 'googclient_OpenTelemetrySpanContext key set as message attribute, but will be overridden.'
278+ ) ;
279+ }
280+ if ( ! message . attributes ) {
281+ message . attributes = { } ;
282+ }
283+ message . attributes [
284+ 'googclient_OpenTelemetrySpanContext'
285+ ] = JSON . stringify ( span . context ( ) ) ;
286+ }
287+ return span ;
288+ }
232289}
233290
234291promisifyAll ( Publisher , {
235292 singular : true ,
236- exclude : [ 'publish' , 'setOptions' ] ,
293+ exclude : [ 'publish' , 'setOptions' , 'constructSpan' ] ,
237294} ) ;
0 commit comments