rabbitmq结合spring实现消息队列优先级的方法
发布时间 - 2026-01-10 23:06:14 点击率:次 1.1项目背景:做一个灾情预警的消息平台,灾情检查系统需要向消息平台里面推送消息,这里是典型的异构系统的消息传递,我们需要选择一个中间件作为消息队列,调研分析了rabbitmq,zeromq,activemq,kafka等消息中间件,综合性能,安全,可持久化等角度果断选择了rabbitmq作为我们的消息中间件 (其实这里是因为rabbitmq 是spring官方支持的,开发起来方便)。需求上我们有多种类型的消息,这里有紧急推送的和一般的等区分,高并发时,就会有对消息进行优先推送的情况出现,于是rabbitmq消息队优先级的推送功能是我们需要解决的首个技术点.

1.2技术调研:这里一个概念需要说明,为什么说是消息队列的优先级而不是消息的优先级,来看下消息队列的工作原理
生产者生成消息打到交换机里面(如果没有声明交换机,会打到default exchange里面),交换机绑定一个或多个队列,消息进入队列里面,消费者一直在监听队列,发现队列里面有消息就开始消费,这里就是一个消息传递的过程,queue是一个栈队列,栈是先进先出的,就是说消息来了依次排队,一个队列并不能实现消息的插队和优先推送的功能。但是如果说我们的多个队列有不同的优先级,不同优先级的消息通过roatingkey进入不同的队列,优先级高的队列消息被优先消费,这样也能形成一个相对意义上的优先级,所以说这里不是消息的优先级而是队列的优先级.
1.2.1 为什么说是相对意义上的优先级
有并发才有优先级,如果每个消息都能被瞬间处理也不会有消息优先推送的需求,那我们看看消息会在哪里阻塞
1,queue,很明显高并发的时候队列里面是会存在很多消息的,2,eschange ,高并发的时候producer发送给exchange的时候也会产生阻塞。
第一种情况由于我们队列已经定义优先级了,所以进入队列的消息都是同种优先级别的,并不需要插队。而对于第二种情况,消息在exchange时阻塞时并不能实现消息优先进入队列,依然是一个依次处理的情景,但是由于exchang到queue的处理速度极快,所有我们忽略了这块的优先级。
1.2.3 代码实现
在rabbitmq3.5版本之前,官方并没有实现队列优先级的功能,但论坛里面有一些插件可以实现(末尾附链接),这里我们主要说3.5版本之后的实现
1.2.3.1 Java代码
Connectionconn =RabbitMQConnectionUtil.getRabbitmqConnection();//创建连接
Channelchannel = conn.createChannel();//创建channel
Map<String,Object> arg = newHashMap<String, Object>();
arg.put("x-max-priority",10); //队列的属性参数 有10个优先级别
// 声明(创建)队列
//channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.queueDeclare(QUEUE_NAME,true,false, false, arg);
// 消息内容
String message ="Hello World!";
channel.basicPublish("",QUEUE_NAME, null, message.getBytes());
BasicPropertiesprop =new BasicProperties(null, null, null, null, 1,
null, null, null, null, null, null, null, null,null);//消息的参数,声明该消息的优先级是1
channel.basicPublish("",QUEUE_NAME, prop, message.getBytes()); //消息发布
System.out.println("[x] Sent '" + message + "'");
//关闭通道和连接
channel.close();
conn.close();
客户端看下结果:
1.2.3.2结合spring实现:
1.2.3.2.1 xml配置:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd" >
<description>rabbitmq 连接服务配置</description>
<!-- 连接配置 -->
<rabbit:connection-factory id="connectionFactory" host="${rabbit.ip}" username="${rabbit.username}"
password="${rabbit.password}" port="${rabbit.port}" virtual-host="${rabbit.vhost}"/>
<rabbit:admin connection-factory="connectionFactory"/>
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" />
<!-- spring template声明-->
<!-- 声明一个队列 -->
<rabbit:queue id="test_queue_key" name="test_queue_key" durable="true" auto-delete="false" exclusive="false">
<rabbit:queue-arguments>
<entry key="x-max-priority">
<value type="java.lang.Integer">10</value>//这个地方一定是integer的,别的不好使!!
</entry>
</rabbit:queue-arguments>
</rabbit:queue>
<!-- 监听配置queues:监听的队列,多个的话用逗号(,)分隔 ref:监听器-->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto">
<rabbit:listener queue-names="test_queue_key" ref="queueListenter" method="onMessage"/>
</rabbit:listener-container>
<bean id="queueListenter" class="com.DF.spring.springAMQP.QueueListener" />
1.2.3.2.2代码部分:
producter:
AbstractApplicationContext ctx = new
ClassPathXmlApplicationContext("classpath:/spring/rabbitmq-contextDemo2.xml");
RabbitTemplate amqpTemplate = ctx.getBean(RabbitTemplate.class);
Random random = new Random();
for (int i=0; i< 1000; i++){
final int priority = random.nextInt(10 - 1 + 1) + 1;//随机的优先级
amqpTemplate.convertAndSend("test_queue_key", (Object)("hello world"), new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setPriority(priority);
return message;
}
});
}
customer:
public class QueueListener implements MessageListener{
@Override
public void onMessage(Message message) {
try{
System.out.print("[x] 接收到的消息:"+new String(message.getBody(),"utf-8")+"&&&"+"优先级"+message.getMessageProperties().getPrority());
Thread.sleep(1000);
}catch(Exception e){
e.printStackTrace();
}
}
}
从客户端看下队列里面的消息:
我们发送随机优先级的消息进入队列,看看消费端打印出来的消息:
到这里,rabbitmq结合spring的demo功能实现......
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。
# java
# spring
# 消息队列
# 消息队列优先级
# SpringBoot使用RabbitMQ延时队列(小白必备)
# SpringBoot集成RabbitMQ的方法(死信队列)
# springboot实现rabbitmq的队列初始化和绑定
# Spring Boot与RabbitMQ结合实现延迟队列的示例
# 消息队列 RabbitMQ 与 Spring 整合使用的实例代码
# Spring学习笔记3之消息队列(rabbitmq)发送邮件功能
# Spring项目集成RabbitMQ及自动创建队列
# 多个
# 是一个
# 打到
# 并不能
# 都是
# 客户端
# 也不
# 来了
# 会有
# 是因为
# 也会
# 都能
# 也能
# 会在
# 意义上
# 才有
# 如果没有
# 做一个
# 如果说
# 可以实现
相关栏目:
【
网站优化151355 】
【
网络推广146373 】
【
网络技术251813 】
【
AI营销90571 】
相关推荐:
JavaScript模板引擎Template.js使用详解
Laravel如何实现图片防盗链功能_Laravel中间件验证Referer来源请求【方案】
深圳网站制作设计招聘,关于服装设计的流行趋势,哪里的资料比较全面?
Python结构化数据采集_字段抽取解析【教程】
作用域操作符会触发自动加载吗_php类自动加载机制与::调用【教程】
ai格式如何转html_将AI设计稿转换为HTML页面流程【页面】
Laravel Eloquent访问器与修改器是什么_Laravel Accessors & Mutators数据处理技巧
C++时间戳转换成日期时间的步骤和示例代码
laravel服务容器和依赖注入怎么理解_laravel服务容器与依赖注入解析
Laravel如何集成Inertia.js与Vue/React?(安装配置)
Python3.6正式版新特性预览
Windows10电脑怎么查看硬盘通电时间_Win10使用工具检测磁盘健康
如何在 Telegram Web View(iOS)中防止键盘遮挡底部输入框
创业网站制作流程,创业网站可靠吗?
Microsoft Edge如何解决网页加载问题 Edge浏览器加载问题修复
javascript基本数据类型及类型检测常用方法小结
如何在阿里云高效完成企业建站全流程?
如何在云服务器上快速搭建个人网站?
如何快速搭建支持数据库操作的智能建站平台?
如何在自有机房高效搭建专业网站?
Laravel怎么判断请求类型_Laravel Request isMethod用法
HTML5空格在Angular项目里怎么处理_Angular中空格的渲染问题【详解】
详解Huffman编码算法之Java实现
javascript中的try catch异常捕获机制用法分析
详解Android图表 MPAndroidChart折线图
网站制作免费,什么网站能看正片电影?
Java Adapter 适配器模式(类适配器,对象适配器)优缺点对比
Laravel如何实现多语言支持_Laravel本地化与国际化(i18n)配置教程
标题:Vue + Vuex + JWT 身份认证的正确实践与常见误区解析
如何使用 Go 正则表达式精准提取括号内首个纯字母标识符(忽略数字与嵌套)
绝密ChatGPT指令:手把手教你生成HR无法拒绝的求职信
Laravel如何使用Laravel Vite编译前端_Laravel10以上版本前端静态资源管理【教程】
Laravel与Inertia.js怎么结合_使用Laravel和Inertia构建现代单页应用
装修招标网站设计制作流程,装修招标流程?
网站制作企业,网站的banner和导航栏是指什么?
Laravel的.env文件有什么用_Laravel环境变量配置与管理详解
Laravel怎么生成二维码图片_Laravel集成Simple-QrCode扩展包与参数设置【实战】
如何在Windows服务器上快速搭建网站?
如何用虚拟主机快速搭建网站?详细步骤解析
Android滚轮选择时间控件使用详解
香港代理服务器配置指南:高匿IP选择、跨境加速与SEO优化技巧
PHP 500报错的快速解决方法
智能起名网站制作软件有哪些,制作logo的软件?
HTML5打空格有哪些误区_新手常犯的空格使用错误【技巧】
利用 Google AI 进行 YouTube 视频 SEO 描述优化
大连网站制作公司哪家好一点,大连买房网站哪个好?
Laravel如何使用Guzzle调用外部接口_Laravel发起HTTP请求与JSON数据解析【详解】
北京网站制作的公司有哪些,北京白云观官方网站?
宙斯浏览器怎么屏蔽图片浏览 节省手机流量使用设置方法
Laravel API路由如何设计_Laravel构建RESTful API的路由最佳实践

