什么是SpringCloud Stream
官方定义Spring Cloud Stream是一个构建消息驱动微服务的框架,它的目标是屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型。
应用程序通过inputs
或者outputs
与Spring Cloud Stream中binder对象交互。 通过我们配置来binding(绑定),而Spring Cloud Stream的binder对象负责与消息中间件交互。目前仅支持RabbitMQ、Kafka。
所以,我们只需要搞清楚如何与Spring Cloud Stream交互就可以方便使用消息驱动的方式。通过使用Spring Integration
来连接消息代理中间件以实现消息事件驱动。
Spring Cloud Stream为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。
目前仅支持RabbitMQ、Kafka

编码API
组成 | 说明 |
---|---|
Middleware | 中间件,目前只支持RabbitMQ和Kafka |
Binder | 是应用与消息中间件之间的封装,目前实行了Kafka和RabitMQ的Binder,通过Binder可以很方便的连接中间件,可以动态的改变消息类型(对应于Kafka的topic,RabitMQ的exchange),这些都可以通过配置文件来实现 |
Channel | 通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过Channel对队列进行配置 |
Source和Sink | 简单的可理解为参考对象是Spring Cloud Stream的自身,从Stream发布信息就是输出,接受消息就是输入 |
@Input | 注解标识输入通道,通过该输入通道接收到的消息进入应用程序 |
@Output | 注解标识输出通道,发布的消息将通过该通道离开应用程序 |
@StreamListener | 监听队列,用于消费者的队列的消息接收 |
@EnableBinding | 指信道channel和exchange绑定在一起 |
简单使用实例
消息生产者
新建模块cloud-stream-rabbitmq-provider8801
,依赖:
<!-- spring-cloud-starter-stream-rabbit -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
yaml配置:
server:
port: 8801
spring:
application:
name: cloud-stream-provider
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
cloud:
stream:
binders: #在此配置要绑定的rabbitmq的服务信息
defaultRabbit:
type: rabbit # 消息组件类型
environment: # 可实现多环境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: #服务的整合处理
output: #这个名字是一个通道的名称
destination: studyExchange # 表示要使用的exchange名称定义
content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
binder: defaultRabbit
eureka:
client:
service-url:
defaultZone: http://eureka7001.com:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2 #设置心跳的时间间隔
lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔
instance-id: send-8801.com
prefer-ip-address: true #访问的路径变为ip地址
启动类:
@SpringBootApplication
public class StreamMQMain8801 {
public static void main(String[] args) {
SpringApplication.run(StreamMQMain8801.class, args);
}
}
发送消息接口:
public interface IMessageProvider {
public void send();
}
实现:
import com.rhett.springcloud.service.IMessageProvider;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;
import javax.annotation.Resource;
import java.util.UUID;
@EnableBinding(Source.class) //定义消息的推送管道
public class MessageProviderImpl implements IMessageProvider {
@Resource
private MessageChannel output; //消息发送管道
@Override
public void send() {
String serial = UUID.randomUUID().toString();
output.send(MessageBuilder.withPayload(serial).build());
System.out.println("*******serial:"+serial);
}
}
controller:
@RestController
public class SendMessageController {
@Resource
private IMessageProvider messageProvider;
@GetMapping("/sendMessage")
public String sendMessage(){
return messageProvider.send();
}
}
消息消费者
新建cloud-stream-rabbitmq-consumer8802
模块,依赖和cloud-stream-rabbitmq-provider8801
一样。
yaml配置也很类似:
server:
port: 8802
spring:
application:
name: cloud-stream-consumer
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
cloud:
stream:
binders: # 在此处配置要绑定的rabbitMQ的服务信息
defaultRabbit: # 表示定义的名称,用于binding的整合
type: rabbit # 消息中间件类型
environment: # 设置rabbitMQ的相关环境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 服务的整合处理
input: # 这个名字是一个通道的名称
destination: studyExchange # 表示要使用的exchange名称定义
content-type: application/json # 设置消息类型,本次为json,文本则设为text/plain
binder: defaultRabbit # 设置要绑定的消息服务的具体设置
eureka:
client:
service-url:
defaultZone: http://eureka7001.com:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2 # 设置心跳的间隔时间,默认30
lease-expiration-duration-in-seconds: 5 # 超过5秒间隔,默认90
instance-id: receive-8802.com
prefer-ip-address: true
启动类:
@SpringBootApplication
public class StreamMQMain8802 {
public static void main(String[] args) {
SpringApplication.run(StreamMQMain8802.class, args);
}
}
消费消息业务类:
@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListenerController {
@Value("${server.port}")
private String serverPort;
@StreamListener(Sink.INPUT)
public void input(Message<String> message){
System.out.println("消费者1号,----->接收到的消息:"+message.getPayload()+"\t port:"+serverPort);
}
}
分组消费
上面搭建的简单使用实例仍然存在着问题,我们新建一个内容和cloud-stream-rabbitmq-consumer8802
一样的模块cloud-stream-rabbitmq-consumer8803
,作为消息的第二个消息消费者。运行8801、8802、8803后发现,8801发送一条消息后,8802和8803都处理了该消息,这就是重复消费的问题(实际上并不是重复消费,而是对每个分组都存在一个绑定到exchange的队列,消息会被发送到每条队列)。可以利用分组的特性来解决。在cloud stream中处于同一个group中的多个消费者是竞争关系(即监听同一条队列),就能够保证消息只会被其中一个应用消费一次。

并且注意到默认配置的队列具有AutoDelete属性,在消费者应用停止运行后就会自动删除队列,可能会导致消息丢失的情况。
在8802中配置spring.cloud.stream.bindings.input.group=spectrumrpcA
,8803配置spring.cloud.stream.bindings.input.group=spectrumrpcB
,再次查看rabbitmq控制界面,可见队列名发生变化

并且自定义group后生成的队列具有durable属性,即队列是持久化的,即使消费者应用重启,也不会丢失消息。
我们把8802和8803的group都配置成spectrumrpcA
,再次运行应用,消息就不会被多处消费了。
原创文章,作者:彭晨涛,如若转载,请注明出处:https://www.codetool.top/article/springcloud-stream%e7%9a%84%e4%ba%86%e8%a7%a3%e5%8f%8a%e4%bd%bf%e7%94%a8/