当前位置 博文首页 > 外星喵的博客:Redis实现分布式锁

    外星喵的博客:Redis实现分布式锁

    作者:[db:作者] 时间:2021-07-03 22:07

    分布式锁

    什么是分布式锁?

    在传统单体应用单机部署的情况下,可以使用本地锁如ReentrantLcok 或 synchronized进行互斥控制来解决。但是,随着业务的发展,系统架构也会逐步优化升级,原本单体单机部署的系统被演化成分布式集群系统,由于分布式系统多线程、多进程并且分布在多个不同机器上,这将使原单机部署情况下的并发控制锁策略无法满足需求。

    锁的作用是保证多个进程或线程在并发操作操作共享资源时资源的正确性。在分布式应用中,一个服务需要部署多个实例,对于操作分布式环境下的共享资源,就需要使用分布式锁来保证操作的正确性。

    在分布式环境中,对资源进行上锁有时候是很重要的,比如秒杀、抢购某一资源,使用分布式锁就可以很好地控制资源。

    分布式锁的意义

    • 处理效率提升:应用分布式锁,可以减少重复任务的执行,避免资源处理效率的浪费;

    • 数据准确性保障:使用分布式锁可以放在数据资源的并发访问,避免数据不一致情况,甚至数据损失等。

    分布式锁的特点

    分布式锁应该具有互斥、可重入、锁超时,高可用等特点。

    其中前面几个特点和本地锁具体的特点相同,高可用是分布式锁需要具备的重要的特点。

    为什么可以用Redis 做分布式锁?

    在CAP 理论基础下,大家都会牺牲强一致性来换取系统的高可用性,这样我们很多的场景,其实是只需为了保证数据的最终一致性。

    要实现分布式锁,需要具备以下条件:

    1. 在分布式系统环境下,共享资源在同一时间只能被一个机器的一个线程使用;
    2. 获取锁与释放锁的高可用及高性能;
    3. 具备非阻塞锁特性,获取不到锁将直接返回获取锁失败;
    4. 具备锁失效机制,防止死锁。

    而redis恰恰具备以下特点:

    • 高性能:Redis 读写性能高,可以应对高并发的锁操作场景
    • 高可靠:Redis 是分布式系统,具备高可用方案

    使用 Redis 实现分布式锁应该具备满足两个条件: 1. 加锁和解锁过程必须是原子操作;2. 保证高可用。

    Redis实现分布式锁的原理

    使用命令介绍

    setnx:

    SETNX key val:当且仅当 key 不存在时,set 一个 key 为 val 的字符串,返回 1;若 key 存在,则什么都不做,返回 0。

    expire:

    expire key val:为 key 设置一个超时时间,单位为 second,超过这个时间锁会自动释放,避免死锁。

    delete:

    del key:在使用 Redis 实现分布式锁的时候,主要就会使用到这三个命令。

    实现思想

    • 获取锁的时候,使用 setnx 加锁,并使用 expire 命令为锁添加一个超时时间,超过该时间则自动释放锁,锁的 value 值为一个随机生成的 UUID,通过此在释放锁的时候进行判断。
    • 获取锁的时候还设置一个获取的超时时间,若超过这个时间则放弃获取锁。
    • 释放锁的时候,通过 UUID 判断是不是该锁,若是该锁,则执行 delete 进行锁释放。

    Redis 实现分布式锁的方案

    基于单节点实现

    线上的 Redis 部署一般都是集群模式,基于单节点实现,即在集群模式下只会对一个 master 进行加解锁操作,至于是哪个 master,则需要根据 Redis 的 key 计算出的哈希槽来决定(具体可以去了解 Redis Cluster 模式)。

    加锁、解锁操作可能需要多个操作,需要保证操作的原子性,通过 Redis 的单命令和 Lua 脚本两种方式实现原子操作。

    加锁

    加锁注意事项

    • 1、加锁过程要保证原子性
    • 2、保证谁加的锁只能被谁解锁,即 Redis 加锁的 value,解锁时需要传入相同的 value 才能成功,保证 value 唯一性
    • 3、设置锁超时时间,防止加锁方异常无法释放锁时其他客户端无法获取锁,同时,超时时间要大于业务处理时间

    使用 Redis 命令 SET lock_key unique_value NX EX seconds进行加锁,单命令操作,Redis 是串行执行命令,所以能保证只有一个能加锁成功。

    解锁

    解锁通过DEL命令来删除,为了避免错误的解锁(A 加锁,B 解锁),所以需要比较 value,整个过程为了保证原子性,所以使用 Lua 脚本(unlock.script)

    此实现方案是基于单个节点保存锁信息,不具备高可靠性。要保证高可靠,则要基于多个节点实现。

    还有一个缺点,可能存在设置的过期时间已到,业务处理还未结束就提前释放锁,其他请求可以继续获取到锁。解决方案就是使用Redisson,大致的解决原理是,开启一个守护线程定期检查锁是否存在,如存在则延长 key 的时间,尝试获取的客户端则通过自旋的方式获取锁。

    基于多个节点实现

    RedLock 算法

    由 Redis 的开发者 Antirez 提出,RedLock 算法的基本思路是让客户端和多个独立的 Redis 实例依次请求加锁,如果能和半数以上的实例加锁成功,就可认为客户端获取分布式锁成功。

    整体的流程是这样的,一共分为 5 步:

    1. 客户端先获取「当前时间戳 T1」
    2. 客户端依次向这 5 个 Redis 实例发起加锁请求(用前面讲到的 SET 命令),且每个请求会设置超时时间(毫秒级,要远小于锁的有效时间),如果某一个实例加锁失败(包括网络超时、锁被其它人持有等各种异常情况),就立即向下一个 Redis 实例申请加锁
    3. 如果客户端从 >=3 个(大多数)以上 Redis 实例加锁成功,则再次获取「当前时间戳 T2」,如果 T2 - T1 < 锁的过期时间,此时,认为客户端加锁成功,否则认为加锁失败
    4. 加锁成功,去操作共享资源(例如修改 MySQL 某一行,或发起一个 API 请求)
    5. 加锁失败,向「全部节点」发起释放锁请求(前面讲到的 Lua 脚本释放锁)

    Java代码实现

    加锁

    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.JedisPool;
    import redis.clients.jedis.Transaction;
    import redis.clients.jedis.exceptions.JedisException;
    import java.util.List;
    import java.util.UUID;
    
    public class DistributedLock {
    	
      private final JedisPool jedisPool;
    	
      public DistributedLock(JedisPool jedisPool) {
    		this.jedisPool = jedisPool;
    	}
    
      /**
    	 * 加锁
    	 * @param locaName 锁的key
    	 * @param acquireTimeout 获取超时时间
       * @param timeout 锁的超时时间
       * @return 锁标识
       */
    
       public String lockWithTimeout(String locaName, long acquireTimeout, long timeout) {
    			Jedis conn = null;
    			String retIdentifier = null;
    			try {
    				// 获取连接
    				conn = jedisPool.getResource();
    				// 随机生成一个value
    				String identifier = UUID.randomUUID().toString();
    				// 锁名,即key值
    				String lockKey = "lock:" + locaName;
    				// 超时时间,上锁后超过此时间则自动释放锁
    				int lockExpire = (int)(timeout / 1000);
    				// 获取锁的超时时间,超过这个时间则放弃获取锁
    				long end = System.currentTimeMillis() + acquireTimeout;	
    				while (System.currentTimeMillis() < end) {
    						if (conn.setnx(lockKey, identifier) == 1) {
    								conn.expire(lockKey, lockExpire);
    								// 返回value值,用于释放锁时间确认
    								retIdentifier = identifier;
    								return retIdentifier;
    						}
    					// 返回-1代表key没有设置超时时间,为key设置一个超时时间
    						if (conn.ttl(lockKey) == -1) {
    								conn.expire(lockKey, lockExpire);
    						}
              try {
    		          Thread.sleep(10);
              } catch (InterruptedException e) {
    		          Thread.currentThread().interrupt();
              }
    			 }
    			} catch (JedisException e) {
    					e.printStackTrace();
    			} finally {
    					if (conn != null) {
    							conn.close();	
    			}
    		}
    		return retIdentifier;
    	}
    

    解锁

    /**
    * 解锁
    * @param lockName 锁的key
    * @param identifier 解锁标识
    * @return
    */
    public boolean releaseLock(String lockName, String identifier) {
    
    		Jedis conn = null;
        String lockKey = "lock:" + lockName;
    		boolean retFlag = false;
        try {
    	    conn = jedisPool.getResource();
    	    while (true) {
    		    // 监视lock,准备开始事务
    		    conn.watch(lockKey);
    		    //通过前面返回的value值判断是不是该锁,若是该锁,则删除,释放锁
        		if (identifier.equals(conn.get(lockKey))) {
                Transaction transaction = conn.multi();
                transaction.del(lockKey);		
                List<Object> results = transaction.exec();
                if (results == null) {
                  continue;
                }	
    			    	retFlag = true;
    	    	}
    				conn.unwatch();
    				break;
    		}
    	} catch (JedisException e) {
    		e.printStackTrace();
    	} finally {
    		if (conn != null) {
          conn.close();
        }
    	}
    		return retFlag;
      }
    
    }
    

    测试示例

    import redis.clients.jedis.JedisPool;
    
    import redis.clients.jedis.JedisPoolConfig;
    
    
    public class Service {
    
    	private static JedisPool pool = null;
    
        static {
    
          JedisPoolConfig config = new JedisPoolConfig();
    
          // 设置最大连接数
    
          config.setMaxTotal(200);
    
          // 设置最大空闲数
    
          config.setMaxIdle(8);
    
          // 设置最大等待时间
    
          config.setMaxWaitMillis(1000 * 100);
    
          // 在borrow一个jedis实例时,是否需要验证,若为true,则所有jedis实例均是可用的
    
          config.setTestOnBorrow(true);
    
          pool = new JedisPool(config, "127.0.0.1", 6379, 3000);
    
       }
    
          DistributedLock lock = new DistributedLock(pool);
    
          int n = 500;
    
          
      	public void seckill() {
    
            // 返回锁的value值,供释放锁时候进行判断
    
            String indentifier = lock.lockWithTimeout("resource", 5000, 1000);
    
            System.out.println(Thread.currentThread().getName() + "获得了锁");
    
            System.out.println(--n);
    
            lock.releaseLock("resource", indentifier);
    
        	}
    
    }
    
    // 模拟线程进行秒杀服务
    
    public class ThreadA extends Thread {
    
      private Service service;
    
      public ThreadA(Service service) {
    
      		this.service = service;
    
    	}
    
    	@Override
    
      public void run() {
    
          service.seckill();
    
      }
    
    }
    
    public class Test {
    
    public static void main(String[] args) {
    
        Service service = new Service();
    
        for (int i = 0; i < 50; i++) {
    
        ThreadA threadA = new ThreadA(service);
    
        threadA.start();
    
        }
    
      }
    
    }
    
    结果如下,结果为有序的。
    
    若注释掉使用锁的部分
    
    public void seckill() {
    
      // 返回锁的value值,供释放锁时候进行判断
    
      //String indentifier = lock.lockWithTimeout("resource", 5000, 1000);
    
      System.out.println(Thread.currentThread().getName() + "获得了锁");
    
      System.out.println(--n);
    
      //lock.releaseLock("resource", indentifier);
    
    }
    
    cs
    下一篇:没有了