Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ install-go-test-coverage:
go install github.com/vladopajic/go-test-coverage/v2@latest

install-tools:
go install go.uber.org/mock/mockgen@latest
go install go.uber.org/mock/mockgen@v0.1.0
go install github.com/bufbuild/buf/cmd/[email protected]
go install github.com/cosmos/gogoproto/protoc-gen-gocosmos@latest

Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ Failure: plugin gocosmos: could not find protoc plugin for name gocosmos - pleas
GO111MODULE=on GOBIN=/usr/local/go/bin go install github.com/cosmos/gogoproto/protoc-gen-gocosmos@latest

# if you want to execute unit test of sp, you should execute the following command, assumed that you installed golang in /usr/local/go/bin. Other OS are similar.
GO111MODULE=on GOBIN=/usr/local/go/bin go install go.uber.org/mock/mockgen@latest
GO111MODULE=on GOBIN=/usr/local/go/bin go install go.uber.org/mock/mockgen@v0.1.0
```

Above error messages are due to users don't set go env correctly. More info users can search `GOROOT`, `GOPATH` and `GOBIN`.
Expand Down
2 changes: 1 addition & 1 deletion cmd/command/bs_data_migration/v1.0.1/fix_payment.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func FixPayment(endpoint string, db *gorm.DB) error {
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
httpErr = fmt.Errorf(resp.Status)
httpErr = fmt.Errorf("%s", resp.Status)
return
}
err := json.NewDecoder(resp.Body).Decode(&paymentResult)
Expand Down
2 changes: 1 addition & 1 deletion cmd/command/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func (w *CMDWrapper) queryTasksAction(ctx *cli.Context) error {
fmt.Printf("failed to query task due to no task, endpoint:%v, key:%v\n", endpoint, key)
}
for _, info := range infos {
fmt.Printf(info + "\n")
fmt.Printf("%s"+"\n", info)
}
return nil
}
Expand Down
3 changes: 3 additions & 0 deletions core/piecestore/piecestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,7 @@ type PieceStore interface {
// DeletePiece deletes the piece data from piece store, it can delete
// segment or ec piece data.
DeletePiece(ctx context.Context, key string) error
// DeletePiecesByPrefix deletes pieces data from piece store, it can delete
// segment or ec piece data.
DeletePiecesByPrefix(ctx context.Context, key string) error
}
14 changes: 14 additions & 0 deletions core/piecestore/piecestore_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

54 changes: 25 additions & 29 deletions modular/executor/execute_gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package executor
import (
"context"
"errors"
"fmt"
"math"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -199,7 +201,6 @@ func (e *ExecuteModular) HandleGCObjectTask(ctx context.Context, task coretask.G
currentGCBlockID uint64
currentGCObjectID uint64
responseEndBlockID uint64
storageParams *storagetypes.Params
gcObjectNumber int
tryAgainLater bool
taskIsCanceled bool
Expand Down Expand Up @@ -252,30 +253,18 @@ func (e *ExecuteModular) HandleGCObjectTask(ctx context.Context, task coretask.G
}

for _, object := range waitingGCObjects {
if storageParams, err = e.baseApp.Consensus().QueryStorageParamsByTimestamp(
context.Background(), object.GetObjectInfo().GetCreateAt()); err != nil {
log.Errorw("failed to query storage params", "task_info", task.Info(), "error", err)
return
}

currentGCBlockID = uint64(object.GetDeleteAt())
objectInfo := object.GetObjectInfo()
objectVersion := objectInfo.Version
currentGCObjectID = objectInfo.Id.Uint64()
if currentGCBlockID < task.GetCurrentBlockNumber() {
log.Errorw("skip gc object", "object_info", objectInfo,
"task_current_gc_block_id", task.GetCurrentBlockNumber())
continue
}
segmentCount := e.baseApp.PieceOp().SegmentPieceCount(objectInfo.GetPayloadSize(),
storageParams.VersionedParams.GetMaxSegmentSize())
for segIdx := uint32(0); segIdx < segmentCount; segIdx++ {
pieceKey := e.baseApp.PieceOp().SegmentPieceKey(currentGCObjectID, segIdx, objectVersion)
// ignore this delete api error, TODO: refine gc workflow by enrich metadata index.
deleteErr := e.baseApp.PieceStore().DeletePiece(ctx, pieceKey)
log.CtxDebugw(ctx, "delete the primary sp pieces", "object_info", objectInfo,
"piece_key", pieceKey, "error", deleteErr)
}
segmentPieceKeyPrefix := fmt.Sprintf("s%d_", currentGCObjectID)
deleteErr := e.baseApp.PieceStore().DeletePiecesByPrefix(ctx, segmentPieceKeyPrefix)
log.CtxDebugw(ctx, "delete the primary sp pieces", "object_info", objectInfo,
"piece_key_prefix", segmentPieceKeyPrefix, "error", deleteErr)
bucketInfo, err := e.baseApp.GfSpClient().GetBucketInfoByBucketName(ctx, objectInfo.BucketName)
if err != nil || bucketInfo == nil {
log.Errorw("failed to get bucket by bucket name", "bucket_name", objectInfo.BucketName, "error", err)
Expand All @@ -288,23 +277,30 @@ func (e *ExecuteModular) HandleGCObjectTask(ctx context.Context, task coretask.G
}

var redundancyIndex int32 = -1
for rIdx, sspId := range gvg.GetSecondarySpIds() {
if spId == sspId {
redundancyIndex = int32(rIdx)
for segIdx := uint32(0); segIdx < segmentCount; segIdx++ {
pieceKey := e.baseApp.PieceOp().ECPieceKey(currentGCObjectID, segIdx, uint32(rIdx), objectVersion)
if objectInfo.GetRedundancyType() == storagetypes.REDUNDANCY_REPLICA_TYPE {
pieceKey = e.baseApp.PieceOp().SegmentPieceKey(objectInfo.Id.Uint64(), segIdx, objectVersion)
}
// since in GC the object will be completely deleted, simply find all pieces with the piece key prefix and remove them
ECPieceKeyPrefix := fmt.Sprintf("e%d_", currentGCObjectID)
if len(gvg.GetSecondarySpIds()) != 0 {
for rIdx, sspId := range gvg.GetSecondarySpIds() {
if spId == sspId {
redundancyIndex = int32(rIdx)
// ignore this delete api error, TODO: refine gc workflow by enrich metadata index.
deleteErr := e.baseApp.PieceStore().DeletePiece(ctx, pieceKey)
log.CtxDebugw(ctx, "delete the secondary sp pieces",
"object_info", objectInfo, "piece_key", pieceKey, "error", deleteErr)
deleteErr := e.baseApp.PieceStore().DeletePiecesByPrefix(ctx, ECPieceKeyPrefix)
log.CtxDebugw(ctx, "delete the secondary sp pieces by prefix",
"object_info", objectInfo, "piece_key_prefix", ECPieceKeyPrefix, "error", deleteErr)
}
}
} else {
// if failed to get secondary sps, check the current sp
deleteErr := e.baseApp.PieceStore().DeletePiecesByPrefix(ctx, ECPieceKeyPrefix)
log.CtxDebugw(ctx, "delete the sp pieces by prefix in current sp when secondary sp not found",
"object_info", objectInfo, "piece_key_prefix", ECPieceKeyPrefix, "error", deleteErr)

// signal as delete any integrity meta related with the object
redundancyIndex = math.MaxInt32
}

// ignore this delete api error, TODO: refine gc workflow by enrich metadata index
deleteErr := e.baseApp.GfSpDB().DeleteObjectIntegrity(objectInfo.Id.Uint64(), redundancyIndex)
deleteErr = e.baseApp.GfSpDB().DeleteObjectIntegrity(objectInfo.Id.Uint64(), redundancyIndex)
log.CtxDebugw(ctx, "delete the object integrity meta", "object_info", objectInfo, "error", deleteErr)
task.SetCurrentBlockNumber(currentGCBlockID)
task.SetLastDeletedObjectId(currentGCObjectID)
Expand Down
45 changes: 3 additions & 42 deletions modular/executor/executor_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,32 +520,6 @@ func TestExecuteModular_HandleGCObjectTask(t *testing.T) {
return e
},
},
{
name: "failed to query storage params",
task: &gfsptask.GfSpGCObjectTask{
Task: &gfsptask.GfSpTask{},
},
fn: func() *ExecuteModular {
e := setup(t)
ctrl := gomock.NewController(t)
m := gfspclient.NewMockGfSpClientAPI(ctrl)
waitingGCObjects := []*metadatatypes.Object{
{
ObjectInfo: &storagetypes.ObjectInfo{Id: sdkmath.NewUint(1)},
},
}
m.EXPECT().ListDeletedObjectsByBlockNumberRange(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(),
gomock.Any()).Return(waitingGCObjects, uint64(0), nil).Times(1)
m.EXPECT().ReportTask(gomock.Any(), gomock.Any()).Return(nil).Times(1)
e.baseApp.SetGfSpClient(m)

m1 := consensus.NewMockConsensus(ctrl)
m1.EXPECT().QueryStorageParamsByTimestamp(gomock.Any(), gomock.Any()).Return(nil, mockErr).Times(1)
m1.EXPECT().QuerySP(gomock.Any(), gomock.Any()).Return(&sptypes.StorageProvider{Id: 1}, nil).Times(1)
e.baseApp.SetConsensus(m1)
return e
},
},
{
name: "failed to get bucket by bucket name",
task: &gfsptask.GfSpGCObjectTask{
Expand All @@ -568,18 +542,14 @@ func TestExecuteModular_HandleGCObjectTask(t *testing.T) {
e.baseApp.SetGfSpClient(m)

m1 := consensus.NewMockConsensus(ctrl)
m1.EXPECT().QueryStorageParamsByTimestamp(gomock.Any(), gomock.Any()).Return(&storagetypes.Params{
VersionedParams: storagetypes.VersionedParams{MaxSegmentSize: 10}}, nil).Times(1)
m1.EXPECT().QuerySP(gomock.Any(), gomock.Any()).Return(&sptypes.StorageProvider{Id: 1}, nil).Times(1)
e.baseApp.SetConsensus(m1)

m2 := piecestore.NewMockPieceOp(ctrl)
m2.EXPECT().SegmentPieceCount(gomock.Any(), gomock.Any()).Return(uint32(1)).Times(1)
m2.EXPECT().SegmentPieceKey(gomock.Any(), gomock.Any(), gomock.Any()).Return("test").Times(1)
e.baseApp.SetPieceOp(m2)

m3 := piecestore.NewMockPieceStore(ctrl)
m3.EXPECT().DeletePiece(gomock.Any(), gomock.Any()).Return(nil).Times(1)
m3.EXPECT().DeletePiecesByPrefix(gomock.Any(), gomock.Any()).Return(nil).Times(1)
e.baseApp.SetPieceStore(m3)
return e
},
Expand Down Expand Up @@ -608,18 +578,14 @@ func TestExecuteModular_HandleGCObjectTask(t *testing.T) {
e.baseApp.SetGfSpClient(m)

m1 := consensus.NewMockConsensus(ctrl)
m1.EXPECT().QueryStorageParamsByTimestamp(gomock.Any(), gomock.Any()).Return(&storagetypes.Params{
VersionedParams: storagetypes.VersionedParams{MaxSegmentSize: 10}}, nil).Times(1)
m1.EXPECT().QuerySP(gomock.Any(), gomock.Any()).Return(&sptypes.StorageProvider{Id: 1}, nil).Times(1)
e.baseApp.SetConsensus(m1)

m2 := piecestore.NewMockPieceOp(ctrl)
m2.EXPECT().SegmentPieceCount(gomock.Any(), gomock.Any()).Return(uint32(1)).Times(1)
m2.EXPECT().SegmentPieceKey(gomock.Any(), gomock.Any(), gomock.Any()).Return("test").Times(1)
e.baseApp.SetPieceOp(m2)

m3 := piecestore.NewMockPieceStore(ctrl)
m3.EXPECT().DeletePiece(gomock.Any(), gomock.Any()).Return(nil).Times(1)
m3.EXPECT().DeletePiecesByPrefix(gomock.Any(), gomock.Any()).Return(nil).Times(1)
e.baseApp.SetPieceStore(m3)
return e
},
Expand Down Expand Up @@ -676,19 +642,14 @@ func TestExecuteModular_HandleGCObjectTask(t *testing.T) {
e.baseApp.SetGfSpClient(m)

m1 := consensus.NewMockConsensus(ctrl)
m1.EXPECT().QueryStorageParamsByTimestamp(gomock.Any(), gomock.Any()).Return(&storagetypes.Params{
VersionedParams: storagetypes.VersionedParams{MaxSegmentSize: 10}}, nil).Times(1)
m1.EXPECT().QuerySP(gomock.Any(), gomock.Any()).Return(&sptypes.StorageProvider{Id: 1}, nil).Times(1)
e.baseApp.SetConsensus(m1)

m2 := piecestore.NewMockPieceOp(ctrl)
m2.EXPECT().SegmentPieceCount(gomock.Any(), gomock.Any()).Return(uint32(1)).Times(1)
m2.EXPECT().SegmentPieceKey(gomock.Any(), gomock.Any(), gomock.Any()).Return("test").Times(1)
m2.EXPECT().ECPieceKey(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return("test").Times(1)
e.baseApp.SetPieceOp(m2)

m3 := piecestore.NewMockPieceStore(ctrl)
m3.EXPECT().DeletePiece(gomock.Any(), gomock.Any()).Return(nil).Times(2)
m3.EXPECT().DeletePiecesByPrefix(gomock.Any(), gomock.Any()).Return(nil).Times(2)
e.baseApp.SetPieceStore(m3)

m4 := corespdb.NewMockSPDB(ctrl)
Expand Down
24 changes: 24 additions & 0 deletions store/piecestore/client/piece_store_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,3 +132,27 @@ func (client *StoreClient) DeletePiece(ctx context.Context, key string) error {
err = client.ps.Delete(ctx, key)
return err
}

// DeletePiecesByPrefix deletes pieces by prefix from piece store.
func (client *StoreClient) DeletePiecesByPrefix(ctx context.Context, key string) error {
var (
startTime = time.Now()
err error
valSize uint64
)
defer func() {
if err != nil {
metrics.PieceStoreCounter.WithLabelValues(PieceStoreFailureDel).Inc()
metrics.PieceStoreTime.WithLabelValues(PieceStoreFailureDel).Observe(
time.Since(startTime).Seconds())
return
}
metrics.PieceStoreCounter.WithLabelValues(PieceStoreSuccessDel).Inc()
metrics.PieceStoreTime.WithLabelValues(PieceStoreSuccessDel).Observe(
time.Since(startTime).Seconds())
metrics.PieceStoreUsageAmountGauge.WithLabelValues(PieceStoreSuccessDel).Add(0 - float64(valSize))
}()

valSize, err = client.ps.DeleteByPrefix(ctx, key)
return err
}
6 changes: 6 additions & 0 deletions store/piecestore/piece/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type PieceAPI interface {
Get(ctx context.Context, key string, offset, limit int64) (io.ReadCloser, error)
Put(ctx context.Context, key string, reader io.Reader) error
Delete(ctx context.Context, key string) error
DeleteByPrefix(ctx context.Context, key string) (uint64, error)
}

type PieceStore struct {
Expand All @@ -35,6 +36,11 @@ func (p *PieceStore) Delete(ctx context.Context, key string) error {
return p.storeAPI.DeleteObject(ctx, key)
}

// DeleteByPrefix deletes several pieces in PieceStore and returns deleted size
func (p *PieceStore) DeleteByPrefix(ctx context.Context, key string) (uint64, error) {
return p.storeAPI.DeleteObjectsByPrefix(ctx, key)
}

// Head returns piece info in PieceStore
func (p *PieceStore) Head(ctx context.Context, key string) (storage.Object, error) {
return p.storeAPI.HeadObject(ctx, key)
Expand Down
15 changes: 15 additions & 0 deletions store/piecestore/piece/api_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

39 changes: 39 additions & 0 deletions store/piecestore/storage/disk_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"fmt"
"io"
"io/fs"
"math/rand"
"os"
"path/filepath"
Expand Down Expand Up @@ -142,6 +143,44 @@ func (d *diskFileStore) DeleteObject(ctx context.Context, key string) error {
return err
}

func (d *diskFileStore) DeleteObjectsByPrefix(ctx context.Context, key string) (uint64, error) {
dirEntries, err := os.ReadDir(d.root)
if err != nil {
log.Errorw("DeleteObjectsByPrefix read directory error", "error", err)
return 0, err
}

var (
size uint64
entryInfo fs.FileInfo
)

for _, dirEntry := range dirEntries {
entryName := dirEntry.Name()
if strings.HasPrefix(entryName, key) {
var curInfoSize int64
// need to extract entry info and size first, otherwise when the object is deleted, the info can not be found
entryInfo, err = dirEntry.Info()
if entryInfo != nil {
curInfoSize = entryInfo.Size()
}
if err != nil {
log.Errorw("get dirEntry info error", "error", err)
}
err = d.DeleteObject(ctx, entryName)
if err != nil {
log.Errorw("remove single file by prefix error", "error", err)
} else {
if entryInfo != nil {
size += uint64(curInfoSize)
}
}
}
}

return size, nil
}

func (d *diskFileStore) HeadBucket(ctx context.Context) error {
if _, err := os.Stat(d.root); err != nil {
if os.IsNotExist(err) {
Expand Down
2 changes: 2 additions & 0 deletions store/piecestore/storage/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ type ObjectStorage interface {
PutObject(ctx context.Context, key string, reader io.Reader) error
// DeleteObject deletes an object
DeleteObject(ctx context.Context, key string) error
// DeleteObjectsByPrefix deletes objects by prefix
DeleteObjectsByPrefix(ctx context.Context, key string) (uint64, error)

// HeadBucket determines if a bucket exists and have permission to access it
HeadBucket(ctx context.Context) error
Expand Down
Loading
Loading