RabbitMQ实现消息的延迟推送或延迟发送

RabbitMQ实现消息的延迟推送或延迟发送

一、RabbitMQ是什么?

1.RabbitMQ简介

RabbitMQ是有erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列。

常见的消息队列有:RabbitMQ、Kafka 和 ActiveMQ

2.RabbitMQ的优点

RabbitMQ最初起源于金融系统,用于不同模块之间的消息通讯。

优点:

可靠性:可持久化,消息传输和发布确认。

灵活性:通过交换机将消息路由到对应的队列。

集群:多台mq可组成集群,对外提供整体服务

支持多语言:支持多种语言

可界面操作:提供简易的用户操作界面

等等。

3.常用组件

1.生产者(Producer):消息的制造者

2.消费者(Consumer):消息的消费者

3.消息(Message):消息对象,包括业务参数和mq的参数

4.队列(Queue):缓存或暂存储消息的容器

5.连接(Connection):应用服务和mq进行交互的tcp连接

6.信道(Channel):AMQP命令都是通过信道来完成的,它是一个虚拟的通道,来复用ctp连接。

7.交换机(Exchange):负责从生产者接收消息,根据路由规则发送到指定的队列里面。

8.路由键(Routing Key):交换机根据路由键将消息发往指定的队列。

9.虚拟主机(Virtual Host):就像是一个RabbitMQ 服务。

4.RabbitMQ的结构图

一张图介绍RabbitMQ组成以及各个组件之间的关系(参考网上画的)。

5.交换机的类型

交换机是用来发送消息到指定的队列里面的,它使用哪种路由算法是由交换机类型和绑定的规则所决定的。

A-直连交换机:

直连交换机是根据交换机和队列之间绑定的路由键,来将消息发往指定的队列里面。如果交换机与多个队列绑定,则在发送携带路由键的消息时,只发给此路由键的队列,每个队列都是相同副本(比较适合一对一)。

例如:我用直连交换机test-direct-exchange根据路由键test-direct发送一条消息,然后去队列里面看消息,如图所示

B-扇形交换机:

扇形交换机是将消息发往与它绑定的队列,而不去理会绑定的路由键是否一致。如果交换机与多个队列绑定,每个队列都是相同副本,起到广播的作用。

例如:我用扇形交换机test-fanout-exchange根据路由键test-fanout发送一条消息,然后去队列里面看消息,如图所示

但是三个队列都收到了消息,可见扇形交换机会忽略其路由键

C-主题交换机:

主题交换机是通过消息的路由键跟交换机和队列之间的绑定路由键进行匹配,将消息发给匹配上的队列,跟直连交换机的一对多相似,但是他的路由键可以支持模糊匹配。

例如:我用主题交换机test-topic-exchange根据路由键test.topic2发送一条消息,然后去队列里面看消息,如图所示

根据消息路由键和绑定的路由键进行模糊匹配,推送消息。

D-头交换机:

头交换机是主题交换机有点相似,主题交换机是基于路由键,而头交换机是基于消息的headers数据,所以在发送消息给头交换机时指定Routing key是不起作用的。头交换机在绑定队列时需要指定参数Arguments,发送消息时需要指定headers和Arguments相匹配,消息才能被推到相应的队列。

例如:我用头交换机test-headers-exchange根据路由键test-headers1发送一条消息,然后去队列里面看消息,如图所示

如果前两个队列能收到消息,证明路由键不生效。

二、定时推送思路实现

rabbitmq实现延时消息主要有两种方式:

死信消息(队列ttl+死信exchange)

延时插件 (rabbitmq-delayed-message-exchange)

rabbitmq 实现方式一:队列ttl+死信exchange

简述:使用两个队列,一个队列接收消息不消费,等待指定时间后消息死亡,再由该队列绑定的死信exchange再次将其路由到另一个队列提供业务消费。

ttl 和 死信exchange 相关知识

ttl

先贴两个个rabbitmq官方文档:

Time-To-Live and Expiration:https://www.rabbitmq.com/ttl.html

Dead Letter Exchangeshttps:https://www.rabbitmq.com/dlx.html

我这里也简单介绍下:

rabbitmq 可以给 消息 和 队列 设置 ttl(生存时间)

队列设置:x-message-ttl=60000 (队列中所有消息都只有60s存活时间)

指定消息设置:expire=60000 指定消息只有60s存活时间

如果队列和消息同时设置了ttl,则取较小的那个作为ttl。消息死亡后不会被消费者消费。

死信exchange

死信(死亡的消息):

消费者使用 basic.reject 或 basic.nack 并将requeue参数设置为 false 来否定的消息ttl到期的消息队列超过长度限制被丢弃的消息

当一个队列设置了死信exchange 后,这个队列的死信都会被投递到死信exchange中,然后可以再次路由到其他队列中(如果指定了死信routing key 则死信消息routing key 变为设置的routing key,未设置则为原始 routing key)。

使用介绍

先声明一个消费队列 queue_dlx,用来接收死信消息,并提供消费;

然后声明一个死信exchange_dlx, 绑定 queue_dlx,接收消息后路由至queue_dlx;

声明一个延迟队列,queue_delay, 用来接收业务消息,但不提供消费,等待消息死亡后转至死信exchange。(即延迟)声明一个exchange,由业务发送消息到exchange,然后转至queue_delay.

一个消息的流程大概是:

简单分析

缺点:

1.只能支持固定延迟等级的消息

2.使用较复杂,得声明一堆队列&exchange

3.一个致命的问题就是消息顺序,不会按照延迟时间的先后顺序输出,而是按照queue本身先进先出的规则。即10秒延迟的消息如果是在20秒延迟消息后扔入的,那么也要等20秒延迟的消息输出后才能输出。除非消息的延迟时间是一致的否则无法满足业务要求

优点:

1.支持镜像队列复制,实现高可用

2.支持大量消息(成千上万)

3.适用场景: 使用固定延迟时间的场景。

备注:对于高版本(3.6及以上)的rabbitmq建议使用lazy-mode作为延迟队列,防止大量延时消息堆积而占用大量内存,从而触发rabbitmq换页阻塞队列。 (如果使用spring的话,即使低版本rabbitmq也不用太担心:spring-amqp默认发送持久化消息,即使触发换页,也只是把消息从内存中逐出而已。)

rabbitmq 实现方式二:rabbitmq延时插件

简述:延时消息不直接投递到队列中,而是先转储到本地Mnesia数据库中,然后定时器在消息到期后再将其投递到队列中。

延时插件使用

关于用法可以直接看这个文档或者网上搜一搜,这里就不介绍了。

github地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

其大概原理就是:指定了延时的消息,会被先保存在 Mnesia (erlang编写的数据库管理系统)中,然后有一个定时器去查询最近需要被投递的消息,将其投递到目标队列中。

简单分析

优点:

基本支持任意延迟时间(最大延迟时间不应超过 24.8 天)

使用 rabbitmq 的延迟插件(rabbitmq delayed message plugin)时,可以设置延迟的最大时间。但是,该最大时间会受到 rabbitmq 版本、erlang vm 版本以及操作系统等因素的影响。这是由于 erlang vm 定时器精度和 rabbitmq 的实现限制所导致的。对具体实现来说,还需要考虑消息数量、消息大小和队列容量等因素,以确保系统性能和可靠性。

如果需要发送长时间的延迟消息,可以使用缩短延迟时间的方法来实现,例如将一个延迟很长的消息分拆成多个延迟较短的子消息,在消息附加属性中指定分片信息,同时接收方在收到所有分片信息后再合并消息。

缺点:

延时不可靠,存在消息数量较大或使用很久后延迟不准确(会推迟), 无备份机制,延时消息存在单个节点磁盘中,不支持ram类型的节点 (数据得存磁盘里面)增加大量内存的占用 (经测试发现,发送大量延时消息后,rabbitmq内存占用明显增高,比普通消息还要高很多那种。)

延时消息时开启事务机制会事务无效

@Bean("rabbitTransactionManager")

public RabbitTransactionManager rabbitTransactionManager() {

RabbitTransactionManager manager = new RabbitTransactionManager();

ConnectionFactory factory = rabbitTemplate.getConnectionFactory();

manager.setConnectionFactory(factory);

return manager;

}

RabbitTransactionManager 会导致spring事务也无效,可能多数据源的问题

安装插件需要重启

适用场景::如果不是无关紧要的小业务,不建议使用。

3.基于队列ttl+死信exchange代码实现

实现消息延迟发送的具体思路是:首先创建一个交换机来当做死信交换机,再创建一个队列与这个死信交换机进行绑定就称作死信队列。其次创建一个交换机来当做正常交换机,在创建一个队列与这个正常交换机进行绑定,同时将死信交换机和死信路由键配置到这个正常队列里面。这样,当一条带有存活时间的消息通过正常交换机发送过来时,首先进入正常队列里面,然后到了存活时间,就会通过死信交换机根据路由键发送到死信队列里面,然后消费者消费死信队列里的消息,就达到了延迟消费的目的。

java代码实现。

1.首先创建maven项目,导入pom文件

org.springframework.boot

spring-boot-starter-amqp

org.springframework.boot

spring-boot-starter-web

org.springframework.boot

spring-boot-starter-test

test

org.springframework.amqp

spring-rabbit-test

test

org.springframework.boot

spring-boot-maven-plugin

2.配置配置文件(如果是yml,就用对应的书写规则)

spring.rabbitmq.host=localhost

spring.rabbitmq.username=guest

spring.rabbitmq.password=guest

spring.rabbitmq.port=5672

spring.rabbitmq.virtual-host=/

3.配置mq的相关组件

package com.wps.cn.config;

import org.springframework.amqp.core.Binding;

import org.springframework.amqp.core.BindingBuilder;

import org.springframework.amqp.core.DirectExchange;

import org.springframework.amqp.core.Queue;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import java.util.HashMap;

import java.util.Map;

/**

* @author wangps

* @date 2022年11月22日 14:44

*/

@Configuration

public class QueueConfig {

public static final String NORMAL_QUEUE_NAME = "normal_queue_name";

public static final String NORMAL_EXCHANGE_NAME = "normal_exchange_name";

public static final String NORMAL_ROUTING_KEY = "normal_routing_key";

public static final String DLX_QUEUE_NAME = "dlx_queue_name";

public static final String DLX_EXCHANGE_NAME = "dlx_exchange_name";

public static final String DLX_ROUTING_KEY = "dlx_routing_key";

/**

* 死信队列

* @return

*/

@Bean

Queue dlxQueue() {

return new Queue(DLX_QUEUE_NAME, true);

}

/**

* 死信交换机

* @return

*/

@Bean

DirectExchange dlxExchange() {

return new DirectExchange(DLX_EXCHANGE_NAME);

}

/**

* 绑定死信队列和死信交换机

* @return

*/

@Bean

Binding dlxBinding() {

return BindingBuilder.bind(dlxQueue()).to(dlxExchange())

.with(DLX_ROUTING_KEY);

}

/**

* 普通消息队列

* @return

*/

@Bean

Queue normalQueue() {

Map args = new HashMap<>();

//设置消息过期时间,此方法是在队列的颗粒度设置,比较局限,所以在消息上设置过期时间

// args.put("x-message-ttl", 1000*5);

//设置死信交换机

args.put("x-dead-letter-exchange", DLX_EXCHANGE_NAME);

//设置死信 routing_key

args.put("x-dead-letter-routing-key", DLX_ROUTING_KEY);

return new Queue(NORMAL_QUEUE_NAME, true, false, false, args);

}

/**

* 普通交换机

* @return

*/

@Bean

DirectExchange normalExchange() {

return new DirectExchange(NORMAL_EXCHANGE_NAME);

}

/**

* 绑定普通队列和与之对应的交换机

* @return

*/

@Bean

Binding nomalBinding() {

return BindingBuilder.bind(normalQueue())

.to(normalExchange())

.with(NORMAL_ROUTING_KEY);

}

}

4.创建消费者

package com.wps.cn.consumer;

import com.rabbitmq.client.Channel;

import com.wps.cn.config.QueueConfig;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.messaging.handler.annotation.Headers;

import org.springframework.stereotype.Component;

import java.util.Map;

/**

* @author wangps

* @date 2022年11月22日 14:55

*/

@Component

public class DlxConsumer {

private static final Logger logger = LoggerFactory.getLogger(DlxConsumer.class);

@RabbitListener(queues = QueueConfig.DLX_QUEUE_NAME)

public void process(String order, Message message, @Headers Map headers, Channel channel) {

logger.info("订单号消息", order);

System.out.println("执行结束...."+message);

}

}

5.创建controller当做生产者

package com.wps.cn.controller;

import com.wps.cn.config.QueueConfig;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.amqp.core.AmqpTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.web.bind.annotation.GetMapping;

import org.springframework.web.bind.annotation.RequestMapping;

import org.springframework.web.bind.annotation.RestController;

import java.util.UUID;

/**

* @author wangps

* @date 2022年11月22日 15:50

*/

@RestController

@RequestMapping("/producer")

public class TestProducer {

private static final Logger logger = LoggerFactory.getLogger(TestProducer.class);

@Autowired

private AmqpTemplate rabbitTemplate;

@GetMapping("/sendMessage")

public Object submit(){

String orderId = UUID.randomUUID().toString();

logger.info("提交订单消息========",orderId);

rabbitTemplate.convertAndSend(

QueueConfig.NORMAL_EXCHANGE_NAME,QueueConfig.NORMAL_ROUTING_KEY,

orderId,

message -> {

message.getMessageProperties().setExpiration(1000*5+"");

return message;

});

return "{'orderId':'"+orderId+"'}";

}

}

6.创建启动类

package com.wps.cn;

import org.springframework.boot.SpringApplication;

import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication

public class DxlRabbitmqTestApplication {

public static void main(String[] args) {

SpringApplication.run(DxlRabbitmqTestApplication.class, args);

}

}

7.运行启动类,然后在页面访问,模拟推送消息

http://localhost:8080/producer/sendMessage

观察日志可以看出,消息发出后,在5s后消费者收到消息,从而达到延迟消费的情况。

在这里插入图片描述

4.基于rabbitmq延时插件代码实现

环境准备

1.安装有 RabbitMQ 的服务器。

2.下载延时消息插件:rabbitmq_delayed_message_exchange

下载地址:https://www.rabbitmq.com/community-plugins.html

RabbitMQ延时队列插件下载页面

点击下载之后下载 rabbitmq_delayed_message_exchange-3.8.0.ez 这个文件。

3.将下载的文件复制到 RabbitMQ 的插件目录下(一般是:/opt/rabbitmq/plugins)。

4.启用插件:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

代码实现

1、导入 MAVEN 依赖

org.springframework.boot

spring-boot-starter-amqp

2、定义交换机、队列、路由KEY

/**

* RabbitMQ常量

*

* @author ZhengNC

* @date 2020/9/21 11:40

*/

public interface RabbitConstant {

/**

* 交换机

*/

interface Exchanges{

/**

* 延时交换机(通过延时插件实现 rabbitmq_delayed_message_exchange)

*/

String delayedExchange = "spring.boot.delayed.exchange";

}

/**

* 队列

*/

interface Queues{

/**

* 延时队列(通过延时插件实现)

*/

String delayedQueue = "spring.boot.delayed.queue";

}

/**

* 路由key

*/

interface RouterKey{

/**

* 延时路由key(通过延时插件实现)

*/

String delayedRouteKey = "delayed.route.key";

}

}

3、配置 RABBITMQ 绑定关系

package com.qixi.mq.delay.config;

import com.qixi.mq.delay.common.constant.RabbitConstant;

import org.springframework.amqp.core.*;

import org.springframework.beans.factory.annotation.Qualifier;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import java.util.HashMap;

import java.util.Map;

/**

* RabbitMQ配置

*

* @author ZhengNC

* @date 2020/9/14 10:40

*/

@Configuration

public class RabbitConfig {

/**

* 延时队列(通过延时插件实现)

*

* @return

*/

@Bean("delayedQueue")

public Queue delayedQueue(){

return new Queue(RabbitConstant.Queues.delayedQueue);

}

/**

* 延时交换机Direct交换机 起名:(通过延时插件实现)

*定义了一个x-delayed-message类型的交换机,由于Spring AMQP中没有这个类型的交换机,

* 所以我们使用一个CustomExchange来定义这个插件构建的交换机,

* 它和其它交换机相同,实现了AbstructExchange。

* 唯一的区别是没有指定type类型。type类型可以自定义,

* 这样我们就可以通过构造方法自定义交换机的类型。

* 在使用到延迟交换机插件的时候,我们使用插件新添加了一个x-delayed-message类型的交换机。

* @return

*/

@Bean("delayedExchange")

public CustomExchange delayedExchange(){

Map map = new HashMap<>();

map.put("x-delayed-type", "direct");

return new CustomExchange(RabbitConstant.Exchanges.delayedExchange,

"x-delayed-message", true, false, map);

}

/**

* 绑定延时队列和延时交换机(延时插件实现方式)

*

* @param delayedQueue

* @param delayedExchange

* @return

*/

@Bean

public Binding delayedQueue_delayedExchange(

@Qualifier("delayedQueue") Queue delayedQueue,

@Qualifier("delayedExchange")CustomExchange delayedExchange){

return BindingBuilder.bind(delayedQueue)

.to(delayedExchange)

.with(RabbitConstant.RouterKey.delayedRouteKey)

.noargs();

}

}

4、延时消息生产者

package com.qixi.mq.delay.producer;

import com.qixi.mq.delay.common.constant.RabbitConstant;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Service;

/**

* 延时消息生产者

*

* @author ZhengNC

* @date 2020/9/21 14:15

*/

@Service

public class TTLProducer {

@Autowired

private RabbitTemplate rabbitTemplate;

/**

* 发送一条延时消息(延时插件的实现方式)

*

* @param message

*/

public void sendDelayedMessage(String message){

rabbitTemplate.convertAndSend(

RabbitConstant.Exchanges.delayedExchange,

RabbitConstant.RouterKey.delayedRouteKey,

message,

msg -> {

//设置此消息延时十秒

msg.getMessageProperties()

.setHeader("x-delay", 10000);

return msg;

});

}

}

5、延时消息的消费者

package com.qixi.mq.delay.consumer;

import com.qixi.mq.delay.common.constant.RabbitConstant;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Component;

import java.time.LocalTime;

import java.time.format.DateTimeFormatter;

/**

* 延时消息的消费者

*

* @author ZhengNC

* @date 2020/9/21 14:08

*/

@Component

public class TTLConsumer {

/**

* 消费延时消息(延时插件实现)

*

* @param message

*/

@RabbitListener(queues = RabbitConstant.Queues.delayedQueue)

public void delayedConsumer(String message){

System.out.println("消费了一条消息,消费时间:"

+ DateTimeFormatter.ofPattern("HH:mm:ss")

.format(LocalTime.now()));

System.out.println(message);

}

}

6、编写接口测试发送消息

package com.qixi.mq.delay.controller;

import com.qixi.mq.delay.common.dto.ResponseEntity;

import com.qixi.mq.delay.producer.TTLProducer;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.web.bind.annotation.GetMapping;

import org.springframework.web.bind.annotation.RequestMapping;

import org.springframework.web.bind.annotation.RestController;

import java.time.LocalTime;

import java.time.format.DateTimeFormatter;

/**

*

* @author ZhengNC

* @date 2020/9/21 14:27

*/

@RestController

@RequestMapping("ttl")

public class TTLProducerController {

@Autowired

private TTLProducer producer;

/**

* 发送延时消息(延时插件实现方式)

*

* @return

*/

@GetMapping("sendDelayedMsg")

public ResponseEntity sendDelayedMsg(){

DateTimeFormatter timeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss");

StringBuilder message = new StringBuilder("这是一条延时消息,消息的发送时间为:");

message.append(timeFormatter.format(LocalTime.now()));

producer.sendDelayedMessage(message.toString());

return ResponseEntity.success();

}

}

7、测试结果

消费了一条消息,消费时间:10:00:11

这是一条延时消息,消息的发送时间为:10:00:01

相关推荐

Q宠物怎么结婚?有什么限制和好处?
beat365官方网站登录

Q宠物怎么结婚?有什么限制和好处?

⏳ 07-07 👁️ 6332
液金几年后需要换吗?
365bet欧洲版

液金几年后需要换吗?

⏳ 07-08 👁️ 4782