hugh 的个人博客

Everyday is a new day

利用外部mq中间件解决单实例的简单消息队列重启丢失的问题

1. 解决node中耗时操作对请求的影响

通过fork方法创建一个子进制,进行操作
详情

问题

一旦单个实例被异常重启, 会导致数据丢失

解决

采用外部mq中间件, 一般mq中间件都支持集群部署、容灾处理。
这里选用rabbitmq做示例

可以使用docker 安装rabbitmq作为测试消息队列

安装

docker pull rabbitmq:management

安装带管理端的rabbitmq镜像

docker run -d --name amqp.test  -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management

启动一个mq实例, -p为映射容器端口到主机端口
启动成功之后,可以在浏览器中访问

http://127.0.0.1:15672

这是mq的管理客户端

连接

我们使用node作为客户端链接,

npm install  amqplib

连接到服务器分3步

  • 创建连接
const amqp = require("amqplib")
this.conn = amqp.connect(this.rabbitMqConfig)
  • 创建channel
 this.channel = this.conn.then(function(conn) {
               return conn.createChannel();
           }, function(e) {
               logger.error("rabbitmq conn close" + e.message);
           })
  • 收发消息
// 发消息 sendToQueue
 this._getChannel().then(function(ch) {
            return ch.assertQueue(name).then(function(ok) {
                ch.sendToQueue(name, Buffer.from(JSON.stringify(obj)));
            });
        }).catch(logger.error);
// 收消息
this._getChannel().then(function(ch) {
            ch.on('close', function() {
                logger.error("rabbitmq channel close, ready to reconn");
                self.conn = null
                self.channel = null
                self.channelFlag = false
                let timer = setInterval(function() {
                    if(self.channelFlag) {
                        clearInterval(timer)
                        logger.info("rabbitmq channel reconn success");
                        return;
                    }
                    self.conn = null
                    self.channel = null
                    self.popFormQueue(name)
                }, 1000)
            })
            self.channelFlag = true;
            return ch.assertQueue(name).then(function(ok) {
                return ch.consume(name, function(msg) {
                    if (msg !== null) {
                        // logger.info(msg.content.toString());
                        try{
                            if(self.ruleService) {
                                let curMsg = JSON.parse(msg.content.toString());
                                if(curMsg.item){
                                    curMsg.item.isFromMQ = true
                                } 
                                self.ruleService.doInterceptor(curMsg.item)
                            }
                        }catch(e) {
                            logger.error("rabbitmq fail:" + msg.content.toString())
                        }
                        ch.ack(msg);
                    }
                });
            });
        }, function(e) {
            logger.error("rabbitmq channel create error 1" + e,message)
        }).then(null, function(e) {
            logger.error("rabbitmq channel create error 2" + e,message)
        });

接收消息,会复用同一个channel, 需要明确处理channel的error事件, 在链接失败时, 主动尝试重连。


标题:利用外部mq中间件解决单实例的简单消息队列重启丢失的问题
作者:hugh0524
地址:https://blog.uproject.cn/articles/2019/11/23/1574523239654.html