Skip to content

Commit 5d2860d

Browse files
committed
Introduce shared.ArchiveReader/ArchiveWriter
Signed-off-by: Andrew Putilov <[email protected]>
1 parent 14e96be commit 5d2860d

File tree

7 files changed

+220
-38
lines changed

7 files changed

+220
-38
lines changed

examples/memstore-plugin/main.go

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,14 @@ package main
1616

1717
import (
1818
"flag"
19-
"github.com/jaegertracing/jaeger/plugin/storage/grpc/config"
2019
"path"
2120
"strings"
2221

2322
"github.com/spf13/viper"
2423

2524
"github.com/jaegertracing/jaeger/plugin/storage/grpc"
25+
"github.com/jaegertracing/jaeger/plugin/storage/grpc/config"
26+
"github.com/jaegertracing/jaeger/plugin/storage/grpc/shared/extra"
2627
"github.com/jaegertracing/jaeger/plugin/storage/memory"
2728
"github.com/jaegertracing/jaeger/storage/dependencystore"
2829
"github.com/jaegertracing/jaeger/storage/spanstore"
@@ -46,13 +47,27 @@ func main() {
4647
opts := memory.Options{}
4748
opts.InitFromViper(v)
4849

50+
plugin := &memoryStore{
51+
store: memory.NewStore(),
52+
archiveStore: memory.NewStore(),
53+
}
4954
grpc.Serve(&config.PluginServices{
50-
Store: &memoryStore{store: memory.NewStore()},
55+
Store: plugin,
56+
Capabilities: plugin,
57+
ArchiveStore: plugin,
5158
})
5259
}
5360

5461
type memoryStore struct {
55-
store *memory.Store
62+
store *memory.Store
63+
archiveStore *memory.Store
64+
}
65+
66+
func (ns *memoryStore) Capabilities() (*extra.Capabilities, error) {
67+
return &extra.Capabilities{
68+
ArchiveSpanReader: true,
69+
ArchiveSpanWriter: true,
70+
}, nil
5671
}
5772

5873
func (ns *memoryStore) DependencyReader() dependencystore.Reader {
@@ -66,3 +81,11 @@ func (ns *memoryStore) SpanReader() spanstore.Reader {
6681
func (ns *memoryStore) SpanWriter() spanstore.Writer {
6782
return ns.store
6883
}
84+
85+
func (ns *memoryStore) ArchiveSpanReader() spanstore.Reader {
86+
return ns.archiveStore
87+
}
88+
89+
func (ns *memoryStore) ArchiveSpanWriter() spanstore.Writer {
90+
return ns.archiveStore
91+
}

plugin/storage/grpc/factory_test.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,9 @@ func TestGRPCStorageFactory_Capabilities(t *testing.T) {
136136

137137
f.builder = &mockPluginBuilder{
138138
plugin: &mockPlugin{
139-
capabilities: capabilities,
139+
capabilities: capabilities,
140+
archiveWriter: new(spanStoreMocks.Writer),
141+
archiveReader: new(spanStoreMocks.Reader),
140142
},
141143
}
142144
assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop()))
@@ -164,7 +166,9 @@ func TestGRPCStorageFactory_CapabilitiesDisabled(t *testing.T) {
164166

165167
f.builder = &mockPluginBuilder{
166168
plugin: &mockPlugin{
167-
capabilities: capabilities,
169+
capabilities: capabilities,
170+
archiveWriter: new(spanStoreMocks.Writer),
171+
archiveReader: new(spanStoreMocks.Reader),
168172
},
169173
}
170174
assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop()))

plugin/storage/grpc/grpc.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@ package grpc
1616

1717
import (
1818
"github.com/hashicorp/go-plugin"
19-
"github.com/jaegertracing/jaeger/plugin/storage/grpc/config"
2019
"google.golang.org/grpc"
2120

21+
"github.com/jaegertracing/jaeger/plugin/storage/grpc/config"
2222
"github.com/jaegertracing/jaeger/plugin/storage/grpc/shared"
2323
)
2424

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
// Copyright (c) 2020 The Jaeger Authors.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package shared
16+
17+
import (
18+
"context"
19+
"fmt"
20+
21+
"github.com/jaegertracing/jaeger/model"
22+
"github.com/jaegertracing/jaeger/proto-gen/storage_v1"
23+
"github.com/jaegertracing/jaeger/storage/spanstore"
24+
)
25+
26+
// ArchiveReader wraps storage_v1.ArchiveSpanReaderPluginClient into spanstore.Reader
27+
type ArchiveReader struct {
28+
storage_v1.ArchiveSpanReaderPluginClient
29+
}
30+
31+
// GetTrace takes a traceID and returns a Trace associated with that traceID from Archive Storage
32+
func (r *ArchiveReader) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error) {
33+
stream, err := r.GetArchiveTrace(upgradeContextWithBearerToken(ctx), &storage_v1.GetTraceRequest{
34+
TraceID: traceID,
35+
})
36+
if err != nil {
37+
return nil, fmt.Errorf("plugin error: %w", err)
38+
}
39+
40+
return readTrace(stream)
41+
}
42+
43+
// GetServices not used in ArchiveReader
44+
func (r *ArchiveReader) GetServices(ctx context.Context) ([]string, error) {
45+
panic("implement me")
46+
}
47+
48+
// GetOperations not used in ArchiveReader
49+
func (r *ArchiveReader) GetOperations(ctx context.Context, query spanstore.OperationQueryParameters) ([]spanstore.Operation, error) {
50+
panic("implement me")
51+
}
52+
53+
// FindTraces not used in ArchiveReader
54+
func (r *ArchiveReader) FindTraces(ctx context.Context, query *spanstore.TraceQueryParameters) ([]*model.Trace, error) {
55+
panic("implement me")
56+
}
57+
58+
// FindTraceIDs not used in ArchiveReader
59+
func (r *ArchiveReader) FindTraceIDs(ctx context.Context, query *spanstore.TraceQueryParameters) ([]model.TraceID, error) {
60+
panic("implement me")
61+
}
62+
63+
// ArchiveWriter wraps storage_v1.ArchiveSpanWriterPluginClient into spanstore.Writer
64+
type ArchiveWriter struct {
65+
storage_v1.ArchiveSpanWriterPluginClient
66+
}
67+
68+
// WriteSpan saves the span into Archive Storage
69+
func (w *ArchiveWriter) WriteSpan(span *model.Span) error {
70+
_, err := w.WriteArchiveSpan(context.Background(), &storage_v1.WriteSpanRequest{
71+
Span: span,
72+
})
73+
if err != nil {
74+
return fmt.Errorf("plugin error: %w", err)
75+
}
76+
77+
return nil
78+
}
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
// Copyright (c) 2020 The Jaeger Authors.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package shared
16+
17+
import (
18+
"context"
19+
"io"
20+
"testing"
21+
22+
"github.com/stretchr/testify/assert"
23+
"github.com/stretchr/testify/mock"
24+
25+
"github.com/jaegertracing/jaeger/model"
26+
"github.com/jaegertracing/jaeger/proto-gen/storage_v1"
27+
"github.com/jaegertracing/jaeger/proto-gen/storage_v1/mocks"
28+
"github.com/jaegertracing/jaeger/storage/spanstore"
29+
)
30+
31+
func TestArchiveWriter_WriteSpan(t *testing.T) {
32+
mockSpan := &model.Span{
33+
TraceID: mockTraceID,
34+
SpanID: model.NewSpanID(1),
35+
Process: &model.Process{},
36+
}
37+
38+
archiveWriter := new(mocks.ArchiveSpanWriterPluginClient)
39+
archiveWriter.On("WriteArchiveSpan", mock.Anything, &storage_v1.WriteSpanRequest{Span: mockSpan}).
40+
Return(&storage_v1.WriteSpanResponse{}, nil)
41+
writer := &ArchiveWriter{archiveWriter}
42+
43+
err := writer.WriteSpan(mockSpan)
44+
assert.NoError(t, err)
45+
}
46+
47+
func TestArchiveReader_GetTrace(t *testing.T) {
48+
mockTraceID := model.NewTraceID(0, 123456)
49+
mockSpan := model.Span{
50+
TraceID: mockTraceID,
51+
SpanID: model.NewSpanID(1),
52+
Process: &model.Process{},
53+
}
54+
expected := &model.Trace{
55+
Spans: []*model.Span{&mockSpan},
56+
}
57+
58+
traceClient := new(mocks.ArchiveSpanReaderPlugin_GetArchiveTraceClient)
59+
traceClient.On("Recv").Return(&storage_v1.SpansResponseChunk{
60+
Spans: []model.Span{mockSpan},
61+
}, nil).Once()
62+
traceClient.On("Recv").Return(nil, io.EOF)
63+
64+
archiveReader := new(mocks.ArchiveSpanReaderPluginClient)
65+
archiveReader.On("GetArchiveTrace", mock.Anything, &storage_v1.GetTraceRequest{
66+
TraceID: mockTraceID,
67+
}).Return(traceClient, nil)
68+
reader := &ArchiveReader{archiveReader}
69+
70+
trace, err := reader.GetTrace(context.Background(), mockTraceID)
71+
assert.NoError(t, err)
72+
assert.Equal(t, expected, trace)
73+
}
74+
75+
func TestArchiveReader_FindTraceIDs(t *testing.T) {
76+
assert.Panics(t, func() {
77+
reader := ArchiveReader{&mocks.ArchiveSpanReaderPluginClient{}}
78+
_, _ = reader.FindTraceIDs(context.Background(), nil)
79+
})
80+
}
81+
82+
func TestArchiveReader_FindTraces(t *testing.T) {
83+
assert.Panics(t, func() {
84+
reader := ArchiveReader{&mocks.ArchiveSpanReaderPluginClient{}}
85+
_, _ = reader.FindTraces(context.Background(), nil)
86+
})
87+
}
88+
89+
func TestArchiveReader_GetOperations(t *testing.T) {
90+
assert.Panics(t, func() {
91+
reader := ArchiveReader{&mocks.ArchiveSpanReaderPluginClient{}}
92+
_, _ = reader.GetOperations(context.Background(), spanstore.OperationQueryParameters{})
93+
})
94+
}
95+
96+
func TestArchiveReader_GetServices(t *testing.T) {
97+
assert.Panics(t, func() {
98+
reader := ArchiveReader{&mocks.ArchiveSpanReaderPluginClient{}}
99+
_, _ = reader.GetServices(context.Background())
100+
})
101+
}

plugin/storage/grpc/shared/grpc_client.go

Lines changed: 2 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -71,11 +71,11 @@ func (c *grpcClient) SpanWriter() spanstore.Writer {
7171
}
7272

7373
func (c *grpcClient) ArchiveSpanReader() spanstore.Reader {
74-
return c
74+
return &ArchiveReader{c.archiveReaderClient}
7575
}
7676

7777
func (c *grpcClient) ArchiveSpanWriter() spanstore.Writer {
78-
return c
78+
return &ArchiveWriter{c.archiveWriterClient}
7979
}
8080

8181
// GetTrace takes a traceID and returns a Trace associated with that traceID
@@ -215,30 +215,6 @@ func (c *grpcClient) GetDependencies(endTs time.Time, lookback time.Duration) ([
215215
return resp.Dependencies, nil
216216
}
217217

218-
// WriteArchiveSpan saves the span in archive storage
219-
func (c *grpcClient) WriteArchiveSpan(span *model.Span) error {
220-
_, err := c.archiveWriterClient.WriteArchiveSpan(context.Background(), &storage_v1.WriteSpanRequest{
221-
Span: span,
222-
})
223-
if err != nil {
224-
return fmt.Errorf("plugin error: %w", err)
225-
}
226-
227-
return nil
228-
}
229-
230-
// GetArchiveTrace takes a traceID and returns a Trace associated with that traceID from archive storage
231-
func (c *grpcClient) GetArchiveTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error) {
232-
stream, err := c.archiveReaderClient.GetArchiveTrace(upgradeContextWithBearerToken(ctx), &storage_v1.GetTraceRequest{
233-
TraceID: traceID,
234-
})
235-
if err != nil {
236-
return nil, fmt.Errorf("plugin error: %w", err)
237-
}
238-
239-
return readTrace(stream)
240-
}
241-
242218
func (c *grpcClient) Capabilities() (*extra.Capabilities, error) {
243219
capabilities, err := c.capabilitiesClient.Capabilities(context.Background(), &storage_v1.CapabilitiesRequest{})
244220
if status.Code(err) == codes.Unimplemented {

plugin/storage/grpc/shared/grpc_client_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -324,7 +324,7 @@ func TestGrpcClientWriteArchiveSpan(t *testing.T) {
324324
Span: &mockTraceSpans[0],
325325
}).Return(&storage_v1.WriteSpanResponse{}, nil)
326326

327-
err := r.client.WriteArchiveSpan(&mockTraceSpans[0])
327+
err := r.client.ArchiveSpanWriter().WriteSpan(&mockTraceSpans[0])
328328
assert.NoError(t, err)
329329
})
330330
}
@@ -335,7 +335,7 @@ func TestGrpcClientWriteArchiveSpan_Error(t *testing.T) {
335335
Span: &mockTraceSpans[0],
336336
}).Return(nil, status.Error(codes.Internal, "internal error"))
337337

338-
err := r.client.WriteArchiveSpan(&mockTraceSpans[0])
338+
err := r.client.ArchiveSpanWriter().WriteSpan(&mockTraceSpans[0])
339339
assert.Error(t, err)
340340
})
341341
}
@@ -356,7 +356,7 @@ func TestGrpcClientGetArchiveTrace(t *testing.T) {
356356
expectedSpans = append(expectedSpans, &mockTraceSpans[i])
357357
}
358358

359-
s, err := r.client.GetArchiveTrace(context.Background(), mockTraceID)
359+
s, err := r.client.ArchiveSpanReader().GetTrace(context.Background(), mockTraceID)
360360
assert.NoError(t, err)
361361
assert.Equal(t, &model.Trace{
362362
Spans: expectedSpans,
@@ -372,7 +372,7 @@ func TestGrpcClientGetArchiveTrace_StreamError(t *testing.T) {
372372
TraceID: mockTraceID,
373373
}).Return(traceClient, nil)
374374

375-
s, err := r.client.GetArchiveTrace(context.Background(), mockTraceID)
375+
s, err := r.client.ArchiveSpanReader().GetTrace(context.Background(), mockTraceID)
376376
assert.Error(t, err)
377377
assert.Nil(t, s)
378378
})
@@ -384,7 +384,7 @@ func TestGrpcClientGetArchiveTrace_NoTrace(t *testing.T) {
384384
TraceID: mockTraceID,
385385
}).Return(nil, spanstore.ErrTraceNotFound)
386386

387-
s, err := r.client.GetArchiveTrace(context.Background(), mockTraceID)
387+
s, err := r.client.ArchiveSpanReader().GetTrace(context.Background(), mockTraceID)
388388
assert.Error(t, err)
389389
assert.Nil(t, s)
390390
})
@@ -398,7 +398,7 @@ func TestGrpcClientGetArchiveTrace_StreamErrorTraceNotFound(t *testing.T) {
398398
TraceID: mockTraceID,
399399
}).Return(traceClient, nil)
400400

401-
s, err := r.client.GetArchiveTrace(context.Background(), mockTraceID)
401+
s, err := r.client.ArchiveSpanReader().GetTrace(context.Background(), mockTraceID)
402402
assert.Equal(t, spanstore.ErrTraceNotFound, err)
403403
assert.Nil(t, s)
404404
})

0 commit comments

Comments
 (0)