RabbitMQ消息积压解决方案-TTL与死信队列

消息积压的场景有很多,如果发送的消息没有得到及时回复,则会导致持久化消息不断积压而得不到释放,从而堵塞消息队列。对于这种情况,可以通过配置消息的过期时间和死信队列处理来预防。

TTL消息

RabbitMQ支持消息的过期时间,即之前某篇博客在消息属性中设置的expiration("10000"),也支持在队列层面配置队列中消息的过期时间,从消息入队列开始计算,只要超过了队列的超时时间配置,那么消息会自动地清除。

配置队列的消息过期时间:在声明队列的时候配置队列参数x-message-ttl

Map<String, Object> arguments = new HashMap<>();
arguments.put("x-message-ttl", 10000);
channel.queueDeclare(queueName,true,false, false, arguments);

死信队列

概述

DLX,Dead-Letter-Exchange

利用DLX,当消息在一个队列中变成死信 (dead message)之后,它能被重新publish到另一个Exchange,这个Exchange就是DLX

消息变成死信有以下几种情况
+ 消息被拒绝(basic.reject/basic.nack),并且requeue=false
+ 消息TTL过期
+ 队列达到最大长度

DLX特点:

  • DLX也是一个正常的Exchange, 和一般的Exchange没有区别, 它能在任何的队列上被指定,实际上就是设置某个队列的属性。
  • 当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列。
  • 可以监听这个队列中消息做相应的处理,这个特性可以弥补RabbitMQ3.0以前支持的immediate参数的功能。

声明队列时添加参数x-dead-letter-exchange,值为死信队列交换机的名字。

示例

消费端:

public class Comsumer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        String exchange = "test_dlx_exchange";
        String routingKey = "dlx.#";
        String queueName = "test_consumer_queue";
        String dlxName = "dlx.exchange";
        String dlxQueueName = "dlx.queue";

        //声明并绑定死信队列到交换器上
        channel.exchangeDeclare(dlxName, "topic",true,false,null);
        channel.queueDeclare(dlxQueueName,true,false,false,null);
        channel.queueBind(dlxQueueName, dlxName, "#");

        Map<String, Object> arguments = new HashMap<>();
        //设置队列消息ttl
        arguments.put("x-message-ttl", 10000);
        //设置死信队列交换器
        arguments.put("x-dead-letter-exchange",dlxName);

        //声明一个正常接收消息的队列和交换机
        channel.exchangeDeclare(exchange, "topic",true,false,null);
        channel.queueDeclare(queueName,true,false, false, arguments);
        channel.queueBind(queueName, exchange,routingKey);


        com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
            @Override
            //消费消息
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body)
                    throws IOException
            {
                String routingKey = envelope.getRoutingKey();
                String contentType = properties.getContentType();
                System.out.println("消费的路由键:"+routingKey);
                System.out.println("消费的内容类型:"+contentType);
                long deliveryTag = envelope.getDeliveryTag();
                // 确认消息
                channel.basicAck(deliveryTag, false);
                System.out.println("消费的消息体内容:");
                String bodyStr = new String(body,"UTF-8");
                System.out.println(bodyStr);

            }
        };

        //直接接收死信队列内的消息,这样正常队列没有消费者,过了10s后消息就会被转发到死信队列中。
        channel.basicConsume(dlxQueueName,false,consumer);
    }
}

生产端:

public class Producer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        String exchange = "test_dlx_exchange";
        String routingKey = "dlx.save";

        String msg = "Hello RabbitMQ ";

        channel.basicPublish(exchange,routingKey,null,msg.getBytes());

        channel.close();
        connection.close();
    }
}

先运行消费端,暂停消费端应用,再运行生产端,观察到一开始消息在正常队列中

RabbitMQ消息积压解决方案-TTL与死信队列

过了十秒,消息进入死信队列

RabbitMQ消息积压解决方案-TTL与死信队列

再次运行消费端应用,看到控制台输出消息

消费的路由键:dlx.save
消费的内容类型:null
消费的消息体内容:
Hello RabbitMQ 

原创文章,作者:彭晨涛,如若转载,请注明出处:https://www.codetool.top/article/rabbitmq%e6%b6%88%e6%81%af%e7%a7%af%e5%8e%8b%e8%a7%a3%e5%86%b3%e6%96%b9%e6%a1%88-ttl%e4%b8%8e%e6%ad%bb%e4%bf%a1%e9%98%9f%e5%88%97/

发表评论

电子邮件地址不会被公开。