Code Ease Code Ease
  • 个人博客网站 (opens new window)
  • 好用的工具网站 (opens new window)
  • Java核心基础
  • 框架的艺术
  • 分布式与微服务
  • 开发经验大全
  • 设计模式
  • 版本新特性
数据库系列
大数据+AI
  • xxl-job
运维与Linux
  • 基于SpringBoot和BootStrap的论坛网址
  • 基于VuePress的个人博客网站
  • 基于SpringBoot开发的小功能
  • 做一个自己的IDEA插件
程序人生
关于我
  • 分类
  • 标签
  • 归档

神秘的鱼仔

你会累是因为你在走上坡路
  • 个人博客网站 (opens new window)
  • 好用的工具网站 (opens new window)
  • Java核心基础
  • 框架的艺术
  • 分布式与微服务
  • 开发经验大全
  • 设计模式
  • 版本新特性
数据库系列
大数据+AI
  • xxl-job
运维与Linux
  • 基于SpringBoot和BootStrap的论坛网址
  • 基于VuePress的个人博客网站
  • 基于SpringBoot开发的小功能
  • 做一个自己的IDEA插件
程序人生
关于我
  • 分类
  • 标签
  • 归档
服务器
  • Java核心基础

  • 框架的艺术

    • Spring

    • Mybatis

    • SpringBoot

    • MQ

      • RabbitMQ的了解安装和使用
      • 简单队列详解
      • 工作队列详解
        • (一)RabbitMQ工作队列模型结构
        • (二)工作队列实践(轮询分发)
          • 2.1 创建工具类
          • 2.2 创建生产者
          • 2.3 创建消费者一
          • 2.4 创建消费者二
        • (三)公平分发(Fair dispatch)
          • 3.1 修改生产者
          • 3.2 修改消费者
          • 3.3 关于自动应答
      • 发布-订阅模型详解
      • routing路由模式和Topic主题模式
      • RabbitMQ消息确认机制
    • Zookeeper

    • netty

  • 分布式与微服务

  • 开发经验大全

  • 版本新特性

  • Java
  • 框架的艺术
  • MQ
CodeEase
2023-11-11
目录

工作队列详解

作者:鱼仔
博客首页: codeease.top (opens new window)
公众号:Java鱼仔

# (一)RabbitMQ工作队列模型结构

工作队列的模型相比简单队列增加了消费者的数量。

3-1.png

生产者提供消息到消息队列中,消费者可以去获取队列中的消息。在工作队列中默认采用轮询分发的方式将消息分发给消费者。所谓轮询分发,就是指不管消费者处理消息的速度是快是慢,都按照顺序轮流把消息发给消费者。

# (二)工作队列实践(轮询分发)

使用工作队列的代码和简单队列基本一致,只不过多了几个消费者

# 2.1 创建工具类

创建工具类的代码在前一篇博客中已经讲到了,这里直接贴上代码:

public class ConnectionUtil {
    public static Connection getConnection() throws IOException, TimeoutException {
        //定义一个连接工厂
        ConnectionFactory factory=new ConnectionFactory();
        //设置服务地址
        factory.setHost("127.0.0.1");
        //设置AMQP端口
        factory.setPort(5672);
        //设置VHOSTS
        factory.setVirtualHost("/vhosts_sdxb");
        //设置用户名
        factory.setUsername("user_sdxb");
        factory.setPassword("123456");
        return factory.newConnection();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

# 2.2 创建生产者

public class Send {
    private static final String QUEUE_NAME="work_queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.获取连接
        Connection connection = ConnectionUtil.getConnection();
        //2.创建通道
        Channel channel = connection.createChannel();
        //3.创建队列声明
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        for (int i=0;i<50;i++){
            String msg="i="+i;
            channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
        }
        channel.close();
        connection.close();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

# 2.3 创建消费者一

为了体现消费者处理消息的快慢,我在两个消费者中分别设置线程休眠1s和2s

public class Receive1 {
    private static final String QUEUE_NAME="work_queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        //获取连接
        Connection connection = ConnectionUtil.getConnection();
        //创建通道
        Channel channel = connection.createChannel();
        //创建队列声明
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //创建消费者监听方法
        Consumer consumer=new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg=new String(body,"utf-8");
                System.out.println(msg);
                try {
                    //设置睡眠实践1s
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        //监听队列
        channel.basicConsume(QUEUE_NAME,true,consumer);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

# 2.4 创建消费者二

public class Receive2 {
    private static final String QUEUE_NAME="work_queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        Consumer consumer=new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg=new String(body,"utf-8");
                System.out.println(msg);
                try {
                    //设置睡眠时间2s
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        channel.basicConsume(QUEUE_NAME,true,consumer);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

分别将两个消费者运行起来,然后运行生产者发送50条消息,可以发现虽然两个消费者处理消息的能力有快有慢,但是得到的消息都是25条,下面展示消费者1获取的消息部分截图。

3-2.png

# (三)公平分发(Fair dispatch)

在某些场景下轮询分发是不合理的,因此工作队列还有公平分发的方式,所谓公平分发,就是能者多劳,处理消息快的人获得消息多,处理消息慢的人获得消息少。公平分发的实现只需要对代码做一些修改:

# 3.1 修改生产者

对于生产者,只需要对通道增加一条限制,限制通道发送给同一个消费者不得超过一条消息,也就是只有当消费者处理完一条消息以后才会发第二条消息给它。使用channel.basicQos();方法,设置参数为1表示限制一次不超过1条消息。

public class Send {
    private static final String QUEUE_NAME="work_queue_fair";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //限制通道发送给同一个消费者不得超过一条消息
        int prefenchCount=1;
        channel.basicQos(prefenchCount);
        for (int i=0;i<50;i++){
            String msg="i="+i;
            channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
        }
        channel.close();
        connection.close();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

# 3.2 修改消费者

对于消费者,需要修改三处地方,第一处和生产者一样修改通道的限制信息;第二处关闭消费者的自动应答;第三处设置手动回执,即处理完一条消息后手动发送处理完成的指令给队列。

//保证一次只分发一次
channel.basicQos(1);
//设置手动回执
channel.basicAck(envelope.getDeliveryTag(),false);
//关闭自动应答
boolean autoAck=false;
channel.basicConsume(QUEUE_NAME,autoAck,consumer);
1
2
3
4
5
6
7

以下是修改后的消费者代码

public class Receive1 {
    private static final String QUEUE_NAME="work_queue_fair";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //保证一次只分发一次
        channel.basicQos(1);
        Consumer consumer=new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg=new String(body,"utf-8");
                System.out.println(msg);
                //设置手动回执
                channel.basicAck(envelope.getDeliveryTag(),false);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        //关闭自动应答
        boolean autoAck=false;
        channel.basicConsume(QUEUE_NAME,autoAck,consumer);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

设置完之后工作队列就变成了公平分发方式,测试结果:

3-3.png

# 3.3 关于自动应答

在前面修改消费者代码的时候,我们关闭了自动应答

boolean autoAck=false;
channel.basicConsume(QUEUE_NAME,autoAck,consumer);
1
2

这是basicConsume的第二个参数

当autoAck=true时,表示开启自动应答,一旦rabbitmq将队列中的消息发送给消费者,这个消息就会从队列中消失。但是如果此时消费者挂掉了,那么这条消息也就彻底消失了。

当autoAck=false时,关闭自动应答,rabbitmq将队列中的消息发送给消费者,只有当消费者返回确认之后,队列中的消息才会被删除。

上次更新: 2025/04/29, 17:22:06
简单队列详解
发布-订阅模型详解

← 简单队列详解 发布-订阅模型详解→

最近更新
01
AI大模型部署指南
02-18
02
半个月了,DeepSeek为什么还是服务不可用
02-13
03
Python3.9及3.10安装文档
01-23
更多文章>
Theme by Vdoing | Copyright © 2023-2025 备案图标 浙公网安备33021202002405 | 浙ICP备2023040452号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式