Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions cases/valkey_hash2_with_hfe.aof
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
*8
$5
HMSET
$9
hash2-hfe
$2
F1
$2
V1
$2
F2
$2
V2
$2
F3
$2
V3
*6
$10
HPEXPIREAT
$9
hash2-hfe
$13
2715785640000
$6
FIELDS
$1
1
$2
F1
*6
$10
HPEXPIREAT
$9
hash2-hfe
$13
2400425640000
$6
FIELDS
$1
1
$2
F2
*5
$8
HPERSIST
$9
hash2-hfe
$6
FIELDS
$1
1
$2
F3
3 changes: 3 additions & 0 deletions cases/valkey_hash2_with_hfe.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[
{"db":0,"key":"hash2-hfe","size":316,"type":"hash","encoding":"hashex","hash":{"F1":"V1","F2":"V2","F3":"V3"},"expire":{"F1":2715785640000,"F2":2400425640000,"F3":0}}
]
Binary file added cases/valkey_hash2_with_hfe.rdb
Binary file not shown.
88 changes: 78 additions & 10 deletions core/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ type Decoder struct {

withSpecialOpCode bool
withSpecialTypes map[string]ModuleTypeHandleFunc

valkey bool
rdbVersion int
}

// NewDecoder creates a new RDB decoder
Expand All @@ -46,15 +49,20 @@ func (dec *Decoder) WithSpecialType(moduleType string, f ModuleTypeHandleFunc) *
return dec
}

var magicNumber = []byte("REDIS")
var magicNumberRedis = []byte("REDIS")
var magicNumberValkey = []byte("VALKEY")

const (
minVersion = 1
maxVersion = 12
minVersion = 1
maxVersion = 12
minVersionValkey = 80
maxVersionValkey = 80
)

const (
opCodeFunction = 245
opCodeSlotImport = 243 /* Slot import state. */
opCodeSlotInfo = 244 /* Foreign slot info, safe to ignore. */
opCodeFunction = 245 /* function library data */
opCodeModuleAux = 247 /* Module auxiliary data. */
opCodeIdle = 248 /* LRU idle time. */
opCodeFreq = 249 /* LFU frequency. */
Expand Down Expand Up @@ -93,6 +101,8 @@ const (
typeHashListPackWithHfeRc // rdb 12 (only redis 7.4 rc)
typeHashWithHfe // since rdb 12 (redis 7.4)
typeHashListPackWithHfe // since rdb 12 (redis 7.4)

typeHash2 = typeHashWithHfeRc // Hash with field-level expiration (Valkey 9+)
)

const (
Expand Down Expand Up @@ -124,6 +134,7 @@ var encodingMap = map[int]string{
typeHashListPackWithHfeRc: model.ListPackExEncoding,
typeHashWithHfe: model.HashExEncoding,
typeHashListPackWithHfe: model.ListPackExEncoding,
// typeHash2: model.HashExEncoding, // same 22 as typeHashWithHfeRc
}

// checkHeader checks whether input has valid RDB file header
Expand All @@ -136,16 +147,27 @@ func (dec *Decoder) checkHeader() error {
if err != nil {
return fmt.Errorf("io error: %v", err)
}
if !bytes.Equal(header[0:5], magicNumber) {
var versionString string
if bytes.HasPrefix(header, magicNumberRedis) {
dec.valkey = false
versionString = string(bytes.TrimPrefix(header, magicNumberRedis))
} else if bytes.HasPrefix(header, magicNumberValkey) {
dec.valkey = true
versionString = string(bytes.TrimPrefix(header, magicNumberValkey))
} else {
return errors.New("file is not a RDB file")
}
version, err := strconv.Atoi(string(header[5:]))
version, err := strconv.Atoi(versionString)
if err != nil {
return fmt.Errorf("%s is not valid version number", string(header[5:]))
return fmt.Errorf("%s is not valid version number", versionString)
}
if version < minVersion || version > maxVersion {
if !dec.valkey && (version < minVersion || version > maxVersion) {
return fmt.Errorf("cannot parse version: %d", version)
}
if dec.valkey && (version < minVersionValkey || version > maxVersionValkey) {
return fmt.Errorf("cannot parse version: %d", version)
}
dec.rdbVersion = version
return nil
}

Expand Down Expand Up @@ -355,6 +377,8 @@ func (dec *Decoder) readObject(flag byte, base *model.BaseObject) (model.RedisOb
func (dec *Decoder) parse(cb func(object model.RedisObject) bool) error {
var dbIndex int
var expireMs int64
var lru int64 = -1
var lfu int64 = -1
for {
b, err := dec.readByte()
if err != nil {
Expand Down Expand Up @@ -430,16 +454,18 @@ func (dec *Decoder) parse(cb func(object model.RedisObject) bool) error {
}
continue
} else if b == opCodeFreq {
_, err = dec.readByte()
freq, err := dec.readByte()
if err != nil {
return err
}
lfu = int64(freq)
continue
} else if b == opCodeIdle {
_, _, err = dec.readLength()
idle, _, err := dec.readLength()
if err != nil {
return err
}
lru = int64(idle)
continue
} else if b == opCodeModuleAux {
_, _, err = dec.readModuleType()
Expand All @@ -466,6 +492,44 @@ func (dec *Decoder) parse(cb func(object model.RedisObject) bool) error {
}
}
continue
} else if b == opCodeSlotInfo {
var err error
var slot_id, slot_size, expires_slot_size uint64
slot_id, _, err = dec.readLength()
if err == nil {
slot_size, _, err = dec.readLength()
}
if err == nil {
expires_slot_size, _, err = dec.readLength()
}
if err != nil {
return err
}
_, _, _ = slot_id, slot_size, expires_slot_size // safe to skip
continue
} else if b == opCodeSlotImport {
job, err := dec.readString()
if err != nil {
return err
}
num_slot_ranges, _, err := dec.readLength()
if err != nil {
return err
}
var slot_from, slot_to uint64
ranges := make([]string, num_slot_ranges)
for i := uint64(0); i < num_slot_ranges; i++ {
slot_from, _, err = dec.readLength()
if err == nil {
slot_to, _, err = dec.readLength()
}
if err != nil {
return err
}
ranges[i] = fmt.Sprintf("%d-%d", slot_from, slot_to)
}
_, _ = job, ranges // no way other than skipping
continue
}
key, err := dec.readString()
if err != nil {
Expand All @@ -480,6 +544,10 @@ func (dec *Decoder) parse(cb func(object model.RedisObject) bool) error {
base.Expiration = &expiration
expireMs = 0 // reset expire ms
}
base.IdleTime = lru
lru = -1 // reset lru
base.Freq = lfu
lfu = -1 // reset lfu
obj, err := dec.readObject(b, base)
if err != nil {
return err
Expand Down
16 changes: 15 additions & 1 deletion core/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ type Encoder struct {
hashZipListOpt *zipListOpt
zsetZipListOpt *zipListOpt
listZipListSize int

valkey bool
}

type zipListOpt struct {
Expand Down Expand Up @@ -102,6 +104,13 @@ func NewEncoder(writer io.Writer) *Encoder {
}
}

// NewEncoderValkey creates an encoder instance for Valkey 9+ (rdb 80)
func NewEncoderValkey(writer io.Writer) *Encoder {
enc := NewEncoder(writer)
enc.valkey = true
return enc
}

// SetListZipListOpt sets list-max-ziplist-value and list-max-ziplist-entries
func (enc *Encoder) SetListZipListOpt(maxValue, maxEntries int) *Encoder {
enc.listZipListOpt = &zipListOpt{
Expand Down Expand Up @@ -147,7 +156,8 @@ func (enc *Encoder) write(p []byte) error {
return nil
}

var rdbHeader = []byte("REDIS0011")
var rdbHeaderRedis = []byte("REDIS0011")
var rdbHeaderValkey = []byte("VALKEY080")

func (enc *Encoder) validateStateChange(toState string) bool {
_, ok := stateChanges[enc.state][toState]
Expand All @@ -158,6 +168,10 @@ func (enc *Encoder) WriteHeader() error {
if !enc.validateStateChange(writtenHeaderState) {
return fmt.Errorf("cannot writing header at state: %s", enc.state)
}
var rdbHeader []byte = rdbHeaderRedis
if enc.valkey {
rdbHeader = rdbHeaderValkey
}
err := enc.write(rdbHeader)
if err != nil {
return err
Expand Down
Loading
Loading