Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Accelerate hash table iterator with prefetching #1501

Merged
merged 1 commit into from
Jan 8, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 70 additions & 22 deletions src/hashtable.c
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ size_t nextCursor(size_t v, size_t mask) {
}

/* Returns the next bucket in a bucket chain, or NULL if there's no next. */
static bucket *bucketNext(bucket *b) {
static bucket *getChildBucket(bucket *b) {
return b->chained ? b->entries[ENTRIES_PER_BUCKET - 1] : NULL;
}

Expand Down Expand Up @@ -548,12 +548,12 @@ static void rehashStep(hashtable *ht) {
rehashBucket(ht, b);
if (b->chained) {
/* Rehash and free child buckets. */
bucket *next = bucketNext(b);
bucket *next = getChildBucket(b);
b->chained = 0;
b = next;
while (b != NULL) {
rehashBucket(ht, b);
next = bucketNext(b);
next = getChildBucket(b);
zfree(b);
if (ht->type->trackMemUsage) ht->type->trackMemUsage(ht, -sizeof(bucket));
ht->child_buckets[0]--;
Expand Down Expand Up @@ -708,7 +708,7 @@ static bucket *findBucket(hashtable *ht, uint64_t hash, const void *key, int *po
}
}
}
b = bucketNext(b);
b = getChildBucket(b);
} while (b != NULL);
}
return NULL;
Expand Down Expand Up @@ -753,7 +753,7 @@ static void bucketConvertToUnchained(bucket *b) {
* This function needs the penultimate 'before_last' bucket in the chain, to be
* able to update it when the last bucket is freed. */
static void pruneLastBucket(hashtable *ht, bucket *before_last, bucket *last, int table_index) {
assert(before_last->chained && bucketNext(before_last) == last);
assert(before_last->chained && getChildBucket(before_last) == last);
assert(!last->chained);
assert(last->presence == 0 || __builtin_popcount(last->presence) == 1);
bucketConvertToUnchained(before_last);
Expand All @@ -775,10 +775,10 @@ static void fillBucketHole(hashtable *ht, bucket *b, int pos_in_bucket, int tabl
assert(b->chained && !isPositionFilled(b, pos_in_bucket));
/* Find the last bucket */
bucket *before_last = b;
bucket *last = bucketNext(b);
bucket *last = getChildBucket(b);
while (last->chained) {
before_last = last;
last = bucketNext(last);
last = getChildBucket(last);
}
/* Unless the last bucket is empty, find an entry in the last bucket and
* move it to the hole in b. */
Expand All @@ -800,10 +800,10 @@ static void fillBucketHole(hashtable *ht, bucket *b, int pos_in_bucket, int tabl
static void compactBucketChain(hashtable *ht, size_t bucket_index, int table_index) {
bucket *b = &ht->tables[table_index][bucket_index];
while (b->chained) {
bucket *next = bucketNext(b);
bucket *next = getChildBucket(b);
if (next->chained && next->presence == 0) {
/* Empty bucket in the middle of the chain. Remove it from the chain. */
bucket *next_next = bucketNext(next);
bucket *next_next = getChildBucket(next);
b->entries[ENTRIES_PER_BUCKET - 1] = next_next;
zfree(next);
if (ht->type->trackMemUsage) ht->type->trackMemUsage(ht, -sizeof(bucket));
Expand Down Expand Up @@ -846,7 +846,7 @@ static bucket *findBucketForInsert(hashtable *ht, uint64_t hash, int *pos_in_buc
bucketConvertToChained(ht, b);
ht->child_buckets[table]++;
}
b = bucketNext(b);
b = getChildBucket(b);
}
/* Find a free slot in the bucket. There must be at least one. */
int pos;
Expand Down Expand Up @@ -934,6 +934,51 @@ static inline incrementalFind *incrementalFindFromOpaque(hashtableIncrementalFin
return (incrementalFind *)(void *)state;
}

/* Prefetches all filled entries in the given bucket to optimize future memory access. */
static void prefetchBucketEntries(bucket *b) {
for (int pos = 0; pos < numBucketPositions(b); pos++) {
if (isPositionFilled(b, pos)) {
valkey_prefetch(b->entries[pos]);
}
}
}

/* Returns the child bucket if chained, otherwise the next bucket in the table. returns NULL if neither exists. */
static bucket *getNextBucket(bucket *current_bucket, size_t bucket_index, hashtable *ht, int table_index) {
bucket *next_bucket = NULL;
if (current_bucket->chained) {
next_bucket = getChildBucket(current_bucket);
} else {
size_t table_size = numBuckets(ht->bucket_exp[table_index]);
size_t next_index = bucket_index + 1;
if (next_index < table_size) {
next_bucket = &ht->tables[table_index][next_index];
}
}
return next_bucket;
}

/* This function prefetches data that will be needed in subsequent iterations:
* - The entries of the next bucket
* - The next of the next bucket
* It attempts to bring this data closer to the L1 cache to reduce future memory access latency.
*
* Cache state before this function is called(due to last call for this function):
* 1. The current bucket and its entries are likely already in cache.
* 2. The next bucket is in cache.
*/
static void prefetchNextBucketEntries(iter *iter, bucket *current_bucket) {
size_t next_index = iter->index + 1;
bucket *next_bucket = getNextBucket(current_bucket, next_index, iter->hashtable, iter->table);
if (next_bucket) {
prefetchBucketEntries(next_bucket);
bucket *next_next_bucket = getNextBucket(next_bucket, next_index + 1, iter->hashtable, iter->table);
if (next_next_bucket) {
valkey_prefetch(next_next_bucket);
}
}
}

/* --- API functions --- */

/* Allocates and initializes a new hashtable specified by the given type. */
Expand Down Expand Up @@ -979,7 +1024,7 @@ void hashtableEmpty(hashtable *ht, void(callback)(hashtable *)) {
}
}
}
bucket *next = bucketNext(b);
bucket *next = getChildBucket(b);

/* Free allocated bucket. */
if (b != &ht->tables[table_index][idx]) {
Expand Down Expand Up @@ -1375,7 +1420,7 @@ int hashtableReplaceReallocatedEntry(hashtable *ht, const void *old_entry, void
return 1;
}
}
b = bucketNext(b);
b = getChildBucket(b);
} while (b != NULL);
}
return 0;
Expand Down Expand Up @@ -1538,8 +1583,8 @@ int hashtableIncrementalFindStep(hashtableIncrementalFindState *state) {
bucket_idx = data->hash & mask;
}
data->bucket = &ht->tables[data->table][bucket_idx];
} else if (bucketNext(data->bucket) != NULL) {
data->bucket = bucketNext(data->bucket);
} else if (getChildBucket(data->bucket) != NULL) {
data->bucket = getChildBucket(data->bucket);
} else if (data->table == 0 && ht->rehash_idx >= 0) {
data->table = 1;
size_t mask = expToMask(ht->bucket_exp[1]);
Expand Down Expand Up @@ -1656,7 +1701,7 @@ size_t hashtableScanDefrag(hashtable *ht, size_t cursor, hashtableScanFunction f
}
}
}
bucket *next = bucketNext(b);
bucket *next = getChildBucket(b);
if (next != NULL && defragfn != NULL) {
next = bucketDefrag(b, next, defragfn);
}
Expand Down Expand Up @@ -1693,7 +1738,7 @@ size_t hashtableScanDefrag(hashtable *ht, size_t cursor, hashtableScanFunction f
}
}
}
bucket *next = bucketNext(b);
bucket *next = getChildBucket(b);
if (next != NULL && defragfn != NULL) {
next = bucketDefrag(b, next, defragfn);
}
Expand Down Expand Up @@ -1723,7 +1768,7 @@ size_t hashtableScanDefrag(hashtable *ht, size_t cursor, hashtableScanFunction f
}
}
}
bucket *next = bucketNext(b);
bucket *next = getChildBucket(b);
if (next != NULL && defragfn != NULL) {
next = bucketDefrag(b, next, defragfn);
}
Expand Down Expand Up @@ -1859,7 +1904,7 @@ int hashtableNext(hashtableIterator *iterator, void **elemptr) {
iter->pos_in_bucket++;
if (iter->bucket->chained && iter->pos_in_bucket >= ENTRIES_PER_BUCKET - 1) {
iter->pos_in_bucket = 0;
iter->bucket = bucketNext(iter->bucket);
iter->bucket = getChildBucket(iter->bucket);
} else if (iter->pos_in_bucket >= ENTRIES_PER_BUCKET) {
/* Bucket index done. */
if (iter->safe) {
Expand Down Expand Up @@ -1890,6 +1935,9 @@ int hashtableNext(hashtableIterator *iterator, void **elemptr) {
}
}
bucket *b = iter->bucket;
if (iter->pos_in_bucket == 0) {
prefetchNextBucketEntries(iter, b);
}
if (!isPositionFilled(b, iter->pos_in_bucket)) {
/* No entry here. */
continue;
Expand Down Expand Up @@ -1988,7 +2036,7 @@ hashtableStats *hashtableGetStatsHt(hashtable *ht, int table_index, int full) {
unsigned long chainlen = 0;
while (b->chained) {
chainlen++;
b = bucketNext(b);
b = getChildBucket(b);
}
if (chainlen > stats->max_chain_len) {
stats->max_chain_len = chainlen;
Expand Down Expand Up @@ -2083,7 +2131,7 @@ void hashtableDump(hashtable *ht) {
printf("(empty)\n");
}
}
b = bucketNext(b);
b = getChildBucket(b);
level++;
} while (b != NULL);
}
Expand Down Expand Up @@ -2117,7 +2165,7 @@ void hashtableHistogram(hashtable *ht) {
continue;
}
printf("%X", __builtin_popcount(b->presence));
buckets[idx] = bucketNext(b);
buckets[idx] = getChildBucket(b);
if (buckets[idx] == NULL) chains_left--;
}
printf("\n");
Expand All @@ -2138,7 +2186,7 @@ int hashtableLongestBucketChain(hashtable *ht) {
if (++chainlen > maxlen) {
maxlen = chainlen;
}
b = bucketNext(b);
b = getChildBucket(b);
}
}
}
Expand Down
Loading