Skip to content

Commit cb641f2

Browse files
fix: reduce the number of memory allocations and the latency overhead. (#983)
fix: reduce the number of memory allocations and the latency overhead. (#983)
* fix: reduce the number of memory allocations and the latency overhead. Struct instrumentedConn would trace every call to Read() and Write(). This incurs a significant overhead because the variadic arguments of trace.RecordBytesReceived() escape and need to be heap-allocated. Also, every trace.RecordBytesReceived() would be called in a new goroutine. That makes the call yet more expensive. It makes no sense to update the performance counter this often. The default scraping interval in Prometheus is 1 minute. Google Cloud Monitoring had the same interval before they added high-resolution counters that scrape services every 10 seconds. Only update integer counters in the hot path, and update OpenCensus' counters once in 5 seconds. I have a test that just loops through `SELECT * FROM t WHERE id = $1` and has response sizes that range from several dozen bytes to several kilobytes. At 64 connections to Postgres and 10k requests per connection, the test makes approx. 25.5M allocations before this patch, and 7.2M allocations after the patch. The CPU time goes down accordingly because OpenCensus and the garbage collector have less work to do. * fix: do not spawn goroutines to report dial_latency and open_connections. Creating a goroutine introduces a latency of its own. Moreover, a Dial() that makes a TLS connection will not benefit from any possible microsecond-level savings. * chore: use atomic.Int64 and atomic.Int32. Unlike int64 struct fields, atomic.Int64 fields are guaranteed to be aligned to 8 bytes on 32-bit platforms. This alignment is required for atomic ops to work. Also, replace atomic int32 vars with atomic.Int32 for consistency.
1 parent fbe77c1 commit cb641f2

File tree

5 files changed

+84
-58
lines changed

5 files changed

+84
-58
lines changed

dialer.go

Lines changed: 52 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -338,7 +338,7 @@ func (d *Dialer) Dial(ctx context.Context, icn string, opts ...DialOption) (conn
338338
trace.AddDialerID(d.dialerID),
339339
)
340340
defer func() {
341-
go trace.RecordDialError(context.Background(), icn, d.dialerID, err)
341+
trace.RecordDialError(context.Background(), icn, d.dialerID, err)
342342
endDial(err)
343343
}()
344344
cn, err := d.resolver.Resolve(ctx, icn)
@@ -429,14 +429,12 @@ func (d *Dialer) Dial(ctx context.Context, icn string, opts ...DialOption) (conn
429429
}
430430

431431
latency := time.Since(startTime).Milliseconds()
432-
go func() {
433-
n := atomic.AddUint64(c.openConnsCount, 1)
434-
trace.RecordOpenConnections(ctx, int64(n), d.dialerID, cn.String())
435-
trace.RecordDialLatency(ctx, icn, d.dialerID, latency)
436-
}()
432+
n := c.openConnsCount.Add(1)
433+
trace.RecordOpenConnections(ctx, int64(n), d.dialerID, cn.String())
434+
trace.RecordDialLatency(ctx, icn, d.dialerID, latency)
437435

438436
closeFunc := func() {
439-
n := atomic.AddUint64(c.openConnsCount, ^uint64(0)) // c.openConnsCount = c.openConnsCount - 1
437+
n := c.openConnsCount.Add(^uint64(0)) // c.openConnsCount = c.openConnsCount - 1
440438
trace.RecordOpenConnections(context.Background(), int64(n), d.dialerID, cn.String())
441439
}
442440
errFunc := func(err error) {
@@ -571,33 +569,44 @@ func (d *Dialer) Warmup(ctx context.Context, icn string, opts ...DialOption) err
571569
// newInstrumentedConn initializes an instrumentedConn that on closing will
572570
// decrement the number of open connects and record the result.
573571
func newInstrumentedConn(conn net.Conn, closeFunc func(), errFunc func(error), dialerID, connName string) *instrumentedConn {
574-
return &instrumentedConn{
575-
Conn: conn,
576-
closeFunc: closeFunc,
577-
errFunc: errFunc,
578-
dialerID: dialerID,
579-
connName: connName,
572+
ctx, cancel := context.WithCancel(context.Background())
573+
c := &instrumentedConn{
574+
Conn: conn,
575+
closeFunc: closeFunc,
576+
errFunc: errFunc,
577+
dialerID: dialerID,
578+
connName: connName,
579+
reportTicker: time.NewTicker(5 * time.Second),
580+
stopReporter: cancel,
580581
}
582+
583+
go c.report(ctx)
584+
585+
return c
581586
}
582587

583588
// instrumentedConn wraps a net.Conn and invokes closeFunc when the connection
584589
// is closed.
585590
type instrumentedConn struct {
586591
net.Conn
587-
closeFunc func()
588-
errFunc func(error)
589-
mu sync.RWMutex
590-
closed bool
591-
dialerID string
592-
connName string
592+
closeFunc func()
593+
errFunc func(error)
594+
mu sync.RWMutex
595+
closed bool
596+
dialerID string
597+
connName string
598+
bytesRead atomic.Int64
599+
bytesWritten atomic.Int64
600+
reportTicker *time.Ticker
601+
stopReporter func()
593602
}
594603

595604
// Read delegates to the underlying net.Conn interface and records number of
596605
// bytes read
597606
func (i *instrumentedConn) Read(b []byte) (int, error) {
598607
bytesRead, err := i.Conn.Read(b)
599608
if err == nil {
600-
go trace.RecordBytesReceived(context.Background(), int64(bytesRead), i.connName, i.dialerID)
609+
i.bytesRead.Add(int64(bytesRead))
601610
} else {
602611
i.errFunc(err)
603612
}
@@ -609,7 +618,7 @@ func (i *instrumentedConn) Read(b []byte) (int, error) {
609618
func (i *instrumentedConn) Write(b []byte) (int, error) {
610619
bytesWritten, err := i.Conn.Write(b)
611620
if err == nil {
612-
go trace.RecordBytesSent(context.Background(), int64(bytesWritten), i.connName, i.dialerID)
621+
i.bytesWritten.Add(int64(bytesWritten))
613622
} else {
614623
i.errFunc(err)
615624
}
@@ -629,12 +638,29 @@ func (i *instrumentedConn) Close() error {
629638
i.mu.Lock()
630639
defer i.mu.Unlock()
631640
i.closed = true
632-
err := i.Conn.Close()
633-
if err != nil {
634-
return err
641+
i.stopReporter()
642+
i.reportCounters()
643+
i.closeFunc()
644+
return i.Conn.Close()
645+
}
646+
647+
func (i *instrumentedConn) reportCounters() {
648+
bytesRead := i.bytesRead.Swap(0)
649+
bytesWritten := i.bytesWritten.Swap(0)
650+
trace.RecordBytesReceived(context.Background(), bytesRead, i.connName, i.dialerID)
651+
trace.RecordBytesSent(context.Background(), bytesWritten, i.connName, i.dialerID)
652+
}
653+
654+
func (i *instrumentedConn) report(ctx context.Context) {
655+
defer i.reportTicker.Stop()
656+
for {
657+
select {
658+
case <-i.reportTicker.C:
659+
i.reportCounters()
660+
case <-ctx.Done():
661+
return
662+
}
635663
}
636-
go i.closeFunc()
637-
return nil
638664
}
639665

640666
// Close closes the Dialer; it prevents the Dialer from refreshing the information

dialer_test.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1024,13 +1024,13 @@ func TestDialerFailsDnsTxtRecordMissing(t *testing.T) {
10241024
}
10251025

10261026
type changingResolver struct {
1027-
stage *int32
1027+
stage atomic.Int32
10281028
}
10291029

10301030
func (r *changingResolver) Resolve(_ context.Context, name string) (instance.ConnName, error) {
10311031
// For TestDialerFailoverOnInstanceChange
10321032
if name == "update.example.com" {
1033-
if atomic.LoadInt32(r.stage) == 0 {
1033+
if r.stage.Load() == 0 {
10341034
return instance.ParseConnNameWithDomainName("my-project:my-region:my-instance", "update.example.com")
10351035
}
10361036
return instance.ParseConnNameWithDomainName("my-project:my-region:my-instance2", "update.example.com")
@@ -1054,9 +1054,7 @@ func TestDialerUpdatesAutomaticallyAfterDnsChange(t *testing.T) {
10541054
"my-project", "my-region", "my-instance2",
10551055
mock.WithDNS("update.example.com"),
10561056
)
1057-
r := &changingResolver{
1058-
stage: new(int32),
1059-
}
1057+
r := &changingResolver{}
10601058

10611059
d := setupDialer(t, setupConfig{
10621060
skipServer: true,
@@ -1084,7 +1082,7 @@ func TestDialerUpdatesAutomaticallyAfterDnsChange(t *testing.T) {
10841082
"update.example.com",
10851083
)
10861084
stop1()
1087-
atomic.StoreInt32(r.stage, 1)
1085+
r.stage.Store(1)
10881086

10891087
time.Sleep(1 * time.Second)
10901088
instCn, _ := instance.ParseConnNameWithDomainName("my-project:my-region:my-instance", "update.example.com")

metrics_test.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,6 @@ func TestDialerWithMetrics(t *testing.T) {
172172
if err != nil {
173173
t.Fatalf("expected Dial to succeed, but got error: %v", err)
174174
}
175-
defer conn.Close()
176175
// dial the good instance again to check the counter
177176
conn2, err := d.Dial(context.Background(), "my-project:my-region:my-instance")
178177
if err != nil {
@@ -194,7 +193,6 @@ func TestDialerWithMetrics(t *testing.T) {
194193
if err != nil {
195194
t.Fatalf("conn.Write failed: %v", err)
196195
}
197-
defer conn2.Close()
198196
// dial a bogus instance
199197
_, err = d.Dial(context.Background(), "my-project:my-region:notaninstance")
200198
if err == nil {
@@ -205,6 +203,12 @@ func TestDialerWithMetrics(t *testing.T) {
205203

206204
// success metrics
207205
wantLastValueMetric(t, "cloudsqlconn/open_connections", spy.data(), 2)
206+
207+
conn.Close()
208+
conn2.Close()
209+
210+
time.Sleep(10 * time.Millisecond) // allow exporter a chance to run
211+
208212
wantDistributionMetric(t, "cloudsqlconn/dial_latency", spy.data())
209213
wantCountMetric(t, "cloudsqlconn/refresh_success_count", spy.data())
210214
wantSumMetric(t, "cloudsqlconn/bytes_sent", spy.data())

monitored_cache.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import (
2727
// monitoredCache is a wrapper around a connectionInfoCache that tracks the
2828
// number of connections to the associated instance.
2929
type monitoredCache struct {
30-
openConnsCount *uint64
30+
openConnsCount atomic.Uint64
3131
cn instance.ConnName
3232
resolver instance.ConnectionNameResolver
3333
logger debug.ContextLogger
@@ -53,7 +53,6 @@ func newMonitoredCache(
5353
logger debug.ContextLogger) *monitoredCache {
5454

5555
c := &monitoredCache{
56-
openConnsCount: new(uint64),
5756
closedCh: make(chan struct{}),
5857
cn: cn,
5958
resolver: resolver,
@@ -98,13 +97,13 @@ func (c *monitoredCache) Close() error {
9897
c.domainNameTicker.Stop()
9998
}
10099

101-
if atomic.LoadUint64(c.openConnsCount) > 0 {
100+
if c.openConnsCount.Load() > 0 {
102101
for _, socket := range c.openConns {
103102
if !socket.isClosed() {
104103
_ = socket.Close() // force socket closed, ok to ignore error.
105104
}
106105
}
107-
atomic.StoreUint64(c.openConnsCount, 0)
106+
c.openConnsCount.Store(0)
108107
}
109108

110109
return c.connectionInfoCache.Close()

monitored_cache_test.go

Lines changed: 19 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,7 @@ func TestMonitoredCache_purgeClosedConns(t *testing.T) {
6363

6464
func TestMonitoredCache_checkDomainName_instanceChanged(t *testing.T) {
6565
cn, _ := instance.ParseConnNameWithDomainName("my-project:my-region:my-instance", "update.example.com")
66-
r := &changingResolver{
67-
stage: new(int32),
68-
}
66+
r := &changingResolver{}
6967
c := newMonitoredCache(context.TODO(),
7068
&spyConnectionInfoCache{},
7169
cn,
@@ -81,7 +79,7 @@ func TestMonitoredCache_checkDomainName_instanceChanged(t *testing.T) {
8179
t.Fatal("got cache closed, want cache open")
8280
}
8381
// update the domain name
84-
atomic.StoreInt32(r.stage, 1)
82+
r.stage.Store(1)
8583

8684
// wait for the resolver to run
8785
time.Sleep(100 * time.Millisecond)
@@ -93,11 +91,9 @@ func TestMonitoredCache_checkDomainName_instanceChanged(t *testing.T) {
9391

9492
func TestMonitoredCache_Close(t *testing.T) {
9593
cn, _ := instance.ParseConnNameWithDomainName("my-project:my-region:my-instance", "update.example.com")
96-
var closeFuncCalls int32
94+
var closeFuncCalls atomic.Int32
9795

98-
r := &changingResolver{
99-
stage: new(int32),
100-
}
96+
r := &changingResolver{}
10197

10298
c := newMonitoredCache(context.TODO(),
10399
&spyConnectionInfoCache{},
@@ -107,29 +103,32 @@ func TestMonitoredCache_Close(t *testing.T) {
107103
&testLog{t: t},
108104
)
109105
inc := func() {
110-
atomic.AddInt32(&closeFuncCalls, 1)
106+
closeFuncCalls.Add(1)
111107
}
112108

113109
c.mu.Lock()
114110
// set up the state as if there were 2 open connections.
115111
c.openConns = []*instrumentedConn{
116112
{
117-
closed: false,
118-
closeFunc: inc,
119-
Conn: &mockConn{},
113+
closed: false,
114+
closeFunc: inc,
115+
stopReporter: func() {},
116+
Conn: &mockConn{},
120117
},
121118
{
122-
closed: false,
123-
closeFunc: inc,
124-
Conn: &mockConn{},
119+
closed: false,
120+
closeFunc: inc,
121+
stopReporter: func() {},
122+
Conn: &mockConn{},
125123
},
126124
{
127-
closed: true,
128-
closeFunc: inc,
129-
Conn: &mockConn{},
125+
closed: true,
126+
closeFunc: inc,
127+
stopReporter: func() {},
128+
Conn: &mockConn{},
130129
},
131130
}
132-
*c.openConnsCount = 2
131+
c.openConnsCount.Store(2)
133132
c.mu.Unlock()
134133

135134
c.Close()
@@ -138,7 +137,7 @@ func TestMonitoredCache_Close(t *testing.T) {
138137
}
139138
// wait for closeFunc() to be called.
140139
time.Sleep(100 * time.Millisecond)
141-
if got := atomic.LoadInt32(&closeFuncCalls); got != 2 {
140+
if got := closeFuncCalls.Load(); got != 2 {
142141
t.Fatalf("got %d, want 2", got)
143142
}
144143

0 commit comments

Comments
 (0)