us apple developer accounts for sale:再看rabbitmq的交换器和行列的关系

admin/2020-06-22/ 分类:科技/阅读:

最近又要用到rabbitmq,营业上要求服务器只发一次新闻,需要多个客户端都去单独消费。但我们知道rabbitmq的机制里,每个行列里的新闻只能消费一次,以是客户端要单独消费信息,就必须得每个客户端单独监听一个queue。以是我最终想实现的是服务端只声明exchange,客户端来建立queue和绑定exchange。然则在看种种rabbitmq博文和讨论的时刻,我以为对exchange的模式和queue间的关系讲的都不是很清晰。以是我决议自己验证一下

fanout模式和direct模式

本文主要验证fanout模式和direct模式下以上料想是否可行。fanout模式就是赫赫有名的广播模式了,只要queue绑定了fanout的交换器,就可以直接的收到新闻,无需routingkey的介入。而direct模式就是通过routing key直接发送到绑定了同样routing key的行列中。那么,在这两种exchange的模式下,是否都可以实现服务端仅建立exchange,客户端建立queue并绑定exchange呢?

Direct模式验证

我们先把交换器、routingkey、行列的名称界说好:

  1. 交换器为directTest
  2. routingkey为direct_routing_key
  3. 行列测试3个,首先测试Direct_test_queue_1,再行测试Direct_test_queue_2,再行测试Direct_test_queue_3

代码使用spring boot框架快速搭建。我们先计划好需要几个类来完成这个事情:

  1. 针对生产者,需要RabbitmqConfig,用来设置exchange的
  2. 针对生产者,需要DirectRabbitSender,用来实现Direct模式的新闻发送
  3. 针对消费者,需要DirectConsumerOne,来测试第一个行列Direct_test_queue_1天生和新闻吸收
  4. 针对消费者,需要DirectConsumerTwo,来测试第二个行列Direct_test_queue_2天生和新闻吸收
  5. 针对消费者,需要DirectConsumerThree,来测试第三个行列Direct_test_queue_3天生和新闻吸收
  6. 我们还需要一个测试类RabbitmqApplicationTests,用于测试新闻的发送和吸收

rabbitmq先设置一个DirectExchange

@Bean DirectExchange directExchange(){ return new DirectExchange("directTest", true, false); } 

我们可以看到Direct交换器的名称界说为了directTest,这时刻还未绑定任何的行列。启动程序,若我们的设想没错,则rabbitmq中应该已经天生了directTest的exchange。


Bingo!directTest交换器乐成建立。接下来,我们去编写DirectRabbitSender的代码

@Component public class DirectRabbitSender{ @Autowired private RabbitTemplate rabbitTemplate; private final String EXCHANGE_NAME = "directTest"; private final String ROUTING_KEY = "direct_routing_key"; public void send(Object message) { rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, message); } } 

我们可以看到代码中,通过rabbitTemplate发送新闻到了交换器为directTest,routingkey为direct_routing_key的地方。但这时刻我们没有任何行列了,自然接不到新闻。现在我们去编写第一个消费者DirectConsumerOne来接受新闻。

@Component @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "Direct_test_queue_1", durable = "true"), exchange = @Exchange(value = "directTest"), key = "direct_routing_key" )) public class DirectConsumerOne { @RabbitHandler private void onMessage(String message){ System.out.println("监听行列Direct_test_queue_1接到新闻" message); } } 

通过代码可以看到,我们通过@QueueBinding把Direct_test_queue_1行列绑定到了directTest和direct_routing_key上。Direct_test_queue_1并没有在rabbitmq建立,这并没有关系。一般来说,@RabbitListener会自动去建立行列。启动程序,我们去看一下rabbitmq里行列是不是建立了。

Bingo!再次验证乐成。我们去看看绑定关系是不是准确。这时刻Direct_test_queue_1应该绑定到了名为directTest的交换器,而绑定的routingkey为direct_routing_key

biubiubiu!绑定关系完全准确。到了这里,我们举行最后一步,写了单元测试去发送新闻,查看控制台中消费者是否乐成收到新闻。RabbitmqApplicationTests的代码如下:

@SpringBootTest class RabbitmqApplicationTests { @Autowired private DirectRabbitSender directRabbitSender; @Test void contextLoads() { } @Test public void directSendTest(){ directRabbitSender.send("direct-sender"); directRabbitSender.send("direct-sender_test"); } } 

启动测试类,然后去查看控制台。

没错,这就是我们想要到达的效果!基本可以宣布Direct模式验证乐成。服务端天生exchange,客户端去天生行列绑定的方式在direct模式下完全可行。为了保险起见,再验证一下天生多个消费者绑定到同一个行列是否可行。

DirectConsumerTwo代码如下:

@Component @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "Direct_test_queue_2", durable = "true"), exchange = @Exchange(value = "directTest"), key = "direct_routing_key" )) public class DirectConsumerTwo { @RabbitHandler private void onMessage(String message){ System.out.println("监听行列Direct_test_queue_2接到新闻" message); } } 

DirectConsumerThree代码如下:

@Component @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "Direct_test_queue_3", durable = "true"), exchange = @Exchange(value = "directTest"), key = "direct_routing_key" )) public class DirectConsumerThree { @RabbitHandler private void onMessage(String message){ System.out.println("监听行列Direct_test_queue_3接到新闻" message); } } 

启动测试类,我们去看两个地方:

  1. rabbitmq是否建立了客户端绑定的三个行列Direct_test_queue_1、Direct_test_queue_2、Direct_test_queue_3
  2. 消费者应该各自收到2条新闻(Test中发送了两条,参看上面 RabbitmqApplicationTests 的代码)。那3个行列,控制台中应该打印了6条新闻。

hohohoho!建立乐成,而且绑定关系我看了也全都准确。我们去看控制台

6条!没有任何偏差,至此,可以宣布Direct模式下,完全支持我们最初的想法:服务端天生exchange,客户端去天生行列绑定的方式在direct模式下完全可行。

fanout模式验证

接下来我们验证一下fanout的方式,基本操作流程和Direct模式一致。代码的结构也差不多:

  1. 针对生产者,需要RabbitmqConfig,直接在Direct模式下的rabbitmqConfig里直接添加Fanout的交换器设置
  2. 针对生产者,需要FanoutRabbitSender,用来实现Fanout模式的新闻发送
  3. 针对消费者,需要FanoutConsumerOne,来测试第一个行列Fanout_test_queue_1天生和新闻吸收
  4. 针对消费者,需要FanoutConsumerTwo,来测试第二个行列Fanout_test_queue_2天生和新闻吸收
  5. 针对消费者,需要FanoutConsumerThree,来测试第三个行列Fanout_test_queue_3天生和新闻吸收
  6. 测试类RabbitmqApplicationTests也直接复用Direact模式下测试的类

我就不多BB,直接上代码了。

RabbitmqConfig代码如下

@Configuration public class RabbitmqConfig { @Bean DirectExchange directExchange(){ return new DirectExchange("directTest", true, false); } @Bean FanoutExchange fanoutExchange(){ return new FanoutExchange("fanoutTest", true, false); } } 

FanoutRabbitSender的代码如下,此处和direct模式的区别是Fanout中没有routingkey,以是代码里也没界说routingkey:

@Component public class FanoutRabbitSender{ @Autowired private RabbitTemplate rabbitTemplate; private final String EXCHANGE_NAME = "fanoutTest"; public void send(Object message) { rabbitTemplate.convertAndSend(EXCHANGE_NAME, null, message); } } 

我们到这里先启动程序试试,看看fanoutTest的交换器在没有绑定行列的情况下是否天生了。

棒棒棒!和我们想的一样,那接下往复写完所有的消费者,这里和Direct模式最主要的区别是@Exchange中必须要指定type为fanout。direct模式的代码里没指定是因为@Exchange的type默认值就是direct。我直接上代码了:

/** * 监听器自动去声明queue=fanout_test_queue_1,并绑定到fanoutTest交换器 */ @Component @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "fanout_test_queue_1", durable = "true"), exchange = @Exchange(value = "fanoutTest", type = ExchangeTypes.FANOUT) )) public class FanoutConsumerOne { @RabbitHandler private void onMessage(String message){ System.out.println("监听行列fanout_test_queue_1接到新闻" message); } } @Component @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "fanout_test_queue_2", durable = "true"), exchange = @Exchange(value = "fanoutTest", type = ExchangeTypes.FANOUT) )) public class FanoutConsumerTwo { @RabbitHandler private void onMessage(String message){ System.out.println("监听行列fanout_test_queue_2接到新闻" message); } } @Component @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "fanout_test_queue_3", durable = "true"), exchange = @Exchange(value = "fanoutTest", type = ExchangeTypes.FANOUT) )) public class FanoutConsumerThree { @RabbitHandler private void onMessage(String message){ System.out.println("监听行列fanout_test_queue_3接到新闻" message); } } 

接着去测试类RabbitmqApplicationTests中加上fanout的发送测试,然后注释掉direct的单元测试,以便一会造成滋扰

@SpringBootTest class RabbitmqApplicationTests { @Autowired private DirectRabbitSender directRabbitSender; @Autowired private FanoutRabbitSender fanoutRabbitSender; @Test void contextLoads() { } // @Test // public void directSendTest(){ // directRabbitSender.send("direct-sender"); // directRabbitSender.send("direct-sender_test"); // } @Test public void fanoutSendTest(){ fanoutRabbitSender.send("fanout-sender_1"); fanoutRabbitSender.send("fanout-sender_2"); } } 

代码都完成了,现在我们启动测试类,看看控制台是否正常收到了新闻

看图看图,fanout模式下也完全认证乐成!!!那我们可以宣布,文章开头的料想完全可以实现。

总结

服务端只声明exchange,客户端来建立queue和绑定exchange的方式完全可行。而且在Direct和Fanout模式下都可行。

那我们可以推测在Header模式的交换器和Topic模式的交换器下应该也大差不差。详细列位可自行验证,基本流程和上面direct和fanout的流程差不多。

,

欧博allbet网址

欢迎进入欧博allbet网址(Allbet Game):www.aLLbetgame.us,欧博官网是欧博集团的官方网站。欧博官网开放Allbet注册、Allbe代理、Allbet电脑客户端、Allbet手机版下载等业务。

TAG:
阅读:
广告 330*360
广告 330*360
Sunbet_进入申博sunbet官网
微信二维码扫一扫
关注微信公众号
新闻自媒体 Copyright © 2002-2019 Sunbet 版权所有
二维码
意见反馈 二维码