Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,6 @@ dist/

# Dependency directories (remove the comment below to include it)
# vendor/
.vscode
.codecc
vendor
11 changes: 7 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,18 @@ require (
github.com/agiledragon/gomonkey v2.0.2+incompatible
github.com/apache/dubbo-getty v1.4.8
github.com/dubbogo/gost v1.12.5
github.com/go-sql-driver/mysql v1.6.0
github.com/google/uuid v1.3.0
github.com/natefinch/lumberjack v2.0.0+incompatible
github.com/pingcap/tidb v1.1.0-beta.0.20211124132551-4a1b2e9fe5b5
github.com/pingcap/tidb/parser v0.0.0-20211124132551-4a1b2e9fe5b5
github.com/pingcap/tipb v0.0.0-20220628092852-069ef6c8fc90 // indirect
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.7.1
go.uber.org/atomic v1.9.0
go.uber.org/zap v1.21.0
golang.org/x/net v0.0.0-20220722155237-a158d28d115b // indirect
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 // indirect
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f // indirect
google.golang.org/grpc v1.45.0
google.golang.org/genproto v0.0.0-20220630174209-ad1d48641aa7 // indirect
google.golang.org/grpc v1.47.0
google.golang.org/protobuf v1.28.0
vimagination.zapto.org/byteio v0.0.0-20200222190125-d27cba0f0b10
vimagination.zapto.org/memio v0.0.0-20200222190306-588ebc67b97d // indirect
Expand Down
612 changes: 591 additions & 21 deletions go.sum

Large diffs are not rendered by default.

270 changes: 270 additions & 0 deletions pkg/datasource/sql/at.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,270 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package sql

import (
"context"
"database/sql"
"fmt"
"os"
"strconv"
"sync"
"time"

"github.com/seata/seata-go/pkg/datasource/sql/datasource"
"github.com/seata/seata-go/pkg/datasource/sql/types"
"github.com/seata/seata-go/pkg/datasource/sql/undo"
"github.com/seata/seata-go/pkg/protocol/branch"
"github.com/seata/seata-go/pkg/protocol/message"
"github.com/seata/seata-go/pkg/rm"
)

const (
_defaultResourceSize = 16
_undoLogDeleteLimitSize = 1000
)

func init() {
datasource.RegisterResourceManager(branch.BranchTypeAT,
&ATSourceManager{
resourceCache: sync.Map{},
basic: datasource.NewBasicSourceManager(),
})
}

type ATSourceManager struct {
resourceCache sync.Map
worker *asyncATWorker
basic *datasource.BasicSourceManager
}

// Register a Resource to be managed by Resource Manager
func (mgr *ATSourceManager) RegisterResource(res rm.Resource) error {
mgr.resourceCache.Store(res.GetResourceId(), res)

return mgr.basic.RegisterResource(res)
}

// Unregister a Resource from the Resource Manager
func (mgr *ATSourceManager) UnregisterResource(res rm.Resource) error {
return mgr.basic.UnregisterResource(res)
}

// Get all resources managed by this manager
func (mgr *ATSourceManager) GetManagedResources() map[string]rm.Resource {
ret := make(map[string]rm.Resource)

mgr.resourceCache.Range(func(key, value interface{}) bool {
ret[key.(string)] = value.(rm.Resource)
return true
})

return ret
}

// BranchRollback Rollback the corresponding transactions according to the request
func (mgr *ATSourceManager) BranchRollback(ctx context.Context, req message.BranchRollbackRequest) (branch.BranchStatus, error) {
val, ok := mgr.resourceCache.Load(req.ResourceId)

if !ok {
return branch.BranchStatusPhaseoneFailed, fmt.Errorf("resource %s not found", req.ResourceId)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里的err, 尽量都改成 error.Warp 吧

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里没有将另外一个错误包装返回,这里没必要这个吧

}

res := val.(*DBResource)

undoMgr, err := undo.GetUndoLogManager(res.dbType)
if err != nil {
return branch.BranchStatusUnknown, err
}

conn, err := res.target.Conn(ctx)
if err != nil {
return branch.BranchStatusUnknown, err
}

if err := undoMgr.RunUndo(req.Xid, req.BranchId, conn); err != nil {
transErr, ok := err.(*types.TransactionError)
if !ok {
return branch.BranchStatusPhaseoneFailed, err
}

if transErr.Code() == types.ErrorCodeBranchRollbackFailedUnretriable {
return branch.BranchStatusPhasetwoRollbackFailedUnretryable, nil
}

return branch.BranchStatusPhasetwoRollbackFailedRetryable, nil
}

return branch.BranchStatusPhasetwoRollbacked, nil
}

// BranchCommit
func (mgr *ATSourceManager) BranchCommit(ctx context.Context, req message.BranchCommitRequest) (branch.BranchStatus, error) {
mgr.worker.branchCommit(ctx, req)
return branch.BranchStatusPhaseoneDone, nil
}

// LockQuery
func (mgr *ATSourceManager) LockQuery(ctx context.Context, req message.GlobalLockQueryRequest) (bool, error) {
return false, nil
}

// BranchRegister
func (mgr *ATSourceManager) BranchRegister(ctx context.Context, clientId string, req message.BranchRegisterRequest) (int64, error) {
return 0, nil
}

// BranchReport
func (mgr *ATSourceManager) BranchReport(ctx context.Context, req message.BranchReportRequest) error {
return nil
}

// CreateTableMetaCache
func (mgr *ATSourceManager) CreateTableMetaCache(ctx context.Context, resID string, dbType types.DBType,
db *sql.DB) (datasource.TableMetaCache, error) {
return mgr.basic.CreateTableMetaCache(ctx, resID, dbType, db)
}

type asyncATWorker struct {
asyncCommitBufferLimit int64
commitQueue chan phaseTwoContext
resourceMgr datasource.DataSourceManager
}

func newAsyncATWorker() *asyncATWorker {
asyncCommitBufferLimit := int64(10000)

val := os.Getenv("CLIENT_RM_ASYNC_COMMIT_BUFFER_LIMIT")
if val != "" {
limit, _ := strconv.ParseInt(val, 10, 64)
if limit != 0 {
asyncCommitBufferLimit = limit
}
}

worker := &asyncATWorker{
commitQueue: make(chan phaseTwoContext, asyncCommitBufferLimit),
}

return worker
}

func (w *asyncATWorker) doBranchCommitSafely() {
batchSize := 64

ticker := time.NewTicker(1 * time.Second)
phaseCtxs := make([]phaseTwoContext, 0, batchSize)

for {
select {
case phaseCtx := <-w.commitQueue:
phaseCtxs = append(phaseCtxs, phaseCtx)
if len(phaseCtxs) == batchSize {
tmp := phaseCtxs
w.doBranchCommit(tmp)
phaseCtxs = make([]phaseTwoContext, 0, batchSize)
}
case <-ticker.C:
tmp := phaseCtxs
w.doBranchCommit(tmp)

phaseCtxs = make([]phaseTwoContext, 0, batchSize)
}
}
}

func (w *asyncATWorker) doBranchCommit(phaseCtxs []phaseTwoContext) {
groupCtxs := make(map[string][]phaseTwoContext, _defaultResourceSize)

for i := range phaseCtxs {
if phaseCtxs[i].ResourceID == "" {
continue
}

if _, ok := groupCtxs[phaseCtxs[i].ResourceID]; !ok {
groupCtxs[phaseCtxs[i].ResourceID] = make([]phaseTwoContext, 0, 4)
}

ctxs := groupCtxs[phaseCtxs[i].ResourceID]
ctxs = append(ctxs, phaseCtxs[i])

groupCtxs[phaseCtxs[i].ResourceID] = ctxs
}

for k := range groupCtxs {
w.dealWithGroupedContexts(k, groupCtxs[k])
}
}

func (w *asyncATWorker) dealWithGroupedContexts(resID string, phaseCtxs []phaseTwoContext) {
val, ok := w.resourceMgr.GetManagedResources()[resID]
if !ok {
for i := range phaseCtxs {
w.commitQueue <- phaseCtxs[i]
}
return
}

res := val.(*DBResource)

conn, err := res.target.Conn(context.Background())
if err != nil {
for i := range phaseCtxs {
w.commitQueue <- phaseCtxs[i]
}
}

defer conn.Close()

undoMgr, err := undo.GetUndoLogManager(res.dbType)
if err != nil {
for i := range phaseCtxs {
w.commitQueue <- phaseCtxs[i]
}

return
}

for i := range phaseCtxs {
phaseCtx := phaseCtxs[i]
if err := undoMgr.DeleteUndoLogs([]string{phaseCtx.Xid}, []int64{phaseCtx.BranchID}, conn); err != nil {
w.commitQueue <- phaseCtx
}
}
}

func (w *asyncATWorker) branchCommit(ctx context.Context, req message.BranchCommitRequest) {
phaseCtx := phaseTwoContext{
Xid: req.Xid,
BranchID: req.BranchId,
ResourceID: req.ResourceId,
}

select {
case w.commitQueue <- phaseCtx:
case <-ctx.Done():
}

return
}

type phaseTwoContext struct {
Xid string
BranchID int64
ResourceID string
}
Loading