Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions pkg/meta/tkv_tikv.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
plog "github.com/pingcap/log"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/spf13/cast"
"github.com/tikv/client-go/v2/config"
tikverr "github.com/tikv/client-go/v2/error"
"github.com/tikv/client-go/v2/oracle"
Expand All @@ -47,6 +48,11 @@ func init() {

}

var (
entryLimit, bufferLimit uint64 = math.MaxUint64, math.MaxUint64
updateLimit bool
)

func newTikvClient(addr string) (tkvClient, error) {
var plvl string // TiKV (PingCap) uses uber-zap logging, make it less verbose
switch logger.Level {
Expand Down Expand Up @@ -86,6 +92,14 @@ func newTikvClient(addr string) (tkvClient, error) {
}
logger.Infof("TiKV gc interval is set to %s", interval)

if val := cast.ToUint64(query.Get("entryLimit")); val > 0 {
entryLimit = val
updateLimit = true
}
if val := cast.ToUint64(query.Get("bufferLimit")); val > 0 {
bufferLimit = val
updateLimit = true
}
client, err := txnkv.NewClient(strings.Split(tUrl.Host, ","))
if err != nil {
return nil, err
Expand Down Expand Up @@ -222,6 +236,9 @@ func (c *tikvClient) config(key string) interface{} {

func (c *tikvClient) simpleTxn(ctx context.Context, f func(*kvTxn) error, retry int) (err error) {
tx, err := c.client.Begin(tikv.WithStartTS(math.MaxUint64)) // math.MaxUint64 means to point get the latest committed data without PD access
if updateLimit {
tx.GetUnionStore().SetEntrySizeLimit(entryLimit, bufferLimit)
}
if err != nil {
return errors.Wrap(err, "failed to begin transaction")
}
Expand Down Expand Up @@ -253,6 +270,9 @@ func (c *tikvClient) txn(ctx context.Context, f func(*kvTxn) error, retry int) (
if err != nil {
return err
}
if updateLimit {
tx.GetUnionStore().SetEntrySizeLimit(entryLimit, bufferLimit)
}
defer func() {
if r := recover(); r != nil {
fe, ok := r.(error)
Expand Down
Loading