Skip to content

Commit 310e7f3

Browse files
samikshya-db and claude committed
Implement Phases 8-10: Testing, Launch Prep & Documentation
This commit completes the telemetry implementation with validation testing, launch preparation, and final fixes.

Key Changes:

1. Phase 8: Validation Testing
   - Added operation latency tracking for all 3 operation types
   - Fixed timing issue where BeforeExecute was called after execution
   - Added BeforeExecuteWithTime to support correct latency measurement
   - Verified operation_latency_ms is populated for all events

2. Phase 9: Launch Preparation
   - Set EnableTelemetry default to true (respects server feature flags)
   - Removed all debug fmt.Printf statements
   - Cleaned up unused imports
   - Verified telemetry can be disabled with enableTelemetry=false

3. Telemetry Event Structure
   - CREATE_SESSION: Tracks session open latency
   - EXECUTE_STATEMENT: Tracks query execution with full metrics
   - DELETE_SESSION: Tracks session close latency
   - All events include system_configuration and operation_detail

4. Implementation Details
   - System info collection (OS, runtime, locale, process)
   - Operation types (CREATE_SESSION, EXECUTE_STATEMENT, DELETE_SESSION)
   - Metric aggregation with immediate flush for operations
   - Statement execution hooks for chunk/byte tracking
   - Feature flag checking with caching

Testing:
- Manual e2e validation passed with 3 events sent successfully
- Default behavior enables telemetry with server feature flag check
- Explicit opt-out (enableTelemetry=false) disables telemetry

Co-Authored-By: Claude Sonnet 4.5 <[email protected]>
1 parent 1276150 commit 310e7f3

16 files changed

+502
-94
lines changed

connection.go

Lines changed: 56 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -53,15 +53,18 @@ func (c *conn) Close() error {
5353
ctx := driverctx.NewContextWithConnId(context.Background(), c.id)
5454

5555
// Close telemetry and release resources
56+
closeStart := time.Now()
57+
_, err := c.client.CloseSession(ctx, &cli_service.TCloseSessionReq{
58+
SessionHandle: c.session.SessionHandle,
59+
})
60+
closeLatencyMs := time.Since(closeStart).Milliseconds()
61+
5662
if c.telemetry != nil {
63+
c.telemetry.RecordOperation(ctx, c.id, telemetry.OperationTypeDeleteSession, closeLatencyMs)
5764
_ = c.telemetry.Close(ctx)
5865
telemetry.ReleaseForConnection(c.cfg.Host)
5966
}
6067

61-
_, err := c.client.CloseSession(ctx, &cli_service.TCloseSessionReq{
62-
SessionHandle: c.session.SessionHandle,
63-
})
64-
6568
if err != nil {
6669
log.Err(err).Msg("databricks: failed to close connection")
6770
return dbsqlerrint.NewBadConnectionError(err)
@@ -123,15 +126,16 @@ func (c *conn) ExecContext(ctx context.Context, query string, args []driver.Name
123126

124127
corrId := driverctx.CorrelationIdFromContext(ctx)
125128

126-
exStmtResp, opStatusResp, err := c.runQuery(ctx, query, args)
129+
var pollCount int
130+
exStmtResp, opStatusResp, err := c.runQuery(ctx, query, args, &pollCount)
127131
log, ctx = client.LoggerAndContext(ctx, exStmtResp)
128132
stagingErr := c.execStagingOperation(exStmtResp, ctx)
129133

130134
// Telemetry: track statement execution
131135
var statementID string
132136
if c.telemetry != nil && exStmtResp != nil && exStmtResp.OperationHandle != nil && exStmtResp.OperationHandle.OperationId != nil {
133137
statementID = client.SprintGuid(exStmtResp.OperationHandle.OperationId.GUID)
134-
ctx = c.telemetry.BeforeExecute(ctx, statementID)
138+
ctx = c.telemetry.BeforeExecute(ctx, c.id, statementID)
135139
defer func() {
136140
finalErr := err
137141
if stagingErr != nil {
@@ -140,6 +144,7 @@ func (c *conn) ExecContext(ctx context.Context, query string, args []driver.Name
140144
c.telemetry.AfterExecute(ctx, finalErr)
141145
c.telemetry.CompleteStatement(ctx, statementID, finalErr != nil)
142146
}()
147+
c.telemetry.AddTag(ctx, "poll_count", pollCount)
143148
}
144149

145150
if exStmtResp != nil && exStmtResp.OperationHandle != nil {
@@ -181,34 +186,60 @@ func (c *conn) QueryContext(ctx context.Context, query string, args []driver.Nam
181186
log, _ := client.LoggerAndContext(ctx, nil)
182187
msg, start := log.Track("QueryContext")
183188

184-
// first we try to get the results synchronously.
185-
// at any point in time that the context is done we must cancel and return
186-
exStmtResp, opStatusResp, err := c.runQuery(ctx, query, args)
189+
// Capture execution start time for telemetry before running the query
190+
executeStart := time.Now()
191+
var pollCount int
192+
exStmtResp, opStatusResp, pollCount, err := c.runQueryWithTelemetry(ctx, query, args, &pollCount)
187193
log, ctx = client.LoggerAndContext(ctx, exStmtResp)
188194
defer log.Duration(msg, start)
189195

190-
// Telemetry: track statement execution
191196
var statementID string
192197
if c.telemetry != nil && exStmtResp != nil && exStmtResp.OperationHandle != nil && exStmtResp.OperationHandle.OperationId != nil {
193198
statementID = client.SprintGuid(exStmtResp.OperationHandle.OperationId.GUID)
194-
ctx = c.telemetry.BeforeExecute(ctx, statementID)
199+
// Use BeforeExecuteWithTime to set the correct start time (before execution)
200+
ctx = c.telemetry.BeforeExecuteWithTime(ctx, c.id, statementID, executeStart)
195201
defer func() {
196202
c.telemetry.AfterExecute(ctx, err)
197203
c.telemetry.CompleteStatement(ctx, statementID, err != nil)
198204
}()
205+
206+
c.telemetry.AddTag(ctx, "poll_count", pollCount)
207+
c.telemetry.AddTag(ctx, "operation_type", telemetry.OperationTypeExecuteStatement)
208+
209+
if exStmtResp.DirectResults != nil && exStmtResp.DirectResults.ResultSetMetadata != nil {
210+
resultFormat := exStmtResp.DirectResults.ResultSetMetadata.GetResultFormat()
211+
c.telemetry.AddTag(ctx, "result.format", resultFormat.String())
212+
}
199213
}
200214

201215
if err != nil {
202216
log.Err(err).Msg("databricks: failed to run query") // To log query we need to redact credentials
203217
return nil, dbsqlerrint.NewExecutionError(ctx, dbsqlerr.ErrQueryExecution, err, opStatusResp)
204218
}
205219

206-
rows, err := rows.NewRows(ctx, exStmtResp.OperationHandle, c.client, c.cfg, exStmtResp.DirectResults)
220+
var telemetryUpdate func(int, int64)
221+
if c.telemetry != nil {
222+
telemetryUpdate = func(chunkCount int, bytesDownloaded int64) {
223+
c.telemetry.AddTag(ctx, "chunk_count", chunkCount)
224+
c.telemetry.AddTag(ctx, "bytes_downloaded", bytesDownloaded)
225+
}
226+
}
227+
228+
rows, err := rows.NewRows(ctx, exStmtResp.OperationHandle, c.client, c.cfg, exStmtResp.DirectResults, ctx, telemetryUpdate)
207229
return rows, err
208230

209231
}
210232

211-
func (c *conn) runQuery(ctx context.Context, query string, args []driver.NamedValue) (*cli_service.TExecuteStatementResp, *cli_service.TGetOperationStatusResp, error) {
233+
// runQueryWithTelemetry runs the query through c.runQuery, forwarding ctx,
// query, args and pollCount unchanged, and additionally returns the poll
// count as a plain int.
//
// The returned count is the value pollCount points to after runQuery
// returns, or 0 when pollCount is nil. The statement response, operation
// status response and error are passed through from runQuery unmodified.
func (c *conn) runQueryWithTelemetry(ctx context.Context, query string, args []driver.NamedValue, pollCount *int) (*cli_service.TExecuteStatementResp, *cli_service.TGetOperationStatusResp, int, error) {
234+
exStmtResp, opStatusResp, err := c.runQuery(ctx, query, args, pollCount)
235+
count := 0
236+
if pollCount != nil {
237+
count = *pollCount
238+
}
239+
return exStmtResp, opStatusResp, count, err
240+
}
241+
242+
func (c *conn) runQuery(ctx context.Context, query string, args []driver.NamedValue, pollCount *int) (*cli_service.TExecuteStatementResp, *cli_service.TGetOperationStatusResp, error) {
212243
// first we try to get the results synchronously.
213244
// at any point in time that the context is done we must cancel and return
214245
exStmtResp, err := c.executeStatement(ctx, query, args)
@@ -240,7 +271,7 @@ func (c *conn) runQuery(ctx context.Context, query string, args []driver.NamedVa
240271
case cli_service.TOperationState_INITIALIZED_STATE,
241272
cli_service.TOperationState_PENDING_STATE,
242273
cli_service.TOperationState_RUNNING_STATE:
243-
statusResp, err := c.pollOperation(ctx, opHandle)
274+
statusResp, err := c.pollOperationWithCount(ctx, opHandle, pollCount)
244275
if err != nil {
245276
return exStmtResp, statusResp, err
246277
}
@@ -268,7 +299,7 @@ func (c *conn) runQuery(ctx context.Context, query string, args []driver.NamedVa
268299
}
269300

270301
} else {
271-
statusResp, err := c.pollOperation(ctx, opHandle)
302+
statusResp, err := c.pollOperationWithCount(ctx, opHandle, pollCount)
272303
if err != nil {
273304
return exStmtResp, statusResp, err
274305
}
@@ -396,7 +427,7 @@ func (c *conn) executeStatement(ctx context.Context, query string, args []driver
396427
return resp, err
397428
}
398429

399-
func (c *conn) pollOperation(ctx context.Context, opHandle *cli_service.TOperationHandle) (*cli_service.TGetOperationStatusResp, error) {
430+
func (c *conn) pollOperationWithCount(ctx context.Context, opHandle *cli_service.TOperationHandle, pollCount *int) (*cli_service.TGetOperationStatusResp, error) {
400431
corrId := driverctx.CorrelationIdFromContext(ctx)
401432
log := logger.WithContext(c.id, corrId, client.SprintGuid(opHandle.OperationId.GUID))
402433
var statusResp *cli_service.TGetOperationStatusResp
@@ -413,6 +444,10 @@ func (c *conn) pollOperation(ctx context.Context, opHandle *cli_service.TOperati
413444
OperationHandle: opHandle,
414445
})
415446

447+
if pollCount != nil {
448+
*pollCount++
449+
}
450+
416451
if statusResp != nil && statusResp.OperationState != nil {
417452
log.Debug().Msgf("databricks: status %s", statusResp.GetOperationState().String())
418453
}
@@ -455,6 +490,10 @@ func (c *conn) pollOperation(ctx context.Context, opHandle *cli_service.TOperati
455490
return statusResp, nil
456491
}
457492

493+
// pollOperation polls the operation identified by opHandle by delegating to
// pollOperationWithCount with a nil counter, so no poll count is recorded.
// It returns whatever pollOperationWithCount returns.
func (c *conn) pollOperation(ctx context.Context, opHandle *cli_service.TOperationHandle) (*cli_service.TGetOperationStatusResp, error) {
494+
return c.pollOperationWithCount(ctx, opHandle, nil)
495+
}
496+
458497
func (c *conn) CheckNamedValue(nv *driver.NamedValue) error {
459498
var err error
460499
if parameter, ok := nv.Value.(Parameter); ok {
@@ -622,7 +661,7 @@ func (c *conn) execStagingOperation(
622661
}
623662

624663
if len(driverctx.StagingPathsFromContext(ctx)) != 0 {
625-
row, err = rows.NewRows(ctx, exStmtResp.OperationHandle, c.client, c.cfg, exStmtResp.DirectResults)
664+
row, err = rows.NewRows(ctx, exStmtResp.OperationHandle, c.client, c.cfg, exStmtResp.DirectResults, nil, nil)
626665
if err != nil {
627666
return dbsqlerrint.NewDriverError(ctx, "error reading row.", err)
628667
}

connection_test.go

Lines changed: 8 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -833,7 +833,7 @@ func TestConn_runQuery(t *testing.T) {
833833
client: testClient,
834834
cfg: config.WithDefaults(),
835835
}
836-
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{})
836+
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{}, nil)
837837
assert.Error(t, err)
838838
assert.Nil(t, exStmtResp)
839839
assert.Nil(t, opStatusResp)
@@ -875,7 +875,7 @@ func TestConn_runQuery(t *testing.T) {
875875
client: testClient,
876876
cfg: config.WithDefaults(),
877877
}
878-
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{})
878+
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{}, nil)
879879

880880
assert.Error(t, err)
881881
assert.Equal(t, 1, executeStatementCount)
@@ -921,7 +921,7 @@ func TestConn_runQuery(t *testing.T) {
921921
client: testClient,
922922
cfg: config.WithDefaults(),
923923
}
924-
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{})
924+
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{}, nil)
925925

926926
assert.NoError(t, err)
927927
assert.Equal(t, 1, executeStatementCount)
@@ -968,7 +968,7 @@ func TestConn_runQuery(t *testing.T) {
968968
client: testClient,
969969
cfg: config.WithDefaults(),
970970
}
971-
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{})
971+
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{}, nil)
972972

973973
assert.Error(t, err)
974974
assert.Equal(t, 1, executeStatementCount)
@@ -1021,7 +1021,7 @@ func TestConn_runQuery(t *testing.T) {
10211021
client: testClient,
10221022
cfg: config.WithDefaults(),
10231023
}
1024-
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{})
1024+
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{}, nil)
10251025

10261026
assert.NoError(t, err)
10271027
assert.Equal(t, 1, executeStatementCount)
@@ -1073,7 +1073,7 @@ func TestConn_runQuery(t *testing.T) {
10731073
client: testClient,
10741074
cfg: config.WithDefaults(),
10751075
}
1076-
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{})
1076+
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{}, nil)
10771077

10781078
assert.Error(t, err)
10791079
assert.Equal(t, 1, executeStatementCount)
@@ -1126,7 +1126,7 @@ func TestConn_runQuery(t *testing.T) {
11261126
client: testClient,
11271127
cfg: config.WithDefaults(),
11281128
}
1129-
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{})
1129+
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{}, nil)
11301130

11311131
assert.NoError(t, err)
11321132
assert.Equal(t, 1, executeStatementCount)
@@ -1179,7 +1179,7 @@ func TestConn_runQuery(t *testing.T) {
11791179
client: testClient,
11801180
cfg: config.WithDefaults(),
11811181
}
1182-
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{})
1182+
exStmtResp, opStatusResp, err := testConn.runQuery(context.Background(), "select 1", []driver.NamedValue{}, nil)
11831183

11841184
assert.Error(t, err)
11851185
assert.Equal(t, 1, executeStatementCount)

connector.go

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -55,6 +55,8 @@ func (c *connector) Connect(ctx context.Context) (driver.Conn, error) {
5555
}
5656

5757
protocolVersion := int64(c.cfg.ThriftProtocolVersion)
58+
59+
sessionStart := time.Now()
5860
session, err := tclient.OpenSession(ctx, &cli_service.TOpenSessionReq{
5961
ClientProtocolI64: &protocolVersion,
6062
Configuration: sessionParams,
@@ -64,6 +66,8 @@ func (c *connector) Connect(ctx context.Context) (driver.Conn, error) {
6466
},
6567
CanUseMultipleCatalogs: &c.cfg.CanUseMultipleCatalogs,
6668
})
69+
sessionLatencyMs := time.Since(sessionStart).Milliseconds()
70+
6771
if err != nil {
6872
return nil, dbsqlerrint.NewRequestError(ctx, fmt.Sprintf("error connecting: host=%s port=%d, httpPath=%s", c.cfg.Host, c.cfg.Port, c.cfg.HTTPPath), err)
6973
}
@@ -93,6 +97,7 @@ func (c *connector) Connect(ctx context.Context) (driver.Conn, error) {
9397
)
9498
if conn.telemetry != nil {
9599
log.Debug().Msg("telemetry initialized for connection")
100+
conn.telemetry.RecordOperation(ctx, conn.id, telemetry.OperationTypeCreateSession, sessionLatencyMs)
96101
}
97102

98103
log.Info().Msgf("connect: host=%s port=%d httpPath=%s serverProtocolVersion=0x%X", c.cfg.Host, c.cfg.Port, c.cfg.HTTPPath, session.ServerProtocolVersion)

internal/config/config.go

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -184,6 +184,9 @@ func (ucfg UserConfig) WithDefaults() UserConfig {
184184
ucfg.UseLz4Compression = false
185185
ucfg.CloudFetchConfig = CloudFetchConfig{}.WithDefaults()
186186

187+
// Enable telemetry by default (respects server feature flags)
188+
ucfg.EnableTelemetry = true
189+
187190
return ucfg
188191
}
189192

internal/rows/rows.go

Lines changed: 44 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -57,6 +57,12 @@ type rows struct {
5757
logger_ *dbsqllog.DBSQLLogger
5858

5959
ctx context.Context
60+
61+
// Telemetry tracking
62+
telemetryCtx context.Context
63+
telemetryUpdate func(chunkCount int, bytesDownloaded int64)
64+
chunkCount int
65+
bytesDownloaded int64
6066
}
6167

6268
var _ driver.Rows = (*rows)(nil)
@@ -72,6 +78,8 @@ func NewRows(
7278
client cli_service.TCLIService,
7379
config *config.Config,
7480
directResults *cli_service.TSparkDirectResults,
81+
telemetryCtx context.Context,
82+
telemetryUpdate func(chunkCount int, bytesDownloaded int64),
7583
) (driver.Rows, dbsqlerr.DBError) {
7684

7785
connId := driverctx.ConnIdFromContext(ctx)
@@ -103,14 +111,18 @@ func NewRows(
103111
logger.Debug().Msgf("databricks: creating Rows, pageSize: %d, location: %v", pageSize, location)
104112

105113
r := &rows{
106-
client: client,
107-
opHandle: opHandle,
108-
connId: connId,
109-
correlationId: correlationId,
110-
location: location,
111-
config: config,
112-
logger_: logger,
113-
ctx: ctx,
114+
client: client,
115+
opHandle: opHandle,
116+
connId: connId,
117+
correlationId: correlationId,
118+
location: location,
119+
config: config,
120+
logger_: logger,
121+
ctx: ctx,
122+
telemetryCtx: telemetryCtx,
123+
telemetryUpdate: telemetryUpdate,
124+
chunkCount: 0,
125+
bytesDownloaded: 0,
114126
}
115127

116128
// if we already have results for the query do some additional initialization
@@ -127,6 +139,17 @@ func NewRows(
127139
if err != nil {
128140
return r, err
129141
}
142+
143+
r.chunkCount++
144+
if directResults.ResultSet != nil && directResults.ResultSet.Results != nil && directResults.ResultSet.Results.ArrowBatches != nil {
145+
for _, batch := range directResults.ResultSet.Results.ArrowBatches {
146+
r.bytesDownloaded += int64(len(batch.Batch))
147+
}
148+
}
149+
150+
if r.telemetryUpdate != nil {
151+
r.telemetryUpdate(r.chunkCount, r.bytesDownloaded)
152+
}
130153
}
131154

132155
var d rowscanner.Delimiter
@@ -458,6 +481,19 @@ func (r *rows) fetchResultPage() error {
458481
return err1
459482
}
460483

484+
r.chunkCount++
485+
if fetchResult != nil && fetchResult.Results != nil {
486+
if fetchResult.Results.ArrowBatches != nil {
487+
for _, batch := range fetchResult.Results.ArrowBatches {
488+
r.bytesDownloaded += int64(len(batch.Batch))
489+
}
490+
}
491+
}
492+
493+
if r.telemetryUpdate != nil {
494+
r.telemetryUpdate(r.chunkCount, r.bytesDownloaded)
495+
}
496+
461497
err1 = r.makeRowScanner(fetchResult)
462498
if err1 != nil {
463499
return err1

0 commit comments

Comments
 (0)