本文参考资源:
推荐阅读:
RabbitMQ概述
RabbitMQ是一个由Erlang语言开发的基于AMQP标准的开源实现。RabbitMQ 最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。其具体特点包括:
- 保证可靠性(Reliability)。RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认等。
- 具有灵活的路由(Flexible Routing)功能。在消息进入队列之前,是通过Exchange(交换器)来路由消息的。对于典型的路由功能,RabbitMQ已经提供了一些内置的Exchange来实现。针对更复杂的路由功能,可以将多个Exchange绑定在一起,也可以通过插件机制来实现自己的Exchange。
- 支持消息集群(Clustering)。 多台RabbitMQ服务器可以组成一个集群,形成一个逻辑Broker。
- 具有高可用性(Highly Available)。队列可以在集群中的机器上进行镜像,使得在部分节点出现问题的情况下队列仍然可用。
- 支持多种协议(Multi-protocol)。RabbitMQ 除支持AMQP协议之外,还通过插件的方式支持其他消息队列协议,比如STOMP、MQTT等。
AMQP协议
AMQP核心概念
Server
,服务器,又称Broker,接收客户端的连接,实现AMQP实体服务
Connection
,连接,应用程序和Broker的网络连接
Channel
,网络信道,几乎所有的操作都在Channel中进行,客户端可建立多个Channel,每个Channel代表一个会话任务。有点像JMS的Session。
Message
,消息,由Properties和Body组成,Properties可以对消息进行修饰(类似于Http的请求头),Body就是消息体内容。
Exchange
,交换机,用来接收生产者发送的消息并将这些消息路由给服务器中的队列
Queue
,消息队列,用来保存消息直到发送给消费者。
Routing Key
,路由键,虚拟机可以用它来确定如何路由一个特定消息
Binding
,绑定,用于Exchange和Queue之间的关联。一个Binding路由规则就是一个RoutingKey和Queue的对应关系。
Virtual host
:虚拟主机,用于进行逻辑隔离,最上层的消息路由。每个Virtual host类似于一个mini版的消息服务器。一个Virtual host里面可以有若干个Exchange和Queue,同一个Virtual Host里面不能有相同名称的Exchange和Queue。
在Server内部是可以有多个Exchange和Queue的。
核心组件的生命周期
消息的生命周期
- Publisher产生一条数据,发送到Broker。
-
Broker中的Exchange根据RoutingKey查询投递的目标Queue(Broker从消息属性中获取Routing Key,如果不能完成路由会将消息丢弃或返回给生产者,一条消息可以路由到多个队列)。
-
Consumer向Broker告知自己监听哪个队列,当有数据到达Queue,Broker会推送给Consumer。如果没有消费者,消息队列通过AMQP将消息返回给生产者。
交换器的生命周期
每台AMQP服务器都预先创建了许多交换器实例,它们在服务器启动时就存在并且不能被销毁。如果你的应用程序有特殊要求,则可以选择自己创建交换器,并在完成工作后进行销毁。
队列的生命周期
主要有两种消息队列,即持久化消息队列和临时消息队列。持久化消息队列可被多个消费者共享,不管是否有消费者接收,它们都可以独立存在。临时消息队列对某个消费者是私有的,只能绑定到此消费者,当消费者断开连接时,该消息队列将被删除。
功能命令
AMQP协议文本是分层描述的,0-9版本分为功能层和传输层。
- 功能层:定义了一系列的命令,这些命令按功能逻辑组合成不同的类(Class),客户端应用可以利用它们来实现自己的业务功能。
-
传输层:将功能层接收的消息传递给服务器经过相应处理后再返回,处理的事情包括信道复用、帧同步、内容编码、心跳检测、数据表示和错误处理等。
0-10版本分为模型层、会话层和传输层。
- 模型层:原来的功能层,定义了一系列的命令,利用它们来实现业务功能。
-
会话层:负责将命令从客户端应用传递给服务器,再将服务端的响应返回给客户端应用,会话层为这个过程提供了可靠性、同步机制和错误处理。
-
传输层:提供信道复用、帧同步、错误检测和数据表示。
消息数据格式
AMQP是二进制协议,所有的消息数据被组织成各种类型的帧,以0-9-1版本为例,帧的格式:
帧头(header,7个字节),包含帧类型type(一个字节)、信道(Channel)、size(帧负载的大小)
帧类型包括:
+ 1,“METHOD”
,方法帧
+ 2,“HEADER”
,内容头帧
+ 3,“BODY”
,内容体帧
+ 4,“HEARTBEAT”
,心跳帧
任意大小的帧负载(格式依赖于帧类型)
交换器类型
不同类型的交换器分发消息的策略是不同的,目前交换器有四种类型:Direct,Fanout,Topic,Headers,其中Headers基本不用了。
Direct交换器
如果消息中的路由键和Binding中的绑定键一致,交换器就把消息发送到对应的队列中。
Fanout交换器
Fanout交换器不处理路由键,它把消息转发给所有与其绑定的队列,类似于子网转发。
Topic交换器
Topic交换器通过模式匹配分配消息的路由键属性,将路由键和某种模式进行匹配,每个队列都绑定了一种模式,有两种通配符:“#
”匹配0个或多个单词,“*
”匹配一个单词
RabbitMQ使用
RabbitMQ的安装这里不做介绍。(不太会)
默认端口号5672
启动RabbitMQ服务:rabbitmq-server start &
服务的停止:rabbitmqctl stop_app
启用网页可视化管理插件: rabbitmq-plugins enable rabbitmq_management
安装完之后在/usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin
下面更改rabbit.app
中的loop_users,把[]里面的内容去掉。(默认用户是guest,密码guest,这里允许guest远程访问,或者你添加一个用户,赋予权限)
访问地址:http://localhost:15672/
命令行操作
rabbitmqctl stop_app
:关闭应用
rabbitmqctl start_app
:开启应用
rabbitmqctl status
:节点状态
rabbitmqctl add_user username password
:添加用户
rabbitmqctl change_password username newpassword
:修改密码
rabbitmqctl list_users
:列出所有用户
rabbitmqctl delete_user username
:删除用户
rabbitmqctl list_user_permissions username
:列出用户权限
rabbitmqctl clear_permissions -p vhostpath username
:清除用户权限
rabbitmqctl set_permissions -p vhostpath username ".*" ".*" ". *"
:清除用户权限
rabbitmqctl add_vhost vhostpath
:创建虚拟主机
rabbitmqctl delete_vhost vhostpath
:删除虚拟主机
rabbitmqctl list_vhosts
:列出所有虚拟主机
rabbitmqctl list_permissions -p vhostpath
:列出虚拟主机权限
rabbitmqctl list_queues
:查看所有队列信息
rabbitmqctl -p vhostpath purge_queue blue
:清除队列里的消息
rabbitmqctl reset
:移除所有数据,要在rabbitmqctl stop_app
之后使用
AMQP-clinet使用
maven依赖:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
这里的AMQP指的是一套操控rabbitMQ的API,类似于JMS
需要注意的是JMS需要通过session去创建消费者和生产者,AMQP只需要通过channel去创建消费者。
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建一个ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
//2.创建一个连接
Connection connection = connectionFactory.newConnection();
//3.通过connection创建一个Channel
Channel channel = connection.createChannel();
for (int i=0;i<5;i++) {
String msg = "Hello RabbitMQ!";
//4.通过Channel发送数据,四个参数:exchange,routingKey,props,body
channel.basicPublish("","test001",null,msg.getBytes());
}
//5. 关闭连接
channel.close();
connection.close();
}
}
public class Comsumer {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建一个ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
//2.创建一个连接
Connection connection = connectionFactory.newConnection();
//3.通过connection创建一个Channel
Channel channel = connection.createChannel();
//4.声明一个队列
String queueName = "test001";
//五个参数:queue队列名、durable是否持久化、exclusive保证顺序独占锁
//autoDelete如果队列不再被使用(绑定)就自动删除、arguments附带的参数
channel.queueDeclare(queueName,true,false, false, null);
//5.创建消费者
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
@Override
//消费消息
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
System.out.println("消费的路由键:"+routingKey);
System.out.println("消费的内容类型:"+contentType);
long deliveryTag = envelope.getDeliveryTag();
// 确认消息
channel.basicAck(deliveryTag, false);
System.out.println("消费的消息体内容:");
String bodyStr = new String(body,"UTF-8");
System.out.println(bodyStr);
}
};
// 6.设置Channel,三个参数:queue队列名,autoAck自动签收,Callback一个Consumer对象
channel.basicConsume(queueName,false,consumer);
}
}
要注意amqp-client老版本有一个QueueingConsumer,会造成内存溢出,已经被废弃了。
在这个案例中,生产者的basicPublish没有指定任何交换器,此时使用的是默认交换器AMQP DEFAULT,此交换器默认绑定所有队列,传routingKey给它的时候,它会找有没有队列名和该key相同的队列,有则路由消息给该队列。
其他相关API:
// 声明交换机,后面两个参数可省略,第二个参数声明类型,参考上面关于交换器类型的介绍
channel.exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete);
// 绑定队列到交换机,为队列分配一个key
channel.queueBind(String queue, String exchange, String routingKey);
Exchange除了和Queue绑定,还可以和另一个Exchange进行绑定,就可以把消息经多次路由。
//destination被绑定的交换机,source发起绑定的交换机
channel.exchangeBind(String destination, String source, String routingKey) throws IOException;
消息由Properties和Payload组成,Payload就是消息体,而Properties包含:
+ Delivery mode
:消息是否持久化,1:否,2:是
+ Headers
:头信息,是由一个或多个键值对组成的
还包含一些预定义属性,如下:
+ content_type
:消息类型
+ content_encoding
:消息编码
+ priority
:消息优先级(0-9),但不保证遵从
+ message-id
和correlation-id
:表示唯一消息标识和消息响应标识,用于在工作流程中实现消息跟踪
+ timestamp
:表示消息创建时间
+ expiration
:表示消息的过期时间
+ reply-to
:实现响应消息的路由(构建一个用来恢复消息的私有响应队列)
Java示例,把消息属性传入:
Map<String,Object> headers = new HashMap<>();
headers.put("my1", "111");
headers.put("my2", "222");
//定义属性
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2)
.contentEncoding("UTF-8")
.expiration("10000")
.headers(headers)
.build();
String msg = "Hello RabbitMQ!";
channel.basicPublish("","test001",properties,msg.getBytes());
原创文章,作者:彭晨涛,如若转载,请注明出处:https://www.codetool.top/article/amqp%e5%8d%8f%e8%ae%ae%e4%bb%8b%e7%bb%8d%e5%92%8c%e4%bd%bf%e7%94%a8amqp-client%e6%93%8d%e4%bd%9crabbitmq/