当前位置 主页 > 服务器问题 > Linux/apache问题 >

    springBoot整合RocketMQ及坑的示例代码

    栏目:Linux/apache问题 时间:2019-11-06 21:58

    版本:

    JDK:1.8 springBoot:1.5.10 rocketMQ:4.2.0

    pom 配置:    

    <parent>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-parent</artifactId>
     <version>1.5.10.RELEASE</version>
    </parent>
    <dependency>
      <groupId>org.apache.rocketmq</groupId>
      <artifactId>rocketmq-client</artifactId>
      <version>4.2.0</version>
    </dependency>

    application.properties  配置:

    # 消费者的组名
    apache.rocketmq.consumer.PushConsumer=PushConsumer
    # 生产者的组名
    apache.rocketmq.producer.producerGroup=Producer
    # NameServer地址
    apache.rocketmq.namesrvAddr=localhost:9876

    java代码:

    生产者

    package test.config.rocketmq;
    
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Component;
    import org.springframework.util.StopWatch;
    import javax.annotation.PostConstruct;
    
    @Component
    public class RocketMQClient {
      /**
       * 生产者的组名
       */
      @Value("${apache.rocketmq.producer.producerGroup}")
      private String producerGroup;
    
      /**
       * NameServer 地址
       */
      @Value("${apache.rocketmq.namesrvAddr}")
      private String namesrvAddr;
    
      @PostConstruct
      public void defaultMQProducer() {
        //生产者的组名
        DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
        //指定NameServer地址,多个地址以 ; 隔开
        producer.setNamesrvAddr(namesrvAddr);
        producer.setVipChannelEnabled(false);
        try {
          /**
           * Producer对象在使用之前必须要调用start初始化,初始化一次即可
           * 注意:切记不可以在每次发送消息时,都调用start方法
           */
          producer.start();
    
          //创建一个消息实例,包含 topic、tag 和 消息体
          //如下:topic 为 "TopicTest",tag 为 "push"
          Message message = new Message("TopicTest", "push", "发送消息----zhisheng-----".getBytes(RemotingHelper.DEFAULT_CHARSET));
    
          StopWatch stop = new StopWatch();
          stop.start();
    
          for (int i = 0; i < 1; i++) {
            SendResult result = producer.send(message);
            System.out.println("发送响应:MsgId:" + result.getMsgId() + ",发送状态:" + result.getSendStatus());
          }
          stop.stop();
          System.out.println("----------------发送一万条消息耗时:" + stop.getTotalTimeMillis());
        } catch (Exception e) {
          e.printStackTrace();
        } finally {
          producer.shutdown();
        }
      }
    }
    
    

    消费者: 

    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.PostConstruct;
    
    
    @Component
    public class RocketMQServer {
      /**
       * 消费者的组名
       */
      @Value("${apache.rocketmq.consumer.PushConsumer}")
      private String consumerGroup;
    
      /**
       * NameServer 地址
       */
      @Value("${apache.rocketmq.namesrvAddr}")
      private String namesrvAddr;
    
      @PostConstruct
      public void defaultMQPushConsumer() {
        //消费者的组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
    
        //指定NameServer地址,多个地址以 ; 隔开
        consumer.setNamesrvAddr(namesrvAddr);
        consumer.setVipChannelEnabled(false);
        try {
          //订阅PushTopic下Tag为push的消息
          consumer.subscribe("TopicTest", "push");
    
          //设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
          //如果非第一次启动,那么按照上次消费的位置继续消费
          consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
          consumer.registerMessageListener((MessageListenerConcurrently) (list, context) -> {
            try {
              for (MessageExt messageExt : list) {
    
                System.out.println("messageExt: " + messageExt);//输出消息内容
    
                String messageBody = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);
    
                System.out.println("消费响应:msgId : " + messageExt.getMsgId() + ", msgBody : " + messageBody);//输出消息内容
              }
            } catch (Exception e) {
              e.printStackTrace();
              return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再试
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消费成功
          });
          consumer.start();
        } catch (Exception e) {
          e.printStackTrace();
        }
      }
    }