当前位置 博文首页 > 失足成万古风流人物:实战即时聊天,一文说明白:聊天服务器+聊

    失足成万古风流人物:实战即时聊天,一文说明白:聊天服务器+聊

    作者:失足成万古风流人物 时间:2021-05-17 18:20

    一、前言

      说实话,写这个玩意儿是我上周刚刚产生的想法,本想写完后把代码挂上来赚点积分也不错。写完后发现这东西值得写一篇文章,授人予鱼不如授人以渔嘛(这句话是这么说的吧),顺便赚点应届学生MM的膜拜那就更妙了。然后再挂一个收款二维码,一个人1块钱,一天10000个人付款,一个月30万,一年360万。。。可了不得了,离一个亿的小目标就差几十年了。

      不知道博客园对梦话有没有限制,有的话请告知,我会尽快删除上述文字。

      那么现在回到现实中,这篇博文如果能有>2个评论,我后续会再出一个Netty相关的专栏。否则,就不出了。有人会好奇,为什么把阈值定义成>2呢?不为什么,因为我肯定会先用我媳妇儿的号留个言,然后用自己的号留个言。

      好了,废话不多说了,后面还有好多事儿呢,洗菜、做饭、刷碗、跪搓衣。。。好了,言归正传吧。

    二、最终效果

      为什么先看最终效果?因为此刻代码已经撸完了。更重要的是我们带着感官的目标去进行后续的分析,可以更好地理解。标题中提到了,整个工程包含三个部分:

    1、聊天服务器

      聊天服务器的职责一句话解释:负责接收所有用户发送的消息,并将消息转发给目标用户。

      聊天服务器没有任何界面,但是却是IM中最重要的角色,为表达敬意,必须要给它放个效果图:

     

    2021-05-11 10:41:40.037  INFO 9392 --- [ntLoopGroup-3-1] c.e.o.s.netty.handler.HeartBeatHandler   : server收到心跳包:{"time":1620700900029,"messageType":"99"}
    2021-05-11 10:41:50.049  INFO 9392 --- [ntLoopGroup-3-1] c.e.o.s.n.handler.BussMessageHandler     : 收到消息:{"time":1620700910045,"messageType":"14","sendUserName":"guodegang","recvUserName":"yuqian","sendMessage":"于老师你好"}
    2021-05-11 10:41:50.055  INFO 9392 --- [ntLoopGroup-3-2] c.e.o.s.netty.executor.SendMsgExecutor   : 消息转发成功:{"time":1620700910052,"messageType":"14","sendUserName":"guodegang","recvUserName":"yuqian","sendMessage":"于老师你好"}
    2021-05-11 10:41:54.068  INFO 9392 --- [ntLoopGroup-3-2] c.e.o.s.netty.handler.HeartBeatHandler   : server收到心跳包:{"time":1620700914064,"messageType":"99"}
    2021-05-11 10:41:57.302  INFO 9392 --- [ntLoopGroup-3-2] c.e.o.s.n.handler.BussMessageHandler     : 收到消息:{"time":1620700917301,"messageType":"14","sendUserName":"yuqian","recvUserName":"guodegang","sendMessage":"郭老师你好"}
    2021-05-11 10:41:57.304  INFO 9392 --- [ntLoopGroup-3-1] c.e.o.s.netty.executor.SendMsgExecutor   : 消息转发成功:{"time":1620700917303,"messageType":"14","sendUserName":"yuqian","recvUserName":"guodegang","sendMessage":"郭老师你好"}
    2021-05-11 10:42:05.050  INFO 9392 --- [ntLoopGroup-3-1] c.e.o.s.netty.handler.HeartBeatHandler   : server收到心跳包:{"time":1620700925049,"messageType":"99"}
    2021-05-11 10:42:12.309  INFO 9392 --- [ntLoopGroup-3-2] c.e.o.s.netty.handler.HeartBeatHandler   : server收到心跳包:{"time":1620700932304,"messageType":"99"}
    2021-05-11 10:42:20.066  INFO 9392 --- [ntLoopGroup-3-1] c.e.o.s.netty.handler.HeartBeatHandler   : server收到心跳包:{"time":1620700940050,"messageType":"99"}
    2021-05-11 10:42:27.311  INFO 9392 --- [ntLoopGroup-3-2] c.e.o.s.netty.handler.HeartBeatHandler   : server收到心跳包:{"time":1620700947309,"messageType":"99"}
    2021-05-11 10:42:35.070  INFO 9392 --- [ntLoopGroup-3-1] c.e.o.s.netty.handler.HeartBeatHandler   : server收到心跳包:{"time":1620700955068,"messageType":"99"}
    2021-05-11 10:42:42.316  INFO 9392 --- [ntLoopGroup-3-2] c.e.o.s.netty.handler.HeartBeatHandler   : server收到心跳包:{"time":1620700962312,"messageType":"99"}
    2021-05-11 10:42:50.072  INFO 9392 --- [ntLoopGroup-3-1] c.e.o.s.netty.handler.HeartBeatHandler   : server收到心跳包:{"time":1620700970071,"messageType":"99"}
    2021-05-11 10:42:57.316  INFO 9392 --- [ntLoopGroup-3-2] c.e.o.s.netty.handler.HeartBeatHandler   : server收到心跳包:{"time":1620700977315,"messageType":"99"}

      从效果图我们看到了一些内容:收到心跳包、收到消息,转发消息,这些内容后面会详细讲解。

    2、聊天客户端

      聊天客户端的职责一句话解释:登陆,给别人发聊天内容,收其它人发给自己的聊天内容。

      下面为方便演示,我会打开两个客户端,用两个不同用户登陆,然后发消息。

     

    3、Web管理控制台

      目前只做了一个账户管理,具体看图吧:

    三、需求分析

      无(见第二章节)。

    四、概要设计

    1、技术选型

    1)聊天服务端

      聊天服务器与客户端通过TCP协议进行通信,使用长连接、全双工通信模式,基于经典通信框架Netty实现。

      那么什么是长连接?顾名思义,客户端和服务器连上后,会在这条连接上面反复收发消息,连接不会断开。与长连接对应的当然就是短连接了,短连接每次发消息之前都需要先建立连接,然后发消息,最后断开连接。显然,即时聊天适合使用长连接。

      那么什么又是全双工?当长连接建立起来后,在这条连接上既有上行的数据,又有下行的数据,这就叫全双工。那么对应的半双工、单工,大家自行百度吧。

    2)Web管理控制台

      Web管理端使用SpringBoot脚手架,前端使用Layuimini(一个基于Layui前端框架封装的前端框架),后端使用SpringMVC+Jpa+Shiro。

    3)聊天客户端

      使用SpringBoot+JavaFX,做了一个极其简陋的客户端,JavaFX是一个开发Java桌面程序的框架,本人也是第一次使用,代码中的写法都是网上查的,这并不是本文的重点,有兴趣的仔细百度吧。

    4)SpringBoot

      以上三个组件,全部以SpringBoot做为脚手架开发。

    5)代码构建

      Maven。

    2、数据库设计

      我们只简单用到一张用户表,比较简单直接贴脚本:

    CREATE TABLE `sys_user` (
      `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键',
      `user_name` varchar(64) DEFAULT NULL COMMENT '用户名:登陆账号',
      `pass_word` varchar(128) DEFAULT NULL COMMENT '密码',
      `name` varchar(16) DEFAULT NULL COMMENT '昵称',
      `sex` char(1) DEFAULT NULL COMMENT '性别:1-男,2女',
      `status` bit(1) DEFAULT NULL COMMENT '用户状态:1-有效,0-无效',
      `online` bit(1) DEFAULT NULL COMMENT '在线状态:1-在线,0-离线',
      `salt` varchar(128) DEFAULT NULL COMMENT '密码盐值',
      `admin` bit(1) DEFAULT NULL COMMENT '是否管理员(只有管理员才能登录Web端):1-是,0-否',
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

      这张表都在什么时候用到?

      1)Web管理端登陆的时候;2)聊天客户端将登陆请求发送到聊天服务端时,聊天服务端进行用户认证;3)聊天客户端的好友列表加载。

    3、通信设计

      本节将会是本文的核心内容之一,主要描述通信报文协议格式、以及通信报文的交互场景。

    1)报文协议格式

      下面这张图应该能说明99%了:

      剩下的1%在这里说:

      a)粘包问题,TCP长连接中,粘包是第一个需要解决的问题。通俗的讲,粘包的意思是消息接收方往往收到的不是“整个”报文,有时候比“整个”多一点,有时候比“整个”少一点,这样就导致接收方无法解析这个报文。那么上图中的头8个字节就为了解决这个问题,接收方根据头8个字节标识的长度来获取到“整个”报文,从而进行正常的业务处理;

      b)2字节报文类型,为了方便解析报文而设计。根据这两个字节将后面的json转成相应的实体以便进行后续处理;

      c)变长报文体实际上就是json格式的串,当然,你可以自己设计报文格式,我这里为了方便处理就直接放json了;

      d)当然,你可以把报文设计的更复杂、更专业,比如加密、加签名等。

    2)报文交互场景

      a)登陆

      b)发送消息-成功

      c)发送消息-目标客户端不在线

      d)发送消息-目标客户端在线,但消息转发失败

    五、编码实现

      前面说了那么多,现在总得说点有用的。

    1、先说说Netty

      Netty是一个相当优秀的通信框架,大多数的顶级开源框架中都有Netty的身影。具体它有多么优秀,建议大家自行百度,我不如百度说的好。我只从应用方面说说Netty。应用过程中,它最核心的东西叫handler,我们可以简单理解它为消息处理器。收到的消息和出去的消息都会经过一系列的handler加工处理。收到的消息我们叫它入站消息,发出去的消息我们叫它出站消息,因此handler又分为出站handler和入站handler。收到的消息只会被入站handler处理,发出去的消息只会被出站handler处理。

      举个例子,我们从网络上收到的消息是二进制的字节码,我们的目标是将消息转换成java bean,这样方便我们程序处理,针对这个场景我设计这么几个入站handler:

      1)将字节转换成String的handler;

      2)将String转成java bean的handler;

      3)对java bean进行业务处理的handler。

      发出去的消息呢,我设计这么几个出站handler:

      1)java bean 转成String的handler;

      2)String转成byte的handler。

      以上是关于handler的说明。

      接下来再说一下Netty的异步。异步的意思是当你做完一个操作后,不会立马得到操作结果,而是有结果后Netty会通知你。通过下面的一段代码来说明:

    channel.writeAndFlush(sendMsgRequest).addListener(new GenericFutureListener<Future<? super Void>>() {
                    @Override
                    public void operationComplete(Future<? super Void> future) throws Exception {
                        if (future.isSuccess()){
                            logger.info("消息发送成功:{}",sendMsgRequest);
                        }else {
                            logger.info("消息发送失败:{}",sendMsgRequest);
                        }
                    }
                });

      上面的writeAndFlush操作无法立即返回结果,如果你关注结果,那么为他添加一个listener,有结果后会在listener中响应。

      到这里,百度上搜到的Netty相关的代码你基本就能看懂了。

    2、聊天服务端

      首先看主入口的代码

    public void start(){
            EventLoopGroup boss = new NioEventLoopGroup();
            EventLoopGroup worker = new NioEventLoopGroup();
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(boss, worker)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            //心跳
                            ch.pipeline().addLast(new IdleStateHandler(25, 20, 0, TimeUnit.SECONDS));
                            //收整包
                            ch.pipeline().addLast(new StringLengthFieldDecoder());
                            //转字符串
                            ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));
                            //json转对象
                            ch.pipeline().addLast(new JsonDecoder());
                            //心跳
                            ch.pipeline().addLast(new HeartBeatHandler());
                            //实体转json
                            ch.pipeline().addLast(new JsonEncoder());
                            //消息处理
                            ch.pipeline().addLast(bussMessageHandler);
                        }
                    });
            try {
                ChannelFuture f = serverBootstrap.bind(port).sync();
                f.channel().closeFuture().sync();
            }catch (InterruptedException e) {
                logger.error("服务启动失败:{}", ExceptionUtils.getStackTrace(e));
            }finally {
                worker.shutdownGracefully();
                boss.shutdownGracefully();
            }
        }

      代码中除了initChannel方法中的代码,其他代码都是固定写法。那么什么叫固定写法呢?通俗来讲就是可以Ctrl+c、Ctrl+v。

      下面我们着重看initChannel方法里面的代码。这里面就是上面讲到的各种handler,我们下面挨个讲这些handler都是干啥的。

      1)IdleStateHandler。这个是Netty内置的一个handler,既是出站handler又是入站handler。它的作用一般是用来实现心跳监测。所谓心跳,就是客户端和服务端建立连接后,服务端要实时监控客户端的健康状态,如果客户端挂了或者hung住了,服务端及时释放相应的资源,以及做出其他处理比如通知运维。所以在我们的场景中,客户端需要定时上报自己的心跳,如果服务端检测到一段时间内没收到客户端上报的心跳,那么及时做出处理,我们这里就是简单的将其连接断开,并修改数据库中相应账户的在线状态。

      现在开始说IdleStateHandler,第一个参数叫读超时时间,第二个参数叫写超时时间,第三个参数叫读写超时时间,第四个参数时时间单位秒。这个handler表达的意思是当25秒内没读到客户端的消息,或者20秒内没往客户端发消息,就会产生一个超时事件。那么这个超时事件我们该对他做什么处理呢,请看下一条。

      2)HeartBeatHandler。结合a)一起看,当发生超时事件时,HeartBeatHandler会收到这个事件,并对它做出处理:第一将链接断开;第二讲数据库中相应的账户更新为不在线状态。

    public class HeartBeatHandler extends ChannelInboundHandlerAdapter {
        private static Logger logger = LoggerFactory.getLogger(HeartBeatHandler.class);
    
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            if (evt instanceof IdleStateEvent){
                IdleStateEvent event = (IdleStateEvent)evt;
                if (event.state() == IdleState.READER_IDLE) {
                    //读超时,应将连接断掉
                    InetSocketAddress socketAddress = (InetSocketAddress)ctx.channel().remoteAddress();
                    String ip = socketAddress.getAddress().getHostAddress();
                    ctx.channel().disconnect();
                    logger.info("【{}】连接超时,断开",ip);
                    String userName = SessionManager.removeSession(ctx.channel());
                    SpringContextUtil.getBean(UserService.class).updateOnlineStatus(userName,Boolean.FALSE);
                }else {
                    super.userEventTriggered(ctx, evt);
                }
            }else {
                super.userEventTriggered(ctx, evt);
            }
    
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            if (msg instanceof HeartBeat){
                //收到心跳包,不处理
                logger.info("server收到心跳包:{}",msg);
                return;
            }
            super.channelRead(ctx, msg);
        }
    }
     

      3)StringLengthFieldDecoder。这是个入站handler,他的作用就是解决上面提到的粘包问题:

    public class StringLengthFieldDecoder extends LengthFieldBasedFrameDecoder {
        public StringLengthFieldDecoder() {
            super(10*1024*1024,0,8,0,8);
        }
    
    
        @Override
        protected long getUnadjustedFrameLength(ByteBuf buf, int offset, int length, ByteOrder order) {
            buf = buf.order(order);
            byte[] lenByte = new byte[length];
            buf.getBytes(offset, lenByte);
            String lenStr = new String(lenByte);
            Long len =  Long.valueOf(lenStr);
            return len;
        }
    }

      只需要集成Netty提供的LengthFieldBasedFrameDecoder 类,并重写getUnadjustedFrameLength方法即可。

      首先看构造方法中的5个参数。第一个表示能处理的包的最大长度;第二三个参数应该结合起来理解,表示长度字段从第几位开始,长度的长度是多少,也就是上面报文格式协议中的头8个字节;第四个参数表示长度是否需要校正,举例理解,比如头8个字节解析出来的长度=包体长度+头8个字节的长度,那么这里就需要校正8个字节,我们的协议中长度只包含报文体,因此这个参数填0;最后一个参数,表示接收到的报文是否要跳过一些字节,本例中设置为8,表示跳过头8个字节,因此经过这个handler后,我们收到的数据就只有报文本身了,不再包含8个长度字节了。

      再看getUnadjustedFrameLength方法,其实就是将头8个字符串型的长度为转换成long型。重写完这个方法后,Netty就知道如何收一个“完整”的数据包了。

      4)StringDecoder。这个是Netty自带的入站handler,会将字节流以指定的编码解析成String。

      5)JsonDecoder。是我们自定义的一个入站handler,目的是将json String转换成java bean,以方便后续处理:

    public class JsonDecoder extends MessageToMessageDecoder<String> {
        @Override
        protected void decode(ChannelHandlerContext channelHandlerContext, String o, List<Object> list) throws Exception {
            Message msg = MessageEnDeCoder.decode(o);
            list.add(msg);
        }
    
    }

      这里会调用我们自定义的一个编解码帮助类进行转换:

    public static Message decode(String message){
            if (StringUtils.isEmpty(message) || message.length() < 2){
                return null;
            }
            String type = message.substring(0,2);
            message = message.substring(2);
            if (type.equals(LoginRequest)){