本文参考资源:
RabbitMQ学习(五)——消息确认机制(AMQP事务)_大数据_Anumbrella-CSDN博客
可靠投递
生产端负责的任务有:
- 保障消息的成功发出
- 保障MQ节点的成功接收,并收到来自MQ节点的确认应答
- 完善的消息补偿机制
那么如何确保MQ(Broker)收到了消息?就要讲到RabbitMQ对消息确认机制的支持
消息确认机制
AMQP事务机制
在AMQP中当把信道设置成了事务模式之后,生产者和Broker之间会有一种发送/响应机制判断当前命令操作是否可以继续。
AMQP-client中与事务有关的主要有三个方法:
txSelect()
:开启事务txCommit()
:提交事务txRollback()
:回滚事务
当我们使用txSelect开始事务之后,我们就可以发送消息给Broker,如果txCommit提交成功了,则消息一定到达了Broker了,如果在txCommit执行之前Broker出现异常崩溃或者由于其他原因抛出异常,这个时候我们便可以捕获异常通过txRollback方法进行回滚事务。
但是事务模式要求生产者同步等待Broker的返回结果,所以性能不好,一般不使用。
发送方确认机制
发送方确认机制是RabbitMQ对AMQP的拓展实现,发送方确认模式是RabbitMQ对AMQP的扩展实现,把信道设置成确认模式之后,在该信道上发布的所有消息都会被分配一个唯一ID, 一旦消息被投递到所有匹配的队列中,该信道就会向生产者发送确认消息,在确认消息中包含了之前的唯一ID,从而让生产者知道消息已到达目的队列。发送方确认模式的最大优势是异步,生产者发送完一条消息后可继续发送下一条消息,当生产者收到确认消息后调用回调方法处理。由于没有事务回滚的概念,这种方式比事务模式轻了许多,其对Broker的性能影响相对来说也小了很多。
AMQP-client中与发送方确认有关的主要有三个方法:
confirmSelect()
:开启确认模式waitForConfirms()
:阻塞等待Broker确认addConfirmListener()
:这是支持异步确认的关键,可以让发送方在收到确认消息后调用接口内的回调方法。
channel.addConfirmListener(new ConfirmListener() {
@Override
//收到确认后的回调,deliveryTag每发送一条消息+1(id)
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
}
@Override
//收到NACK后的回调
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
}
});
需要注意 handleNack
方法只在比如磁盘写满了,MQ出现了一些异常,或者Queue容量到达上限了这类情况下调用,如果因为网络或宕机原因消息传递过程中丢失等情况不会受到NACK,这个时候就得考虑发送端这边定时判断是否收到ACK从而判断Broker是否收到消息。
路由退回
如果Broker收到了消息,但没有可路由的队列,或是队列已满,也不能实现可靠投递。针对这个情况RabbitMQ提供了ReturnListener,和ConfirmListener差不多的用法。但是发消息的时候使用
void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)
throws IOException;
重载,其中Mandotory这个属性如果为true代表如果没有路由成功,则退回给生产段,如果为false则直接删除,这里必须设置为true。
channel.addReturnListener(new ReturnListener() {
@Override
//如果无法路由,退回
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
}
});
消费确认
消费者ACK
有的时候不仅要保证发送端100%将消息发送到了Broker,还需要确认消费端是否收到消息或是是否消费了消息。
RabbitMQ支持消费端的两种消息回执机制:
+ 自动回执:Broker发送消息给接收端后立即删除该消息,不等待消费端回执。
+ 手动回执:Broker发送消息给接收端后暂不删除该消息,等待消费端回执后再删除。如果没有等到消费者的ACK,会将消息转发给其他消费者。
是否开启自动回执模式由消费端的basicConsume方法的autoAck参数决定,手动回执需要在自定义消费者中实现:
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
System.out.println("消费的路由键:"+routingKey);
System.out.println("消费的内容类型:"+contentType);
long deliveryTag = envelope.getDeliveryTag();
// 手动回执
channel.basicAck(deliveryTag, false);
System.out.println("消费的消息体内容:");
String bodyStr = new String(body,"UTF-8");
System.out.println(bodyStr);
}
拒绝消息
当消费者目前不能处理该消息时,可以选择给Broker发送一个拒绝消息的指令,并且可以要求Broker将该消息丢弃或重新放入队列中。对应channel中的两个方法:
//退回一条消息,deliveryTag指定消息的deliveryTag,requeue为true重新放入队列,否则销毁
void basicReject(long deliveryTag, boolean requeue) throws IOException;
//退回多条消息,如果multiple为true代表除了已应答的消息,否则比当前deliveryTag小的消息全部拒绝
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
throws IOException;
消息预取(消费端限流)
在实际场景中,如果对每条消息的处理时间不同,则可能导致有些消费者一直很忙,而有些消费者处理很快并一直空闲。这时可通过设置预取数量(PrefetchCount)限制每个消费者在收到下一个确认回执前一次最多可以接收多少条消息。例如,设置prefetchCount为1,则表示RabbitMQ服务器每次给每个消费者发送一条消息,在收到该消息的消费者ACK指令之前RabbitMQ不会再向该消费者发送新的消息。可以通过channel中的basicQos
方法设置预取数量:
要使用这个策略,必须使用手动回执,否则Broker不会等待ACK消息。
//prefetchSize指的是以字节度量的最大大小,0代表不限制,通常不用这个参数
void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;
//常用版本,global决定是生效于Channel还是Consumer
void basicQos(int prefetchCount, boolean global) throws IOException;
总结
有了以上技术支撑,就可以通过一系列的措施来实现消息100%能从生产端到达消费端,具体业务解决方案可以参考:
RabbitMQ消息100%可靠性投递的解决方案实现(一)_大数据_eluanshi12的博客-CSDN博客
原创文章,作者:彭晨涛,如若转载,请注明出处:https://www.codetool.top/article/rabbitmq%e5%8f%af%e9%9d%a0%e6%8a%95%e9%80%92%e5%92%8c%e6%b6%88%e8%b4%b9%e7%a1%ae%e8%ae%a4/