博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
springboot2.x整合rabbitmq(二)
阅读量:2216 次
发布时间:2019-05-07

本文共 5298 字,大约阅读时间需要 17 分钟。

rabbitmq遵循AMQP协议,(协议介绍),springboot将整个过程化得编程封装得确实有时候让人不是很容易看懂。没关系,我们来讲讲怎么使用就好了。

springboot 1.x与2.x还是有挺大区别得。我这里使用得是springboot2.1.3.如有不一致地方可能是版本上得问题。

先看看官方文档得支持,地址:

 

以上文档大致意思是说,rabbitmq是基于AMQP协议得,轻量级、可靠、可升级、可拔插得消息中间件,spring 通过AMQP协议与rabbitmq进行交互

在springboot中使用RabbitMQ 只需要配置spring.rabbitmq.*。例如你只需要配置以下属性就可以了。

#rabbitmq配置(springboot2.1.3)spring.rabbitmq.host=127.0.0.1spring.rabbitmq.port=5672spring.rabbitmq.username=guestspring.rabbitmq.password=guest# 开启发送确认spring.rabbitmq.publisher-confirms=true# 开启发送失败退回spring.rabbitmq.publisher-returns=truespring.rabbitmq.listener.type=simple# 开启ACK#spring.rabbitmq.listener.direct.acknowledge-mode=manualspring.rabbitmq.listener.simple.acknowledge-mode=manual# 开启ack限制,服务器如果没有收到ack,则不会继续往某个消费者继续发送消息#限制每次发送一条数据。spring.rabbitmq.listener.simple.prefetch=1#spring.rabbitmq.listener.direct.prefetch=1#spring.rabbitmq.listener.direct.consumers-per-queue=4#同一个队列启动几个消费者spring.rabbitmq.listener.simple.concurrency=2#启动消费者最大数量spring.rabbitmq.listener.simple.max-concurrency=3#重试策略相关配置spring.rabbitmq.listener.simple.retry.enabled=truespring.rabbitmq.listener.simple.retry.stateless=falsespring.rabbitmq.listener.simple.retry.max-attempts=5#时间策略乘数因子spring.rabbitmq.listener.simple.retry.multiplier=1.0spring.rabbitmq.listener.simple.retry.initial-interval=1000msspring.rabbitmq.listener.simple.retry.max-interval=10000msspring.rabbitmq.listener.simple.default-requeue-rejected=true

当然还有很多得配置,大家自己可以去试试,完整得配置文件请在这里查看

(大家去看网络上其它得教程会发现配置文件少了一层 simple,或者direct,那就是springboot1.x版本了)

2.x修改messageListenerContainer,为SimpleMessageListenerContainer与DirectMessageListenerContainer

关于两者得区别可以参考 这边博客

springboot对MQ得支持已经简化到消息得发送使用 RabbitTemplate rabbitTemplate; 消息监听使用注解@

@RabbitListener(queues = "someQueue")//queues指的是队列名称(其中还有很多得属性,可以自行百度查阅相关属性得作用,也可以嵌套注解使用)

以@Bean得方式创建队列路由连接容器等。

总的来说非常方便,好下面上代码

RabbitConfig.java

package com.ydsh.framworks.config;import org.springframework.amqp.core.*;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;/** * @Auth yaozhongjie * @Date 2019/3/14 17:53 **/@Configurationpublic class RabbitConfig {    @Bean    public Queue directQueue1() {        return new Queue(RabbitConstans.DIRECT_QUEUE1);    }    @Bean    public DirectExchange directExchange() {        return new DirectExchange(RabbitConstans.DIRECT_EXCHANGE);    }    @Bean    public Binding directBinding1() {        return BindingBuilder.bind(directQueue1()).to(directExchange()).with(RabbitConstans.DIRECT_BIND_KEY);    }}
RabbitConstans.java
package com.ydsh.framworks.config;/** * @Auth yaozhongjie * @Date 2019/3/18 11:57 **/public class RabbitConstans {    //redirect模式    public static final String DIRECT_QUEUE1 = "direct.queue1";    public static final String DIRECT_EXCHANGE = "direct.exchange";    public static final String DIRECT_BIND_KEY ="direct.bind.key" ;}

DirectSender.java

package com.ydsh.framworks.sender;import com.ydsh.framworks.config.RabbitConstans;import com.ydsh.framworks.entity.MqMessage;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;/** * @Auth yaozhongjie * @Date 2019/3/18 10:43 **/@Componentpublic class DirectSender implements RabbitTemplate.ReturnCallback {    @Autowired    RabbitTemplate rabbitTemplate;    public void send(MqMessage message){        rabbitTemplate.setReturnCallback(this);        this.rabbitTemplate.setConfirmCallback((data, ack, cause) -> {            if (!ack) {                System.out.println("消息发送失败" + cause + data.toString());            } else {                System.out.println("消息发送成功 ");            }        });        rabbitTemplate.convertAndSend(RabbitConstans.DIRECT_EXCHANGE,RabbitConstans.DIRECT_BIND_KEY,message);//这个message随便定义个对象,但是一定要实现序列化接口    }    @Override    public void returnedMessage(Message message, int i, String s, String s1, String s2) {        System.out.println("sender return success" + message.toString()+"==="+i+"==="+s1+"==="+s2);    }}

DirectReciver.java

package com.ydsh.framworks.reciver;import com.rabbitmq.client.Channel;import com.ydsh.framworks.entity.MqMessage;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import java.io.IOException;/** * @Auth yaozhongjie * @Date 2019/3/18 10:45 **/@Component@RabbitListener(queues =RabbitConstans.DIRECT_QUEUE1)@Slf4j //需要lombok包public class DirectReciver {    @RabbitHandler    public void receiveTopic1(MqMessage mqMessage, Channel channel, Message message){        log.info("MqMessage:{}",mqMessage);        //拒绝消息 这边之前测试死信队列得时候做得        try {            channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);        } catch (IOException e) {            e.printStackTrace();        }        //确认消息也就是回复ACK        /*try {            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);            log.info("receiver success");        } catch (IOException e) {            e.printStackTrace();            log.info("receiver fail");        }*/    }}

好的就是这么简单。大家可以试试看

转载地址:http://ixkfb.baihongyu.com/

你可能感兴趣的文章
【LEETCODE】106-Construct Binary Tree from Inorder and Postorder Traversal
查看>>
【LEETCODE】202-Happy Number
查看>>
和机器学习和计算机视觉相关的数学
查看>>
十个值得一试的开源深度学习框架
查看>>
【LEETCODE】240-Search a 2D Matrix II
查看>>
【LEETCODE】53-Maximum Subarray
查看>>
【LEETCODE】215-Kth Largest Element in an Array
查看>>
【LEETCODE】241-Different Ways to Add Parentheses
查看>>
【LEETCODE】312-Burst Balloons
查看>>
【LEETCODE】232-Implement Queue using Stacks
查看>>
【LEETCODE】225-Implement Stack using Queues
查看>>
【LEETCODE】155-Min Stack
查看>>
【LEETCODE】20-Valid Parentheses
查看>>
【LEETCODE】290-Word Pattern
查看>>
【LEETCODE】36-Valid Sudoku
查看>>
【LEETCODE】205-Isomorphic Strings
查看>>
【LEETCODE】204-Count Primes
查看>>
【LEETCODE】228-Summary Ranges
查看>>
【LEETCODE】27-Remove Element
查看>>
【LEETCODE】66-Plus One
查看>>