消息积压的场景有很多,如果发送的消息没有得到及时回复,则会导致持久化消息不断积压而得不到释放,从而堵塞消息队列。对于这种情况,可以通过配置消息的过期时间和死信队列处理来预防。
TTL消息
RabbitMQ支持消息的过期时间,即之前某篇博客在消息属性中设置的expiration("10000")
,也支持在队列层面配置队列中消息的过期时间,从消息入队列开始计算,只要超过了队列的超时时间配置,那么消息会自动地清除。
配置队列的消息过期时间:在声明队列的时候配置队列参数x-message-ttl
:
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-message-ttl", 10000);
channel.queueDeclare(queueName,true,false, false, arguments);
死信队列
概述
DLX,Dead-Letter-Exchange
利用DLX,当消息在一个队列中变成死信 (dead message)之后,它能被重新publish到另一个Exchange,这个Exchange就是DLX
消息变成死信有以下几种情况
+ 消息被拒绝(basic.reject/basic.nack),并且requeue=false
+ 消息TTL过期
+ 队列达到最大长度
DLX特点:
- DLX也是一个正常的Exchange, 和一般的Exchange没有区别, 它能在任何的队列上被指定,实际上就是设置某个队列的属性。
- 当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列。
- 可以监听这个队列中消息做相应的处理,这个特性可以弥补RabbitMQ3.0以前支持的immediate参数的功能。
声明队列时添加参数x-dead-letter-exchange
,值为死信队列交换机的名字。
示例
消费端:
public class Comsumer {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchange = "test_dlx_exchange";
String routingKey = "dlx.#";
String queueName = "test_consumer_queue";
String dlxName = "dlx.exchange";
String dlxQueueName = "dlx.queue";
//声明并绑定死信队列到交换器上
channel.exchangeDeclare(dlxName, "topic",true,false,null);
channel.queueDeclare(dlxQueueName,true,false,false,null);
channel.queueBind(dlxQueueName, dlxName, "#");
Map<String, Object> arguments = new HashMap<>();
//设置队列消息ttl
arguments.put("x-message-ttl", 10000);
//设置死信队列交换器
arguments.put("x-dead-letter-exchange",dlxName);
//声明一个正常接收消息的队列和交换机
channel.exchangeDeclare(exchange, "topic",true,false,null);
channel.queueDeclare(queueName,true,false, false, arguments);
channel.queueBind(queueName, exchange,routingKey);
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
@Override
//消费消息
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
System.out.println("消费的路由键:"+routingKey);
System.out.println("消费的内容类型:"+contentType);
long deliveryTag = envelope.getDeliveryTag();
// 确认消息
channel.basicAck(deliveryTag, false);
System.out.println("消费的消息体内容:");
String bodyStr = new String(body,"UTF-8");
System.out.println(bodyStr);
}
};
//直接接收死信队列内的消息,这样正常队列没有消费者,过了10s后消息就会被转发到死信队列中。
channel.basicConsume(dlxQueueName,false,consumer);
}
}
生产端:
public class Producer {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchange = "test_dlx_exchange";
String routingKey = "dlx.save";
String msg = "Hello RabbitMQ ";
channel.basicPublish(exchange,routingKey,null,msg.getBytes());
channel.close();
connection.close();
}
}
先运行消费端,暂停消费端应用,再运行生产端,观察到一开始消息在正常队列中

过了十秒,消息进入死信队列

再次运行消费端应用,看到控制台输出消息
消费的路由键:dlx.save
消费的内容类型:null
消费的消息体内容:
Hello RabbitMQ
原创文章,作者:彭晨涛,如若转载,请注明出处:https://www.codetool.top/article/rabbitmq%e6%b6%88%e6%81%af%e7%a7%af%e5%8e%8b%e8%a7%a3%e5%86%b3%e6%96%b9%e6%a1%88-ttl%e4%b8%8e%e6%ad%bb%e4%bf%a1%e9%98%9f%e5%88%97/