Spring整合RabbitMQ及其各组件介绍

Spring-rabbit

使用Spring-rabbit,maven依赖:

<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>2.2.5.RELEASE</version>
</dependency>

各组件介绍:

RabbitAdmin

编写配置类,RabbitAdmin为核心操作RabbitMQ的类,我们要注入一个它的bean:

@Configuration
public class RabbitMQConfig {
    @Bean
    public ConnectionFactory connectionFactory(){
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses("127.0.0.1");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");
        return connectionFactory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        rabbitAdmin.setAutoStartup(true);//必须设置为true
        return rabbitAdmin;
    }
}

RabbitAdmin可以声明队列、交换器、绑定等。

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = SpringConfiguration.class)
public class RabbitTest {
    @Autowired
    private RabbitAdmin rabbitAdmin;

    @Test
    public void testAdmin() throws Exception{
        //声明
        rabbitAdmin.declareExchange(new DirectExchange("test.direct",false,false));
        rabbitAdmin.declareExchange(new TopicExchange("test.topic",false,false));
        rabbitAdmin.declareQueue(new Queue("test.direct.queue",false));
        rabbitAdmin.declareQueue(new Queue("test.topic.queue",false));
        rabbitAdmin.declareBinding(new Binding("test.direct.queue",Binding.DestinationType.QUEUE,
                "test.direct","direct",new HashMap<>()));
        rabbitAdmin.declareBinding(
                BindingBuilder
                        .bind(new Queue("test.topic.queue",false)) //队列
                        .to(new TopicExchange("test.topic",false,false)) //交换机
                        .with("user.#")); //指定路由key
        //清空队列数据
        rabbitAdmin.purgeQueue("test.topic.queue",false);
    }
}

也可以把Queue、Exchange、Binding注入到IoC容器中,再使用。

RabbitTemplate

该类提供了丰富的发送消息方法,包括可靠性投递消息方法、回调监听消息接口ConfirmCallback、返回值确认接口ReturnCallback等等。 同样我们需要进行注入到Spring容器中,然后直接使用。

@Configuration
public class RabbitMQConfig {
    @Bean
    public ConnectionFactory connectionFactory(){
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses("127.0.0.1");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");
        return connectionFactory;
    }

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        rabbitAdmin.setAutoStartup(true);//必须设置为true
        return rabbitAdmin;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        return rabbitTemplate;
    }
}
@Test
public void testSendMessage() throws Exception{
    MessageProperties messageProperties = new MessageProperties();
    messageProperties.getHeaders().put("desc", "信息描述..");
    messageProperties.getHeaders().put("type", "自定义消息类型..");
    Message message = new Message("Hello RabbitMQ".getBytes(), messageProperties);

    rabbitTemplate.convertAndSend("test.topic","user.template",message,new MessagePostProcessor(){
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            System.out.println("-----添加额外的设置------");
            message.getMessageProperties().getHeaders().put("desc", "额外修改的信息描述");
            message.getMessageProperties().getHeaders().put("attr", "额外新加的信息描述");
            return message;
        }
    });
}

send或convertAndSend用来发送消息,MessagePostProcessor可以帮我们在消息发送前再对消息做一些处理。

Spring整合RabbitMQ及其各组件介绍

convertAndSend可以传的消息是一个Object对象,如果不是Message类型,会帮我们用转换器转换成消息对象,默认的Converter是SimpleMessageConverter,可以自定义Converter然后使用setMessageConverter设置转换器。

其他自带可用的Converter还有Jackson2MessageConvertorSerializerMessageConverter等等。

SimpleMessageConverter如何将Java对象转成消息:

@Override
protected Message createMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
    byte[] bytes = null;
    //如果是字节数组,直接设置成消息体
    if (object instanceof byte[]) {
        bytes = (byte[]) object;
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_BYTES);
    }
    //如果是String,转成字节数组设置消息体,默认以UTF-8编码
    else if (object instanceof String) {
        try {
            bytes = ((String) object).getBytes(this.defaultCharset);
        }
        catch (UnsupportedEncodingException e) {
            throw new MessageConversionException(
                    "failed to convert to Message content", e);
        }
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
        messageProperties.setContentEncoding(this.defaultCharset);
    }
    //如果是可序列化的,就将对象序列化
    else if (object instanceof Serializable) {
        try {
            bytes = SerializationUtils.serialize(object);
        }
        catch (IllegalArgumentException e) {
            throw new MessageConversionException(
                    "failed to convert to serialized Message content", e);
        }
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_SERIALIZED_OBJECT);
    }
    if (bytes != null) {
        messageProperties.setContentLength(bytes.length);
        return new Message(bytes, messageProperties);
    }
    //否则就是不支持的类型
    throw new IllegalArgumentException(getClass().getSimpleName()
            + " only supports String, byte[] and Serializable payloads, received: " + object.getClass().getName());
}

SimpleMessageListenerContainer

SimpleMessageListenerContainer是一个消息监听容器(就是消费者的容器,可以监听多个队列上的消息)

  • 这个类非常的强大,我们可以对它进行很多设置,对于消费者的配置项,这个类都可以满足
  • 监听队列(多个队列)、自动启动、自动声明功能
  • 设置事务特性、事务管理器、事务属性、事务容量(并发)、是否开启事务、回滚消息等
  • 设置消费者数量、最小最大数量、批量消费
  • 设置消息确认和自动确认模式、是否重回队列、异常捕获handler函数。
  • 设置消费者标签生成策略、是否独占模式、消费者属性等
  • 设置具体的监听器、消息转换器等等。

注意: SimpleMessageListenerContainer可以进行动态设置, 比如在运行中的应用可以动态的修改其消费者数量的大小、接收消息的模式等。

很多基于RabbitMQ的自制定化后端管控台在进行动态设置的时候,也是根据这一特性去实现的。可以看出SpringAMQP非常的强大。

@Configuration
public class RabbitMQConfig {
    //略。。。
    @Bean
    public Queue directQueue(){
        return new Queue("test.direct.queue",false);
    }
    @Bean
    public Queue topicQueue(){
        return new Queue("test.topic.queue",false);
    }

    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory){
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueues(directQueue(),topicQueue());
        container.setConcurrentConsumers(2);
        container.setDefaultRequeueRejected(false);
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        container.setConsumerTagStrategy(new ConsumerTagStrategy() {
            @Override
            public String createConsumerTag(String queue) {
                return queue+"_"+ UUID.randomUUID().toString();
            }
        });
        container.setMessageListener(new ChannelAwareMessageListener() {
            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                String msg = new String(message.getBody());
                System.out.println("----------消费者:" +msg);
            }
        });

        return container;
    }
}

只要它在IoC容器中,就保持监听注册队列上的消息,监听到消息就可以通过内部的MessageListener来处理消息。如果收到的消息是序列化的、json等其他类型,可以使用Converter转换。或写一个MessageListenerAdapter,它可以注册Converter。

MessageListenerAdapter

MessageListenerAdapter

  1. 可以把一个没有实现MessageListener和ChannelAwareMessageListener接口的类适配成一个可以处理消息的处理器
  2. 默认的方法名称为:handleMessage,可以通过setDefaultListenerMethod设置新的消息处理方法
  3. MessageListenerAdapter支持不同的队列交给不同的方法去执行。使用setQueueOrTagToMethodName方法设置,当根据queue名称没有找到匹配的方法的时候,就会交给默认的方法去处理。

作者:二月_春风
链接:https://www.jianshu.com/p/d21bafe3b9fd

MessageListenerAdapter还可以通过MessageConverter将收到的消息转换成其他类型的数据,从而给Delegate中的方法处理。

MessageConverter

Java对象和Message互转。

public interface MessageConverter {
    Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException;

    Object fromMessage(Message message) throws MessageConversionException;
}

SpringBoot整合RabbitMQ

maven依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

使用SpringBoot就不用我们手动去注入组件了,显然这些本来是应该在ConnectionFactory中配置的东西现在都可以拿出来,SpringBoot预先帮我们注入各种组件bean。

#RABBITMQ START
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
# 消息确认机制:none不启用,simple使用waitForConfirms,correlated使用CorrelationData
spring.rabbitmq.publisher-confirm-type=none
# 自定义的属性,定义了两个队列的名称
rabbitmq.queue.msg=spring-boot-queue-msg
rabbitmq.queue.user=spring-boot-queue-user
#RABBITMQ END

先配置@EnableRabbit,可以使用@RabbitListener(queues={})来给方法注册一个MessageListenerContainer,这个MessageListenerContainer是从SpringBoot默认注入的MessageListenerContainerFactory获取的,如果要修改默认的Converter,就要自己注入MessageListenerContainerFactory,然后更改converter。

更多的注解相关见:使用@RabbitListener注解消费消息Java思考、总结、专注-CSDN博客

其他的用法就没有很大区别了。

原创文章,作者:彭晨涛,如若转载,请注明出处:https://www.codetool.top/article/spring%e6%95%b4%e5%90%88rabbitmq%e5%8f%8a%e5%85%b6%e5%90%84%e7%bb%84%e4%bb%b6%e4%bb%8b%e7%bb%8d/