RabbitMQ的三大交换器详解

RabbitMQ的三大交换器详解
强烈推介IDEA2020.2破解激活,IntelliJ IDEA 注册码,2020.2 IDEA 激活码

pom文件都是相同的

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.1.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.bjsxt</groupId>
    <artifactId>spring-boot-direct-consumer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-boot-direct-consumer</name>
    <description>spring-boot-direct-consumer</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

 

Direct 交换器(发布与订阅 完全匹配)

需求

RabbitMQ的三大交换器详解

RabbitMQ的三大交换器详解

 

修改 Consumer 的配置文件

server.port=8081
spring.rabbitmq.host=192.168.181.133
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=111111

#设置交换器的名称
mq.config.exchange=log.direct

#设置info队列名称
mq.config.queue.info=log.info

#设置info的路由键
mq.config.queue.info.routing.key=log.info.routing.key

#设置error队列名称
mq.config.queue.error=log.error


#设置error的路由键
mq.config.queue.error.routing.key=log.error.routing.key

 

修改 Provider 的配置文件

server.port=8080
spring.rabbitmq.host=192.168.181.133
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=111111

#设置交换器的名称
mq.config.exchange=log.direct

#设置info的路由键
mq.config.queue.info.routing.key=log.info.routing.key

#设置error队列名称
mq.config.queue.error=log.error


#设置error的路由键
mq.config.queue.error.routing.key=log.error.routing.key

 

 

编写 Consumer

 

InfoReceiver

package com.bjsxt.receive;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

/***
 * 消息的接收者
 */
@Component
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(value = "${mq.config.queue.info}",autoDelete = "true"),
                exchange = @Exchange(value = "${mq.config.exchange}",type = ExchangeTypes.DIRECT),
                key="${mq.config.queue.info.routing.key}"
        )
)
public class InfoReceiver {

    /**
     * 接收消息的方法
     * 采用消息队列监听机制
     * @param msg
     */
   @RabbitHandler
    public void process(String msg){
        System.out.println("info--receiver=:"+msg);
    }
}

 

 

ErrorReceiver

 

package com.bjsxt.receive;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

/***
 * 消息的接收者
 */
@Component
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(value = "${mq.config.queue.error}",autoDelete = "true"),
                exchange = @Exchange(value = "${mq.config.exchange}",type = ExchangeTypes.DIRECT),
                key="${mq.config.queue.error.routing.key}"
        )
)
public class ErrorReceiver {

    /**
     * 接收消息的方法
     * 采用消息队列监听机制
     * @param msg
     */
   @RabbitHandler
    public void process(String msg){
        System.out.println("Error--receiver=:"+msg);
    }
}

 

 

编写 Provider

 

Sender
package com.bjsxt.send;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * 消息发送者
 */
@Component
public class Sender {

    @Autowired
    private AmqpTemplate amqpTemplate;

    @Value("${mq.config.exchange}")
    private String exchange;

    @Value("${mq.config.queue.error.routing.key}")
    private String routingkey;

    /**
     * 发送消息的方法
     * @param msg
     */
    public void sendMsg(String msg){
        /*向消息队列发送消息*/
        /*
        * 参数一:队列的名称
        * 参数二:发送的消息
        * */
        amqpTemplate.convertAndSend(exchange,routingkey,msg);

    }
}

 

com.bjsxt.test.RabbitMQTest

package com.bjsxt.test;

import com.bjsxt.SpringBootDirectProviderApplication;
import com.bjsxt.send.Sender;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = SpringBootDirectProviderApplication.class)
public class RabbitMQTest {

    @Autowired
    private Sender sender;

    @Test
    public void queueTest() throws InterruptedException {
        while (true){
            Thread.sleep(1000);
        sender.sendMsg("你好RabbitMQ");
        }
    }
}

 

 

 

Topic 交换器(主题,规则匹配)

 

 

需求

 

RabbitMQ的三大交换器详解

 

 

 

 

RabbitMQ的三大交换器详解

 

 

 

 

Consumer

 

server.port=8083
spring.rabbitmq.host=192.168.181.133
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=111111

#设置交换器的名称
mq.config.exchange=log.topic

#设置info队列名称
mq.config.queue.info=log.info

#设置info队列名称
mq.config.queue.error=log.error

#log 队列名称
mq.config.queue.logs=log.all

 

 

Provider

server.port=8084
spring.rabbitmq.host=192.168.181.133
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=111111

#设置交换器的名称
mq.config.exchange=log.topic

 

编写 Provider

 

 

UserSender (OrderSender以及ProduceSender三者类似)

 

package com.bjsxt.send;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * 消息发送者
 */
@Component
public class UserSender {

    @Autowired
    private AmqpTemplate amqpTemplate;

    @Value("${mq.config.exchange}")
    private String exchange;

    /**
     * 发送消息的方法
     * @param msg
     */
    public void sendMsg(String msg){
        /*向消息队列发送消息*/
        /*
        * 参数一:队列的名称
        * 参数二:发送的消息
        * */
        amqpTemplate.convertAndSend(exchange,"user.log.bug", "user.log.debug....."+msg);
        amqpTemplate.convertAndSend(exchange,"user.log.info", "user.log.info....."+msg);
        amqpTemplate.convertAndSend(exchange,"user.log.warn", "user.log.warn....."+msg);
        amqpTemplate.convertAndSend(exchange,"user.log.error", "user.log.error....."+msg);


    }
}

 

 

编写 Consumer

 

InfoReceiver

package com.bjsxt.receive;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

/***
 * 消息的接收者
 */
@Component
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(value = "${mq.config.queue.info}",autoDelete = "true"),
                exchange = @Exchange(value = "${mq.config.exchange}",type = ExchangeTypes.TOPIC),
                key="*.log.info"
        )
)
public class InfoReceiver {

    /**
     * 接收消息的方法
     * 采用消息队列监听机制
     * @param msg
     */
   @RabbitHandler
    public void process(String msg){
        System.out.println("info--receiver=:"+msg);
    }
}

 

LogReceiver

package com.bjsxt.receive;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

/***
 * 消息的接收者
 */
@Component
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(value = "${mq.config.queue.logs}",autoDelete = "true"),
                exchange = @Exchange(value = "${mq.config.exchange}",type = ExchangeTypes.TOPIC),
                key="*.log.*"
        )
)
public class LogReceiver {

    /**
     * 接收消息的方法
     * 采用消息队列监听机制
     * @param msg
     */
   @RabbitHandler
    public void process(String msg){
        System.out.println("log--receiver=:"+msg);
    }
}

 

 

Fanout 交换器(广播)

 

 

RabbitMQ的三大交换器详解

 

RabbitMQ的三大交换器详解

 

 

 

 

 

 

Consumer

 

spring.application.name=springcloud-mq 
spring.rabbitmq.host=192.168.181.133
spring.rabbitmq.port=5672 
spring.rabbitmq.username=admin 
spring.rabbitmq.password=111111
#设置交换器的名称
mq.config.exchange=order.fanout 
#短信服务队列名称 
mq.config.queue.sms=order.sms 
#push 服务队列名称 
mq.config.queue.push=order.push

 

Provider

spring.application.name=springcloud-mq 
spring.rabbitmq.host=192.168.181.133
spring.rabbitmq.port=5672 
spring.rabbitmq.username=admin 
spring.rabbitmq.password=111111
#设置交换器的名称 
mq.config.exchange=order.fanout

 

接收方的不停之处在于没有了key其他都是一样的,然后exchange类型换成fanout

 

发送方

rabbitAmqpTemplate.convertAndSend(this.exchange,"", msg);
本文来源huayang183,由架构君转载发布,观点不代表Java架构师必看的立场,转载请标明来源出处:https://javajgs.com/archives/18691

发表评论