RabbitMQ多消费者消息分配

RabbitMQ多消费者消息分配
强烈推介IDEA2020.2破解激活,IntelliJ IDEA 注册码,2020.2 IDEA 激活码

一、 轮询分配

  当有多个消费者同时监听一个队列时,RabbitMQ默认将消息逐一顺序分配给各消费者,该消息分配机制称为轮询(Round-Robin)。
  为验证该机制,建立两个消费者,同时监听同一队列,消息生产者连续向队列中发送20条消息,查看消息的分配状况。
 

//生产者
public static void main(String[] args) throws Exception {
    
    ConnectionFactory factory = new ConnectionFactory();
    // 设置服务端的地址、端口、用户名和密码...
    
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    channel.queueDeclare("Queue_Java", false, false, false, null);
    
    for(int i = 0; i < 20; i++) {
        byte[] message = ("message" + i).getBytes();
        channel.basicPublish("", "Queue_Java", null, message);
    }
    
    channel.close();
    connection.close();
}


//消费者
public static void main(String[] args) throws Exception {
    
    ConnectionFactory factory = new ConnectionFactory();
    // 设置服务端的地址、端口、用户名和密码...
    
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    channel.queueDeclare("Queue_Java", false, false, false, null);
    
    Consumer consumer = new DefaultConsumer(channel) {

        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
            String message = new String(body);
            System.out.println("Received: " + message);
            try {
                channel.basicAck(envelope.getDeliveryTag(), false);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    };
    
    // 标识进程,第二个消费者将输出内容改为“Consumer2:”,再次运行程序即可
    System.out.println("Consumer2:");
    channel.basicConsume(QUEUE_NAME, false, consumer);
}

  运行结果如下:

RabbitMQ多消费者消息分配
  第一个消费者收到了所有偶数号的消息,第二个消费者收到了所有奇数号的消息,消息被顺序分配给了两个消费者。

二、 消息预取

  消息转发到队列后,分配是提前一次性完成的,即RabbitMQ尽可能快速地将消息推送至客户端,由客户端缓存本地,而并非在消息消费时才逐一确定。再加入新的消费者时,队列已经为空,即使前面的消费者未处理完消息,新加入的消费者也不会接收到。
  为验证该结论,在消费者处理消息的方法中,加入线程休眠。首先启动2个消费者,生产者将20个消息发送完毕后,断开2号消费者,启动3号消费者,观察消息消费情况。
 

public static void main(String[] args) throws Exception {
    
    ConnectionFactory factory = new ConnectionFactory();
    // 设置服务端的地址、端口、用户名和密码...
    
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    channel.queueDeclare("Queue_Java", false, false, false, null);
    
    Consumer consumer = new DefaultConsumer(channel) {

        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
            String message = new String(body);
            System.out.println("Received: " + message);
            try {
                Thread.sleep(5000);        // 加入线程休眠
                channel.basicAck(envelope.getDeliveryTag(), false);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    };
    
    System.out.println("Consumer1:");
    channel.basicConsume(QUEUE_NAME, false, consumer);
}

  程序运行结果:

RabbitMQ多消费者消息分配
  可见,中途断开的2号消费者所应消费的余下的奇数号消息,既未分配给新加入的3号消费者,也未交给发送消息前已建立连接的1号消费者。

三、 公平分配

  消息的轮询分配机制和尽可能快速推送消息的机制给实际使用带来困难。实际情况下,每个消费者处理消息的能力、每个消息处理所需时间可能都是不同的,若只是机械化地顺次分配,可能造成一个消费者由于处理的消息的业务复杂、处理能力低而积压消息,另一个消费者早早处理完所有的消息,处于空闲状态,造成系统的处理能力的浪费。且无法加入新的消费者以提高系统的处理能力。
  希望达到的效果是每个消费者都根据自身处理能力合理分配消息处理任务,既无挤压也无空闲,新加入的消费者也能分担消息处理任务,使系统的处理能力能够平行扩展。
  RabbitMQ客户端可通过Channel类的basicQos(int prefetchCount)设置消费者的预取数目,即消费者最大的未确认消息的数目。
  假设prefetchCount=10,有两个消费者,两个消费者依次从队列中抓取10条消息缓存本地,若此时有新的消息到达队列,先判断信道中未确认的消息是否大于或等于20条,若是,则不向信道中投递消息,当信道中未确认消息数小于20条后,信道中哪个消费者未确认消息小于10条,就将消息投递给哪个消费者。
  设置信道的预取数量为1,重复5.1.2节的测试。

public static void main(String[] args) throws Exception {
    
    ConnectionFactory factory = new ConnectionFactory();
    // 设置服务端的地址、端口、用户名和密码...
    
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    channel.queueDeclare("Queue_Java", false, false, false, null);
    
    // 设置预取数量为1
    channel.basicQos(1);
    Consumer consumer = new DefaultConsumer(channel) {

        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
            String message = new String(body);
            System.out.println("Received: " + message);
            try {
                Thread.sleep(5000);        // 加入线程休眠
                channel.basicAck(envelope.getDeliveryTag(), false);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    };
    
    System.out.println("Consumer1:");
    channel.basicConsume(QUEUE_NAME, false, consumer);
}

  程序运行结果(消费者2在打印5号消息后、返回确认前被停止):

RabbitMQ多消费者消息分配
  在停止消费者2、加入消费者3后,消息被平均分配给消费者1和3,达到了所需的效果。
  还可将两个消费者的休眠时间设为不同值(代表不同的处理消息耗时),观察运行情况。消息将会按照消息处理速度的比例分配给两个消费者,达到了消息均衡分配的效果。

四、 预取数量的优化

  channel.basicQos()中设置的预取数量多少合适,是一个颇有讲究的问题。我们希望充分利用消费者的处理能力,因此不宜设置过小,否则在消费者处理消息后,RabbitMQ收到确认消息后才会投递新的消息,导致此期间消费者处于空闲状态,浪费消费者的处理能力;但设置过大,又可能使消息积压在消费者的缓存里,我们希望对于来不及处理的消息,应保留在队列中,便于加入新的消费者或空闲出来的消费者分担消息处理任务。
  RabbitMQ官网的一篇文章详细讨论了预取数量的设置问题:
  https://www.rabbitmq.com/blog/2012/05/11/some-queuing-theory-throughput-latency-and-bandwidth/
  文章大致内容如下。
  假设从RabbitMQ服务端队列取消息、传输消息到消费者耗时为50ms,消费者消费消息耗时4ms,消费者传输确认消息到服务端耗时为50ms。若网络状况、消费者处理速度稳定,则预取数量的最优数值为:(50 + 4 + 50)/4=26个。

RabbitMQ多消费者消息分配
  最初服务端将向客户端发送26条消息,并缓存在客户端本地,当消费者处理好第一个消息后,向服务端发送确认消息并取本地缓存的第二个消息,确认消息由客户端传送到服务端耗时50ms,服务端收到确认后发送新的消息经过50ms又到达了客户端,而余下的25个消息被消费耗时为25×4=100ms,所以当新的消息达到时,第一轮的26个消息恰好全部处理完。依次类推,之后,每当处理完一个旧有的消息时,恰好会到达一个新的消息。既不会发生消息积压,消费者也不会空闲。
  但实际情况是,网络的传输状况、消费者处理消息的速度都不会是恒定的,会时快时慢,造成消息积压或消费者空闲,这就要求预取数量要与网络和消费者的状况实时改变。
  新近发表的一个称作“Controlled Delay”(控制延迟?)算法(参见https://queue.acm.org/detail.cfm?id=2209336),能够较好地解决此问题。作者实现了其Java版本:
  https://gist.github.com/2658712
  文章中说明了其中的参数,有兴趣者可自行研究。

本文来源huayang183,由架构君转载发布,观点不代表Java架构师必看的立场,转载请标明来源出处:https://javajgs.com/archives/18676

发表评论