Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ require (
github.com/soheilhy/cmux v0.1.4
github.com/spf13/cobra v0.0.3
github.com/spf13/pflag v1.0.1
github.com/stretchr/testify v1.3.0 // indirect
github.com/tmc/grpc-websocket-proxy v0.0.0-20200427203606-3cfed13b9966
github.com/urfave/cli v1.20.0
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2
Expand Down
7 changes: 6 additions & 1 deletion integration/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ type ClusterConfig struct {
LeaseCheckpointInterval time.Duration

WatchProgressNotifyInterval time.Duration
CorruptCheckTime time.Duration
}

type cluster struct {
Expand Down Expand Up @@ -299,6 +300,7 @@ func (c *cluster) mustNewMember(t testing.TB) *member {
enableLeaseCheckpoint: c.cfg.EnableLeaseCheckpoint,
leaseCheckpointInterval: c.cfg.LeaseCheckpointInterval,
WatchProgressNotifyInterval: c.cfg.WatchProgressNotifyInterval,
CorruptCheckTime: c.cfg.CorruptCheckTime,
})
m.DiscoveryURL = c.cfg.DiscoveryURL
if c.cfg.UseGRPC {
Expand Down Expand Up @@ -589,6 +591,7 @@ type memberConfig struct {
enableLeaseCheckpoint bool
leaseCheckpointInterval time.Duration
WatchProgressNotifyInterval time.Duration
CorruptCheckTime time.Duration
}

// mustNewMember returns an initialized member with the given name. If peerTLS is
Expand Down Expand Up @@ -685,7 +688,9 @@ func mustNewMember(t testing.TB, mcfg memberConfig) *member {
m.WatchProgressNotifyInterval = mcfg.WatchProgressNotifyInterval

m.InitialCorruptCheck = true

if mcfg.CorruptCheckTime > time.Duration(0) {
m.CorruptCheckTime = mcfg.CorruptCheckTime
}
lcfg := logutil.DefaultZapLoggerConfig
m.LoggerConfig = &lcfg
m.LoggerConfig.OutputPaths = []string{"/dev/null"}
Expand Down
122 changes: 122 additions & 0 deletions integration/v3_alarm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package integration

import (
"context"
"encoding/binary"
"os"
"path/filepath"
"sync"
Expand All @@ -24,10 +25,12 @@ import (

"go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
"go.etcd.io/etcd/lease/leasepb"
"go.etcd.io/etcd/mvcc"
"go.etcd.io/etcd/mvcc/backend"
"go.etcd.io/etcd/pkg/testutil"
"go.etcd.io/etcd/pkg/traceutil"
"go.uber.org/zap/zaptest"

"go.uber.org/zap"
)
Expand Down Expand Up @@ -232,3 +235,122 @@ func TestV3CorruptAlarm(t *testing.T) {
}
t.Fatalf("expected error %v after %s", rpctypes.ErrCorrupt, 5*time.Second)
}

// leaseBucketName is the backend bucket name under which the tests in this
// file delete and write lease records.
var leaseBucketName = []byte("lease")

// TestV3CorruptAlarmWithLeaseCorrupted exercises a 3-member cluster in which
// one member's lease bucket is rewritten offline.
//
// The test grants lease 1, attaches key "foo" to it, replaces member 2 with a
// newly added member, and then edits that member's backend file so it holds
// lease 2 instead of lease 1. After lease 1 is revoked, the header revisions
// reported by members 1 and 2 must differ. Finally, after waiting for
// CorruptCheckTime, a Put against member 0 must either succeed or fail with
// rpctypes.ErrCorrupt; any other error fails the test.
func TestV3CorruptAlarmWithLeaseCorrupted(t *testing.T) {
	defer testutil.AfterTest(t)
	clus := NewClusterV3(t, &ClusterConfig{
		CorruptCheckTime:       time.Second,
		Size:                   3,
		SnapshotCount:          10,
		SnapshotCatchUpEntries: 5,
	})
	defer clus.Terminate(t)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	lresp, err := toGRPC(clus.RandClient()).Lease.LeaseGrant(ctx, &pb.LeaseGrantRequest{ID: 1, TTL: 60})
	if err != nil {
		// Fatal rather than Error: lresp is not usable when the grant failed,
		// and every later step depends on the lease existing.
		t.Fatalf("could not create lease 1 (%v)", err)
	}
	if lresp.ID != 1 {
		t.Errorf("got id %v, wanted id %v", lresp.ID, 1)
	}

	putr := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar"), Lease: lresp.ID}
	// Trigger snapshot from the leader to new member
	for i := 0; i < 15; i++ {
		if _, err := toGRPC(clus.RandClient()).KV.Put(ctx, putr); err != nil {
			t.Errorf("#%d: couldn't put key (%v)", i, err)
		}
	}

	clus.RemoveMember(t, uint64(clus.Members[2].ID()))
	oldMemberClient := clus.Client(2)
	if err := oldMemberClient.Close(); err != nil {
		t.Fatal(err)
	}

	clus.AddMember(t)
	// Wait for new member to catch up
	clus.Members[2].WaitStarted(t)
	newMemberClient, err := NewClientV3(clus.Members[2])
	if err != nil {
		t.Fatal(err)
	}
	WaitClientV3(t, newMemberClient)
	clus.clients[2] = newMemberClient

	// Corrupt member 2 by modifying backend lease bucket offline.
	clus.Members[2].Stop(t)
	fp := filepath.Join(clus.Members[2].DataDir, "member", "snap", "db")
	bcfg := backend.DefaultBackendConfig()
	bcfg.Path = fp
	bcfg.Logger = zaptest.NewLogger(t)
	be := backend.New(bcfg)

	// Replace lease 1 with lease 2 so this member's lease bucket no longer
	// matches what the test granted through the cluster.
	tx := be.BatchTx()
	tx.UnsafeDelete(leaseBucketName, leaseIdToBytes(1))
	lpb := leasepb.Lease{ID: int64(2), TTL: 60}
	mustUnsafePutLease(tx, &lpb)
	tx.Commit()

	if err := be.Close(); err != nil {
		t.Fatal(err)
	}

	if err := clus.Members[2].Restart(t); err != nil {
		t.Fatal(err)
	}

	clus.Members[1].WaitOK(t)
	clus.Members[2].WaitOK(t)

	// Revoke lease should remove key except the member with corruption
	_, err = toGRPC(clus.Client(0)).Lease.LeaseRevoke(ctx, &pb.LeaseRevokeRequest{ID: lresp.ID})
	if err != nil {
		t.Fatal(err)
	}
	resp0, err0 := clus.Client(1).KV.Get(context.TODO(), "foo")
	if err0 != nil {
		t.Fatal(err0)
	}
	resp1, err1 := clus.Client(2).KV.Get(context.TODO(), "foo")
	if err1 != nil {
		t.Fatal(err1)
	}

	if resp0.Header.Revision == resp1.Header.Revision {
		t.Fatalf("matching Revision values")
	}

	// Wait for CorruptCheckTime
	time.Sleep(time.Second)
	presp, perr := clus.Client(0).Put(context.TODO(), "abc", "aaa")
	// A successful Put or an ErrCorrupt rejection are both accepted; only an
	// unrelated error fails the test.
	if perr != nil && !eqErrGRPC(perr, rpctypes.ErrCorrupt) {
		t.Fatalf("expected %v, got %+v (%v)", rpctypes.ErrCorrupt, presp, perr)
	}
}

// leaseIdToBytes encodes n as an 8-byte big-endian slice, reinterpreting the
// signed value as its unsigned 64-bit bit pattern.
func leaseIdToBytes(n int64) []byte {
	var buf [8]byte
	binary.BigEndian.PutUint64(buf[:], uint64(n))
	return buf[:]
}

// mustUnsafePutLease marshals lpb and stores the result in the lease bucket,
// keyed by the byte encoding of lpb.ID. It panics if marshaling fails.
func mustUnsafePutLease(tx backend.BatchTx, lpb *leasepb.Lease) {
	data, err := lpb.Marshal()
	if err != nil {
		panic("failed to marshal lease proto item")
	}
	tx.UnsafePut(leaseBucketName, leaseIdToBytes(lpb.ID), data)
}
86 changes: 86 additions & 0 deletions integration/v3_lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package integration
import (
"context"
"fmt"
"math"
"testing"
"time"

Expand Down Expand Up @@ -139,6 +140,91 @@ func TestV3LeaseGrantByID(t *testing.T) {
}
}

// TestV3LeaseNegativeID ensures a restarted member's lessor can recover a
// negative leaseID from the backend.
//
// When the negative leaseID is used for lease revoke, all etcd nodes must
// remove the lease and delete the associated keys so that the kv store stays
// consistent across members.
//
// It ensures issue 12535 is fixed by PR 13676.
func TestV3LeaseNegativeID(t *testing.T) {
	tcs := []struct {
		leaseID int64
		k       []byte
		v       []byte
	}{
		{
			leaseID: -1, // int64 -1 is 2^64 -1 in uint64
			k:       []byte("foo"),
			v:       []byte("bar"),
		},
		{
			leaseID: math.MaxInt64,
			k:       []byte("bar"),
			v:       []byte("foo"),
		},
		{
			leaseID: math.MinInt64,
			k:       []byte("hello"),
			v:       []byte("world"),
		},
	}
	for _, tc := range tcs {
		t.Run(fmt.Sprintf("test with lease ID %16x", tc.leaseID), func(t *testing.T) {
			defer testutil.AfterTest(t)
			clus := NewClusterV3(t, &ClusterConfig{Size: 3})
			defer clus.Terminate(t)

			ctx, cancel := context.WithCancel(context.Background())
			defer cancel()
			cc := clus.RandClient()
			lresp, err := toGRPC(cc).Lease.LeaseGrant(ctx, &pb.LeaseGrantRequest{ID: tc.leaseID, TTL: 300})
			if err != nil {
				// Fatal rather than Error: lresp is not usable when the grant
				// failed, and the remaining steps all need the lease.
				t.Fatalf("could not create lease %d (%v)", tc.leaseID, err)
			}
			if lresp.ID != tc.leaseID {
				t.Errorf("got id %v, wanted id %v", lresp.ID, tc.leaseID)
			}
			putr := &pb.PutRequest{Key: tc.k, Value: tc.v, Lease: tc.leaseID}
			_, err = toGRPC(cc).KV.Put(ctx, putr)
			if err != nil {
				t.Errorf("couldn't put key (%v)", err)
			}

			// wait for backend Commit
			time.Sleep(100 * time.Millisecond)
			// restore lessor from db file
			clus.Members[2].Stop(t)
			if err := clus.Members[2].Restart(t); err != nil {
				t.Fatal(err)
			}

			// revoke lease should remove key
			WaitClientV3(t, clus.Client(2))
			_, err = toGRPC(clus.RandClient()).Lease.LeaseRevoke(ctx, &pb.LeaseRevokeRequest{ID: tc.leaseID})
			if err != nil {
				t.Errorf("could not revoke lease %d (%v)", tc.leaseID, err)
			}

			// Every member must report the same revision and no remaining key.
			// The first member's revision is the reference for the others.
			var revision int64
			for i := range clus.Members {
				getr := &pb.RangeRequest{Key: tc.k}
				getresp, err := toGRPC(clus.Client(i)).KV.Range(ctx, getr)
				if err != nil {
					t.Fatal(err)
				}
				if revision == 0 {
					revision = getresp.Header.Revision
				}
				if revision != getresp.Header.Revision {
					t.Errorf("expect revision %d, but got %d", revision, getresp.Header.Revision)
				}
				if len(getresp.Kvs) != 0 {
					t.Errorf("lease removed but key remains")
				}
			}
		})
	}
}

// TestV3LeaseExpire ensures a key is deleted once a key expires.
func TestV3LeaseExpire(t *testing.T) {
defer testutil.AfterTest(t)
Expand Down
39 changes: 29 additions & 10 deletions lease/lessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"context"
"encoding/binary"
"errors"
"fmt"
"math"
"sort"
"sync"
Expand Down Expand Up @@ -771,15 +772,10 @@ func (le *lessor) initAndRecover() {
tx.Lock()

tx.UnsafeCreateBucket(leaseBucketName)
_, vs := tx.UnsafeRange(leaseBucketName, int64ToBytes(0), int64ToBytes(math.MaxInt64), 0)
// TODO: copy vs and do decoding outside tx lock if lock contention becomes an issue.
for i := range vs {
var lpb leasepb.Lease
err := lpb.Unmarshal(vs[i])
if err != nil {
tx.Unlock()
panic("failed to unmarshal lease proto item")
}
lpbs := unsafeGetAllLeases(tx)
tx.Unlock()

for _, lpb := range lpbs {
ID := LeaseID(lpb.ID)
if lpb.TTL < le.minLeaseTTL {
lpb.TTL = le.minLeaseTTL
Expand All @@ -796,7 +792,6 @@ func (le *lessor) initAndRecover() {
}
le.leaseExpiredNotifier.Init()
heap.Init(&le.leaseCheckpointHeap)
tx.Unlock()

le.b.ForceCommit()
}
Expand Down Expand Up @@ -894,6 +889,30 @@ func int64ToBytes(n int64) []byte {
return bytes
}

// bytesToLeaseID decodes an 8-byte big-endian key into a signed lease ID.
// It panics with an error value when the input is not exactly 8 bytes long.
func bytesToLeaseID(bytes []byte) int64 {
	const idLen = 8
	if len(bytes) == idLen {
		return int64(binary.BigEndian.Uint64(bytes))
	}
	panic(fmt.Errorf("lease ID must be 8-byte"))
}

// unsafeGetAllLeases decodes every value in the lease bucket into a
// leasepb.Lease and returns them in the order UnsafeForEach visits them.
// The returned slice is non-nil even when no leases were decoded.
//
// It panics if iteration fails or a value cannot be unmarshaled. The
// unmarshal panic reports both the lease ID taken from the key and the
// underlying decode error.
//
// NOTE(review): presumably the caller must hold the tx lock, as the "unsafe"
// prefix suggests — confirm against callers.
func unsafeGetAllLeases(tx backend.ReadTx) []*leasepb.Lease {
	ls := make([]*leasepb.Lease, 0)
	err := tx.UnsafeForEach(leaseBucketName, func(k, v []byte) error {
		var lpb leasepb.Lease
		if err := lpb.Unmarshal(v); err != nil {
			// Keep the cause in the message so a corrupt record can be
			// diagnosed from the panic output alone.
			return fmt.Errorf("failed to Unmarshal lease proto item; lease ID=%016x: %v", bytesToLeaseID(k), err)
		}
		ls = append(ls, &lpb)
		return nil
	})
	if err != nil {
		panic(err)
	}
	return ls
}

// FakeLessor is a fake implementation of the Lessor interface.
// It is used for testing only.
type FakeLessor struct{}
Expand Down
Loading