当前位置 博文首页 > hellohello-tom:Event Sourcing落地与意义

    hellohello-tom:Event Sourcing落地与意义

    作者:hellohello-tom 时间:2021-02-04 16:25

    jsoncat:https://github.com/Snailclimb/jsoncat (仿 Spring Boot 但不同于 Spring Boot 的一个轻量级的 HTTP 框架)

    高内聚低耦合一直是程序设计提倡的方式,但是很多人在实际项目中一直再使用面向过程的编码,导致代码臃肿不堪,项目维护难度日益增加,在我接触的初中高级程序员中,很多人曾问我如何从设计阶段初期尽量避免日后带来维护难度,今天就从Event Soucing(事件溯源)模式聊聊如何优化我们的业务系统。

    枯燥的理论知识避不可免,下面我尽量以代码形式去演示事件驱动给在我们业务编程中带来的好处。

    什么是Event Sourcing ? 简单来说,大家应该都知道mysql 数据同步中的binlog模式,我们在执行一条查询语句 select * from Person limit 1 时看到的数据可以理解为当前时间的快照,眼前的这条数据,可能经历过若干update语句之后才得到的结果,事件溯源亦如此,如果我们把某一行数据 看做Person对象,一个对象从开始到消亡会经历过很多事件(update语句),如果我们要还原某个时间点的对象,只需按照按照事件的产生日期,按照顺序依次在初始化对象上依次叠加上去,就能还原这一时期的对象了,举个例子一个person(张三)对象

    Person zs = new Person(); 张三出生了

    6岁 ?? 学生

    25岁 ?? 警察

    60岁 ?? 退休老人

    虽然都是张三对象,但是不同时间段里张三的身份截然不同,如果我们要获取警察时代的zs,我们用初始得到的zs依次累加上学生时代,警察时代就可以得到这一时代的zs对象了。

    由此来看,对象好像显得已经不那么重要,事件溯源更加具有意义,因为它完整描述了这个对象从出生到消亡的全过程,也可以看为不断在改变对象的状态,事件是只会增加不会修改,对于现如今大数据时代,事件的产生对于数据挖掘、数据分析更具有意义。

    扯了这么多,还是要以代码来实际说说事件驱动带来的好处,先看一处经典的代码

    StockService.java

    @Service
    @AllArgsConstructor
    public class StockService extends BaseMapper<Product> {
        //京东服务
        private final JdService jdService;
        //淘宝服务
        private final TaobaoService productService;
        //有赞服务
        private final YouzanService youzanService;
        //拼多多服务
        private final pddService pddService;
        //更多服务
        ...
    
        //设置商品库存
        @Override
        public void changeProductStock(ChangeProductStockInputDTO inputDTO) {
            if(inputDTO.getStock<0){
              throw new BusinessException("库存不能小于0");
            }
            Product product = baseMapper.getById(inputDTO.getId());
            product.setStock(inputDTO.getStock());
            baseMapper.updateById(product);
            //通知京东
            jdService.notify();
            //通知淘宝
            productService.notify();
            //通知有赞
            youzanService.notify();
            //更多需要执行的业务...
        }
    }
    

    Product.java

    @Data
    public class Product {
        //id
        private String id;
        
        //库存
        private BigDecimal stock;
        
        //...
    }
    

    例如比如在电商系统中,在我们自己的商品后台中修改商品库存后,我们要依次告知在其他第三方平台这个商品库存信息,我相信很多同学都会这样写的吧,这样的代码确实可以完成我们的业务功能,但随着业务功能的复杂度提升,加上我们面向过程的编码模式,一定会越加复杂,曾看到有将近5000多行的一个订单类,相信不管谁看见这样的类都会头大,接下来我们就要想办法优化它,安排!

    首先存在这样的代码是因为没有划清边界,没有保持一个领域中的纯粹性,从StockService中注入大量的服务类与标志性的贫血模型Product对象就能看出,既然我们提倡以高内聚低耦合去编写代码,那首先去修改我们的Product吧,让它变得丰富起来。

    改变的
    Product.java

    @Data
    public class Product {
        
        public void changeStock(BigDecimal stock){
          if(delStatus == 1){
            throw new BusinessException("商品信息不存在");
          }
          if(stock < 0){
            throw new BusinessException("库存不能小于0");
          }
          this.stock = stock;
          EventBus.instance().register(new ChangedProductStockDomainEvent(this));
        }
        
        //id
        private String id;
        
        //库存
        private BigDecimal stock;
        
        //删除状态
        private int delStatus;
        
        //...
    }
    
    //名字尽量起得生动一些,单词语法的过去式,现在进行时都具有意义
    @Getter
    @AllArgsConstructor
    public class ChangedProductStockDomainEvent {
        
       private Product product;
    }
    

    更改的 StockService.java

    @Service
    @AllArgsConstructor
    public class StockService extends BaseMapper<Product> {
    
        //设置商品库存
        @Override
        public void changeProductStock(ChangeProductStockInputDTO inputDTO) {
            Product product = baseMapper.getById(inputDTO.getId());
            product.setProductStock(inputDTO.getProductStock());
        }
    }
    

    更改过后的代码是不是看起来清爽了很多,加上我们赋予了Product对象方法之后,职责看起来就更加明确,充血模型体现出聚合内单一的行为,在Product中我们只描述了此领域范围的职能,已经充分体现了高内聚低耦合的思想,不参合其他业务逻辑。这时可能有的同学会问那怎么持久化到数据库呢?在我工作的这些年里,遇到很多程序员,不论初中高级程序员都习惯了先建立数据库,再去建立模型,但是我们要改变传统思维,我们写代码是面向对象,面向对象,面向对象(重要的事情说三遍),不是面向数据或者过程,在剥离了数据后,其实我们真正就做到了数据与业务代码的剥离,下面我在说这样具体的好处。

    细心的同学看到我在Product的changeStock方法里,在执行完一些逻辑判断后,设置完商品库存后,我们在EventBus 事件总线中注册了一个事件,这个事件还没有具体的作用,我们看看EventBus的实现

    StockService.java

    
    public class EventBus {
    
        public static EventBus instance() {
            return new EventBus();
        }
    
        private static final ThreadLocal<List<DomainEvent>> domainEvents = new ThreadLocal<>();
    
        public void init() {
            if (domainEvents.get() == null) {
                domainEvents.set(new ArrayList<>());
            }
        }
    
        public EventBus register(DomainEvent domainEvent) {
            List<DomainEvent> domainEventList = domainEvents.get();
            if (domainEventList == null)
                throw new IllegalArgumentException("domainEventList not init");
            domainEventList.add(domainEvent);
            return this;
        }
    
        /**
         * 获取领域事件
         *
         * @return
         */
        public List<DomainEvent> getDomainEvent() {
            return domainEvents.get();
        }
    
    
        /**
         * 请空领域事件集合
         */
        public void reset() {
            domainEvents.set(null);
        }
    }
    

    在当前线程内内存空间我们吧事件塞了进去,目前只有存储作用,接下来我们要定义它的处理者

    DomainEventProcessor.java

    @Aspect
    @Component
    @Slf4j
    public class DomainEventProcessor {
    
        /**
         * 这里我是我对RocketMq的封装
         */
        @Autowired
        private EventPublisherExecutor processPublisherExecutor;
    
        /**
         * 当前上下文内订阅者
         */
        @Autowired
        protected ApplicationContext applicationContext;
    
        private static ThreadLocal<AtomicInteger> counter = new ThreadLocal<>();
    
        @Pointcut("within(com.github.tom.*.application..*)")
        public void aopRule() {
    
        }
    
        /**
         * 为当前线程初始化EventBus
         */
        @Before("aopRule()")
        public void initEventBus(JoinPoint joinPoint) {
            log.debug("初始化领域事件槽");
            log.debug("切入切点信息:" + joinPoint.getSignature().toString());
            EventBus.instance().init();
            if (counter.get() == null) {
                counter.set(new AtomicInteger(0));
            }
            counter.get().incrementAndGet();
        }
    
        /**
         * 发布领域事件
         */
        @AfterReturning("aopRule()")
        public void publish() {
            int count = counter.get().decrementAndGet();
            if (count == 0) {
                try {
                    List<DomainEvent> domainEventList = EventBus.instance().getDomainEvent();
                    if (domainEventList != null && domainEventList.size() > 0) {
                        //进程内事件
                        domainEventList.forEach(domainEvent -> applicationContext.publishEvent(domainEvent));
                        //进程外事件
                        domainEventList.forEach(domainEvent -> processPublisherExecutor.publish(domainEvent));
                    }
                } finally {
                    EventBus.instance().reset();
                    counter.set(null);
                }
            }
        }
    
        @AfterThrowing(throwing = "ex", pointcut = "aopRule()")
        public void exception(Throwable ex) {
            log.error(ex.getMessage(), ex);
            EventBus.instance().reset();
            //释放计数器
            counter.set(null);
        }
    }
    

    这里借助了AOP功能,在AOP内我对service进行拦截,在执行方法拦截的出口时,查找当前线程内的EventBus中看是否有存在的领域事件,接下来把事件发送出去,事件的响应分为进程内和进程外(多微服务),刚才的同学问的如何持久化到DB这里可以看到答案

    @Slf4j
    public abstract class AbstractEventHandler<T extends EventData> implements SmartApplicationListener {
    
        private Class<?> clazzType;
    
        public AbstractEventHandler(Class<? extends ApplicationEvent> clazzType) {
            this.clazzType = clazzType;
        }
    
        @Override
        public boolean supportsEventType(Class<? extends ApplicationEvent> clazzType) {
            return clazzType == this.clazzType;
        }
    
        @Override
        public boolean supportsSourceType(Class<?> clazz) {
            return true;
        }
    
        @Override
        public void onApplicationEvent(ApplicationEvent applicationEvent) {
            onApplicationEventHandler((T) applicationEvent);
        }
    
        protected abstract void onApplicationEventHandler(T eventData);
    }
    
    
    @Slf4j
    public abstract class AbstractPersistenceEventHandler<T extends EventData> extends AbstractEventHandler<T> {
    
        public AbstractPersistenceEventHandler(Class<? extends ApplicationEvent> clazzType) {
            super(clazzType);
        }
    
        @Override
        public int getOrder() {
            return 0;
        }
    
    }
    
    @Component
    public class ChangeProductStockPersistenceEventHandler
            extends AbstractPersistenceEventHandler<ChangedProductStockDomainEvent> {
    
        @Autowired
        private ProductRepository productRepository;
    
        public CreatedPortalArticlePersistenceEventHandler() {
            super(CreatedPortalArticleDomainEvent.class);
        }
    
        @Override
        protected void onApplicationEventHandler(ChangedProductStockDomainEvent eventData) {
            if (portalArticleRepository.updateById(eventData.getProduct()) <= 0) {
                throw new BusinessException("数据操作错误");
            }
        }
    }
    
    

    在响应事件的其中一个订阅者,可以完成数据库的持久化操作。接下来我们去定义各个响应ChangedProductStockDomainEvent事件的订阅者就行,例如京东服务

    @Component
    public class JdStockEventHandler {
    
        @Autowired
        private JdAppService jdAppService;
    
        /**
         * 库存持久化事件
         *
         * @param eventData
         */
        @StreamListener(value = "product-channel")
        public void receive(@Payload ChangedProductStockDomainEvent eventData) {
            jdAppService.changingInventory(eventData);
        }
    }
    

    事件驱动的模型大大降低了业务模块耦合严重,在每个聚合的领域内,我们应该着重自身聚合的业务逻辑,事件的消费我们可以通过广播通知和最终一致性来达成目的。业务代码的纯粹,也更适合TDD只对业务编写测试代码,例如我在编写设置库存的测试方法时,我只要构造好商品对象,就可以按照测试用例编写不同情况下的测试代码了。

    @Component
    public class ProductStockTest {
    
        @Before
        public void setUp() {
          EventBus.instance().init();
        }
        
        @Test
        public void testChangeStockError() {
            Product product = new Product();
            product.setStock(BigDecimal.valueOf("-1"));
            product.changeStock();
        }
    
       @Test
        public void testChangeStockSuccess() {
            Product product = new Product();
            product.setStock(BigDecimal.valueOf("2"));
            product.changeStock();
            assertThat(product.getStock()).isEqualTo("2");
        }
    
    }
    

    好了今天的介绍就先这么多,后面我会介绍如何让三层架构中的Service层升级,变得充满业务味道(领域服务)。

    bk