搬砖小抄

Spring Cloud Stream RabbitMQ 可靠消息传输

字数统计: 824阅读时长: 3 min
2020/07/22 Share

使用RabbitMQ进行消息传输有以下几个地方可能会丢失数据:

  • 1: 投递失败,即producerRabbitMQ的传输过程中发生意外,RabbitMQ没拿到消息。
  • 2: RabbitMQ自己把消息弄丢了
  • 3: 消费失败,包含RabbitMQconsumer的传输过程中发生意外,以及consumer在处理消息的时候发生异常。

对于第三点,网上有些文章的观点是只要消费方成功收到消息就是算完成可靠传输

消息传输的过程如下图所示

解决办法

借用一张别人的图,原生RabbitMQ的解决方案:

1 确保消息投递成功(confirm模式)

1
2
3
# 让连接池支持publisher confirms
spring.rabbitmq.publisher-confirm-type: correlated
spring.rabbitmq.publisher-returns: true

题外话:如果想处理消息发送的确认事件(比如消息确认发送成功后执行某个逻辑),则可以通过confirmAckChannel指定一个通道的名称,然后在程序中去监听这个通道。

1
spring.cloud.stream.rabbit.bindings.<channelName>.producer.confirm-ack-channel=XXX

confirmAckChannel:When errorChannelEnabled is true, a channel to which to send positive delivery acknowledgments (aka publisher confirms). If the channel does not exist, a DirectChannel is registered with this name. The connection factory must be configured to enable publisher confirms. Default: nullChannel (acks are discarded).

对应的配置为

1
2
3
4
5
6
7
8
9
10
11
12
spring:
cloud:
stream:
bindings:
<channelName>:
producer:
error-channel-enabled: true
rabbit:
bindings:
<channelName>:
producer:
confirm-ack-channel: acks

2 确保消息被RabbitMQ妥善保存(开启持久化)

相关参数默认就是开始持久化的,不用特别配置

发送端

1
2
3
4
# 默认就是 true 
spring.cloud.stream.rabbit.bindings.<channelName>.consumer.exchange-durable=true
# 默认就是 true
spring.cloud.stream.rabbit.bindings.<channelName>.consumer.durable-subscription=true

durableSubscription:Whether the subscription should be durable. Only effective if group is also set. Default: true.

exchangeDurable:If declareExchange is true, whether the exchange should be durable (that is, it survives broker restart). Default: true.

消费端

1
2
# 默认就是 PERSISTENT 
spring.cloud.stream.rabbit.bindings.<channelName>.producer.delivery-mode=persistent

deliveryMode:The delivery mode. Default: PERSISTENT.

3 确保消费方完成消息处理(ACK确认)

消费端

1
2
# 默认就是 AUTO 
spring.cloud.stream.rabbit.bindings.<channelName>.consumer.acknowledge-mode=auto

acknowledgeMode: The acknowledge mode. Default: AUTO.

4 备注

durableSubscription 的用处

原生的RabbitMQ 是单独对队列进行持久化配置,Spring Cloud Stream是通过durableSubscription来设置,相关代码如下:

RabbitExchangeQueueProvisioner.doProvisionConsumerDestination

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
boolean durable = !anonymous && properties.getExtension().isDurableSubscription();
Queue queue;
if (anonymous) {
String anonQueueName = queueName;
queue = new AnonymousQueue((org.springframework.amqp.core.NamingStrategy) () -> anonQueueName,
queueArgs(queueName, properties.getExtension(), false));
}
else {
if (partitioned) {
String partitionSuffix = "-" + properties.getInstanceIndex();
queueName += partitionSuffix;
}
if (durable) {
queue = new Queue(queueName, true, false, false,
queueArgs(queueName, properties.getExtension(), false));
}
else {
queue = new Queue(queueName, false, false, true,
queueArgs(queueName, properties.getExtension(), false));
}
}

关于acknowledgeMode.AUTO

org.springframework.amqp.core.AcknowledgeMode

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public enum AcknowledgeMode {

/**
* No acks - {@code autoAck=true} in {@code Channel.basicConsume()}.
*/
NONE,

/**
* Manual acks - user must ack/nack via a channel aware listener.
*/
MANUAL,

/**
* Auto - the container will issue the ack/nack based on whether
* the listener returns normally, or throws an exception.
* <p><em>Do not confuse with RabbitMQ {@code autoAck} which is
* represented by {@link #NONE} here</em>.
*/
AUTO;
}

也就是说,只要你的处理逻辑不抛异常,框架就会自动发送ack,反之则发送nack

总结:默认情况下只需要让连接池支持publisher confirms就行了

application.yml

1
2
3
4
5
spring:
rabbitmq:
# 让连接池支持publisher confirms
publisher-confirm-type: correlated
publisher-returns: true

效果

rabbit-channels

rabbit-queues

rabbit-exchanges

参考资料

CATALOG
  1. 1. 解决办法
    1. 1.1. 1 确保消息投递成功(confirm模式)
    2. 1.2. 2 确保消息被RabbitMQ妥善保存(开启持久化)
    3. 1.3. 3 确保消费方完成消息处理(ACK确认)
    4. 1.4. 4 备注
      1. 1.4.1. durableSubscription 的用处
      2. 1.4.2. 关于acknowledgeMode.AUTO
    5. 1.5. 总结:默认情况下只需要让连接池支持publisher confirms就行了
    6. 1.6. 效果
      1. 1.6.1. rabbit-channels
      2. 1.6.2. rabbit-queues
      3. 1.6.3. rabbit-exchanges
  2. 2. 参考资料