Skip to content

Commit a5ed499

Browse files
samikshya-dbclaude
andcommitted
Add statement execution hooks for telemetry collection
This commit completes the telemetry implementation by adding hooks to QueryContext and ExecContext methods to collect actual metrics. Changes: - Export BeforeExecute(), AfterExecute(), CompleteStatement() methods in telemetry.Interceptor for use by driver package - Add telemetry hooks to connection.QueryContext(): - Call BeforeExecute() with statement ID from operation handle GUID - Use defer to call AfterExecute() and CompleteStatement() - Add telemetry hooks to connection.ExecContext(): - Call BeforeExecute() with statement ID from operation handle GUID - Use defer to call AfterExecute() and CompleteStatement() - Handle both err and stagingErr for proper error reporting - Update DESIGN.md: - Mark Phase 6 as completed (all checklist items) - Add statement execution hooks to Phase 7 checklist Testing: - All 99 telemetry tests passing - All driver tests passing (58.576s) - No breaking changes to existing functionality This enables end-to-end telemetry collection from statement execution through aggregation and export to the Databricks telemetry service. Co-Authored-By: Claude Sonnet 4.5 <[email protected]>
1 parent f388244 commit a5ed499

File tree

3 files changed

+72
-39
lines changed

3 files changed

+72
-39
lines changed

connection.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,21 @@ func (c *conn) ExecContext(ctx context.Context, query string, args []driver.Name
127127
log, ctx = client.LoggerAndContext(ctx, exStmtResp)
128128
stagingErr := c.execStagingOperation(exStmtResp, ctx)
129129

130+
// Telemetry: track statement execution
131+
var statementID string
132+
if c.telemetry != nil && exStmtResp != nil && exStmtResp.OperationHandle != nil && exStmtResp.OperationHandle.OperationId != nil {
133+
statementID = client.SprintGuid(exStmtResp.OperationHandle.OperationId.GUID)
134+
ctx = c.telemetry.BeforeExecute(ctx, statementID)
135+
defer func() {
136+
finalErr := err
137+
if stagingErr != nil {
138+
finalErr = stagingErr
139+
}
140+
c.telemetry.AfterExecute(ctx, finalErr)
141+
c.telemetry.CompleteStatement(ctx, statementID, finalErr != nil)
142+
}()
143+
}
144+
130145
if exStmtResp != nil && exStmtResp.OperationHandle != nil {
131146
// since we have an operation handle we can close the operation if necessary
132147
alreadyClosed := exStmtResp.DirectResults != nil && exStmtResp.DirectResults.CloseOperation != nil
@@ -172,6 +187,17 @@ func (c *conn) QueryContext(ctx context.Context, query string, args []driver.Nam
172187
log, ctx = client.LoggerAndContext(ctx, exStmtResp)
173188
defer log.Duration(msg, start)
174189

190+
// Telemetry: track statement execution
191+
var statementID string
192+
if c.telemetry != nil && exStmtResp != nil && exStmtResp.OperationHandle != nil && exStmtResp.OperationHandle.OperationId != nil {
193+
statementID = client.SprintGuid(exStmtResp.OperationHandle.OperationId.GUID)
194+
ctx = c.telemetry.BeforeExecute(ctx, statementID)
195+
defer func() {
196+
c.telemetry.AfterExecute(ctx, err)
197+
c.telemetry.CompleteStatement(ctx, statementID, err != nil)
198+
}()
199+
}
200+
175201
if err != nil {
176202
log.Err(err).Msg("databricks: failed to run query") // To log query we need to redact credentials
177203
return nil, dbsqlerrint.NewExecutionError(ctx, dbsqlerr.ErrQueryExecution, err, opStatusResp)

telemetry/DESIGN.md

Lines changed: 34 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -2129,34 +2129,34 @@ func BenchmarkInterceptor_Disabled(b *testing.B) {
21292129
- [x] Test server error handling
21302130
- [x] Test unreachable server scenarios
21312131

2132-
### Phase 6: Collection & Aggregation (PECOBLR-1381)
2133-
- [ ] Implement `interceptor.go` for metric collection
2134-
- [ ] Implement beforeExecute() and afterExecute() hooks
2135-
- [ ] Implement context-based metric tracking with metricContext
2136-
- [ ] Implement latency measurement (startTime, latencyMs calculation)
2137-
- [ ] Add tag collection methods (addTag)
2138-
- [ ] Implement error swallowing with panic recovery
2139-
- [ ] Implement `aggregator.go` for batching
2140-
- [ ] Implement statement-level aggregation (statementMetrics)
2141-
- [ ] Implement batch size and flush interval logic
2142-
- [ ] Implement background flush goroutine (flushLoop)
2143-
- [ ] Add thread-safe metric recording
2144-
- [ ] Implement completeStatement() for final aggregation
2145-
- [ ] Implement error classification in `errors.go`
2146-
- [ ] Implement error type classification (terminal vs retryable)
2147-
- [ ] Implement HTTP status code classification
2148-
- [ ] Add error pattern matching
2149-
- [ ] Implement isTerminalError() function
2150-
- [ ] Update `client.go` to integrate aggregator
2151-
- [ ] Wire up aggregator with exporter
2152-
- [ ] Implement background flush timer
2153-
- [ ] Update start() and close() methods
2154-
- [ ] Add unit tests for collection and aggregation
2155-
- [ ] Test interceptor metric collection and latency tracking
2156-
- [ ] Test aggregation logic
2157-
- [ ] Test batch flushing (size-based and time-based)
2158-
- [ ] Test error classification
2159-
- [ ] Test client with aggregator integration
2132+
### Phase 6: Collection & Aggregation (PECOBLR-1381) ✅ COMPLETED
2133+
- [x] Implement `interceptor.go` for metric collection
2134+
- [x] Implement beforeExecute() and afterExecute() hooks
2135+
- [x] Implement context-based metric tracking with metricContext
2136+
- [x] Implement latency measurement (startTime, latencyMs calculation)
2137+
- [x] Add tag collection methods (addTag)
2138+
- [x] Implement error swallowing with panic recovery
2139+
- [x] Implement `aggregator.go` for batching
2140+
- [x] Implement statement-level aggregation (statementMetrics)
2141+
- [x] Implement batch size and flush interval logic
2142+
- [x] Implement background flush goroutine (flushLoop)
2143+
- [x] Add thread-safe metric recording
2144+
- [x] Implement completeStatement() for final aggregation
2145+
- [x] Implement error classification in `errors.go`
2146+
- [x] Implement error type classification (terminal vs retryable)
2147+
- [x] Implement HTTP status code classification
2148+
- [x] Add error pattern matching
2149+
- [x] Implement isTerminalError() function
2150+
- [x] Update `client.go` to integrate aggregator
2151+
- [x] Wire up aggregator with exporter
2152+
- [x] Implement background flush timer
2153+
- [x] Update start() and close() methods
2154+
- [x] Add unit tests for collection and aggregation
2155+
- [x] Test interceptor metric collection and latency tracking
2156+
- [x] Test aggregation logic
2157+
- [x] Test batch flushing (size-based and time-based)
2158+
- [x] Test error classification
2159+
- [x] Test client with aggregator integration
21602160

21612161
### Phase 7: Driver Integration ✅ COMPLETED
21622162
- [x] Add telemetry initialization to `connection.go`
@@ -2180,9 +2180,12 @@ func BenchmarkInterceptor_Disabled(b *testing.B) {
21802180
- [x] Test compilation with telemetry
21812181
- [x] Test no breaking changes to existing tests
21822182
- [x] Test graceful handling when disabled
2183-
2184-
Note: Statement execution hooks (beforeExecute/afterExecute in statement.go) for
2185-
actual metric collection can be added as follow-up enhancement.
2183+
- [x] Statement execution hooks
2184+
- [x] Add beforeExecute() hook to QueryContext
2185+
- [x] Add afterExecute() and completeStatement() hooks to QueryContext
2186+
- [x] Add beforeExecute() hook to ExecContext
2187+
- [x] Add afterExecute() and completeStatement() hooks to ExecContext
2188+
- [x] Use operation handle GUID as statement ID
21862189

21872190
### Phase 8: Testing & Validation
21882191
- [ ] Run benchmark tests

telemetry/interceptor.go

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,10 @@ func getMetricContext(ctx context.Context) *metricContext {
4444
return nil
4545
}
4646

47-
// beforeExecute is called before statement execution.
47+
// BeforeExecute is called before statement execution.
4848
// Returns a new context with metric tracking attached.
49-
func (i *Interceptor) beforeExecute(ctx context.Context, statementID string) context.Context {
49+
// Exported for use by the driver package.
50+
func (i *Interceptor) BeforeExecute(ctx context.Context, statementID string) context.Context {
5051
if !i.enabled {
5152
return ctx
5253
}
@@ -60,9 +61,10 @@ func (i *Interceptor) beforeExecute(ctx context.Context, statementID string) con
6061
return withMetricContext(ctx, mc)
6162
}
6263

63-
// afterExecute is called after statement execution.
64+
// AfterExecute is called after statement execution.
6465
// Records the metric with timing and error information.
65-
func (i *Interceptor) afterExecute(ctx context.Context, err error) {
66+
// Exported for use by the driver package.
67+
func (i *Interceptor) AfterExecute(ctx context.Context, err error) {
6668
if !i.enabled {
6769
return
6870
}
@@ -96,8 +98,9 @@ func (i *Interceptor) afterExecute(ctx context.Context, err error) {
9698
i.aggregator.recordMetric(ctx, metric)
9799
}
98100

99-
// addTag adds a tag to the current metric context.
100-
func (i *Interceptor) addTag(ctx context.Context, key string, value interface{}) {
101+
// AddTag adds a tag to the current metric context.
102+
// Exported for use by the driver package.
103+
func (i *Interceptor) AddTag(ctx context.Context, key string, value interface{}) {
101104
if !i.enabled {
102105
return
103106
}
@@ -129,8 +132,9 @@ func (i *Interceptor) recordConnection(ctx context.Context, tags map[string]inte
129132
i.aggregator.recordMetric(ctx, metric)
130133
}
131134

132-
// completeStatement marks a statement as complete and flushes aggregated metrics.
133-
func (i *Interceptor) completeStatement(ctx context.Context, statementID string, failed bool) {
135+
// CompleteStatement marks a statement as complete and flushes aggregated metrics.
136+
// Exported for use by the driver package.
137+
func (i *Interceptor) CompleteStatement(ctx context.Context, statementID string, failed bool) {
134138
if !i.enabled {
135139
return
136140
}

0 commit comments

Comments
 (0)