Skip to content

Commit 392796e

Browse files
authored
Merge pull request #95 from Azure/guwe/azure-specific-diag
Feat: support resource-specific tables for diagnostics
2 parents 1a314a8 + e16ab5c commit 392796e

File tree

7 files changed

+1522
-95
lines changed

7 files changed

+1522
-95
lines changed

internal/components/monitor/diagnostics/handlers.go

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,17 +69,27 @@ func HandleControlPlaneLogs(params map[string]interface{}, cfg *config.ConfigDat
6969
return "", err
7070
}
7171

72-
// Get workspace GUID from diagnostic settings
73-
workspaceGUID, err := ExtractWorkspaceGUIDFromDiagnosticSettings(subscriptionID, resourceGroup, clusterName, cfg)
72+
// Find the diagnostic setting that has the requested log category enabled
73+
// This handles cases where multiple diagnostic settings exist for the same cluster
74+
workspaceResourceID, isResourceSpecific, err := FindDiagnosticSettingForCategory(subscriptionID, resourceGroup, clusterName, logCategory, cfg)
75+
if err != nil {
76+
return "", fmt.Errorf("failed to find diagnostic setting for log category %s in cluster %s: %w", logCategory, clusterName, err)
77+
}
78+
79+
// Get workspace GUID from the workspace resource ID
80+
workspaceGUID, err := getWorkspaceGUID(workspaceResourceID, cfg)
7481
if err != nil {
7582
return "", fmt.Errorf("failed to get workspace GUID for cluster %s: %w", clusterName, err)
7683
}
7784

7885
// Build cluster resource ID for scoping using utility function
7986
clusterResourceID := buildClusterResourceID(subscriptionID, resourceGroup, clusterName)
8087

81-
// Build safe KQL query scoped to this specific AKS cluster
82-
kqlQuery := BuildSafeKQLQuery(logCategory, logLevel, maxRecords, clusterResourceID)
88+
// Build safe KQL query scoped to this specific AKS cluster with appropriate table mode
89+
kqlQuery, err := BuildSafeKQLQuery(logCategory, logLevel, maxRecords, clusterResourceID, isResourceSpecific)
90+
if err != nil {
91+
return "", fmt.Errorf("failed to build KQL query for cluster %s: %w", clusterName, err)
92+
}
8393

8494
// Calculate timespan for the query
8595
timespan, err := CalculateTimespan(startTime, endTime)

internal/components/monitor/diagnostics/kql.go

Lines changed: 261 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -2,50 +2,285 @@ package diagnostics
22

33
import (
	"fmt"
	"regexp"
	"sort"
	"strings"
	"time"
)
89

9-
// Log level prefixes used in Kubernetes logs
10-
var logLevelPrefixes = map[string]string{
11-
"info": "I",
12-
"warning": "W",
13-
"error": "E",
10+
// LogLevelMapping pairs the two spellings of a single log level: the marker
// used when querying the AzureDiagnostics table and the value used when
// querying a resource-specific table.
type LogLevelMapping struct {
	// AzureDiagnosticsPrefix is the prefix matched against the log_s column
	// of the AzureDiagnostics table.
	AzureDiagnosticsPrefix string
	// ResourceSpecificLevel is the value matched against the Level column of
	// a resource-specific table.
	ResourceSpecificLevel string
}

// logLevelMappings is keyed by lowercase log level name and yields the
// per-table representation of that level.
var logLevelMappings = map[string]LogLevelMapping{
	"error": {
		AzureDiagnosticsPrefix: "E",
		ResourceSpecificLevel:  "ERROR",
	},
	"warning": {
		AzureDiagnosticsPrefix: "W",
		ResourceSpecificLevel:  "WARNING",
	},
	"info": {
		AzureDiagnosticsPrefix: "I",
		ResourceSpecificLevel:  "INFO",
	},
}
22+
23+
// auditCategories is the set of log categories treated as audit logs. Log
// level filtering is skipped for these categories.
var auditCategories = map[string]bool{
	"kube-audit-admin": true,
	"kube-audit":       true,
}
2128

22-
// isAuditCategory checks if the given category is an audit log category
23-
func isAuditCategory(category string) bool {
24-
return auditCategories[category]
29+
// resourceSpecificTableMapping maps each supported log category to the
// resource-specific table that is queried for it.
var resourceSpecificTableMapping = map[string]string{
	// Audit categories each have their own table.
	"kube-audit":       "AKSAudit",
	"kube-audit-admin": "AKSAuditAdmin",

	// Every other supported category shares the control plane table.
	"cloud-controller-manager": "AKSControlPlane",
	"cluster-autoscaler":       "AKSControlPlane",
	"csi-azuredisk-controller": "AKSControlPlane",
	"csi-azurefile-controller": "AKSControlPlane",
	"csi-snapshot-controller":  "AKSControlPlane",
	"guard":                    "AKSControlPlane",
	"kube-apiserver":           "AKSControlPlane",
	"kube-controller-manager":  "AKSControlPlane",
	"kube-scheduler":           "AKSControlPlane",
}
2643

27-
// BuildSafeKQLQuery builds pre-validated KQL queries to prevent injection, scoped to specific AKS cluster
28-
func BuildSafeKQLQuery(category, logLevel string, maxRecords int, clusterResourceID string) string {
29-
// Convert resource ID to uppercase as it's stored in uppercase in Log Analytics
30-
upperResourceID := strings.ToUpper(clusterResourceID)
31-
baseQuery := fmt.Sprintf("AzureDiagnostics | where Category == '%s' and ResourceId == '%s'", category, upperResourceID)
32-
33-
if logLevel != "" && !isAuditCategory(category) {
34-
// For Kubernetes component logs (not audit), use the log_s prefix pattern
35-
// Kubernetes logs use format like "I0715" (Info), "W0715" (Warning), "E0715" (Error)
36-
// Audit logs don't follow this pattern, so we skip log level filtering for them
37-
if prefix, exists := logLevelPrefixes[strings.ToLower(logLevel)]; exists {
38-
baseQuery += fmt.Sprintf(" | where log_s startswith '%s'", prefix)
44+
// KQLQueryBuilder builds KQL queries for AKS control plane logs
45+
type KQLQueryBuilder struct {
46+
category string // The log category (e.g., "kube-audit", "kube-apiserver").
47+
logLevel string // The log level (e.g., "info", "warning", "error").
48+
maxRecords int // The maximum number of records to retrieve.
49+
clusterResourceID string // The resource ID of the cluster being queried.
50+
tableMode TableMode // Specifies the mode of the table being queried (e.g., AzureDiagnosticsMode or ResourceSpecificMode).
51+
selectedTable string // The name of the table selected for the query.
52+
processedResourceID string // The processed resource ID used in the query.
53+
}
54+
55+
// TableMode identifies which kind of destination table a query targets.
type TableMode int

// Supported table modes.
const (
	// AzureDiagnosticsMode targets the shared AzureDiagnostics table.
	AzureDiagnosticsMode TableMode = iota
	// ResourceSpecificMode targets a resource-specific table chosen per
	// log category.
	ResourceSpecificMode
)
62+
63+
// validKQLLogLevels is the set of log level names accepted by KQL query
// parameter validation.
var validKQLLogLevels = map[string]bool{
	"error":   true,
	"warning": true,
	"info":    true,
}
69+
70+
// Bounds applied to the maxRecords parameter of a KQL query.
const (
	// MinMaxRecords is the smallest accepted maxRecords value.
	MinMaxRecords = 1
	// MaxMaxRecords is the largest accepted maxRecords value.
	MaxMaxRecords = 1000
	// DefaultKQLMaxRecords is the default record limit for KQL queries.
	DefaultKQLMaxRecords = 100
)
76+
77+
// azureResourceIDPattern recognizes the resource ID of an AKS managed cluster.
// Matching is case-insensitive, the "resourcegroup(s)" segment may be singular
// or plural, and the subscription segment accepts any alphanumeric/hyphen
// string so that test IDs are allowed.
var azureResourceIDPattern = regexp.MustCompile(
	`(?i)^/subscriptions/[a-zA-Z0-9-]+/resourcegroups?/[^/]+/providers/microsoft\.containerservice/managedclusters/[^/]+$`,
)
79+
80+
// ValidateKQLQueryParams validates all parameters for KQL query builder
81+
func ValidateKQLQueryParams(category, logLevel string, maxRecords int, clusterResourceID string, tableMode TableMode) error {
82+
// Validate category (empty not allowed, but unknown categories are permitted for forward compatibility)
83+
if category == "" {
84+
return fmt.Errorf("category cannot be empty")
85+
}
86+
// Note: We allow unknown categories for forward compatibility as Azure may add new log categories
87+
88+
// Validate log level (empty is allowed)
89+
if logLevel != "" && !validKQLLogLevels[logLevel] {
90+
validLevels := make([]string, 0, len(validKQLLogLevels))
91+
for level := range validKQLLogLevels {
92+
validLevels = append(validLevels, level)
3993
}
94+
return fmt.Errorf("invalid log level '%s'. Valid levels: %s (or empty for no filtering)", logLevel, strings.Join(validLevels, ", "))
95+
}
96+
97+
// Validate maxRecords
98+
if maxRecords < MinMaxRecords {
99+
return fmt.Errorf("maxRecords must be at least %d, got %d", MinMaxRecords, maxRecords)
100+
}
101+
if maxRecords > MaxMaxRecords {
102+
return fmt.Errorf("maxRecords cannot exceed %d, got %d", MaxMaxRecords, maxRecords)
103+
}
104+
105+
// Validate clusterResourceID
106+
if clusterResourceID == "" {
107+
return fmt.Errorf("clusterResourceID cannot be empty")
108+
}
109+
if !azureResourceIDPattern.MatchString(clusterResourceID) {
110+
return fmt.Errorf("invalid clusterResourceID format. Expected format: /subscriptions/{subscription-id}/resourceGroups/{resource-group}/providers/Microsoft.ContainerService/managedClusters/{cluster-name}")
111+
}
112+
113+
// Validate tableMode
114+
if tableMode != AzureDiagnosticsMode && tableMode != ResourceSpecificMode {
115+
return fmt.Errorf("invalid tableMode. Must be AzureDiagnosticsMode (%d) or ResourceSpecificMode (%d)", AzureDiagnosticsMode, ResourceSpecificMode)
116+
}
117+
118+
return nil
119+
}
120+
121+
// NewKQLQueryBuilder creates a new KQL query builder instance
122+
func NewKQLQueryBuilder(category, logLevel string, maxRecords int, clusterResourceID string, tableMode TableMode) (*KQLQueryBuilder, error) {
123+
// Validate all input parameters
124+
if err := ValidateKQLQueryParams(category, logLevel, maxRecords, clusterResourceID, tableMode); err != nil {
125+
return nil, fmt.Errorf("invalid KQL query parameters: %w", err)
126+
}
127+
128+
return &KQLQueryBuilder{
129+
category: category,
130+
logLevel: logLevel,
131+
maxRecords: maxRecords,
132+
clusterResourceID: clusterResourceID,
133+
tableMode: tableMode,
134+
}, nil
135+
}
136+
137+
// determineTableStrategy decides which table to use and processes the resource ID accordingly
138+
func (q *KQLQueryBuilder) determineTableStrategy() error {
139+
if q.tableMode == ResourceSpecificMode {
140+
if tableName, exists := resourceSpecificTableMapping[q.category]; exists {
141+
q.selectedTable = tableName
142+
// Resource-specific tables store _ResourceId in lowercase
143+
q.processedResourceID = strings.ToLower(q.clusterResourceID)
144+
} else {
145+
// Return error for unmapped categories in resource-specific mode
146+
return fmt.Errorf("category '%s' is not supported in resource-specific mode. Supported categories: %v",
147+
q.category, getSupportedResourceSpecificCategories())
148+
}
149+
} else {
150+
q.selectedTable = "AzureDiagnostics"
151+
q.processedResourceID = strings.ToUpper(q.clusterResourceID)
152+
}
153+
return nil
154+
}
155+
156+
// getSupportedResourceSpecificCategories returns a list of supported categories for resource-specific mode
157+
func getSupportedResourceSpecificCategories() []string {
158+
categories := make([]string, 0, len(resourceSpecificTableMapping))
159+
for category := range resourceSpecificTableMapping {
160+
categories = append(categories, category)
161+
}
162+
return categories
163+
}
164+
165+
// buildBaseQuery creates the initial table selection and filtering clause
166+
func (q *KQLQueryBuilder) buildBaseQuery() (string, error) {
167+
switch q.tableMode {
168+
case ResourceSpecificMode:
169+
return fmt.Sprintf("%s | where _ResourceId == '%s'", q.selectedTable, q.processedResourceID), nil
170+
case AzureDiagnosticsMode:
171+
return fmt.Sprintf("%s | where Category == '%s' and ResourceId == '%s'", q.selectedTable, q.category, q.processedResourceID), nil
172+
default:
173+
// This should never happen if validation is working correctly
174+
return "", fmt.Errorf("unexpected table mode: %d. This indicates an internal error in query builder", q.tableMode)
175+
}
176+
}
177+
178+
// isAuditCategory checks if the current category is an audit log category
179+
func (q *KQLQueryBuilder) isAuditCategory() bool {
180+
return auditCategories[q.category]
181+
}
182+
183+
// addLogLevelFilter adds log level filtering if applicable
184+
func (q *KQLQueryBuilder) addLogLevelFilter(baseQuery string) string {
185+
// Skip log level filtering for audit categories or empty log level
186+
if q.logLevel == "" || q.isAuditCategory() {
187+
return baseQuery
40188
}
41189

42-
baseQuery += " | order by TimeGenerated desc"
43-
baseQuery += fmt.Sprintf(" | limit %d", maxRecords)
190+
mapping, exists := logLevelMappings[strings.ToLower(q.logLevel)]
191+
if !exists {
192+
return baseQuery // Unknown log level, skip filtering
193+
}
194+
195+
switch q.tableMode {
196+
case ResourceSpecificMode:
197+
return baseQuery + fmt.Sprintf(" | where Level == '%s'", mapping.ResourceSpecificLevel)
198+
case AzureDiagnosticsMode:
199+
return baseQuery + fmt.Sprintf(" | where log_s startswith '%s'", mapping.AzureDiagnosticsPrefix)
200+
default:
201+
return baseQuery
202+
}
203+
}
204+
205+
// addOrderingAndLimit adds the ordering and limit clauses
206+
func (q *KQLQueryBuilder) addOrderingAndLimit(query string) string {
207+
query += " | order by TimeGenerated desc"
208+
query += fmt.Sprintf(" | limit %d", q.maxRecords)
209+
return query
210+
}
211+
212+
// addProjection adds the appropriate field projection based on table type
213+
func (q *KQLQueryBuilder) addProjection(query string) string {
214+
switch q.tableMode {
215+
case ResourceSpecificMode:
216+
return q.addResourceSpecificProjection(query)
217+
case AzureDiagnosticsMode:
218+
return query + " | project TimeGenerated, Level, log_s"
219+
default:
220+
return query + " | project TimeGenerated, Level, log_s"
221+
}
222+
}
223+
224+
// addResourceSpecificProjection adds projection for resource-specific tables
225+
func (q *KQLQueryBuilder) addResourceSpecificProjection(query string) string {
226+
switch q.selectedTable {
227+
case "AKSAudit", "AKSAuditAdmin":
228+
// Audit tables have structured fields
229+
return query + " | project TimeGenerated, Level, AuditId, Stage, RequestUri, Verb, User"
230+
case "AKSControlPlane":
231+
// Control plane table has message field
232+
return query + " | project TimeGenerated, Category, Level, Message, PodName"
233+
default:
234+
// Fallback projection for unknown resource-specific tables
235+
return query + " | project TimeGenerated, Level, Message"
236+
}
237+
}
44238

45-
// Project only essential fields: log content, timestamp, and level
46-
baseQuery += " | project TimeGenerated, Level, log_s"
239+
// Build constructs the complete KQL query
240+
func (q *KQLQueryBuilder) Build() (string, error) {
241+
// Step 1: Determine table strategy
242+
if err := q.determineTableStrategy(); err != nil {
243+
return "", err
244+
}
245+
246+
// Step 2: Build base query with table and resource filtering
247+
query, err := q.buildBaseQuery()
248+
if err != nil {
249+
return "", err
250+
}
251+
252+
// Step 3: Add log level filtering
253+
query = q.addLogLevelFilter(query)
254+
255+
// Step 4: Add ordering and limit
256+
query = q.addOrderingAndLimit(query)
257+
258+
// Step 5: Add field projection
259+
query = q.addProjection(query)
260+
261+
return query, nil
262+
}
263+
264+
// BuildSafeKQLQuery builds pre-validated KQL queries to prevent injection, scoped to specific AKS cluster
265+
// Supports both Azure Diagnostics and Resource-specific destination tables
266+
// Returns an error if query building fails
267+
func BuildSafeKQLQuery(category, logLevel string, maxRecords int, clusterResourceID string, isResourceSpecific bool) (string, error) {
268+
tableMode := AzureDiagnosticsMode
269+
if isResourceSpecific {
270+
tableMode = ResourceSpecificMode
271+
}
272+
273+
builder, err := NewKQLQueryBuilder(category, logLevel, maxRecords, clusterResourceID, tableMode)
274+
if err != nil {
275+
return "", fmt.Errorf("failed to create KQL query builder: %w", err)
276+
}
277+
278+
query, err := builder.Build()
279+
if err != nil {
280+
return "", fmt.Errorf("failed to build KQL query: %w", err)
281+
}
47282

48-
return baseQuery
283+
return query, nil
49284
}
50285

51286
// CalculateTimespan converts start/end times to Azure CLI timespan format

0 commit comments

Comments
 (0)