当前位置 博文首页 > 外星喵的博客:Redis实现分布式锁
在传统单体应用单机部署的情况下,可以使用本地锁如ReentrantLcok 或 synchronized进行互斥控制来解决。但是,随着业务的发展,系统架构也会逐步优化升级,原本单体单机部署的系统被演化成分布式集群系统,由于分布式系统多线程、多进程并且分布在多个不同机器上,这将使原单机部署情况下的并发控制锁策略无法满足需求。
锁的作用是保证多个进程或线程在并发操作操作共享资源时资源的正确性。在分布式应用中,一个服务需要部署多个实例,对于操作分布式环境下的共享资源,就需要使用分布式锁来保证操作的正确性。
在分布式环境中,对资源进行上锁有时候是很重要的,比如秒杀、抢购某一资源,使用分布式锁就可以很好地控制资源。
处理效率提升:应用分布式锁,可以减少重复任务的执行,避免资源处理效率的浪费;
数据准确性保障:使用分布式锁可以放在数据资源的并发访问,避免数据不一致情况,甚至数据损失等。
分布式锁应该具有互斥、可重入、锁超时,高可用等特点。
其中前面几个特点和本地锁具体的特点相同,高可用是分布式锁需要具备的重要的特点。
在CAP 理论基础下,大家都会牺牲强一致性来换取系统的高可用性,这样我们很多的场景,其实是只需为了保证数据的最终一致性。
要实现分布式锁,需要具备以下条件:
而redis恰恰具备以下特点:
使用 Redis 实现分布式锁应该具备满足两个条件: 1. 加锁和解锁过程必须是原子操作;2. 保证高可用。
SETNX key val:当且仅当 key 不存在时,set 一个 key 为 val 的字符串,返回 1;若 key 存在,则什么都不做,返回 0。
expire key val:为 key 设置一个超时时间,单位为 second,超过这个时间锁会自动释放,避免死锁。
del key:在使用 Redis 实现分布式锁的时候,主要就会使用到这三个命令。
线上的 Redis 部署一般都是集群模式,基于单节点实现,即在集群模式下只会对一个 master 进行加解锁操作,至于是哪个 master,则需要根据 Redis 的 key 计算出的哈希槽来决定(具体可以去了解 Redis Cluster 模式)。
加锁、解锁操作可能需要多个操作,需要保证操作的原子性,通过 Redis 的单命令和 Lua 脚本两种方式实现原子操作。
加锁注意事项
使用 Redis 命令 SET lock_key unique_value NX EX seconds
进行加锁,单命令操作,Redis 是串行执行命令,所以能保证只有一个能加锁成功。
解锁通过DEL
命令来删除,为了避免错误的解锁(A 加锁,B 解锁),所以需要比较 value,整个过程为了保证原子性,所以使用 Lua 脚本(unlock.script)
此实现方案是基于单个节点保存锁信息,不具备高可靠性。要保证高可靠,则要基于多个节点实现。
还有一个缺点,可能存在设置的过期时间已到,业务处理还未结束就提前释放锁,其他请求可以继续获取到锁。解决方案就是使用Redisson
,大致的解决原理是,开启一个守护线程定期检查锁是否存在,如存在则延长 key 的时间,尝试获取的客户端则通过自旋的方式获取锁。
由 Redis 的开发者 Antirez 提出,RedLock 算法的基本思路是让客户端和多个独立的 Redis 实例依次请求加锁,如果能和半数以上的实例加锁成功,就可认为客户端获取分布式锁成功。
整体的流程是这样的,一共分为 5 步:
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.Transaction;
import redis.clients.jedis.exceptions.JedisException;
import java.util.List;
import java.util.UUID;
public class DistributedLock {
private final JedisPool jedisPool;
public DistributedLock(JedisPool jedisPool) {
this.jedisPool = jedisPool;
}
/**
* 加锁
* @param locaName 锁的key
* @param acquireTimeout 获取超时时间
* @param timeout 锁的超时时间
* @return 锁标识
*/
public String lockWithTimeout(String locaName, long acquireTimeout, long timeout) {
Jedis conn = null;
String retIdentifier = null;
try {
// 获取连接
conn = jedisPool.getResource();
// 随机生成一个value
String identifier = UUID.randomUUID().toString();
// 锁名,即key值
String lockKey = "lock:" + locaName;
// 超时时间,上锁后超过此时间则自动释放锁
int lockExpire = (int)(timeout / 1000);
// 获取锁的超时时间,超过这个时间则放弃获取锁
long end = System.currentTimeMillis() + acquireTimeout;
while (System.currentTimeMillis() < end) {
if (conn.setnx(lockKey, identifier) == 1) {
conn.expire(lockKey, lockExpire);
// 返回value值,用于释放锁时间确认
retIdentifier = identifier;
return retIdentifier;
}
// 返回-1代表key没有设置超时时间,为key设置一个超时时间
if (conn.ttl(lockKey) == -1) {
conn.expire(lockKey, lockExpire);
}
try {
Thread.sleep(10);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
} catch (JedisException e) {
e.printStackTrace();
} finally {
if (conn != null) {
conn.close();
}
}
return retIdentifier;
}
/**
* 解锁
* @param lockName 锁的key
* @param identifier 解锁标识
* @return
*/
public boolean releaseLock(String lockName, String identifier) {
Jedis conn = null;
String lockKey = "lock:" + lockName;
boolean retFlag = false;
try {
conn = jedisPool.getResource();
while (true) {
// 监视lock,准备开始事务
conn.watch(lockKey);
//通过前面返回的value值判断是不是该锁,若是该锁,则删除,释放锁
if (identifier.equals(conn.get(lockKey))) {
Transaction transaction = conn.multi();
transaction.del(lockKey);
List<Object> results = transaction.exec();
if (results == null) {
continue;
}
retFlag = true;
}
conn.unwatch();
break;
}
} catch (JedisException e) {
e.printStackTrace();
} finally {
if (conn != null) {
conn.close();
}
}
return retFlag;
}
}
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
public class Service {
private static JedisPool pool = null;
static {
JedisPoolConfig config = new JedisPoolConfig();
// 设置最大连接数
config.setMaxTotal(200);
// 设置最大空闲数
config.setMaxIdle(8);
// 设置最大等待时间
config.setMaxWaitMillis(1000 * 100);
// 在borrow一个jedis实例时,是否需要验证,若为true,则所有jedis实例均是可用的
config.setTestOnBorrow(true);
pool = new JedisPool(config, "127.0.0.1", 6379, 3000);
}
DistributedLock lock = new DistributedLock(pool);
int n = 500;
public void seckill() {
// 返回锁的value值,供释放锁时候进行判断
String indentifier = lock.lockWithTimeout("resource", 5000, 1000);
System.out.println(Thread.currentThread().getName() + "获得了锁");
System.out.println(--n);
lock.releaseLock("resource", indentifier);
}
}
// 模拟线程进行秒杀服务
public class ThreadA extends Thread {
private Service service;
public ThreadA(Service service) {
this.service = service;
}
@Override
public void run() {
service.seckill();
}
}
public class Test {
public static void main(String[] args) {
Service service = new Service();
for (int i = 0; i < 50; i++) {
ThreadA threadA = new ThreadA(service);
threadA.start();
}
}
}
结果如下,结果为有序的。
若注释掉使用锁的部分
public void seckill() {
// 返回锁的value值,供释放锁时候进行判断
//String indentifier = lock.lockWithTimeout("resource", 5000, 1000);
System.out.println(Thread.currentThread().getName() + "获得了锁");
System.out.println(--n);
//lock.releaseLock("resource", indentifier);
}
cs