Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 64 additions & 0 deletions pkg/user/UserService.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,15 @@ import (
"go.uber.org/zap"
"net/http"
"strings"
"sync"
"time"
)

const (
ConcurrentRequestLockError = "there is an ongoing request for this user, please try after some time"
ConcurrentRequestUnlockError = "cannot block request that is not in process"
)

type UserService interface {
CreateUser(userInfo *bean.UserInfo, token string, managerAuth func(resource, token string, object string) bool) ([]*bean.UserInfo, error)
SelfRegisterUserIfNotExists(userInfo *bean.UserInfo) ([]*bean.UserInfo, error)
Expand All @@ -62,6 +68,10 @@ type UserService interface {
}

type UserServiceImpl struct {
userReqLock sync.RWMutex
//map of userId and current lock-state of their serving ability;
//if TRUE then it means that some request is ongoing & unable to serve and FALSE then it is open to serve
userReqState map[int32]bool
userAuthRepository repository2.UserAuthRepository
logger *zap.SugaredLogger
userRepository repository2.UserRepository
Expand All @@ -77,6 +87,7 @@ func NewUserServiceImpl(userAuthRepository repository2.UserAuthRepository,
userGroupRepository repository2.RoleGroupRepository,
sessionManager2 *middleware.SessionManager, userCommonService UserCommonService, userAuditService UserAuditService) *UserServiceImpl {
serviceImpl := &UserServiceImpl{
userReqState: make(map[int32]bool),
userAuthRepository: userAuthRepository,
logger: logger,
userRepository: userRepository,
Expand All @@ -89,6 +100,36 @@ func NewUserServiceImpl(userAuthRepository repository2.UserAuthRepository,
return serviceImpl
}

func (impl *UserServiceImpl) getUserReqLockStateById(userId int32) bool {
defer impl.userReqLock.RUnlock()
impl.userReqLock.RLock()
return impl.userReqState[userId]
}

// FreeUnfreeUserReqState - free sets the userId free for serving, meaning removing the lock(removing entry). Unfree locks the user for other requests
func (impl *UserServiceImpl) lockUnlockUserReqState(userId int32, lock bool) error {
var err error
defer impl.userReqLock.Unlock()
impl.userReqLock.Lock()
if lock {
//checking again if someone changed or not
if !impl.userReqState[userId] {
//available to serve, locking
impl.userReqState[userId] = true
} else {
err = &util.ApiError{Code: "409", HttpStatusCode: http.StatusConflict, UserMessage: ConcurrentRequestLockError}
}
} else {
if impl.userReqState[userId] {
//in serving state, unlocking
delete(impl.userReqState, userId)
} else {
err = &util.ApiError{Code: "409", HttpStatusCode: http.StatusConflict, UserMessage: ConcurrentRequestUnlockError}
}
}
return err
}

func (impl *UserServiceImpl) validateUserRequest(userInfo *bean.UserInfo) (bool, error) {
if len(userInfo.RoleFilters) == 1 &&
userInfo.RoleFilters[0].Team == "" && userInfo.RoleFilters[0].Environment == "" && userInfo.RoleFilters[0].Action == "" {
Expand Down Expand Up @@ -617,6 +658,23 @@ func (impl *UserServiceImpl) mergeGroups(oldGroups []string, newGroups []string)
}

func (impl *UserServiceImpl) UpdateUser(userInfo *bean.UserInfo, token string, managerAuth func(resource, token string, object string) bool) (*bean.UserInfo, bool, bool, []string, error) {
//checking if request for same user is being processed
isLocked := impl.getUserReqLockStateById(userInfo.Id)
if isLocked {
impl.logger.Errorw("received concurrent request for user update, UpdateUser", "userId", userInfo.Id)
return nil, false, false, nil, &util.ApiError{
Code: "409",
HttpStatusCode: http.StatusConflict,
UserMessage: ConcurrentRequestLockError,
}
} else {
//locking state for this user since it's ready to serve
err := impl.lockUnlockUserReqState(userInfo.Id, true)
if err != nil {
impl.logger.Errorw("error in locking, lockUnlockUserReqState", "userId", userInfo.Id)
return nil, false, false, nil, err
}
}
//validating if action user is not admin and trying to update user who has super admin polices, return 403
isUserSuperAdmin, err := impl.IsSuperAdmin(int(userInfo.Id))
if err != nil {
Expand Down Expand Up @@ -802,6 +860,12 @@ func (impl *UserServiceImpl) UpdateUser(userInfo *bean.UserInfo, token string, m
//loading policy for syncing orchestrator to casbin with newly added policies
casbin2.LoadPolicy()

err = impl.lockUnlockUserReqState(userInfo.Id, false)
if err != nil {
impl.logger.Errorw("error in unlocking, lockUnlockUserReqState", "userId", userInfo.Id)
return nil, false, false, nil, err
}

return userInfo, rolesChanged, groupsModified, restrictedGroups, nil
}

Expand Down