消息应答机制

12/3/2023 RabbitMQ消息队列

# 一、消息应答详解

# 1、基本概念

前文在进行消息发送时,应答机制参数我们设置的是true,即自动应答。此时RabbitMQ 一旦向消费者传递了一条消息,便立即认为已经发送成功,并将该消息标记为删除。在通常情况下消费者完成一个消息处理任务可能需要一段时间,如果其中一个消费者在处理一个长的任务过程中突然挂掉了,将丢失正在处理的消息。以及后续发送给该消费者的消息,因为它无法接收到。
为了保证消息在发送过程中不丢失,rabbitmq 引入消息应答机制,消息应答就是消费者在接收到消息并且处理该消息成功之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了。

# 2、自动应答

前文案例我们一直使用的是自动应答方式,消息在发送后立即被认为已经传送成功,这种模式下如果消息在接收到之前,消费者那边出现连接关闭或者 channel 关闭,或者是在消息处理过程中消费者宕机,那么消息就丢失了,使用这种模式需要在高吞吐量和数据传输安全性方面做权衡,速度快,但是会失去数据安全性。

# 3、手动应答

消费者在接收到消息以后使用手动应答的方式回复RabbitMQ已经接收成功,在java中,可以使用以下三种函数进行手动应答:

  • Channel.basicAck(用于肯定确认):告诉RabbitMQ已成功处理该消息,可以将其丢弃了。
  • Channel.basicNack(用于否定确认):告诉RabbitMQ没有成功接收该消息。
  • Channel.basicReject(用于否定确认):告诉RabbitMQ不处理该消息而是直接拒绝,可以将其丢弃,与Channel.basicNack相比少一个Multiple参数。

multiple 解释:

image.png multiple取值为true和false代表不同意思:

  • true:代表批量应答channel上未应答的消息。比如说channel上有传送tag的消息 5,6,7,8 当前tag是8,那么此时5-8的这些还未应答的消息都会被确认收到消息应答,即批量应答。但此时后边的5-7实际上还未接收处理,此时进行批量应答仍然会有数据丢失的风险。
  • false:同上面相比只会应答tag=8的消息 5,6,7 这三个消息依然不会被确认收到消息应答,相当于是逐条应答,此时可靠性较高,相应的速度会降低。

image.png

# 4、自动重新入队

如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔下线,也可以确保不会丢失任何消息。
image.png

# 5、手动应答代码

默认情况下在进行消息接收 channel.basicConsume() 的第二个函数为true即自动应答,所以我们要想实现消息消费过程中不丢失,需要把自动应答改为手动应答,并在消费者实现代码中手动实现ack函数。
消息生产者代码:

public class Task {
    //队列名称
    private static final String QUEUE_NAME = "task_queue";

    public static void main(String[] args) throws Exception {
        // 1.通过工具类建立信道
        Channel channel = RabbitMqUtil.getChannel();
        // 2.声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        
        // 3.从控制台获取输入,并将其作为消息发送消息队列
        Scanner scanner = new Scanner(System.in);
        System.out.println("请输入信息");
		while (scanner.hasNext()){
			String message = scanner.next(); 
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes("UTF-8")); 
			System.out.println("生产者发出消息:"+message);
		}
        // 4.关闭资源
        channel.close();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

消息消费者01代码:

public class Consumer01 {
    //队列名称
    private static final String QUEUE_NAME = "task_queue";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtil.getChannel();
        System.out.println("C1等待接收消息,处理时间较短.....");
        // 采用手动应答
        channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                SleepUtils.sleep(1);
                System.out.println("接收到消息: " + message);
                /*
                 * 确认消息一般放在最后,等程序执行完无异常后,在进行确认消息
                 *
                 * 第一个参数:添加手动确认消息的唯一标识
                 * 第二个参数:确认方式。是否一次抓取并确认多个消息
                 *   true:设置为true,即表示可以批量确认消息
                 *   false:设置为false,即表示无论多少个消息,都需要一次一次的确认
                 *   实际中可根据不同场景进行设置...
                 */
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        });
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

消息消费者02代码:

public class Consumer02 {
    //队列名称
    private static final String QUEUE_NAME = "task_queue";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtil.getChannel();
        System.out.println("C2等待接收消息,处理时间较长.....");
        // 采用手动应答
        channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                SleepUtils.sleep(30);
                System.out.println("接收到消息: " + message);
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        });
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

睡眠工具类代码:

public class SleepUtils {
    public static void sleep(int second){
        try {
        	Thread.sleep(1000*second);
        } catch (InterruptedException _ignored) {
        	Thread.currentThread().interrupt();
        }
    }
}
1
2
3
4
5
6
7
8
9

# 6、手动应答演示

正常情况下消息发送方发送两个消息 C1 和 C2 分别接收到消息并进行处理。
image.png
在发送消息dd时,dd本应交给c2处理,由于它处理时间比较长,在还未处理完时,模拟c2出现故障,将c2停掉,也就是说C2还没有执行ack代码的时候,C2宕机了,此时会看到消息被C1接收到了,说明消息dd被重新入队,然后分配给能处理消息的C1处理了。
image.png
image.png
image.png

# 二、不公平分发

上述例子的运行结果其实属于公平分发,即消息被均匀分发到各个消费者,每个消费者各占N条消息,但是在某种场景下这种策略并不是很好,比方说有两个消费者在处理任务,其中有个消费者1处理任务的速度非常快,而另外一个消费者处理速度却很慢,这个时候我们还是采用轮训分发的话就会导致处理速度快的消费者很大一部分时间处于空闲状态,而处理慢的那个消费者一直在干活,显然在这种场景下公平分发机制并不适合。
为了避免这种情况,我们可以设置参数channel.basicQos(1);
RabbitMQ 提供了一种 Qos(Quality Of Service,服务质量)服务质量保证功能。即在非自动确认消息的前提下,如果一定数目的消息未被确认之前,不再消费新的消息。
image.png
这个数值称之为预先抓取消息的数量值,当设置为1时,c1和c2每次都会从队列中抓取一个消息进行处理,当执行完处理流程发送确认完毕以后才会继续抓取下一个,如果c2速度慢,这个任务还没有处理完,rabbitmq就会把下一个任务分配给没有那么忙的那个空闲消费者c1,当然如果所有的消费者都没有完成手上任务,队列还在不停的添加新任务,队列有可能就会遇到队列被撑满的情况,这个时候就只能添加新的worker或者改变其他存储任务的策略。
消费者每次最多拉取1条消息消费,这样谁处理的快谁拉取下一条消息,实现了不公平分发。

注意:使用这种机制一定是在手动应答情况下的,不能是自动应答。


通过测试发现,c1的速度快,因此c1处理的消息就多,属于能者多劳。

# 三、预取值机制

不公平分发其实就是使用了预取值机制,通过将prefetch设置为1,消费端每次拉取一条消息进行处理,这样谁的速度快谁拉取的频率就高,自然处理的消息就多。下面是预取值(prefetch)的详细介绍。

ChatGPT:

在 RabbitMQ 中,预取值(Prefetch Count)是指消费者从队列中预取的消息数量。当一个消费者连接到一个队列并开始消费消息时,它可以通过设置预取值来控制一次从队列中获取的消息数量。预取值可以在消费者创建时进行设置,也可以在运行时进行更改。 预取值的主要作用是控制消费者的负载,避免一个消费者在处理消息时占用过多的资源,导致其他消费者无法获得足够的资源。通过限制每次预取的消息数量,可以控制消费者的处理速度,避免过度消费队列中的消息。 预取值的设置方式有两种:

  • 全局设置:通过 channel.basicQos(prefetchCount) 方法设置全局预取值。在这种情况下,所有的消费者都将使用相同的预取值。
  • 单独设置:通过 channel.basicConsume(queue, consumer) 方法的 prefetchCount 参数设置单独的预取值。在这种情况下,每个消费者都可以使用不同的预取值。

假想我们 RabbtiMQ 服务器积压了成千上万条未处理的消息,然后随便打开一个消费者客户端,巨量的消息瞬间全部喷涌推动过来,但是单个客户端无法同时处理这么多条数据,就会被压垮崩溃。此时可以通过消费端限流的方式限制消息的拉取速度,达到保护消费端的目的。