最近在公司的代码里面看到RabbitMQ相关的代码,于是带着好奇心研究了下RabbitMQ.
RabbitMQ的核心是交换机和队列。
交换机的功能主要是接收消息并且转发到绑定的队列,交换机不存 储消息,在启用ack模式后,交换机找不到队列会返回错误。队列用于临时存储消息和转发消息。
交换机有四种类型:Direct, topic, Headers and Fanout。
Direct:direct 类型的行为是”先匹配, 再投送”. 即在绑定时设定一个 routing_key, 消息的routing_key 匹配时, 才会被交换器投送到绑定的队列中去.[精确匹配类型]
Topic:按规则转发消息(最灵活)[模式匹配]
Headers:设置header attribute参数类型的交换机
RabbitMQ的队列类型有两种,即时队列和延时队列【DelayQueue】。
即时队列:队列中的消息会被立即消费;
延时队列:队列中的消息会在指定的时间延时之后被消费。
下面将从即时队列和延时队列的具体实现来详解RabbitMQ.
即时队列详解:
DirectExchange:
DirectExchangeTest.java
|
|
HelloSender.java
|
|
HelloReceiver.java
|
|
RabbitConfig.java
|
|
FanoutExchange:
FanoutExchangeTest.java
|
|
FanoutSender .java
FanoutReceiverA .java……FanoutReceiverC .java
FanoutRabbitConfig.java
|
|
TopicExchangeTest.java
|
|
TopicSender .java
TopicReceiver .java
TopicReceiver2 .java
TopicRabbitConfig .java
topic 和 direct 类似, 只是匹配上支持了”模式”, 在”点分”的 routing_key 形式中, 可以使用两个通配符:
“*”表示一个词.
“#”表示零个或多个词.
在上面的TopicExchange中,当调用send1()方法,执行this.rabbitTemplate.convertAndSend("topicExchange","topic.message", context);
时,实际上只有两个队列,即采用with(“topic.#”);和with(“topic.message”);绑定的两个队列会接受到消息;
以上就是非常简单的即时队列的详解,线面是延时队列的详解。
延时队列有哪些用处呢?打个比方,加入用户下了单,但是在30分钟内没有支付我们就返回支付失败提示这个情景就可以采用延时队列来处理。
延时队列的原理图如下:
客户端:指具体往MQ发生消息端, 客户端将消息内容进行自定义包装, 将消息中附带目标队列名称。如:客户端向队列Q1发送字符串“hello” , 延时时间为60秒, 包装后修改为{“queueName”:”Q1”,”body”: “hello”},此时,将消息发送到DLX死信队列,而非Q1队列,并将消息设置为60秒超时。
DLX:死信队列,用来存储有超时时间信息的消息, 并且可以设置当消息超时时,转发到另一个指定队列(此处设置转发到router), 死信队列无消费者,当接收到客户端消息之后,等待消息超时,将消息转发到指定的Router队列
Router: 转发队列,用来接收死信队列超时消息, 如上示例消息,在接收到之后,消费者将消息解析,获取queueName,body,再向所获取的queueName队列中发送一条消息,内容为body.
延时队列的具体实现代码如下:
DelaySendTest.java
|
|
QueueMessage.java
MessageException.java
|
|
DeafaultMessageServiceImpl.java
|
|
QueueConfiguration.java
|
|
TradeProcessor.java
|
|
HelloProcessor.java
|
|
延时队列在可视化界面的表现形式为:
其实最终的原理非常简单:
归纳起来就是:客户端发送消息到指定的交换机,进入死信队列【死信队列没有消费者】,在死信队列里超时处理器等待消息超时,消息超时后转发消息给转发队列,转发队列的消息消费者监听到消息后进行对应的逻辑处理即可。