搬砖小抄

Spring Cloud Stream 阅读笔记(二)

字数统计: 2.7k阅读时长: 10 min
2018/04/13 Share

Spring Cloud Stream Reference Guide(Ditmars.RELEASE)的阅读笔记
相关内容

  • 第四章:绑定器(Binder)
  • 第五章:配置选项

文档中到处都有BindingBinder这两个概念,一开始觉得有点晕头转向,推敲之后发现:

  • Binding 指绑定的过程,和Binding有关参数就是在执行绑定过程中需要用到的参数.
  • Binder 指执行绑定过程的对象,如Rabbit Binder,Kafka Binder.

4 绑定器(Binder)

Binder是框架提供的一层抽象,其具体的实现有RabbitMQKafka.Binder接口定义如下:

1
2
3
4
5
public interface Binder<T, C extends ConsumerProperties, P extends ProducerProperties> {
Binding<T> bindConsumer(String name, String group, T inboundBindTarget, C consumerProperties);

Binding<T> bindProducer(String name, T outboundBindTarget, P producerProperties);
}

Binder的探测

Spring Cloud Stream 依靠Spring Boot的自动配置机制处理绑定过程,如果在类路径下有且只有一个Binder的实现,它就会被自动选择并使用.

如果类路径下存在多个Binder的实现,比如:

1
2
3
4
5
6
7
8
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

这种情况下,应用程序就需要明确指定使用那个Binder的实现,有两种方式:

  • 全局方式:spring.cloud.stream.defaultBinder=binder的名称,比如
    1
    spring.cloud.stream.defaultBinder=rabbit
  • 每个Channel单独指定:spring.cloud.stream.bindings.input.binder=binder的名称,比如
    1
    2
    spring.cloud.stream.bindings.my-channle-1.binder=kafka
    spring.cloud.stream.bindings.my-channle-2.binder=rabbit
    至于Binder的名字,可以查看Binder的jar包中的META-INF/spring.binders文件,内容大致如下:
    1
    2
    rabbit:\
    org.springframework.cloud.stream.binder.rabbit.config.RabbitServiceAutoConfiguration

配置网络信息

一直纳闷,为啥文档全篇都不提及怎么配置网络信息,研究了一下发现它们归Spring Boot管,RabbitMQ的通过:RabbitProperties.java映射,Kafka的通过KafkaProperties.java映射.

相关的配置项可以在common-application-properties查阅.

再结合Connecting to Multiple Systems的内容,总结起来有两种配置模式(以RabbitMQ为例):

  • 第一种是全局配置:相关配置信息写在spring.rabbitmq下.
  • 第二种是spring cloud stream所支持的多环境独立配置:需要写在spring.cloud.stream.binders.<配置名称>.environment.spring.rabbitmq.*里面.例子:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    spring:
    cloud:
    stream:
    bindings:
    input:
    destination: foo
    binder: rabbit1
    output:
    destination: bar
    binder: rabbit2
    binders:
    rabbit1:
    type: rabbit
    environment:
    spring:
    rabbitmq:
    host: <host1>
    rabbit2:
    type: rabbit
    environment:
    spring:
    rabbitmq:
    host: <host2>
    这个配置为不同的通道(input和output)配备了不同的Broker主机,即两个通道连不同的服务器(Broker),Spring Cloud Stream在处理通道绑定的时候,会用spring.cloud.stream.binders.<配置名称>.environment.spring.rabbitmq.里面的内容创建一个新的RabbitProperties对象,用这个对象去创建连接工厂(CachingConnectionFactory)

Binder 的配置选项

Binder的配置参数需要写在spring.cloud.stream.binders.<configurationName>下面.上面那个多环境的配置就是一个例子.
具体的配置项有以下这些:

type
指明Binder的类型,RabbitMQ叫rabbit,Kafka叫kafka.Binder会在META-INF/spring.binders标明自己的type名称.

默认值:这个参数的默认值就是配置名称(spring.cloud.stream.binders.<configurationName>的最后一部分)的值.

inheritEnvironment
是否继承来自应用程序的环境信息(environment)

默认值:true.

environment
用于申明自定义Binder环境配置信息

默认值:空

defaultCandidate
是否将此Binder的视为默认Binder的候选者.

默认值:true.

5 配置选项

配置分为两个大类,一类通用配置,另一类是特定中间件的配置.
配置信息可以由Spring Boot支持的任何方式传递给框架(Spring Cloud Stream),比如命令行,环境变量,Cloud Config,配置文件等.

Spring Cloud Stream 配置项

这部分配置项的前缀是:spring.cloud.stream.

instanceCount

应用程序的实例数量.

  • 默认值: 1
  • 如果用的是kafka,并且需要支持分区就一定设置这个参数.

instanceIndex

应用程序实例的索引号,范围是[0,instanceCount).

  • 默认值:0
  • 用于配置kafka分区,在Cloud Foundry环境下框架会自动设置.

dynamicDestinations

动态目的地,列表类型

  • 默认值:empty
  • 如果设置了,只有列表中的目的地可以绑定,没有设置的话,任何目的地都允许绑定.

可以理解为限制通道目的地的白名单.

defaultBinder

指定默认绑定器

  • 默认值:empty

前面Binder的探测提到过这个这个参数.

overrideCloudConnectors

覆盖云平台提供的Connector

  • 默认值: false
  • 这个参数生效需要两个前提条件:
    • cloud配置(profile)被激活
    • 云平台提供了Spring Cloud Connectors(给应用程序).
  • false:框架会自动探测绑定服务,并使用该绑定服务创建连接.
  • true:框架会忽略绑定服务,根据程序自身的配置(如spring.rabbitmq.*)来创建连接.

绑定(Binding)配置项

一般来说,主要就是配置这部分参数,这样才能让消息发往正确的地方,或者从Broker上拿到想要的消息.
配置基于这样一种格式:

1
spring.cloud.stream.bindings.<channelName>.<property>=<value>

这部分配置又可以分为三类:

  • 通用的配置项
  • 消息生产者专用的配置项
  • 消息消费者专用的配置项

通用Binging配置项

这部分配置的前缀是spring.cloud.stream.bindings.<channelName>,channelName表示通道的名称,这个是自己定义的.也就是说需要为每一个通道分别配置.
如果需要声明一些默认配置可以使用spring.cloud.stream.bindings.default.<property>=<value>.比如:

1
spring.cloud.stream.default.contentType=application/json
destination

通道的目的地名称,对于RabbitMQ就是交换机,对于Kafka来说就是topic.

  • 默认值:通道的名称

如果此通道是绑定给消费者(consumer)的,这个地方就可以设置多个目的地,用逗号分隔.

group

通道的消费者组名称,只有入站绑定才有效

  • 默认值: null(表示匿名消费者)
contentType

内容的类型,比如application/json

  • 默认值: null(无强制类型要求)
binder

指定此通道所使用的绑定器.

  • 默认值:null(使用默认绑定器,如果不存在会报错)

消费者Binging配置项

这部分配置的前缀是spring.cloud.stream.bindings.<channelName>.consumer.
如果需要声明一些消费者的默认配置可以使用spring.cloud.stream.default.consumer.<property>=<value>.比如:

1
spring.cloud.stream.default.consumer.headerMode=raw
concurrency

消费的并发性

  • 默认值:1

Stack Overflow上找到一个帖子,可以看看有助理解.

partitioned

是否分区(消息来自分区的生产者)

  • 默认值:false
headerMode

头部模式.如果设置为raw,header解析会被禁用.只有在消息中间件自身并不是支持消息头,但是又要求植入消息头的情况下才有效果.

  • 默认值:embeddedHeaders

这个参数一般用于消息来自外部消息系统的场景.

maxAttempts

处理消息的最大尝试次数,用于配置失败重试,设为1的就是禁用失败重试(因为这个次数包含第一次处理)

  • 默认值:3
backOffInitialInterval

重试时间间隔的初始值(毫秒).

  • 默认值:1000
backOffMaxInterval

重试时间间隔的最大值(毫秒).

  • 默认值:10000
backOffMultiplier

重试时间间隔的计算倍数(float).

  • 默认值:2.0
instanceIndex

这个参数用于设置该消费者对应的实例索引号

  • 默认值:-1

通用Binding配置里面也有一个配置实例索引号的配置(spring.cloud.stream.instanceIndex),那个地方相当于是全局配置,这里用于对当前的消费者单独配置.如果这个参数的值<0,表示使用全局配置.

instanceCount

和上面那个instanceIndex的作用机制一样.

  • 默认值:-1

生产者Binging配置项

这部分配置的前缀是spring.cloud.stream.bindings.<channelName>.producer.
如果需要声明一些消费者的默认配置可以使用spring.cloud.stream.default.producer.<property>=<value>.比如:

1
spring.cloud.stream.default.producer.partitionKeyExpression=payload.id
instanceCount

和instanceIndex原理一样.

  • 默认值:-1
partitionKeyExpression

声明如何根据消息确定消息分区的键,这个参数的值应该是一个SpEL表达式.如果配置了此参数或者partitionKeyExtractorClass的一个,就表示启用了分区(当前通道上的出站数据会被分区).不过,要是分区真正生效,还需要设置partitionCount>0.

  • 默认值:null

这个参数和partitionKeyExtractorClass是互相排斥的,只能配一个.

partitionKeyExtractorClass

声明如何根据消息确定消息分区的键,这个参数的值应该是一个PartitionKeyExtractorStrategy接口的实现类.作用机制和上面的partitionKeyExpression一样.

  • 默认值:null

这个参数和partitionKeyExpression是互相排斥的,只能配一个.

partitionSelectorClass

声明如何选择分区,这个参数的值应该是一个PartitionSelectorStrategy接口的实现类.这个参数需要依赖partitionKeyExpression或者 partitionKeyExtractorClass参数来计算分区键.

  • 默认值:null
  • 这个参数和partitionSelectorExpression是互相排斥的,只能配一个.假设这两个参数都没设置,那么分区的选择算法为:hashCode(key) % partitionCount
partitionSelectorExpression

声明如何选择分区,这个参数的值应该是一个SpEL表达式.其作用机制和上面的partitionSelectorClass一样.

  • 默认值:null
partitionCount

数据的分区数量(如果分区已经开启,见前面partitionKeyExpression部分).必须是一个>1的值.

  • 默认值:1

对于kafka而言,这个参数应该视为一个分区数量的提示,kafka会使用一个比这个更大的值.

requiredGroups

一个逗号分隔的group列表,用于确保消息能够被投递.

因为组(group)对与消费方来说才是必须要声明的,假设生产者比消费者先启动,那么消费者回因为组的声明时机太晚而无法拿到历史消息.rabbit Binder的实现就是检查这个参数,(自己)把队列创建了.

headerMode

和消费者Binging里的同名参数一样.

useNativeEncoding

是否采用原生编码,如果设置为true,消息的编码直接由中间件的原生客户端完成.

  • 默认值:false
errorChannelEnabled

是否开启错误通道.这个参数要参考Message Channel Binders and Error Channels.

  • 默认值:false

一个rabbit配置样本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
spring:
rabbitmq:
host: 192.168.0.166
cloud:
stream:
default:
content-type: application/json
binders.rabbit1:
environment.spring.rabbitmq.host: 192.168.0.166
default-candidate: false
type: rabbit
bindings:
order_created_input:
binder: rabbit1
destination: orderCreated # the RabbitMQ exchange or Kafka topic
group: monitor
rabbit:
bindings:
order_created_input:
consumer:
acknowledgeMode: AUTO
durableSubscription: true
prefetch: 1
requeueRejected: true
autoBindDlq: true
republishToDlq: true

这个配置实现的效果是:

  • 默认的RabbitMQ 绑定器(Binder)使用192.168.0.166
  • 自定义了一个名为rabbit1的绑定器,类型是rabbit,它是专用的,因此default-candidate为false
  • 定义了一个绑定(Binding),通道名称是order_created_input,使用名为rabbit1的绑定器
  • spring.cloud.stream.rabbit下面是一些binder特定的配置项.RabbitMQ看这里,Kafka看这里
CATALOG
  1. 1. 4 绑定器(Binder)
    1. 1.1. Binder的探测
    2. 1.2. 配置网络信息
    3. 1.3. Binder 的配置选项
  2. 2. 5 配置选项
    1. 2.1. Spring Cloud Stream 配置项
      1. 2.1.1. instanceCount
      2. 2.1.2. instanceIndex
      3. 2.1.3. dynamicDestinations
      4. 2.1.4. defaultBinder
      5. 2.1.5. overrideCloudConnectors
    2. 2.2. 绑定(Binding)配置项
      1. 2.2.1. 通用Binging配置项
        1. 2.2.1.1. destination
        2. 2.2.1.2. group
        3. 2.2.1.3. contentType
        4. 2.2.1.4. binder
      2. 2.2.2. 消费者Binging配置项
        1. 2.2.2.1. concurrency
        2. 2.2.2.2. partitioned
        3. 2.2.2.3. headerMode
        4. 2.2.2.4. maxAttempts
        5. 2.2.2.5. backOffInitialInterval
        6. 2.2.2.6. backOffMaxInterval
        7. 2.2.2.7. backOffMultiplier
        8. 2.2.2.8. instanceIndex
        9. 2.2.2.9. instanceCount
      3. 2.2.3. 生产者Binging配置项
        1. 2.2.3.1. instanceCount
        2. 2.2.3.2. partitionKeyExpression
        3. 2.2.3.3. partitionKeyExtractorClass
        4. 2.2.3.4. partitionSelectorClass
        5. 2.2.3.5. partitionSelectorExpression
        6. 2.2.3.6. partitionCount
        7. 2.2.3.7. requiredGroups
        8. 2.2.3.8. headerMode
        9. 2.2.3.9. useNativeEncoding
        10. 2.2.3.10. errorChannelEnabled
    3. 2.3. 一个rabbit配置样本