message Queue消息队列
1.初识MQ
1.1.同步和异步通讯
微服务间通讯有同步和异步两种方式:
同步通讯:就像打电话,需要实时响应。
异步通讯:就像发邮件,不需要马上回复。
两种方式各有优劣,打电话可以立即得到响应,但是你
却不能跟多个人同时通话。发送邮件可以同时与多个人收发邮件,但是往往响应会有延迟。
1.1.1.同步通讯
总结:
- 同步调用的优点:
- 时效性较强,可以立即得到结果
- 同步调用的问题:
- 耦合度高
- 性能和吞吐能力下降
- 有额外的资源消耗
- 有级联失败问题
1.1.2.异步通讯
异步调用则可以避免上述问题:
我们以购买商品为例,用户支付后需要调用订单服务完成订单状态修改,调用物流服务,从仓库分配响应的库存并准备发货。
在事件模式中,支付服务是事
件发布者(publisher),在支付完成后只需要发布一个支付成功的事件(event),事件中带上订单id。
订单服务和物流服务是事件订阅者(Consumer),订阅支付成功的事件,监听到事件后完成自己业务即可。
为了解除事件发布者与订阅者之间的耦合,两者并不是直接通信,而是有一个中间人(Broker)。发布者发布事件到Broker,不关心谁来订阅事件。订阅者从Broker订阅事件,不关心谁发来的消息。
Broker 是一个像数据总线一样的东西,所有的服务要接收数据和发送数据都发到这个总线上,这个总线就像协议一样,让服务间的通讯变得标准和可控。
总结:
- 好处:
- 吞吐量提升:无需等待订阅者处理完成,响应更快速
- 故障隔离:服务没有直接调用,不存在级联失败问题
- 调用间没有阻塞,不会造成无效的资源占用
- 耦合度极低,每个服务都可以灵活插拔,可替换
- 流量削峰:不管发布事件的流量波动多大,都由Broker接收,订阅者可以按照自己的速度去处理事件
- 缺点:
- 架构复杂了,业务没有明显的流程线,不好管理
- 需要依赖于Broker的可靠、安全、性能
1.2.技术对比:
MQ,中文是消息队列(MessageQueue),字面来看就是存放消息的队列。也就是事件驱动架构中的Broker。
比较常见的MQ实现:
- ActiveMQ
- RabbitMQ
- RocketMQ
- Kafka
几种常见MQ的对比:
RabbitMQ | ActiveMQ | RocketMQ | Kafka | |
---|---|---|---|---|
公司/社区 | Rabbit | Apache | 阿里 | Apache |
开发语言 | Erlang | Java | Java | Scala&Java |
协议支持 | AMQP,XMPP,SMTP,STOMP | OpenWire,STOMP,REST,XMPP,AMQP | 自定义协议 | 自定义协议 |
可用性 | 高 | 一般 | 高 | 高 |
单机吞吐量 | 一般 | 差 | 高 | 非常高 |
消息延迟 | 微秒级 | 毫秒级 | 毫秒级 | 毫秒以内 |
消息可靠性 | 高 | 一般 | 高 | 一般 |
追求可用性:Kafka、 RocketMQ 、RabbitMQ
追求可靠性:RabbitMQ、RocketMQ
追求吞吐能力:RocketMQ、Kafka
追求消息低延迟:RabbitMQ、Kafka
2.快速入门
2.1 安装RabbitMQ
docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management
2.2 基本概念
mq的基本结构
3.Sprng整合RabbitMQ
3.1 基本配置
- 引入spring-boot-starter-amqp配置文件:RabbitAutoConfiguration 自动配置了 AmqpAdmin、RabbitTemplate、CachingConnectionFactory、RabbitMessagingTemplate
- 添加@EnableRabbit (只使用创建组件,发消息可以不加)题外话:spring整合组件的注解 @Enable....
- 在配置文件配置 host、port、virtual-host 等信息RabbitProperties --> prefix = "spring.rabbitmq"
- 发送信息时,如果发送的是个对象,则会使用序列号机制,将对象写入。所有这个对象必须实现 Serializable 接口。也可以通过配置,使用json实现序列化
@Configuration public class MyRabbitConfig { @Bean public MessageConverter messageConverter() { return new Jackson2JsonMessageConverter(); } }
3.2 基本案例
1、如何创建交换机(Exchange)、队列(Queue)、绑定关系(Binding)
1.1、使用AmqpAdmin进行创建
2、如何收发消息
2.1、发消息:rabbitTemplate
2.2:收消息:@RabbitListener注解
3.2.1 创建交换机(Exchange)
首先注入amqpAdmin
@Autowired
private AmqpAdmin amqpAdmin;
@Test
void createExchange() {
DirectExchange directExchange = new DirectExchange("hello-java-exchange",true,false);
amqpAdmin.declareExchange(directExchange);
}
3.2.2 创建队列(Queue)
@Test
void createQueue() {
Queue queue = new Queue("hello-java-queue",true,false,false);
amqpAdmin.declareQueue(queue);
}
3.2.3 创建绑定关系(Binding)
@Test
void createBinding() {
Binding binding = new Binding("hello-java-queue", Binding.DestinationType.QUEUE,
"hello-java-exchange","hallo.java",null);
amqpAdmin.declareBinding(binding);
}
3.2.3 测试发送消息
@Test
void sendMessageTest() {
String msg = "Hello Word!";
rabbitTemplate.convertAndSend("hello-java-exchange","hallo.java", msg);
}
发送信息时,如果发送的是个对象,则会使用序列号机制,将对象写入。所有这个对象必须实现 Serializable 接口。
@Test
void sendMessageTest() {
// 如果发送的是个对象,则会使用序列号机制,将对象写入。所有这个对象必须实现 Serializable 接口
OrderReturnReasonEntity reasonEntity = new OrderReturnReasonEntity();
reasonEntity.setId(1L);
reasonEntity.setCreateTime(new Date());
reasonEntity.setName("haha");
rabbitTemplate.convertAndSend("hello-java-exchange","hallo.java", reasonEntity);
}
也可以通过配置,使用json实现序列化
@Configuration
public class MyRabbitConfig {
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}
3.2.4 监听消息
需要在方法上添加@RabbitListener
注解,使用@RabbitListener
,必须有@EnableRabbit
。
其中,queues需要声明监听的队列,类型为数组,可添加多个
监听方法入参类型可以是:
- Message 原生信息:消息头和消息体
- T<发送消息的类型> spring自动转化
- Channel 当前传输数据的通道
/**
* queues:声明需要监听的所有队列
* 入参类型可以是:
* @param message Message 原生信息:消息头和消息体
* @param content T<发送消息的类型> spring自动转化
* @param channel 当前传输数据的通道
*
* Queue可以有多个用户监听,但只能有一个收到消息,只要有一个用户收到消息,队列就会删除该数据
*/
@RabbitListener(queues = {"hello-java-queue"})
public void receiveMessage(Message message, OrderReturnReasonEntity content, Channel channel){
// 消息体
byte[] body = message.getBody();
// 消息头属性信息
MessageProperties messageProperties = message.getMessageProperties();
System.out.println("原生信息:"+message+"===="+"对应实体类:"+content);
}
Queue可以有多个用户监听,但只能有一个收到消息,只要有一个用户收到消息,队列就会删除该数据
如果一个队列中有多种数据,可以使用@RabbitHandler
+重载来解决
可以将@RabbitListener(queues = {"hello-java-queue"})
放在类上,然后再方法上添加@RabbitHandler
@RabbitHandler
public void receiveMessage(OrderReturnReasonEntity content){
System.out.println("对应实体类:"+content);
}
@RabbitHandler
public void receiveMessage(OrderEntity content){
System.out.println("对应实体类:"+content);
}
3.3 消息确认机制
为了保证消息不丢失,可靠抵达,可以使用事务消息,但性能会下降250倍,为此引入确认机制。
• publisher confirmCallback 确认模式
• publisher returnCallback 未投递到 queue 退回模式
• consumer ack机制
3.3.1 基本流程
- ConfirmCallback
- spring.rabbitmq.publisher-confirms=true
- 在创建 connectionFactory 的时候设置 PublisherConfirms(true) 选项,开启confirmcallback 。
- CorrelationData:用来表示当前消息唯一性。
- 消息只要被 broker 接收到就会执行 confirmCallback,如果是 cluster 模式,需要所有broker 接收到才会调用 confirmCallback。
- 被 broker 接收到只能表示 message 已经到达服务器,并不能保证消息一定会被投递到目标 queue 里。所以需要用到接下来的 returnCallback 。
- spring.rabbitmq.publisher-confirms=true
- ReturnCallback
- spring.rabbitmq.publisher-returns=true
- spring.rabbitmq.template.mandatory=true
- confrim 模式只能保证消息到达 broker,不能保证消息准确投递到目标 queue 里。在有些业务场景下,我们需要保证消息一定要投递到目标 queue 里,此时就需要用到return 退回模式。
- 这样如果未能投递到目标 queue 里将调用 returnCallback ,可以记录下详细到投递数据,定期的巡检或者自动纠错都需要这些数据。
- Ack消息确认机制
- 消费者获取到消息,成功处理,可以回复Ack给Broker
- basic.ack用于肯定确认;broker将移除此消息
- basic.nack用于否定确认;可以指定broker是否丢弃此消息,可以批量
- basic.reject用于否定确认;同上,但不能批量
- 默认自动ack,消息被消费者收到,就会从broker的queue中移除
- queue无消费者,消息依然会被存储,直到消费者消费
- 消费者收到消息,默认会自动ack。但是如果无法确定此消息是否被处理完成,或者成功处理。我们可以开启手动ack模式
- 消息处理成功,ack(),接受下一个消息,此消息broker就会移除
- 消息处理失败,nack()/reject(),重新发送给其他人进行处理,或者容错处理后ack
- 消息一直没有调用ack/nack方法,broker认为此消息正在被处理,不会投递给别人,此时客户端断开,消息不会被broker移除,会投递给别人
- 消费者获取到消息,成功处理,可以回复Ack给Broker
3.3.2 代码实现
- 服务器收到消息就回调
- spring.rabbitmq.publisher-confirm-type: correlated
- 设置确认回调 ConfirmCallback
- 消息正确抵达队列回调
- spring.rabbitmq.publisher-returns: true spring.rabbitmq.template.mandatory: true
- 设置回调 ReturnsCallback
- 消费端确认(保证每个消息被正确消费,此时才可以broker删除这个消息)
- 默认自动确认,只要消息接收到,客户端就会自己确认,队列中就会移除这个消息存在问题:当接收消息时就自动回复,队列中就会删除,但若在处理消息时出错或者服务器宕机时,消息就会丢失
- 手动确认,只要没有告诉mq消息被签收,消息就一直是Unacked状态,不会丢失
- spring.rabbitmq.listener.simple.acknowledge-mode: manual
- 消费端手动签收
- 确认签收: channel.basicAck(deliveryTag, false);deliveryTag:交货标签 multiple:批量模式
- 拒绝签收: channel.basicNack(deliveryTag, false, false);deliveryTag:交货标签 multiple:批量模式 requeue:是否重新入队
/**
* @author : ctc
* @createTime : 2023/1/3 19:55
*/
@Slf4j
@Configuration
public class MyRabbitConfig {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 使用json实现序列化
* @return
*/
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
/**
* 定制RabbitTemplate
*
* 1、服务器收到消息就回调
* 1.1、spring.rabbitmq.publisher-confirm-type: correlated
* 1.2、设置确认回调 ConfirmCallback
* 2、消息正确抵达队列回调
* 2.1、spring.rabbitmq.publisher-returns: true
* spring.rabbitmq.template.mandatory: true
* 2.2、设置回调 ReturnsCallback
* 3、消费端确认(保证每个消息被正确消费,此时才可以broker删除这个消息)
* 3.1、默认自动确认,只要消息接收到,客户端就会自己确认,队列中就会移除这个消息
* 存在问题:当接收消息时就自动回复,队列中就会删除,但若在处理消息时出错或者服务器宕机时,消息就会丢失
* 3.2、手动确认
* 只要没有告诉mq消息被签收,消息就一直是Unacked状态,不会丢失
* 3.2.1、spring.rabbitmq.listener.simple.acknowledge-mode: manual
* 3.2.2、消费端手动签收
* 确认签收:
* channel.basicAck(deliveryTag, false);
* deliveryTag:交货标签 multiple:批量模式
* 拒绝签收:
* channel.basicNack(deliveryTag, false, false);
* deliveryTag:交货标签 multiple:批量模式 requeue:是否重新入队
*
* @PostConstruct MyRabbitConfig 对象创建完成之后,调用这个方法
*/
@PostConstruct
public void initRabbitTemplate() {
// 设置确认回调
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
* 只要消息抵达服务器,ack就等于true
* @param correlationData 当前消息的唯一关联数据(消息的唯一Id)
* @param ack 消息是否成功收到
* @param cause 失败的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("correlationData:{},ack:{},cause:{}", correlationData, ack, cause);
}
});
// 设置消息正确抵达队列回调
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
/**
* 失败回调:只要消息没有投递给指定的队列,就触发这个方法
* @param returnedMessage 封装着返回的信息
* Message message 投递失败的信息的详细信息
* int replyCode 回复的状态码
* tring replyText 回复的文本内容
* String exchange 当时发送给了哪个交换机
* String routingKey 使用了哪个路由键
*/
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
// 投递失败的信息的详细信息
Message message = returnedMessage.getMessage();
// 回复的状态码
int replyCode = returnedMessage.getReplyCode();
// 回复的文本内容
String replyText = returnedMessage.getReplyText();
// 当时发送给了哪个交换机
String exchange = returnedMessage.getExchange();
// 使用了哪个路由键
String routingKey = returnedMessage.getRoutingKey();
log.info("message:{},replyCode:{},replyCode:{},replyText:{},routingKey:{},",message,replyCode,replyText,exchange,routingKey);
}
});
}
}
Comments NOTHING