Spring Cloud Stream使用Spring Cloud Function提供的功能,分别将
发布数据
在Spring Cloud Stream中,发布数据的方式主要有两种,一种是通过
通过Supplier<T> 自动触发
使用
命令式(Imperative)编程模式的触发
如果使用的是命令式编程模式(Imperative),则直接在
java
复制代码
我们可以通过两种方式来调整触发频率:
-
全局属性:
spring.integration.poller.xxx - 设置所有函数的拉取频率
-
每个绑定特定的属性:
spring.cloud.stream.bindings.<binding-name>.producer.poller.xxx - 为某个特定的绑定设置拉取频率
反应式(Reactive)编程模式的触发
使用反应式编程模式时,
-
无限流
java
复制代码
@Bean Supplier<Flux<String>> stringSupplier() { return () -> Flux.fromStream( Stream.generate(() -> { try { TimeUnit.SECONDS.sleep(1); String value = "Hello World!!"; log.info(">>> {}", value); return value; } catch (InterruptedException e) { throw new RuntimeException(e); } })) .subscribeOn(Schedulers.boundedElastic()) .share(); } -
有限流,使用
PollableBean 注解java
复制代码
@PollableBean Supplier<Flux<String>> stringSupplier() { return () -> { System.out.println("sending..."); return Flux.just("Hello", "World", "!"); }; }
通过StreamBridge 触发外部源
如果要发布的数据来自于REST请求,或者其他的外部源系统,则可以使用
基本使用
比如,下面是通过GET请求发布数据:
java
复制代码
上述
yaml
复制代码
建议提前配置好bindingName,否则可能会造成内存溢出。为了防止内存溢出,可以通过以下属性来限制动态创建binding的数量。
spring.cloud.stream.dynamic-destination-cache-size=5
使用Interceptor
因为
java
复制代码
注:
-
ChannleInterceptor 的实现类需要注册到Spring容器中 -
使用
GlobalChannelIntercptor 来标注这个拦截器-
参数
patterns 的值可以控制该拦截器的硬性范围* 表示拦截所有bindingfoo-* 表示只拦截那些以foo- 开头的binding
-
消费数据
可以使用多种方式来消费stream中的数据,最常见的方式是通过
反应式的Consumer
对于命令式编程模式,使用
对于反应式的Consumer,由于它没有返回值,导致Spring Framework无法自动subscribe它,所以需要一些特殊处理。有两种方式:
使用Function<Flux<T>,Mono<Void>
建议的方式,是不用
java
复制代码
具体的操作,可以放在
使用Consumer<Flux<T>> ,主动订阅
如果必须使用
java
复制代码
函数组合(Function Composition)
利用Spring Cloud Function的函数组合功能,可以将多个
yaml
复制代码
上述配置,表示:
- 应用中有两个
Function (uppercase和quote)和一个Consumer (printText),他们共同组成一个运行时函数; - 为了方便引用,这个运行时函数的绑定器被命名为
convert ,即:uppercase|quote|printText-in-0 的别名 - 将这个绑定器绑定到叫做
my-binding 的目标上
使用函数组合,可以很方便地执行切面操作,比如上述案例中的quote其实可以看作一个切面,它对输入做了quote的增强。
多输入/输出参数
在需要合并/分流stream的场景下,会涉及到多个输入/输出参数。可以利用Project Reactor提供的Tuple来处理。
java
复制代码
对应的绑定器命名如下:
gather-in-0 :第一个输入流的绑定器gather-in-1 :第二个输入流的绑定器gather-out-0 :第一个输出流的绑定器
使用PollableMessageSource
如果不想使用函数式编程,那么Spring Framework提供了
java
复制代码
结论
使用Spring Cloud Stream的函数式编程模式,可以使用很少的代码实现数据流的发布、流转和消费。另外,Spring Cloud Stream默认支持Kafka和RabbitMQ,内置了它们的binder。如果需要支持其他的消息软件,则可以根据规范自己开发。