Skip to content

Commit 887ca37

Browse files
committed
[webhookeventreceiver] add option to include headers as attributes
1 parent 55ca3c5 commit 887ca37

File tree

6 files changed

+162
-13
lines changed

6 files changed

+162
-13
lines changed
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: receiver/webhookeventreceiver
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Add option to include headers as log attributes
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: []
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: |
19+
Adds new `convert_headers_to_attributes` option. If set, all headers, with the exception of the
20+
required header (if also enabled) will be added as log attributes. Header names are normalized
21+
to snake_case and then prefixed with the namespace `header`.
22+
23+
# If your change doesn't affect end users or the exported elements of any package,
24+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
25+
# Optional: The change log or logs in which this entry should be included.
26+
# e.g. '[user]' or '[user, api]'
27+
# Include 'user' if the change is relevant to end users.
28+
# Include 'api' if there is a change to a library API.
29+
# Default: '[user]'
30+
change_logs: [user]

receiver/webhookeventreceiver/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ The following settings are optional:
3131
* `required_header` (optional):
3232
* `key` (required if `required_header` config option is set): Represents the key portion of the required header.
3333
* `value` (required if `required_header` config option is set): Represents the value portion of the required header.
34+
* `convert_headers_to_attributes` (optional): add all request headers (excluding `required_header` if also set) log attributes
3435

3536
Example:
3637
```yaml

receiver/webhookeventreceiver/config.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,13 @@ var (
2020

2121
// Config defines configuration for the Generic Webhook receiver.
2222
type Config struct {
23-
confighttp.ServerConfig `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct
24-
ReadTimeout string `mapstructure:"read_timeout"` // wait time for reading request headers in ms. Default is 500ms.
25-
WriteTimeout string `mapstructure:"write_timeout"` // wait time for writing request response in ms. Default is 500ms.
26-
Path string `mapstructure:"path"` // path for data collection. Default is /events
27-
HealthPath string `mapstructure:"health_path"` // path for health check api. Default is /health_check
28-
RequiredHeader RequiredHeader `mapstructure:"required_header"` // optional setting to set a required header for all requests to have
23+
confighttp.ServerConfig `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct
24+
ReadTimeout string `mapstructure:"read_timeout"` // wait time for reading request headers in ms. Default is 500ms.
25+
WriteTimeout string `mapstructure:"write_timeout"` // wait time for writing request response in ms. Default is 500ms.
26+
Path string `mapstructure:"path"` // path for data collection. Default is /events
27+
HealthPath string `mapstructure:"health_path"` // path for health check api. Default is /health_check
28+
RequiredHeader RequiredHeader `mapstructure:"required_header"` // optional setting to set a required header for all requests to have
29+
ConvertHeadersToAttributes bool `mapstructure:"convert_headers_to_attributes"` // optional to convert all headers to attributes
2930
}
3031

3132
type RequiredHeader struct {

receiver/webhookeventreceiver/receiver.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ func (er *eventReceiver) handleReq(w http.ResponseWriter, r *http.Request, _ htt
191191

192192
// send body into a scanner and then convert the request body into a log
193193
sc := bufio.NewScanner(bodyReader)
194-
ld, numLogs := reqToLog(sc, r.URL.Query(), er.cfg, er.settings)
194+
ld, numLogs := reqToLog(sc, r.Header, r.URL.Query(), er.cfg, er.settings)
195195
consumerErr := er.logConsumer.ConsumeLogs(ctx, ld)
196196

197197
_ = bodyReader.Close()

receiver/webhookeventreceiver/req_to_log.go

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,10 @@ package webhookeventreceiver // import "github.com/open-telemetry/opentelemetry-
55

66
import (
77
"bufio"
8+
"net/http"
9+
"net/textproto"
810
"net/url"
11+
"strings"
912
"time"
1013

1114
"go.opentelemetry.io/collector/pdata/pcommon"
@@ -16,8 +19,9 @@ import (
1619
)
1720

1821
func reqToLog(sc *bufio.Scanner,
22+
headers http.Header,
1923
query url.Values,
20-
_ *Config,
24+
config *Config,
2125
settings receiver.Settings,
2226
) (plog.Logs, int) {
2327
// we simply dont split the data passed into scan (i.e. scan the whole thing)
@@ -41,6 +45,9 @@ func reqToLog(sc *bufio.Scanner,
4145
scopeLog.Scope().SetVersion(settings.BuildInfo.Version)
4246
scopeLog.Scope().Attributes().PutStr("source", settings.ID.String())
4347
scopeLog.Scope().Attributes().PutStr("receiver", metadata.Type.String())
48+
if config.ConvertHeadersToAttributes {
49+
appendHeaders(config, scopeLog, headers)
50+
}
4451

4552
for sc.Scan() {
4653
logRecord := scopeLog.LogRecords().AppendEmpty()
@@ -60,3 +67,17 @@ func appendMetadata(resourceLog plog.ResourceLogs, query url.Values) {
6067
}
6168
}
6269
}
70+
71+
// append headers as attributes
72+
func appendHeaders(config *Config, scopeLog plog.ScopeLogs, headers http.Header) {
73+
for k := range headers {
74+
// Skip the required header used for authentication
75+
if k == textproto.CanonicalMIMEHeaderKey(config.RequiredHeader.Key) {
76+
continue
77+
}
78+
// store headers with "header" namespace and normalize key to snake_case
79+
normalizedHeader := strings.ReplaceAll(k, "-", "_")
80+
normalizedHeader = strings.ToLower(normalizedHeader)
81+
scopeLog.Scope().Attributes().PutStr("header."+normalizedHeader, strings.Join(headers.Values(k), ";"))
82+
}
83+
}

receiver/webhookeventreceiver/req_to_log_test.go

Lines changed: 101 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import (
88
"bytes"
99
"io"
1010
"log"
11+
"net/http"
12+
"net/textproto"
1113
"net/url"
1214
"testing"
1315

@@ -21,10 +23,12 @@ func TestReqToLog(t *testing.T) {
2123
defaultConfig := createDefaultConfig().(*Config)
2224

2325
tests := []struct {
24-
desc string
25-
sc *bufio.Scanner
26-
query url.Values
27-
tt func(t *testing.T, reqLog plog.Logs, reqLen int, settings receiver.Settings)
26+
desc string
27+
sc *bufio.Scanner
28+
headers http.Header
29+
query url.Values
30+
config *Config
31+
tt func(t *testing.T, reqLog plog.Logs, reqLen int, settings receiver.Settings)
2832
}{
2933
{
3034
desc: "Valid query valid event",
@@ -110,11 +114,103 @@ func TestReqToLog(t *testing.T) {
110114
require.Equal(t, 2, scopeLogsScope.Attributes().Len())
111115
},
112116
},
117+
{
118+
desc: "Headers not added by default",
119+
headers: http.Header{
120+
textproto.CanonicalMIMEHeaderKey("X-Foo"): []string{"1"},
121+
textproto.CanonicalMIMEHeaderKey("X-Bar"): []string{"2"},
122+
},
123+
sc: func() *bufio.Scanner {
124+
reader := io.NopCloser(bytes.NewReader([]byte("this is a: log")))
125+
return bufio.NewScanner(reader)
126+
}(),
127+
tt: func(t *testing.T, reqLog plog.Logs, reqLen int, _ receiver.Settings) {
128+
require.Equal(t, 1, reqLen)
129+
130+
attributes := reqLog.ResourceLogs().At(0).Resource().Attributes()
131+
require.Equal(t, 0, attributes.Len())
132+
133+
scopeLogsScope := reqLog.ResourceLogs().At(0).ScopeLogs().At(0).Scope()
134+
require.Equal(t, 2, scopeLogsScope.Attributes().Len()) // expect no additional attributes even though headers are set
135+
},
136+
},
137+
{
138+
desc: "Headers added if ConvertHeadersToAttributes enabled",
139+
headers: http.Header{
140+
textproto.CanonicalMIMEHeaderKey("X-Foo"): []string{"1"},
141+
textproto.CanonicalMIMEHeaderKey("X-Bar"): []string{"2"},
142+
},
143+
config: &Config{
144+
Path: defaultPath,
145+
HealthPath: defaultHealthPath,
146+
ReadTimeout: defaultReadTimeout,
147+
WriteTimeout: defaultWriteTimeout,
148+
ConvertHeadersToAttributes: true,
149+
},
150+
sc: func() *bufio.Scanner {
151+
reader := io.NopCloser(bytes.NewReader([]byte("this is a: log")))
152+
return bufio.NewScanner(reader)
153+
}(),
154+
tt: func(t *testing.T, reqLog plog.Logs, reqLen int, _ receiver.Settings) {
155+
require.Equal(t, 1, reqLen)
156+
157+
attributes := reqLog.ResourceLogs().At(0).Resource().Attributes()
158+
require.Equal(t, 0, attributes.Len())
159+
160+
scopeLogsScope := reqLog.ResourceLogs().At(0).ScopeLogs().At(0).Scope()
161+
require.Equal(t, 4, scopeLogsScope.Attributes().Len()) // expect no additional attributes even though headers are set
162+
v, exists := scopeLogsScope.Attributes().Get("header.x_foo")
163+
require.True(t, exists)
164+
require.Equal(t, "1", v.AsString())
165+
v, exists = scopeLogsScope.Attributes().Get("header.x_bar")
166+
require.True(t, exists)
167+
require.Equal(t, "2", v.AsString())
168+
},
169+
},
170+
{
171+
desc: "Required header skipped",
172+
headers: http.Header{
173+
textproto.CanonicalMIMEHeaderKey("X-Foo"): []string{"1"},
174+
textproto.CanonicalMIMEHeaderKey("X-Bar"): []string{"2"},
175+
textproto.CanonicalMIMEHeaderKey("X-Required-Header"): []string{"password"},
176+
},
177+
config: &Config{
178+
Path: defaultPath,
179+
HealthPath: defaultHealthPath,
180+
ReadTimeout: defaultReadTimeout,
181+
WriteTimeout: defaultWriteTimeout,
182+
RequiredHeader: RequiredHeader{Key: "X-Required-Header", Value: "password"},
183+
ConvertHeadersToAttributes: true,
184+
},
185+
sc: func() *bufio.Scanner {
186+
reader := io.NopCloser(bytes.NewReader([]byte("this is a: log")))
187+
return bufio.NewScanner(reader)
188+
}(),
189+
tt: func(t *testing.T, reqLog plog.Logs, reqLen int, _ receiver.Settings) {
190+
require.Equal(t, 1, reqLen)
191+
192+
attributes := reqLog.ResourceLogs().At(0).Resource().Attributes()
193+
require.Equal(t, 0, attributes.Len())
194+
195+
scopeLogsScope := reqLog.ResourceLogs().At(0).ScopeLogs().At(0).Scope()
196+
require.Equal(t, 4, scopeLogsScope.Attributes().Len()) // expect no additional attributes even though headers are set
197+
_, exists := scopeLogsScope.Attributes().Get("header.x_foo")
198+
require.True(t, exists)
199+
_, exists = scopeLogsScope.Attributes().Get("header.x_bar")
200+
require.True(t, exists)
201+
_, exists = scopeLogsScope.Attributes().Get("header.x_required_header")
202+
require.False(t, exists)
203+
},
204+
},
113205
}
114206

115207
for _, test := range tests {
116208
t.Run(test.desc, func(t *testing.T) {
117-
reqLog, reqLen := reqToLog(test.sc, test.query, defaultConfig, receivertest.NewNopSettings())
209+
testConfig := defaultConfig
210+
if test.config != nil {
211+
testConfig = test.config
212+
}
213+
reqLog, reqLen := reqToLog(test.sc, test.headers, test.query, testConfig, receivertest.NewNopSettings())
118214
test.tt(t, reqLog, reqLen, receivertest.NewNopSettings())
119215
})
120216
}

0 commit comments

Comments
 (0)