一致性Hash算法
1.一致性Hash的算法背景
一致性 Hash 算法是解决
分布式缓存
等问题的一种算法
场景分析:
有三台缓存服务器编号node0
、node1
、node2
,现在有 3000 万个key
,希望可以将这些个 key 均匀的缓存到三台机器上,你会想到什么方案呢?
我们可能首先想到的方案是:取模算法hash(key)% N
,即:对 key 进行 hash 运算后取模,N 是机器的数量;
这样,对 key 进行 hash 后的结果对 3 取模,得到的结果一定是 0、1 或者 2,正好对应服务器node0
、node1
、node2
,存取数据直接找对应的服务器即可,简单粗暴,完全可以解决上述的问题;
取模算法虽然使用简单,但对机器数量取模,在集群扩容和收缩时却有一定的局限性:因为在生产环境中根据业务量的大小,调整服务器数量是常有的事;
而服务器数量 N 发生变化后hash(key)% N
计算的结果也会随之变化!
比如:一个服务器节点挂了,计算公式从hash(key)% 3
变成了hash(key)% 2
,结果会发生变化,此时想要访问一个 key,这个 key 的缓存位置大概率会发生改变,那么之前缓存 key 的数据也会失去作用与意义;
大量缓存在同一时间失效,造成缓存的雪崩,进而导致整个缓存系统的不可用,这基本上是不能接受的;
为了解决优化上述情况,一致性 hash 算法应运而生
2.一致性Hash算法详述
2.1 算法原理
一致性哈希算法在 1997 年由麻省理工学院提出,是一种特殊的哈希算法,在移除或者添加一个服务器时,能够尽可能小地改变已存在的服务请求与处理请求服务器之间的映射关系;
一致性哈希解决了简单哈希算法在分布式哈希表
Distributed Hash Table,DHT)中存在的动态伸缩等问题;
一致性 hash 算法本质上也是一种取模算法;
不过,不同于上边按服务器数量取模,一致性 hash 是对固定值 2^32 取模;
IPv4 的地址是 4 组 8 位 2 进制数组成,所以用 2^32 可以保证每个 IP 地址会有唯一的映射;
2.2 工作原理
- hash环
我们可以将这2^32
个值抽象成一个圆环⭕️ ,圆环的正上方的点代表 0,顺时针排列,以此类推:1、2、3…直到2^32-1
,而这个由 2 的 32 次方个点组成的圆环统称为hash环
;
- 服务器映射到hash环
在对服务器进行映射时,使用hash(服务器ip)% 2^32
,即:
可以使用服务器 IP 地址或主机名进行 hash 计算,用哈希后的结果对2^32
取模,结果一定是一个 0 到2^32-1
之间的整数;
而这个整数映射在 hash 环上的位置代表了一个服务器,依次将node0
、node1
、node2
三个缓存服务器映射到 hash 环上;
- 对象key映射到服务器
在对对应的 Key 映射到具体的服务器时,需要首先计算 Key 的 Hash 值:hash(key)% 2^32
;
注:此处的 Hash 函数可以和之前计算服务器映射至 Hash 环的函数不同,只要保证取值范围和 Hash 环的范围相同即可(即:
2^32
);
将 Key 映射至服务器遵循下面的逻辑:
从缓存对象 key 的位置开始,沿顺时针
方向遇到的第一个
服务器,便是当前对象将要缓存到的服务器;
假设我们有 "semlinker"、"kakuqo"、"lolo"、"fer" 四个key对应的对象,分别简写为 o1、o2、o3 和 o4;
首先,使用哈希函数计算这个对象的 hash 值,值的范围是 [0, 2^32-1]
:
假设经过计算后key对应的hash值为:
hash(o1) = k1; hash(o2) = k2; hash(o3) = k3; hash(o4) = k4;
同时 3 台缓存服务器,分别为 CS1、CS2 和 CS3。则可知,各对象和服务器的映射关系如下:
K1 => CS1 K4 => CS3 K2 => CS2 K3 => CS1
由此可以看到,一致性 Hash 就是:将原本单个点的 Hash 映射,转变为了在一个环上的某个片段上的映射!
2.3 服务器扩缩容场景
2.3.1 服务器减少
假设 CS3 服务器出现故障导致服务下线,这时原本存储于 CS3 服务器的对象 k4,需要被重新分配至 CS2 服务器,其它对象仍存储在原有的机器上:
此时受影响的数据只有 CS1 和 CS3 服务器之间的部分数据。即受影响的数据仅仅是此服务器到其环空间前一台服务器之间的数据
2.3.2 服务器增加
假如业务量激增,我们需要增加一台服务器 CS4,经过同样的 hash 运算,该服务器最终落于 CS2 和 CS3 服务器之间,具体如下图所示:
此时只有新增的服务器CS4和前一台服务器CS3之间的数据对象需要重新分配;
在以上示例中只有 k2 对象需要重新分配,即它被重新到 CS4 服务器;
在前面我们已经说过:如果使用简单的取模方法,当新添加服务器时可能会导致大部分缓存失效,而使用一致性哈希算法后,这种情况得到了较大的改善,因为只有少部分对象需要重新分配!
2.4 数据倾斜&缓存雪崩
问题:
在上面给出的例子中,各个服务器几乎是平均被均摊到 Hash 环上;
但是在实际场景中很难选取到一个 Hash 函数这么完美的将各个服务器散列到 Hash 环上;
此时,在服务器节点数量太少的情况下,很容易因为节点分布不均匀而造成数据倾斜问题;
如下图被缓存的对象大部分缓存在node1
服务器上,导致其他节点资源浪费,系统压力大部分集中在node1
节点上,这样的集群是非常不健康的:
如果每个节点在环上只有一个节点,那么可以想象,当某一集群从环中消失时,它原本所负责的任务将全部交顺
时针方向的下一个节点处理。例如,当node0退出时,它原本所负责的缓存将全部交给node1处理。这就意味着
node1的访问压力会瞬间增大。设想一下,如果node1因为压力过大而崩溃,那么更大的压力又会向node3压过
去,最终服务压力就像滚雪球一样越滚越大,最终导致雪崩。
2.4.1 虚拟节点
针对上面的问题,我们可以通过:引入虚拟节点来解决负载不均衡的问题:
即将
每台物理服务器
虚拟为一组虚拟服务器
,将虚拟服务器
放置到哈希环上
,如果要确定对象的服务器,需先确定对象的虚拟服务器,再由虚拟服务器确定物理服务器;
在图中:o1 和 o2 表示对象,v1 ~ v6 表示虚拟服务器,s1 ~ s3 表示实际的物理服务器;
虚拟节点的计算:
虚拟节点的 hash 计算通常可以采用:对应节点的 IP 地址或主机名加数字编号后缀 hash(10.24.23.227#1) 的方式;
举个例子,node-1 节点 IP 为 10.24.23.227,正常计算node-1
的 hash 值:
hash(10.24.23.227#1)% 2^32
假设我们给 node-1 设置三个虚拟节点,node-1#1
、node-1#2
、node-1#3
,对它们进行 hash 后取模:
hash(10.24.23.227#1)% 2^32
hash(10.24.23.227#2)% 2^32
hash(10.24.23.227#3)% 2^32
注意:
- 分配的虚拟节点个数越多,映射在 hash 环上才会越趋于均匀,节点太少的话很难看出效果;
- 引入虚拟节点的同时也增加了新的问题,要做虚拟节点和真实节点间的映射,
对象key->虚拟节点->实际节点
之间的转换;
使用场景:
一致性 hash 在分布式系统中应该是实现负载均衡的首选算法,它的实现比较灵活,既可以在客户端实现,也可以在中间件上实现,比如日常使用较多的缓存中间件memcached
和redis
集群都有用到它;
memcached 的集群比较特殊,严格来说它只能算是伪集群,因为它的服务器之间不能通信,请求的分发路由完全靠客户端来的计算出缓存对象应该落在哪个服务器上,而它的路由算法用的就是一致性 hash;
还有 redis 集群中 hash 槽的概念,虽然实现不尽相同,但思想万变不离其宗。
其它的应用场景还有很多:
RPC
框架Dubbo
用来选择服务提供者- 分布式关系数据库分库分表:数据与节点的映射关系
LVS
负载均衡调度器- ……
3.一致性Hash算法的实现
算法接口类:
public interface IHashService {
Long hash(String key);
}
算法接口实现类:
public class HashService implements IHashService {
/**
* MurMurHash算法,性能高,碰撞率低
*
* @param key String
* @return Long
*/
public Long hash(String key) {
ByteBuffer buf = ByteBuffer.wrap(key.getBytes());
int seed = 0x1234ABCD;
ByteOrder byteOrder = buf.order();
buf.order(ByteOrder.LITTLE_ENDIAN);
long m = 0xc6a4a7935bd1e995L;
int r = 47;
long h = seed ^ (buf.remaining() * m);
long k;
while (buf.remaining() >= 8) {
k = buf.getLong();
k *= m;
k ^= k >>> r;
k *= m;
h ^= k;
h *= m;
}
if (buf.remaining() > 0) {
ByteBuffer finish = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN);
finish.put(buf).rewind();
h ^= finish.getLong();
h *= m;
}
h ^= h >>> r;
h *= m;
h ^= h >>> r;
buf.order(byteOrder);
return h;
}
}
模拟机器节点:
public class Node<T> {
private String ip;
private String name;
public Node(String ip, String name) {
this.ip = ip;
this.name = name;
}
public String getIp() {
return ip;
}
public void setIp(String ip) {
this.ip = ip;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
/**
* 使用IP当做hash的Key
*
* @return String
*/
@Override
public String toString() {
return ip;
}
}
一致性Hash操作:
public class ConsistentHash<T> {
// Hash函数接口
private final IHashService iHashService;
// 每个机器节点关联的虚拟节点数量
private final int numberOfReplicas;
// 环形虚拟节点
private final SortedMap<Long, T> circle = new TreeMap<Long, T>();
public ConsistentHash(IHashService iHashService, int numberOfReplicas, Collection<T> nodes) {
this.iHashService = iHashService;
this.numberOfReplicas = numberOfReplicas;
for (T node : nodes) {
add(node);
}
}
/**
* 增加真实机器节点
*
* @param node T
*/
public void add(T node) {
for (int i = 0; i < this.numberOfReplicas; i++) {
circle.put(this.iHashService.hash(node.toString() + i), node);
}
}
/**
* 删除真实机器节点
*
* @param node T
*/
public void remove(T node) {
for (int i = 0; i < this.numberOfReplicas; i++) {
circle.remove(this.iHashService.hash(node.toString() + i));
}
}
public T get(String key) {
if (circle.isEmpty()) return null;
long hash = iHashService.hash(key);
// 沿环的顺时针找到一个虚拟节点
if (!circle.containsKey(hash)) {
SortedMap<Long, T> tailMap = circle.tailMap(hash);
hash = tailMap.isEmpty() ? circle.firstKey() : tailMap.firstKey();
}
return circle.get(hash);
}
}
测试类:
public class TestHashCircle {
// 机器节点IP前缀
private static final String IP_PREFIX = "192.168.0.";
public static void main(String[] args) {
// 每台真实机器节点上保存的记录条数
Map<String, Integer> map = new HashMap<String, Integer>();
// 真实机器节点, 模拟10台
List<Node<String>> nodes = new ArrayList<Node<String>>();
for (int i = 1; i <= 10; i++) {
map.put(IP_PREFIX + i, 0); // 初始化记录
Node<String> node = new Node<String>(IP_PREFIX + i, "node" + i);
nodes.add(node);
}
IHashService iHashService = new HashService();
// 每台真实机器引入100个虚拟节点
ConsistentHash<Node<String>> consistentHash = new ConsistentHash<Node<String>>(iHashService, 500, nodes);
// 将5000条记录尽可能均匀的存储到10台机器节点上
for (int i = 0; i < 5000; i++) {
// 产生随机一个字符串当做一条记录,可以是其它更复杂的业务对象,比如随机字符串相当于对象的业务唯一标识
String data = UUID.randomUUID().toString() + i;
// 通过记录找到真实机器节点
Node<String> node = consistentHash.get(data);
// 再这里可以能过其它工具将记录存储真实机器节点上,比如MemoryCache等
// ...
// 每台真实机器节点上保存的记录条数加1
map.put(node.getIp(), map.get(node.getIp()) + 1);
}
// 打印每台真实机器节点保存的记录条数
for (int i = 1; i <= 10; i++) {
System.out.println(IP_PREFIX + i + "节点记录条数:" + map.get(IP_PREFIX + i));
}
}
}
运行结果如下:
参考: