RabbitMQ消息接收的确认方式

RabbitMQ消息接收的确认方式
强烈推介IDEA2020.2破解激活,IntelliJ IDEA 注册码,2020.2 IDEA 激活码

一、消息接收手工确认

  • 消息从队列推送至消费者后,消息被消费,并从队列中移除。若在消费者消费消息的过程中出现异常或回滚,当消费者从异常中恢复后,想要重新处理异常的消息,然而消息已经从队列中移除,无法再次获取。为处理该问题,避免消息的丢失,需利用消息接收的确认机制。
  •   消息是在得到确认(Acknowledged,ACK)后,从队列中移除的。默认情况下,消息确认是自动进行的,消息在发送给消费者后立即确认。为避免消息丢失,使用手工确认,自行管理消息接收确认的时机。
     
public static void main(String[] args) throws Exception {
	
	ConnectionFactory factory = new ConnectionFactory();
	// 设置服务端的地址、端口、用户名和密码...
	
	Connection connection = factory.newConnection();
	Channel channel = connection.createChannel();
	channel.queueDeclare("Queue_Java", false, false, false, null);
	
	Consumer consumer = new DefaultConsumer(channel) {

		@Override
		public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
			String message = new String(body);
			System.out.println("Received: " + message);
			// 消息确认
			try {
				channel.basicAck(envelope.getDeliveryTag(), false);
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	};
	
	// 关闭自动消息确认,autoAck = false
	channel.basicConsume("Queue_Java", false, consumer);
}
  • 在消费者监听队列时,关闭自动确认,在回调方法中手工进行确认。可以将回调方法中,消息确认一段注释掉,运行程序观察效果,可以看到,若消费者接收到消息后未进行确认,则消息依然保存在队列中 

二、 消息的批量确认

  • 向信道的每次投递都带有一个投递标签(Delivery Tag),该投递标签是一个64位长的值,从1开始每次增加1,用于唯一标识信道的每次投递。
  •   channel.basicAck()方法的第一个参数位投递标签,用于标识对哪次消息投递进行确认,第二个参数表示是否进行消息的批量确认。若确认消息时开启批量确认,则投递标签小于当前消息投递标签的所有消息也都会进行确认。
  •   使用批量确认,可起到减少网络流量的作用
     
public static void main(String[] args) throws Exception {
	
	ConnectionFactory factory = new ConnectionFactory();
	// 设置服务端的地址、端口、用户名和密码...
	
	Connection connection = factory.newConnection();
	Channel channel = connection.createChannel();
	channel.queueDeclare("Queue_Java", false, false, false, null);
	
	Consumer consumer = new DefaultConsumer(channel) {

		@Override
		public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
			String message = new String(body);
			System.out.println("Received: " + message);
			// 投递标签“10”之前的消息进行批量确认
			if(10L == envelope.getDeliveryTag()) {
				try {
					channel.basicAck(envelope.getDeliveryTag(), true);
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
		}
	};
	
	channel.basicConsume("Queue_Java", false, consumer);
}
  • 程序中只对投递标签为“10”的消息进行确认,但使用批量确认。运行程序,向队列中连续发送20条消息。登录Web管理界面,查看队列,队列中有10条消息得到了确认,剩余的10条消息处于未确认(Unacked)状态。

RabbitMQ消息接收的确认方式

  • 批量确认消息参数置为false,重新创建队列、运行消费者,向其中发送20条消息后,队列中未确认的消息变为19条,仅投递标签为“10”的一条消息得到了确认。
     

三、消息的接收拒绝

  • 若投递的消息数目已经超过消费者的处理能力,继续投递消息将会导致消息的积压。此时消费者可选择拒绝。
  • void basicNack(long deliveryTag, boolean multiple, boolean requeue):deliveryTag表示被拒绝的消息的投递标签;multiple表示是否批量拒绝,若是则所有投递标签小于当前消息且未确认的消息也都将被拒绝,若否则仅拒绝当前消息;requeue表示被拒绝的消息是否重新放回队列,若是则消息会重新回到队列并选择新的消费者进行投递,若否则该条消息会被丢弃。
public static void main(String[] args) throws Exception {
	
	ConnectionFactory factory = new ConnectionFactory();
	// 设置服务端的地址、端口、用户名和密码...	
	Connection connection = factory.newConnection();
	Channel channel = connection.createChannel();
	channel.queueDeclare("Queue_Java", false, false, false, null);
	
	Consumer consumer = new DefaultConsumer(channel) {

		@Override
		public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
			String message = new String(body);
			System.out.println("Received: " + message);
			// 消息拒绝
			try {
				channel.basicNack(envelope.getDeliveryTag(), false, true);
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	};
	
	channel.basicConsume(QUEUE_NAME, false, consumer);
}
  •  运行消费者,并向队列中发送消息,可以看到,由于消息始终被拒绝,并重回队列,消费者的控制台上会连续不断重复打印出队列的消息。 
  •  为解决消息自动确认后消费者异常,导致消息丢失的问题,使用了手工确认的方案,只有在消息处理成功后,才向RabbitMQ服务端返回确认。若在消息处理过程中出现异常,则消费者在恢复后,消息依然保留在队列中,可交由消费者重新处理,避免了消息的丢失。
      但仅此还不够,若在消费者处理消息成功后、返回确认ACK之前出现异常,或在传输ACK消息的过程中,网络出现异常,导致ACK未发送给RabbitMQ服务端,会出现什么情况呢?这条消息将会作为未确认的消息留在队列中,并在信道断开后,交由其他消费者进行处理,这就造成同一个消息被处理2次,若这是一笔转账消息,后果很严重,这笔转账将会被转双倍的钱。
      要解决以上问题,需要要求消费者对消息的处理具有幂等性,即消息处理一次与处理多次效果相同。通常在消费者端维护一个列表,记录被处理过的消息,消费者收到消息后,首先查询该列表,若消息已被处理则丢弃,否则才继续处理。
     
本文来源huayang183,由架构君转载发布,观点不代表Java架构师必看的立场,转载请标明来源出处:https://javajgs.com/archives/18677

发表评论