# 一、工作队列模式原理
与简单模式相比,工作队列模式(Work Queue)多了一些消费者,该模式也是使用 direct 交换机,应用于处理消息较多的情况,对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。
工作队列模式实际上是一种竞争关系的模式,多个消费者之间是竞争关系,即一条消息如果被某个消费者消费了,那么其他的消费者就获取不到了。特点如下:
- 一个队列对应多个消费者。
- 一条消息只会被一个消费者消费。
- 消息队列默认采用 轮询 的方式将消息平均发送给消费者。
# 二、工作队列模式实战
# 1、抽取工具类
通过前边简单模式的实践发现,其实无论是生产者还是消费者,在代码实现中是有共通点的,我们可以将前期建立连接获取信道的代码片段抽取出来,形成一个工具类。
public class RabbitMqUtil {
//得到一个连接的 channel
public static Channel getChannel() throws Exception {
//创建一个连接工厂并建立连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.222.139");
factory.setUsername("admin");
factory.setPassword("scr1pt_yang.");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
return channel;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
2
3
4
5
6
7
8
9
10
11
12
13
# 2、消费者代码
工作队列模式的消费者与简单模式的消费者代码实现上没有区别,只是消费者数量增加了。
/**
* Description: 工作队列模式消费者01
*/
public class Consumer01 {
//队列名称
private static final String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtil.getChannel();
// 创建队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
System.out.println("C1等待接收消息.....");
channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("接收到消息: " + message);
}
});
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* Description: 工作队列模式消费者02
*/
public class Consumer01 {
//队列名称
private static final String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtil.getChannel();
System.out.println("C2等待接收消息.....");
channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("接收到消息: " + message);
}
});
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 3、生产者代码
生产者代码逻辑比较简单,获取到channel以后直接通过channel.basicPublish()
发送消息即可。
/**
* Description: 工作队列模式生产者
*/
public class Producer {
//队列名称
private static final String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws Exception {
// 1.通过工具类建立信道
Channel channel = RabbitMqUtil.getChannel();
// 2.从控制台获取输入,并将其作为消息发送消息队列
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){
String message = scanner.next();
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("生产者发出消息:"+message);
}
// 3.关闭资源
channel.close();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# 4、查看运行结果
先将消费者启动,因为消费者1的代码中负责声明了work_queue队列,再启动生产者进行消息的发送。
通过上述执行结果可以发现在工作队列工作模式下,消息队列是通过轮询的方式将消息轮流发送给所有消费者,当第一条消息到达消费者1时,消费者2则不能接收到这条消息,同理反之亦然。两者接受到的消息是互斥的。