本文共 5298 字,大约阅读时间需要 17 分钟。
rabbitmq遵循AMQP协议,(协议介绍),springboot将整个过程化得编程封装得确实有时候让人不是很容易看懂。没关系,我们来讲讲怎么使用就好了。
springboot 1.x与2.x还是有挺大区别得。我这里使用得是springboot2.1.3.如有不一致地方可能是版本上得问题。
先看看官方文档得支持,地址:
以上文档大致意思是说,rabbitmq是基于AMQP协议得,轻量级、可靠、可升级、可拔插得消息中间件,spring 通过AMQP协议与rabbitmq进行交互
在springboot中使用RabbitMQ 只需要配置spring.rabbitmq.*。例如你只需要配置以下属性就可以了。
#rabbitmq配置(springboot2.1.3)spring.rabbitmq.host=127.0.0.1spring.rabbitmq.port=5672spring.rabbitmq.username=guestspring.rabbitmq.password=guest# 开启发送确认spring.rabbitmq.publisher-confirms=true# 开启发送失败退回spring.rabbitmq.publisher-returns=truespring.rabbitmq.listener.type=simple# 开启ACK#spring.rabbitmq.listener.direct.acknowledge-mode=manualspring.rabbitmq.listener.simple.acknowledge-mode=manual# 开启ack限制,服务器如果没有收到ack,则不会继续往某个消费者继续发送消息#限制每次发送一条数据。spring.rabbitmq.listener.simple.prefetch=1#spring.rabbitmq.listener.direct.prefetch=1#spring.rabbitmq.listener.direct.consumers-per-queue=4#同一个队列启动几个消费者spring.rabbitmq.listener.simple.concurrency=2#启动消费者最大数量spring.rabbitmq.listener.simple.max-concurrency=3#重试策略相关配置spring.rabbitmq.listener.simple.retry.enabled=truespring.rabbitmq.listener.simple.retry.stateless=falsespring.rabbitmq.listener.simple.retry.max-attempts=5#时间策略乘数因子spring.rabbitmq.listener.simple.retry.multiplier=1.0spring.rabbitmq.listener.simple.retry.initial-interval=1000msspring.rabbitmq.listener.simple.retry.max-interval=10000msspring.rabbitmq.listener.simple.default-requeue-rejected=true
当然还有很多得配置,大家自己可以去试试,完整得配置文件请在这里查看
(大家去看网络上其它得教程会发现配置文件少了一层 simple,或者direct,那就是springboot1.x版本了)
2.x修改messageListenerContainer,为SimpleMessageListenerContainer与DirectMessageListenerContainer
关于两者得区别可以参考 这边博客
springboot对MQ得支持已经简化到消息得发送使用 RabbitTemplate rabbitTemplate; 消息监听使用注解@
@RabbitListener(queues = "someQueue")//queues指的是队列名称(其中还有很多得属性,可以自行百度查阅相关属性得作用,也可以嵌套注解使用)
以@Bean得方式创建队列路由连接容器等。
总的来说非常方便,好下面上代码
RabbitConfig.java
package com.ydsh.framworks.config;import org.springframework.amqp.core.*;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;/** * @Auth yaozhongjie * @Date 2019/3/14 17:53 **/@Configurationpublic class RabbitConfig { @Bean public Queue directQueue1() { return new Queue(RabbitConstans.DIRECT_QUEUE1); } @Bean public DirectExchange directExchange() { return new DirectExchange(RabbitConstans.DIRECT_EXCHANGE); } @Bean public Binding directBinding1() { return BindingBuilder.bind(directQueue1()).to(directExchange()).with(RabbitConstans.DIRECT_BIND_KEY); }}
RabbitConstans.java
package com.ydsh.framworks.config;/** * @Auth yaozhongjie * @Date 2019/3/18 11:57 **/public class RabbitConstans { //redirect模式 public static final String DIRECT_QUEUE1 = "direct.queue1"; public static final String DIRECT_EXCHANGE = "direct.exchange"; public static final String DIRECT_BIND_KEY ="direct.bind.key" ;}
DirectSender.java
package com.ydsh.framworks.sender;import com.ydsh.framworks.config.RabbitConstans;import com.ydsh.framworks.entity.MqMessage;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;/** * @Auth yaozhongjie * @Date 2019/3/18 10:43 **/@Componentpublic class DirectSender implements RabbitTemplate.ReturnCallback { @Autowired RabbitTemplate rabbitTemplate; public void send(MqMessage message){ rabbitTemplate.setReturnCallback(this); this.rabbitTemplate.setConfirmCallback((data, ack, cause) -> { if (!ack) { System.out.println("消息发送失败" + cause + data.toString()); } else { System.out.println("消息发送成功 "); } }); rabbitTemplate.convertAndSend(RabbitConstans.DIRECT_EXCHANGE,RabbitConstans.DIRECT_BIND_KEY,message);//这个message随便定义个对象,但是一定要实现序列化接口 } @Override public void returnedMessage(Message message, int i, String s, String s1, String s2) { System.out.println("sender return success" + message.toString()+"==="+i+"==="+s1+"==="+s2); }}
DirectReciver.java
package com.ydsh.framworks.reciver;import com.rabbitmq.client.Channel;import com.ydsh.framworks.entity.MqMessage;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import java.io.IOException;/** * @Auth yaozhongjie * @Date 2019/3/18 10:45 **/@Component@RabbitListener(queues =RabbitConstans.DIRECT_QUEUE1)@Slf4j //需要lombok包public class DirectReciver { @RabbitHandler public void receiveTopic1(MqMessage mqMessage, Channel channel, Message message){ log.info("MqMessage:{}",mqMessage); //拒绝消息 这边之前测试死信队列得时候做得 try { channel.basicReject(message.getMessageProperties().getDeliveryTag(),false); } catch (IOException e) { e.printStackTrace(); } //确认消息也就是回复ACK /*try { channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); log.info("receiver success"); } catch (IOException e) { e.printStackTrace(); log.info("receiver fail"); }*/ }}
好的就是这么简单。大家可以试试看
转载地址:http://ixkfb.baihongyu.com/