SpringCloud Stream的了解及使用

什么是SpringCloud Stream

官方定义Spring Cloud Stream是一个构建消息驱动微服务的框架,它的目标是屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型。

应用程序通过inputs或者outputs与Spring Cloud Stream中binder对象交互。 通过我们配置来binding(绑定),而Spring Cloud Stream的binder对象负责与消息中间件交互。目前仅支持RabbitMQ、Kafka。

所以,我们只需要搞清楚如何与Spring Cloud Stream交互就可以方便使用消息驱动的方式。通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。

Spring Cloud Stream为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。

目前仅支持RabbitMQ、Kafka

SpringCloud Stream的了解及使用

编码API

组成 说明
Middleware 中间件,目前只支持RabbitMQ和Kafka
Binder 是应用与消息中间件之间的封装,目前实行了Kafka和RabitMQ的Binder,通过Binder可以很方便的连接中间件,可以动态的改变消息类型(对应于Kafka的topic,RabitMQ的exchange),这些都可以通过配置文件来实现
Channel 通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过Channel对队列进行配置
Source和Sink 简单的可理解为参考对象是Spring Cloud Stream的自身,从Stream发布信息就是输出,接受消息就是输入
@Input 注解标识输入通道,通过该输入通道接收到的消息进入应用程序
@Output 注解标识输出通道,发布的消息将通过该通道离开应用程序
@StreamListener 监听队列,用于消费者的队列的消息接收
@EnableBinding 指信道channel和exchange绑定在一起

简单使用实例

消息生产者

新建模块cloud-stream-rabbitmq-provider8801,依赖:

<!-- spring-cloud-starter-stream-rabbit -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>

yaml配置:

server:
  port: 8801

spring:
  application:
    name: cloud-stream-provider
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
  cloud:
    stream:
      binders: #在此配置要绑定的rabbitmq的服务信息
        defaultRabbit:
          type: rabbit # 消息组件类型
          environment: # 可实现多环境配置
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: #服务的整合处理
        output: #这个名字是一个通道的名称
          destination: studyExchange # 表示要使用的exchange名称定义
          content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
          binder: defaultRabbit

eureka:
  client:
    service-url:
      defaultZone: http://eureka7001.com:7001/eureka

  instance:
    lease-renewal-interval-in-seconds: 2 #设置心跳的时间间隔
    lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔
    instance-id: send-8801.com
    prefer-ip-address: true #访问的路径变为ip地址

启动类:

@SpringBootApplication
public class StreamMQMain8801 {
    public static void main(String[] args) {
        SpringApplication.run(StreamMQMain8801.class, args);
    }
}

发送消息接口:

public interface IMessageProvider {
    public void send();
}

实现:

import com.rhett.springcloud.service.IMessageProvider;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;

import javax.annotation.Resource;
import java.util.UUID;

@EnableBinding(Source.class) //定义消息的推送管道
public class MessageProviderImpl implements IMessageProvider {
    @Resource
    private MessageChannel output; //消息发送管道

    @Override
    public void send() {
        String serial = UUID.randomUUID().toString();
        output.send(MessageBuilder.withPayload(serial).build());
        System.out.println("*******serial:"+serial);
    }
}

controller:

@RestController
public class SendMessageController {
    @Resource
    private IMessageProvider messageProvider;

    @GetMapping("/sendMessage")
    public String sendMessage(){
        return messageProvider.send();
    }
}

消息消费者

新建cloud-stream-rabbitmq-consumer8802模块,依赖和cloud-stream-rabbitmq-provider8801一样。

yaml配置也很类似:

server:
  port: 8802

spring:
  application:
    name: cloud-stream-consumer
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
  cloud:
    stream:
      binders: # 在此处配置要绑定的rabbitMQ的服务信息
        defaultRabbit: # 表示定义的名称,用于binding的整合
          type: rabbit # 消息中间件类型
          environment: # 设置rabbitMQ的相关环境配置
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # 服务的整合处理
        input: # 这个名字是一个通道的名称
          destination: studyExchange # 表示要使用的exchange名称定义
          content-type: application/json # 设置消息类型,本次为json,文本则设为text/plain
          binder: defaultRabbit # 设置要绑定的消息服务的具体设置

eureka:
  client:
    service-url:
      defaultZone: http://eureka7001.com:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 # 设置心跳的间隔时间,默认30
    lease-expiration-duration-in-seconds: 5 # 超过5秒间隔,默认90
    instance-id: receive-8802.com
    prefer-ip-address: true

启动类:

@SpringBootApplication
public class StreamMQMain8802 {
    public static void main(String[] args) {
        SpringApplication.run(StreamMQMain8802.class, args);
    }
}

消费消息业务类:

@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListenerController {
    @Value("${server.port}")
    private String serverPort;
    @StreamListener(Sink.INPUT)
    public void input(Message<String> message){
        System.out.println("消费者1号,----->接收到的消息:"+message.getPayload()+"\t port:"+serverPort);
    }
}

分组消费

上面搭建的简单使用实例仍然存在着问题,我们新建一个内容和cloud-stream-rabbitmq-consumer8802一样的模块cloud-stream-rabbitmq-consumer8803,作为消息的第二个消息消费者。运行8801、8802、8803后发现,8801发送一条消息后,8802和8803都处理了该消息,这就是重复消费的问题(实际上并不是重复消费,而是对每个分组都存在一个绑定到exchange的队列,消息会被发送到每条队列)。可以利用分组的特性来解决。在cloud stream中处于同一个group中的多个消费者是竞争关系(即监听同一条队列),就能够保证消息只会被其中一个应用消费一次。

SpringCloud Stream的了解及使用

并且注意到默认配置的队列具有AutoDelete属性,在消费者应用停止运行后就会自动删除队列,可能会导致消息丢失的情况。

在8802中配置spring.cloud.stream.bindings.input.group=spectrumrpcA,8803配置spring.cloud.stream.bindings.input.group=spectrumrpcB,再次查看rabbitmq控制界面,可见队列名发生变化

SpringCloud Stream的了解及使用

并且自定义group后生成的队列具有durable属性,即队列是持久化的,即使消费者应用重启,也不会丢失消息。

我们把8802和8803的group都配置成spectrumrpcA,再次运行应用,消息就不会被多处消费了。

原创文章,作者:彭晨涛,如若转载,请注明出处:https://www.codetool.top/article/springcloud-stream%e7%9a%84%e4%ba%86%e8%a7%a3%e5%8f%8a%e4%bd%bf%e7%94%a8/

发表评论

电子邮件地址不会被公开。