本文共 4633 字,大约阅读时间需要 15 分钟。
RabbitMQ的大约的介绍,上一篇已经有介绍了,这篇不介绍,直接描述RabbitMQ与SpringMVC集成并实现发送消息和接收消息(持久化)。
使用了Spring-rabbit 发送消息和接收消息,我们使用的Maven来管理Jar包,在Maven的pom.xml文件中引入jar包
org.springframework.amqp spring-rabbit 1.3.6.RELEASE
1.实现生产者 第一步:是要设置调用安装RabbitMQ的IP、端口等
配置一个global.properties文件
第二步:通过SpringMVC把global.properties文件读进来
classpath:global.properties
第三步:配置 RabbitMQ服务器连接、创建rabbitTemplate 消息模板类等,在SpringMVC的配置文件加入下面这些
<bean id="rmqProducer2" class="cn.test.spring.rabbitmq.RmqProducer"></bean>
第四步:实现消息类实体和发送消息 类实体
/** * 消息 * */public class RabbitMessage implements Serializable{ private static final long serialVersionUID = -6487839157908352120L; private Class [] paramTypes;//参数类型 private String exchange;//交换器 private Object[] params; private String routeKey;//路由key public RabbitMessage(){} public RabbitMessage(String exchange,String routeKey,Object...params) { this.params=params; this.exchange=exchange; this.routeKey=routeKey; } @SuppressWarnings("rawtypes") public RabbitMessage(String exchange,String routeKey,String methodName,Object...params) { this.params=params; this.exchange=exchange; this.routeKey=routeKey; int len=params.length; Class[] clazzArray=new Class[len]; for(int i=0;i [] getParamTypes() { return paramTypes; } public Object[] getParams() { return params; } }
发送消息
/** * 生产着 * */public class RmqProducer{ @Resource private RabbitTemplate rabbitTemplate; /** * 发送信息 * @param msg */ public void sendMessage(RabbitMessage msg) { try { System.out.println(rabbitTemplate.getConnectionFactory().getHost()); System.out.println(rabbitTemplate.getConnectionFactory().getPort()); //发送信息 rabbitTemplate.convertAndSend(msg.getExchange(), msg.getRouteKey(), msg); } catch (Exception e) { } } }
说明: 1. rabbitTemplate.convertAndSend(msg.getExchange(), msg.getRouteKey(), msg);
源代码中的send调用的方法,一些发送消息帮我们实现好了。
2.上面的代码实现没申明交换器和队列,RabbitMQ不知交换器和队列他们的绑定关系,如果RabbitMQ管理器上没有对应的交换器和队列是不会新建的和关联的,需要手动关联。
我们也可以用代码申明:
rabbitAdmin要申明:eclareExchange方法 参数是交换器
BindingBuilder.bind(queue).to(directExchange).with(queueName);//将queue绑定到exchange
rabbitAdmin.declareBinding(binding);//声明绑定关系
源代码有这些方法:
这样就可以实现交换器和队列的绑定关系
交换器我们可以申明为持久化,还有使用完不会自动删除
TopicExchange 参数的说明:name是交换器名称,durable:true 是持久化 autoDelete:false使用完不删除
源代码:
队列也可以申明为持久化
第五步:实现测试类
@Resource private RmqProducer rmqProducer2; @Test public void test() throws IOException { String exchange="testExchange";交换器 String routeKey="testQueue";//队列 String methodName="test";//调用的方法 //参数 Map param=new HashMap (); param.put("data","hello"); RabbitMessage msg=new RabbitMessage(exchange,routeKey, methodName, param); //发送消息 rmqProducer2.sendMessage(msg); }
结果:RabbitMQ有一条消息
2.消费者
第一步:RabbitMQ服务器连接这些在生产者那边已经介绍了,这边就不介绍了,我们要配置 RabbitMQ服务器连接、创建rabbitTemplate 消息模板类、消息转换器、消息转换器监听器等,在SpringMVC的配置文件加入下面这些
说明:
1.org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer中的queues可以传入多个队列
2.org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter
有哪个消费者适配器来处理 ,参数defaultListenerMethod是默认调用方法来处理消息。
3.交换器和队列的持久化在生产者有介绍过了。
4.org.springframework.amqp.core.Binding这个类的绑定,在SpringMVC配置文件中配置时,
DestinationType这个参数要注意点
源代码:
第二步:处理消息
/** * 消费者 * */public class RmqConsumer { public void rmqProducerMessage(Object object){ RabbitMessage rabbitMessage=(RabbitMessage) object; System.out.println(rabbitMessage.getExchange()); System.out.println(rabbitMessage.getRouteKey()); System.out.println(rabbitMessage.getParams().toString()); } }
在启动过程中会报这样的错误,可能是你的交换器和队列没配置好