diff --git a/src/aof.c b/src/aof.c index c7c0731b94..19c11c087a 100644 --- a/src/aof.c +++ b/src/aof.c @@ -1955,8 +1955,9 @@ static int rioWriteHashIteratorCursor(rio *r, hashTypeIterator *hi, int what) { else return rioWriteBulkLongLong(r, vll); } else if (hi->encoding == OBJ_ENCODING_HASHTABLE) { - sds value = hashTypeCurrentFromHashTable(hi, what); - return rioWriteBulkString(r, value, sdslen(value)); + size_t len; + char *value = hashTypeCurrentFromHashTable(hi, what, &len); + return rioWriteBulkString(r, value, len); } serverPanic("Unknown hash encoding"); @@ -1974,7 +1975,8 @@ int rewriteHashObject(rio *r, robj *key, robj *o) { while (hashTypeNext(&hi) != C_ERR) { long long expiry = entryGetExpiry(hi.next); sds field = entryGetField(hi.next); - sds value = entryGetValue(hi.next); + size_t value_len; + char *value = entryGetValue(hi.next, &value_len); if (rioWriteBulkCount(r, '*', 8) == 0) return 0; if (rioWriteBulkString(r, "HSETEX", 6) == 0) return 0; if (rioWriteBulkObject(r, key) == 0) return 0; @@ -1983,7 +1985,7 @@ int rewriteHashObject(rio *r, robj *key, robj *o) { if (rioWriteBulkString(r, "FIELDS", 6) == 0) return 0; if (rioWriteBulkLongLong(r, 1) == 0) return 0; if (rioWriteBulkString(r, field, sdslen(field)) == 0) return 0; - if (rioWriteBulkString(r, value, sdslen(value)) == 0) return 0; + if (rioWriteBulkString(r, value, value_len) == 0) return 0; volatile_items++; } hashTypeResetIterator(&hi); diff --git a/src/db.c b/src/db.c index 91aa67a195..50fdb95200 100644 --- a/src/db.c +++ b/src/db.c @@ -1001,6 +1001,12 @@ int objectTypeCompare(robj *o, long long target) { return 1; } +static void addScanDataItem(vector *result, const char *buf, size_t len) { + stringRef *item = vectorPush(result); + item->buf = buf; + item->len = len; +} + /* Hashtable scan callback used by scanCallback when scanning the keyspace. */ void keysScanCallback(void *privdata, void *entry, int didx) { scanData *data = (scanData *)privdata; @@ -1031,15 +1037,14 @@ void keysScanCallback(void *privdata, void *entry, int didx) { } /* Keep this key. */ - sds *item = vectorPush(data->result); - *item = key; + addScanDataItem(data->result, (const char *)key, sdslen(key)); } /* This callback is used by scanGenericCommand in order to collect elements * returned by the dictionary iterator into a list. */ void hashtableScanCallback(void *privdata, void *entry) { scanData *data = (scanData *)privdata; - sds val = NULL; + stringRef val = {NULL, 0}; sds key = NULL; robj *o = data->o; @@ -1059,7 +1064,7 @@ void hashtableScanCallback(void *privdata, void *entry) { } else if (o->type == OBJ_HASH) { key = entryGetField(entry); if (!data->only_keys) { - val = entryGetValue(entry); + val.buf = entryGetValue(entry, &val.len); } } else { serverPanic("Type not handled in hashtable SCAN callback."); @@ -1081,15 +1086,15 @@ void hashtableScanCallback(void *privdata, void *entry) { if (!data->only_keys) { char buf[MAX_LONG_DOUBLE_CHARS]; int len = ld2string(buf, sizeof(buf), node->score, LD_STR_AUTO); - val = sdsnewlen(buf, len); + sds tmp = sdsnewlen(buf, len); + val.buf = (const char *)tmp; + val.len = sdslen(tmp); } } - sds *item = vectorPush(data->result); - *item = key; - if (val) { - item = vectorPush(data->result); - *item = val; + addScanDataItem(data->result, (const char *)key, sdslen(key)); + if (val.buf) { + addScanDataItem(data->result, val.buf, val.len); } } @@ -1244,7 +1249,7 @@ void scanGenericCommand(client *c, robj *o, unsigned long long cursor) { /* scanning ZSET allocates temporary strings even though it's a dict */ free_callback = sdsfree; } - vectorInit(&result, SCAN_VECTOR_INITIAL_ALLOC, sizeof(sds)); + vectorInit(&result, SCAN_VECTOR_INITIAL_ALLOC, sizeof(stringRef)); /* For main hash table scan or scannable data structure. */ if (!o || ht) { @@ -1305,8 +1310,8 @@ void scanGenericCommand(client *c, robj *o, unsigned long long cursor) { if (use_pattern && !stringmatchlen(pat, sdslen(pat), key, len, 0)) { continue; } - sds *item = vectorPush(&result); - *item = sdsnewlen(key, len); + sds item = sdsnewlen(key, len); + addScanDataItem(&result, (const char *)item, sdslen(item)); } setTypeReleaseIterator(si); cursor = 0; @@ -1326,13 +1331,13 @@ void scanGenericCommand(client *c, robj *o, unsigned long long cursor) { continue; } /* add key object */ - sds *item = vectorPush(&result); - *item = sdsnewlen(str, len); + sds item = sdsnewlen(str, len); + addScanDataItem(&result, (const char *)item, sdslen(item)); /* add value object */ if (!only_keys) { str = lpGet(p, &len, intbuf); - item = vectorPush(&result); - *item = sdsnewlen(str, len); + item = sdsnewlen(str, len); + addScanDataItem(&result, (const char *)item, sdslen(item)); } p = lpNext(o->ptr, p); } @@ -1347,10 +1352,10 @@ void scanGenericCommand(client *c, robj *o, unsigned long long cursor) { addReplyArrayLen(c, vectorLen(&result)); for (uint32_t i = 0; i < vectorLen(&result); i++) { - sds *key = vectorGet(&result, i); - addReplyBulkCBuffer(c, *key, sdslen(*key)); + stringRef *key = vectorGet(&result, i); + addReplyBulkCBuffer(c, key->buf, key->len); if (free_callback) { - free_callback(*key); + free_callback((sds)(key->buf)); } } diff --git a/src/entry.c b/src/entry.c index cc9819e41c..94cbdd7b97 100644 --- a/src/entry.c +++ b/src/entry.c @@ -54,7 +54,24 @@ * | * +-> sds value * - * Identified by: Aux bit FIELD_SDS_AUX_BIT_ENTRY_HAS_VALUE_PTR + * + * Type 4: Value is an stringRef, referenced by pointer + * With this type, the key is embedded, and the value is a stringRef, referenced by pointer. Extra + * bits in the sdshdr8(+) are used to encode aux flags which indicate the presence of a value by + * pointer. An aux bit may indicate the presence of an optional expiration. Note that the + * "field" is not padded, so there's no direct way to identify the length of the allocation. + * This type is used when the entry's Value holds a buffer but doesn't own it, meaning + * freeing the entry doesn't involve freeing of the buffer. + * + * entry + * | + * +--------------+---------------------+----------V----------+--------+ + * | Expire (opt) | Value | Field | / / / /| + * | long long | stringRef (pointer) | sdshdr8+ | "foo" \0 |/ / / / | + * +--------------+----------+----------+----------+----------+--------+ + * | + * | + * stringRef value */ enum { @@ -64,6 +81,11 @@ enum { * pointer located in memory before the embedded field. If unset, the entry * instead has an embedded value located after the embedded field. */ FIELD_SDS_AUX_BIT_ENTRY_HAS_VALUE_PTR = 1, + /* SDS aux flag. If set, it indicates that the hash entry has a reference to the value. + * The hash entry does not own the string reference and will not free it upon + * entry destruction. The primary usecase is to avoid memory duplication + * between the core and a module. */ + FIELD_SDS_AUX_BIT_ENTRY_HAS_STRING_REF = 2, FIELD_SDS_AUX_BIT_MAX }; static_assert(FIELD_SDS_AUX_BIT_MAX < sizeof(char) - SDS_TYPE_BITS, "too many sds bits are used for entry metadata"); @@ -81,10 +103,15 @@ static bool entryHasValuePtr(const entry *entry) { /* Returns true in case the entry's value is embedded in the entry. * Returns false otherwise. */ -bool entryHasEmbeddedValue(entry *entry) { +bool entryHasEmbeddedValue(const entry *entry) { return (!entryHasValuePtr(entry)); } +/* Returns true in case the entry holds a reference of the value. + * Returns false otherwise. */ +bool entryHasStringRef(const entry *entry) { + return entryHasValuePtr(entry) && sdsGetAuxBit(entryGetField(entry), FIELD_SDS_AUX_BIT_ENTRY_HAS_STRING_REF); +} /* Returns true in case the entry has expiration timestamp. * Returns false otherwise. */ bool entryHasExpiry(const entry *entry) { @@ -93,74 +120,98 @@ bool entryHasExpiry(const entry *entry) { /* Returns the location of a pointer to a separately allocated value. Only for * an entry without an embedded value. */ -static sds *entryGetValueRef(const entry *entry) { +static void **entryGetValueRef(const entry *entry) { serverAssert(entryHasValuePtr(entry)); char *field_data = sdsAllocPtr(entryGetField(entry)); - field_data -= sizeof(sds); - return (sds *)field_data; + field_data -= sizeof(void *); + return (void **)field_data; } -/* Returns the sds of the entry's value. */ -sds entryGetValue(const entry *entry) { - if (entryHasValuePtr(entry)) { - return *entryGetValueRef(entry); - } else { +static sds *entryGetSdsValueRef(const entry *entry) { + return (sds *)entryGetValueRef(entry); +} + +static stringRef *entryGetStringRefRef(const entry *entry) { + serverAssert(entryHasStringRef(entry)); + return (stringRef *)*entryGetValueRef(entry); +} + +/* Returns the entry's value. */ +char *entryGetValue(const entry *entry, size_t *len) { + if (entryHasEmbeddedValue(entry)) { /* Skip field content, field null terminator and value sds8 hdr. */ size_t offset = sdslen(entryGetField(entry)) + 1 + sdsHdrSize(SDS_TYPE_8); - return (sds)((char *)entry + offset); + sds value = (char *)entry + offset; + if (len) *len = sdslen(value); + return value; } + if (entryHasStringRef(entry)) { + stringRef *string_ref = entryGetStringRefRef(entry); + if (!string_ref) return NULL; + if (len) *len = string_ref->len; + return (char *)string_ref->buf; + } + sds *value_ref = entryGetSdsValueRef(entry); + if (len) *len = sdslen(*value_ref); + return *value_ref; } -/* Modify the value of this entry and return a pointer to the (potentially new) entry. - * The value is taken by the function and cannot be reused after this function returns. */ -entry *entrySetValue(entry *e, sds value) { - if (entryHasValuePtr(e)) { - sds *value_ref = entryGetValueRef(e); - sdsfree(*value_ref); - *value_ref = value; - return e; +/* Frees the entry's non-embedded value. + * If the value is a string reference (stringRef), only the entry's pointer + * is freed, as the underlying string is not owned by this entry. + * Otherwise, the value is a standard SDS and is fully freed. */ +static void entryFreeValuePtr(entry *entry) { + serverAssert(entryHasValuePtr(entry)); + void **value_ref = entryGetValueRef(entry); + if (entryHasStringRef(entry)) { + zfree(*value_ref); } else { - entry *new_entry = entryUpdate(e, value, entryGetExpiry(e)); - return new_entry; + sdsfree(*value_ref); } + *value_ref = NULL; } +static void entrySetValueSds(entry *e, sds value) { + serverAssert(entryHasValuePtr(e)); + entryFreeValuePtr(e); + if (entryHasStringRef(e)) sdsSetAuxBit(entryGetField(e), FIELD_SDS_AUX_BIT_ENTRY_HAS_STRING_REF, 0); + sds *value_ref = entryGetSdsValueRef(e); + *value_ref = value; +} /* Returns the address of the entry allocation. */ -void *entryGetAllocPtr(const entry *entry) { +static void *entryGetAllocPtr(const entry *entry) { char *buf = sdsAllocPtr(entryGetField(entry)); - if (entryHasValuePtr(entry)) buf -= sizeof(sds); + if (entryHasValuePtr(entry)) buf -= sizeof(void *); if (entryHasExpiry(entry)) buf -= sizeof(long long); return buf; } /**************************************** Entry Expiry API *****************************************/ +/* Returns the location of a pointer to the expiry */ +static long long *entryGetExpiryRef(const entry *entry) { + debugServerAssert(entryHasExpiry(entry)); + char *buf = entryGetAllocPtr(entry); + return (long long *)buf; +} /* Returns the entry expiration timestamp. * In case this entry has no expiration time, will return EXPIRE_NONE. */ long long entryGetExpiry(const entry *entry) { - long long expiry = EXPIRY_NONE; - if (entryHasExpiry(entry)) { - char *buf = entryGetAllocPtr(entry); - debugServerAssert((((uintptr_t)buf & 0x7) == 0)); /* Test that the allocation is indeed 8 bytes aligned - * This is needed since we access the expiry as with pointer casting - * which require the access to be 8 bytes aligned. */ - expiry = *(long long *)buf; - } - return expiry; + if (entryHasExpiry(entry)) return *entryGetExpiryRef(entry); + return EXPIRY_NONE; } /* Modify the expiration time of this entry and return a pointer to the (potentially new) entry. */ entry *entrySetExpiry(entry *e, long long expiry) { - if (entryHasExpiry(e)) { - char *buf = entryGetAllocPtr(e); - debugServerAssert((((uintptr_t)buf & 0x7) == 0)); /* Test that the allocation is indeed 8 bytes aligned - * This is needed since we access the expiry as with pointer casting - * which require the access to be 8 bytes aligned. */ - *(long long *)buf = expiry; + if (expiry != EXPIRY_NONE && entryHasExpiry(e)) { + *entryGetExpiryRef(e) = expiry; return e; } - entry *new_entry = entryUpdate(e, NULL, expiry); - return new_entry; + if (entryHasStringRef(e)) { + stringRef *value = entryGetStringRefRef(e); + return entryUpdateAsStringRef(e, value->buf, value->len, expiry); + } + return entryUpdate(e, NULL, expiry); } /* Return true in case the entry has assigned expiration or false otherwise. */ @@ -168,16 +219,13 @@ bool entryIsExpired(entry *entry) { return timestampIsExpired(entryGetExpiry(entry)); } /**************************************** Entry Expiry API - End *****************************************/ - void entryFree(entry *entry) { - if (entryHasValuePtr(entry)) { - sdsfree(entryGetValue(entry)); - } + if (entryHasValuePtr(entry)) entryFreeValuePtr(entry); zfree(entryGetAllocPtr(entry)); } -static inline size_t entryReqSize(const_sds field, - sds value, +static inline size_t entryReqSize(size_t field_len, + size_t value_len, long long expiry, bool *is_value_embedded, int *field_sds_type, @@ -185,17 +233,15 @@ static inline size_t entryReqSize(const_sds field, size_t *expiry_size, size_t *embedded_value_size) { size_t expiry_alloc_size = (expiry == EXPIRY_NONE) ? 0 : sizeof(long long); - size_t field_len = sdslen(field); int embedded_field_sds_type = sdsReqType(field_len); if (embedded_field_sds_type == SDS_TYPE_5 && (expiry_alloc_size > 0)) { embedded_field_sds_type = SDS_TYPE_8; } size_t field_alloc_size = sdsReqSize(field_len, embedded_field_sds_type); - size_t value_len = value ? sdslen(value) : 0; - size_t embedded_value_alloc_size = value ? sdsReqSize(value_len, SDS_TYPE_8) : 0; + size_t embedded_value_alloc_size = value_len != SIZE_MAX ? sdsReqSize(value_len, SDS_TYPE_8) : 0; size_t alloc_size = field_alloc_size + expiry_alloc_size; bool embed_value = false; - if (value) { + if (value_len != SIZE_MAX) { if (alloc_size + embedded_value_alloc_size <= EMBED_VALUE_MAX_ALLOC_SIZE) { /* Embed field and value. Value is fixed to SDS_TYPE_8. Unused * allocation space is recorded in the embedded value's SDS header. @@ -235,34 +281,37 @@ static inline size_t entryReqSize(const_sds field, return alloc_size; } -/* Serialize the content of the entry into the provided buffer buf. Make use of the provided arguments provided by a call to entryReqSize. +/* Serialize the content of the entry into an allocated buffer buf. * Note that this function will take ownership of the value so user should not assume it is valid after this call. */ -static entry *entryWrite(char *buf, - size_t buf_size, - const_sds field, - sds value, - long long expiry, - bool embed_value, - int embedded_field_sds_type, - size_t embedded_field_sds_size, - size_t embedded_value_sds_size, - size_t expiry_size) { +static entry *entryConstruct(size_t alloc_size, + const_sds field, + sds sds_value, + stringRef *stringref_value, + long long expiry, + bool embed_value, + int embedded_field_sds_type, + size_t expiry_size, + size_t embedded_value_sds_size, + size_t embedded_field_sds_size) { + serverAssert((sds_value == NULL && stringref_value == NULL && embed_value) || (sds_value != NULL && stringref_value == NULL) || (sds_value == NULL && stringref_value != NULL && !embed_value)); + size_t buf_size; + /* allocate the buffer */ + char *buf = zmalloc_usable(alloc_size, &buf_size); + /* Set The expiry if exists */ if (expiry_size) { *(long long *)buf = expiry; buf += expiry_size; buf_size -= expiry_size; } - if (value) { - if (!embed_value) { - *(sds *)buf = value; - buf += sizeof(sds); - buf_size -= sizeof(sds); - } else { - sdswrite(buf + embedded_field_sds_size, buf_size - embedded_field_sds_size, SDS_TYPE_8, value, sdslen(value)); - sdsfree(value); - buf_size -= embedded_value_sds_size; - } + if (!embed_value) { + *(void **)buf = sds_value ? (void *)sds_value : (void *)stringref_value; + buf += sizeof(void *); + buf_size -= sizeof(void *); + } else if (sds_value) { + sdswrite(buf + embedded_field_sds_size, buf_size - embedded_field_sds_size, SDS_TYPE_8, sds_value, sdslen(sds_value)); + sdsfree(sds_value); + buf_size -= embedded_value_sds_size; } /* Set the field data. When we write the field into the buffer, the entry pointer is the returned * sds (after the sds header). */ @@ -283,15 +332,52 @@ entry *entryCreate(const_sds field, sds value, long long expiry) { bool embed_value = false; int embedded_field_sds_type; size_t expiry_size, embedded_value_sds_size, embedded_field_sds_size; - size_t alloc_size = entryReqSize(field, value, expiry, &embed_value, &embedded_field_sds_type, &embedded_field_sds_size, &expiry_size, &embedded_value_sds_size); - size_t buf_size; + size_t value_len = value ? sdslen(value) : SIZE_MAX; + size_t alloc_size = entryReqSize(sdslen(field), value_len, expiry, &embed_value, &embedded_field_sds_type, &embedded_field_sds_size, &expiry_size, &embedded_value_sds_size); + return entryConstruct(alloc_size, field, value, NULL, expiry, embed_value, embedded_field_sds_type, expiry_size, embedded_value_sds_size, embedded_field_sds_size); +} - /* allocate the buffer */ - char *buf = zmalloc_usable(alloc_size, &buf_size); +/* Sets the entry's value to a string reference object. + * The reference points to the provided `buf` but does not assume ownership. + * It is assumed that an external mechanism will handle releasing any memory which + * may have been associated with value->buf */ +entry *entryUpdateAsStringRef(entry *e, const char *buf, size_t len, long long expiry) { + long long entry_expiry = entryGetExpiry(e); + // Check for toggling expiration + bool expiry_add_remove = (expiry != entry_expiry) && (entry_expiry == EXPIRY_NONE || expiry == EXPIRY_NONE); + if (entryHasValuePtr(e) && !expiry_add_remove) { + if (entryHasStringRef(e)) { + stringRef *value = entryGetStringRefRef(e); + value->buf = buf; + value->len = len; + } else { + stringRef *value = zmalloc(sizeof(stringRef)); + value->buf = buf; + value->len = len; + sds *value_ref = entryGetSdsValueRef(e); + sdsfree(*value_ref); + *value_ref = (sds)value; + sdsSetAuxBit(entryGetField(e), FIELD_SDS_AUX_BIT_ENTRY_HAS_STRING_REF, 1); + } + if (expiry != EXPIRY_NONE) *entryGetExpiryRef(e) = expiry; + return e; + } + stringRef *value = zmalloc(sizeof(stringRef)); + value->buf = buf; + value->len = len; + sds field = entryGetField(e); + size_t field_size = sdsReqSize(sdslen(field), SDS_TYPE_8); + size_t alloc_size = field_size + sizeof(void *); + alloc_size += (expiry == EXPIRY_NONE) ? 0 : sizeof(expiry); - return entryWrite(buf, buf_size, field, value, expiry, embed_value, embedded_field_sds_type, embedded_field_sds_size, embedded_value_sds_size, expiry_size); -} + size_t expiry_size = 0; + if (expiry != EXPIRY_NONE) expiry_size = sizeof(expiry); + entry *new_entry = entryConstruct(alloc_size, field, NULL, value, expiry, false, SDS_TYPE_8, expiry_size, sizeof(value), field_size); + entryFree(e); + sdsSetAuxBit(entryGetField(new_entry), FIELD_SDS_AUX_BIT_ENTRY_HAS_STRING_REF, 1); + return new_entry; +} /* Modify the entry's value and/or expiration time. * In case the provided value is NULL, will use the existing value. * Note that the value ownership is moved to this function and the caller should assume the @@ -300,19 +386,27 @@ entry *entryUpdate(entry *e, sds value, long long expiry) { sds field = entryGetField(e); entry *new_entry = NULL; + /* Update just the expiry field, no value change, of a string ref entry */ + if (entryHasStringRef(e) && !value) { + stringRef *value = entryGetStringRefRef(e); + return entryUpdateAsStringRef(e, value->buf, value->len, expiry); + } bool update_value = value ? true : false; long long curr_expiration_time = entryGetExpiry(e); bool update_expiry = (expiry != curr_expiration_time) ? true : false; /* Just a sanity check. If nothing changes, lets just return */ - if (!update_value && !update_expiry) - return e; - - if (!value) value = entryGetValue(e); + if (!update_value && !update_expiry) return e; + size_t value_len = SIZE_MAX; + if (value) { + value_len = sdslen(value); + } else { + value = entryGetValue(e, &value_len); + } bool embed_value = false; int embedded_field_sds_type; size_t expiry_size, embedded_value_size, embedded_field_size; - size_t required_entry_size = entryReqSize(field, value, expiry, &embed_value, &embedded_field_sds_type, &embedded_field_size, &expiry_size, &embedded_value_size); - size_t current_embedded_allocation_size = entryHasValuePtr(e) ? 0 : entryMemUsage(e); + size_t required_entry_size = entryReqSize(sdslen(field), value_len, expiry, &embed_value, &embedded_field_sds_type, &embedded_field_size, &expiry_size, &embedded_value_size); + size_t current_embedded_allocation_size = entryHasEmbeddedValue(e) ? entryMemUsage(e) : 0; bool expiry_add_remove = update_expiry && (curr_expiration_time == EXPIRY_NONE || expiry == EXPIRY_NONE); // In case we are toggling expiration bool value_change_encoding = update_value && (embed_value != entryHasEmbeddedValue(e)); // In case we change the way value is embedded or not @@ -337,13 +431,9 @@ entry *entryUpdate(entry *e, sds value, long long expiry) { } /* In this case we are sure we do not have to allocate new entry, so value must already be set or we have enough room to embed it. */ if (update_value) { - if (entryHasValuePtr(e)) { - sds *value_ref = entryGetValueRef(e); - sdsfree(*value_ref); - *value_ref = value; - } else { + if (entryHasEmbeddedValue(e)) { /* Skip field content, field null terminator and value sds8 hdr. */ - sds old_value = entryGetValue(e); + char *old_value = entryGetValue(e, NULL); /* We are using the same entry memory in order to store a potentially new value. * In such cases the old value alloc was adjusted to the real buffer size part it was embedded to. * Since we can potentially write here a smaller value, which requires less allocation space, we would like to @@ -351,28 +441,26 @@ entry *entryUpdate(entry *e, sds value, long long expiry) { size_t value_size = sdsHdrSize(SDS_TYPE_8) + sdsalloc(old_value) + 1; sdswrite(sdsAllocPtr(old_value), value_size, SDS_TYPE_8, value, sdslen(value)); sdsfree(value); + } else { + entrySetValueSds(e, value); } } new_entry = e; } else { if (!update_value) { - /* Check if the value can be reused. */ - int value_was_embedded = !entryHasValuePtr(e); - /* In case the original entry value is embedded WE WILL HAVE TO DUPLICATE IT - * if not we have to duplicate it, remove it from the original entry since we are going to delete it.*/ - if (value_was_embedded) { + /* Check if the value can be reused. In case the original entry value is + * embedded WE WILL HAVE TO DUPLICATE IT if not we have to duplicate it, + * remove it from the original entry since we are going to delete it. */ + if (entryHasEmbeddedValue(e)) { value = sdsdup(value); } else { - sds *value_ref = entryGetValueRef(e); + void **value_ref = entryGetValueRef(e); *value_ref = NULL; } } /* allocate the buffer for a new entry */ - size_t buf_size; - char *buf = zmalloc_usable(required_entry_size, &buf_size); - new_entry = entryWrite(buf, buf_size, entryGetField(e), value, expiry, embed_value, embedded_field_sds_type, embedded_field_size, embedded_value_size, expiry_size); - debugServerAssert(new_entry != e); + new_entry = entryConstruct(required_entry_size, field, value, NULL, expiry, embed_value, embedded_field_sds_type, expiry_size, embedded_value_size, embedded_field_size); entryFree(e); } /* Check that the new entry was built correctly */ @@ -387,15 +475,15 @@ entry *entryUpdate(entry *e, sds value, long long expiry) { size_t entryMemUsage(entry *entry) { size_t mem = 0; - if (entryHasValuePtr(entry)) { + if (entryHasEmbeddedValue(entry)) { + mem += sdsReqSize(sdslen(entryGetField(entry)), sdsType(entryGetField(entry))); + if (entryHasExpiry(entry)) mem += sizeof(long long); + } else { /* In case the value is not embedded we might not be able to sum all the allocation sizes since the field * header could be too small for holding the real allocation size. */ mem += zmalloc_usable_size(entryGetAllocPtr(entry)); - } else { - mem += sdsReqSize(sdslen(entryGetField(entry)), sdsType(entryGetField(entry))); - if (entryHasExpiry(entry)) mem += sizeof(long long); } - mem += sdsAllocSize(entryGetValue(entry)); + if (!entryHasStringRef(entry)) mem += sdsAllocSize((sds)entryGetValue(entry, NULL)); return mem; } @@ -407,8 +495,12 @@ size_t entryMemUsage(entry *entry) { * If the location of the entry changed we return the new location, * otherwise we return NULL. */ entry *entryDefrag(entry *e, void *(*defragfn)(void *), sds (*sdsdefragfn)(sds)) { - if (entryHasValuePtr(e)) { - sds *value_ref = entryGetValueRef(e); + if (entryHasStringRef(e)) { + stringRef **value_ref = (stringRef **)entryGetValueRef(e); + stringRef *new_value = defragfn(*value_ref); + if (new_value) *value_ref = new_value; + } else if (entryHasValuePtr(e)) { + sds *value_ref = (sds *)entryGetValueRef(e); sds new_value = sdsdefragfn(*value_ref); if (new_value) *value_ref = new_value; } @@ -427,7 +519,5 @@ entry *entryDefrag(entry *e, void *(*defragfn)(void *), sds (*sdsdefragfn)(sds)) * forked and memory won't be used again. See zmadvise_dontneed() */ void entryDismissMemory(entry *entry) { /* Only dismiss values memory since the field size usually is small. */ - if (entryHasValuePtr(entry)) { - dismissSds(*entryGetValueRef(entry)); - } + if (entryHasValuePtr(entry)) entryFreeValuePtr(entry); } diff --git a/src/entry.h b/src/entry.h index d4a24daacb..7d02b75d77 100644 --- a/src/entry.h +++ b/src/entry.h @@ -23,10 +23,7 @@ typedef struct _entry entry; sds entryGetField(const entry *entry); /* Returns the value string (sds) from the entry. */ -sds entryGetValue(const entry *entry); - -/* Sets or replaces the value string in the entry. May reallocate and return a new pointer. */ -entry *entrySetValue(entry *entry, sds value); +char *entryGetValue(const entry *entry, size_t *len); /* Gets the expiration timestamp (UNIX time in milliseconds). */ long long entryGetExpiry(const entry *entry); @@ -34,6 +31,9 @@ long long entryGetExpiry(const entry *entry); /* Returns true if the entry has an expiration timestamp set. */ bool entryHasExpiry(const entry *entry); +/* Returns true if the entry value is externalized. */ +bool entryHasStringRef(const entry *entry); + /* Sets the expiration timestamp. */ entry *entrySetExpiry(entry *entry, long long expiry); @@ -45,6 +45,10 @@ void entryFree(entry *entry); /* Creates a new entry with the given field, value, and optional expiry. */ entry *entryCreate(const_sds field, sds value, long long expiry); +/* Sets the entry's value to a string reference object. + * The reference points to the provided `buf` but does not assume ownership. + * An external mechanism must handle the eventual memory deallocation of `buf`. */ +entry *entryUpdateAsStringRef(entry *entry, const char *buf, size_t len, long long expiry); /* Updates the value and/or expiry of an existing entry. * In case value is NULL, will use the existing entry value. @@ -61,6 +65,6 @@ entry *entryDefrag(entry *entry, void *(*defragfn)(void *), sds (*sdsdefragfn)(s void entryDismissMemory(entry *entry); /* Internal used for debug. No need to use this function except in tests */ -bool entryHasEmbeddedValue(entry *entry); +bool entryHasEmbeddedValue(const entry *entry); #endif diff --git a/src/module.c b/src/module.c index ae5f622780..e605b78fe8 100644 --- a/src/module.c +++ b/src/module.c @@ -5304,6 +5304,26 @@ int VM_ZsetRangePrev(ValkeyModuleKey *key) { * See also VM_ValueLength(), which returns the number of fields in a hash. * -------------------------------------------------------------------------- */ +/* Sets the value of a hash field to a non-owning string reference (stringRef) + * pointing to the buffer parameter, which remains owned by the module. + * + * NOTE: This API is designed for memory efficiency by avoiding memory duplication + * between the module and the core engine, which is critical when the buffer size is large. + * For example, valkey-search uses this interface to avoid maintaining two copies of the + * indexed vectors. + * + * The function receives the hash key, field name, buffer to share along with its size. */ +int VM_HashSetStringRef(ValkeyModuleKey *key, ValkeyModuleString *field, const char *buf, size_t len) { + if (!key || !key->value || key->value->type != OBJ_HASH || !field || !buf) return VALKEYMODULE_ERR; + return hashTypeUpdateAsStringRef(key->value, field->ptr, buf, len); +} + +/* Checks if the value of a hash entry is a shared string reference (stringRef). + * The function receives the hash key and field name to perform the check against. */ +int VM_HashHasStringRef(ValkeyModuleKey *key, ValkeyModuleString *field) { + if (!key || !key->value || key->value->type != OBJ_HASH) return VALKEYMODULE_ERR; + return hashTypeHasStringRef(key->value, field->ptr); +} /* Set the field of the specified hash field to the specified value. * If the key is an empty key open for writing, it is created with an empty * hash value, in order to set the specified field. @@ -11423,8 +11443,9 @@ static void moduleScanKeyHashtableCallback(void *privdata, void *entry) { value = createStringObjectFromLongDouble(node->score, 0); } else if (o->type == OBJ_HASH) { key = entryGetField(entry); - sds val = entryGetValue(entry); - value = createStringObject(val, sdslen(val)); + size_t val_len; + char *val = entryGetValue(entry, &val_len); + value = createStringObject(val, val_len); } else { serverPanic("unexpected object type"); } @@ -14326,6 +14347,8 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(ZsetRangeEndReached); REGISTER_API(HashSet); REGISTER_API(HashGet); + REGISTER_API(HashSetStringRef); + REGISTER_API(HashHasStringRef); REGISTER_API(StreamAdd); REGISTER_API(StreamDelete); REGISTER_API(StreamIteratorStart); diff --git a/src/rdb.c b/src/rdb.c index 98b8a3d430..69d20f4025 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -993,14 +993,15 @@ ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid) { void *next; while (hashtableNext(&iter, &next)) { sds field = entryGetField(next); - sds value = entryGetValue(next); + size_t value_len; + unsigned char *value = (unsigned char *)entryGetValue(next, &value_len); if ((n = rdbSaveRawString(rdb, (unsigned char *)field, sdslen(field))) == -1) { hashtableResetIterator(&iter); return -1; } nwritten += n; - if ((n = rdbSaveRawString(rdb, (unsigned char *)value, sdslen(value))) == -1) { + if ((n = rdbSaveRawString(rdb, value, value_len)) == -1) { hashtableResetIterator(&iter); return -1; } diff --git a/src/server.h b/src/server.h index 1413712b0a..1f9aded9b3 100644 --- a/src/server.h +++ b/src/server.h @@ -625,6 +625,14 @@ typedef enum { LOG_TIMESTAMP_LEGACY = 0, typedef enum { RDB_VERSION_CHECK_STRICT = 0, RDB_VERSION_CHECK_RELAXED } rdb_version_check_type; +/* Structure representing a non-owning view of a buffer. + * A stringRef struct does not manage the underlying memory, so its destruction + * will not free the buffer. */ +typedef struct stringRef { + const char *buf; /* Pointer to the externalized buffer */ + size_t len; /* Length of the buffer */ +} stringRef; + /* common sets of actions to pause/unpause */ #define PAUSE_ACTIONS_CLIENT_WRITE_SET \ (PAUSE_ACTION_CLIENT_WRITE | PAUSE_ACTION_EXPIRE | PAUSE_ACTION_EVICT | PAUSE_ACTION_REPLICA) @@ -3455,13 +3463,15 @@ void hashTypeCurrentFromListpack(hashTypeIterator *hi, unsigned char **vstr, unsigned int *vlen, long long *vll); -sds hashTypeCurrentFromHashTable(hashTypeIterator *hi, int what); +char *hashTypeCurrentFromHashTable(hashTypeIterator *hi, int what, size_t *len); sds hashTypeCurrentObjectNewSds(hashTypeIterator *hi, int what); robj *hashTypeLookupWriteOrCreate(client *c, robj *key); robj *hashTypeGetValueObject(robj *o, sds field); int hashTypeSet(robj *o, sds field, sds value, long long expiry, int flags); robj *hashTypeDup(robj *o); bool hashTypeHasVolatileFields(robj *o); +int hashTypeUpdateAsStringRef(robj *o, sds field, const char *buf, size_t len); +bool hashTypeHasStringRef(robj *o, sds field); /* Pub / Sub */ int pubsubUnsubscribeAllChannels(client *c, int notify); diff --git a/src/t_hash.c b/src/t_hash.c index b02ce59eeb..7b59214427 100644 --- a/src/t_hash.c +++ b/src/t_hash.c @@ -242,10 +242,11 @@ int hashTypeGetValue(robj *o, sds field, unsigned char **vstr, unsigned int *vle void *entry = NULL; hashtableFind(o->ptr, field, &entry); if (entry) { - sds value = entryGetValue(entry); + size_t len = 0; + char *value = entryGetValue(entry, &len); serverAssert(value != NULL); *vstr = (unsigned char *)value; - *vlen = sdslen(value); + *vlen = len; if (expiry) *expiry = entryGetExpiry(entry); return C_OK; } @@ -317,6 +318,35 @@ int hashTypeExists(robj *o, sds field) { return hashTypeGetValue(o, field, &vstr, &vlen, &vll, NULL) == C_OK; } +bool hashTypeHasStringRef(robj *o, sds field) { + if (o->encoding == OBJ_ENCODING_LISTPACK) return 0; + hashtable *ht = o->ptr; + void **entry_ref = hashtableFindRef(ht, field); + return (entryHasStringRef(*entry_ref)); +} + +/* Update a hash field value with a string reference value. + * Returns C_ERR if the hash field value not found. Otherwise, returns C_OK. */ +int hashTypeUpdateAsStringRef(robj *o, sds field, const char *buf, size_t len) { + unsigned char *vstr = NULL; + unsigned int vlen = UINT_MAX; + long long vll = LLONG_MAX; + + if (hashTypeGetValue(o, field, &vstr, &vlen, &vll, NULL) != C_OK) return C_ERR; + // require HASHTABLE encoding due to aux bits and pointer storage. + if (o->encoding == OBJ_ENCODING_LISTPACK) hashTypeConvert(o, OBJ_ENCODING_HASHTABLE); + + hashtable *ht = o->ptr; + void **entry_ref = hashtableFindRef(ht, field); + entry *entry = *entry_ref; + long long expiry = entryGetExpiry(entry); + void *new_entry = entryUpdateAsStringRef(entry, buf, len, expiry); + bool replaced = hashtableReplaceReallocatedEntry(ht, entry, new_entry); + serverAssert(replaced); + hashTypeTrackUpdateEntry(o, entry, new_entry, expiry, expiry); + return C_OK; +} + /* Add a new field, overwrite the old with the new value if it already exists. * Return 0 on insert and 1 on update. * @@ -527,7 +557,7 @@ static expiryModificationResult hashTypePersist(robj *o, sds field) { long long current_expire = entryGetExpiry(current_entry); if (current_expire != EXPIRY_NONE) { hashTypeUntrackEntry(o, current_entry); - *entry_ref = entryUpdate(current_entry, NULL, EXPIRY_NONE); + *entry_ref = entrySetExpiry(current_entry, EXPIRY_NONE); return EXPIRATION_MODIFICATION_SUCCESSFUL; } return EXPIRATION_MODIFICATION_FAILED; // If the found element has no expiration set, return -1 @@ -682,49 +712,34 @@ void hashTypeCurrentFromListpack(hashTypeIterator *hi, /* Get the field or value at iterator cursor, for an iterator on a hash value * encoded as a hash table. Prototype is similar to * `hashTypeGetFromHashTable`. */ -sds hashTypeCurrentFromHashTable(hashTypeIterator *hi, int what) { +char *hashTypeCurrentFromHashTable(hashTypeIterator *hi, int what, size_t *len) { serverAssert(hi->encoding == OBJ_ENCODING_HASHTABLE); if (what & OBJ_HASH_FIELD) { - return entryGetField(hi->next); - } else { - return entryGetValue(hi->next); - } -} - -/* Higher level function of hashTypeCurrent*() that returns the hash value - * at current iterator position. - * - * The returned element is returned by reference in either *vstr and *vlen if - * it's returned in string form, or stored in *vll if it's returned as - * a number. - * - * If *vll is populated *vstr is set to NULL, so the caller - * can always check the function return by checking the return value - * type checking if vstr == NULL. */ -static void hashTypeCurrentObject(hashTypeIterator *hi, int what, unsigned char **vstr, unsigned int *vlen, long long *vll) { - if (hi->encoding == OBJ_ENCODING_LISTPACK) { - *vstr = NULL; - hashTypeCurrentFromListpack(hi, what, vstr, vlen, vll); - } else if (hi->encoding == OBJ_ENCODING_HASHTABLE) { - sds ele = hashTypeCurrentFromHashTable(hi, what); - *vstr = (unsigned char *)ele; - *vlen = sdslen(ele); - } else { - serverPanic("Unknown hash encoding"); + sds key = entryGetField(hi->next); + if (key) *len = sdslen(key); + return key; } + return entryGetValue(hi->next, len); } /* Return the field or value at the current iterator position as a new * SDS string. */ sds hashTypeCurrentObjectNewSds(hashTypeIterator *hi, int what) { - unsigned char *vstr; - unsigned int vlen; - long long vll; - - hashTypeCurrentObject(hi, what, &vstr, &vlen, &vll); - if (vstr) return sdsnewlen(vstr, vlen); - return sdsfromlonglong(vll); + unsigned char *vstr = NULL; + if (hi->encoding == OBJ_ENCODING_LISTPACK) { + long long vll; + unsigned int vlen; + hashTypeCurrentFromListpack(hi, what, &vstr, &vlen, &vll); + if (vstr) return sdsnewlen(vstr, vlen); + return sdsfromlonglong(vll); + } + if (hi->encoding == OBJ_ENCODING_HASHTABLE) { + size_t vlen = 0; + vstr = (unsigned char *)hashTypeCurrentFromHashTable(hi, what, &vlen); + return sdsnewlen(vstr, vlen); + } + serverPanic("Unknown hash encoding"); } robj *hashTypeLookupWriteOrCreate(client *c, robj *key) { @@ -812,11 +827,13 @@ robj *hashTypeDup(robj *o) { hashTypeInitIterator(o, &hi); while (hashTypeNext(&hi) != C_ERR) { /* Extract a field-value pair from an original hash object.*/ - sds field = hashTypeCurrentFromHashTable(&hi, OBJ_HASH_FIELD); - sds value = hashTypeCurrentFromHashTable(&hi, OBJ_HASH_VALUE); + size_t len; + sds field = entryGetField(hi.next); + char *value_str = entryGetValue(hi.next, &len); long long expiry = entryGetExpiry(hi.next); /* Add a field-value pair to a new hash object. */ - entry *entry = entryCreate(field, sdsdup(value), expiry); + sds value = sdsnewlen(value_str, len); + entry *entry = entryCreate(field, value, expiry); hashtableAdd(ht, entry); if (expiry != EXPIRY_NONE) hashTypeTrackEntry(hobj, entry); @@ -866,11 +883,7 @@ static void hashTypeRandomElement(robj *hashobj, unsigned long hashsize, listpac field->sval = (unsigned char *)sds_field; field->slen = sdslen(sds_field); if (val) { - entry *hash_entry = e; - sds sds_val = entryGetValue(hash_entry); - val->sval = (unsigned char *)sds_val; - val->slen = - sdslen(sds_val); + val->sval = (unsigned char *)entryGetValue(e, (size_t *)&val->slen); } } hashTypeIgnoreTTL(hashobj, false); @@ -1080,8 +1093,9 @@ static void addHashIteratorCursorToReply(writePreparedClient *wpc, hashTypeItera else addWritePreparedReplyBulkLongLong(wpc, vll); } else if (hi->encoding == OBJ_ENCODING_HASHTABLE) { - sds value = hashTypeCurrentFromHashTable(hi, what); - addWritePreparedReplyBulkCBuffer(wpc, value, sdslen(value)); + size_t len = 0; + char *value = hashTypeCurrentFromHashTable(hi, what, &len); + addWritePreparedReplyBulkCBuffer(wpc, value, len); } else { serverPanic("Unknown hash encoding"); } @@ -2066,10 +2080,11 @@ void hrandfieldWithCountCommand(client *c, long l, int withvalues) { void *next; while (hashtableNext(&iter, &next)) { sds field = entryGetField(next); - sds value = entryGetValue(next); + size_t value_len; + char *value = entryGetValue(next, &value_len); if (withvalues && c->resp > 2) addWritePreparedReplyArrayLen(wpc, 2); addWritePreparedReplyBulkCBuffer(wpc, field, sdslen(field)); - if (withvalues) addWritePreparedReplyBulkCBuffer(wpc, value, sdslen(value)); + if (withvalues) addWritePreparedReplyBulkCBuffer(wpc, value, value_len); } hashtableResetIterator(&iter); diff --git a/src/unit/test_entry.c b/src/unit/test_entry.c index 39afd790f6..bf673a786e 100644 --- a/src/unit/test_entry.c +++ b/src/unit/test_entry.c @@ -17,7 +17,9 @@ /* Verify entry properties */ static int verify_entry_properties(entry *e, sds field, sds value_copy, long long expiry, bool has_expiry, bool has_valueptr) { TEST_ASSERT(sdscmp(entryGetField(e), field) == 0); - TEST_ASSERT(sdscmp(entryGetValue(e), value_copy) == 0); + size_t len; + TEST_ASSERT(sdscmp(entryGetValue(e, &len), value_copy) == 0); + TEST_ASSERT(len == sdslen(value_copy)); TEST_ASSERT(entryGetExpiry(e) == expiry); TEST_ASSERT(entryHasExpiry(e) == has_expiry); TEST_ASSERT(entryHasEmbeddedValue(e) != has_valueptr); @@ -356,7 +358,7 @@ int test_entryIsExpired(int argc, char **argv, int flags) { * * To smaller value (should decrease memory usage) * * To bigger value (should increase memory usage) */ -int test_entryMemUsage_entrySetExpiry_entrySetValue(int argc, char **argv, int flags) { +int test_entryMemUsage_entrySetExpiry_entryUpdate(int argc, char **argv, int flags) { UNUSED(argc); UNUSED(argv); UNUSED(flags); @@ -393,7 +395,7 @@ int test_entryMemUsage_entrySetExpiry_entrySetValue(int argc, char **argv, int f // Memory usage should decrease by the difference in value size (2 bytes) sds value4 = sdsnew("x"); sds value_copy4 = sdsdup(value4); - entry *e4 = entrySetValue(e3, value4); + entry *e4 = entryUpdate(e3, value4, entryGetExpiry(e3)); size_t e4_entryMemUsage = entryMemUsage(e4); verify_entry_properties(e4, field1, value_copy4, expiry3, true, false); TEST_ASSERT(zmalloc_usable_size((char *)e4 - sizeof(long long) - 3) == e4_entryMemUsage); @@ -402,7 +404,7 @@ int test_entryMemUsage_entrySetExpiry_entrySetValue(int argc, char **argv, int f // Memory usage should increase by the difference in value size (1 byte) sds value5 = sdsnew("xx"); sds value_copy5 = sdsdup(value5); - entry *e5 = entrySetValue(e4, value5); + entry *e5 = entryUpdate(e4, value5, entryGetExpiry(e4)); size_t e5_entryMemUsage = entryMemUsage(e5); verify_entry_properties(e5, field1, value_copy5, expiry3, true, false); TEST_ASSERT(zmalloc_usable_size((char *)e5 - sizeof(long long) - 3) == e5_entryMemUsage); @@ -440,7 +442,7 @@ int test_entryMemUsage_entrySetExpiry_entrySetValue(int argc, char **argv, int f // Memory usage should increase by at least the difference between LONG_VALUE and "x" (143) sds value9 = sdsnew("x"); sds value_copy9 = sdsdup(value9); - entry *e9 = entrySetValue(e8, value9); + entry *e9 = entryUpdate(e8, value9, entryGetExpiry(e8)); size_t e9_entryMemUsage = entryMemUsage(e9); verify_entry_properties(e9, field6, value_copy9, expiry8, true, true); size_t expected_e9_entry_mem = zmalloc_usable_size((char *)e9 - sizeof(long long) - sizeof(sds) - 3) + sdsAllocSize(value9); @@ -450,7 +452,7 @@ int test_entryMemUsage_entrySetExpiry_entrySetValue(int argc, char **argv, int f // Memory usage increases by the difference in value size (1 byte) sds value10 = sdsnew("xx"); sds value_copy10 = sdsdup(value10); - entry *e10 = entrySetValue(e9, value10); + entry *e10 = entryUpdate(e9, value10, entryGetExpiry(e9)); size_t e10_entryMemUsage = entryMemUsage(e10); size_t expected_10_entry_mem = zmalloc_usable_size((char *)e10 - sizeof(long long) - sizeof(sds) - 3) + sdsAllocSize(value10); TEST_ASSERT(expected_10_entry_mem == e10_entryMemUsage); @@ -468,3 +470,54 @@ int test_entryMemUsage_entrySetExpiry_entrySetValue(int argc, char **argv, int f return 0; } + +int test_entryStringRef(int argc, char **argv, int flags) { + UNUSED(argc); + UNUSED(argv); + UNUSED(flags); + + sds field1 = sdsnew(SHORT_FIELD); + sds value1 = sdsnew(SHORT_VALUE); + sds value_copy1 = sdsdup(value1); + long long expiry1 = EXPIRY_NONE; + entry *e1 = entryCreate(field1, value1, expiry1); + entry *e2 = entryUpdateAsStringRef(e1, value_copy1, sdslen(value_copy1), entryGetExpiry(e1)); + verify_entry_properties(e2, field1, value_copy1, expiry1, false, true); + TEST_ASSERT(entryHasStringRef(e2) == true); + + long long expiry2 = 100; + entry *e3 = entryUpdateAsStringRef(e2, value_copy1, sdslen(value_copy1), expiry2); + TEST_ASSERT(e2 != e3); + verify_entry_properties(e3, field1, value_copy1, expiry2, true, true); + TEST_ASSERT(entryHasStringRef(e3) == true); + + long long expiry3 = 200; + entry *e4 = entryUpdateAsStringRef(e3, value_copy1, sdslen(value_copy1), expiry3); + TEST_ASSERT(e3 == e4); + verify_entry_properties(e4, field1, value_copy1, expiry3, true, true); + TEST_ASSERT(entryHasStringRef(e4) == true); + + sds value2 = sdsnew(SHORT_VALUE); + sds value_copy2 = sdsdup(value2); + entry *e5 = entryUpdate(e4, value2, expiry3); + verify_entry_properties(e5, field1, value_copy2, expiry3, true, false); + TEST_ASSERT(entryHasStringRef(e5) == false); + + entry *e6 = entryUpdateAsStringRef(e5, value_copy1, sdslen(value_copy1), expiry2); + TEST_ASSERT(e5 != e6); + verify_entry_properties(e6, field1, value_copy1, expiry2, true, true); + TEST_ASSERT(entryHasStringRef(e6) == true); + + sds value3 = sdsnew(LONG_VALUE); + sds value_copy3 = sdsdup(value3); + entry *e7 = entryUpdate(e6, value3, expiry1); + verify_entry_properties(e7, field1, value_copy3, expiry1, false, true); + TEST_ASSERT(entryHasStringRef(e7) == false); + + entryFree(e7); + sdsfree(value_copy1); + sdsfree(value_copy2); + sdsfree(value_copy3); + sdsfree(field1); + return 0; +} diff --git a/src/unit/test_files.h b/src/unit/test_files.h index ca557984be..d99a551a80 100644 --- a/src/unit/test_files.h +++ b/src/unit/test_files.h @@ -24,7 +24,8 @@ int test_entryCreate(int argc, char **argv, int flags); int test_entryUpdate(int argc, char **argv, int flags); int test_entryHasexpiry_entrySetExpiry(int argc, char **argv, int flags); int test_entryIsExpired(int argc, char **argv, int flags); -int test_entryMemUsage_entrySetExpiry_entrySetValue(int argc, char **argv, int flags); +int test_entryMemUsage_entrySetExpiry_entryUpdate(int argc, char **argv, int flags); +int test_entryStringRef(int argc, char **argv, int flags); int test_cursor(int argc, char **argv, int flags); int test_set_hash_function_seed(int argc, char **argv, int flags); int test_add_find_delete(int argc, char **argv, int flags); @@ -260,7 +261,7 @@ unitTest __test_crc64_c[] = {{"test_crc64", test_crc64}, {NULL, NULL}}; unitTest __test_crc64combine_c[] = {{"test_crc64combine", test_crc64combine}, {NULL, NULL}}; unitTest __test_dict_c[] = {{"test_dictCreate", test_dictCreate}, {"test_dictAdd16Keys", test_dictAdd16Keys}, {"test_dictDisableResize", test_dictDisableResize}, {"test_dictAddOneKeyTriggerResize", test_dictAddOneKeyTriggerResize}, {"test_dictDeleteKeys", test_dictDeleteKeys}, {"test_dictDeleteOneKeyTriggerResize", test_dictDeleteOneKeyTriggerResize}, {"test_dictEmptyDirAdd128Keys", test_dictEmptyDirAdd128Keys}, {"test_dictDisableResizeReduceTo3", test_dictDisableResizeReduceTo3}, {"test_dictDeleteOneKeyTriggerResizeAgain", test_dictDeleteOneKeyTriggerResizeAgain}, {"test_dictBenchmark", test_dictBenchmark}, {NULL, NULL}}; unitTest __test_endianconv_c[] = {{"test_endianconv", test_endianconv}, {NULL, NULL}}; -unitTest __test_entry_c[] = {{"test_entryCreate", test_entryCreate}, {"test_entryUpdate", test_entryUpdate}, {"test_entryHasexpiry_entrySetExpiry", test_entryHasexpiry_entrySetExpiry}, {"test_entryIsExpired", test_entryIsExpired}, {"test_entryMemUsage_entrySetExpiry_entrySetValue", test_entryMemUsage_entrySetExpiry_entrySetValue}, {NULL, NULL}}; +unitTest __test_entry_c[] = {{"test_entryCreate", test_entryCreate}, {"test_entryUpdate", test_entryUpdate}, {"test_entryHasexpiry_entrySetExpiry", test_entryHasexpiry_entrySetExpiry}, {"test_entryIsExpired", test_entryIsExpired}, {"test_entryMemUsage_entrySetExpiry_entryUpdate", test_entryMemUsage_entrySetExpiry_entryUpdate}, {"test_entryStringRef", test_entryStringRef}, {NULL, NULL}}; unitTest __test_hashtable_c[] = {{"test_cursor", test_cursor}, {"test_set_hash_function_seed", test_set_hash_function_seed}, {"test_add_find_delete", test_add_find_delete}, {"test_add_find_delete_avoid_resize", test_add_find_delete_avoid_resize}, {"test_instant_rehashing", test_instant_rehashing}, {"test_bucket_chain_length", test_bucket_chain_length}, {"test_two_phase_insert_and_pop", test_two_phase_insert_and_pop}, {"test_replace_reallocated_entry", test_replace_reallocated_entry}, {"test_incremental_find", test_incremental_find}, {"test_scan", test_scan}, {"test_iterator", test_iterator}, {"test_safe_iterator", test_safe_iterator}, {"test_compact_bucket_chain", test_compact_bucket_chain}, {"test_random_entry", test_random_entry}, {"test_random_entry_with_long_chain", test_random_entry_with_long_chain}, {"test_random_entry_sparse_table", test_random_entry_sparse_table}, {NULL, NULL}}; unitTest __test_intset_c[] = {{"test_intsetValueEncodings", test_intsetValueEncodings}, {"test_intsetBasicAdding", test_intsetBasicAdding}, {"test_intsetLargeNumberRandomAdd", test_intsetLargeNumberRandomAdd}, {"test_intsetUpgradeFromint16Toint32", test_intsetUpgradeFromint16Toint32}, {"test_intsetUpgradeFromint16Toint64", test_intsetUpgradeFromint16Toint64}, {"test_intsetUpgradeFromint32Toint64", test_intsetUpgradeFromint32Toint64}, {"test_intsetStressLookups", test_intsetStressLookups}, {"test_intsetStressAddDelete", test_intsetStressAddDelete}, {NULL, NULL}}; unitTest __test_kvstore_c[] = {{"test_kvstoreAdd16Keys", test_kvstoreAdd16Keys}, {"test_kvstoreIteratorRemoveAllKeysNoDeleteEmptyHashtable", test_kvstoreIteratorRemoveAllKeysNoDeleteEmptyHashtable}, {"test_kvstoreIteratorRemoveAllKeysDeleteEmptyHashtable", test_kvstoreIteratorRemoveAllKeysDeleteEmptyHashtable}, {"test_kvstoreHashtableIteratorRemoveAllKeysNoDeleteEmptyHashtable", test_kvstoreHashtableIteratorRemoveAllKeysNoDeleteEmptyHashtable}, {"test_kvstoreHashtableIteratorRemoveAllKeysDeleteEmptyHashtable", test_kvstoreHashtableIteratorRemoveAllKeysDeleteEmptyHashtable}, {"test_kvstoreHashtableExpand", test_kvstoreHashtableExpand}, {NULL, NULL}}; diff --git a/src/unit/test_vset.c b/src/unit/test_vset.c index 1fb495d2db..d4f5872961 100644 --- a/src/unit/test_vset.c +++ b/src/unit/test_vset.c @@ -27,7 +27,9 @@ static void mockFreeEntry(void *entry) { } static mock_entry *mockEntryUpdate(mock_entry *entry, long long expiry) { - mock_entry *new_entry = entryCreate(entryGetField(entry), sdsdup(entryGetValue(entry)), expiry); + sds field = entryGetField(entry); + size_t len; + mock_entry *new_entry = entryCreate(field, sdsdup(entryGetValue(entry, &len)), expiry); entryFree(entry); return new_entry; } diff --git a/src/valkeymodule.h b/src/valkeymodule.h index b24908b3f7..bcd817e3eb 100644 --- a/src/valkeymodule.h +++ b/src/valkeymodule.h @@ -1644,6 +1644,8 @@ VALKEYMODULE_API int (*ValkeyModule_ZsetRangePrev)(ValkeyModuleKey *key) VALKEYM VALKEYMODULE_API int (*ValkeyModule_ZsetRangeEndReached)(ValkeyModuleKey *key) VALKEYMODULE_ATTR; VALKEYMODULE_API int (*ValkeyModule_HashSet)(ValkeyModuleKey *key, int flags, ...) VALKEYMODULE_ATTR; VALKEYMODULE_API int (*ValkeyModule_HashGet)(ValkeyModuleKey *key, int flags, ...) VALKEYMODULE_ATTR; +VALKEYMODULE_API int (*ValkeyModule_HashSetStringRef)(ValkeyModuleKey *key, ValkeyModuleString *field, const char *buf, size_t len) VALKEYMODULE_ATTR; +VALKEYMODULE_API int (*ValkeyModule_HashHasStringRef)(ValkeyModuleKey *key, ValkeyModuleString *field) VALKEYMODULE_ATTR; VALKEYMODULE_API int (*ValkeyModule_StreamAdd)(ValkeyModuleKey *key, int flags, ValkeyModuleStreamID *id, @@ -2305,6 +2307,8 @@ static int ValkeyModule_Init(ValkeyModuleCtx *ctx, const char *name, int ver, in VALKEYMODULE_GET_API(ZsetRangeEndReached); VALKEYMODULE_GET_API(HashSet); VALKEYMODULE_GET_API(HashGet); + VALKEYMODULE_GET_API(HashSetStringRef); + VALKEYMODULE_GET_API(HashHasStringRef); VALKEYMODULE_GET_API(StreamAdd); VALKEYMODULE_GET_API(StreamDelete); VALKEYMODULE_GET_API(StreamIteratorStart); diff --git a/src/ziplist.c b/src/ziplist.c index 608487fa2b..1198c86f0e 100644 --- a/src/ziplist.c +++ b/src/ziplist.c @@ -740,7 +740,7 @@ unsigned char *ziplistResize(unsigned char *zl, size_t len) { * updated, i.e. consecutive fields MAY need an update. */ unsigned char *__ziplistCascadeUpdate(unsigned char *zl, unsigned char *p) { zlentry cur; - size_t prevlen, prevlensize, prevoffset; /* Informat of the last changed entry. */ + size_t prevlen, prevlensize, prevoffset; /* Information of the last changed entry. */ size_t firstentrylen; /* Used to handle insert at head. */ size_t rawlen, curlen = intrev32ifbe(ZIPLIST_BYTES(zl)); size_t extra = 0, cnt = 0, offset; diff --git a/tests/modules/Makefile b/tests/modules/Makefile index 72031c5ad3..d731b9ad2a 100644 --- a/tests/modules/Makefile +++ b/tests/modules/Makefile @@ -48,6 +48,7 @@ TEST_MODULES = \ defragtest.so \ keyspecs.so \ hash.so \ + hash_stringref.so \ zset.so \ stream.so \ mallocsize.so \ diff --git a/tests/modules/hash_stringref.c b/tests/modules/hash_stringref.c new file mode 100644 index 0000000000..6daefd60f4 --- /dev/null +++ b/tests/modules/hash_stringref.c @@ -0,0 +1,99 @@ +/* Module Test: Verifies the module's capability to share an owned buffer with the core, + * which is then stored in a hash key field using a non-owning string reference (stringRef). */ +#include "valkeymodule.h" +#include + +typedef struct bufferNode { + char *buf; + size_t len; + struct bufferNode *next; +} bufferNode; + +bufferNode *head = NULL; + +bufferNode *addBuffer(const char *buf, size_t len) { + if (!buf || len == 0) return NULL; + + bufferNode *node = malloc(sizeof(bufferNode)); + node->buf = malloc(len); + memcpy(node->buf, buf, len); + node->len = len; + node->next = head; + head = node; + return node; +} + +void freeBufferList(void) { + bufferNode *current = head; + while (current) { + bufferNode *next = current->next; + free(current->buf); + free(current); + current = next; + } +} + +/* HASH.HAS_STRINGREF key field + * + * Returns 1 if all of the following conditions are met for the hash field: + * 1. The key exists. + * 2. The key's value is a HASH type. + * 3. The field's value is a string reference (stringRef) type. + * Otherwise, returns 0. + * + * Parameters: + * 1. The hash entry key. + * 2. The hahs entry field. + */ +int hashHasStringRef(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) { + if (argc != 3) return ValkeyModule_WrongArity(ctx); + + ValkeyModule_AutoMemory(ctx); + ValkeyModuleKey *key = ValkeyModule_OpenKey(ctx, argv[1], VALKEYMODULE_WRITE); + + int result = ValkeyModule_HashHasStringRef(key, argv[2]); + return ValkeyModule_ReplyWithLongLong(ctx, result); +} + +/* HASH.SET_STRINGREF key field buffer + * + * Sets hash entry value of a given key and field to an external owned buffer. + * Parameters: + * 1. The hash entry key. + * 2. The hahs entry field. + * 3. The buffer to share with the core. + */ +int hashSetStringRef(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) { + if (argc != 4) return ValkeyModule_WrongArity(ctx); + + ValkeyModule_AutoMemory(ctx); + ValkeyModuleKey *key = ValkeyModule_OpenKey(ctx, argv[1], VALKEYMODULE_WRITE); + + size_t buf_len; + const char *buf = ValkeyModule_StringPtrLen(argv[3], &buf_len); + bufferNode *node = addBuffer(buf, buf_len); + + int result = ValkeyModule_HashSetStringRef(key, argv[2], node->buf, node->len); + if (result == 0) return ValkeyModule_ReplyWithLongLong(ctx, result); + return ValkeyModule_ReplyWithError(ctx, "Err"); +} + +int ValkeyModule_OnLoad(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) { + VALKEYMODULE_NOT_USED(argv); + VALKEYMODULE_NOT_USED(argc); + if (ValkeyModule_Init(ctx, "hash.stringref", 1, VALKEYMODULE_APIVER_1) == + VALKEYMODULE_OK && + ValkeyModule_CreateCommand(ctx, "hash.set_stringref", hashSetStringRef, "write", + 1, 1, 1) == VALKEYMODULE_OK && + ValkeyModule_CreateCommand(ctx, "hash.has_stringref", hashHasStringRef, "readonly", + 1, 1, 1) == VALKEYMODULE_OK) { + return VALKEYMODULE_OK; + } + return VALKEYMODULE_ERR; +} + +int ValkeyModule_OnUnload(ValkeyModuleCtx *ctx) { + VALKEYMODULE_NOT_USED(ctx); + freeBufferList(); + return VALKEYMODULE_OK; +} diff --git a/tests/unit/moduleapi/hash_stringref.tcl b/tests/unit/moduleapi/hash_stringref.tcl new file mode 100644 index 0000000000..a2efb23cc8 --- /dev/null +++ b/tests/unit/moduleapi/hash_stringref.tcl @@ -0,0 +1,20 @@ +set testmodule [file normalize tests/modules/hash_stringref.so] + +start_server {tags {"modules"}} { + r module load $testmodule + + test {Module hash set} { + r del k + set status [catch {r hash.set_stringref k f hello} errmsg] + assert {$status == 1} + r hset k f hello1 + assert_equal "0" [r hash.has_stringref k f] + r hash.set_stringref k f hello1 + assert_equal "hello1" [r hget k f] + assert_equal "1" [r hash.has_stringref k f] + } + + test "Unload the module - hash" { + assert_equal {OK} [r module unload hash.stringref] + } +} diff --git a/tests/unit/scripting.tcl b/tests/unit/scripting.tcl index fde55e8b7e..7e49766a4d 100644 --- a/tests/unit/scripting.tcl +++ b/tests/unit/scripting.tcl @@ -1320,7 +1320,7 @@ start_server {tags {"scripting"}} { set rd [valkey_deferring_client] r config set lua-time-limit 10 - # senging (in a pipeline): + # sending (in a pipeline): # 1. eval "while 1 do redis.call('ping') end" 0 # 2. ping if {$is_eval == 1} {