From 3a29481ed1fac134a2641e975bc1e137208f55cc Mon Sep 17 00:00:00 2001 From: Ace Breakpoint Date: Fri, 30 Jan 2026 01:48:19 +0800 Subject: [PATCH 1/2] rdb 80 (valkey 9+) compatibility --- cases/valkey_hash2_with_hfe.aof | 54 +++++++++++++++++++ cases/valkey_hash2_with_hfe.json | 3 ++ cases/valkey_hash2_with_hfe.rdb | Bin 0 -> 148 bytes core/decoder.go | 76 +++++++++++++++++++++++--- core/encoder.go | 16 +++++- core/hash.go | 88 ++++++++++++++++++++++++------- 6 files changed, 210 insertions(+), 27 deletions(-) create mode 100644 cases/valkey_hash2_with_hfe.aof create mode 100644 cases/valkey_hash2_with_hfe.json create mode 100644 cases/valkey_hash2_with_hfe.rdb diff --git a/cases/valkey_hash2_with_hfe.aof b/cases/valkey_hash2_with_hfe.aof new file mode 100644 index 0000000..c7fc09a --- /dev/null +++ b/cases/valkey_hash2_with_hfe.aof @@ -0,0 +1,54 @@ +*8 +$5 +HMSET +$9 +hash2-hfe +$2 +F1 +$2 +V1 +$2 +F2 +$2 +V2 +$2 +F3 +$2 +V3 +*6 +$10 +HPEXPIREAT +$9 +hash2-hfe +$13 +2715785640000 +$6 +FIELDS +$1 +1 +$2 +F1 +*6 +$10 +HPEXPIREAT +$9 +hash2-hfe +$13 +2400425640000 +$6 +FIELDS +$1 +1 +$2 +F2 +*5 +$8 +HPERSIST +$9 +hash2-hfe +$6 +FIELDS +$1 +1 +$2 +F3 diff --git a/cases/valkey_hash2_with_hfe.json b/cases/valkey_hash2_with_hfe.json new file mode 100644 index 0000000..bc4e4bf --- /dev/null +++ b/cases/valkey_hash2_with_hfe.json @@ -0,0 +1,3 @@ +[ +{"db":0,"key":"hash2-hfe","size":316,"type":"hash","encoding":"hashex","hash":{"F1":"V1","F2":"V2","F3":"V3"},"expire":{"F1":2715785640000,"F2":2400425640000,"F3":0}} +] \ No newline at end of file diff --git a/cases/valkey_hash2_with_hfe.rdb b/cases/valkey_hash2_with_hfe.rdb new file mode 100644 index 0000000000000000000000000000000000000000..d4ceb78ab20ff79d4d8e4945be9eff11d319107b GIT binary patch literal 148 zcmWG@^zn9$G_WxE#Z{J=lbu?rTb5eHYN=5xUrsf`!@DpVC#gUkwrkj*loO*!aAH#1(1~JZz#NrGi-HfzUW+pd7 rrZ7VXi-N!kCI$v3HzTGnBZr2TCwgF>F;kfFe;Ak(5n7bY(YzM`Qa?7z literal 0 HcmV?d00001 diff --git a/core/decoder.go b/core/decoder.go index f0c83c5..d98975e 100644 --- a/core/decoder.go +++ b/core/decoder.go @@ -23,6 +23,9 @@ type Decoder struct { withSpecialOpCode bool withSpecialTypes map[string]ModuleTypeHandleFunc + + valkey bool + rdbVersion int } // NewDecoder creates a new RDB decoder @@ -46,15 +49,20 @@ func (dec *Decoder) WithSpecialType(moduleType string, f ModuleTypeHandleFunc) * return dec } -var magicNumber = []byte("REDIS") +var magicNumberRedis = []byte("REDIS") +var magicNumberValkey = []byte("VALKEY") const ( - minVersion = 1 - maxVersion = 12 + minVersion = 1 + maxVersion = 12 + minVersionValkey = 80 + maxVersionValkey = 80 ) const ( - opCodeFunction = 245 + opCodeSlotImport = 243 /* Slot import state. */ + opCodeSlotInfo = 244 /* Foreign slot info, safe to ignore. */ + opCodeFunction = 245 /* function library data */ opCodeModuleAux = 247 /* Module auxiliary data. */ opCodeIdle = 248 /* LRU idle time. */ opCodeFreq = 249 /* LFU frequency. */ @@ -93,6 +101,8 @@ const ( typeHashListPackWithHfeRc // rdb 12 (only redis 7.4 rc) typeHashWithHfe // since rdb 12 (redis 7.4) typeHashListPackWithHfe // since rdb 12 (redis 7.4) + + typeHash2 = typeHashWithHfeRc // Hash with field-level expiration (Valkey 9+) ) const ( @@ -124,6 +134,7 @@ var encodingMap = map[int]string{ typeHashListPackWithHfeRc: model.ListPackExEncoding, typeHashWithHfe: model.HashExEncoding, typeHashListPackWithHfe: model.ListPackExEncoding, + // typeHash2: model.HashExEncoding, // same 22 as typeHashWithHfeRc } // checkHeader checks whether input has valid RDB file header @@ -136,16 +147,27 @@ func (dec *Decoder) checkHeader() error { if err != nil { return fmt.Errorf("io error: %v", err) } - if !bytes.Equal(header[0:5], magicNumber) { + var versionString string + if bytes.HasPrefix(header, magicNumberRedis) { + dec.valkey = false + versionString = string(bytes.TrimPrefix(header, magicNumberRedis)) + } else if bytes.HasPrefix(header, magicNumberValkey) { + dec.valkey = true + versionString = string(bytes.TrimPrefix(header, magicNumberValkey)) + } else { return errors.New("file is not a RDB file") } - version, err := strconv.Atoi(string(header[5:])) + version, err := strconv.Atoi(versionString) if err != nil { - return fmt.Errorf("%s is not valid version number", string(header[5:])) + return fmt.Errorf("%s is not valid version number", versionString) } - if version < minVersion || version > maxVersion { + if !dec.valkey && (version < minVersion || version > maxVersion) { return fmt.Errorf("cannot parse version: %d", version) } + if dec.valkey && (version < minVersionValkey || version > maxVersionValkey) { + return fmt.Errorf("cannot parse version: %d", version) + } + dec.rdbVersion = version return nil } @@ -466,6 +488,44 @@ func (dec *Decoder) parse(cb func(object model.RedisObject) bool) error { } } continue + } else if b == opCodeSlotInfo { + var err error + var slot_id, slot_size, expires_slot_size uint64 + slot_id, _, err = dec.readLength() + if err == nil { + slot_size, _, err = dec.readLength() + } + if err == nil { + expires_slot_size, _, err = dec.readLength() + } + if err != nil { + return err + } + _, _, _ = slot_id, slot_size, expires_slot_size // safe to skip + continue + } else if b == opCodeSlotImport { + job, err := dec.readString() + if err != nil { + return err + } + num_slot_ranges, _, err := dec.readLength() + if err != nil { + return err + } + var slot_from, slot_to uint64 + ranges := make([]string, num_slot_ranges) + for i := uint64(0); i < num_slot_ranges; i++ { + slot_from, _, err = dec.readLength() + if err == nil { + slot_to, _, err = dec.readLength() + } + if err != nil { + return err + } + ranges[i] = fmt.Sprintf("%d-%d", slot_from, slot_to) + } + _, _ = job, ranges // no way other than skipping + continue } key, err := dec.readString() if err != nil { diff --git a/core/encoder.go b/core/encoder.go index 06cdc5b..a5380c7 100644 --- a/core/encoder.go +++ b/core/encoder.go @@ -22,6 +22,8 @@ type Encoder struct { hashZipListOpt *zipListOpt zsetZipListOpt *zipListOpt listZipListSize int + + valkey bool } type zipListOpt struct { @@ -102,6 +104,13 @@ func NewEncoder(writer io.Writer) *Encoder { } } +// NewEncoderValkey creates an encoder instance for Valkey 9+ (rdb 80) +func NewEncoderValkey(writer io.Writer) *Encoder { + enc := NewEncoder(writer) + enc.valkey = true + return enc +} + // SetListZipListOpt sets list-max-ziplist-value and list-max-ziplist-entries func (enc *Encoder) SetListZipListOpt(maxValue, maxEntries int) *Encoder { enc.listZipListOpt = &zipListOpt{ @@ -147,7 +156,8 @@ func (enc *Encoder) write(p []byte) error { return nil } -var rdbHeader = []byte("REDIS0011") +var rdbHeaderRedis = []byte("REDIS0011") +var rdbHeaderValkey = []byte("VALKEY080") func (enc *Encoder) validateStateChange(toState string) bool { _, ok := stateChanges[enc.state][toState] @@ -158,6 +168,10 @@ func (enc *Encoder) WriteHeader() error { if !enc.validateStateChange(writtenHeaderState) { return fmt.Errorf("cannot writing header at state: %s", enc.state) } + var rdbHeader []byte = rdbHeaderRedis + if enc.valkey { + rdbHeader = rdbHeaderValkey + } err := enc.write(rdbHeader) if err != nil { return err diff --git a/core/hash.go b/core/hash.go index 3966bcf..009952f 100644 --- a/core/hash.go +++ b/core/hash.go @@ -4,6 +4,7 @@ import ( "encoding/binary" "errors" "fmt" + "math" "github.com/hdt3213/rdb/model" ) @@ -35,7 +36,7 @@ func (dec *Decoder) readHashMap() (map[string][]byte, error) { func (dec *Decoder) readHashMapEx(rc bool) (map[string][]byte, map[string]int64, error) { var minExpire int64 = EB_EXPIRE_TIME_INVALID var expire int64 - if !rc { + if !rc && !dec.valkey { // Hash with HFEs. min TTL at start (7.4+), 7.4RC not included min, err := dec.readInt64() if err != nil { @@ -55,22 +56,24 @@ func (dec *Decoder) readHashMapEx(rc bool) (map[string][]byte, map[string]int64, m := make(map[string][]byte) e := make(map[string]int64) for i := 0; i < int(size); i++ { - ttl, _, err := dec.readLength() - if err != nil { - return nil, nil, err - } - if rc { - // Value is absolute for 7.4RC - expire = int64(ttl) - } else if ttl == 0 { - // 0 Indicates no TTL. This is common case so we keep it small. - expire = 0 - } else { - // TTL is relative to minExpire (with +1 to avoid 0 that already taken) - expire = int64(ttl) + minExpire - 1 - } - if expire > EB_EXPIRE_TIME_MAX { - return nil, nil, fmt.Errorf("invalid expireAt time: %d", expire) + if !dec.valkey { + ttl, _, err := dec.readLength() + if err != nil { + return nil, nil, err + } + if rc { + // Value is absolute for 7.4RC + expire = int64(ttl) + } else if ttl == 0 { + // 0 Indicates no TTL. This is common case so we keep it small. + expire = 0 + } else { + // TTL is relative to minExpire (with +1 to avoid 0 that already taken) + expire = int64(ttl) + minExpire - 1 + } + if expire > EB_EXPIRE_TIME_MAX { + return nil, nil, fmt.Errorf("invalid expireAt time: %d", expire) + } } field, err := dec.readString() if err != nil { @@ -80,6 +83,15 @@ func (dec *Decoder) readHashMapEx(rc bool) (map[string][]byte, map[string]int64, if err != nil { return nil, nil, err } + if dec.valkey { + expire, err = dec.readInt64() + if err != nil { + return nil, nil, err + } + if expire < 0 { + expire = 0 // valkey use -1 to indicate no TTL + } + } m[unsafeBytes2Str(field)] = value e[unsafeBytes2Str(field)] = expire } @@ -303,7 +315,11 @@ func (enc *Encoder) WriteHashMapObjectEx(key string, hash map[string][]byte, exp return err } - err = enc.writeHashEncodingEx(key, hash, expire, options...) + if enc.valkey { + err = enc.writeHash2Encoding(key, hash, expire, options...) + } else { + err = enc.writeHashEncodingEx(key, hash, expire, options...) + } if err != nil { return err } @@ -380,6 +396,42 @@ func (enc *Encoder) writeHashEncodingEx(key string, hash map[string][]byte, expi return nil } +func (enc *Encoder) writeHash2Encoding(key string, hash map[string][]byte, expire map[string]int64, options ...interface{}) error { + ttl := make([]byte, 8) + err := enc.write([]byte{typeHash2}) + if err != nil { + return err + } + err = enc.writeString(key) + if err != nil { + return err + } + err = enc.writeLength(uint64(len(hash))) + if err != nil { + return err + } + for field, value := range hash { + err = enc.writeString(field) + if err != nil { + return err + } + err = enc.writeString(unsafeBytes2Str(value)) + if err != nil { + return err + } + if expire[field] == 0 { + binary.LittleEndian.PutUint64(ttl, math.MaxUint64) // -1 means no TTL + } else { + binary.LittleEndian.PutUint64(ttl, uint64(expire[field])) + } + err = enc.write(ttl) + if err != nil { + return err + } + } + return nil +} + func (enc *Encoder) tryWriteZipListHashMap(key string, hash map[string][]byte, options ...interface{}) (bool, error) { if len(hash) > enc.hashZipListOpt.getMaxEntries() { return false, nil From 4469096ef03e71a5517c2ef39397dc9cc12df3d2 Mon Sep 17 00:00:00 2001 From: Ace Breakpoint Date: Mon, 2 Feb 2026 17:54:14 +0800 Subject: [PATCH 2/2] add LFU/LRU support --- core/decoder.go | 12 ++++++++++-- model/model.go | 16 ++++++++++++++++ 2 files changed, 26 insertions(+), 2 deletions(-) diff --git a/core/decoder.go b/core/decoder.go index d98975e..a320db6 100644 --- a/core/decoder.go +++ b/core/decoder.go @@ -377,6 +377,8 @@ func (dec *Decoder) readObject(flag byte, base *model.BaseObject) (model.RedisOb func (dec *Decoder) parse(cb func(object model.RedisObject) bool) error { var dbIndex int var expireMs int64 + var lru int64 = -1 + var lfu int64 = -1 for { b, err := dec.readByte() if err != nil { @@ -452,16 +454,18 @@ func (dec *Decoder) parse(cb func(object model.RedisObject) bool) error { } continue } else if b == opCodeFreq { - _, err = dec.readByte() + freq, err := dec.readByte() if err != nil { return err } + lfu = int64(freq) continue } else if b == opCodeIdle { - _, _, err = dec.readLength() + idle, _, err := dec.readLength() if err != nil { return err } + lru = int64(idle) continue } else if b == opCodeModuleAux { _, _, err = dec.readModuleType() @@ -540,6 +544,10 @@ func (dec *Decoder) parse(cb func(object model.RedisObject) bool) error { base.Expiration = &expiration expireMs = 0 // reset expire ms } + base.IdleTime = lru + lru = -1 // reset lru + base.Freq = lfu + lfu = -1 // reset lfu obj, err := dec.readObject(b, base) if err != nil { return err diff --git a/model/model.go b/model/model.go index d0d1639..24aed17 100644 --- a/model/model.go +++ b/model/model.go @@ -76,6 +76,10 @@ type RedisObject interface { GetElemCount() int // GetEncoding returns encoding of object GetEncoding() string + // GetIdleTime returns LRU of object + GetIdleTime() int64 + // GetFreq() returns LFU of object + GetFreq() int64 } // BaseObject is basement of redis object @@ -87,6 +91,8 @@ type BaseObject struct { Type string `json:"type"` // Type is one of string/list/set/hash/zset Encoding string `json:"encoding"` // Encoding is the exact encoding method Extra interface{} `json:"-"` // Extra stores more detail of encoding for memory profiler and other usages + IdleTime int64 `json:"lru"` + Freq int64 `json:"lfu"` } // GetKey returns key of object @@ -119,6 +125,16 @@ func (o *BaseObject) GetElemCount() int { return 0 } +// GetIdleTime returns LRU of object +func (o *BaseObject) GetIdleTime() int64 { + return o.IdleTime +} + +// GetFreq() returns LFU of object +func (o *BaseObject) GetFreq() int64 { + return o.Freq +} + // StringObject stores a string object type StringObject struct { *BaseObject