Skip to content

Commit d750438

Browse files
samikshya-dbclaude
andcommitted
Add statement execution hooks for telemetry collection
This commit completes the telemetry implementation by adding hooks to QueryContext and ExecContext methods to collect actual metrics. Changes: - Export BeforeExecute(), AfterExecute(), CompleteStatement() methods in telemetry.Interceptor for use by driver package - Add telemetry hooks to connection.QueryContext(): - Call BeforeExecute() with statement ID from operation handle GUID - Use defer to call AfterExecute() and CompleteStatement() - Add telemetry hooks to connection.ExecContext(): - Call BeforeExecute() with statement ID from operation handle GUID - Use defer to call AfterExecute() and CompleteStatement() - Handle both err and stagingErr for proper error reporting - Update DESIGN.md: - Mark Phase 6 as completed (all checklist items) - Add statement execution hooks to Phase 7 checklist Testing: - All 99 telemetry tests passing - All driver tests passing (58.576s) - No breaking changes to existing functionality This enables end-to-end telemetry collection from statement execution through aggregation and export to the Databricks telemetry service. Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
1 parent 7771cfd commit d750438

File tree

3 files changed

+72
-39
lines changed

3 files changed

+72
-39
lines changed

connection.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,21 @@ func (c *conn) ExecContext(ctx context.Context, query string, args []driver.Name
126126
log, ctx = client.LoggerAndContext(ctx, exStmtResp)
127127
stagingErr := c.execStagingOperation(exStmtResp, ctx)
128128

129+
// Telemetry: track statement execution
130+
var statementID string
131+
if c.telemetry != nil && exStmtResp != nil && exStmtResp.OperationHandle != nil && exStmtResp.OperationHandle.OperationId != nil {
132+
statementID = client.SprintGuid(exStmtResp.OperationHandle.OperationId.GUID)
133+
ctx = c.telemetry.BeforeExecute(ctx, statementID)
134+
defer func() {
135+
finalErr := err
136+
if stagingErr != nil {
137+
finalErr = stagingErr
138+
}
139+
c.telemetry.AfterExecute(ctx, finalErr)
140+
c.telemetry.CompleteStatement(ctx, statementID, finalErr != nil)
141+
}()
142+
}
143+
129144
if exStmtResp != nil && exStmtResp.OperationHandle != nil {
130145
// since we have an operation handle we can close the operation if necessary
131146
alreadyClosed := exStmtResp.DirectResults != nil && exStmtResp.DirectResults.CloseOperation != nil
@@ -171,6 +186,17 @@ func (c *conn) QueryContext(ctx context.Context, query string, args []driver.Nam
171186
log, ctx = client.LoggerAndContext(ctx, exStmtResp)
172187
defer log.Duration(msg, start)
173188

189+
// Telemetry: track statement execution
190+
var statementID string
191+
if c.telemetry != nil && exStmtResp != nil && exStmtResp.OperationHandle != nil && exStmtResp.OperationHandle.OperationId != nil {
192+
statementID = client.SprintGuid(exStmtResp.OperationHandle.OperationId.GUID)
193+
ctx = c.telemetry.BeforeExecute(ctx, statementID)
194+
defer func() {
195+
c.telemetry.AfterExecute(ctx, err)
196+
c.telemetry.CompleteStatement(ctx, statementID, err != nil)
197+
}()
198+
}
199+
174200
if err != nil {
175201
log.Err(err).Msg("databricks: failed to run query") // To log query we need to redact credentials
176202
return nil, dbsqlerrint.NewExecutionError(ctx, dbsqlerr.ErrQueryExecution, err, opStatusResp)

telemetry/DESIGN.md

Lines changed: 34 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -2069,34 +2069,34 @@ func BenchmarkInterceptor_Disabled(b *testing.B) {
20692069
- [x] Test server error handling
20702070
- [x] Test unreachable server scenarios
20712071

2072-
### Phase 6: Collection & Aggregation (PECOBLR-1381)
2073-
- [ ] Implement `interceptor.go` for metric collection
2074-
- [ ] Implement beforeExecute() and afterExecute() hooks
2075-
- [ ] Implement context-based metric tracking with metricContext
2076-
- [ ] Implement latency measurement (startTime, latencyMs calculation)
2077-
- [ ] Add tag collection methods (addTag)
2078-
- [ ] Implement error swallowing with panic recovery
2079-
- [ ] Implement `aggregator.go` for batching
2080-
- [ ] Implement statement-level aggregation (statementMetrics)
2081-
- [ ] Implement batch size and flush interval logic
2082-
- [ ] Implement background flush goroutine (flushLoop)
2083-
- [ ] Add thread-safe metric recording
2084-
- [ ] Implement completeStatement() for final aggregation
2085-
- [ ] Implement error classification in `errors.go`
2086-
- [ ] Implement error type classification (terminal vs retryable)
2087-
- [ ] Implement HTTP status code classification
2088-
- [ ] Add error pattern matching
2089-
- [ ] Implement isTerminalError() function
2090-
- [ ] Update `client.go` to integrate aggregator
2091-
- [ ] Wire up aggregator with exporter
2092-
- [ ] Implement background flush timer
2093-
- [ ] Update start() and close() methods
2094-
- [ ] Add unit tests for collection and aggregation
2095-
- [ ] Test interceptor metric collection and latency tracking
2096-
- [ ] Test aggregation logic
2097-
- [ ] Test batch flushing (size-based and time-based)
2098-
- [ ] Test error classification
2099-
- [ ] Test client with aggregator integration
2072+
### Phase 6: Collection & Aggregation (PECOBLR-1381) ✅ COMPLETED
2073+
- [x] Implement `interceptor.go` for metric collection
2074+
- [x] Implement beforeExecute() and afterExecute() hooks
2075+
- [x] Implement context-based metric tracking with metricContext
2076+
- [x] Implement latency measurement (startTime, latencyMs calculation)
2077+
- [x] Add tag collection methods (addTag)
2078+
- [x] Implement error swallowing with panic recovery
2079+
- [x] Implement `aggregator.go` for batching
2080+
- [x] Implement statement-level aggregation (statementMetrics)
2081+
- [x] Implement batch size and flush interval logic
2082+
- [x] Implement background flush goroutine (flushLoop)
2083+
- [x] Add thread-safe metric recording
2084+
- [x] Implement completeStatement() for final aggregation
2085+
- [x] Implement error classification in `errors.go`
2086+
- [x] Implement error type classification (terminal vs retryable)
2087+
- [x] Implement HTTP status code classification
2088+
- [x] Add error pattern matching
2089+
- [x] Implement isTerminalError() function
2090+
- [x] Update `client.go` to integrate aggregator
2091+
- [x] Wire up aggregator with exporter
2092+
- [x] Implement background flush timer
2093+
- [x] Update start() and close() methods
2094+
- [x] Add unit tests for collection and aggregation
2095+
- [x] Test interceptor metric collection and latency tracking
2096+
- [x] Test aggregation logic
2097+
- [x] Test batch flushing (size-based and time-based)
2098+
- [x] Test error classification
2099+
- [x] Test client with aggregator integration
21002100

21012101
### Phase 7: Driver Integration ✅ COMPLETED
21022102
- [x] Add telemetry initialization to `connection.go`
@@ -2120,9 +2120,12 @@ func BenchmarkInterceptor_Disabled(b *testing.B) {
21202120
- [x] Test compilation with telemetry
21212121
- [x] Test no breaking changes to existing tests
21222122
- [x] Test graceful handling when disabled
2123-
2124-
Note: Statement execution hooks (beforeExecute/afterExecute in statement.go) for
2125-
actual metric collection can be added as follow-up enhancement.
2123+
- [x] Statement execution hooks
2124+
- [x] Add beforeExecute() hook to QueryContext
2125+
- [x] Add afterExecute() and completeStatement() hooks to QueryContext
2126+
- [x] Add beforeExecute() hook to ExecContext
2127+
- [x] Add afterExecute() and completeStatement() hooks to ExecContext
2128+
- [x] Use operation handle GUID as statement ID
21262129

21272130
### Phase 8: Testing & Validation
21282131
- [ ] Run benchmark tests

telemetry/interceptor.go

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,10 @@ func getMetricContext(ctx context.Context) *metricContext {
4444
return nil
4545
}
4646

47-
// beforeExecute is called before statement execution.
47+
// BeforeExecute is called before statement execution.
4848
// Returns a new context with metric tracking attached.
49-
func (i *Interceptor) beforeExecute(ctx context.Context, statementID string) context.Context {
49+
// Exported for use by the driver package.
50+
func (i *Interceptor) BeforeExecute(ctx context.Context, statementID string) context.Context {
5051
if !i.enabled {
5152
return ctx
5253
}
@@ -60,9 +61,10 @@ func (i *Interceptor) beforeExecute(ctx context.Context, statementID string) con
6061
return withMetricContext(ctx, mc)
6162
}
6263

63-
// afterExecute is called after statement execution.
64+
// AfterExecute is called after statement execution.
6465
// Records the metric with timing and error information.
65-
func (i *Interceptor) afterExecute(ctx context.Context, err error) {
66+
// Exported for use by the driver package.
67+
func (i *Interceptor) AfterExecute(ctx context.Context, err error) {
6668
if !i.enabled {
6769
return
6870
}
@@ -96,8 +98,9 @@ func (i *Interceptor) afterExecute(ctx context.Context, err error) {
9698
i.aggregator.recordMetric(ctx, metric)
9799
}
98100

99-
// addTag adds a tag to the current metric context.
100-
func (i *Interceptor) addTag(ctx context.Context, key string, value interface{}) {
101+
// AddTag adds a tag to the current metric context.
102+
// Exported for use by the driver package.
103+
func (i *Interceptor) AddTag(ctx context.Context, key string, value interface{}) {
101104
if !i.enabled {
102105
return
103106
}
@@ -129,8 +132,9 @@ func (i *Interceptor) recordConnection(ctx context.Context, tags map[string]inte
129132
i.aggregator.recordMetric(ctx, metric)
130133
}
131134

132-
// completeStatement marks a statement as complete and flushes aggregated metrics.
133-
func (i *Interceptor) completeStatement(ctx context.Context, statementID string, failed bool) {
135+
// CompleteStatement marks a statement as complete and flushes aggregated metrics.
136+
// Exported for use by the driver package.
137+
func (i *Interceptor) CompleteStatement(ctx context.Context, statementID string, failed bool) {
134138
if !i.enabled {
135139
return
136140
}

0 commit comments

Comments
 (0)