diff --git a/build.sh b/build.sh old mode 100644 new mode 100755 diff --git a/cases/hash_as_listpack_with_hfe.aof b/cases/hash_as_listpack_with_hfe.aof new file mode 100644 index 0000000..ee4cfad --- /dev/null +++ b/cases/hash_as_listpack_with_hfe.aof @@ -0,0 +1,54 @@ +*8 +$5 +HMSET +$12 +listpack-hfe +$2 +F1 +$2 +V1 +$2 +F3 +$2 +V3 +$2 +F2 +$2 +V2 +*6 +$10 +HPEXPIREAT +$12 +listpack-hfe +$13 +2755484483878 +$6 +FIELDS +$1 +1 +$2 +F3 +*5 +$8 +HPERSIST +$12 +listpack-hfe +$6 +FIELDS +$1 +1 +$2 +F2 +*6 +$10 +HPEXPIREAT +$12 +listpack-hfe +$13 +2755482478325 +$6 +FIELDS +$1 +1 +$2 +F1 diff --git a/cases/hash_as_listpack_with_hfe.json b/cases/hash_as_listpack_with_hfe.json new file mode 100644 index 0000000..63911bb --- /dev/null +++ b/cases/hash_as_listpack_with_hfe.json @@ -0,0 +1,3 @@ +[ +{"db":0,"key":"listpack-hfe","size":316,"type":"hash","encoding":"listpackex","hash":{"F1":"V1","F2":"V2","F3":"V3"},"expire":{"F1":2755482478325,"F2":0,"F3":2755484483878}} +] \ No newline at end of file diff --git a/cases/hash_as_listpack_with_hfe.rdb b/cases/hash_as_listpack_with_hfe.rdb new file mode 100644 index 0000000..1aa142c Binary files /dev/null and b/cases/hash_as_listpack_with_hfe.rdb differ diff --git a/cases/hash_with_hfe.aof b/cases/hash_with_hfe.aof new file mode 100644 index 0000000..8983988 --- /dev/null +++ b/cases/hash_with_hfe.aof @@ -0,0 +1,131 @@ +*18 +$5 +HMSET +$8 +hash-hfe +$2 +F4 +$2 +V4 +$2 +F7 +$2 +V7 +$2 +F8 +$2 +V8 +$2 +F2 +$2 +V2 +$2 +F5 +$2 +V5 +$2 +F3 +$2 +V3 +$2 +F1 +$2 +V1 +$2 +F6 +$2 +V6 +*5 +$8 +HPERSIST +$8 +hash-hfe +$6 +FIELDS +$1 +1 +$2 +F8 +*6 +$10 +HPEXPIREAT +$8 +hash-hfe +$13 +2755483429282 +$6 +FIELDS +$1 +1 +$2 +F2 +*5 +$8 +HPERSIST +$8 +hash-hfe +$6 +FIELDS +$1 +1 +$2 +F5 +*6 +$10 +HPEXPIREAT +$8 +hash-hfe +$13 +2755484433842 +$6 +FIELDS +$1 +1 +$2 +F3 +*6 +$10 +HPEXPIREAT +$8 +hash-hfe +$13 +2755482424661 +$6 +FIELDS +$1 +1 +$2 +F1 +*5 +$8 +HPERSIST +$8 +hash-hfe +$6 +FIELDS +$1 +1 +$2 +F6 +*5 +$8 +HPERSIST +$8 +hash-hfe +$6 +FIELDS +$1 +1 +$2 +F4 +*5 +$8 +HPERSIST +$8 +hash-hfe +$6 +FIELDS +$1 +1 +$2 +F7 diff --git a/cases/hash_with_hfe.json b/cases/hash_with_hfe.json new file mode 100644 index 0000000..6851712 --- /dev/null +++ b/cases/hash_with_hfe.json @@ -0,0 +1,3 @@ +[ +{"db":0,"key":"hash-hfe","size":660,"type":"hash","encoding":"hashex","hash":{"F1":"V1","F2":"V2","F3":"V3","F4":"V4","F5":"V5","F6":"V6","F7":"V7","F8":"V8"},"expire":{"F1":2755482424661,"F2":2755483429282,"F3":2755484433842,"F4":0,"F5":0,"F6":0,"F7":0,"F8":0}} +] \ No newline at end of file diff --git a/cases/hash_with_hfe.rdb b/cases/hash_with_hfe.rdb new file mode 100644 index 0000000..3a0c116 Binary files /dev/null and b/cases/hash_with_hfe.rdb differ diff --git a/core/decoder.go b/core/decoder.go index 6fc8032..8114ea7 100644 --- a/core/decoder.go +++ b/core/decoder.go @@ -7,11 +7,12 @@ import ( "encoding/binary" "errors" "fmt" - "github.com/hdt3213/rdb/memprofiler" - "github.com/hdt3213/rdb/model" "io" "strconv" "time" + + "github.com/hdt3213/rdb/memprofiler" + "github.com/hdt3213/rdb/model" ) // Decoder is an instance of rdb parsing process @@ -87,27 +88,41 @@ const ( typeStreamListPacks2 typeSetListPack typeStreamListPacks3 + typeHashWithHfeRc // rdb 12 (only redis 7.4 rc) + typeHashListPackWithHfeRc // rdb 12 (only redis 7.4 rc) + typeHashWithHfe // since rdb 12 (redis 7.4) + typeHashListPackWithHfe // since rdb 12 (redis 7.4) +) + +const ( + EB_EXPIRE_TIME_MAX int64 = 0x0000FFFFFFFFFFFF + EB_EXPIRE_TIME_INVALID int64 = EB_EXPIRE_TIME_MAX + 1 + HFE_MAX_ABS_TIME_MSEC int64 = EB_EXPIRE_TIME_MAX >> 2 ) var encodingMap = map[int]string{ - typeString: model.StringEncoding, - typeList: model.ListEncoding, - typeSet: model.SetEncoding, - typeZset: model.ZSetEncoding, - typeHash: model.HashEncoding, - typeZset2: model.ZSet2Encoding, - typeHashZipMap: model.ZipMapEncoding, - typeListZipList: model.ZipListEncoding, - typeSetIntSet: model.IntSetEncoding, - typeZsetZipList: model.ZipListEncoding, - typeHashZipList: model.ZipListEncoding, - typeListQuickList: model.QuickListEncoding, - typeStreamListPacks: model.ListPackEncoding, - typeStreamListPacks2: model.ListPackEncoding, - typeHashListPack: model.ListPackEncoding, - typeZsetListPack: model.ListPackEncoding, - typeListQuickList2: model.QuickList2Encoding, - typeSetListPack: model.ListPackEncoding, + typeString: model.StringEncoding, + typeList: model.ListEncoding, + typeSet: model.SetEncoding, + typeZset: model.ZSetEncoding, + typeHash: model.HashEncoding, + typeZset2: model.ZSet2Encoding, + typeHashZipMap: model.ZipMapEncoding, + typeListZipList: model.ZipListEncoding, + typeSetIntSet: model.IntSetEncoding, + typeZsetZipList: model.ZipListEncoding, + typeHashZipList: model.ZipListEncoding, + typeListQuickList: model.QuickListEncoding, + typeStreamListPacks: model.ListPackEncoding, + typeStreamListPacks2: model.ListPackEncoding, + typeHashListPack: model.ListPackEncoding, + typeZsetListPack: model.ListPackEncoding, + typeListQuickList2: model.QuickList2Encoding, + typeSetListPack: model.ListPackEncoding, + typeHashWithHfeRc: model.HashExEncoding, + typeHashListPackWithHfeRc: model.ListPackExEncoding, + typeHashWithHfe: model.HashExEncoding, + typeHashListPackWithHfe: model.ListPackExEncoding, } // checkHeader checks whether input has valid RDB file header @@ -311,6 +326,27 @@ func (dec *Decoder) readObject(flag byte, base *model.BaseObject) (model.RedisOb BaseObject: base, Members: set, }, nil + case typeHashWithHfe, typeHashWithHfeRc: + hash, expire, err := dec.readHashMapEx(func() bool { return flag == typeHashWithHfeRc }()) + if err != nil { + return nil, err + } + return &model.HashObject{ + BaseObject: base, + Hash: hash, + FieldExpirations: expire, + }, nil + case typeHashListPackWithHfe, typeHashListPackWithHfeRc: + m, e, extra, err := dec.readListPackHashEx(func() bool { return flag == typeHashListPackWithHfeRc }()) + if err != nil { + return nil, err + } + base.Extra = extra + return &model.HashObject{ + BaseObject: base, + Hash: m, + FieldExpirations: e, + }, nil } return nil, fmt.Errorf("unknown type flag: %b", flag) } diff --git a/core/hash.go b/core/hash.go index f624a73..3966bcf 100644 --- a/core/hash.go +++ b/core/hash.go @@ -3,6 +3,8 @@ package core import ( "encoding/binary" "errors" + "fmt" + "github.com/hdt3213/rdb/model" ) @@ -30,6 +32,60 @@ func (dec *Decoder) readHashMap() (map[string][]byte, error) { return m, nil } +func (dec *Decoder) readHashMapEx(rc bool) (map[string][]byte, map[string]int64, error) { + var minExpire int64 = EB_EXPIRE_TIME_INVALID + var expire int64 + if !rc { + // Hash with HFEs. min TTL at start (7.4+), 7.4RC not included + min, err := dec.readInt64() + if err != nil { + return nil, nil, err + } + if min > EB_EXPIRE_TIME_INVALID { + return nil, nil, fmt.Errorf("hash read invalid minExpire value: %d", min) + } + minExpire = min + } + size, _, err := dec.readLength() + if err != nil { + return nil, nil, err + } else if size == 0 { + return nil, nil, fmt.Errorf("hash read empty key") + } + m := make(map[string][]byte) + e := make(map[string]int64) + for i := 0; i < int(size); i++ { + ttl, _, err := dec.readLength() + if err != nil { + return nil, nil, err + } + if rc { + // Value is absolute for 7.4RC + expire = int64(ttl) + } else if ttl == 0 { + // 0 Indicates no TTL. This is common case so we keep it small. + expire = 0 + } else { + // TTL is relative to minExpire (with +1 to avoid 0 that already taken) + expire = int64(ttl) + minExpire - 1 + } + if expire > EB_EXPIRE_TIME_MAX { + return nil, nil, fmt.Errorf("invalid expireAt time: %d", expire) + } + field, err := dec.readString() + if err != nil { + return nil, nil, err + } + value, err := dec.readString() + if err != nil { + return nil, nil, err + } + m[unsafeBytes2Str(field)] = value + e[unsafeBytes2Str(field)] = expire + } + return m, e, nil +} + func (dec *Decoder) readZipMapHash() (map[string][]byte, error) { buf, err := dec.readString() if err != nil { @@ -179,6 +235,49 @@ func (dec *Decoder) readListPackHash() (map[string][]byte, *model.ListpackDetail return m, detail, nil } +func (dec *Decoder) readListPackHashEx(rc bool) (map[string][]byte, map[string]int64, *model.ListpackDetail, error) { + if !rc { + // This value was serialized for future use-case of streaming the object directly to FLASH (while keeping in mem its next expiration time) + _, err := dec.readInt64() + if err != nil { + return nil, nil, nil, err + } + } + buf, err := dec.readString() + if err != nil { + return nil, nil, nil, err + } + cursor := 0 + size := readListPackLength(buf, &cursor) + if size == 0 { + return nil, nil, nil, fmt.Errorf("hash listpack read empty key") + } else if size%3 != 0 { + return nil, nil, nil, fmt.Errorf("hash listpack read invalid size %d", size) + } + m := make(map[string][]byte) + e := make(map[string]int64) + for i := 0; i < size; i += 3 { + key, err := dec.readListPackEntryAsString(buf, &cursor) + if err != nil { + return nil, nil, nil, err + } + val, err := dec.readListPackEntryAsString(buf, &cursor) + if err != nil { + return nil, nil, nil, err + } + expire, err := dec.readListPackEntryAsInt(buf, &cursor) + if err != nil { + return nil, nil, nil, err + } + m[unsafeBytes2Str(key)] = val + e[unsafeBytes2Str(key)] = expire + } + detail := &model.ListpackDetail{ + RawStringSize: len(buf), + } + return m, e, detail, nil +} + func (enc *Encoder) WriteHashMapObject(key string, hash map[string][]byte, options ...interface{}) error { err := enc.beforeWriteObject(options...) if err != nil { @@ -198,6 +297,20 @@ func (enc *Encoder) WriteHashMapObject(key string, hash map[string][]byte, optio return nil } +func (enc *Encoder) WriteHashMapObjectEx(key string, hash map[string][]byte, expire map[string]int64, options ...interface{}) error { + err := enc.beforeWriteObject(options...) + if err != nil { + return err + } + + err = enc.writeHashEncodingEx(key, hash, expire, options...) + if err != nil { + return err + } + enc.state = writtenObjectState + return nil +} + func (enc *Encoder) writeHashEncoding(key string, hash map[string][]byte, options ...interface{}) error { err := enc.write([]byte{typeHash}) if err != nil { @@ -224,6 +337,49 @@ func (enc *Encoder) writeHashEncoding(key string, hash map[string][]byte, option return nil } +func (enc *Encoder) writeHashEncodingEx(key string, hash map[string][]byte, expire map[string]int64, options ...interface{}) error { + err := enc.write([]byte{typeHashWithHfe}) + if err != nil { + return err + } + err = enc.writeString(key) + if err != nil { + return err + } + // Hash with HFEs. min TTL at start (7.4+), 7.4RC not included + var minExpire int64 = 0 + for _, e := range expire { + if e > minExpire { + minExpire = e + } + } + minExpireBytes := make([]byte, 8) + binary.LittleEndian.PutUint64(minExpireBytes[:], uint64(minExpire)) + err = enc.write(minExpireBytes) + if err != nil { + return err + } + err = enc.writeLength(uint64(len(hash))) + if err != nil { + return err + } + for field, value := range hash { + err = enc.writeLength(uint64(expire[field] + 1 - minExpire)) + if err != nil { + return err + } + err = enc.writeString(field) + if err != nil { + return err + } + err = enc.writeString(unsafeBytes2Str(value)) + if err != nil { + return err + } + } + return nil +} + func (enc *Encoder) tryWriteZipListHashMap(key string, hash map[string][]byte, options ...interface{}) (bool, error) { if len(hash) > enc.hashZipListOpt.getMaxEntries() { return false, nil diff --git a/core/string.go b/core/string.go index 3ff25aa..68c78d3 100644 --- a/core/string.go +++ b/core/string.go @@ -4,10 +4,11 @@ import ( "encoding/binary" "errors" "fmt" - "github.com/hdt3213/rdb/lzf" "math" "strconv" "unicode" + + "github.com/hdt3213/rdb/lzf" ) const ( @@ -119,13 +120,23 @@ func (dec *Decoder) readInt16() (int16, error) { func (dec *Decoder) readInt32() (int32, error) { err := dec.readFull(dec.buffer[:4]) if err != nil { - return 0, fmt.Errorf("read uint16 error: %v", err) + return 0, fmt.Errorf("read uint32 error: %v", err) } i := binary.LittleEndian.Uint32(dec.buffer[:4]) return int32(i), nil } +func (dec *Decoder) readInt64() (int64, error) { + err := dec.readFull(dec.buffer[:8]) + if err != nil { + return 0, fmt.Errorf("read uint64 error: %v", err) + } + + i := binary.LittleEndian.Uint64(dec.buffer[:8]) + return int64(i), nil +} + func (dec *Decoder) readLiteralFloat() (float64, error) { first, err := dec.readByte() if err != nil { diff --git a/helper/resp.go b/helper/resp.go index 758a333..3829017 100644 --- a/helper/resp.go +++ b/helper/resp.go @@ -2,6 +2,7 @@ package helper import ( "bytes" + "fmt" "io" "sort" "strconv" @@ -66,8 +67,10 @@ func setToCmd(obj *model.SetObject) CmdLine { } var hMSetCmd = []byte("HMSET") +var hPExpireAtCmd = []byte("HPEXPIREAT") // redis 7.4.0+ +var hPersistCmd = []byte("HPERSIST") // redis 7.4.0+ -func hashToCmd(obj *model.HashObject, useLexOrder bool) CmdLine { +func hashToCmd(obj *model.HashObject, useLexOrder bool) []CmdLine { cmdLine := make([][]byte, 2+obj.GetElemCount()*2) cmdLine[0] = hMSetCmd cmdLine[1] = []byte(obj.GetKey()) @@ -94,7 +97,34 @@ func hashToCmd(obj *model.HashObject, useLexOrder bool) CmdLine { i++ } } - return cmdLine + + cmds := []CmdLine{cmdLine} + if len(obj.FieldExpirations) == len(obj.Hash) { + for field, expire := range obj.FieldExpirations { + if expire == 0 { + hpexp := make([][]byte, 5) + // HPEXPIRE key seconds FIELDS num FIELD... + hpexp[0] = hPersistCmd + hpexp[1] = []byte(obj.Key) + hpexp[2] = []byte("FIELDS") + hpexp[3] = []byte("1") + hpexp[4] = []byte(field) + cmds = append(cmds, hpexp) + } else { + hpexp := make([][]byte, 6) + // HPEXPIRE key seconds FIELDS num FIELD... + hpexp[0] = hPExpireAtCmd + hpexp[1] = []byte(obj.Key) + hpexp[2] = []byte(fmt.Sprintf("%d", expire)) + hpexp[3] = []byte("FIELDS") + hpexp[4] = []byte("1") + hpexp[5] = []byte(field) + cmds = append(cmds, hpexp) + } + } + } + + return cmds } var zAddCmd = []byte("ZADD") @@ -184,7 +214,7 @@ func ObjectToCmd(obj model.RedisObject, opts ...interface{}) []CmdLine { cmdLines = append(cmdLines, listToCmd(listObj)) case model.HashType: hashObj := obj.(*model.HashObject) - cmdLines = append(cmdLines, hashToCmd(hashObj, useLexOrder)) + cmdLines = append(cmdLines, hashToCmd(hashObj, useLexOrder)...) case model.SetType: setObj := obj.(*model.SetObject) cmdLines = append(cmdLines, setToCmd(setObj)) diff --git a/model/model.go b/model/model.go index 60d379c..6237ad6 100644 --- a/model/model.go +++ b/model/model.go @@ -35,6 +35,8 @@ const ( ZSetEncoding = "zset" // HashEncoding is formed by a length encoding and some string HashEncoding = "hash" + // HashExEncoding is hash with field expiration + HashExEncoding = "hashex" // ZSet2Encoding is zset version2 which stores doubles in binary format ZSet2Encoding = "zset2" // ZipMapEncoding has been deprecated @@ -47,6 +49,8 @@ const ( QuickListEncoding = "quicklist" // ListPackEncoding is a new replacement for ziplist ListPackEncoding = "listpack" + // ListPackExEncoding is listpack with field expiration + ListPackExEncoding = "listpackex" // QuickList2Encoding is a list of listpack QuickList2Encoding = "quicklist2" ) @@ -171,7 +175,8 @@ func (o *ListObject) MarshalJSON() ([]byte, error) { // HashObject stores a hash object type HashObject struct { *BaseObject - Hash map[string][]byte + Hash map[string][]byte + FieldExpirations map[string]int64 } // GetType returns redis object type @@ -190,14 +195,28 @@ func (o *HashObject) MarshalJSON() ([]byte, error) { for k, v := range o.Hash { m[k] = string(v) } - o2 := struct { - *BaseObject - Hash map[string]string `json:"hash"` - }{ - BaseObject: o.BaseObject, - Hash: m, + if len(o.FieldExpirations) == len(o.Hash) { + // hash/listpack with HFE + o2 := struct { + *BaseObject + Hash map[string]string `json:"hash"` + FieldExpirations map[string]int64 `json:"expire"` + }{ + BaseObject: o.BaseObject, + Hash: m, + FieldExpirations: o.FieldExpirations, + } + return json.Marshal(o2) + } else { + o2 := struct { + *BaseObject + Hash map[string]string `json:"hash"` + }{ + BaseObject: o.BaseObject, + Hash: m, + } + return json.Marshal(o2) } - return json.Marshal(o2) } // SetObject stores a set object