springCloudStream
简介
该框架提供了一个灵活的编程模型,该模型建立在已经建立和熟悉的 Spring 习惯用语和最佳实践之上,包括对持久发布/订阅语义、消费者组和有状态分区的支持。
核心模块
- Destination Binders: 负责提供与外部消息系统集成的组件
- Destination Bindings: 外部消息系统和用户程序代码之间的桥梁(生产者-使用者之间的桥梁)
- Message:生产者和消费者用于与Destination Binders(以及通过外部消息系统与其他应用程序)通信的规范数据结构。
历史
Spring 的数据集成之旅始于 Spring Integration。通过其编程模型,它提供了一致的开发人员体验来构建应用程序,这些应用程序可以采用企业集成模式来连接外部系统,例如数据库、消息代理等。
快进到云时代,微服务在企业环境中变得突出。Spring Boot 改变了开发人员构建应用程序的方式。借助 Spring 的编程模型和 Spring Boot 处理的运行时职责,可以无缝开发独立的、基于 Spring 的生产级微服务。
为了将其扩展到数据集成工作负载,Spring Integration 和 Spring Boot 被放在一个新项目中。Spring Cloud Stream 诞生了。
架构模型
这张图是spring-stream官网的,里面的
下图是我们原来和消息队列通信的方式。我们的程序直接发送数据给MQ或者监听到MQ的数据。
通过
目前官方提供了两个
一开始图中的
简单介绍一下Binder,其实就是
public interface Binder{ function add(Message msg); } // 连接MQ1的Binder public class Binder1 implements Binder{ public function add(Message msg){ // 消息处理 // 发送到MQ1 publish(msg); } } // 连接MQ2的Binder public class Binder2 implements Binder{ public function add(Message msg){ // 消息处理 // 发送到MQ2 release(msg); } }
当我们使用的时候只需要自己决定使用哪个Binder就可以了。就是就和连接数据库一样,不需要关心连接的是Mysql还是PostgreSql。
public class main{ public static function main() { Binder binder = new Binder1(); Message msg = new Message(); binder.add(msg); } }
Bindings
Bindings作为一个桥梁,负责连接MQ和用户代码。比如绑定一个代码作为input往某一个Queue里面输入信息,绑定一个代码作为output从某个Queue里面接收信息。然后我们使用Binder来实现推送消息到MQ和消费消息。
这里是官网原文:The application communicates with the outside world by establishing bindings between destinations exposed by the external brokers and input/output arguments in your code. Broker specific details necessary to establish bindings are handled by middleware-specific Binder implementations.
下图为Bindings和Binder的关系
source 和 sink
source其实就是发送方的发送的Message. sink就是接收方接受的Message
注解实现
注解的实现已经被彻底删除,只有之前低版本的还能使用
函数式编程实现示例
依赖引入
将下面的代码加入
// 引入spring cloud stream依赖 <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream</artifactId> </dependency> // 引入spring cloud stream的rabbit binder依赖 // 如果是kafka,那么把这个换成kafka的binder // 在这个binder里面已经引入了 rabbit MQ依赖,所以不需要再单独引入rabbit MQ了 <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-rabbit</artifactId> </dependency>
配置文件
server: port: 8801 spring: application: name: cloud-stream-provider cloud: stream: # stream的配置 binders: # 在此处配置要绑定的rabbitmq的服务信息; defaultRabbit: # 表示定义的名称,用于于binding整合 type: rabbit # 消息组件类型 environment: # 设置rabbitmq的相关的环境配置 spring: rabbitmq: host: localhost port: 5672 username: guest password: guest
生产者
配置文件修改
对于函数式编程来说,
写生产者之前我们需要加上对应的
spring: cloud: function: # 配置哪些Bean是Stream可以用的 definition: log;logPub;sendLog stream: # stream的配置 bindings: # 服务的整合处理 logPub-out-0: destination: log # 表示要使用的Exchange名称定义,不存在会自动创建 content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
写代码
随便新建一个类,并标记为
@Component public class logProducer { }
然后开始编写生产者的代码。加入主要的方法
方法的返回值只能是
方法里面可以写生产者的具体代码。会注册一个名为
@Component public class logListener { @Bean public Supplier<logListener.Person> logPub() { return () -> { Person person = new Person(); person.setName("张三"); System.out.println("生产者:"+person); return person; }; } public static class Person { private String name; public String getName() { return name; } public void setName(String name) { this.name = name; } @Override public String toString() { return this.name; } } }
关于
下面是
//Represents a supplier of results. //There is no requirement that a new or distinct result be returned each time the supplier is invoked. //This is a functional interface whose functional method is get(). public interface Supplier<T>
翻译过来大概就是:
运行
运行的话会发现控制台一直在打印。我们的队列里面也一直在新增。
StreamBridge
当前的运行方式是当写完生产者以后,
我们可以通过
- public boolean send(String bindingName, Object data):第一个参数是bindingName,我们输入的是sendLog,就需要增加sendLog的配置,我们也可以用之前的
logPub-out-0 。第二个参数是发送的数据。 - public boolean send(String bindingName, Object data, MimeType outputContentType):比上面的多了一个数据类型。
- public boolean send(String bindingName, @Nullable String binderName, Object data):还可以指定Binder的name
- public boolean send(String bindingName, @Nullable String binderName, Object data, MimeType outputContentType): 四个参数放在一起了。
@RestController public class logController { @Autowired private StreamBridge streamBridge; @GetMapping("/sendLog") public void sendLog() { logListener.Person person = new logListener.Person(); person.setName("李四"); System.out.println("生产者发送消息"+person); streamBridge.send("sendLog", person); } }
消费者
随便新建一个类,并标记为
@Component public class logListener { }
然后开始编写消费者的代码。加入主要的方法
方法的返回值可以是
方法里面就可以写消费的具体代码了。
@Component public class logListener { @Bean public Consumer<logListener.Person> log() { return person -> { System.out.println("Received: " + person); }; } public static class Person { private String name; public String getName() { return name; } public void setName(String name) { this.name = name; } @Override public String toString() { return this.name; } } }
关于
下面是
//Represents an operation that accepts a single input argument and returns no result. Unlike most other functional interfaces, Consumer is expected to operate via side-effects. //This is a functional interface whose functional method is accept(Object). public interface Consumer<T>
翻译过来大概就是说Consumer接口
该接口只有一个方法
下面是
//Represents a function that accepts one argument and produces a result. //This is a functional interface whose functional method is apply(Object). public interface Function<T, R>
翻译过来大概就是说
手动ACK
通过禁止使用死信队列来执行手动的ACK,这个时候如果抛出异常,则会重试。如果开启了死信队列,那么抛出异常以后则会进入死信队列。
log-in-0: consumer: auto-bind-dlq: false
队列持久化
上面可以看出来,创建的都是
但是正常开发中,很少使用这种,都会指定一个持久化的队列,不管程序是否运行,队列都存在。
我们可以在
log-in-0: destination: log content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain” group: log123 sendLog: destination: log content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain” group: log123
再次运行程序,可以看到该队列被创建。接下来停止程序,可以看到队列还存在那里。
bindings重命名
默认约定的名称为
但是我们也可以将它重命名。通过配置文件可以将
spring: cloud: stream: function: bindings: log-in-0: input
显式绑定创建
默认约定的是
通过配置文件
spring: cloud: stream: input-bindings: login;fooin output-bindings: logout;fooout
轮询配置属性
spring: integration: poller: # 全局配置 fixedDelay: 1000L # 默认轮询器的延迟 单位毫秒,默认1000L maxMessagesPerPoll: 1L # 默认轮询器的每个轮询事件的最大消息数。默认 1L cron: none # Cron 触发器的 Cron 表达式值。默认 none initialDelay: 0 # 周期性触发的初始延迟。 默认0 timeUnit: MILLISECONDS # 要应用于延迟值的 TimeUnit。默认 MILLISECONDS
也可以单独为某个bindings来配置
spring: cloud: stream: bindings: log-out-0: producer: poller: # log-out-0的单独配置 fixedDelay: 1000L # 默认轮询器的延迟 单位毫秒,默认1000L maxMessagesPerPoll: 1L # 默认轮询器的每个轮询事件的最大消息数。默认 1L cron: none # Cron 触发器的 Cron 表达式值。默认 none initialDelay: 0 # 周期性触发的初始延迟。 默认0 timeUnit: MILLISECONDS # 要应用于延迟值的 TimeUnit。默认 MILLISECONDS
函数组合
假设我们有两个处理
@Bean public Function<Message<String>, Message<String>> enrich() { return message -> { Assert.isTrue(!message.getHeaders().containsKey("foo"), "Should NOT contain 'foo' header"); return MessageBuilder.fromMessage(message).setHeader("foo", "bar").build(); }; } @Bean public Function<Message<String>, Message<String>> echo() { return message -> { Assert.isTrue(message.getHeaders().containsKey("foo"), "Should contain 'foo' header"); System.out.println("Incoming message " + message); return message; }; }
通过配置将这两个bean组合起来,组合之后,这个bean名称就编程了
spring: cloud: function: definition: enrich|echo # 函数组合 stream: function: bindings: enrich|echo-in-0: input # 重命名