diff --git a/pkg/meta/tkv_tikv.go b/pkg/meta/tkv_tikv.go index cc1f75c87aa1..47badc29c8c7 100644 --- a/pkg/meta/tkv_tikv.go +++ b/pkg/meta/tkv_tikv.go @@ -31,6 +31,7 @@ import ( plog "github.com/pingcap/log" "github.com/pkg/errors" "github.com/sirupsen/logrus" + "github.com/spf13/cast" "github.com/tikv/client-go/v2/config" tikverr "github.com/tikv/client-go/v2/error" "github.com/tikv/client-go/v2/oracle" @@ -47,6 +48,11 @@ func init() { } +var ( + entryLimit, bufferLimit uint64 = math.MaxUint64, math.MaxUint64 + updateLimit bool +) + func newTikvClient(addr string) (tkvClient, error) { var plvl string // TiKV (PingCap) uses uber-zap logging, make it less verbose switch logger.Level { @@ -86,6 +92,14 @@ func newTikvClient(addr string) (tkvClient, error) { } logger.Infof("TiKV gc interval is set to %s", interval) + if val := cast.ToUint64(query.Get("entryLimit")); val > 0 { + entryLimit = val + updateLimit = true + } + if val := cast.ToUint64(query.Get("bufferLimit")); val > 0 { + bufferLimit = val + updateLimit = true + } client, err := txnkv.NewClient(strings.Split(tUrl.Host, ",")) if err != nil { return nil, err @@ -222,6 +236,9 @@ func (c *tikvClient) config(key string) interface{} { func (c *tikvClient) simpleTxn(ctx context.Context, f func(*kvTxn) error, retry int) (err error) { tx, err := c.client.Begin(tikv.WithStartTS(math.MaxUint64)) // math.MaxUint64 means to point get the latest committed data without PD access + if updateLimit { + tx.GetUnionStore().SetEntrySizeLimit(entryLimit, bufferLimit) + } if err != nil { return errors.Wrap(err, "failed to begin transaction") } @@ -253,6 +270,9 @@ func (c *tikvClient) txn(ctx context.Context, f func(*kvTxn) error, retry int) ( if err != nil { return err } + if updateLimit { + tx.GetUnionStore().SetEntrySizeLimit(entryLimit, bufferLimit) + } defer func() { if r := recover(); r != nil { fe, ok := r.(error)