AMQP协议介绍和使用AMQP-client操作RabbitMQ

本文参考资源:

深入理解AMQP协议_网络_My Blogs-CSDN博客

推荐阅读:

消息队列概述与JMS使用

RabbitMQ概述

RabbitMQ是一个由Erlang语言开发的基于AMQP标准的开源实现。RabbitMQ 最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。其具体特点包括:

  • 保证可靠性(Reliability)。RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认等。
  • 具有灵活的路由(Flexible Routing)功能。在消息进入队列之前,是通过Exchange(交换器)来路由消息的。对于典型的路由功能,RabbitMQ已经提供了一些内置的Exchange来实现。针对更复杂的路由功能,可以将多个Exchange绑定在一起,也可以通过插件机制来实现自己的Exchange。
  • 支持消息集群(Clustering)。 多台RabbitMQ服务器可以组成一个集群,形成一个逻辑Broker。
  • 具有高可用性(Highly Available)。队列可以在集群中的机器上进行镜像,使得在部分节点出现问题的情况下队列仍然可用。
  • 支持多种协议(Multi-protocol)。RabbitMQ 除支持AMQP协议之外,还通过插件的方式支持其他消息队列协议,比如STOMP、MQTT等。

AMQP协议

AMQP核心概念

Server,服务器,又称Broker,接收客户端的连接,实现AMQP实体服务

Connection,连接,应用程序和Broker的网络连接

Channel,网络信道,几乎所有的操作都在Channel中进行,客户端可建立多个Channel,每个Channel代表一个会话任务。有点像JMS的Session。

Message,消息,由Properties和Body组成,Properties可以对消息进行修饰(类似于Http的请求头),Body就是消息体内容。

Exchange,交换机,用来接收生产者发送的消息并将这些消息路由给服务器中的队列

Queue,消息队列,用来保存消息直到发送给消费者。

Routing Key,路由键,虚拟机可以用它来确定如何路由一个特定消息

Binding,绑定,用于Exchange和Queue之间的关联。一个Binding路由规则就是一个RoutingKey和Queue的对应关系。

Virtual host:虚拟主机,用于进行逻辑隔离,最上层的消息路由。每个Virtual host类似于一个mini版的消息服务器。一个Virtual host里面可以有若干个Exchange和Queue,同一个Virtual Host里面不能有相同名称的Exchange和Queue。

AMQP协议介绍和使用AMQP-client操作RabbitMQ

在Server内部是可以有多个Exchange和Queue的。

核心组件的生命周期

消息的生命周期

  1. Publisher产生一条数据,发送到Broker。
  2. Broker中的Exchange根据RoutingKey查询投递的目标Queue(Broker从消息属性中获取Routing Key,如果不能完成路由会将消息丢弃或返回给生产者,一条消息可以路由到多个队列)。

  3. Consumer向Broker告知自己监听哪个队列,当有数据到达Queue,Broker会推送给Consumer。如果没有消费者,消息队列通过AMQP将消息返回给生产者。

交换器的生命周期

每台AMQP服务器都预先创建了许多交换器实例,它们在服务器启动时就存在并且不能被销毁。如果你的应用程序有特殊要求,则可以选择自己创建交换器,并在完成工作后进行销毁。

队列的生命周期

主要有两种消息队列,即持久化消息队列和临时消息队列。持久化消息队列可被多个消费者共享,不管是否有消费者接收,它们都可以独立存在。临时消息队列对某个消费者是私有的,只能绑定到此消费者,当消费者断开连接时,该消息队列将被删除。

功能命令

AMQP协议文本是分层描述的,0-9版本分为功能层和传输层

  • 功能层:定义了一系列的命令,这些命令按功能逻辑组合成不同的类(Class),客户端应用可以利用它们来实现自己的业务功能。
  • 传输层:将功能层接收的消息传递给服务器经过相应处理后再返回,处理的事情包括信道复用、帧同步、内容编码、心跳检测、数据表示和错误处理等。

0-10版本分为模型层、会话层和传输层。

  • 模型层:原来的功能层,定义了一系列的命令,利用它们来实现业务功能。
  • 会话层:负责将命令从客户端应用传递给服务器,再将服务端的响应返回给客户端应用,会话层为这个过程提供了可靠性、同步机制和错误处理。

  • 传输层:提供信道复用、帧同步、错误检测和数据表示。

消息数据格式

AMQP是二进制协议,所有的消息数据被组织成各种类型的帧,以0-9-1版本为例,帧的格式:

帧头(header,7个字节),包含帧类型type(一个字节)、信道(Channel)、size(帧负载的大小)

帧类型包括:
+ 1,“METHOD”,方法帧
+ 2,“HEADER”,内容头帧
+ 3,“BODY”,内容体帧
+ 4,“HEARTBEAT”,心跳帧

任意大小的帧负载(格式依赖于帧类型)

交换器类型

不同类型的交换器分发消息的策略是不同的,目前交换器有四种类型:Direct,Fanout,Topic,Headers,其中Headers基本不用了。

Direct交换器

如果消息中的路由键和Binding中的绑定键一致,交换器就把消息发送到对应的队列中。

Fanout交换器

Fanout交换器不处理路由键,它把消息转发给所有与其绑定的队列,类似于子网转发。

Topic交换器

Topic交换器通过模式匹配分配消息的路由键属性,将路由键和某种模式进行匹配,每个队列都绑定了一种模式,有两种通配符:“#”匹配0个或多个单词,“*”匹配一个单词

RabbitMQ默认提供的Exchanges

RabbitMQ使用

RabbitMQ的安装这里不做介绍。(不太会)

默认端口号5672

启动RabbitMQ服务:rabbitmq-server start &

服务的停止:rabbitmqctl stop_app

启用网页可视化管理插件: rabbitmq-plugins enable rabbitmq_management

安装完之后在/usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin下面更改rabbit.app中的loop_users,把[]里面的内容去掉。(默认用户是guest,密码guest,这里允许guest远程访问,或者你添加一个用户,赋予权限)

访问地址:http://localhost:15672/

命令行操作

rabbitmqctl stop_app:关闭应用

rabbitmqctl start_app:开启应用

rabbitmqctl status:节点状态

rabbitmqctl add_user username password:添加用户

rabbitmqctl change_password username newpassword:修改密码

rabbitmqctl list_users:列出所有用户

rabbitmqctl delete_user username:删除用户

rabbitmqctl list_user_permissions username:列出用户权限

rabbitmqctl clear_permissions -p vhostpath username:清除用户权限

rabbitmqctl set_permissions -p vhostpath username ".*" ".*" ". *":清除用户权限

rabbitmqctl add_vhost vhostpath:创建虚拟主机

rabbitmqctl delete_vhost vhostpath:删除虚拟主机

rabbitmqctl list_vhosts:列出所有虚拟主机

rabbitmqctl list_permissions -p vhostpath:列出虚拟主机权限

rabbitmqctl list_queues:查看所有队列信息

rabbitmqctl -p vhostpath purge_queue blue:清除队列里的消息

rabbitmqctl reset:移除所有数据,要在rabbitmqctl stop_app之后使用

AMQP-clinet使用

maven依赖:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.6.0</version>
</dependency>

这里的AMQP指的是一套操控rabbitMQ的API,类似于JMS

需要注意的是JMS需要通过session去创建消费者和生产者,AMQP只需要通过channel去创建消费者。

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建一个ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");
        //2.创建一个连接
        Connection connection = connectionFactory.newConnection();
        //3.通过connection创建一个Channel
        Channel channel = connection.createChannel();

        for (int i=0;i<5;i++) {
            String msg = "Hello RabbitMQ!";
            //4.通过Channel发送数据,四个参数:exchange,routingKey,props,body
            channel.basicPublish("","test001",null,msg.getBytes());
        }

        //5. 关闭连接
        channel.close();
        connection.close();
    }
}
public class Comsumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.创建一个ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");
        //2.创建一个连接
        Connection connection = connectionFactory.newConnection();
        //3.通过connection创建一个Channel
        Channel channel = connection.createChannel();
        //4.声明一个队列
        String queueName = "test001";
        //五个参数:queue队列名、durable是否持久化、exclusive保证顺序独占锁
        //autoDelete如果队列不再被使用(绑定)就自动删除、arguments附带的参数
        channel.queueDeclare(queueName,true,false, false, null);
        //5.创建消费者
        com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
            @Override
            //消费消息
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body)
                    throws IOException
            {
                String routingKey = envelope.getRoutingKey();
                String contentType = properties.getContentType();
                System.out.println("消费的路由键:"+routingKey);
                System.out.println("消费的内容类型:"+contentType);
                long deliveryTag = envelope.getDeliveryTag();
                // 确认消息
                channel.basicAck(deliveryTag, false);
                System.out.println("消费的消息体内容:");
                String bodyStr = new String(body,"UTF-8");
                System.out.println(bodyStr);
            }
        };

        // 6.设置Channel,三个参数:queue队列名,autoAck自动签收,Callback一个Consumer对象
        channel.basicConsume(queueName,false,consumer);

    }
}

要注意amqp-client老版本有一个QueueingConsumer,会造成内存溢出,已经被废弃了。

在这个案例中,生产者的basicPublish没有指定任何交换器,此时使用的是默认交换器AMQP DEFAULT,此交换器默认绑定所有队列,传routingKey给它的时候,它会找有没有队列名和该key相同的队列,有则路由消息给该队列。

其他相关API:

// 声明交换机,后面两个参数可省略,第二个参数声明类型,参考上面关于交换器类型的介绍
channel.exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete);
// 绑定队列到交换机,为队列分配一个key
channel.queueBind(String queue, String exchange, String routingKey);

Exchange除了和Queue绑定,还可以和另一个Exchange进行绑定,就可以把消息经多次路由。

//destination被绑定的交换机,source发起绑定的交换机
channel.exchangeBind(String destination, String source, String routingKey) throws IOException;

消息由Properties和Payload组成,Payload就是消息体,而Properties包含:
+ Delivery mode:消息是否持久化,1:否,2:是
+ Headers:头信息,是由一个或多个键值对组成的

还包含一些预定义属性,如下:
+ content_type:消息类型
+ content_encoding:消息编码
+ priority:消息优先级(0-9),但不保证遵从
+ message-idcorrelation-id:表示唯一消息标识和消息响应标识,用于在工作流程中实现消息跟踪
+ timestamp:表示消息创建时间
+ expiration:表示消息的过期时间
+ reply-to:实现响应消息的路由(构建一个用来恢复消息的私有响应队列)

Java示例,把消息属性传入:

Map<String,Object> headers = new HashMap<>();
headers.put("my1", "111");
headers.put("my2", "222");
//定义属性
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
        .deliveryMode(2)
        .contentEncoding("UTF-8")
        .expiration("10000")
        .headers(headers)
        .build();
String msg = "Hello RabbitMQ!";
channel.basicPublish("","test001",properties,msg.getBytes());

原创文章,作者:彭晨涛,如若转载,请注明出处:https://www.codetool.top/article/amqp%e5%8d%8f%e8%ae%ae%e4%bb%8b%e7%bb%8d%e5%92%8c%e4%bd%bf%e7%94%a8amqp-client%e6%93%8d%e4%bd%9crabbitmq/

发表评论

电子邮件地址不会被公开。