分布式锁【数据库乐观锁实现的分布式锁、Zookeeper分布式锁原理、Redis实现的分布式锁】(三)-全面详解(学习总结---从入门到深化)

2023-07-13 20:48:38

 

目录

 分布式锁解决方案_数据库乐观锁实现的分布式锁

分布式锁解决方案_Redis实现的分布式锁原理

 分布式锁解决方案_Redis实现的分布式锁

分布式锁解决方案_Redis分布式锁误删除问题

分布式锁解决方案_Redis分布式锁不可重入问题

分布式锁解决方案_基于Redisson实现的分布式锁实现

分布式锁解决方案_Zookeeper分布式锁原理

 分布式锁解决方案_基于Zookeeper实现分布式锁

三种分布式锁对比


 

 分布式锁解决方案_数据库乐观锁实现的分布式锁

 什么是乐观锁

总是假设最好的情况,每次去拿数据的时候都认为别人不会修改, 所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有 去更新这个数据,可以使用版本号机制和CAS算法实现。

 编写乐观锁更新语句

<update id="decreaseStockForVersion" parameterType="int" >
        UPDATE product SET count = count - # {count}, version = version + 1 WHERE id = #{id} AND count > 0 AND version = #{version}
</update>

编写创建订单业务层

/**
     * 创建订单 乐观锁
     *
     * @return
     */
    @Transactional(rollbackFor = Exception.class)
    @Override
    public String createOrder(Integer productId, Integer count) throws Exception {
        int retryCount = 0;
        int update = 0;
        // 1、根据商品id查询商品信息
        Product product = productMapper.selectById(productId);
        // 2、判断商品是否存在
        if (product == null) {
            throw new RuntimeException("购买商品不存在:" + productId + "不存在");
       }
        // 3、校验库存
        if (count > product.getCount()) {
            throw new Exception("库存不够");
       }
         // 乐观锁更新库存
        // 更新失败,说明其他线程已经修改过数据,本次扣减库存失败,可以重试一定次数或者返回
        // 最多重试3次
        while(retryCount < 3 && update == 0){
            update = this.reduceStock(product.getId(),count);
            retryCount++;
       }
        if (update == 0){
            throw new Exception("库存不够");
       }
        // 6、 创建订单
        TOrder order = new TOrder();
        order.setOrderStatus(1);//待处理
        order.setReceiverName("张三");
        order.setReceiverMobile("18587781068");
        order.setOrderAmount(product.getPrice().multiply(new BigDecimal(count)));//订单价格
        baseMapper.insert(order);
        // 7、 创建订单和商品关系数据
        OrderItem orderItem = new OrderItem();
        orderItem.setOrderId(order.getId());
        orderItem.setProduceId(product.getId());
        orderItem.setPurchasePrice(product.getPrice());
        orderItem.setPurchaseNum(count);
        orderItemMapper.insert(orderItem);
        return order.getId();
   }
    /**
     * 减库存
     * <p>
     * 由于默认的事务隔离级别是可重复读,produce.findById()
     * 得到的数据始终是相同的,所以需要提取 reduceStock方法。每次循环都启动新的事务尝试扣减库存操作。
     */
    @Transactional(rollbackFor = Exception.class)
    public int reduceStock(int gid,int count) {
        int result = 0;
        //1、查询商品库存
        Product product = productMapper.selectById(gid);
        //2、判断库存是否充足
        if (product.getCount() >= count) {
            //3、减库存
            // 乐观锁更新库存
            result = productMapper.decreaseStockForVersion(gid,count, product.getVersion());
       }
        return result;
   }

分布式锁解决方案_Redis实现的分布式锁原理

获取锁

互斥:确保只有一个线程获得锁

# 添加锁 利用setnx的互斥性
127.0.0.1:6379> setnx lock thread1

释放锁

1、手动释放锁

2、超时释放:获取锁时设置一个超时时间

#释放锁 删除即可
127.0.0.1:6379> del lock

超时释放

127.0.0.1:6379> setnx lock tread1
127.0.0.1:6379> expire lock 5
127.0.0.1:6379> ttl lock

两步合成一步

help set
 SET key value [EX seconds] [PX milliseconds] [NX|XX]
 summary: Set the string value of a key
 since: 1.0.0
 group: string
127.0.0.1:6379> get k1
(nil)
127.0.0.1:6379> set lock k1 ex 5 nx
OK
127.0.0.1:6379> set lock k1 ex 5 nx
nil

 

 分布式锁解决方案_Redis实现的分布式锁

 引入依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

添加Redis配置

spring:
 redis:
   host: localhost
   port: 6379

编写创建订单实现类

@Override
    public String createOrderRedis(Integer productId, Integer count) throws Exception {
        log.info("*************** 进入方法 **********");
        String key = "lock:";
        String value = UUID.randomUUID().toString();
        // 获取分布式锁
        Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(key+productId,String.valueOf(Thread.currentThread().getId()),30,TimeUnit.SECONDS);
        // 判断是否获取锁成功
        if (!result){
            log.info("我进入了锁");
            return "不允许重复下单";
       }
        try {
            // 1、根据商品id查询商品信息
            Product product = productMapper.selectById(productId);
            // 2、判断商品是否存在 if (product == null) {
                throw new RuntimeException("购买商品不存在:" + productId + "不存在");
           }
            // 3、校验库存
            if (count > product.getCount()) {
                throw new RuntimeException("商品" + productId + "仅剩" + product.getCount() + "件,无法购买");
           }
            // 4、计算库存
            Integer leftCount = product.getCount() - count;
            // 5、更新库存
            product.setCount(leftCount);
            productMapper.updateById(product);
            // 6、 创建订单
            TOrder order = new TOrder();
            order.setOrderStatus(1);//待处理
            order.setReceiverName("张三");
            order.setReceiverMobile("18587781068");
            order.setOrderAmount(product.getPrice().multiply(new BigDecimal(count)));//订单价格
            baseMapper.insert(order);
            // 7、 创建订单和商品关系数据
            OrderItem orderItem = new OrderItem();
            orderItem.setOrderId(order.getId());
            orderItem.setProduceId(product.getId());
            orderItem.setPurchasePrice(product.getPrice());
            orderItem.setPurchaseNum(count);
            orderItemMapper.insert(orderItem);
            return order.getId();
       }catch (Exception e){
            e.printStackTrace();
       }finally {
            // 释放锁
          stringRedisTemplate.delete(key+productId);
       }
        return "创建失败";
   }

分布式锁解决方案_Redis分布式锁误删除问题

 

 配置锁标识

private static final String KEY_PREFIX = "lock:";
private static final String ID_PREFIX = UUID.randomUUID().toString().replace("-" ,"");

获取锁

//1、获取线程标识
String threadId = ID_PREFIX + Thread.currentThread().getId();
// 2、获得锁 setnx key   value   time   type
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX+produceId, threadId, 30,TimeUnit.SECONDS);

释放锁

// 获取锁标识
  String s = stringRedisTemplate.opsForValue().get(KEY_PREFIX + produceId);
            // 判断标识是否一致
            if (s.equals(threadId)){
                // 释放锁
               stringRedisTemplate.delete(KEY_PREFIX + produceId);
           }

分布式锁解决方案_Redis分布式锁不可重入问题

 不可重入问题

 如何解决

 

分布式锁解决方案_基于Redisson实现的分布式锁实现

 Redisson介绍

Redisson - 是一个高级的分布式协调Redis客服端,能帮助用户在分 布式环境中轻松实现一些Java的对象,Redisson、Jedis、Lettuce 是三个不同的操作 Redis 的客户端,Jedis、Lettuce 的 API 更侧重 对 Reids 数据库的 CRUD(增删改查),而 Redisson API 侧重于分布式开发。

引入Redisson依赖

<dependency>
     <groupId>org.redisson</groupId>
     <artifactId>redisson-spring-boot-starter</artifactId>
     <version>3.17.2</version>
</dependency>

添加Reids的配置

spring:
 redis:
   host: localhost
   port: 6379

编写Redis分布式锁工具类

package com.itbaizhan.lock.utils;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
@Component
@Slf4j
public class DistributedRedisLock {
    @Autowired
    private RedissonClient redissonClient;
    // 加锁
    public Boolean lock(String lockName) {
        if (redissonClient == null) {
            log.info("DistributedRedisLock redissonClient is null");
            return false;
       }
        try {
            RLock lock = redissonClient.getLock(lockName);
            // 锁15秒后自动释放,防止死锁
            lock.lock(15, TimeUnit.SECONDS);
            log.info("Thread [{}] DistributedRedisLock lock [{}] success",Thread.currentThread().getName(), lockName);
            // 加锁成功
            return true;
       } catch (Exception e) {
            log.error("DistributedRedisLocklock [{}] Exception:", lockName, e);
            return false;
       }
   }
    // 释放锁
    public Boolean unlock(String lockName) {
        if (redissonClient == null) {
            log.info("DistributedRedisLock redissonClient is null");
            return false;
       }
        try {
            RLock lock = redissonClient.getLock(lockName);
            lock.unlock();
            log.info("Thread [{}] DistributedRedisLock unlock [{}] success",Thread.currentThread().getName(), lockName);
            // 释放锁成功
            return true;
} catch (Exception e) {
            log.error("DistributedRedisLock unlock [{}] Exception:", lockName, e);
            return false;
       }
   }
}

编写创建订单接口实现

/**
     * Redis锁实现
     *
     * @param productId
     * @param count
     * @return
     * @throws Exception
     */
    @Override
    public String createOrderRedis(Integer productId, Integer count) throws Exception {
        //获取锁对象
        if (distributedRedisLock.lock(String.valueOf(productId))) {
            try {
                // 1、根据商品id查询商品信息
                Product product = productMapper.selectById(productId);
                // 2、判断商品是否存在
                if (product == null) {
                throw new RuntimeException("购买商品不存在:" + productId + "不存在");
               }
                // 3、校验库存
                if (count > product.getCount())
               {
                    throw new RuntimeException("商品" + productId + "仅剩" + product.getCount() + "件,无法购买");
               }
                // 4、计算库存
                Integer leftCount = product.getCount() - count;
                // 5、更新库存
                product.setCount(leftCount);
                productMapper.updateById(product);
                // 6、 创建订单
                TOrder order = new TOrder();
                order.setOrderStatus(1);//待处理
                order.setReceiverName("张三");
                order.setReceiverMobile("18587781068");
                order.setOrderAmount(product.getPrice().multiply(new BigDecimal(count)));//订单价格
                baseMapper.insert(order);
                // 7、 创建订单和商品关系数据
                OrderItem orderItem = new OrderItem();
                orderItem.setOrderId(order.getId());
                orderItem.setProduceId(product.getId());
                orderItem.setPurchasePrice(product.getPrice());
                orderItem.setPurchaseNum(count);
                orderItemMapper.insert(orderItem);
                return order.getId();
           } catch (Exception e) {
                e.printStackTrace();
           } finally {
               distributedRedisLock.unlock(String.valueOf(productId));
           }
       }
        return "创建失败";
   }

分布式锁解决方案_Zookeeper分布式锁原理

 公平锁和可重入锁的原理

 

 这种排队取水模型,就是一种锁的模型。

什么是可重入锁呢?

 

 创建临时顺序节点

[zk: localhost:2181(CONNECTED) 1] create -e  -s
/test 123

 

 ZK分布式锁的实现原理

当第一个客户端请求过来时,Zookeeper 客户端会创建一个持久节 点 locks。如果它(Client1)想获得锁,需要在 locks 节点下创建 一个顺序节点 lock1。

 接着,客户端 Client1 会查找 locks 下面的所有临时顺序子节点,判 断自己的节点 lock1 是不是排序最小的那一个,如果是,则成功获得锁。

 

 这时候如果又来一个客户端 client2 前来尝试获得锁,它会在 locks 下再创建一个临时节点 lock2。

 客户端 client2 一样也会查找 locks 下面的所有临时顺序子节点,判 断自己的节点 lock2 是不是最小的,此时,发现 lock1 才是最小 的,于是获取锁失败。获取锁失败,它是不会甘心的,client2 向它 排序靠前的节点 lock1 注册 Watcher 事件,用来监听 lock1 是否存 在,也就是说 client2 抢锁失败进入等待状态。

 

 此时,如果再来一个客户端Client3来尝试获取锁,它会在 locks 下 再创建一个临时节点 lock3。

 

 同样的,client3 一样也会查找 locks 下面的所有临时顺序子节点, 判断自己的节点 lock3 是不是最小的,发现自己不是最小的,就获 取锁失败。它也是不会甘心的,它会向在它前面的节点 lock2 注册 Watcher 事件,以监听 lock2 节点是否存在。

 

 释放锁

如果是任务完成,Client1 会显式调用删除 lock1 的指令。


 如果是客户端故障了,根据临时节点得特性,lock1 是会自动删除的。

 

 lock1 节点被删除后,Client2 可开心了,因为它一直监听着 lock1。lock1 节点删除,Client2 立刻收到通知,也会查找 locks 下面的所有临时顺序子节点,发下 lock2 是最小,就获得锁。

 

 同理,Client2 获得锁之后,Client3 也对它虎视眈眈:

 分布式锁解决方案_基于Zookeeper实现分布式锁

 简介

Apache Curator是一个比较完善的ZooKeeper客户端框架,通过封 装的一套高级API 简化了ZooKeeper的操作。

 

 引入Curator依赖

       <dependency>
         <groupId>org.apache.curator</groupId>
          <artifactId>curator-framework</artifactId>
            <version>5.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>5.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-client</artifactId>
            <version>5.2.0</version>
        </dependency>

编写Zookeeper配置

@Configuration
public class ZookeeperConfig {
    @Bean
    public CuratorFramework zookeeperClient() {
        CuratorFramework client = CuratorFrameworkFactory.builder()
               .connectString("127.0.0.1:2181")
               .sessionTimeoutMs(5000)
               .connectionTimeoutMs(5000)
               .retryPolicy(new ExponentialBackoffRetry(1000, 3))
               //.namespace("test")
               .build();
        client.start();
        return client;
   }
}

编写创建订单接口实现

使用InterProcessMutex的acquire和release方法,来获取和释放锁。

@Autowired
    CuratorFramework client;
    @Override
    public String createOrderZookeeper(Integer productId, Integer count) throws Exception 
     {
        // client cruator中zk客户端对象   path 抢锁路径同一个锁path需要一致
        InterProcessMutex lock = new InterProcessMutex(client, "/lockPath");
        //第一个属性:定时的时间数字
        //第二个属性:定义时间的单位
        if (lock.acquire(3, TimeUnit.SECONDS))
          {
            try {
                // 1、根据商品id查询商品信息
                Product product = productMapper.selectById(productId);
                // 2、判断商品是否存在
                if (product == null) {
            throw new RuntimeException("购买商品不存在:" + productId + "不存在");
               }
                // 3、校验库存
                if (count > product.getCount())
                 {
                    throw new RuntimeException("商品" + productId + "仅剩" +
product.getCount() + "件,无法购买");
               }
                // 4、计算库存
                Integer leftCount = product.getCount() - count;
                // 5、更新库存
                product.setCount(leftCount);
                productMapper.updateById(product);
                // 6、 创建订单
                TOrder order = new TOrder();
                order.setOrderStatus(1);//待处理
                order.setReceiverName("张三");
                order.setReceiverMobile("18587781068");
                order.setOrderAmount(product.getPrice().multiply(new BigDecimal(count)));//订单价格
                baseMapper.insert(order);
                // 7、 创建订单和商品关系数据
                OrderItem orderItem = new OrderItem();
                orderItem.setOrderId(order.getId());
                orderItem.setProduceId(product.getId());
                orderItem.setPurchasePrice(product.getPrice());
                orderItem.setPurchaseNum(count);
                orderItemMapper.insert(orderItem);
                return order.getId();
           } finally {
                lock.release();
           }
       }
        return "创建失败";
   }

三种分布式锁对比

 数据库分布式锁实现

优点:简单,使用方便,不需要引入 Redis、Zookeeper 等中间件。

缺点:1、不适合高并发的场景 2、db 操作性能较差

Redis 分布式锁实现

优点:1、性能好,适合高并发场景 2、较轻量级 3、有较好的框架支持,如 Redisson

 缺点:1、过期时间不好控制 2、需要考虑锁被别的线程误删场景

Zookeeper 分布式锁实现

优点:1、有较好的性能和可靠性 2、有封装较好的框架,如 Curator

缺点:1、性能不如 Redis 实现的分布式锁  2、比较重的分布式锁。

汇总对比

1、从性能角度:Redis > Zookeeper >= 数据库

2、从实现的复杂性角度:Zookeeper > Redis > 数据库

3、从可靠性角度:Zookeeper > Redis > 数据库

更多推荐

git命令的操作

git命令操作及命令大全1.创建一个新的本地仓库:2.添加文件到仓库:3.远程仓库操作:4.分支操作:5.git命令大全1.创建一个新的本地仓库:使用命令gitinit在本地目录中创建一个新的git仓库。2.添加文件到仓库:使用命令gitadd<文件名>将文件添加到暂存区。使用命令gitcommit-m"提交信息"将文

新款 锐科达 SV-2102VP SIP广播音频模块 RTP流音频广播

新款锐科达SV-2102VPSIP广播音频模块RTP流音频广播SV-2102VP和SV-2103VP网络音频模块是一款通用的独立SIP音频功能模块,可以轻松地嵌入到OEM产品中。该模块对来自网络的SIP协议及RTP音频流进行编解码。本系列模块可以应用于以下领域:•各种商业网络音频流应用•网络报警器,网络播放器•用于寻呼

计算机视觉与深度学习-图像分割-视觉识别任务03-实例分割-【北邮鲁鹏】

目录参考定义MarkR-CNN结构思路MaskR-CNN训练阶段使用的Mask样例MaskR-CNN实例分割结果MaskR-CNN检测姿态参考论文题目:MaskR-CNN论文链接:论文下载论文代码:Facebook代码链接;Tensorflow版本代码链接;KerasandTensorFlow版本代码链接;MxNet版

docker day01

docker:是一个容器管理工具。podman,pouch,lxc,rkt,...2013开源--->iT福音--->2015~coreOs--->Redhat--->IBM什么是容器:生活中装东西的,比如水杯,碗,水缸,鱼缸,...在计算机中,容器装的就是文件,只不过这些文件是一类程序,包括该程序依赖的库文件。LIN

基于未知环境碰撞冲突预测的群机器人多目标搜索研究

源自:指挥与控制学报作者:边晓荟周少武张红强吴亮红王汐王茂刘朝华陈磊“人工智能技术与咨询”发布摘要群机器人在未知动态环境下进行多目标搜索时,存在碰撞预测和搜索效率不高等问题。提出了一种碰撞几何锥和改进惯性权重的粒子群优化算法相结合的多目标搜索策略。首先,根据静、动态威胁物的不同分别引入碰撞锥(CC)和速度障碍法(VO)

MySQL 自动根据年份动态创建范围分区

前言要在MySQL中按年对日期进行分区,可以使用自动递增存储过程的方式实现动态分区,它并没有像Oracle里面的**INTERVAL(numtoyminterval(1,‘year’))**方法。创建动态分区1.创建一个包含所有分区的表,并使用InnoDB存储引擎。例如,创建一个名为mytable的表。CREATETA

51单片机 LCD1602

LCD1602.H#ifndef__LCD1602_H__#define__LCD1602_H__//用户调用函数:voidLCD_Init();voidLCD_ShowChar(unsignedcharLine,unsignedcharColumn,charChar);voidLCD_ShowString(unsig

Python常用做题笔记

1列表/队列/堆栈构建列表:[]声明列表比list()方法效率高声明队列和堆栈:法1:使用[]/list()方法模拟法2:使用collections.deque双端队列:模拟队列append\popleft模拟堆栈append\popcollections.deque():声明双端队列,初始化传入可迭代对象就行,例如列

AI-Toolbox

集合你需要的所有AI工具。(GatheralltheAItoolsyouneed.)1.AI绘画工具(AIdrawingtool)Midjourney:目前最强的AI绘画工具。StableDiffusion:最强开源AI绘画工具。Civital:AI艺术共享平台。HuggingFace:开源SD模型下载。VEGA:国内

统计报告期节能率

声明本文是学习GB-T29314-2023电动机系统节能改造规范.而整理的学习笔记,分享出来希望更多人受益,如果存在侵权请及时联系我们7综合评估7.1一般规定7.1.1电动机系统节能改造后,应对设备运行效果进行评估,改造后系统的生产效率和系统分界点指标应达到改造设计要求。7.1.2电动机系统改造后,应对与改造设备相关的

苏宁易购商品详情数据接口

苏宁易购商品详情数据接口采集方法如下:下载安装载图助手软件并打开苏宁易购平台,找到需要采集的商品类目。点击一个商品进入详情页,启用软件的批量下载功能并打开“自动粘贴网址”。复制商品上方的链接,链接会自动传送到首页的地址栏里。全部复制完成后,在选项中把“下载主图”、“同时下载视频”、“下载详情图”三项打勾。单击“立即下载

热文推荐