利用外部mq中间件解决单实例的简单消息队列重启丢失的问题
1. 解决node中耗时操作对请求的影响
通过fork方法创建一个子进制,进行操作
详情
问题
一旦单个实例被异常重启, 会导致数据丢失
解决
采用外部mq中间件, 一般mq中间件都支持集群部署、容灾处理。
这里选用rabbitmq做示例
可以使用docker 安装rabbitmq作为测试消息队列
安装
docker pull rabbitmq:management
安装带管理端的rabbitmq镜像
docker run -d --name amqp.test -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management
启动一个mq实例, -p为映射容器端口到主机端口
启动成功之后,可以在浏览器中访问
http://127.0.0.1:15672
这是mq的管理客户端
连接
我们使用node作为客户端链接,
npm install amqplib
连接到服务器分3步
- 创建连接
const amqp = require("amqplib")
this.conn = amqp.connect(this.rabbitMqConfig)
- 创建channel
this.channel = this.conn.then(function(conn) {
return conn.createChannel();
}, function(e) {
logger.error("rabbitmq conn close" + e.message);
})
- 收发消息
// 发消息 sendToQueue
this._getChannel().then(function(ch) {
return ch.assertQueue(name).then(function(ok) {
ch.sendToQueue(name, Buffer.from(JSON.stringify(obj)));
});
}).catch(logger.error);
// 收消息
this._getChannel().then(function(ch) {
ch.on('close', function() {
logger.error("rabbitmq channel close, ready to reconn");
self.conn = null
self.channel = null
self.channelFlag = false
let timer = setInterval(function() {
if(self.channelFlag) {
clearInterval(timer)
logger.info("rabbitmq channel reconn success");
return;
}
self.conn = null
self.channel = null
self.popFormQueue(name)
}, 1000)
})
self.channelFlag = true;
return ch.assertQueue(name).then(function(ok) {
return ch.consume(name, function(msg) {
if (msg !== null) {
// logger.info(msg.content.toString());
try{
if(self.ruleService) {
let curMsg = JSON.parse(msg.content.toString());
if(curMsg.item){
curMsg.item.isFromMQ = true
}
self.ruleService.doInterceptor(curMsg.item)
}
}catch(e) {
logger.error("rabbitmq fail:" + msg.content.toString())
}
ch.ack(msg);
}
});
});
}, function(e) {
logger.error("rabbitmq channel create error 1" + e,message)
}).then(null, function(e) {
logger.error("rabbitmq channel create error 2" + e,message)
});
接收消息,会复用同一个channel, 需要明确处理channel的error事件, 在链接失败时, 主动尝试重连。
标题:利用外部mq中间件解决单实例的简单消息队列重启丢失的问题
作者:hugh0524
地址:https://blog.uproject.cn/articles/2019/11/23/1574523239654.html
0 0