风也温柔

计算机科学知识库

【每日一题】Bean动态创建动态创建队列队列动态创建绑定关系SpringBoot实现RabbitMQ动态创建交换机、队列、交换机及绑定关系

  操作方式: 无需操作

  备注:所有指定为ExchangeEnum.fanout的队列,都将在队列名称后面追加“.10_87_10_*”(数字部分为服务所在IP地址)

  RabbitMQConfig

  说明:RabbitMQ核心配置类

  作用:实现动态创建交换机、动态创建队列、动态绑定

  操作方式: 无需操作

  备注:该类已将所有队列名称以Map形式注册至,bean名称“queuesNames”,结构:Map(Map) 消费者可直接使用@RabbitListener(queues = {"#{queuesNames.队列配置枚举名称}"})进行监听

  DefaultMQCallback

  说明:默认回调类,实现自MQCallback

  作用:消息发送成功或失败的回调类

  操作方式: 无需操作

  备注:可手动进行实现,调用queueMessageServiceImpl.setMQCallback(mqCallback)方法即可实现修改

  QueueMessageServiceImpl

  说明:默认消息发送类,实现自QueueMessageService

  作用:实现了消息发送的基本接口

  操作方式: 无需操作

  备注:无

  ②、核心代码解析spring示例注册

  <pre class="brush: html; toolbar: false">

public static  boolean registerBean(String beanName, T bean) {
    // 将applicationContext转换为ConfigurableApplicationContext
    ConfigurableApplicationContext context = (ConfigurableApplicationContext)     SpringContextUtil.getApplicationContext();
    // 将bean对象注册到bean工厂
    context.getBeanFactory().registerSingleton(beanName, bean);
    log.debug("【SpringContextUtil】注册实例“" + beanName + "”到spring容器:" + bean);
    return true;
}

  动态创建队列名称Bean

  <pre class="brush: html; toolbar: false"> @Bean("queuesNames")

public Map queuesNames() {
    return QueueEnum.getQueuesNames();
}
// 这个方法在QueueEnum
public static Map getQueuesNames() {
    return Arrays.asList(QueueEnum.values()).stream().collect(Collectors.toMap(queueEnum -> queueEnum.toString(), queueEnum -> queueEnum.getName()));
}

  动态创建交换机

  <pre class="brush: html; toolbar: false"> /**

 * @return java.lang.Object
 * @program com.gzqapi.rabbitmq.config.RabbitMQConfig.createExchange();
 * @description 动态创建交换机
 * @param: this is no-parameter method.
 * @author zhiqiang94@vip.qq.com
 * @create 2020/4/16 0016 9:28
 */
@Bean("createExchange")
public Object createExchange() {
    // 遍历交换机枚举
    ExchangeEnum.toList().forEach(exchangeEnum -> {
                // 声明交换机
                Exchange exchange;
                // 根据交换机模式 生成不同的交换机
                switch (exchangeEnum.getType()) {
                    case fanout:
                        exchange = ExchangeBuilder.fanoutExchange(exchangeEnum.getExchangeName()).durable(exchangeEnum.isDurable()).build();
                        break;
                    case topic:
                        exchange = ExchangeBuilder.directExchange(exchangeEnum.getExchangeName()).durable(exchangeEnum.isDurable()).build();
                        break;
                    case headers:
                        exchange = ExchangeBuilder.headersExchange(exchangeEnum.getExchangeName()).durable(exchangeEnum.isDurable()).build();
                        break;
                    case direct:
                    default:
                        exchange = ExchangeBuilder.topicExchange(exchangeEnum.getExchangeName()).durable(exchangeEnum.isDurable()).build();
                        break;
                }
                // 将交换机注册到spring bean工厂 让spring实现交换机的管理
                if (exchange != null) {
                    SpringContextUtil.registerBean(exchangeEnum.toString() + "_exchange", exchange);
                }
            }
    );
    // 不返回任何对象 该方法只用于在spring初始化时 动态的将bean对象注册到spring bean工厂
    return null;
}

  动态创建队列

  <pre class="brush: html; toolbar: false"> /**

 * @return java.lang.Object
 * @program com.gzqapi.rabbitmq.config.RabbitMQConfig.createQueue();
 * @description 动态创建队列
 * @param: this is no-parameter method.
 * @author zhiqiang94@vip.qq.com
 * @create 2020/4/16 0016 9:29
 */
@Bean("createQueue")
public Object createQueue() {
    // 遍历队列枚举 将队列注册到spring bean工厂 让spring实现队列的管理
    QueueEnum.toList().forEach(queueEnum -> SpringContextUtil.registerBean(queueEnum.toString() + "_queue", new Queue(queueEnum.getName(), queueEnum.isDurable(), queueEnum.isExclusive(), queueEnum.isAutoDelete(), queueEnum.getArguments())));
    // 不返回任何对象 该方法只用于在spring初始化时 动态的将bean对象注册到spring bean工厂
    return null;
}

  动态创建绑定关系

  <pre class="brush: html; toolbar: false"> /**

 * @return java.lang.Object
 * @program com.gzqapi.rabbitmq.config.RabbitMQConfig.createBinding();
 * @description 动态将交换机及队列绑定
 * @param: this is no-parameter method.
 * @author zhiqiang94@vip.qq.com
 * @create 2020/4/16 0016 9:29
 */
@Bean("createBinding")
public Object createBinding() {
    // 遍历队列枚举 将队列绑定到指定交换机
    QueueEnum.toList().forEach(queueEnum -> {
                // 从spring bean工厂中获取队列对象(刚才注册的)
                Queue queue = SpringContextUtil.getBean(queueEnum.toString() + "_queue", Queue.class);
                // 声明绑定关系
                Binding binding;
                // 根据不同的交换机模式 获取不同的交换机对象(注意:刚才注册时使用的是父类Exchange,这里获取的时候将类型获取成相应的子类)生成不同的绑定规则
                switch (queueEnum.getExchangeEnum().getType()) {
                    case fanout:
                        FanoutExchange fanoutExchange = SpringContextUtil.getBean(queueEnum.getExchangeEnum().toString() + "_exchange", FanoutExchange.class);
                        binding = BindingBuilder.bind(queue).to(fanoutExchange);
                        break;
                    case topic:
                        TopicExchange topicExchange = SpringContextUtil.getBean(queueEnum.getExchangeEnum().toString() + "_exchange", TopicExchange.class);
                        binding = BindingBuilder.bind(queue).to(topicExchange).with(queueEnum.getRoutingKey());
                        break;
                    case headers:
                        HeadersExchange headersExchange = SpringContextUtil.getBean(queueEnum.getExchangeEnum().toString() + "_exchange", HeadersExchange.class);
                        if (queueEnum.isWhereAll()) {
                            // whereAll表示全部匹配
                            binding = BindingBuilder.bind(queue).to(headersExchange).whereAll(queueEnum.getHeaders()).match();
                        } else {
                            // whereAny表示部分匹配
                            binding = BindingBuilder.bind(queue).to(headersExchange).whereAny(queueEnum.getHeaders()).match();
                        }
                        break;
                    case direct:
                    default:
                        DirectExchange directExchange = SpringContextUtil.getBean(queueEnum.getExchangeEnum().toString() + "_exchange", DirectExchange.class);
                        binding = BindingBuilder.bind(queue).to(directExchange).with(queueEnum.getRoutingKey());
                        break;
                }
                // 将绑定关系注册到spring bean工厂 让spring实现绑定关系的管理
                if (binding != null) {
                    SpringContextUtil.registerBean(queueEnum.toString() + "_binding", binding);
                }
            }
    );
    // 不返回任何对象 该方法只用于在spring初始化时 动态的将bean对象注册到spring bean工厂
    return null;
}

  3、使用教程①、按需增删交换机枚举

  

  ②、按需增删队列枚举

  

  ③、实现消费者监听

  

  ④、调用生产者发送消息

  

  ⑤、查看测试结果

  

  4、常见问题①、调用QueueMessageService.convertSendAndReceive方法返回值为null

  ​当消费者方法为void方法时,QueueMessageService.convertSendAndReceive方法返回值为nulljava消息队列使用场景,注意检查消费者监听方法。

  ②、QueueMessageService.convertSendAndReceive方法被多个消费者消费怎么办?

  ​当QueueMessageService.convertSendAndReceive发送的消息被多个消费者消费,返回的结果为第一个消费者的返回值。建议有返回值的队列使用直连模式(direct),这样能保证该消息只有一个消费者进行消费。

  ③、广播交换机(ExchangeTypeEnum.fanout)的使用场景有哪些

  ​广播交换机的角色类似于村里面的小喇叭,当有新的政策传达给这个村时java消息队列使用场景,就会使用小喇叭播放【每日一题】Bean动态创建动态创建队列队列动态创建绑定关系SpringBoot实现RabbitMQ动态创建交换机、队列、交换机及绑定关系,这样每个人(队列)都停到了内容。

  ​可用于发送类似公告、系统全局通知等广播需求。

  ④、直连交换机(ExchangeTypeEnum.direct)的使用场景有哪些

  java消息队列使用场景_java使用队列预定房间_java阻塞队列使用场景

  ​直连交换机的角色类似于邮递员,信封上写明了收件地址,收到的消息会根据目的地的不同投入到不同的队列中,若是邮递员没有找到这个地址,这封信就被丢弃了。

  ​可用于点对点之间的通讯。

  ⑤、通配符交换机(ExchangeTypeEnum.topic)的使用场景有哪些

  ​通配符交换机类似于脑子不聪明的邮件分发员,收到有一封送往“张家镇李家村12号”的信件,分发员来到分发点,发现有两个有污渍的邮箱(队列),一个能看清*李家村*,一个能看清张家镇*,快递员干脆就将信件复制了一份,往两个地方都送了投递了一份。

  ​可用于同一个任务,需要不同模块或者分布式系统的模块同时执行时使用。

  ⑥、头交换机(ExchangeTypeEnum.headers)的使用场景有哪些

  ​头交换机类似于程序中的鉴权,管理员拥有全部权限,用户拥有查看权限,用户无法调用需要管理员权限的接口(消息无法分发到未监听该header的队列),拥有相应权限才能调用相应权限的接口(消息分发到监听该header的队列)。这里需要注意,头交换机支持完全匹配和部分匹配,完全匹配为管理员接口必须拥有所有管理员权限才可调用,即拥有所有管理员权限才显示进入管理员后台的按钮。部分匹配为拥有其中一个或部分权限即可调用,即拥有任意管理员权限就显示进入管理员后台的按钮。

  ​本文仅为自己学习笔记,由于实现原理非常简单,且核心源码已在本文中展示,故不发放源码连接了。由于本人对RabbitMQ研究并不深入,本文若有错误之处,请大佬指出,我一定虚心学习,积极改正。

  ​第一次写文章,希望大家多多鼓励,多多支持。

  文章来源:http://m.zyiz.net/tech/detail-130757.html