nodeJs child_process中fork的运用
在做前端监控项目时, 用户侧消息,通过kafka侧的推送接口,推送到node服务,node服务会根据配置的规则进行存储及报警
消息发送的频次会比较高, 同时node是单线程的, 如果进行复杂的io操作, 必然会影响上游系统
解决方案:
1. 接口收到消息后, 放入内存处理队列。 简单实现如下
this.queueMap = {}
addToQueue: function(name, obj) {
var queue = this.queueMap[name];
if(queue) {
queue.push(obj)
}else {
this.queueMap[name] = [obj];
}
}
popFormQueue: function(name) {
var queue = this.queueMap[name];
if(queue) {
return queue.shift();
}else {
return null;
}
}
2. 编写子线程启动脚本
process.on('message', function(m){
if(m) {
var curItem = null;
try{
for(var i = 0; i< m.data.length; i++) {
curItem = m.data[i]
try{
// 执行业务处理部分
ruleService.doInterceptor(curItem.item, process)
}catch(e){
process.send({info: "处理异常:"+e.message, data: curItem})
}
}
}catch(e) {
process.send({info: "处理异常:"+e.message})
}
process.send({info: '任务处理完毕, 等待执行下一个...'});
}
});
process.send({info: '子线程已经开启,等待处理...'});
event: message -> 用于接受父线程发来的消息
method: send -> 用于向父线程发送消息
3. 从父线程fork一个子线程出来
var child = null
function reConnect() {
child = child_process.fork(path.resolve('./childProcess.js'), [], {silent: true});
child.on('message', function(m){
console.log('message from child: ' + JSON.stringify(m));
if(m && m.type == "notify") {
// 当接受到子线程特定消息时,执行响应的动作
exeNotifyWeb(m.users, m.info);
}
});
child.on('error', function(err) {
console.log(err)
})
child.stdout.setEncoding('utf8');
child.stdout.on('data', function(data){
console.log(data);
})
child.on('exit', function(code){
console.log("===exit=="+code)
// 如果子线程由于异常意外退出, 则重新启动一个
reconnect()
})
}
reConnect(); // 初始化创建一个
// 使用定时任务 从任务队列中取出任务,交由子线程执行
schedule.scheduleJob('* * * * * *', function() {
// console.log("======task====")
var items = [];
for(var i=0;i<10;i++){
let curItem = popFormQueue("track_queue")
if(curItem) {
items.push(curItem)
}
}
if(items.length > 0)
child.send({data:items});
})
标题:nodeJs child_process中fork的运用
作者:hugh0524
地址:https://blog.uproject.cn/articles/2019/02/26/1551161564273.html
0 0