Solo  当前访客:0 开始使用

nodeJs child_process中fork的运用


在做前端监控项目时, 用户侧消息,通过kafka侧的推送接口,推送到node服务,node服务会根据配置的规则进行存储及报警

消息发送的频次会比较高, 同时node是单线程的, 如果进行复杂的io操作, 必然会影响上游系统

解决方案:

1. 接口收到消息后, 放入内存处理队列。 简单实现如下


this.queueMap = {}

 addToQueue: function(name, obj) {
        var queue = this.queueMap[name];
        if(queue) {
            queue.push(obj)
        }else {
            this.queueMap[name] = [obj];
        }
    }

    popFormQueue: function(name) {
        var queue = this.queueMap[name];
        if(queue) {
            return queue.shift();
        }else {
            return null;
        }
    }

 

2. 编写子线程启动脚本

process.on('message', function(m){
    if(m) {
   
        var curItem = null;
        try{
          
            for(var i = 0; i< m.data.length; i++) {
                curItem = m.data[i]
                try{
                  // 执行业务处理部分  
          ruleService.doInterceptor(curItem.item, process)
                }catch(e){
                    process.send({info: "处理异常:"+e.message, data: curItem})
                }
            }
        }catch(e) {
            process.send({info: "处理异常:"+e.message})
        }

        process.send({info: '任务处理完毕, 等待执行下一个...'});
    }

});
process.send({info: '子线程已经开启,等待处理...'});

 

event: message -> 用于接受父线程发来的消息

method: send -> 用于向父线程发送消息

3. 从父线程fork一个子线程出来

var child = null
function reConnect() {
 child = child_process.fork(path.resolve('./childProcess.js'), [], {silent: true});
    child.on('message', function(m){
        console.log('message from child: ' + JSON.stringify(m));
        if(m && m.type == "notify") {
// 当接受到子线程特定消息时,执行响应的动作
            exeNotifyWeb(m.users, m.info);
        }
    });

    child.on('error', function(err) {
        console.log(err)
    })

    child.stdout.setEncoding('utf8');
    child.stdout.on('data', function(data){
        console.log(data);
    })

    child.on('exit', function(code){
        console.log("===exit=="+code)
   // 如果子线程由于异常意外退出, 则重新启动一个
       reconnect()
    })
}
reConnect(); // 初始化创建一个
   // 使用定时任务 从任务队列中取出任务,交由子线程执行
    schedule.scheduleJob('* * * * * *', function() {
        // console.log("======task====")
        var items = [];
        for(var i=0;i<10;i++){
            let curItem = popFormQueue("track_queue")
            if(curItem) {
                items.push(curItem)
            }
        }

        if(items.length > 0)
            child.send({data:items});
    })


标题:nodeJs child_process中fork的运用
作者:hugh0524
地址:https://blog.uproject.cn/articles/2019/02/26/1551161564273.html

, , , 0 0