当前位置 博文首页 > 三分恶的博客:消息中间件学习笔记--ActiveMQ(四、Spring方式)
前面学的是jms模式,现在是Spring模式。感觉现在好像没有spring整合不了的东西啊,如果有,那一定是要原地死亡了。
还是用之前的项目,
添加 spring , junit依赖。
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>edu.hpu</groupId>
<artifactId>ActiveMQDemo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>ActiveMQDemo</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.9</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>4.3.1</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId> <!-- 整合包 -->
<version>4.3.2.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>4.2.4.RELEASE</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.9</version>
</dependency>
</dependencies>
</project>
在resouces目录下创建spring_jms.xml,添加对activemq的相关配置。
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:p="http://www.springframework.org/schema/p"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
xmlns:mvc="http://www.springframework.org/schema/mvc"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc.xsd
http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
<context:component-scan base-package="edu.hpu"></context:component-scan>
<!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://127.0.0.1:61616"/>
</bean>
<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
<!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
<property name="targetConnectionFactory" ref="targetConnectionFactory"/>
</bean>
<!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
<property name="connectionFactory" ref="connectionFactory"/>
</bean>
<!--这个是队列目的地, ActiveMQQueue 就表示队列模式。 如果要用主题模式就改成 ActiveMQTopic就行了 -->
<bean id="textDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="queue_style"/>
</bean>
<!-- 我的监听类 -->
<bean id="myMessageListener" class="edu.hpu.MyMessageListener"></bean>
<!-- 消息监听容器,会伴随spring的启动 -->
<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="textDestination" />
<property name="messageListener" ref="myMessageListener" />
</bean>
</beans>
Producter:
package edu.hpu;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;
@Component
public class Producter {
@Autowired
private JmsTemplate jmsTemplate;
@Autowired
private Destination textDestination;
public void sendTextMessage(final String text) {
jmsTemplate.send(textDestination,new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
// TODO Auto-generated method stub
return session.createTextMessage(text);
}
});
}
}
测试生产类,发送一百条消息。
package edu.hpu.test;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cglib.core.SpringNamingPolicy;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import edu.hpu.Producter;
import edu.hpu.util.ActiveMQUtil;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations="classpath:spring_jms.xml")
public class TestProducter {
@Autowired
private Producter producter;
@Before
public void checkServer() {
ActiveMQUtil.checkServer();
}
@Test
public void testSend() {
for (int i = 0; i < 100; i++) {
producter.sendTextMessage("消息 " + i);
}
}
}
监听类,用于获取新的消息。
package edu.hpu;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import cn.hutool.core.util.RandomUtil;
public class MyMessageListener implements MessageListener{
String name = "consumer-"+ RandomUtil.randomString(5);
public MyMessageListener() {
System.out.println(name + " started");
}
public void onMessage(Message message) {
TextMessage textMessage=(TextMessage)message;
try {
System.out.println(name+" 接收到消息:"+textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
消费者测试类,他其实什么都没做。 虽然它什么都没做,但是因为他是运行在 spring框架下的测试,所以一旦启动,就会导致一个新的 DefaultMessageListenerContainer 被启动,间接地导致 一个新的 MyMessageListener 被启动。 于是也就充当了消费者的角色了。
其中的,
System.in.read();
与这个类似的, TestProducer 类的启动,也会导致一个 MyMessageListener 被启动,所以 TestProducer 本身既是一个生产者,也是一个消费者。
package edu.hpu.test;
import java.io.IOException;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import edu.hpu.util.ActiveMQUtil;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations="classpath:spring_jms.xml")
public class TestConsumer {
@Before
public void checkServer() {
ActiveMQUtil.checkServer();
}
@Test
public void test(){
try {
//写这个是为了不让当前测试退出。 因为 spring的配置, MyMessageListener 会自动启动
System.in.read();
} catch (IOException e) {
e.printStackTrace();
}
}
}
运行TestConsumer一次,运行一次TestProducter。
在spring_jms.xml中,
<!--这个是队列目的地, ActiveMQQueue 就表示队列模式。 如果要用主题模式就改成 ActiveMQTopic就行了 -->
<bean id="textDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="queue_style"/>
</bean>
【1】、http://how2j.cn/k/message/message-activemq-spring/2028.html#nowhere
cs