0%

【Redis数据结构】Dict的实现

字典是一种用于保存键值对(key-value pair)的抽象数据结构, 在字典中, 一个键(key)可以可一个值关联(或者说将键映射为值), 这些关联的键和值就成为键值对。

字典中的每个键都是独一无二的, 程序可以在字典中做到根据键查找与之关联的值,或者通过键更新/删除值等

Redis用到字典的地方主要有:

  • 实现数据库键空间(key space),维护 key 和 value 映射关系的数据结构
  • 用作 Hash 类型键的底层实现之一(另一种是ziplist)
  • 使用dict和skiplist来共同维护一个sorted_set

相关概念

实现原理: Redis的字典使用HASH表作为底层实现, 一个哈希表里面可以有多个哈希表节点, 而每个哈希表就保存了字典的一个键值对。

解决HASH冲突方法: 采用拉链法解决HASH冲突

ReHASH: 将 0号哈希表 的所有键值对迁移到 1号哈希表 的过程

负载因子: load_factor = ht[0].used / ht[0].size

Dict源码

字典定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
/*
* 哈希表节点
*/
typedef struct dictEntry {
// 键
void *key;
// 值
union {
void *val;
uint64_t u64;
int64_t s64;
double d;
} v;
// 下一个节点指针
struct dictEntry *next;
} dictEntry;

/* This is our hash table structure. Every dictionary has two of this as we
* implement incremental rehashing, for the old to the new table. */
/*
* 哈希表
*
* 每个字典都使用两个哈希表,从而实现渐进式 rehash 。
*/
typedef struct dictht {
// dictEntry数组, 数组长度为size
dictEntry **table;
// 哈希表大小, 初始为4, 大小总是2的n次方
unsigned long size;
// 哈希表大小掩码,用于计算索引值, 总是等于 size - 1
unsigned long sizemask;
// 该哈希表已有节点的数量
unsigned long used;
} dictht;

/*
* 字典
*/
typedef struct dict {
dictType *type;
void *privdata;
// 定义2个哈希表
dictht ht[2];
// 表示正在进行rehash的链表在数组中索引 rehash未进行是为 -1
// 字典在渐进式rehash时, 直接通过 rehashidx 去ht[0]->table去取需要迁移的链表
long rehashidx; /* rehashing not in progress if rehashidx == -1 */
// 目前正在运行的安全迭代器的数量
unsigned long iterators; /* number of iterators currently running */
} dict;

哈希表的实现(dictht):

哈希表一共有 size 条链表, 链表初始都为null, 哈希表的每个节点都是 dictEntry, dictEntry 有一个指向下一个节点的next指针, 在每次插入时都要计算哈希值与sizemask做与运算算出插入到哪条链表中, 将新节点插入到链表表头

字典定义了2个哈希表, 正常情况下只使用0号哈希表(ht[0]), 1号哈希表(ht[1])为null, 只有在字典 rehash 时 ht[1] 才派上用场。

字典的增删改查过程

插入一个新的键值对的过程(dictAddRaw):
  1. 判断是否正在 rehash, 如果在 rehash, 使用ht[1], 否则使用ht[0]
  2. 使用 murmurhash2 算法计算新节点键的hash
  3. 将计算得到的 hash 值与 sizemask 做 & 运算得到数组索引(idx), 也就是将要要插入的槽
  4. 将这个新节点插入到这个槽的链表表头(新节点next指针指向表头的next指针, 之后表头的指向新节点)
  5. 更新used(ht->used++;)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/* Low level add or find:
* This function adds the entry but instead of setting a value returns the
* dictEntry structure to the user, that will make sure to fill the value
* field as he wishes.
*
* This function is also directly exposed to the user API to be called
* mainly in order to store non-pointers inside the hash value, example:
*
* entry = dictAddRaw(dict,mykey,NULL);
* if (entry != NULL) dictSetSignedIntegerVal(entry,1000);
*
* Return values:
*
* If key already exists NULL is returned, and "*existing" is populated
* with the existing entry if existing is not NULL.
*
* If key was added, the hash entry is returned to be manipulated by the caller.
*/
dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing)
{
long index;
dictEntry *entry;
dictht *ht;

if (dictIsRehashing(d)) _dictRehashStep(d);

/* Get the index of the new element, or -1 if
* the element already exists. */
if ((index = _dictKeyIndex(d, key, dictHashKey(d,key), existing)) == -1)
return NULL;

/* Allocate the memory and store the new entry.
* Insert the element in top, with the assumption that in a database
* system it is more likely that recently added entries are accessed
* more frequently. */
ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0];
entry = zmalloc(sizeof(*entry));
entry->next = ht->table[index];
ht->table[index] = entry;
ht->used++;

/* Set the hash entry fields. */
dictSetKey(d, entry, key);
return entry;
}
查找一个键值对的过程(dictFind):
  1. 计算得 key 的 hash 值,与掩码做位运算获取数组索引
  2. 遍历 ht[0] 查看这个槽上的链表的所有节点, 找到返回这个节点的指针
  3. 判断是否正在rehash, 正在 rehash 再去 ht[1] 中查找, 否则返回NULL
删除一个键值对的过程(dictGenericDelete):
  1. 查找需要删除的节点(he)和它的的前一个节点(prevHe)
  2. 判断 prevHe 是否为空判断 he 是否是头结点
  3. 如果 he 是头结点, 将表头(table[idx])设置为null
  4. he 不是头结点, prevHe 的 next 指针指向 he.next
  5. 释放 he 的内存
修改键值对(dictReplace):
  1. 尝试将调用 dictAddRaw 添加键值对
  2. 新增成功, 返回1
  3. 新增失败, 说明 key 在字典中已经存在
  4. 调用 dictFind 查找键值对
  5. 使用变量保存原有值的指针,节点值设置新的(dictSetVal)
  6. 释放掉旧的键值对(dictFreeVal)

字典的rehash

字典的rehash过程:

  1. 检查 负载因子 是否达到 rehash 的条件
  2. 将 ht[1] 的table数组大小设置为>=size的最小的2的n次方(可能是扩容,也可能是缩容)
  3. 遍历 ht[0] 的所有链表, 对每条链表上节点的键重新计算hash值, 与 ht[1] 的 sizemask 做&运算得到节点在ht[1]中要插入的链表
  4. 将节点设置为 ht[1] 头结点(只是改变节点的next指针指向, 不需要耗费额外的空间)
  5. rehash 并不是一次迁移所有的节点, 这个过程是渐进式的, 在迁移的过程中还可以对字典进行增删改查
  6. 在 rehash 的过程中, ht[0] 与 ht[1] 中都不为空, 删、改、查同时使用 ht[0] 和 ht[1], 新增节点只使用 ht[1]
  7. rehash 结束后ht[0]的所有元素都迁移到了ht[1],ht[0]为空, 将 ht[1] 设置为 ht[0], 释放ht[0] 并重置ht[1]

rehash的时机:

哈希表扩容:

dictAddRaw 函数中调用了_dictKeyIndex函数获取数组的索引: if ((index = _dictKeyIndex(d, key, dictHashKey(d,key), existing)) == -1);
_dictKeyIndex 函数中有 if (_dictExpandIfNeeded(d) == DICT_ERR);
_dictExpandIfNeeded 函数是判断是否需要扩容rehash的函数, 在此函数判断是否需要扩容的语句如下:

1
2
3
4
5
6
static unsigned int dict_force_resize_ratio = 5;

if (d->ht[0].used >= d->ht[0].size && (dict_can_resize || d->ht[0].used/d->ht[0].size > dict_force_resize_ratio))
{
return dictExpand(d, d->ht[0].used*2);
}

翻译过来就是2种情况进行扩容:

  • 负载因子(0号哈希表已有的节点数 / 哈希表容量) >= 1 且 dict_can_resize 为 true
    • dict_can_resize 在 redis 正在执行 bgsave(rdb持久化) 或 bgrewriteaof(aof持久化) 时为 false (updateDictResizePolicy)
    • 也就是说正在 redis 正在进行持久化时是不允许字典 rehash 的
  • 负载因子 >= 5 时, 不管是否正在进行持久化, 都给字典扩容
哈希表缩容:

hashTypeDelete 函数中删除完要删的hash节点之后有 if (htNeedsResize(o->ptr)) dictResize(o->ptr);

htNeedsResizedictResize 函数中有是否需要缩容的判断条件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#define DICT_HT_INITIAL_SIZE     4
#define HASHTABLE_MIN_FILL 10

int htNeedsResize(dict *dict) {
long long size, used;

size = dictSlots(dict);
used = dictSize(dict);
return (size > DICT_HT_INITIAL_SIZE &&
(used*100/size < HASHTABLE_MIN_FILL));
}

/* Resize the table to the minimal size that contains all the elements,
* but with the invariant of a USED/BUCKETS ratio near to <= 1 */
int dictResize(dict *d)
{
int minimal;

if (!dict_can_resize || dictIsRehashing(d)) return DICT_ERR;
minimal = d->ht[0].used;
if (minimal < DICT_HT_INITIAL_SIZE)
minimal = DICT_HT_INITIAL_SIZE;
return dictExpand(d, minimal);
}

翻译一下就是:

  • 哈希表中已经使用的元素格式大于4, 且负载因子小于 0.1, 需要缩容
  • 正在执行 bgsavebgrewriteaof 时, 或者正在rehash不允许缩容
渐进 rehash 的过程

在增删改查中都调用了 dictIsRehashing 函数判断是否正在进行 rehash, 如果正在进行 rehash, 则调用 _dictRehashStep 进行单步 rehash(只迁移一条链表中的所有元素):

1
2
3
static void _dictRehashStep(dict *d) {
if (d->iterators == 0) dictRehash(d,1);
}

rehash源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
/* Performs N steps of incremental rehashing. Returns 1 if there are still
* keys to move from the old to the new hash table, otherwise 0 is returned.
*
* Note that a rehashing step consists in moving a bucket (that may have more
* than one key as we use chaining) from the old to the new hash table, however
* since part of the hash table may be composed of empty spaces, it is not
* guaranteed that this function will rehash even a single bucket, since it
* will visit at max N*10 empty buckets in total, otherwise the amount of
* work it does would be unbound and the function may block for a long time.
*
* n 步 rehash, 最多从ht[0]中迁移n条非空链表到ht[1]
* 参数d: 字典结构体指针, n: 需要rehash的步数 \
* 返回值: 0: rehash 已经完成了, 1: rehash还没有完成
*/
int dictRehash(dict *d, int n) {

// 最多遍历10n个空桶, 避免非空链表在最后面一直需要遍历到字典最后才能找到完成更新n步rehash的情况
int empty_visits = n*10; /* Max number of empty buckets to visit. */

if (!dictIsRehashing(d)) return 0;

while(n-- && d->ht[0].used != 0) {
dictEntry *de, *nextde;

/* Note that rehashidx can't overflow as we are sure there are more
* elements because ht[0].used != 0 */
assert(d->ht[0].size > (unsigned long)d->rehashidx);

// 判断链表为空将 empty_visits-1, empty_visits 减到0退出rehash, 并返回1
while(d->ht[0].table[d->rehashidx] == NULL) {
d->rehashidx++;
if (--empty_visits == 0) return 1;
}

// rehashidx为当前遍历到的链表(桶)的索引, de(dictentry)是需要迁移到ht[1]中的链表
de = d->ht[0].table[d->rehashidx];

/* Move all the keys in this bucket from the old to the new hash HT */
while(de) {
uint64_t h;

// 1. 保存de的下一个节点
nextde = de->next;

// 2. 获取这个de在新的hash表(ht[1])中的索引
h = dictHashKey(d, de->key) & d->ht[1].sizemask;

// 3. 将de的next指针指向ht[1]中要插入的新链表的表头节点
de->next = d->ht[1].table[h];

// 4. ht[1] 中的链表表头节点设置为de
d->ht[1].table[h] = de;

// 更新ht[0]和ht[1]的used值
d->ht[0].used--;
d->ht[1].used++;

// 5. 继续更新de的下一届节点, 直到这条旧链表迁移完了
de = nextde;
}

// 将旧链表设置为null, rehashidx+1
d->ht[0].table[d->rehashidx] = NULL;
d->rehashidx++;
}

// 通过判断0号哈希表的节点数量判断是否已经完成了 rehash
// 完成rehash的的话 1. 释放ht[0]的table数组 2. 将ht[1]设置ht[0] 3. 重置ht[1] 4. rehashidx改为-1表示没有在rehash
/* Check if we already rehashed the whole table... */
if (d->ht[0].used == 0) {
zfree(d->ht[0].table);
d->ht[0] = d->ht[1];
_dictReset(&d->ht[1]);
d->rehashidx = -1;
return 0;
}

/* More to rehash... */
return 1;
}

除了增删改查过程中有调用 dictRehash(d,1) 进行单独rehash外,在Redis的服务器定时任务 serverCron 中判断正在 rehash 也有调用 dictRehashMilliseconds(server.db[dbid].dict,1) 在用 1 ms 的时间迁移最少 100 条链表

serverCron 定时任务的时间默认为 100ms 执行一次, 可在修改配置文件中修改 hz 参数的值来修改定时任务的执行频率, 执行的频率为 1000/hz ms 一次

dictRehashMilliseconds 函数如下, 作用是在指定的毫秒内更新

1
2
3
4
5
6
7
8
9
10
11
/* Rehash for an amount of time between ms milliseconds and ms+1 milliseconds */
int dictRehashMilliseconds(dict *d, int ms) {
long long start = timeInMilliseconds();
int rehashes = 0;

while(dictRehash(d,100)) {
rehashes += 100;
if (timeInMilliseconds()-start > ms) break;
}
return rehashes;
}