Skip to content

Commit 80e1137

Browse files
committed
[#591] Adding timeout param back. Removing headSize from framer.
1 parent b67e00c commit 80e1137

File tree

6 files changed

+31
-31
lines changed

6 files changed

+31
-31
lines changed

cassandra_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1751,7 +1751,7 @@ func TestQueryInfo(t *testing.T) {
17511751
defer session.Close()
17521752

17531753
conn := getRandomConn(t, session)
1754-
info, err := conn.prepareStatement(context.Background(), "SELECT release_version, host_id FROM system.local WHERE key = ?", nil, conn.currentKeyspace)
1754+
info, err := conn.prepareStatement(context.Background(), "SELECT release_version, host_id FROM system.local WHERE key = ?", nil, conn.currentKeyspace, time.Second)
17551755

17561756
if err != nil {
17571757
t.Fatalf("Failed to execute query for preparing statement: %v", err)
@@ -2305,7 +2305,7 @@ func TestRoutingKey(t *testing.T) {
23052305

23062306
initCacheSize := session.routingKeyInfoCache.lru.Len()
23072307

2308-
routingKeyInfo, err := session.routingKeyInfo(context.Background(), "SELECT * FROM test_single_routing_key WHERE second_id=? AND first_id=?", "")
2308+
routingKeyInfo, err := session.routingKeyInfo(context.Background(), "SELECT * FROM test_single_routing_key WHERE second_id=? AND first_id=?", "", time.Second)
23092309
if err != nil {
23102310
t.Fatalf("failed to get routing key info due to error: %v", err)
23112311
}
@@ -2333,7 +2333,7 @@ func TestRoutingKey(t *testing.T) {
23332333
context.Background(),
23342334
"SELECT * FROM test_single_routing_key WHERE second_id=? AND first_id=?",
23352335
// Routing info will be pulled from cached prepared statement, it should work with minimal timeout
2336-
"")
2336+
"", time.Nanosecond)
23372337
if err != nil {
23382338
t.Fatalf("failed to get routing key info due to error: %v", err)
23392339
}
@@ -2370,7 +2370,7 @@ func TestRoutingKey(t *testing.T) {
23702370
routingKeyInfo, err = session.routingKeyInfo(
23712371
context.Background(),
23722372
"SELECT * FROM test_composite_routing_key WHERE second_id=? AND first_id=?",
2373-
"")
2373+
"", time.Second)
23742374
if err != nil {
23752375
t.Fatalf("failed to get routing key info due to error: %v", err)
23762376
}
@@ -2478,7 +2478,7 @@ func TestNegativeStream(t *testing.T) {
24782478
return f.finish()
24792479
})
24802480

2481-
frame, err := conn.exec(context.Background(), writer, nil)
2481+
frame, err := conn.exec(context.Background(), writer, nil, time.Second)
24822482
if err == nil {
24832483
t.Fatalf("expected to get an error on stream %d", stream)
24842484
} else if frame != nil {

conn.go

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ func (fn connErrorHandlerFn) HandleError(conn *Conn, err error, closed bool) {
176176

177177
type ConnInterface interface {
178178
Close()
179-
exec(ctx context.Context, req frameBuilder, tracer Tracer) (*framer, error)
179+
exec(ctx context.Context, req frameBuilder, tracer Tracer, requestTimeout time.Duration) (*framer, error)
180180
awaitSchemaAgreement(ctx context.Context) error
181181
executeQuery(ctx context.Context, qry *Query) *Iter
182182
querySystem(ctx context.Context, query string, values ...interface{}) *Iter
@@ -451,8 +451,8 @@ type startupCoordinator struct {
451451

452452
func (s *startupCoordinator) setupConn(ctx context.Context) error {
453453
var cancel context.CancelFunc
454-
if s.conn.r.GetTimeout() > 0 {
455-
ctx, cancel = context.WithTimeout(ctx, s.conn.r.GetTimeout())
454+
if s.conn.cfg.ConnectTimeout > 0 {
455+
ctx, cancel = context.WithTimeout(ctx, s.conn.cfg.ConnectTimeout)
456456
} else {
457457
ctx, cancel = context.WithCancel(ctx)
458458
}
@@ -510,7 +510,7 @@ func (s *startupCoordinator) write(ctx context.Context, frame frameBuilder, star
510510
return nil, ctx.Err()
511511
}
512512

513-
framer, err := s.conn.execInternal(ctx, frame, nil, startupCompleted.Load())
513+
framer, err := s.conn.execInternal(ctx, frame, nil, s.conn.cfg.ConnectTimeout, startupCompleted.Load())
514514
if err != nil {
515515
return nil, err
516516
}
@@ -755,7 +755,7 @@ func (c *Conn) heartBeat(ctx context.Context) {
755755
case <-timer.C:
756756
}
757757

758-
framer, err := c.exec(context.Background(), &writeOptionsFrame{}, nil)
758+
framer, err := c.exec(context.Background(), &writeOptionsFrame{}, nil, c.cfg.ConnectTimeout)
759759
if err != nil {
760760
failures++
761761
continue
@@ -796,7 +796,7 @@ func (c *Conn) processFrame(ctx context.Context, r io.Reader) error {
796796

797797
// read a full header, ignore timeouts, as this is being ran in a loop
798798
// TODO: TCP level deadlines? or just query level deadlines?
799-
if c.r.GetTimeout() > 0 {
799+
if c.readTimeout.Load() > 0 {
800800
c.r.SetReadDeadline(time.Time{})
801801
}
802802

@@ -1330,11 +1330,11 @@ func (c *Conn) addCall(call *callReq) error {
13301330
return nil
13311331
}
13321332

1333-
func (c *Conn) exec(ctx context.Context, req frameBuilder, tracer Tracer) (*framer, error) {
1334-
return c.execInternal(ctx, req, tracer, true)
1333+
func (c *Conn) exec(ctx context.Context, req frameBuilder, tracer Tracer, requestTimeout time.Duration) (*framer, error) {
1334+
return c.execInternal(ctx, req, tracer, requestTimeout, true)
13351335
}
13361336

1337-
func (c *Conn) execInternal(ctx context.Context, req frameBuilder, tracer Tracer, startupCompleted bool) (*framer, error) {
1337+
func (c *Conn) execInternal(ctx context.Context, req frameBuilder, tracer Tracer, requestTimeout time.Duration, startupCompleted bool) (*framer, error) {
13381338
if ctxErr := ctx.Err(); ctxErr != nil {
13391339
return nil, &QueryError{err: ctxErr, potentiallyExecuted: false}
13401340
}
@@ -1431,7 +1431,7 @@ func (c *Conn) execInternal(ctx context.Context, req frameBuilder, tracer Tracer
14311431
}
14321432

14331433
var timeoutCh <-chan time.Time
1434-
if timeout := c.r.GetTimeout(); timeout > 0 {
1434+
if timeout := requestTimeout; timeout > 0 {
14351435
if call.timer == nil {
14361436
call.timer = time.NewTimer(0)
14371437
<-call.timer.C
@@ -1551,7 +1551,7 @@ type inflightPrepare struct {
15511551
preparedStatment *preparedStatment
15521552
}
15531553

1554-
func (c *Conn) prepareStatement(ctx context.Context, stmt string, tracer Tracer, keyspace string) (*preparedStatment, error) {
1554+
func (c *Conn) prepareStatement(ctx context.Context, stmt string, tracer Tracer, keyspace string, requestTimeout time.Duration) (*preparedStatment, error) {
15551555
stmtCacheKey := c.session.stmtsLRU.keyFor(c.host.HostID(), keyspace, stmt)
15561556
flight, ok := c.session.stmtsLRU.execIfMissing(stmtCacheKey, func(lru *lru.Cache) *inflightPrepare {
15571557
flight := &inflightPrepare{
@@ -1575,7 +1575,7 @@ func (c *Conn) prepareStatement(ctx context.Context, stmt string, tracer Tracer,
15751575
// we won the race to do the load, if our context is canceled we shouldnt
15761576
// stop the load as other callers are waiting for it but this caller should get
15771577
// their context cancelled error.
1578-
framer, err := c.exec(c.ctx, prep, tracer)
1578+
framer, err := c.exec(c.ctx, prep, tracer, requestTimeout)
15791579
if err != nil {
15801580
flight.err = err
15811581
c.session.stmtsLRU.remove(stmtCacheKey)
@@ -1692,7 +1692,7 @@ func (c *Conn) executeQuery(ctx context.Context, qry *Query) (iter *Iter) {
16921692
if !qry.skipPrepare && qry.shouldPrepare() {
16931693
// Prepare all DML queries. Other queries can not be prepared.
16941694
var err error
1695-
info, err = c.prepareStatement(ctx, qry.stmt, qry.trace, usedKeyspace)
1695+
info, err = c.prepareStatement(ctx, qry.stmt, qry.trace, usedKeyspace, qry.GetRequestTimeout())
16961696
if err != nil {
16971697
return &Iter{err: err}
16981698
}
@@ -1752,7 +1752,7 @@ func (c *Conn) executeQuery(ctx context.Context, qry *Query) (iter *Iter) {
17521752
}
17531753
}
17541754

1755-
framer, err := c.exec(ctx, frame, qry.trace)
1755+
framer, err := c.exec(ctx, frame, qry.trace, qry.GetRequestTimeout())
17561756
if err != nil {
17571757
return &Iter{err: err}
17581758
}
@@ -1911,7 +1911,7 @@ func (c *Conn) UseKeyspace(keyspace string) error {
19111911
q := &writeQueryFrame{statement: `USE "` + keyspace + `"`}
19121912
q.params.consistency = c.session.cons
19131913

1914-
framer, err := c.exec(c.ctx, q, nil)
1914+
framer, err := c.exec(c.ctx, q, nil, c.cfg.ConnectTimeout)
19151915
if err != nil {
19161916
return err
19171917
}
@@ -1975,7 +1975,7 @@ func (c *Conn) executeBatch(ctx context.Context, batch *Batch) (iter *Iter) {
19751975
b := &req.statements[i]
19761976

19771977
if len(entry.Args) > 0 || entry.binding != nil {
1978-
info, err := c.prepareStatement(batch.Context(), entry.Stmt, batch.trace, usedKeyspace)
1978+
info, err := c.prepareStatement(batch.Context(), entry.Stmt, batch.trace, usedKeyspace, batch.GetRequestTimeout())
19791979
if err != nil {
19801980
return &Iter{err: err}
19811981
}
@@ -2028,7 +2028,7 @@ func (c *Conn) executeBatch(ctx context.Context, batch *Batch) (iter *Iter) {
20282028
batch.routingInfo.mu.Unlock()
20292029

20302030
// TODO: should batch support tracing?
2031-
framer, err := c.exec(batch.Context(), req, batch.trace)
2031+
framer, err := c.exec(batch.Context(), req, batch.trace, batch.GetRequestTimeout())
20322032
if err != nil {
20332033
return &Iter{err: err}
20342034
}

control.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -368,7 +368,7 @@ func (c *controlConn) registerEvents(conn *Conn) error {
368368
framer, err := conn.exec(context.Background(),
369369
&writeRegisterFrame{
370370
events: events,
371-
}, nil)
371+
}, nil, conn.cfg.ConnectTimeout)
372372
if err != nil {
373373
return err
374374
}
@@ -493,7 +493,7 @@ func (c *controlConn) writeFrame(w frameBuilder) (frame, error) {
493493
return nil, errNoControl
494494
}
495495

496-
framer, err := ch.conn.exec(context.Background(), w, nil)
496+
framer, err := ch.conn.exec(context.Background(), w, nil, c.session.cfg.MetadataSchemaRequestTimeout)
497497
if err != nil {
498498
return nil, err
499499
}

frame.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -376,7 +376,6 @@ type framer struct {
376376
// 0 after a read.
377377
readBuffer []byte
378378
buf []byte
379-
headSize int
380379
flagLWT int
381380
rateLimitingErrorCode int
382381
proto byte
@@ -761,7 +760,7 @@ func (f *framer) finish() error {
761760
}
762761

763762
// TODO: only compress frames which are big enough
764-
compressed, err := f.compres.AppendCompressedWithLength(nil, f.buf[f.headSize:])
763+
compressed, err := f.compres.AppendCompressedWithLength(nil, f.buf[headSize:])
765764
if err != nil {
766765
return err
767766
}

ring_describer_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"fmt"
99
"net"
1010
"testing"
11+
"time"
1112

1213
"github.com/gocql/gocql/internal/tests"
1314

@@ -93,7 +94,7 @@ func TestGetClusterPeerInfoZeroToken(t *testing.T) {
9394
type mockConnection struct{}
9495

9596
func (*mockConnection) Close() {}
96-
func (*mockConnection) exec(ctx context.Context, req frameBuilder, tracer Tracer) (*framer, error) {
97+
func (*mockConnection) exec(ctx context.Context, req frameBuilder, tracer Tracer, requestTimeout time.Duration) (*framer, error) {
9798
return nil, nil
9899
}
99100
func (*mockConnection) awaitSchemaAgreement(ctx context.Context) error { return nil }

session.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -712,7 +712,7 @@ func (s *Session) findTabletReplicasForToken(keyspace, table string, token int64
712712

713713
// Returns routing key indexes and type info.
714714
// If keyspace == "" it uses the keyspace which is specified in Cluster.Keyspace
715-
func (s *Session) routingKeyInfo(ctx context.Context, stmt string, keyspace string) (*routingKeyInfo, error) {
715+
func (s *Session) routingKeyInfo(ctx context.Context, stmt string, keyspace string, requestTimeout time.Duration) (*routingKeyInfo, error) {
716716
if keyspace == "" {
717717
keyspace = s.cfg.Keyspace
718718
}
@@ -763,7 +763,7 @@ func (s *Session) routingKeyInfo(ctx context.Context, stmt string, keyspace stri
763763
}
764764

765765
// get the query info for the statement
766-
info, inflight.err = conn.prepareStatement(ctx, stmt, nil, keyspace)
766+
info, inflight.err = conn.prepareStatement(ctx, stmt, nil, keyspace, requestTimeout)
767767
if inflight.err != nil {
768768
// don't cache this error
769769
s.routingKeyInfoCache.Remove(stmt)
@@ -1372,7 +1372,7 @@ func (q *Query) GetRoutingKey() ([]byte, error) {
13721372
}
13731373

13741374
// try to determine the routing key
1375-
routingKeyInfo, err := q.session.routingKeyInfo(q.Context(), q.stmt, q.keyspace)
1375+
routingKeyInfo, err := q.session.routingKeyInfo(q.Context(), q.stmt, q.keyspace, q.requestTimeout)
13761376
if err != nil {
13771377
return nil, err
13781378
}
@@ -2298,7 +2298,7 @@ func (b *Batch) GetRoutingKey() ([]byte, error) {
22982298
return nil, nil
22992299
}
23002300
// try to determine the routing key
2301-
routingKeyInfo, err := b.session.routingKeyInfo(b.Context(), entry.Stmt, b.keyspace)
2301+
routingKeyInfo, err := b.session.routingKeyInfo(b.Context(), entry.Stmt, b.keyspace, b.GetRequestTimeout())
23022302
if err != nil {
23032303
return nil, err
23042304
}

0 commit comments

Comments
 (0)