RabbitMQ

发布于 2023-01-02  216 次阅读


message Queue消息队列

1.初识MQ

1.1.同步和异步通讯

微服务间通讯有同步和异步两种方式:

同步通讯:就像打电话,需要实时响应。

异步通讯:就像发邮件,不需要马上回复。

两种方式各有优劣,打电话可以立即得到响应,但是你

却不能跟多个人同时通话。发送邮件可以同时与多个人收发邮件,但是往往响应会有延迟。

1.1.1.同步通讯

总结:

  • 同步调用的优点:
    • 时效性较强,可以立即得到结果
  • 同步调用的问题:
    • 耦合度高
    • 性能和吞吐能力下降
    • 有额外的资源消耗
    • 有级联失败问题

1.1.2.异步通讯

异步调用则可以避免上述问题:

我们以购买商品为例,用户支付后需要调用订单服务完成订单状态修改,调用物流服务,从仓库分配响应的库存并准备发货。

在事件模式中,支付服务是事

件发布者(publisher),在支付完成后只需要发布一个支付成功的事件(event),事件中带上订单id。

订单服务和物流服务是事件订阅者(Consumer),订阅支付成功的事件,监听到事件后完成自己业务即可。

为了解除事件发布者与订阅者之间的耦合,两者并不是直接通信,而是有一个中间人(Broker)。发布者发布事件到Broker,不关心谁来订阅事件。订阅者从Broker订阅事件,不关心谁发来的消息。

image-20210422095356088.png Broker 是一个像数据总线一样的东西,所有的服务要接收数据和发送数据都发到这个总线上,这个总线就像协议一样,让服务间的通讯变得标准和可控。

总结:

  • 好处:
    • 吞吐量提升:无需等待订阅者处理完成,响应更快速
    • 故障隔离:服务没有直接调用,不存在级联失败问题
    • 调用间没有阻塞,不会造成无效的资源占用
    • 耦合度极低,每个服务都可以灵活插拔,可替换
    • 流量削峰:不管发布事件的流量波动多大,都由Broker接收,订阅者可以按照自己的速度去处理事件
  • 缺点:
    • 架构复杂了,业务没有明显的流程线,不好管理
    • 需要依赖于Broker的可靠、安全、性能

1.2.技术对比:

MQ,中文是消息队列(MessageQueue),字面来看就是存放消息的队列。也就是事件驱动架构中的Broker。

比较常见的MQ实现:

  • ActiveMQ
  • RabbitMQ
  • RocketMQ
  • Kafka

几种常见MQ的对比:

RabbitMQActiveMQRocketMQKafka
公司/社区RabbitApache阿里Apache
开发语言ErlangJavaJavaScala&Java
协议支持AMQP,XMPP,SMTP,STOMPOpenWire,STOMP,REST,XMPP,AMQP自定义协议自定义协议
可用性一般
单机吞吐量一般非常高
消息延迟微秒级毫秒级毫秒级毫秒以内
消息可靠性一般一般

追求可用性:Kafka、 RocketMQ 、RabbitMQ

追求可靠性:RabbitMQ、RocketMQ

追求吞吐能力:RocketMQ、Kafka

追求消息低延迟:RabbitMQ、Kafka


2.快速入门

2.1 安装RabbitMQ

docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management

2.2 基本概念

mq的基本结构

3.Sprng整合RabbitMQ

3.1 基本配置

  1. 引入spring-boot-starter-amqp配置文件:RabbitAutoConfiguration 自动配置了 AmqpAdminRabbitTemplate、CachingConnectionFactory、RabbitMessagingTemplate
  2. 添加@EnableRabbit (只使用创建组件,发消息可以不加)题外话:spring整合组件的注解 @Enable....
  3. 在配置文件配置 host、port、virtual-host 等信息RabbitProperties --> prefix = "spring.rabbitmq"
  4. 发送信息时,如果发送的是个对象,则会使用序列号机制,将对象写入。所有这个对象必须实现 Serializable 接口。也可以通过配置,使用json实现序列化@Configuration public class MyRabbitConfig { @Bean public MessageConverter messageConverter() { return new Jackson2JsonMessageConverter(); } }

3.2 基本案例

1、如何创建交换机(Exchange)、队列(Queue)、绑定关系(Binding)

1.1、使用AmqpAdmin进行创建

2、如何收发消息

2.1、发消息:rabbitTemplate

2.2:收消息:@RabbitListener注解

3.2.1 创建交换机(Exchange)

首先注入amqpAdmin

@Autowired
private AmqpAdmin amqpAdmin;
@Test
void createExchange() {
    DirectExchange directExchange = new DirectExchange("hello-java-exchange",true,false);
    amqpAdmin.declareExchange(directExchange);
}

3.2.2 创建队列(Queue)

@Test
void createQueue() {
    Queue queue = new Queue("hello-java-queue",true,false,false);
    amqpAdmin.declareQueue(queue);
}

3.2.3 创建绑定关系(Binding)

@Test
void createBinding() {
    Binding binding = new Binding("hello-java-queue", Binding.DestinationType.QUEUE,
            "hello-java-exchange","hallo.java",null);
    amqpAdmin.declareBinding(binding);
}

3.2.3 测试发送消息

@Test
void sendMessageTest() {
    String msg = "Hello Word!";
    rabbitTemplate.convertAndSend("hello-java-exchange","hallo.java", msg);
}

发送信息时,如果发送的是个对象,则会使用序列号机制,将对象写入。所有这个对象必须实现 Serializable 接口。

@Test
void sendMessageTest() {
    // 如果发送的是个对象,则会使用序列号机制,将对象写入。所有这个对象必须实现 Serializable 接口
    OrderReturnReasonEntity reasonEntity = new OrderReturnReasonEntity();
    reasonEntity.setId(1L);
    reasonEntity.setCreateTime(new Date());
    reasonEntity.setName("haha");

    rabbitTemplate.convertAndSend("hello-java-exchange","hallo.java", reasonEntity);
}

也可以通过配置,使用json实现序列化

@Configuration
public class MyRabbitConfig {
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
} 

3.2.4 监听消息

需要在方法上添加@RabbitListener注解,使用@RabbitListener ,必须有@EnableRabbit

其中,queues需要声明监听的队列,类型为数组,可添加多个

监听方法入参类型可以是:

  • Message 原生信息:消息头和消息体
  • T<发送消息的类型> spring自动转化
  • Channel 当前传输数据的通道
/**
 * queues:声明需要监听的所有队列
 * 入参类型可以是:
 * @param message Message 原生信息:消息头和消息体
 * @param content T<发送消息的类型> spring自动转化
 * @param channel 当前传输数据的通道
 *
 * Queue可以有多个用户监听,但只能有一个收到消息,只要有一个用户收到消息,队列就会删除该数据
 */
@RabbitListener(queues = {"hello-java-queue"})
public void receiveMessage(Message message, OrderReturnReasonEntity content, Channel channel){
    // 消息体
    byte[] body = message.getBody();
    // 消息头属性信息
    MessageProperties messageProperties = message.getMessageProperties();

    System.out.println("原生信息:"+message+"===="+"对应实体类:"+content);
}

Queue可以有多个用户监听,但只能有一个收到消息,只要有一个用户收到消息,队列就会删除该数据

如果一个队列中有多种数据,可以使用@RabbitHandler+重载来解决

可以将@RabbitListener(queues = {"hello-java-queue"}) 放在类上,然后再方法上添加@RabbitHandler

    @RabbitHandler
    public void receiveMessage(OrderReturnReasonEntity content){
        System.out.println("对应实体类:"+content);
    }
    @RabbitHandler
    public void receiveMessage(OrderEntity content){
        System.out.println("对应实体类:"+content);
    }

3.3 消息确认机制

为了保证消息不丢失,可靠抵达,可以使用事务消息,但性能会下降250倍,为此引入确认机制

publisher confirmCallback 确认模式

publisher returnCallback 未投递到 queue 退回模式

consumer ack机制

3.3.1 基本流程

  1. ConfirmCallback
    • spring.rabbitmq.publisher-confirms=true
      • 在创建 connectionFactory 的时候设置 PublisherConfirms(true) 选项,开启confirmcallback 。
      • CorrelationData:用来表示当前消息唯一性。
      • 消息只要被 broker 接收到就会执行 confirmCallback,如果是 cluster 模式,需要所有broker 接收到才会调用 confirmCallback。
      • 被 broker 接收到只能表示 message 已经到达服务器,并不能保证消息一定会被投递到目标 queue 里。所以需要用到接下来的 returnCallback 。
  2. ReturnCallback
    • spring.rabbitmq.publisher-returns=true
    • spring.rabbitmq.template.mandatory=true
      • confrim 模式只能保证消息到达 broker,不能保证消息准确投递到目标 queue 里。在有些业务场景下,我们需要保证消息一定要投递到目标 queue 里,此时就需要用到return 退回模式。
      • 这样如果未能投递到目标 queue 里将调用 returnCallback ,可以记录下详细到投递数据,定期的巡检或者自动纠错都需要这些数据。
  3. Ack消息确认机制
    • 消费者获取到消息,成功处理,可以回复Ack给Broker
      • basic.ack用于肯定确认;broker将移除此消息
      • basic.nack用于否定确认;可以指定broker是否丢弃此消息,可以批量
      • basic.reject用于否定确认;同上,但不能批量
    • 默认自动ack,消息被消费者收到,就会从broker的queue中移除
    • queue无消费者,消息依然会被存储,直到消费者消费
    • 消费者收到消息,默认会自动ack。但是如果无法确定此消息是否被处理完成,或者成功处理。我们可以开启手动ack模式
      • 消息处理成功,ack(),接受下一个消息,此消息broker就会移除
      • 消息处理失败,nack()/reject(),重新发送给其他人进行处理,或者容错处理后ack
      • 消息一直没有调用ack/nack方法,broker认为此消息正在被处理,不会投递给别人,此时客户端断开,消息不会被broker移除,会投递给别人

3.3.2 代码实现

  1. 服务器收到消息就回调
    1. spring.rabbitmq.publisher-confirm-type: correlated
    2. 设置确认回调 ConfirmCallback
  2. 消息正确抵达队列回调
    1. spring.rabbitmq.publisher-returns: true spring.rabbitmq.template.mandatory: true
    2. 设置回调 ReturnsCallback
  3. 消费端确认(保证每个消息被正确消费,此时才可以broker删除这个消息)
    1. 默认自动确认,只要消息接收到,客户端就会自己确认,队列中就会移除这个消息存在问题:当接收消息时就自动回复,队列中就会删除,但若在处理消息时出错或者服务器宕机时,消息就会丢失
    2. 手动确认,只要没有告诉mq消息被签收,消息就一直是Unacked状态,不会丢失
      1. spring.rabbitmq.listener.simple.acknowledge-mode: manual
      2. 消费端手动签收
        1. 确认签收: channel.basicAck(deliveryTag, false);deliveryTag:交货标签 multiple:批量模式
        2. 拒绝签收: channel.basicNack(deliveryTag, false, false);deliveryTag:交货标签 multiple:批量模式 requeue:是否重新入队
/**
 * @author : ctc
 * @createTime : 2023/1/3 19:55
 */
@Slf4j
@Configuration
public class MyRabbitConfig {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 使用json实现序列化
     * @return
     */
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    /**
     * 定制RabbitTemplate
     *
     * 1、服务器收到消息就回调
     *  1.1、spring.rabbitmq.publisher-confirm-type: correlated
     *  1.2、设置确认回调 ConfirmCallback
     * 2、消息正确抵达队列回调
     *  2.1、spring.rabbitmq.publisher-returns: true
     *      spring.rabbitmq.template.mandatory: true
     *  2.2、设置回调 ReturnsCallback
     * 3、消费端确认(保证每个消息被正确消费,此时才可以broker删除这个消息)
     *  3.1、默认自动确认,只要消息接收到,客户端就会自己确认,队列中就会移除这个消息
     *      存在问题:当接收消息时就自动回复,队列中就会删除,但若在处理消息时出错或者服务器宕机时,消息就会丢失
     *  3.2、手动确认
     *      只要没有告诉mq消息被签收,消息就一直是Unacked状态,不会丢失
     *      3.2.1、spring.rabbitmq.listener.simple.acknowledge-mode: manual
     *      3.2.2、消费端手动签收
     *          确认签收:
     *              channel.basicAck(deliveryTag, false);
     *              deliveryTag:交货标签  multiple:批量模式
     *          拒绝签收:
     *              channel.basicNack(deliveryTag, false, false);
     *              deliveryTag:交货标签  multiple:批量模式 requeue:是否重新入队
     *
     * @PostConstruct  MyRabbitConfig 对象创建完成之后,调用这个方法
     */
    @PostConstruct
    public void initRabbitTemplate() {
        // 设置确认回调
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
             * 只要消息抵达服务器,ack就等于true
             * @param correlationData 当前消息的唯一关联数据(消息的唯一Id)
             * @param ack 消息是否成功收到
             * @param cause 失败的原因
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                log.info("correlationData:{},ack:{},cause:{}", correlationData, ack, cause);
            }
        });

        // 设置消息正确抵达队列回调
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            /**
             * 失败回调:只要消息没有投递给指定的队列,就触发这个方法
             * @param returnedMessage 封装着返回的信息
             *          Message message    投递失败的信息的详细信息
             *          int replyCode      回复的状态码
             *          tring replyText    回复的文本内容
             *          String exchange    当时发送给了哪个交换机
             *          String routingKey  使用了哪个路由键
             */
            @Override
            public void returnedMessage(ReturnedMessage returnedMessage) {
                // 投递失败的信息的详细信息
                Message message = returnedMessage.getMessage();
                // 回复的状态码
                int replyCode = returnedMessage.getReplyCode();
                // 回复的文本内容
                String replyText = returnedMessage.getReplyText();
                // 当时发送给了哪个交换机
                String exchange = returnedMessage.getExchange();
                // 使用了哪个路由键
                String routingKey = returnedMessage.getRoutingKey();
                log.info("message:{},replyCode:{},replyCode:{},replyText:{},routingKey:{},",message,replyCode,replyText,exchange,routingKey);
            }
        });
    }
}