RabbitMQ中basicConsume、basicCancel、basicPublish方法

RabbitMQ中basicConsume、basicCancel、basicPublish方法
强烈推介IDEA2020.2破解激活,IntelliJ IDEA 注册码,2020.2 IDEA 激活码
1.String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback)
   /**
         * 启动一个消费者,并返回服务端生成的消费者标识
         * queue:队列名
         * autoAck:true 接收到传递过来的消息后acknowledged(应答服务器),false 接收到消息后不应答服务器
         * deliverCallback: 当一个消息发送过来后的回调接口
         * cancelCallback:当一个消费者取消订阅时的回调接口;取消消费者订阅队列时除了使用{@link Channel#basicCancel}之外的所有方式都会调用该回调方法
         * @return 服务端生成的消费者标识
         */
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
            System.out.println("调用"+consumerTag);
        });
 
2.String basicConsume(String queue, boolean autoAck, Consumer callback)
  /**
         * 启动一个消费者,并返回服务端生成的消费者标识
         * queue:队列名
         * autoAck:true 接收到传递过来的消息后acknowledged(应答服务器),false 接收到消息后不应答服务器
         * callback: 消费者对象的回调接口
         * @return 服务端生成的消费者标识
         */
        channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel){});
 
使用接口com.rabbitmq.client.Consumer的实现类com.rabbitmq.client.DefaultConsumer实现自定义消息监听器,接口中有多个不同的方法可以根据自己系统的需要实现;

3.String basicConsume(String queue, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback)
/**
         * queue:队列名
         * deliverCallback: 当一个消息发送过来后的回调接口
         * cancelCallback:当一个消费者取消订阅时的回调接口;取消消费者订阅队列时除了使用{@link Channel#basicCancel}之外的所有方式都会调用该回调方法
         * shutdownSignalCallback: 当channel/connection 关闭后回调
         * @return 服务端生成的消费者标识
         */
        channel.basicConsume(QUEUE_NAME, deliverCallback, consumerTag -> {}, (consumerTag, sig) -> {});
 
4.String basicConsume(String queue, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback)
/**
         * queue:队列名
         * deliverCallback: 当一个消息发送过来后的回调接口
         * shutdownSignalCallback: 当channel/connection 关闭后回调
         * @return 服务端生成的消费者标识
         */
        channel.basicConsume(QUEUE_NAME, deliverCallback, (consumerTag, sig) -> {});
 
5.String basicConsume(String queue, DeliverCallback deliverCallback, CancelCallback cancelCallback)
 /**
         * queue:队列名
         * deliverCallback: 当一个消息发送过来后的回调接口
         * cancelCallback:当一个消费者取消订阅时的回调接口;取消消费者订阅队列时除了使用{@link Channel#basicCancel}之外的所有方式都会调用该回调方法
         * @return 服务端生成的消费者标识
         */
        channel.basicConsume(QUEUE_NAME, deliverCallback, consumerTag -> {});
 
6.String basicConsume(String queue, Consumer callback)
  /**
         * queue:队列名
         * callback: 消费者对象的回调接口
         * @return 服务端生成的消费者标识
         */
        channel.basicConsume(QUEUE_NAME, new DefaultConsumer(channel){});
 
7.String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback)
 /**
         * 启动一个消费者,并返回服务端生成的消费者标识
         * queue:队列名
         * autoAck:true 接收到传递过来的消息后acknowledged(应答服务器),false 接收到消息后不应答服务器
         * deliverCallback: 当一个消息发送过来后的回调接口
         * shutdownSignalCallback: 当channel/connection 关闭后回调
         * @return 服务端生成的消费者标识
         */
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, (consumerTag, sig)->{
            //consumerTag服务端生成的消费者标识
            //sig(ShutdownSignalException):说明关闭的原因
        });
 
8.String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback)
 /**
         * 启动一个消费者,并返回服务端生成的消费者标识
         * queue:队列名
         * autoAck:true 接收到传递过来的消息后acknowledged(应答服务器),false 接收到消息后不应答服务器
         * deliverCallback: 当一个消息发送过来后的回调接口
         * cancelCallback:当一个消费者取消订阅时的回调接口;取消消费者订阅队列时除了使用{@link Channel#basicCancel}之外的所有方式都会调用该回调方法
         * shutdownSignalCallback: 当channel/connection 关闭后回调
         * @return 服务端生成的消费者标识
         */
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag->{
            //consumerTag:服务端小费制标识
        }, (consumerTag, sig) -> {
            //consumerTag服务端生成的消费者标识
            //sig(ShutdownSignalException):说明关闭的原因
        });
 
9.String basicConsume(String queue, boolean autoAck, Map<String, Object> arguments, Consumer callback)
  /**
         * 启动一个消费者,并返回服务端生成的消费者标识
         * queue:队列名
         * autoAck:true 接收到传递过来的消息后acknowledged(应答服务器),false 接收到消息后不应答服务器
         * arguments: 消费的一组参数
         * callback:消费者对象接口
         * @return 服务端生成的消费者标识
         */
        channel.basicConsume(QUEUE_NAME, true, arguments, new DefaultConsumer(channel){
          //根据需要实现对应的方法
        });
 
10.String basicConsume(String queue, boolean autoAck, Map<String, Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback)
 /**
         * 启动一个消费者,并返回服务端生成的消费者标识
         * queue:队列名
         * autoAck:true 接收到传递过来的消息后acknowledged(应答服务器),false 接收到消息后不应答服务器
         * arguments: 消费的一组参数
         * deliverCallback: 当一个消息发送过来后的回调接口
         * cancelCallback:当一个消费者取消订阅时的回调接口;取消消费者订阅队列时除了使用{@link Channel#basicCancel}之外的所有方式都会调用该回调方法
         * @return 服务端生成的消费者标识
         */
        channel.basicConsume(QUEUE_NAME, true, arguments, deliverCallback, consumerTag -> {});
 
11.String basicConsume(String queue, boolean autoAck, Map<String, Object> arguments, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback)
/**
         * 启动一个消费者,并返回服务端生成的消费者标识
         * queue:队列名
         * autoAck:true 接收到传递过来的消息后acknowledged(应答服务器),false 接收到消息后不应答服务器
         * arguments: 消费的一组参数
         * deliverCallback: 当一个消息发送过来后的回调接口
         * shutdownSignalCallback: 当channel/connection 关闭后回调
         * @return 服务端生成的消费者标识
         */
        channel.basicConsume(QUEUE_NAME, true, arguments, deliverCallback, (consumerTag, sig) -> {});
 
12.String basicConsume(String queue, boolean autoAck, Map<String, Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback)
 /**
         * 启动一个消费者,并返回服务端生成的消费者标识
         * queue:队列名
         * autoAck:true 接收到传递过来的消息后acknowledged(应答服务器),false 接收到消息后不应答服务器
         * arguments: 消费的一组参数
         * deliverCallback: 当一个消息发送过来后的回调接口
         * cancelCallback:当一个消费者取消订阅时的回调接口;取消消费者订阅队列时除了使用{@link Channel#basicCancel}之外的所有方式都会调用该回调方法
         * shutdownSignalCallback: 当channel/connection 关闭后回调
         * @return 服务端生成的消费者标识
         */
        channel.basicConsume(QUEUE_NAME, true, arguments, deliverCallback, consumerTag -> {}, (consumerTag, sig) -> {});
 
13.String basicConsume(String queue, boolean autoAck, String consumerTag, Consumer callback)
 /**
         * 启动一个消费者,并返回服务端生成的消费者标识
         * queue:队列名
         * autoAck:true 接收到传递过来的消息后acknowledged(应答服务器),false 接收到消息后不应答服务器
         * consumerTag: 客户端生成的用于建立上线文的使用者标识
         * callback:消费者对象接口
         * @return 与消费者关联的消费者标识
         */
        channel.basicConsume(QUEUE_NAME, true, ctag, new DefaultConsumer(channel){
            //根据需要实现具体的方法
 
14.String basicConsume(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback, CancelCallback cancelCallback)
  /**
         * 启动一个消费者,并返回服务端生成的消费者标识
         * queue:队列名
         * autoAck:true 接收到传递过来的消息后acknowledged(应答服务器),false 接收到消息后不应答服务器
         * consumerTag:客户端生成的一个消费者标识
         * deliverCallback: 当一个消息发送过来后的回调接口
         * cancelCallback:当一个消费者取消订阅时的回调接口;取消消费者订阅队列时除了使用{@link Channel#basicCancel}之外的所有方式都会调用该回调方法
         * @return 与消费者关联的消费者标识
         */
        channel.basicConsume(QUEUE_NAME, true, ctag, deliverCallback, consumerTag -> {});
 
15.String basicConsume(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback)
/**
         * 启动一个消费者,并返回服务端生成的消费者标识
         * queue:队列名
         * autoAck:true 接收到传递过来的消息后acknowledged(应答服务器),false 接收到消息后不应答服务器
         * consumerTag:客户端生成的一个消费者标识
         * deliverCallback: 当一个消息发送过来后的回调接口
         * shutdownSignalCallback: 当channel/connection 关闭后回调
         * @return 与消费者关联的消费者标识
         */
        channel.basicConsume(QUEUE_NAME, true, ctag, deliverCallback, (consumerTag, sig) -> {});
 
16.String basicConsume(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback)
  /**
         * 启动一个消费者,并返回服务端生成的消费者标识
         * queue:队列名
         * autoAck:true 接收到传递过来的消息后acknowledged(应答服务器),false 接收到消息后不应答服务器
         * consumerTag:客户端生成的一个消费者标识
         * deliverCallback: 当一个消息发送过来后的回调接口
         * cancelCallback:当一个消费者取消订阅时的回调接口;取消消费者订阅队列时除了使用{@link Channel#basicCancel}之外的所有方式都会调用该回调方法
         * shutdownSignalCallback: 当channel/connection 关闭后回调
         * @return 与消费者关联的消费者标识
         */
        channel.basicConsume(QUEUE_NAME, true, ctag, deliverCallback, consumerTag -> {}, (consumerTag, sig) -> {});
 
17.String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, Consumer callback)
 /**
         * queue:队列名
         * autoAck:true 接收到传递过来的消息后acknowledged(应答服务器),false 接收到消息后不应答服务器
         * consumerTag:客户端生成的一个消费者标识
         * nolocal:如果服务器不应将在此通道连接上发布的消息传递给此使用者,则为true;请注意RabbitMQ服务器上不支持此标记
         * exclusive: 如果是单个消费者,则为true
         * callback:消费者对象接口
         * @return 与消费者关联的消费者标识
         */
        channel.basicConsume(QUEUE_NAME, true, ctag, false, false, arguments, new DefaultConsumer(channel){
            //根据需求实现方法
        });
 
18.String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback)
 /**
         * queue:队列名
         * autoAck:true 接收到传递过来的消息后acknowledged(应答服务器),false 接收到消息后不应答服务器
         * consumerTag:客户端生成的一个消费者标识
         * nolocal:如果服务器不应将在此通道连接上发布的消息传递给此使用者,则为true;请注意RabbitMQ服务器上不支持此标记
         * exclusive: 如果是单个消费者,则为true
         * arguments:消费的一组参数
         * deliverCallback: 当一个消息发送过来后的回调接口
         * cancelCallback:当一个消费者取消订阅时的回调接口;取消消费者订阅队列时除了使用{@link Channel#basicCancel}之外的所有方式都会调用该回调方法
         * @return 与消费者关联的消费者标识
         */
        channel.basicConsume(QUEUE_NAME, true, ctag, false, false, arguments, deliverCallback, consumerTag -> {});
 
19.String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback)
 /**
         * queue:队列名
         * autoAck:true 接收到传递过来的消息后acknowledged(应答服务器),false 接收到消息后不应答服务器
         * consumerTag:客户端生成的一个消费者标识
         * nolocal:如果服务器不应将在此通道连接上发布的消息传递给此使用者,则为true;请注意RabbitMQ服务器上不支持此标记
         * exclusive: 如果是单个消费者,则为true
         * arguments:消费的一组参数
         * deliverCallback: 当一个消息发送过来后的回调接口
         * shutdownSignalCallback: 当channel/connection 关闭后回调
         */
        channel.basicConsume(QUEUE_NAME, true, ctag, false, false, arguments, deliverCallback, (consumerTag, sig) -> {});
 
20.String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback)
	 /**
         * queue:队列名
         * autoAck:true 接收到传递过来的消息后acknowledged(应答服务器),false 接收到消息后不应答服务器
         * consumerTag:客户端生成的一个消费者标识
         * nolocal:如果服务器不应将在此通道连接上发布的消息传递给此使用者,则为true;请注意RabbitMQ服务器上不支持此标记
         * exclusive: 如果是单个消费者,则为true
         * arguments:消费的一组参数
         * deliverCallback: 当一个消息发送过来后的回调接口
         * cancelCallback:当一个消费者取消订阅时的回调接口;取消消费者订阅队列时除了使用{@link Channel#basicCancel}之外的所有方式都会调用该回调方法
         * shutdownSignalCallback: 当channel/connection 关闭后回调
         */
        channel.basicConsume(QUEUE_NAME, true, ctag, false, false, arguments, deliverCallback, consumerTag -> {}, (consumerTag, sig) -> {});
 
主动拉取队列中的一条消息
GetResponse basicGet(String queue, boolean autoAck)
1
使用示例:

        /**
         * 从消息队列中取出第一条消息;整个方法的执行过程是首先消费队列,然后检索第一条消息,然后再取消订阅
         */
        GetResponse response = channel.basicGet(QUEUE_NAME, true);
        System.out.println("消费者接收到的消息是:"+new String(response.getBody(), "UTF-8"));
 
取消消费者订阅
/**
* 取消消费者对队列的订阅关系
* consumerTag:服务器端生成的消费者标识
**/
void basicCancel(String consumerTag)
 
basicQoc设置服务端每次发送给消费者的消息数量
/**
* prefetchSize:服务器传送最大内容量(以八位字节计算),如果没有限制,则为0
* prefetchCount:服务器每次传递的最大消息数,如果没有限制,则为0;
* global:如果为true,则当前设置将会应用于整个Channel(频道)
**/
void basicQos(int prefetchSize, int prefetchCount, boolean global)
 
/**
* prefetchCount:服务器每次传递的最大消息数,如果没有限制,则为0;
* global:如果为true,则当前设置将会应用于整个Channel(频道)
**/
void basicQos(int prefetchCount, boolean global)
 
/**
* prefetchCount:服务器每次传递的最大消息数,如果没有限制,则为0;
**/
void basicQos(int prefetchCount)
 
Acknowledge(确认)收到一个或者多个消息
/**
* 消费者确认收到一个或者多个消息
* deliveryTag:服务器端向消费者推送消息,消息会携带一个deliveryTag参数,也可以成此参数为消息 * 的唯一标识,是一个递增的正整数
* multiple:true表示确认所有消息,包括消息唯一标识小于等于deliveryTag的消息,false只确认 * * deliveryTag指定的消息
**/
void basicAck(long deliveryTag, boolean multiple)
 
/**
* 要求代理重新发送未确认的消息
* requeue:如果为true,消息将会重新入队,可能会被发送给其它的消费者;如果为false,消息将会发送给* 相同的消费者
**/
Basic.RecoverOk basicRecover(boolean requeue)
 
/**
* 要求代理重新发送未确认的消息;消息将会重新排队,并且可能会发送给其它的消费者
**/
Basic.RecoverOk basicRecover()
 
拒绝消息
/**
* 拒绝接收到的一个或者多个消息
* deliveryTag:接收到消息的唯一标识
* multiple: true表示拒绝所有的消息,包括提供的deliveryTag;false表示仅拒绝提供的deliveryTag
* requeue:true 表示拒绝的消息应重新入队,而不是否丢弃
*/
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
 
/**
* 拒绝接收到的一个或者多个消息
* deliveryTag:接收到消息的唯一标识
* requeue:true 表示拒绝的消息应重新入队,而不是否丢弃
*/
void basicReject(long deliveryTag, boolean requeue) 

发送消息
/**
* exchange:要将消息发送到的Exchange(交换器)
* routingKey:路由Key
* mandatory:true 如果mandatory标记被设置
* immediate: true 如果immediate标记被设置,注意:RabbitMQ服务端不支持此标记
* props:其它的一些属性,如:{@link MessageProperties.PERSISTENT_TEXT_PLAIN}
* body:消息内容
**/
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)


/**
* 发布消息
* 发布到不存在的交换器将导致信道级协议异常,该协议关闭信道,
* exchange: 要将消息发送到的交换器
* routingKey: 路由KEY
* mandatory:true 如果mandatory标记被设置
* props: 消息的其它属性,如:路由头等
* body: 消息体
*/
void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)
 
/**
* 发布消息
* 发布到不存在的交换器将导致信道级协议异常,该协议关闭信道,
* exchange: 要将消息发送到的交换器
* routingKey: 路由KEY
* props: 消息的其它属性,如:路由头等
* body: 消息体
*/
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)

 

本文来源huayang183,由架构君转载发布,观点不代表Java架构师必看的立场,转载请标明来源出处:https://javajgs.com/archives/18686

发表评论