RabbitMQ如何保证消息传输可靠性

1、保证消息发送到 MQ

RabbitMQ 的消息最终时存储在 Queue 上的,而在 Queue 之前还要经过 Exchange,那么这个过程中就有两个地方可能导致消息丢失。第一个是 Producer 到 Exchange 的过程,第二个是 Exchange 到 Queue 的过程。

上面两个可能丢失的过程,都可以利用 confirm 机制,注册回调来监听是否成功。
Publisher Confirm 是一种机制,用于确保消息已经被 Exchange 成功接收和处理。一旦消息成功到达 Exchange 并被处理,RabbitMQ 会向消息生产者发送确认信号(ACK)。如果由于某种原因(例如,Exchange 不存在或路由键不匹配)消息无法被处理,RabbitMQ 会向消息生产者发送否认信号(NACK)。

// 启用Publisher Confirms
channel.confirmSelect();

// 设置Publisher Confirms回调
channel.addConfirmListener(new ConfirmListener() {
    @Override
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("Message confirmed with deliveryTag: " + deliveryTag);
        // 在这里处理消息确认
    }

    @Override
    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("Message not confirmed with deliveryTag: " + deliveryTag);
        // 在这里处理消息未确认
    }
});

Publisher Returns 机制与 Publisher Confirms 类似,但用于处理在消息无法路由到任何队列时的情况。当 RabbitMQ 在无法路由消息时将消息返回给消息生产者,但是如果能正确路由,则不会返回消息。

// 启用Publisher Returns
channel.addReturnListener(new ReturnListener() {
    @Override
    public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("Message returned with replyCode: " + replyCode);
        // 在这里处理消息发送到Queue失败的返回
    }
});

通过以上方式,我们注册了两个回调监听,用于在消息发送到 Exchange 或者 Queue 失败时进行异常处理。通常我们可以在失败时进行报警或者重试来保障一定能发送成功。
完整的代码如下:

import com. Rabbitmq. Client.*;

Public class PublisherCallbacksExample {

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            // 启用Publisher Confirms
            channel.confirmSelect();

            // 设置Publisher Confirms回调
            channel.addConfirmListener(new ConfirmListener() {
                @Override
                public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println("Message confirmed with deliveryTag: " + deliveryTag);
                    // 在这里处理消息确认
                }

                @Override
                public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println("Message not confirmed with deliveryTag: " + deliveryTag);
                    // 在这里处理消息未确认
                }
            });

            // 启用Publisher Returns
            channel.addReturnListener(new ReturnListener() {
                @Override
                public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("Message returned with replyCode: " + replyCode);
                    // 在这里处理消息发送到Queue失败的返回
                }
            });

            String exchangeName = "my_exchange";
            String routingKey = "my_routing_key";
            String message = "Hello, RabbitMQ!";

            // 发布消息到Exchange
            channel.basicPublish(exchangeName, routingKey, true, null, message.getBytes());

            // 等待Publisher Confirms
            if (!channel.waitForConfirms()) {
                System.out.println("Message was not confirmed!");
            }

            // 关闭通道和连接
            channel.close();
        }
    }
}

2、保证 MQ 发送给消费者不丢

RabbitMQ 在接收到消息后,默认并不会立即进行持久化,而是先把消息暂存在内存中,这时候如果 MQ 挂了,那么消息就会丢失。所以需要通过持久化机制来保证消息可以被持久化下来。

2.1 队列和交换机的持久化

在声明队列时,可以通过设置 durable 参数为 true 来创建一个持久化队列。持久化队列会在 RabbitMQ 服务器重启后保留,确保队列的元数据不会丢失。
在声明交换机时,也可以通过设置 durable 参数为 true 来创建一个持久化交换机。持久化交换机会在 RabbitMQ 服务器重启后保留,以确保交换机的元数据不会丢失。
绑定关系通常与队列和交换机相关联。当创建绑定关系时,还是可以设置 durable 参数为 true,以创建一个持久化绑定。持久化绑定关系会在服务器重启后保留,以确保绑定关系不会丢失。

@Bean
public Queue TestQueue() {
    // 第二个参数durable:是否持久化,默认是false
    return new Queue("queue-name",true,true,false);
}


@Bean
public DirectExchange mainExchange() {
  	//第二个参数durable:是否持久化,默认是false
    return new DirectExchange("main-exchange",true,false);
}
2.2 持久化消息

生产者发送的消息可以通过设置消息的 deliveryMode 为 2 来创建持久化消息。持久化消息在发送到持久化队列后,将在服务器重启后保留,以确保消息不会丢失。

DeliveryMode 是一项用于设置消息传递模式的属性,用于指定消息的持久性级别。DeliveryMode 可以具有两个值:
1(非持久化):这是默认的传递模式。如果消息被设置为非持久化,RabbitMQ 将尽力将消息传递给消费者,但不会将其写入磁盘,这意味着如果 RabbitMQ 服务器在消息传递之前崩溃或重启,消息可能会丢失。
2(持久化):如果消息被设置为持久化,RabbitMQ 会将消息写入磁盘,以确保即使在 RabbitMQ 服务器重启时,消息也不会丢失。持久化消息对于重要的消息非常有用,以确保它们不会在传递过程中丢失。

Message message = MessageBuilder.withBody("hello, spring".getBytes(StandardCharsets.UTF_8)) //kp 消息体,字符集
                .setDeliveryMode(MessageDeliveryMode.PERSISTENT) 
                .build();

rabbitTemplate.convertAndSend("simple.queue", message);

通过设置 deliveryMode 类实现消息的持久化。但是需要注意,将消息设置为持久化会增加磁盘 I/O 开销。

2.3 消费者确认机制

有了持久化机制后,那么怎么保证消息在持久化下来之后一定能被消费者消费呢?这里就涉及到消息的消费确认机制。
在 RabbitMQ 中,消费者处理消息成功后可以向 MQ 发送 ack 回执,MQ 收到 ack 回执后才会删除该消息,这样才能确保消息不会丢失。如果消费者在处理消息中出现了异常,那么就会返回 nack 回执,MQ 收到回执之后就会重新投递一次消息,如果消费者一直都没有返回 ACK/NACK 的话,那么他也会在尝试重新投递。

2.4 无法做到 100%不丢

虽然我们通过发送者端进行异步回调、MQ 进行持久化、消费者做确认机制,但是也没办法保证 100%不丢,因为 MQ 的持久化过程其实是异步的。即使我们开了持久化,也有可能在内存暂存成功后,异步持久化之前宕机了,那么这个消息就会丢失。
如果想要做到 100%不丢失,就需要引入本地消息表,来通过轮询的方式来进行消息重投。