当前位置 博文首页 > 三分恶的博客:消息中间件学习笔记--ActiveMQ(四、Spring方式)

    三分恶的博客:消息中间件学习笔记--ActiveMQ(四、Spring方式)

    作者:[db:作者] 时间:2021-07-29 09:40

    Spring模式

    前面学的是jms模式,现在是Spring模式。感觉现在好像没有spring整合不了的东西啊,如果有,那一定是要原地死亡了。

    实例

    还是用之前的项目,

    pom.xml

    添加 spring , junit依赖。

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    	<modelVersion>4.0.0</modelVersion>
    
    	<groupId>edu.hpu</groupId>
    	<artifactId>ActiveMQDemo</artifactId>
    	<version>0.0.1-SNAPSHOT</version>
    	<packaging>jar</packaging>
    
    	<name>ActiveMQDemo</name>
    	<url>http://maven.apache.org</url>
    
    	<properties>
    		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    	</properties>
    
    	<dependencies>
    		<dependency>
    			<groupId>org.apache.activemq</groupId>
    			<artifactId>activemq-all</artifactId>
    			<version>5.15.9</version>
    		</dependency>
    		<dependency>
    			<groupId>cn.hutool</groupId>
    			<artifactId>hutool-all</artifactId>
    			<version>4.3.1</version>
    		</dependency>
    	    <dependency>
                <groupId>org.springframework</groupId>  
                <artifactId>spring-jms</artifactId>     <!-- 整合包 -->
                <version>4.3.2.RELEASE</version>   
            </dependency>
    		<dependency>
    			<groupId>org.springframework</groupId>
    			<artifactId>spring-test</artifactId>
    			<version>4.2.4.RELEASE</version>
    		</dependency>
    		<dependency>
    			<groupId>junit</groupId>
    			<artifactId>junit</artifactId>
    			<version>4.9</version>
    		</dependency>
    
    	</dependencies>
    </project>
    

    spring_jms.xml

    在resouces目录下创建spring_jms.xml,添加对activemq的相关配置。

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:p="http://www.springframework.org/schema/p"
        xmlns:context="http://www.springframework.org/schema/context"
        xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
        xmlns:mvc="http://www.springframework.org/schema/mvc"
        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc.xsd
            http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd
            http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
       <context:component-scan base-package="edu.hpu"></context:component-scan>       
        <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供--> 
        <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> 
            <property name="brokerURL" value="tcp://127.0.0.1:61616"/> 
        </bean>         
         
        <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory --> 
        <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> 
         
        <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory --> 
            <property name="targetConnectionFactory" ref="targetConnectionFactory"/> 
        </bean>        
         
        <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 --> 
        <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> 
            <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 --> 
            <property name="connectionFactory" ref="connectionFactory"/> 
        </bean>     
         
        <!--这个是队列目的地, ActiveMQQueue 就表示队列模式。 如果要用主题模式就改成 ActiveMQTopic就行了 --> 
        <bean id="textDestination" class="org.apache.activemq.command.ActiveMQQueue"> 
            <constructor-arg value="queue_style"/> 
        </bean> 
            
        <!-- 我的监听类 -->
        <bean id="myMessageListener" class="edu.hpu.MyMessageListener"></bean>
         
        <!-- 消息监听容器,会伴随spring的启动 -->
        <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
            <property name="connectionFactory" ref="connectionFactory" />
            <property name="destination" ref="textDestination" />
            <property name="messageListener" ref="myMessageListener" />
        </bean>          
     
    </beans>
    

    生产者类

    Producter:

    package edu.hpu;
    
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.Session;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.jms.core.JmsTemplate;
    import org.springframework.jms.core.MessageCreator;
    import org.springframework.stereotype.Component;
    
    @Component
    public class Producter {
    	@Autowired
    	private JmsTemplate jmsTemplate;
    	
    	 @Autowired
    	 private Destination textDestination;
    	
    	public void sendTextMessage(final String text) {
    		jmsTemplate.send(textDestination,new MessageCreator() {
    			
    			public Message createMessage(Session session) throws JMSException {
    				// TODO Auto-generated method stub
    				return session.createTextMessage(text);
    			}
    		});
    	}
    
    }
    

    TestProducter

    测试生产类,发送一百条消息。

    package edu.hpu.test;
    
    import org.junit.Before;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.cglib.core.SpringNamingPolicy;
    import org.springframework.test.context.ContextConfiguration;
    import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
    
    import edu.hpu.Producter;
    import edu.hpu.util.ActiveMQUtil;
    
    @RunWith(SpringJUnit4ClassRunner.class)
    @ContextConfiguration(locations="classpath:spring_jms.xml")
    public class TestProducter {
       @Autowired
       private Producter producter;
       
       @Before
       public void checkServer() {
           ActiveMQUtil.checkServer();
       }
       
       @Test
       public void testSend() {
    	   for (int i = 0; i < 100; i++) {
               producter.sendTextMessage("消息 " + i);
           }
       }
       
    }
    

    MyMessageListener

    监听类,用于获取新的消息。

    package edu.hpu;
    
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageListener;
    import javax.jms.TextMessage;
    
    import cn.hutool.core.util.RandomUtil;
    
    public class MyMessageListener implements MessageListener{
    	String name = "consumer-"+ RandomUtil.randomString(5);
        public  MyMessageListener() {
            System.out.println(name + " started");
        }
         
        public void onMessage(Message message) {
            TextMessage textMessage=(TextMessage)message;       
            try {
                System.out.println(name+" 接收到消息:"+textMessage.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
    

    TestConsumer

    消费者测试类,他其实什么都没做。 虽然它什么都没做,但是因为他是运行在 spring框架下的测试,所以一旦启动,就会导致一个新的 DefaultMessageListenerContainer 被启动,间接地导致 一个新的 MyMessageListener 被启动。 于是也就充当了消费者的角色了。
    其中的,

    System.in.read();
    

    与这个类似的, TestProducer 类的启动,也会导致一个 MyMessageListener 被启动,所以 TestProducer 本身既是一个生产者,也是一个消费者。

    package edu.hpu.test;
    
    import java.io.IOException;
    
    import org.junit.Before;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.test.context.ContextConfiguration;
    import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
    
    import edu.hpu.util.ActiveMQUtil;
    
    @RunWith(SpringJUnit4ClassRunner.class)
    @ContextConfiguration(locations="classpath:spring_jms.xml")
    public class TestConsumer {
    	@Before
        public void checkServer() {
            ActiveMQUtil.checkServer();
        }
         
        @Test
        public void test(){
            try {
                //写这个是为了不让当前测试退出。  因为 spring的配置, MyMessageListener 会自动启动
                System.in.read();
            } catch (IOException e) {
                e.printStackTrace();
            }       
        }   
    }
    

    启动

    运行TestConsumer一次,运行一次TestProducter。
    在这里插入图片描述

    在这里插入图片描述

    模式切换

    在spring_jms.xml中,

        <!--这个是队列目的地, ActiveMQQueue 就表示队列模式。 如果要用主题模式就改成 ActiveMQTopic就行了 --> 
        <bean id="textDestination" class="org.apache.activemq.command.ActiveMQQueue"> 
            <constructor-arg value="queue_style"/> 
        </bean> 
    

    参考:

    【1】、http://how2j.cn/k/message/message-activemq-spring/2028.html#nowhere

    cs