一、前言
前段时间辞职骑完川藏线后回来找工作,面试 贝尔科教后端开发工程师 岗位时,遇到这样一个面试题:
有一个几十亿的白名单,每天白天需要高并发查询,晚上需要更新一次,如何设计这个功能。
这道题本质上是解决 判断数据是否存在于一个大集合中。我当时的回答大致是:前面设置一个 布隆过滤器,可以判断哪些 key 一定不存在、哪些可能存在;通过布隆过滤器的检测之后,后面再设置 n 个 redis 数据库(桶),通过一个 hash 函数进行分桶操作,之后在某个桶中判断某个 key 是否存在就是 O(1) 的时间复杂度。
需要注意的是,在计算机中,判断一个元素是不是在一个集合中,通常是用 hash 来解决,这在数据量不大的时候是可以的,但是当数据量很大的时候存储空间就会爆炸。
当时面试官并没有表示满意或者不满意,毕竟这和其他众多更加底层的面试题比起来,只是冰山一角。最后的面试结果是通过,但因为薪资没有达到我的期望,因此也就没有后续了。
正好国庆假期有时间,系统总结一下 布隆过滤器 的原理,介绍现有的 Redis 实现,并希望通过 Golang 简单实现,算是学习与总结。
二、原理
布隆过滤器(Bloom Filter) 由 布隆 于 1970 年提出,在这里可以看到原论文:Space/time Trade-offs in Hash Coding with Allowable Errors。从论文标题可以看出,布隆过滤器在时空复杂度方面有着非常大的优势,同时使用到了哈希,但是存在误算率。实际上,布隆过滤器由 一个很长的二进制数组 和 一系列哈希函数 组成,它的作用是 可以检索一个元素是否存在于一个集合中,优点是 插入与查询的时空效率都远超一般的算法,缺点是 存在一定的误识别率 和 删除困难。
下面我们看一下布隆过滤器的工作流程:
布隆过滤器本质上是由长度为 m 的 位向量 或者 位列表 (仅包含 0 或者 1 的列表)组成,并且列表的所有元素被初始化为 0:

当然还会有一系列哈希函数:

当我们插入一个 key 时,先通过几个哈希函数得到各自的哈希值,然后将位列表对应位设置为 1:

再插入元素时,继续将对应位设置为 1 即可,而不用担心之前的值是否为 1:

当我们查询 another_key 是否在上面的位列表中时,还是经过同样的哈希函数,得到各个列表索引值,进而得到位列表处的值,之后进行判断:如果全为 1,则表示可能存在,如果出现一个为 0,则表明一定不存在。

为什么说当 结果全为 1 时可能存在,而不是 一定存在?假设我们要查一个单词 test 是否存在,其计算的哈希索引值分别为 [2, 5, 14],位列表中对应的值也全是 1,但是三个 1 是由 hu 和 Jemmy 两个单词插入的结果,原来并没有 test,这种情况下就会出现误判。
你可以在这个在线网站 Bloom Filters 上自己体会一下这个过程。
我们假设位列表的长度为 m,有 k 个哈希函数,那么其误判率大概为:

对于给定的 m 和 n,当

的时候,误差率取得最小值。具体的推导过程可参考这篇文章:布隆过滤器的误判率该如何计算? - Xdims 的回答 - 知乎。我们只需要记住结论即可:
- 不要让实际元素数量远大于初始化数量;
- 如果实际元素数量超过初始化数量,则应该选择更大的
m重建布隆过滤器,将之前的元素进行批量add。
三、在 Redis 中使用
在低版本需要安装插件并且重新启动:
- 下载插件并安装
cd ~/Documents && git clone https://github.com/RedisBloom/RedisBloom && cd RedisBloom && make
# 会得到一个 redisbloom.so 文件
- 重启
redis-server:
# 在redis-cli中关闭服务器,其他方法比如 命令行下 kill -9 (redis-server的pid)是没用的
shutdown
# 之后退出
# 重启
redis-server /usr/local/etc/redis.conf --loadmodule ~/Documents/RedisBloom/redisbloom.so &
- 重新进入
redis-cli即可。
常用命令如下:
bf.add name key:往名为name的布隆过滤器中添加一个keybf.madd name key1 key2 ... keyn: 往名为name的布隆过滤器中批量添加多个keybf.exists name key:检查key是否存在于名为name的布隆过滤器中bf.mexists name key1 key2 ... keyn:查询多个key是否存在于布隆过滤器中。

五、使用 Golang 实现
package main
import (
"encoding/binary"
"github.com/spaolacci/murmur3"
"hash"
"sync"
"unsafe"
)
/*
* @CreateTime: 2020/10/7 17:28
* @Author: Jemmy(hujm20151021@gmail.com)
* @Description: 布隆过滤器 实现
*/
// BloomFilter 布隆过滤器的struct
type BloomFilter struct {
m uint8 // 位数组长度为 2^m
n uint64 // 已有元素
k uint32 // 哈希函数的个数
hashFunc []hash.Hash64 // 哈希函数,使用 murmur3 算法,性能好实现简单,但是易于遭受DDoS攻击
data []byte
sync.RWMutex // 读写锁
}
// NewBloomFilter 初始化一个布隆过滤器
// m 表示位数组长度为 2的m次方
// k 表示哈希函数的个数
func NewBloomFilter(m uint8, k int) *BloomFilter {
if k <= 0 {
panic("invalid number k for hashFunc num ")
}
hashFunc := make([]hash.Hash64, k)
// 初始化哈希函数
for i := 0; i < k; i++ {
hashFunc[i] = murmur3.New64WithSeed(uint32(i))
}
filter := &BloomFilter{
m: m,
n: 0,
k: uint32(k),
hashFunc: hashFunc,
data: make([]byte, 1<<m),
}
// 防止创建数组越界
if len(filter.data) == 0 {
panic("m is too big to make slice")
}
return filter
}
// exists 检查元素是否存在于集合中
// true: 可能存在
// false: 一定不存在
func (b *BloomFilter) exists(data []byte) bool {
b.RLock()
defer b.RUnlock()
for _, f := range b.hashFunc {
_, _ = f.Write(data)
position := uint(f.Sum64() & ((1 << b.m) - 1))
f.Reset()
if b.data[position] == 0 {
return false
}
}
return true
}
func (b *BloomFilter) add(data []byte) {
b.Lock()
defer b.Unlock()
for _, f := range b.hashFunc {
_, _ = f.Write(data)
position := uint(f.Sum64() & ((1 << b.m) - 1))
b.data[position] = 1
f.Reset()
}
b.n++
}
// Reset 清空布隆过滤器中的所有元素
func (b *BloomFilter) Reset() {
b.Lock()
defer b.Unlock()
b.data = make([]byte, 1<<b.m)
b.n = 0
}
// Number 返回过滤器中的元素个数
func (b *BloomFilter) Number() uint64 {
b.RLock()
defer b.RUnlock()
return b.n
}
// AddString 向布隆过滤器中添加字符串对象
func (b *BloomFilter) AddString(s string) {
b.add(stringToBytes(s))
}
// ExistsString 检查s是否存在于集合中
func (b *BloomFilter) ExistsString(s string) bool {
return b.exists(stringToBytes(s))
}
// AddNumber 添加数字m
func (b *BloomFilter) AddNumber(m uint64) {
b.add(uint64ToBytes(m))
}
// ExistsNumber 检查数字m是否存在于集合中
func (b *BloomFilter) ExistsNumber(m uint64) bool {
return b.exists(uint64ToBytes(m))
}
// AddBytes 添加 序列化后的对象
func (b *BloomFilter) AddBytes(data []byte) {
b.add(data)
}
// ExistsBytes 检查对象data是否存在
func (b *BloomFilter) ExistsBytes(data []byte) bool {
return b.exists(data)
}
/*
********************************************* 辅助函数 *****************************************************
*/
// stringToBytes 将string转换成byte数组,零拷贝
func stringToBytes(s string) []byte {
return *(*[]byte)(unsafe.Pointer(&s))
}
// bytesToString byte数组转换成string,零拷贝
func bytesToString(b []byte) string {
return *(*string)(unsafe.Pointer(&b))
}
// uint64ToBytes 将uint64转为byte数组
func uint64ToBytes(num uint64) []byte {
data := make([]byte, 8)
binary.LittleEndian.PutUint64(data, num)
return data
}
写一个测试一下:
func main() {
filter := NewBloomFilter(10, 3)
filter.AddNumber(1)
filter.AddNumber(2)
filter.AddNumber(3)
filter.AddNumber(4)
filter.AddNumber(5)
filter.AddNumber(7)
filter.AddString("hu")
filter.AddString("Jemmy")
fmt.Println(filter.Number()) // 8
fmt.Println(filter.ExistsNumber(3))
fmt.Println(filter.ExistsNumber(5))
fmt.Println(filter.ExistsNumber(6))
fmt.Println(filter.ExistsString("Jemmy"))
fmt.Println(filter.ExistsString("jemmy"))
}
// 输出
8
true
true
false
true
false
【参考资料】