Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/dict.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ typedef struct dictEntry {
void *val;
uint64_t u64;
int64_t s64;
double d;
} v;
struct dictEntry *next;
} dictEntry;
Expand Down Expand Up @@ -114,6 +115,9 @@ typedef void (dictScanFunction)(void *privdata, const dictEntry *de);
#define dictSetUnsignedIntegerVal(entry, _val_) \
do { entry->v.u64 = _val_; } while(0)

#define dictSetDoubleVal(entry, _val_) \
do { entry->v.d = _val_; } while(0)

#define dictFreeKey(d, entry) \
if ((d)->type->keyDestructor) \
(d)->type->keyDestructor((d)->privdata, (entry)->key)
Expand All @@ -135,6 +139,7 @@ typedef void (dictScanFunction)(void *privdata, const dictEntry *de);
#define dictGetVal(he) ((he)->v.val)
#define dictGetSignedIntegerVal(he) ((he)->v.s64)
#define dictGetUnsignedIntegerVal(he) ((he)->v.u64)
#define dictGetDoubleVal(he) ((he)->v.d)
#define dictSlots(d) ((d)->ht[0].size+(d)->ht[1].size)
#define dictSize(d) ((d)->ht[0].used+(d)->ht[1].used)
#define dictIsRehashing(ht) ((ht)->rehashidx != -1)
Expand Down
92 changes: 59 additions & 33 deletions src/t_zset.c
Original file line number Diff line number Diff line change
Expand Up @@ -1981,51 +1981,77 @@ void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) {
zuiClearIterator(&src[0]);
}
} else if (op == REDIS_OP_UNION) {
dict *accumulator = dictCreate(&setDictType,NULL);
dictIterator *di;
dictEntry *de;
double score;

if (setnum) {
/* Our union is at least as large as the largest set.
* Resize the dictionary ASAP to avoid useless rehashing. */
dictExpand(accumulator,zuiLength(&src[setnum-1]));
}

/* Step 1: Create a dictionary of elements -> aggregated-scores
* by iterating one sorted set after the other. */
for (i = 0; i < setnum; i++) {
if (zuiLength(&src[i]) == 0)
continue;
if (zuiLength(&src[i]) == 0) continue;

zuiInitIterator(&src[i]);
while (zuiNext(&src[i],&zval)) {
double score, value;

/* Skip an element that when already processed */
if (dictFind(dstzset->dict,zuiObjectFromValue(&zval)) != NULL)
continue;

/* Initialize score */
/* Initialize value */
score = src[i].weight * zval.score;
if (isnan(score)) score = 0;

/* We need to check only next sets to see if this element
* exists, since we process every element just one time so
* it can't exist in a previous set (otherwise it would be
* already processed). */
for (j = (i+1); j < setnum; j++) {
/* It is not safe to access the zset we are
* iterating, so explicitly check for equal object. */
if(src[j].subject == src[i].subject) {
value = zval.score*src[j].weight;
zunionInterAggregate(&score,value,aggregate);
} else if (zuiFind(&src[j],&zval,&value)) {
value *= src[j].weight;
zunionInterAggregate(&score,value,aggregate);
/* Search for this element in the accumulating dictionary. */
de = dictFind(accumulator,zuiObjectFromValue(&zval));
/* If we don't have it, we need to create a new entry. */
if (de == NULL) {
tmp = zuiObjectFromValue(&zval);
/* Remember the longest single element encountered,
* to understand if it's possible to convert to ziplist
* at the end. */
if (sdsEncodedObject(tmp)) {
if (sdslen(tmp->ptr) > maxelelen)
maxelelen = sdslen(tmp->ptr);
}
}

tmp = zuiObjectFromValue(&zval);
znode = zslInsert(dstzset->zsl,score,tmp);
incrRefCount(zval.ele); /* added to skiplist */
dictAdd(dstzset->dict,tmp,&znode->score);
incrRefCount(zval.ele); /* added to dictionary */

if (sdsEncodedObject(tmp)) {
if (sdslen(tmp->ptr) > maxelelen)
maxelelen = sdslen(tmp->ptr);
/* Add the element with its initial score. */
de = dictAddRaw(accumulator,tmp);
incrRefCount(tmp);
dictSetDoubleVal(de,score);
} else {
/* Update the score with the score of the new instance
* of the element found in the current sorted set.
*
* Here we access directly the dictEntry double
* value inside the union as it is a big speedup
* compared to using the getDouble/setDouble API. */
zunionInterAggregate(&de->v.d,score,aggregate);
}
}
zuiClearIterator(&src[i]);
}

/* Step 2: convert the dictionary into the final sorted set. */
di = dictGetIterator(accumulator);

/* We now are aware of the final size of the resulting sorted set,
* let's resize the dictionary embedded inside the sorted set to the
* right size, in order to save rehashing time. */
dictExpand(dstzset->dict,dictSize(accumulator));

while((de = dictNext(di)) != NULL) {
robj *ele = dictGetKey(de);
score = dictGetDoubleVal(de);
znode = zslInsert(dstzset->zsl,score,ele);
incrRefCount(ele); /* added to skiplist */
dictAdd(dstzset->dict,ele,&znode->score);
incrRefCount(ele); /* added to dictionary */
}
dictReleaseIterator(di);

/* We can free the accumulator dictionary now. */
dictRelease(accumulator);
} else {
redisPanic("Unknown operator");
}
Expand Down