Skip to content

Commit a2cb754

Browse files
committed
[#591] Extra changes following merge. Moving CRC functionality to internal package. Integration tests config changes. Reverting unwanted merge changes.
Detailed summary: - Moving CRC functionality to internal package. - Update test cluster hosts to match CCM-created cluster - Update flagCluster default from 127.0.0.1 to 127.0.2.1,127.0.2.2,127.0.2.3 to align with the 3-node cluster configuration created by CCM. Fixes connection refused errors in integration tests. - Adding validation to `ReadTimeout` value in `ClusterConfig` - Removing `headSize` from `framer`. - Adding `.vscode` to `.gitignore`.
1 parent f997313 commit a2cb754

14 files changed

Lines changed: 153 additions & 185 deletions

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@ gocql-fuzz
22
fuzz-corpus
33
fuzz-work
44
gocql.test
5+
6+
# IDE specific files
7+
.vscode/
58
.idea
69

710
bin/

batch_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,10 @@
2828
package gocql
2929

3030
import (
31-
"github.com/stretchr/testify/require"
3231
"testing"
3332
"time"
33+
34+
"github.com/stretchr/testify/require"
3435
)
3536

3637
func TestBatch_Errors(t *testing.T) {

cassandra_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1761,7 +1761,7 @@ func TestQueryInfo(t *testing.T) {
17611761
defer session.Close()
17621762

17631763
conn := getRandomConn(t, session)
1764-
info, err := conn.prepareStatement(context.Background(), "SELECT release_version, host_id FROM system.local WHERE key = ?", nil, conn.currentKeyspace)
1764+
info, err := conn.prepareStatement(context.Background(), "SELECT release_version, host_id FROM system.local WHERE key = ?", nil, conn.currentKeyspace, time.Second)
17651765

17661766
if err != nil {
17671767
t.Fatalf("Failed to execute query for preparing statement: %v", err)
@@ -2315,7 +2315,7 @@ func TestRoutingKey(t *testing.T) {
23152315

23162316
initCacheSize := session.routingKeyInfoCache.lru.Len()
23172317

2318-
routingKeyInfo, err := session.routingKeyInfo(context.Background(), "SELECT * FROM test_single_routing_key WHERE second_id=? AND first_id=?", "")
2318+
routingKeyInfo, err := session.routingKeyInfo(context.Background(), "SELECT * FROM test_single_routing_key WHERE second_id=? AND first_id=?", "", time.Second)
23192319
if err != nil {
23202320
t.Fatalf("failed to get routing key info due to error: %v", err)
23212321
}
@@ -2343,7 +2343,7 @@ func TestRoutingKey(t *testing.T) {
23432343
context.Background(),
23442344
"SELECT * FROM test_single_routing_key WHERE second_id=? AND first_id=?",
23452345
// Routing info will be pulled from cached prepared statement, it should work with minimal timeout
2346-
"")
2346+
"", time.Nanosecond)
23472347
if err != nil {
23482348
t.Fatalf("failed to get routing key info due to error: %v", err)
23492349
}
@@ -2380,7 +2380,7 @@ func TestRoutingKey(t *testing.T) {
23802380
routingKeyInfo, err = session.routingKeyInfo(
23812381
context.Background(),
23822382
"SELECT * FROM test_composite_routing_key WHERE second_id=? AND first_id=?",
2383-
"")
2383+
"", time.Second)
23842384
if err != nil {
23852385
t.Fatalf("failed to get routing key info due to error: %v", err)
23862386
}
@@ -2488,7 +2488,7 @@ func TestNegativeStream(t *testing.T) {
24882488
return f.finish()
24892489
})
24902490

2491-
frame, err := conn.exec(context.Background(), writer, nil)
2491+
frame, err := conn.exec(context.Background(), writer, nil, time.Second)
24922492
if err == nil {
24932493
t.Fatalf("expected to get an error on stream %d", stream)
24942494
} else if frame != nil {

cluster.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -513,6 +513,10 @@ func (cfg *ClusterConfig) Validate() error {
513513
return errors.New("ConnectTimeout should be positive time.Duration or zero")
514514
}
515515

516+
if cfg.ReadTimeout < 0 {
517+
return errors.New("ReadTimeout should be positive time.Duration or zero")
518+
}
519+
516520
if cfg.MetadataSchemaRequestTimeout < 0 {
517521
return errors.New("MetadataSchemaRequestTimeout should be positive time.Duration or zero")
518522
}

common_test.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,12 @@ import (
3434
"sync"
3535
"testing"
3636
"time"
37+
38+
"github.com/gocql/gocql/lz4"
3739
)
3840

3941
var (
40-
flagCluster = flag.String("cluster", "127.0.0.1", "a comma-separated list of host:port tuples")
42+
flagCluster = flag.String("cluster", "127.0.2.1,127.0.2.2,127.0.2.3", "a comma-separated list of host:port tuples")
4143
flagProto = flag.Int("proto", 0, "protcol version")
4244
flagCQL = flag.String("cql", "3.0.0", "CQL version")
4345
flagRF = flag.Int("rf", 1, "replication factor for test keyspace")
@@ -223,7 +225,9 @@ func createCluster(opts ...func(*ClusterConfig)) *ClusterConfig {
223225
switch *flagCompressTest {
224226
case "snappy":
225227
cluster.Compressor = &SnappyCompressor{}
226-
case "":
228+
case "lz4":
229+
cluster.Compressor = &lz4.LZ4Compressor{}
230+
case "no-compression":
227231
default:
228232
panic("invalid compressor: " + *flagCompressTest)
229233
}

conn.go

Lines changed: 30 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ func (fn connErrorHandlerFn) HandleError(conn *Conn, err error, closed bool) {
176176

177177
type ConnInterface interface {
178178
Close()
179-
exec(ctx context.Context, req frameBuilder, tracer Tracer) (*framer, error)
179+
exec(ctx context.Context, req frameBuilder, tracer Tracer, requestTimeout time.Duration) (*framer, error)
180180
awaitSchemaAgreement(ctx context.Context) error
181181
executeQuery(ctx context.Context, qry *Query) *Iter
182182
querySystem(ctx context.Context, query string, values ...interface{}) *Iter
@@ -198,7 +198,6 @@ type Conn struct {
198198
ctx context.Context
199199
errorHandler ConnErrorHandler
200200
compressor Compressor
201-
conn net.Conn
202201
supported map[string][]string
203202
streams *streams.IDGenerator
204203
host *HostInfo
@@ -216,7 +215,6 @@ type Conn struct {
216215
scyllaSupported scyllaSupported
217216
systemRequestTimeout time.Duration
218217
timeouts int64
219-
readTimeout atomic.Int64
220218
writeTimeout atomic.Int64
221219
mu sync.Mutex
222220
tabletsRoutingV1 int32
@@ -252,9 +250,9 @@ func (c *Conn) finalizeConnection() {
252250
// It is done to make sure that connection is easy to establish when users set very low `WriteTimeout` and/or `Timeout`
253251
// This method sets timeouts to `operational` values after connection successfully created
254252
c.writeTimeout.Store(int64(c.cfg.WriteTimeout))
255-
c.readTimeout.Store(int64(c.cfg.ReadTimeout))
256253
c.setSystemRequestTimeout(c.session.cfg.MetadataSchemaRequestTimeout)
257254
c.w.setWriteTimeout(c.cfg.WriteTimeout)
255+
c.r.SetTimeout(c.cfg.ReadTimeout)
258256
}
259257

260258
func (c *Conn) getScyllaSupported() scyllaSupported {
@@ -372,7 +370,7 @@ func (s *Session) streamIDGenerator() *streams.IDGenerator {
372370
}
373371

374372
func (c *Conn) init(ctx context.Context, dialedHost *DialedHost) error {
375-
c.readTimeout.Store(int64(c.cfg.ConnectTimeout))
373+
c.r.SetTimeout(c.cfg.ConnectTimeout)
376374
c.writeTimeout.Store(int64(c.cfg.ConnectTimeout))
377375
c.w.setWriteTimeout(c.cfg.ConnectTimeout)
378376

@@ -391,16 +389,13 @@ func (c *Conn) init(ctx context.Context, dialedHost *DialedHost) error {
391389
conn: c,
392390
}
393391

394-
c.r.SetTimeout(c.cfg.ConnectTimeout)
395392
if err := startup.setupConn(ctx); err != nil {
396393
return err
397394
}
398395

399-
c.r.SetTimeout(c.cfg.ReadTimeout)
400-
401396
// dont coalesce startup frames
402397
if c.session.cfg.WriteCoalesceWaitTime > 0 && !c.cfg.disableCoalesce && !dialedHost.DisableCoalesce {
403-
c.w = newWriteCoalescer(dialedHost.Conn, time.Duration(c.writeTimeout.Load()), c.session.cfg.WriteCoalesceWaitTime, ctx.Done())
398+
c.w = newWriteCoalescer(dialedHost.Conn, c.cfg.ConnectTimeout, c.session.cfg.WriteCoalesceWaitTime, ctx.Done())
404399
}
405400

406401
if c.isScyllaConn() { // ScyllaDB does not support system.peers_v2
@@ -417,42 +412,15 @@ func (c *Conn) Write(p []byte) (n int, err error) {
417412
return c.w.writeContext(context.Background(), p)
418413
}
419414

420-
func (c *Conn) Read(p []byte) (n int, err error) {
421-
const maxAttempts = 5
422-
timeout := c.readTimeout.Load()
423-
424-
for i := 0; i < maxAttempts; i++ {
425-
var nn int
426-
if timeout > 0 {
427-
err = c.conn.SetReadDeadline(time.Now().Add(time.Duration(timeout)))
428-
if err != nil {
429-
return 0, err
430-
}
431-
}
432-
433-
nn, err = io.ReadFull(c.r, p[n:])
434-
n += nn
435-
if err == nil {
436-
break
437-
}
438-
439-
if verr, ok := err.(net.Error); !ok || !verr.Temporary() {
440-
break
441-
}
442-
}
443-
444-
return
445-
}
446-
447415
type startupCoordinator struct {
448416
conn *Conn
449417
frameTicker chan struct{}
450418
}
451419

452420
func (s *startupCoordinator) setupConn(ctx context.Context) error {
453421
var cancel context.CancelFunc
454-
if s.conn.r.GetTimeout() > 0 {
455-
ctx, cancel = context.WithTimeout(ctx, s.conn.r.GetTimeout())
422+
if s.conn.cfg.ConnectTimeout > 0 {
423+
ctx, cancel = context.WithTimeout(ctx, s.conn.cfg.ConnectTimeout)
456424
} else {
457425
ctx, cancel = context.WithCancel(ctx)
458426
}
@@ -510,7 +478,7 @@ func (s *startupCoordinator) write(ctx context.Context, frame frameBuilder, star
510478
return nil, ctx.Err()
511479
}
512480

513-
framer, err := s.conn.execInternal(ctx, frame, nil, startupCompleted.Load())
481+
framer, err := s.conn.execInternal(ctx, frame, nil, s.conn.cfg.ConnectTimeout, startupCompleted.Load())
514482
if err != nil {
515483
return nil, err
516484
}
@@ -696,7 +664,7 @@ func (c *Conn) setTabletSupported(val bool) {
696664
}
697665

698666
func (c *Conn) close() error {
699-
return c.conn.Close()
667+
return c.r.Close()
700668
}
701669

702670
func (c *Conn) Close() {
@@ -755,7 +723,7 @@ func (c *Conn) heartBeat(ctx context.Context) {
755723
case <-timer.C:
756724
}
757725

758-
framer, err := c.exec(context.Background(), &writeOptionsFrame{}, nil)
726+
framer, err := c.exec(context.Background(), &writeOptionsFrame{}, nil, c.cfg.ConnectTimeout)
759727
if err != nil {
760728
failures++
761729
continue
@@ -827,7 +795,7 @@ func (c *Conn) processFrame(ctx context.Context, r io.Reader) error {
827795
// TODO: handle cassandra event frames, we shouldnt get any currently
828796
framer := newFramerWithExts(c.compressor, c.version, c.cqlProtoExts, c.logger)
829797
c.setTabletSupported(framer.tabletsRoutingV1)
830-
if err := framer.readFrame(c, &head); err != nil {
798+
if err := framer.readFrame(r, &head); err != nil {
831799
return err
832800
}
833801
go c.session.handleEvent(framer)
@@ -837,7 +805,7 @@ func (c *Conn) processFrame(ctx context.Context, r io.Reader) error {
837805
// or a bug in Cassandra, this should be an error, parse it and return.
838806
framer := newFramerWithExts(c.compressor, c.version, c.cqlProtoExts, c.logger)
839807
c.setTabletSupported(framer.tabletsRoutingV1)
840-
if err := framer.readFrame(c, &head); err != nil {
808+
if err := framer.readFrame(r, &head); err != nil {
841809
return err
842810
}
843811

@@ -1015,16 +983,17 @@ type ConnReader interface {
1015983
type connReader struct {
1016984
conn net.Conn
1017985
r *bufio.Reader
1018-
timeout time.Duration
986+
timeout atomic.Int64
1019987
}
1020988

1021989
func (c *connReader) Read(p []byte) (n int, err error) {
1022990
const maxAttempts = 5
991+
timeout := c.GetTimeout()
1023992

1024993
for i := 0; i < maxAttempts; i++ {
1025994
var nn int
1026-
if c.timeout > 0 {
1027-
c.conn.SetReadDeadline(time.Now().Add(c.timeout))
995+
if timeout > 0 {
996+
c.conn.SetReadDeadline(time.Now().Add(timeout))
1028997
}
1029998

1030999
nn, err = io.ReadFull(c.r, p[n:])
@@ -1070,11 +1039,11 @@ func (c *connReader) SetWriteDeadline(t time.Time) error {
10701039
}
10711040

10721041
func (c *connReader) SetTimeout(timeout time.Duration) {
1073-
c.timeout = timeout
1042+
c.timeout.Store(int64(timeout))
10741043
}
10751044

10761045
func (c *connReader) GetTimeout() time.Duration {
1077-
return c.timeout
1046+
return time.Duration(c.timeout.Load())
10781047
}
10791048

10801049
type callReq struct {
@@ -1330,11 +1299,11 @@ func (c *Conn) addCall(call *callReq) error {
13301299
return nil
13311300
}
13321301

1333-
func (c *Conn) exec(ctx context.Context, req frameBuilder, tracer Tracer) (*framer, error) {
1334-
return c.execInternal(ctx, req, tracer, true)
1302+
func (c *Conn) exec(ctx context.Context, req frameBuilder, tracer Tracer, requestTimeout time.Duration) (*framer, error) {
1303+
return c.execInternal(ctx, req, tracer, requestTimeout, true)
13351304
}
13361305

1337-
func (c *Conn) execInternal(ctx context.Context, req frameBuilder, tracer Tracer, startupCompleted bool) (*framer, error) {
1306+
func (c *Conn) execInternal(ctx context.Context, req frameBuilder, tracer Tracer, requestTimeout time.Duration, startupCompleted bool) (*framer, error) {
13381307
if ctxErr := ctx.Err(); ctxErr != nil {
13391308
return nil, &QueryError{err: ctxErr, potentiallyExecuted: false}
13401309
}
@@ -1431,7 +1400,7 @@ func (c *Conn) execInternal(ctx context.Context, req frameBuilder, tracer Tracer
14311400
}
14321401

14331402
var timeoutCh <-chan time.Time
1434-
if timeout := c.r.GetTimeout(); timeout > 0 {
1403+
if requestTimeout > 0 {
14351404
if call.timer == nil {
14361405
call.timer = time.NewTimer(0)
14371406
<-call.timer.C
@@ -1444,7 +1413,7 @@ func (c *Conn) execInternal(ctx context.Context, req frameBuilder, tracer Tracer
14441413
}
14451414
}
14461415

1447-
call.timer.Reset(timeout)
1416+
call.timer.Reset(requestTimeout)
14481417
timeoutCh = call.timer.C
14491418
}
14501419

@@ -1538,10 +1507,10 @@ type StreamObserverContext interface {
15381507
}
15391508

15401509
type preparedStatment struct {
1510+
response resultMetadata
15411511
id []byte
15421512
resultMetadataID []byte
15431513
request preparedMetadata
1544-
response resultMetadata
15451514
}
15461515

15471516
type inflightPrepare struct {
@@ -1551,7 +1520,7 @@ type inflightPrepare struct {
15511520
preparedStatment *preparedStatment
15521521
}
15531522

1554-
func (c *Conn) prepareStatement(ctx context.Context, stmt string, tracer Tracer, keyspace string) (*preparedStatment, error) {
1523+
func (c *Conn) prepareStatement(ctx context.Context, stmt string, tracer Tracer, keyspace string, requestTimeout time.Duration) (*preparedStatment, error) {
15551524
stmtCacheKey := c.session.stmtsLRU.keyFor(c.host.HostID(), keyspace, stmt)
15561525
flight, ok := c.session.stmtsLRU.execIfMissing(stmtCacheKey, func(lru *lru.Cache) *inflightPrepare {
15571526
flight := &inflightPrepare{
@@ -1575,7 +1544,7 @@ func (c *Conn) prepareStatement(ctx context.Context, stmt string, tracer Tracer,
15751544
// we won the race to do the load, if our context is canceled we shouldnt
15761545
// stop the load as other callers are waiting for it but this caller should get
15771546
// their context cancelled error.
1578-
framer, err := c.exec(c.ctx, prep, tracer)
1547+
framer, err := c.exec(c.ctx, prep, tracer, requestTimeout)
15791548
if err != nil {
15801549
flight.err = err
15811550
c.session.stmtsLRU.remove(stmtCacheKey)
@@ -1692,7 +1661,7 @@ func (c *Conn) executeQuery(ctx context.Context, qry *Query) (iter *Iter) {
16921661
if !qry.skipPrepare && qry.shouldPrepare() {
16931662
// Prepare all DML queries. Other queries can not be prepared.
16941663
var err error
1695-
info, err = c.prepareStatement(ctx, qry.stmt, qry.trace, usedKeyspace)
1664+
info, err = c.prepareStatement(ctx, qry.stmt, qry.trace, usedKeyspace, qry.GetRequestTimeout())
16961665
if err != nil {
16971666
return &Iter{err: err}
16981667
}
@@ -1752,7 +1721,7 @@ func (c *Conn) executeQuery(ctx context.Context, qry *Query) (iter *Iter) {
17521721
}
17531722
}
17541723

1755-
framer, err := c.exec(ctx, frame, qry.trace)
1724+
framer, err := c.exec(ctx, frame, qry.trace, qry.GetRequestTimeout())
17561725
if err != nil {
17571726
return &Iter{err: err}
17581727
}
@@ -1911,7 +1880,7 @@ func (c *Conn) UseKeyspace(keyspace string) error {
19111880
q := &writeQueryFrame{statement: `USE "` + keyspace + `"`}
19121881
q.params.consistency = c.session.cons
19131882

1914-
framer, err := c.exec(c.ctx, q, nil)
1883+
framer, err := c.exec(c.ctx, q, nil, c.cfg.ConnectTimeout)
19151884
if err != nil {
19161885
return err
19171886
}
@@ -1975,7 +1944,7 @@ func (c *Conn) executeBatch(ctx context.Context, batch *Batch) (iter *Iter) {
19751944
b := &req.statements[i]
19761945

19771946
if len(entry.Args) > 0 || entry.binding != nil {
1978-
info, err := c.prepareStatement(batch.Context(), entry.Stmt, batch.trace, usedKeyspace)
1947+
info, err := c.prepareStatement(batch.Context(), entry.Stmt, batch.trace, usedKeyspace, batch.GetRequestTimeout())
19791948
if err != nil {
19801949
return &Iter{err: err}
19811950
}
@@ -2028,7 +1997,7 @@ func (c *Conn) executeBatch(ctx context.Context, batch *Batch) (iter *Iter) {
20281997
batch.routingInfo.mu.Unlock()
20291998

20301999
// TODO: should batch support tracing?
2031-
framer, err := c.exec(batch.Context(), req, batch.trace)
2000+
framer, err := c.exec(batch.Context(), req, batch.trace, batch.GetRequestTimeout())
20322001
if err != nil {
20332002
return &Iter{err: err}
20342003
}

0 commit comments

Comments
 (0)