Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 50 additions & 1 deletion pkg/meta/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,8 +409,16 @@ func (r *redisMeta) getSession(sid string, detail bool) (*Session, error) {
}

func (r *redisMeta) GetSession(sid uint64) (*Session, error) {
var legacy bool
key := strconv.FormatUint(sid, 10)
score, err := r.rdb.ZScore(Background, r.allSessions(), key).Result()
if err == redis.Nil {
legacy = true
score, err = r.rdb.ZScore(Background, legacySessions, key).Result()
}
if err == redis.Nil {
err = fmt.Errorf("session not found: %d", sid)
}
if err != nil {
return nil, err
}
Expand All @@ -419,6 +427,9 @@ func (r *redisMeta) GetSession(sid uint64) (*Session, error) {
return nil, err
}
s.Expire = time.Unix(int64(score), 0)
if legacy {
s.Expire = s.Expire.Add(time.Minute * 5)
}
return s, nil
}

Expand All @@ -437,6 +448,22 @@ func (r *redisMeta) ListSessions() ([]*Session, error) {
s.Expire = time.Unix(int64(k.Score), 0)
sessions = append(sessions, s)
}

// add clients with version before 1.0-beta3 as well
keys, err = r.rdb.ZRangeWithScores(Background, legacySessions, 0, -1).Result()
if err != nil {
logger.Errorf("Scan legacy sessions: %s", err)
return sessions, nil
}
for _, k := range keys {
s, err := r.getSession(k.Member.(string), false)
if err != nil {
logger.Errorf("Get legacy session: %s", err)
continue
}
s.Expire = time.Unix(int64(k.Score), 0).Add(time.Minute * 5)
sessions = append(sessions, s)
}
return sessions, nil
}

Expand Down Expand Up @@ -1745,7 +1772,12 @@ func (r *redisMeta) doCleanStaleSession(sid uint64) error {
if fail {
return fmt.Errorf("failed to clean up sid %d", sid)
} else {
return r.rdb.ZRem(ctx, r.allSessions(), ssid).Err()
if n, err := r.rdb.ZRem(ctx, r.allSessions(), ssid).Result(); err != nil {
return err
} else if n == 1 {
return nil
}
return r.rdb.ZRem(ctx, legacySessions, ssid).Err()
}
}

Expand All @@ -1760,6 +1792,23 @@ func (r *redisMeta) doFindStaleSessions(limit int) ([]uint64, error) {
for i, v := range vals {
sids[i], _ = strconv.ParseUint(v, 10, 64)
}
limit -= len(sids)
if limit <= 0 {
return sids, nil
}

// check clients with version before 1.0-beta3 as well
vals, err = r.rdb.ZRangeByScore(Background, legacySessions, &redis.ZRangeBy{
Max: strconv.FormatInt(time.Now().Add(time.Minute*-5).Unix(), 10),
Count: int64(limit)}).Result()
if err != nil {
logger.Errorf("Scan stale legacy sessions: %s", err)
return sids, nil
}
for _, v := range vals {
sid, _ := strconv.ParseUint(v, 10, 64)
sids = append(sids, sid)
}
return sids, nil
}

Expand Down
92 changes: 78 additions & 14 deletions pkg/meta/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,16 +347,27 @@ func (m *dbMeta) doNewSession(sinfo []byte) error {
return nil
}

func (m *dbMeta) getSession(row *session2, detail bool) (*Session, error) {
func (m *dbMeta) getSession(row interface{}, detail bool) (*Session, error) {
var s Session
if row.Info == nil { // legacy client has no info
row.Info = []byte("{}")
var info []byte
switch row := row.(type) {
case *session2:
s.Sid = row.Sid
s.Expire = time.Unix(row.Expire, 0)
info = row.Info
case *session:
s.Sid = row.Sid
s.Expire = time.Unix(row.Heartbeat, 0).Add(time.Minute * 5)
info = row.Info
if info == nil { // legacy client has no info
info = []byte("{}")
}
default:
return nil, fmt.Errorf("invalid type: %T", row)
}
if err := json.Unmarshal(row.Info, &s); err != nil {
if err := json.Unmarshal(info, &s); err != nil {
return nil, fmt.Errorf("corrupted session info; json error: %s", err)
}
s.Sid = row.Sid
s.Expire = time.Unix(row.Expire, 0)
if detail {
var (
srows []sustained
Expand Down Expand Up @@ -392,14 +403,22 @@ func (m *dbMeta) getSession(row *session2, detail bool) (*Session, error) {

func (m *dbMeta) GetSession(sid uint64) (*Session, error) {
row := session2{Sid: sid}
ok, err := m.db.Get(&row)
if err != nil {
if ok, err := m.db.Get(&row); err != nil {
return nil, err
} else if ok {
return m.getSession(&row, true)
}
if !ok {
return nil, fmt.Errorf("session not found: %d", sid)
if ok, err := m.db.IsTableExist(&session{}); err != nil {
return nil, err
} else if ok {
row := session{Sid: sid}
if ok, err := m.db.Get(&row); err != nil {
return nil, err
} else if ok {
return m.getSession(&row, true)
}
}
return m.getSession(&row, true)
return nil, fmt.Errorf("session not found: %d", sid)
}

func (m *dbMeta) ListSessions() ([]*Session, error) {
Expand All @@ -417,6 +436,24 @@ func (m *dbMeta) ListSessions() ([]*Session, error) {
}
sessions = append(sessions, s)
}

if ok, err := m.db.IsTableExist(&session{}); err != nil {
logger.Errorf("Check legacy session table: %s", err)
} else if ok {
var lrows []session
if err = m.db.Find(&lrows); err != nil {
logger.Errorf("Scan legacy sessions: %s", err)
return sessions, nil
}
for i := range lrows {
s, err := m.getSession(&lrows[i], false)
if err != nil {
logger.Errorf("Get legacy session: %s", err)
continue
}
sessions = append(sessions, s)
}
}
return sessions, nil
}

Expand Down Expand Up @@ -1617,10 +1654,16 @@ func (m *dbMeta) doCleanStaleSession(sid uint64) error {
if fail {
return fmt.Errorf("failed to clean up sid %d", sid)
} else {
return m.txn(func(ses *xorm.Session) error {
_, err := ses.Delete(&session2{Sid: sid})
if n, err := m.db.Delete(&session2{Sid: sid}); err != nil {
return err
})
} else if n == 1 {
return nil
}
ok, err := m.db.IsTableExist(&session{})
if err == nil && ok {
_, err = m.db.Delete(&session{Sid: sid})
}
return err
}
}

Expand All @@ -1637,6 +1680,27 @@ func (m *dbMeta) doFindStaleSessions(limit int) ([]uint64, error) {
}
}
_ = rows.Close()
limit -= len(sids)
if limit <= 0 {
return sids, nil
}

if ok, err := m.db.IsTableExist(&session{}); err != nil {
logger.Errorf("Check legacy session table: %s", err)
} else if ok {
var ls session
rows, err = m.db.Where("Heartbeat < ?", time.Now().Add(time.Minute*-5).Unix()).Limit(limit, 0).Rows(&ls)
if err != nil {
logger.Errorf("Scan stale legacy sessions: %s", err)
return sids, nil
}
for rows.Next() {
if rows.Scan(&ls) == nil {
sids = append(sids, ls.Sid)
}
}
_ = rows.Close()
}
return sids, nil
}

Expand Down
49 changes: 47 additions & 2 deletions pkg/meta/tkv.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ All keys:
Piiiiiiii POSIX locks
Kccccccccnnnn slice refs
SEssssssss session expire time
SHssssssss session heartbeat // for legacy client
SIssssssss session info
SSssssssssiiiiiiii sustained inode
*/
Expand Down Expand Up @@ -208,8 +209,12 @@ func (m *kvMeta) sessionKey(sid uint64) []byte {
return m.fmtKey("SE", sid)
}

func (m *kvMeta) legacySessionKey(sid uint64) []byte {
return m.fmtKey("SH", sid)
}

func (m *kvMeta) parseSid(key string) uint64 {
buf := []byte(key[2:]) // "SE"
buf := []byte(key[2:]) // "SE" or "SH"
if len(buf) != 8 {
panic("invalid sid value")
}
Expand Down Expand Up @@ -462,7 +467,7 @@ func (m *kvMeta) doCleanStaleSession(sid uint64) error {
if fail {
return fmt.Errorf("failed to clean up sid %d", sid)
} else {
return m.deleteKeys(m.sessionKey(sid), m.sessionInfoKey(sid))
return m.deleteKeys(m.sessionKey(sid), m.legacySessionKey(sid), m.sessionInfoKey(sid))
}
}

Expand All @@ -477,6 +482,22 @@ func (m *kvMeta) doFindStaleSessions(limit int) ([]uint64, error) {
for k := range vals {
sids = append(sids, m.parseSid(k))
}
limit -= len(sids)
if limit <= 0 {
return sids, nil
}

// check clients with version before 1.0-beta3 as well
vals, err = m.scanValues(m.fmtKey("SH"), limit, func(k, v []byte) bool {
return m.parseInt64(v) < time.Now().Add(time.Minute*-5).Unix()
})
if err != nil {
logger.Errorf("Scan stale legacy sessions: %s", err)
return sids, nil
}
for k := range vals {
sids = append(sids, m.parseSid(k))
}
return sids, nil
}

Expand Down Expand Up @@ -534,7 +555,12 @@ func (m *kvMeta) getSession(sid uint64, detail bool) (*Session, error) {
}

func (m *kvMeta) GetSession(sid uint64) (*Session, error) {
var legacy bool
value, err := m.get(m.sessionKey(sid))
if err == nil && value == nil {
legacy = true
value, err = m.get(m.legacySessionKey(sid))
}
if err != nil {
return nil, err
}
Expand All @@ -546,6 +572,9 @@ func (m *kvMeta) GetSession(sid uint64) (*Session, error) {
return nil, err
}
s.Expire = time.Unix(m.parseInt64(value), 0)
if legacy {
s.Expire = s.Expire.Add(time.Minute * 5)
}
return s, nil
}

Expand All @@ -564,6 +593,22 @@ func (m *kvMeta) ListSessions() ([]*Session, error) {
s.Expire = time.Unix(m.parseInt64(v), 0)
sessions = append(sessions, s)
}

// add clients with version before 1.0-beta3 as well
vals, err = m.scanValues(m.fmtKey("SH"), -1, nil)
if err != nil {
logger.Errorf("Scan legacy sessions: %s", err)
return sessions, nil
}
for k, v := range vals {
s, err := m.getSession(m.parseSid(k), false)
if err != nil {
logger.Errorf("Get legacy session: %s", err)
continue
}
s.Expire = time.Unix(m.parseInt64(v), 0).Add(time.Minute * 5)
sessions = append(sessions, s)
}
return sessions, nil
}

Expand Down
5 changes: 3 additions & 2 deletions pkg/meta/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ import (
)

const (
usedSpace = "usedSpace"
totalInodes = "totalInodes"
usedSpace = "usedSpace"
totalInodes = "totalInodes"
legacySessions = "sessions"
)

const (
Expand Down