diff --git a/cluster.go b/cluster.go index 452d138cf..e8d7c4026 100644 --- a/cluster.go +++ b/cluster.go @@ -77,6 +77,24 @@ type ClusterConfig struct { // Default: 128 for older CQL versions MaxRequestsPerConn int + // Threshold for the number of inflight requests per connection + // after which the connection is considered as heavy loaded + // Default: 512 + HeavyLoadedConnectionThreshold int + + // When a connection is considered as heavy loaded, the driver + // could switch to the least loaded connection for the same node. + // The switch will happen if the other connection is at least + // HeavyLoadedSwitchConnectionPercentage percentage less busy + // (in terms of inflight requests). + // + // For the default value of 20%, if the heavy loaded connection + // has 100 inflight requests, the switch will happen only if the + // least busy connection has less than 80 inflight requests. + // + // Default: 20% + HeavyLoadedSwitchConnectionPercentage int + // Default consistency level. // Default: Quorum Consistency Consistency @@ -267,6 +285,8 @@ func NewCluster(hosts ...string) *ClusterConfig { ConnectTimeout: 600 * time.Millisecond, Port: 9042, NumConns: 2, + HeavyLoadedConnectionThreshold: 512, + HeavyLoadedSwitchConnectionPercentage: 20, Consistency: Quorum, MaxPreparedStmts: defaultMaxPreparedStmts, MaxRoutingKeyInfo: 1000, diff --git a/conn.go b/conn.go index a14caa584..ac8e183db 100644 --- a/conn.go +++ b/conn.go @@ -1537,6 +1537,10 @@ func (c *Conn) AvailableStreams() int { return c.streams.Available() } +func (c *Conn) InUseStreams() int { + return c.streams.InUse() +} + func (c *Conn) UseKeyspace(keyspace string) error { q := &writeQueryFrame{statement: `USE "` + keyspace + `"`} q.params.consistency = c.session.cons diff --git a/internal/streams/streams.go b/internal/streams/streams.go index 05bcd7d6a..7e502c2cc 100644 --- a/internal/streams/streams.go +++ b/internal/streams/streams.go @@ -145,3 +145,7 @@ func (s *IDGenerator) Clear(stream int) (inuse bool) { func (s *IDGenerator) Available() int { return s.NumStreams - int(atomic.LoadInt32(&s.inuseStreams)) - 1 } + +func (s *IDGenerator) InUse() int { + return int(atomic.LoadInt32(&s.inuseStreams)) +} diff --git a/scylla.go b/scylla.go index 7790a26ee..8e90e948d 100644 --- a/scylla.go +++ b/scylla.go @@ -345,15 +345,15 @@ func (p *scyllaConnPicker) maybeReplaceWithLessBusyConnection(c *Conn) *Conn { return c } alternative := p.leastBusyConn() - if alternative == nil || alternative.AvailableStreams() * 120 > c.AvailableStreams() * 100 { - return c - } else { + if alternative != nil && alternative.InUseStreams() * 100 < c.InUseStreams() * (100 - c.session.cfg.HeavyLoadedSwitchConnectionPercentage) { return alternative + } else { + return c } } func isHeavyLoaded(c *Conn) bool { - return c.streams.NumStreams / 2 > c.AvailableStreams(); + return c.InUseStreams() > c.session.cfg.HeavyLoadedConnectionThreshold } func (p *scyllaConnPicker) leastBusyConn() *Conn {