当前位置 博文首页 > Zee_7D:性能工具之Jmeter压测Thrift RPC服务

    Zee_7D:性能工具之Jmeter压测Thrift RPC服务

    作者:Zee_7D 时间:2021-06-27 16:39

    概述

    Thrift是一个可互操作和可伸缩服务的框架,用来进行可扩展且跨语言的服务的开发。它结合了功能强大的软件堆栈和代码生成引擎,以构建在 C++, Java, Python, PHP, Ruby, Erlang, Perl, Haskell, C#, Cocoa, JavaScript, Node.js, Smalltalk, and OCaml 等等编程语言间无缝结合的、高效的服务。

    Thrift最初由facebook开发,07年四月开放源码,08年5月进入apache孵化器。thrift允许你定义一个简单的定义文件中的数据类型和服务接口(IDL)。以作为输入文件,编译器生成代码用来方便地生成RPC客户端和服务器通信的无缝跨编程语言。

    其传输数据采用二进制格式,相对于XML和JSON等序列化方式体积更小,对于高并发、大数据量和多语言的环境更有优势。 Thrift它含有三个主要的组件:protocol,transport和server,其中,protocol定义了消息是怎样序列化的,transport定义了消息是怎样在客户端和服务器端之间通信的,server用于从transport接收序列化的消息,根据protocol反序列化之,调用用户定义的消息处理器,并序列化消息处理器的响应,然后再将它们写回transport。

    官网地址:thrift.apache.org

    基本概念

    架构图

    堆栈的顶部是从Thrift定义文件生成的代码。Thrift 服务生成的客户端和处理器代码。这些由图中的棕色框表示。红色框为发送的数据结构(内置类型除外)也会生成代码。协议和传输是Thrift运行时库的一部分。因此使用Thrift可以定义服务,并且可以自由更改协议和传输,而无需重新生成代码。 Thrift还包括一个服务器基础结构,用于将协议和传输绑定在一起。有可用的阻塞,非阻塞,单线程和多线程服务器。 堆栈的“底层I / O”部分根据所开发语言而有所不同。对于Java和Python网络I / O,Thrift库利用内置库,而C ++实现使用自己的自定义实现。

    数据类型:

    基本类型:

    • bool:布尔值,true 或 false,对应 Java 的 boolean

    • byte:8 位有符号整数,对应 Java 的 byte

    • i16:16 位有符号整数,对应 Java 的 short

    • i32:32 位有符号整数,对应 Java 的 int

    • i64:64 位有符号整数,对应 Java 的 long

    • double:64 位浮点数,对应 Java 的 double

    • string:未知编码文本或二进制字符串,对应 Java 的 String

    结构体类型:

    • struct:定义公共的对象,类似于 C 语言中的结构体定义,在 Java 中是一个 JavaBean

    集合类型:

    • list:对应 Java 的 ArrayList

    • set:对应 Java 的 HashSet

    • map:对应 Java 的 HashMap

    异常类型:

    • exception:对应 Java 的 Exception

    服务类型:

    • service:对应服务的类

    数据传输层Transport

    • TSocket —— 使用阻塞式 I/O 进行传输,是最常见的模式

    • TFramedTransport —— 使用非阻塞方式,按块的大小进行传输,类似于 Java 中的 NIO,若使用 TFramedTransport 传输层,其服务器必须修改为非阻塞的服务类型

    • TNonblockingTransport —— 使用非阻塞方式,用于构建异步客户端

    数据传输协议Protocol

    Thrift 可以让用户选择客户端与服务端之间传输通信协议的类别,在传输协议上总体划分为文本 (text) 和二进制 (binary) 传输协议,为节约带宽,提高传输效率,一般情况下使用二进制类型的传输协议为多数,有时还会使用基于文本类型的协议,这需要根据项目 / 产品中的实际需求。

    常用协议有以下几种:

    • TBinaryProtocol : 二进制格式.

    • TCompactProtocol : 高效率的、密集的二进制压缩格式

    • TJSONProtocol : JSON格式

    • TSimpleJSONProtocol : 提供JSON只写协议, 生成的文件很容易通过脚本语言解析

    注意:客户端和服务端的协议要一致。

    服务器类型Server

    • TSimpleServer ——单线程服务器端使用标准的阻塞式 I/O,一般用于测试。

    • TThreadPoolServer —— 多线程服务器端使用标准的阻塞式 I/O,预先创建一组线程处理请求。

    • TNonblockingServer —— 多线程服务器端使用非阻塞式 I/O,服务端和客户端需要指定 TFramedTransport 数据传输的方式。

    • THsHaServer —— 半同步半异步的服务端模型,需要指定为: TFramedTransport 数据传输的方式。它使用一个单独的线程来处理网络I/O,一个独立的worker线程池来处理消息。这样,只要有空闲的worker线程,消息就会被立即处理,因此多条消息能被并行处理。

    • TThreadedSelectorServer —— TThreadedSelectorServer允许你用多个线程来处理网络I/O。它维护了两个线程池,一个用来处理网络I/O,另一个用来进行请求的处理。当网络I/O是瓶颈的时候,TThreadedSelectorServer比THsHaServer的表现要好。

    实现逻辑

    服务端

    实现服务处理接口 impl

    创建TProcessor 创建TServerTransport 创建TProtocol 创建TServer 启动Server

    客户端

    创建Transport 创建TProtocol 基于TTransport和TProtocol创建 Client 调用Client的相应方法

    ThriftServerDemo实例

    新建 Maven项目,并且添加 thrift依赖包

    1.  <dependencies>

    2.        <dependency>

    3.            <groupId>org.apache.thrift</groupId>

    4.            <artifactId>libthrift</artifactId>

    5.            <version>0.9.3</version>

    6.        </dependency>

    7.        <dependency>

    8.            <groupId>org.slf4j</groupId>

    9.            <artifactId>slf4j-log4j12</artifactId>

    10.            <version>1.7.12</version>

    11.        </dependency>

    12.        <dependency>

    13.            <groupId>org.apache.logging.log4j</groupId>

    14.            <artifactId>log4j-api</artifactId>

    15.            <version>2.7</version>

    16.        </dependency>

    17.        <dependency>

    18.            <groupId>org.apache.logging.log4j</groupId>

    19.            <artifactId>log4j-core</artifactId>

    20.            <version>2.7</version>

    21.        </dependency>

    22.    </dependencies>

    23.    <build>

    24.        <plugins>

    25.            <plugin>

    26.                <groupId>org.apache.maven.plugins</groupId>

    27.                <artifactId>maven-compiler-plugin</artifactId>

    28.                <version>3.3</version>

    29.                <configuration>

    30.                    <source>1.8</source>

    31.                    <target>1.8</target>

    32.                    <encoding>utf-8</encoding>

    33.                </configuration>

    34.            </plugin>

    35.        </plugins>

    36.    </build>


    编写 IDL接口并生成接口文件

    1. namespace java thrift.service

    2.  

    3. // 计算类型 - 仅限整数四则运算

    4. enum ComputeType {

    5.    ADD = 0;

    6.    SUB = 1;

    7.    MUL = 2;

    8.    DIV = 3;

    9. }

    10.  

    11. // 服务请求

    12. struct ComputeRequest {

    13.    1:required i64 x;

    14.    2:required i64 y;

    15.    3:required ComputeType computeType;

    16. }

    17.  

    18. // 服务响应

    19. struct ComputeResponse {

    20.    1:required i32 errorNo;

    21.    2:optional string errorMsg;

    22.    3:required i64 computeRet;

    23. }

    24.  

    25. service ComputeServer {

    26.    ComputeResponse getComputeResult(1:ComputeRequest request);

    27. }


    执行编译命令:

    1. thrift-0.11.0.exe -r -gen java computeServer.thrift

    拷贝生成的 Service类文件到 IDEA 

    服务端接口实现

    1. public class ThriftTestImpl implements ComputeServer.Iface {

    2.    private static final Logger logger = LogManager.getLogger(ThriftTestImpl.class);

    3.    public ComputeResponse getComputeResult(ComputeRequest request) {

    4.        ComputeType computeType = request.getComputeType();

    5.        long x = request.getX();

    6.        long y = request.getY();

    7.        logger.info("get compute result begin. [x:{}] [y:{}] [type:{}]", x, y, computeType.toString());

    8.        long begin = System.currentTimeMillis();

    9.        ComputeResponse response = new ComputeResponse();

    10.        response.setErrorNo(0);

    11.        try {

    12.            long ret;

    13.            if (computeType == ComputeType.ADD) {

    14.                ret = add(x, y);

    15.                response.setComputeRet(ret);

    16.            } else if (computeType == ComputeType.SUB) {

    17.                ret = sub(x, y);

    18.                response.setComputeRet(ret);

    19.            } else if (computeType == ComputeType.MUL) {

    20.                ret = mul(x, y);

    21.                response.setComputeRet(ret);

    22.            } else {

    23.                ret = div(x, y);

    24.                response.setComputeRet(ret);

    25.            }

    26.        } catch (Exception e) {

    27.            response.setErrorNo(1001);

    28.            response.setErrorMsg(e.getMessage());

    29.            logger.error("exception:", e);

    30.        }

    31.        long end = System.currentTimeMillis();

    32.        logger.info("get compute result end. [errno:{}] cost:[{}ms]", response.getErrorNo(), (end - begin));

    33.        return response;

    34.    }

    35.    private long add(long x, long y) {

    36.        return x + y;

    37.    }

    38.    private long sub(long x, long y) {

    39.        return x - y;

    40.    }

    41.    private long mul(long x, long y) {

    42.        return x * y;

    43.    }

    44.    private long div(long x, long y) {

    45.        return x / y;

    46.    }

    47. }


    服务端实现

    1. public class ServerMain {

    2.    private static final Logger logger = LogManager.getLogger(ServerMain.class);

    3.  

    4.    public static void main(String[] args) {

    5.        try {

    6.            //实现服务处理接口impl

    7.            ThriftTestImpl workImpl = new ThriftTestImpl();

    8.            //创建TProcessor

    9.            TProcessor tProcessor = new ComputeServer.Processor<ComputeServer.Iface>(workImpl);

    10.            //创建TServerTransport,非阻塞式 I/O,服务端和客户端需要指定 TFramedTransport 数据传输的方式

    11.            final TNonblockingServerTransport transport = new TNonblockingServerSocket(9999);

    12.            //创建TProtocol

    13.            TThreadedSelectorServer.Args ttpsArgs = new TThreadedSelectorServer.Args(transport);

    14.            ttpsArgs.transportFactory(new TFramedTransport.Factory());

    15.            //二进制格式反序列化

    16.            ttpsArgs.protocolFactory(new TBinaryProtocol.Factory());

    17.            ttpsArgs.processor(tProcessor);

    18.            ttpsArgs.selectorThreads(16);

    19.            ttpsArgs.workerThreads(32);

    20.            logger.info("compute service server on port :" + 9999);

    21.            //创建TServer

    22.            TServer server = new TThreadedSelectorServer(ttpsArgs);

    23.            //启动Server

    24.            server.serve();

    25.        } catch (Exception e) {

    26.            logger.error(e);

    27.        }

    28.    }

    29. }


    服务端整体代码结构 

    log4j2.xml配置文件

    1. <?xml version="1.0" encoding="UTF-8"?>

    2. <!--日志级别以及优先级排序: OFF > FATAL > ERROR > WARN > INFO > DEBUG > TRACE > ALL -->

    3. <!--Configuration后面的status,这个用于设置log4j2自身内部的信息输出,可以不设置,当设置成trace时,你会看到log4j2内部各种详细输出-->

    4. <!--monitorInterval:Log4j能够自动检测修改配置 文件和重新配置本身,设置间隔秒数-->

    5. <configuration status="INFO" monitorInterval="30">

    6.    <!--先定义所有的appender-->

    7.    <appenders>

    8.        <!--这个输出控制台的配置-->

    9.        <console name="Console" target="SYSTEM_OUT">

    10.            <!--输出日志的格式-->

    11.            <PatternLayout pattern="%highlight{[ %p ] [%-d{yyyy-MM-dd HH:mm:ss}] [%l] %m%n}"/>

    12.        </console>

    13.  

    14.        <RollingFile name="RollingFileInfo" fileName="log/log.log" filePattern="log/log.log.%d{yyyy-MM-dd}">

    15.            <!-- 只接受level=INFO以上的日志 -->

    16.            <ThresholdFilter level="info" onMatch="ACCEPT" onMismatch="DENY