Skip to content

Commit dfd7798

Browse files
constanca-mFiery-Fenix
authored andcommitted
[extension/awslogs_encoding] Add support for cloudwatch logs coming from subscription filter (open-telemetry#38821)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description Add support for [cloudwatch logs coming from subscription filter](https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/SubscriptionFilters.html). <!-- Issue number (e.g. open-telemetry#1234) or full URL to issue, if applicable. --> #### Link to tracking issue open-telemetry#38820 <!--Describe what testing was performed and which tests were added.--> #### Testing There are unit tests for new code. <!--Describe the documentation added.--> #### Documentation Comments and unit tests should be enough. <!--Please delete paragraphs that you did not use before submitting.-->
1 parent 07fb080 commit dfd7798

File tree

11 files changed

+423
-23
lines changed

11 files changed

+423
-23
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: awslogsencodingextension
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Add support for cloudwatch logs coming from subscription filters.
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [38820]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

extension/encoding/awslogsencodingextension/cloudwatchlogssubscriptionfilter.go

Lines changed: 0 additions & 17 deletions
This file was deleted.

extension/encoding/awslogsencodingextension/extension.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"go.opentelemetry.io/collector/pdata/plog"
1313

1414
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding"
15+
subscriptionfilter "github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/awslogsencodingextension/internal/unmarshaler/subscription-filter"
1516
)
1617

1718
var _ encoding.LogsUnmarshalerExtension = (*encodingExtension)(nil)
@@ -21,12 +22,12 @@ type encodingExtension struct {
2122
format string
2223
}
2324

24-
func newExtension(cfg *Config, _ extension.Settings) (*encodingExtension, error) {
25+
func newExtension(cfg *Config, settings extension.Settings) (*encodingExtension, error) {
2526
switch cfg.Format {
2627
case formatCloudWatchLogsSubscriptionFilter:
2728
return &encodingExtension{
28-
unmarshaler: cloudWatchLogsSubscriptionFilterUnmarshaler{},
29-
format: cfg.Format,
29+
unmarshaler: subscriptionfilter.NewSubscriptionFilterUnmarshaler(settings.BuildInfo),
30+
format: formatCloudWatchLogsSubscriptionFilter,
3031
}, nil
3132
default:
3233
// Format will have been validated by Config.Validate,
@@ -44,10 +45,10 @@ func (*encodingExtension) Shutdown(_ context.Context) error {
4445
return nil
4546
}
4647

47-
func (e *encodingExtension) UnmarshalLogs(record []byte) (plog.Logs, error) {
48-
logs, err := e.unmarshaler.UnmarshalLogs(record)
48+
func (e *encodingExtension) UnmarshalLogs(buf []byte) (plog.Logs, error) {
49+
logs, err := e.unmarshaler.UnmarshalLogs(buf)
4950
if err != nil {
50-
return plog.Logs{}, fmt.Errorf("failed to unmarshal logs as '%s' format: %w", e.format, err)
51+
return plog.Logs{}, fmt.Errorf("failed to unmarshal logs as %q format: %w", e.format, err)
5152
}
5253
return logs, nil
5354
}

extension/encoding/awslogsencodingextension/extension_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@ func TestNew_CloudWatchLogsSubscriptionFilter(t *testing.T) {
1515
e, err := newExtension(&Config{Format: formatCloudWatchLogsSubscriptionFilter}, extensiontest.NewNopSettings(extensiontest.NopType))
1616
require.NoError(t, err)
1717
require.NotNil(t, e)
18+
19+
_, err = e.UnmarshalLogs([]byte("invalid"))
20+
require.ErrorContains(t, err, `failed to unmarshal logs as "cloudwatch_logs_subscription_filter" format`)
1821
}
1922

2023
func TestNew_Unimplemented(t *testing.T) {

extension/encoding/awslogsencodingextension/go.mod

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,12 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/extension/encod
33
go 1.23.0
44

55
require (
6+
github.com/aws/aws-lambda-go v1.47.0
7+
github.com/goccy/go-json v0.10.5
8+
github.com/klauspost/compress v1.18.0
69
github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding v0.122.0
10+
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.122.0
11+
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.122.0
712
github.com/stretchr/testify v1.10.0
813
go.opentelemetry.io/collector/component v1.28.2-0.20250319144947-41a9ea7f7402
914
go.opentelemetry.io/collector/component/componenttest v0.122.2-0.20250319144947-41a9ea7f7402
@@ -12,10 +17,12 @@ require (
1217
go.opentelemetry.io/collector/extension v1.28.2-0.20250319144947-41a9ea7f7402
1318
go.opentelemetry.io/collector/extension/extensiontest v0.122.2-0.20250319144947-41a9ea7f7402
1419
go.opentelemetry.io/collector/pdata v1.28.2-0.20250319144947-41a9ea7f7402
20+
go.opentelemetry.io/collector/semconv v0.122.2-0.20250319144947-41a9ea7f7402
1521
go.uber.org/goleak v1.3.0
1622
)
1723

1824
require (
25+
github.com/cespare/xxhash/v2 v2.3.0 // indirect
1926
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
2027
github.com/go-logr/logr v1.4.2 // indirect
2128
github.com/go-logr/stdr v1.2.2 // indirect
@@ -31,6 +38,7 @@ require (
3138
github.com/mitchellh/reflectwalk v1.0.2 // indirect
3239
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
3340
github.com/modern-go/reflect2 v1.0.2 // indirect
41+
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.122.0 // indirect
3442
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
3543
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
3644
go.opentelemetry.io/collector/featuregate v1.28.2-0.20250319144947-41a9ea7f7402 // indirect
@@ -52,3 +60,9 @@ require (
5260
)
5361

5462
replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding => ../
63+
64+
replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../../pkg/pdatautil
65+
66+
replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../../pkg/pdatatest
67+
68+
replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../../pkg/golden

extension/encoding/awslogsencodingextension/go.sum

Lines changed: 10 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package subscriptionfilter // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/awslogsencodingextension/internal/unmarshaler/subscription-filter"
5+
6+
import (
7+
"bytes"
8+
"errors"
9+
"fmt"
10+
"sync"
11+
"time"
12+
13+
"github.com/aws/aws-lambda-go/events"
14+
gojson "github.com/goccy/go-json"
15+
"github.com/klauspost/compress/gzip"
16+
"go.opentelemetry.io/collector/component"
17+
"go.opentelemetry.io/collector/pdata/pcommon"
18+
"go.opentelemetry.io/collector/pdata/plog"
19+
conventions "go.opentelemetry.io/collector/semconv/v1.27.0"
20+
21+
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/awslogsencodingextension/internal/metadata"
22+
)
23+
24+
var (
25+
errEmptyOwner = errors.New("cloudwatch log with message type 'DATA_MESSAGE' has empty owner field")
26+
errEmptyLogGroup = errors.New("cloudwatch log with message type 'DATA_MESSAGE' has empty log group field")
27+
errEmptyLogStream = errors.New("cloudwatch log with message type 'DATA_MESSAGE' has empty log stream field")
28+
)
29+
30+
func validateLog(log events.CloudwatchLogsData) error {
31+
switch log.MessageType {
32+
case "DATA_MESSAGE":
33+
if log.Owner == "" {
34+
return errEmptyOwner
35+
}
36+
if log.LogGroup == "" {
37+
return errEmptyLogGroup
38+
}
39+
if log.LogStream == "" {
40+
return errEmptyLogStream
41+
}
42+
case "CONTROL_MESSAGE":
43+
default:
44+
return fmt.Errorf("cloudwatch log has invalid message type %q", log.MessageType)
45+
}
46+
return nil
47+
}
48+
49+
type subscriptionFilterUnmarshaler struct {
50+
buildInfo component.BuildInfo
51+
52+
// Pool the gzip readers, which are expensive to create.
53+
gzipPool sync.Pool
54+
}
55+
56+
func NewSubscriptionFilterUnmarshaler(buildInfo component.BuildInfo) plog.Unmarshaler {
57+
return &subscriptionFilterUnmarshaler{
58+
buildInfo: buildInfo,
59+
gzipPool: sync.Pool{},
60+
}
61+
}
62+
63+
var _ plog.Unmarshaler = (*subscriptionFilterUnmarshaler)(nil)
64+
65+
// UnmarshalLogs deserializes the given record as CloudWatch Logs events
66+
// into a plog.Logs, grouping logs by owner (account ID), log group, and
67+
// log stream. Logs are assumed to be gzip-compressed as specified at
68+
// https://docs.aws.amazon.com/firehose/latest/dev/writing-with-cloudwatch-logs.html.
69+
func (f *subscriptionFilterUnmarshaler) UnmarshalLogs(compressedRecord []byte) (plog.Logs, error) {
70+
var errDecompress error
71+
gzipReader, ok := f.gzipPool.Get().(*gzip.Reader)
72+
if !ok {
73+
gzipReader, errDecompress = gzip.NewReader(bytes.NewReader(compressedRecord))
74+
} else {
75+
errDecompress = gzipReader.Reset(bytes.NewReader(compressedRecord))
76+
}
77+
if errDecompress != nil {
78+
if gzipReader != nil {
79+
f.gzipPool.Put(gzipReader)
80+
}
81+
return plog.Logs{}, fmt.Errorf("failed to decompress record: %w", errDecompress)
82+
}
83+
defer func() {
84+
_ = gzipReader.Close()
85+
f.gzipPool.Put(gzipReader)
86+
}()
87+
88+
var cwLog events.CloudwatchLogsData
89+
decoder := gojson.NewDecoder(gzipReader)
90+
if err := decoder.Decode(&cwLog); err != nil {
91+
return plog.Logs{}, fmt.Errorf("failed to decode decompressed record: %w", err)
92+
}
93+
94+
if cwLog.MessageType == "CONTROL_MESSAGE" {
95+
return plog.Logs{}, nil
96+
}
97+
98+
if err := validateLog(cwLog); err != nil {
99+
return plog.Logs{}, fmt.Errorf("invalid cloudwatch log: %w", err)
100+
}
101+
102+
return f.createLogs(cwLog), nil
103+
}
104+
105+
// createLogs create plog.Logs from the cloudwatchLog
106+
func (f *subscriptionFilterUnmarshaler) createLogs(
107+
cwLog events.CloudwatchLogsData,
108+
) plog.Logs {
109+
logs := plog.NewLogs()
110+
rl := logs.ResourceLogs().AppendEmpty()
111+
resourceAttrs := rl.Resource().Attributes()
112+
resourceAttrs.PutStr(conventions.AttributeCloudProvider, conventions.AttributeCloudProviderAWS)
113+
resourceAttrs.PutStr(conventions.AttributeCloudAccountID, cwLog.Owner)
114+
resourceAttrs.PutEmptySlice(conventions.AttributeAWSLogGroupNames).AppendEmpty().SetStr(cwLog.LogGroup)
115+
resourceAttrs.PutEmptySlice(conventions.AttributeAWSLogStreamNames).AppendEmpty().SetStr(cwLog.LogStream)
116+
resourceAttrs.PutStr(conventions.AttributeAWSLogGroupNames, cwLog.LogGroup)
117+
resourceAttrs.PutStr(conventions.AttributeAWSLogStreamNames, cwLog.LogStream)
118+
119+
sl := rl.ScopeLogs().AppendEmpty()
120+
sl.Scope().SetName(metadata.ScopeName)
121+
sl.Scope().SetVersion(f.buildInfo.Version)
122+
for _, event := range cwLog.LogEvents {
123+
logRecord := sl.LogRecords().AppendEmpty()
124+
// pcommon.Timestamp is a time specified as UNIX Epoch time in nanoseconds
125+
// but timestamp in cloudwatch logs are in milliseconds.
126+
logRecord.SetTimestamp(pcommon.Timestamp(event.Timestamp * int64(time.Millisecond)))
127+
logRecord.Body().SetStr(event.Message)
128+
}
129+
return logs
130+
}

0 commit comments

Comments
 (0)