Skip to content

Commit cf7f4bd

Browse files
committed
Use otlp request in wrapper, hide members in the wrapper
Tested in contrib and was able to access InternalWrapper.Orig, with this change this is no longer possible. Also removes one extra allocation for the request object when using otlp receiver/exporter since we keep the initial pointer around. Signed-off-by: Bogdan Drutu <[email protected]>
1 parent 03904de commit cf7f4bd

File tree

12 files changed

+228
-218
lines changed

12 files changed

+228
-218
lines changed

consumer/pdata/log.go

Lines changed: 24 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -29,56 +29,51 @@ import (
2929
// Must use NewLogs functions to create new instances.
3030
// Important: zero-initialized instance is not valid for use.
3131
type Logs struct {
32-
orig *[]*otlplogs.ResourceLogs
32+
orig *otlpcollectorlog.ExportLogsServiceRequest
3333
}
3434

3535
// NewLogs creates a new Logs.
3636
func NewLogs() Logs {
37-
orig := []*otlplogs.ResourceLogs(nil)
38-
return Logs{&orig}
37+
return Logs{orig: &otlpcollectorlog.ExportLogsServiceRequest{}}
3938
}
4039

4140
// LogsFromInternalRep creates the internal Logs representation from the ProtoBuf. Should
4241
// not be used outside this module. This is intended to be used only by OTLP exporter and
4342
// File exporter, which legitimately need to work with OTLP Protobuf structs.
44-
func LogsFromInternalRep(logs internal.OtlpLogsWrapper) Logs {
45-
return Logs{logs.Orig}
43+
func LogsFromInternalRep(logs internal.LogsWrapper) Logs {
44+
return Logs{orig: internal.LogsToOtlp(logs)}
45+
}
46+
47+
// LogsFromOtlpProtoBytes converts OTLP Collector ExportLogsServiceRequest
48+
// ProtoBuf bytes to the internal Logs.
49+
//
50+
// Returns an invalid Logs instance if error is not nil.
51+
func LogsFromOtlpProtoBytes(data []byte) (Logs, error) {
52+
req := otlpcollectorlog.ExportLogsServiceRequest{}
53+
if err := req.Unmarshal(data); err != nil {
54+
return Logs{}, err
55+
}
56+
return Logs{orig: &req}, nil
4657
}
4758

4859
// InternalRep returns internal representation of the logs. Should not be used outside
4960
// this module. This is intended to be used only by OTLP exporter and File exporter,
5061
// which legitimately need to work with OTLP Protobuf structs.
51-
func (ld Logs) InternalRep() internal.OtlpLogsWrapper {
52-
return internal.OtlpLogsWrapper{Orig: ld.orig}
62+
func (ld Logs) InternalRep() internal.LogsWrapper {
63+
return internal.LogsFromOtlp(ld.orig)
5364
}
5465

5566
// ToOtlpProtoBytes returns the internal Logs to OTLP Collector ExportTraceServiceRequest
5667
// ProtoBuf bytes. This is intended to export OTLP Protobuf bytes for OTLP/HTTP transports.
5768
func (ld Logs) ToOtlpProtoBytes() ([]byte, error) {
58-
logs := otlpcollectorlog.ExportLogsServiceRequest{
59-
ResourceLogs: *ld.orig,
60-
}
61-
return logs.Marshal()
62-
}
63-
64-
// FromOtlpProtoBytes converts OTLP Collector ExportLogsServiceRequest
65-
// ProtoBuf bytes to the internal Logs. Overrides current data.
66-
// Calling this function on zero-initialized structure causes panic.
67-
// Use it with NewLogs or on existing initialized Logs.
68-
func (ld Logs) FromOtlpProtoBytes(data []byte) error {
69-
logs := otlpcollectorlog.ExportLogsServiceRequest{}
70-
if err := logs.Unmarshal(data); err != nil {
71-
return err
72-
}
73-
*ld.orig = logs.ResourceLogs
74-
return nil
69+
return ld.orig.Marshal()
7570
}
7671

7772
// Clone returns a copy of Logs.
7873
func (ld Logs) Clone() Logs {
79-
rls := NewResourceLogsSlice()
80-
ld.ResourceLogs().CopyTo(rls)
81-
return Logs(rls)
74+
cloneLd := NewLogs()
75+
ld.ResourceLogs().CopyTo(cloneLd.ResourceLogs())
76+
return cloneLd
8277
}
8378

8479
// LogRecordCount calculates the total number of log records.
@@ -99,15 +94,11 @@ func (ld Logs) LogRecordCount() int {
9994
// SizeBytes returns the number of bytes in the internal representation of the
10095
// logs.
10196
func (ld Logs) SizeBytes() int {
102-
size := 0
103-
for i := range *ld.orig {
104-
size += (*ld.orig)[i].Size()
105-
}
106-
return size
97+
return ld.orig.Size()
10798
}
10899

109100
func (ld Logs) ResourceLogs() ResourceLogsSlice {
110-
return ResourceLogsSlice(ld)
101+
return newResourceLogsSlice(&ld.orig.ResourceLogs)
111102
}
112103

113104
// SeverityNumber is the public alias of otlplogs.SeverityNumber from internal package.

consumer/pdata/log_test.go

Lines changed: 26 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"github.com/stretchr/testify/require"
2222

2323
"go.opentelemetry.io/collector/internal"
24+
otlpcollectorlog "go.opentelemetry.io/collector/internal/data/protogen/collector/logs/v1"
2425
otlplogs "go.opentelemetry.io/collector/internal/data/protogen/logs/v1"
2526
)
2627

@@ -48,28 +49,35 @@ func TestLogRecordCount(t *testing.T) {
4849
}
4950

5051
func TestLogRecordCountWithEmpty(t *testing.T) {
51-
assert.EqualValues(t, 0, LogsFromInternalRep(internal.LogsFromOtlp([]*otlplogs.ResourceLogs{{}})).LogRecordCount())
52-
assert.EqualValues(t, 0, LogsFromInternalRep(internal.LogsFromOtlp([]*otlplogs.ResourceLogs{
53-
{
54-
InstrumentationLibraryLogs: []*otlplogs.InstrumentationLibraryLogs{{}},
52+
assert.Zero(t, NewLogs().LogRecordCount())
53+
assert.Zero(t, LogsFromInternalRep(internal.LogsFromOtlp(&otlpcollectorlog.ExportLogsServiceRequest{
54+
ResourceLogs: []*otlplogs.ResourceLogs{{}},
55+
})).LogRecordCount())
56+
assert.Zero(t, LogsFromInternalRep(internal.LogsFromOtlp(&otlpcollectorlog.ExportLogsServiceRequest{
57+
ResourceLogs: []*otlplogs.ResourceLogs{
58+
{
59+
InstrumentationLibraryLogs: []*otlplogs.InstrumentationLibraryLogs{{}},
60+
},
5561
},
5662
})).LogRecordCount())
57-
assert.EqualValues(t, 1, LogsFromInternalRep(internal.LogsFromOtlp([]*otlplogs.ResourceLogs{
58-
{
59-
InstrumentationLibraryLogs: []*otlplogs.InstrumentationLibraryLogs{
60-
{
61-
Logs: []*otlplogs.LogRecord{{}},
63+
assert.Equal(t, 1, LogsFromInternalRep(internal.LogsFromOtlp(&otlpcollectorlog.ExportLogsServiceRequest{
64+
ResourceLogs: []*otlplogs.ResourceLogs{
65+
{
66+
InstrumentationLibraryLogs: []*otlplogs.InstrumentationLibraryLogs{
67+
{
68+
Logs: []*otlplogs.LogRecord{{}},
69+
},
6270
},
6371
},
6472
},
6573
})).LogRecordCount())
6674
}
6775

6876
func TestToFromLogProto(t *testing.T) {
69-
otlp := []*otlplogs.ResourceLogs(nil)
70-
td := LogsFromInternalRep(internal.LogsFromOtlp(otlp))
71-
assert.EqualValues(t, NewLogs(), td)
72-
assert.EqualValues(t, otlp, *td.orig)
77+
wrapper := internal.LogsFromOtlp(&otlpcollectorlog.ExportLogsServiceRequest{})
78+
ld := LogsFromInternalRep(wrapper)
79+
assert.EqualValues(t, NewLogs(), ld)
80+
assert.EqualValues(t, &otlpcollectorlog.ExportLogsServiceRequest{}, ld.orig)
7381
}
7482

7583
func TestLogsToFromOtlpProtoBytes(t *testing.T) {
@@ -78,14 +86,13 @@ func TestLogsToFromOtlpProtoBytes(t *testing.T) {
7886
bytes, err := send.ToOtlpProtoBytes()
7987
assert.NoError(t, err)
8088

81-
recv := NewLogs()
82-
err = recv.FromOtlpProtoBytes(bytes)
89+
recv, err := LogsFromOtlpProtoBytes(bytes)
8390
assert.NoError(t, err)
8491
assert.EqualValues(t, send, recv)
8592
}
8693

8794
func TestLogsFromInvalidOtlpProtoBytes(t *testing.T) {
88-
err := NewLogs().FromOtlpProtoBytes([]byte{0xFF})
95+
_, err := LogsFromOtlpProtoBytes([]byte{0xFF})
8996
assert.EqualError(t, err, "unexpected EOF")
9097
}
9198

@@ -127,8 +134,8 @@ func BenchmarkLogsFromOtlp(b *testing.B) {
127134
b.ResetTimer()
128135
b.ReportAllocs()
129136
for n := 0; n < b.N; n++ {
130-
traces := NewLogs()
131-
require.NoError(b, traces.FromOtlpProtoBytes(buf))
132-
assert.Equal(b, baseLogs.ResourceLogs().Len(), traces.ResourceLogs().Len())
137+
logs, err := LogsFromOtlpProtoBytes(buf)
138+
require.NoError(b, err)
139+
assert.Equal(b, baseLogs.ResourceLogs().Len(), logs.ResourceLogs().Len())
133140
}
134141
}

exporter/fileexporter/file_exporter.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ import (
2525
"go.opentelemetry.io/collector/component"
2626
"go.opentelemetry.io/collector/consumer/pdata"
2727
"go.opentelemetry.io/collector/internal"
28-
otlplogs "go.opentelemetry.io/collector/internal/data/protogen/collector/logs/v1"
2928
otlpmetrics "go.opentelemetry.io/collector/internal/data/protogen/collector/metrics/v1"
3029
otlptrace "go.opentelemetry.io/collector/internal/data/protogen/collector/trace/v1"
3130
)
@@ -55,10 +54,8 @@ func (e *fileExporter) ConsumeMetrics(_ context.Context, md pdata.Metrics) error
5554
}
5655

5756
func (e *fileExporter) ConsumeLogs(_ context.Context, ld pdata.Logs) error {
58-
request := otlplogs.ExportLogsServiceRequest{
59-
ResourceLogs: internal.LogsToOtlp(ld.InternalRep()),
60-
}
61-
return exportMessageAsLine(e, &request)
57+
request := internal.LogsToOtlp(ld.InternalRep())
58+
return exportMessageAsLine(e, request)
6259
}
6360

6461
func exportMessageAsLine(e *fileExporter, message proto.Message) error {

exporter/fileexporter/file_exporter_test.go

Lines changed: 69 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -73,105 +73,109 @@ func TestFileLogsExporterNoErrors(t *testing.T) {
7373
require.NotNil(t, exporter)
7474

7575
now := time.Now()
76-
ld := []*logspb.ResourceLogs{
77-
{
78-
Resource: otresourcepb.Resource{
79-
Attributes: []otlpcommon.KeyValue{
80-
{
81-
Key: "attr1",
82-
Value: otlpcommon.AnyValue{Value: &otlpcommon.AnyValue_StringValue{StringValue: "value1"}},
83-
},
84-
},
85-
},
86-
InstrumentationLibraryLogs: []*logspb.InstrumentationLibraryLogs{
87-
{
88-
Logs: []*logspb.LogRecord{
89-
{
90-
TimeUnixNano: uint64(now.UnixNano()),
91-
Name: "logA",
92-
},
76+
otlp := &collectorlogs.ExportLogsServiceRequest{
77+
ResourceLogs: []*logspb.ResourceLogs{
78+
{
79+
Resource: otresourcepb.Resource{
80+
Attributes: []otlpcommon.KeyValue{
9381
{
94-
TimeUnixNano: uint64(now.UnixNano()),
95-
Name: "logB",
82+
Key: "attr1",
83+
Value: otlpcommon.AnyValue{Value: &otlpcommon.AnyValue_StringValue{StringValue: "value1"}},
9684
},
9785
},
9886
},
99-
},
100-
},
101-
{
102-
Resource: otresourcepb.Resource{
103-
Attributes: []otlpcommon.KeyValue{
87+
InstrumentationLibraryLogs: []*logspb.InstrumentationLibraryLogs{
10488
{
105-
Key: "attr2",
106-
Value: otlpcommon.AnyValue{Value: &otlpcommon.AnyValue_StringValue{StringValue: "value2"}},
89+
Logs: []*logspb.LogRecord{
90+
{
91+
TimeUnixNano: uint64(now.UnixNano()),
92+
Name: "logA",
93+
},
94+
{
95+
TimeUnixNano: uint64(now.UnixNano()),
96+
Name: "logB",
97+
},
98+
},
10799
},
108100
},
109101
},
110-
InstrumentationLibraryLogs: []*logspb.InstrumentationLibraryLogs{
111-
{
112-
Logs: []*logspb.LogRecord{
102+
{
103+
Resource: otresourcepb.Resource{
104+
Attributes: []otlpcommon.KeyValue{
113105
{
114-
TimeUnixNano: uint64(now.UnixNano()),
115-
Name: "logC",
106+
Key: "attr2",
107+
Value: otlpcommon.AnyValue{Value: &otlpcommon.AnyValue_StringValue{StringValue: "value2"}},
108+
},
109+
},
110+
},
111+
InstrumentationLibraryLogs: []*logspb.InstrumentationLibraryLogs{
112+
{
113+
Logs: []*logspb.LogRecord{
114+
{
115+
TimeUnixNano: uint64(now.UnixNano()),
116+
Name: "logC",
117+
},
116118
},
117119
},
118120
},
119121
},
120122
},
121123
}
122-
assert.NoError(t, exporter.ConsumeLogs(context.Background(), pdata.LogsFromInternalRep(internal.LogsFromOtlp(ld))))
124+
assert.NoError(t, exporter.ConsumeLogs(context.Background(), pdata.LogsFromInternalRep(internal.LogsFromOtlp(otlp))))
123125
assert.NoError(t, exporter.Shutdown(context.Background()))
124126

125127
var unmarshaler = &jsonpb.Unmarshaler{}
126128
var j collectorlogs.ExportLogsServiceRequest
127129

128130
assert.NoError(t, unmarshaler.Unmarshal(mf, &j))
129-
assert.EqualValues(t, ld, j.ResourceLogs)
131+
assert.EqualValues(t, otlp.ResourceLogs, j.ResourceLogs)
130132
}
131133

132134
func TestFileLogsExporterErrors(t *testing.T) {
133135

134136
now := time.Now()
135-
ld := []*logspb.ResourceLogs{
136-
{
137-
Resource: otresourcepb.Resource{
138-
Attributes: []otlpcommon.KeyValue{
139-
{
140-
Key: "attr1",
141-
Value: otlpcommon.AnyValue{Value: &otlpcommon.AnyValue_StringValue{StringValue: "value1"}},
142-
},
143-
},
144-
},
145-
InstrumentationLibraryLogs: []*logspb.InstrumentationLibraryLogs{
146-
{
147-
Logs: []*logspb.LogRecord{
148-
{
149-
TimeUnixNano: uint64(now.UnixNano()),
150-
Name: "logA",
151-
},
137+
otlp := &collectorlogs.ExportLogsServiceRequest{
138+
ResourceLogs: []*logspb.ResourceLogs{
139+
{
140+
Resource: otresourcepb.Resource{
141+
Attributes: []otlpcommon.KeyValue{
152142
{
153-
TimeUnixNano: uint64(now.UnixNano()),
154-
Name: "logB",
143+
Key: "attr1",
144+
Value: otlpcommon.AnyValue{Value: &otlpcommon.AnyValue_StringValue{StringValue: "value1"}},
155145
},
156146
},
157147
},
158-
},
159-
},
160-
{
161-
Resource: otresourcepb.Resource{
162-
Attributes: []otlpcommon.KeyValue{
148+
InstrumentationLibraryLogs: []*logspb.InstrumentationLibraryLogs{
163149
{
164-
Key: "attr2",
165-
Value: otlpcommon.AnyValue{Value: &otlpcommon.AnyValue_StringValue{StringValue: "value2"}},
150+
Logs: []*logspb.LogRecord{
151+
{
152+
TimeUnixNano: uint64(now.UnixNano()),
153+
Name: "logA",
154+
},
155+
{
156+
TimeUnixNano: uint64(now.UnixNano()),
157+
Name: "logB",
158+
},
159+
},
166160
},
167161
},
168162
},
169-
InstrumentationLibraryLogs: []*logspb.InstrumentationLibraryLogs{
170-
{
171-
Logs: []*logspb.LogRecord{
163+
{
164+
Resource: otresourcepb.Resource{
165+
Attributes: []otlpcommon.KeyValue{
172166
{
173-
TimeUnixNano: uint64(now.UnixNano()),
174-
Name: "logC",
167+
Key: "attr2",
168+
Value: otlpcommon.AnyValue{Value: &otlpcommon.AnyValue_StringValue{StringValue: "value2"}},
169+
},
170+
},
171+
},
172+
InstrumentationLibraryLogs: []*logspb.InstrumentationLibraryLogs{
173+
{
174+
Logs: []*logspb.LogRecord{
175+
{
176+
TimeUnixNano: uint64(now.UnixNano()),
177+
Name: "logC",
178+
},
175179
},
176180
},
177181
},
@@ -210,7 +214,7 @@ func TestFileLogsExporterErrors(t *testing.T) {
210214
exporter := &fileExporter{file: mf}
211215
require.NotNil(t, exporter)
212216

213-
assert.Error(t, exporter.ConsumeLogs(context.Background(), pdata.LogsFromInternalRep(internal.LogsFromOtlp(ld))))
217+
assert.Error(t, exporter.ConsumeLogs(context.Background(), pdata.LogsFromInternalRep(internal.LogsFromOtlp(otlp))))
214218
assert.NoError(t, exporter.Shutdown(context.Background()))
215219
})
216220
}

exporter/otlpexporter/otlp.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -85,10 +85,8 @@ func (e *exporterImp) pushMetricsData(ctx context.Context, md pdata.Metrics) err
8585
return nil
8686
}
8787

88-
func (e *exporterImp) pushLogData(ctx context.Context, logs pdata.Logs) error {
89-
request := &otlplogs.ExportLogsServiceRequest{
90-
ResourceLogs: internal.LogsToOtlp(logs.InternalRep()),
91-
}
88+
func (e *exporterImp) pushLogData(ctx context.Context, ld pdata.Logs) error {
89+
request := internal.LogsToOtlp(ld.InternalRep())
9290
if err := e.w.exportLogs(ctx, request); err != nil {
9391
return fmt.Errorf("failed to push log data via OTLP exporter: %w", err)
9492
}

0 commit comments

Comments
 (0)