当前位置 博文首页 > JavaEdge全是干货的技术号:阿里P8级大佬详解并发编程里的设计模

    JavaEdge全是干货的技术号:阿里P8级大佬详解并发编程里的设计模

    作者:[db:作者] 时间:2021-07-21 12:46

    • 发消息和消费这俩操作之间是异步

    用户通过app发来一个请求,会被转换成消息发往MQ,等MQ返回结果后,将其返回至app。

    给MQ发消息的线程是处理请求的线程t1,但消费MQ结果的线程不是t1,那t1如何等待MQ的返回结果呢?

    class Message{
      String id;
      String content;
    }
    
    // 发消息
    void send(Message msg){
      ...
    }
    
    // MQ消息返回后会调用该方法
    // 执行该方法的线程 并非 发消息的线程
    void onMessage(Message msg){
      ...
    }
    
    // 处理app的请求
    Respond handle(){
      // 创建消息
      Message msg = new Message("1","{...}");
      // 发消息
      send(msg);
      // t1如何等待MQ返回的消息呢?
      String result = ...;
    }
    

    问题也就是异步转同步。

    什么是Guarded Suspension模式?


    比如离职前,约好了同事吃最后的晚餐

    网上预订了希尔顿一个包厢,然后直接过去了,到那儿后大堂经理看眼包厢,发现服务员还在收拾,通知我们:“包厢服务员正在收拾,wait。”
    过一会,大堂经理发现已经收拾完了,于是马上带我们去包厢就餐。

    【等待收拾完】和【等待MQ返回消息】本质相同,都是等一个条件:

    • 就餐需要等待包间收拾完
    • 程序需要等待MQ返回消息

    那现实里是如何解决这类问题的呢?

    大堂经理很重要,顾客是否等待,完全由他协调。
    程序也需要这样一个大堂经理,即设计模式:Guarded Suspension,“保护性地暂停”。

    • 【GuardedObject】:大堂经理

    • 【受保护对象】:包厢

    • 受保护对象的get():我们就餐

    • p:就餐的前提,即包厢收拾好了

    • 受保护对象的onChanged():服务员把包厢收拾好了,通过onChanged() fire 一个事件,该事件往往能改变p的计算结果。

    • 左侧线程:需要就餐的我们

    • 右侧线程:收拾包厢的服务员

    Guarded Suspension模式结构图

    GuardedObject的内部实现是管程的一个经典用法:

    class GuardedObject<T>{
      //受保护的对象
      T obj;
      final Lock lock = 
        new ReentrantLock();
      final Condition done =
        lock.newCondition();
      final int timeout=1;
      //获取受保护对象  
      T get(Predicate<T> p) {
        lock.lock();
        try {
          // MESA管程推荐写法
          while(!p.test(obj)){
          	// 通过条件变量的await()方法实现等待
            done.await(timeout, 
              TimeUnit.SECONDS);
          }
        }catch(InterruptedException e){
          throw new RuntimeException(e);
        }finally{
          lock.unlock();
        }
        //返回非空的受保护对象
        return obj;
      }
      //事件通知方法
      // 通过条件变量的signalAll()方法实现唤醒功能
      void onChanged(T obj) {
        lock.lock();
        try {
          this.obj = obj;
          done.signalAll();
        } finally {
          lock.unlock();
        }
      }
    }
    

    扩展Guarded Suspension模式

    Guarded Suspension“大堂经理”能否解决小灰同学遇到的问题。

    在处理Web请求的方法handleWebReq()中,可以调用GuardedObject#get()实现等待;在MQ消息的消费方法onMessage()中,可调用GuardedObject#onChanged()实现唤醒。

    // 处理浏览器请求
    Respond handleWebReq(){
      // 创建一消息
      Message msg1 = new Message("1","{...}");
      // 发送消息
      send(msg1);
      // 利用GuardedObject实现等待
      GuardedObject<Message> go = new GuardObjec<>();
      Message r = go.get(t->t != null);
    }
    
    void onMessage(Message msg){
      // 如何找到匹配的go?
      GuardedObject<Message> go=???
      go.onChanged(msg);
    }
    

    但问题是,handleWebReq()里创建了GuardedObject对象的实例go,并调用get()等待结果,那在onMessage()方法中,如何才能够找到匹配的GuardedObject对象呢?
    这类似服务员告诉大堂经理某某包间已经收拾好了,大堂经理如何根据包间找到就餐的人。现实世界里,大堂经理的头脑中,有包间和就餐人之间的关系图,所以服务员说完之后大堂经理立刻就能把就餐人找出来。

    参考大堂经理识别就餐人的办法,来扩展一下Guarded Suspension模式,从而使它能够很方便地解决小灰同学的问题。在小灰的程序中,每个发送到MQ的消息,都有一个唯一性的属性id,所以我们可以维护一个MQ消息id和GuardedObject对象实例的关系,这个关系可以类比大堂经理大脑里维护的包间和就餐人的关系。

    如何实现呢?下面代码是扩展Guarded Suspension,扩展后的

    GuardedObject内部维护了一个Map:

    • Key是MQ消息id
    • Value是GuardedObject对象实例

    静态方法:

    • create()
      创建一个GuardedObject对象实例,并根据key值将其加入到Map中
    • fireEvent()
      模拟的大堂经理根据包间找就餐人的逻辑。
    class GuardedObject<T>{
      //受保护的对象
      T obj;
      final Lock lock = 
        new ReentrantLock();
      final Condition done =
        lock.newCondition();
      final int timeout=2;
      //保存所有GuardedObject
      final static Map<Object, GuardedObject> 
      gos=new ConcurrentHashMap<>();
      //静态方法创建GuardedObject
      static <K> GuardedObject 
          create(K key){
        GuardedObject go=new GuardedObject();
        gos.put(key, go);
        return go;
      }
      static <K, T> void 
          fireEvent(K key, T obj){
        GuardedObject go=gos.remove(key);
        if (go != null){
          go.onChanged(obj);
        }
      }
      //获取受保护对象  
      T get(Predicate<T> p) {
        lock.lock();
        try {
          //MESA管程推荐写法
          while(!p.test(obj)){
            done.await(timeout, 
              TimeUnit.SECONDS);
          }
        }catch(InterruptedException e){
          throw new RuntimeException(e);
        }finally{
          lock.unlock();
        }
        //返回非空的受保护对象
        return obj;
      }
      //事件通知方法
      void onChanged(T obj) {
        lock.lock();
        try {
          this.obj = obj;
          done.signalAll();
        } finally {
          lock.unlock();
        }
      }
    }
    

    这样利用扩展后的GuardedObject来解决小灰同学的问题就很简单了,代码如下:

    //处理浏览器发来的请求
    Respond handleWebReq(){
      int id=序号生成器.get();
      //创建一消息
      Message msg1 = new 
        Message(id,"{...}");
      //创建GuardedObject实例
      GuardedObject<Message> go=
        GuardedObject.create(id);  
      //发送消息
      send(msg1);
      //等待MQ消息
      Message r = go.get(
        t->t != null);  
    }
    void onMessage(Message msg){
      //唤醒等待的线程
      GuardedObject.fireEvent(
        msg.id, msg);
    }
    

    总结

    Guarded Suspension模式本质上是一种等待唤醒机制,只不过Guarded Suspension模式将其规范化了。
    规范化好处是你无需重头思考如何实现,也无需担心实现程序的可理解性问题,同时也能避免一不小心写出个Bug来。但Guarded Suspension模式在解决实际问题的时候,往往还是需要扩展的,扩展的方式有很多,本篇文章就直接对GuardedObject的功能进行了增强,Dubbo中DefaultFuture这个类也是采用的这种方式。

    Guarded Suspension模式也常被称作Guarded Wait模式、Spin Lock模式(因为使用了while循环去等待),这些名字都很形象,不过它还有一个更形象的非官方名字:多线程版本的if。单线程场景中,if语句是不需要等待的,因为在只有一个线程的条件下,如果这个线程被阻塞,那就没有其他活动线程了,这意味着if判断条件的结果也不会发生变化了。但是多线程场景中,等待就变得有意义了,这种场景下,if判断条件的结果是可能发生变化的。所以,用“多线程版本的if”来理解这个模式会更简单。

    cs