Skip to content

Commit 7e7462f

Browse files
acrmprroberts2222
authored andcommitted
chore: Make pool tests pass
Fixes the failing pool tests in `plumbing` and `rlp/internal/ingress` following the change to use `grpc.NewClient` instead of `grpc.Dial`. This is because they were testing that connections were initiated when a new doppler was added. However, with the change to NewClient, connections won't be made until the resulting grpc client connections are used for RPC. This also means that dopplers may be added to the pool map when in the past they would not be added due to the connection on Dial failing. One approach to getting the existing tests to pass would have been to call `Connect` on the grpc client connections to force them to leave idle mode. However, `Connect` is experimental, and really gRPC is encouraging us not to care about the connection state when creating client connections. To fix the tests we ended up just asserting on the pool size. One downside of this was that we couldn't see a nice way to assert that `Close` was called on the gRPC client connection when `Close` was called for a doppler address. We could have replaced the connection creation with an interface and mocked that but it didn't seem worth it. Signed-off-by: Carson Long <[email protected]> Signed-off-by: Rebecca Roberts <[email protected]>
1 parent efe712a commit 7e7462f

File tree

4 files changed

+41
-99
lines changed

4 files changed

+41
-99
lines changed

src/plumbing/pool.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,3 +79,9 @@ func (p *Pool) connectToDoppler(addr string) {
7979
return
8080
}
8181
}
82+
83+
func (p *Pool) Size() int {
84+
p.mu.Lock()
85+
defer p.mu.Unlock()
86+
return len(p.dopplers)
87+
}

src/plumbing/pool_test.go

Lines changed: 15 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
package plumbing_test
22

33
import (
4-
"net"
5-
64
"code.cloudfoundry.org/loggregator-release/src/plumbing"
75

86
"golang.org/x/net/context"
@@ -16,62 +14,33 @@ import (
1614
var _ = Describe("Pool", func() {
1715
var (
1816
pool *plumbing.Pool
19-
20-
listeners []net.Listener
21-
servers []*grpc.Server
2217
)
2318

2419
BeforeEach(func() {
2520
pool = plumbing.NewPool(grpc.WithTransportCredentials(insecure.NewCredentials()))
2621
})
2722

28-
AfterEach(func() {
29-
for _, lis := range listeners {
30-
lis.Close()
31-
}
32-
listeners = nil
33-
34-
for _, server := range servers {
35-
server.Stop()
36-
}
37-
servers = nil
38-
})
39-
40-
Describe("Register() & Close()", func() {
41-
var (
42-
lis1, lis2 net.Listener
43-
accepter1, accepter2 chan bool
44-
)
45-
23+
Describe("Register()", func() {
4624
BeforeEach(func() {
47-
lis1, accepter1 = accepter(startListener("127.0.0.1:0"))
48-
lis2, accepter2 = accepter(startListener("127.0.0.1:0"))
49-
listeners = append(listeners, lis1, lis2)
25+
pool.RegisterDoppler("192.0.2.10:8080")
26+
pool.RegisterDoppler("192.0.2.11:8080")
5027
})
5128

52-
Describe("Register()", func() {
53-
It("fills pool with connections to each doppler", func() {
54-
pool.RegisterDoppler(lis1.Addr().String())
55-
pool.RegisterDoppler(lis2.Addr().String())
56-
57-
Eventually(accepter1).Should(HaveLen(1))
58-
Eventually(accepter2).Should(HaveLen(1))
59-
})
29+
It("adds entries to the pool", func() {
30+
Eventually(pool.Size).Should(Equal(2))
6031
})
32+
})
6133

62-
Describe("Close()", func() {
63-
BeforeEach(func() {
64-
pool.RegisterDoppler(lis1.Addr().String())
65-
})
66-
67-
It("stops the gRPC connections", func() {
68-
pool.Close(lis1.Addr().String())
69-
lis1.Close()
34+
Describe("Close()", func() {
35+
BeforeEach(func() {
36+
pool.RegisterDoppler("192.0.2.10:8080")
37+
pool.RegisterDoppler("192.0.2.11:8080")
38+
})
7039

71-
// Drain the channel
72-
Eventually(accepter1, 5).ShouldNot(Receive())
73-
Consistently(accepter1).Should(HaveLen(0))
74-
})
40+
It("removes entries from the pool", func() {
41+
Eventually(pool.Size).Should(Equal(2))
42+
pool.Close("192.0.2.11:8080")
43+
Eventually(pool.Size).Should(Equal(1))
7544
})
7645
})
7746

@@ -173,20 +142,3 @@ func fetchRx(
173142
Eventually(f).ShouldNot(HaveOccurred())
174143
return rx
175144
}
176-
177-
func accepter(lis net.Listener) (net.Listener, chan bool) {
178-
c := make(chan bool, 100)
179-
go func() {
180-
var dontGC []net.Conn
181-
for {
182-
conn, err := lis.Accept()
183-
if err != nil {
184-
return
185-
}
186-
187-
dontGC = append(dontGC, conn) //nolint: staticcheck
188-
c <- true
189-
}
190-
}()
191-
return lis, c
192-
}

src/rlp/internal/ingress/pool.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,3 +123,9 @@ func (p *Pool) connectToDoppler(addr string, clients []unsafe.Pointer, idx int)
123123
return
124124
}
125125
}
126+
127+
func (p *Pool) Size() int {
128+
p.mu.Lock()
129+
defer p.mu.Unlock()
130+
return len(p.dopplers)
131+
}

src/rlp/internal/ingress/pool_test.go

Lines changed: 14 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -23,49 +23,27 @@ var _ = Describe("Pool", func() {
2323
pool = ingress.NewPool(2, grpc.WithTransportCredentials(insecure.NewCredentials()))
2424
})
2525

26-
Describe("Register() & Close()", func() {
27-
var (
28-
listeners []net.Listener
29-
lis1, lis2 net.Listener
30-
accepter1, accepter2 chan bool
31-
)
32-
26+
Describe("Register()", func() {
3327
BeforeEach(func() {
34-
lis1, accepter1 = accepter(startListener("127.0.0.1:0"))
35-
lis2, accepter2 = accepter(startListener("127.0.0.1:0"))
36-
listeners = append(listeners, lis1, lis2)
28+
pool.RegisterDoppler("192.0.2.10:8080")
29+
pool.RegisterDoppler("192.0.2.11:8080")
3730
})
3831

39-
AfterEach(func() {
40-
for _, lis := range listeners {
41-
lis.Close()
42-
}
43-
listeners = nil
32+
It("adds entries to the pool", func() {
33+
Eventually(pool.Size).Should(Equal(2))
4434
})
35+
})
4536

46-
Describe("Register()", func() {
47-
It("fills pool with connections to each doppler", func() {
48-
pool.RegisterDoppler(lis1.Addr().String())
49-
pool.RegisterDoppler(lis2.Addr().String())
50-
51-
Eventually(accepter1).Should(HaveLen(2))
52-
Eventually(accepter2).Should(HaveLen(2))
53-
})
37+
Describe("Close()", func() {
38+
BeforeEach(func() {
39+
pool.RegisterDoppler("192.0.2.10:8080")
40+
pool.RegisterDoppler("192.0.2.11:8080")
5441
})
5542

56-
Describe("Close()", func() {
57-
BeforeEach(func() {
58-
pool.RegisterDoppler(lis1.Addr().String())
59-
})
60-
61-
It("stops the gRPC connections", func() {
62-
pool.Close(lis1.Addr().String())
63-
lis1.Close()
64-
65-
// Drain the channel
66-
Eventually(accepter1, 5).ShouldNot(Receive())
67-
Consistently(accepter1).Should(HaveLen(0))
68-
})
43+
It("removes entries from the pool", func() {
44+
Eventually(pool.Size).Should(Equal(2))
45+
pool.Close("192.0.2.11:8080")
46+
Eventually(pool.Size).Should(Equal(1))
6947
})
7048
})
7149

0 commit comments

Comments
 (0)