-
Notifications
You must be signed in to change notification settings - Fork 41
Expand file tree
/
Copy pathcommand_bind_test.go
More file actions
102 lines (81 loc) · 2.88 KB
/
command_bind_test.go
File metadata and controls
102 lines (81 loc) · 2.88 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
package wire
import (
"bytes"
"context"
"testing"
"github.com/jackc/pgx/v5/pgtype"
"github.com/jeroenrinzema/psql-wire/pkg/buffer"
"github.com/jeroenrinzema/psql-wire/pkg/mock"
"github.com/jeroenrinzema/psql-wire/pkg/types"
"github.com/neilotoole/slogt"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// TestHandleBind_ParallelPipeline_Success checks that a successful Bind in
// parallel pipeline mode writes nothing to the wire and instead leaves exactly
// one ResponseBindComplete event on the session's response queue.
func TestHandleBind_ParallelPipeline_Success(t *testing.T) {
	t.Parallel()

	ctx := context.Background()
	logger := slogt.New(t)

	// A no-op handler is enough: the test only exercises binding, not execution.
	handler := func(ctx context.Context, writer DataWriter, parameters []Parameter) error {
		return nil
	}
	prepared := NewStatement(
		handler,
		WithParameters([]uint32{pgtype.Int4OID}),
		WithColumns(Columns{{Name: "col1", Oid: pgtype.Int4OID}}),
	)

	// Register the statement under the name the Bind message will refer to.
	cache := &DefaultStatementCache{}
	require.NoError(t, cache.Set(ctx, "test_stmt", prepared))

	session := &Session{
		Server:           &Server{logger: logger},
		Statements:       cache,
		Portals:          &DefaultPortalCache{},
		ParallelPipeline: ParallelPipelineConfig{Enabled: true},
		ResponseQueue:    NewResponseQueue(),
	}

	wire := &bytes.Buffer{}
	bindMsg := mock.NewBindReader(t, logger, "test_portal", "test_stmt", 0, 0, 0)

	require.NoError(t, session.handleBind(ctx, bindMsg, buffer.NewWriter(logger, wire)))

	// In parallel pipeline mode the response is queued rather than written immediately.
	assert.Equal(t, 0, wire.Len(), "parallel pipeline should not write to wire on success")
	assert.Equal(t, 1, session.ResponseQueue.Len())

	// Draining must yield the single BindComplete event that was queued.
	queued := session.ResponseQueue.DrainAll()
	require.Len(t, queued, 1)
	assert.Equal(t, ResponseBindComplete, queued[0].Kind)
}
// TestHandleBind_ParallelPipeline_Error checks that when Bind fails in parallel
// pipeline mode, the responses already queued are written to the wire ahead of
// the error response and the queue ends up empty.
func TestHandleBind_ParallelPipeline_Error(t *testing.T) {
	t.Parallel()

	ctx := context.Background()
	logger := slogt.New(t)

	// The statement name exists in the cache but maps to a nil statement.
	cache := &DefaultStatementCache{
		statements: map[string]*Statement{"unknown_stmt": nil},
	}

	session := &Session{
		Server:           &Server{logger: logger},
		Statements:       cache,
		Portals:          &DefaultPortalCache{},
		ParallelPipeline: ParallelPipelineConfig{Enabled: true},
		ResponseQueue:    NewResponseQueue(),
		inExtendedQuery:  true,
	}

	wire := &bytes.Buffer{}
	out := buffer.NewWriter(logger, wire)
	bindMsg := mock.NewBindReader(t, logger, "test_portal", "unknown_stmt", 0, 0, 0)

	// Seed the queue with an earlier response so the test can observe it being flushed.
	session.ResponseQueue.Enqueue(NewParseCompleteEvent())

	// The test expects the failure to be reported on the wire, not as a returned error.
	require.NoError(t, session.handleBind(ctx, bindMsg, out))
	assert.Equal(t, 0, session.ResponseQueue.Len())

	// The wire must carry the queued ParseComplete first, then the ErrorResponse.
	replies := mock.NewReader(t, wire)
	for _, want := range []any{types.ServerParseComplete, types.ServerErrorResponse} {
		got, _, err := replies.ReadTypedMsg()
		require.NoError(t, err)
		assert.Equal(t, want, got)
	}
}