hugh 的个人博客

mapreduce使用的坑 - map reduce对数据集进行汇总计算时,出现数据样本不全的问题

1. mapreduce

复习下 map reduce, 经典图例如下

2. 问题案例

需要按小时分时段统计性能数据的平均耗时,使用 mapreduce 实现如下:

// Hourly performance-timing aggregation via MongoDB mapReduce.
// NOTE: this is the faulty first version shown as the problem case; its
// defects are annotated inline below. Do not reuse it as-is.
// Returns all documents of the 'sumEachHour_performance_ForResults' output collection.
async getPerformanceAvgByHour(query) {
	let model = this.trackingTmplDao.getModel()
	// Side effect: mutates the caller's query object.
	query.trackingtype = "performance";
	var o = {}
	// Groups each document under the start of its hour: `localts` truncated to
	// :00:00.000 (in the time zone of the JS engine running the map).
	o.map = function () {
		var d = new Date(this.localts)
		d.setMinutes(0)
		d.setMilliseconds(0)
		d.setSeconds(0)
		// Emits the raw `parameters` sub-document. It has no `total` field, so it does
		// not have the same shape as the value `reduce` returns below.
		emit(d.getTime(), this.parameters)
	};
	// NOTE(review): per the MongoDB mapReduce docs, reduce is not called for keys
	// with a single emitted value, so those keys keep the raw `parameters` object
	// (no `total`) in the output -- confirm for the server version in use.
	o.reduce = function (k, vals) {
		// BUG: counts the entries in `vals`. When MongoDB re-invokes reduce for the
		// same key, one entry is an earlier reduce result that already stands for
		// many samples, yet it is counted as 1 here -- so the sample count is too low.
		var len = vals.length;
		// Formats a number with two decimals (returns a string); only used by the
		// dead average computation below.
		function financial(x) {
		return Number.parseFloat(x).toFixed(2);
	}
	// Sums each metric; `>> 0` coerces to a 32-bit integer (missing/NaN -> 0).
	// On re-reduce it is also applied to earlier sums, which wrap past 2^31 - 1.
	var sum = vals.reduce((cur,item) => {
		return {dns: (item.dns >> 0) + cur.dns,
		tcp:(item.tcp>> 0) + cur.tcp,
		ssl:(item.ssl>> 0) + cur.ssl,
		ttfb:(item.ttfb>> 0) + cur.ttfb,
		trans:(item.trans>> 0) + cur.trans,
		dom:(item.dom>> 0) + cur.dom,
		res:(item.res>> 0) + cur.res,
		firstbyte:(item.firstbyte>> 0) + cur.firstbyte,
		// BUG: copy-paste slip -- reads `firstbyte`/`cur.firstbyte` instead of
		// `fpt`/`cur.fpt`, so `fpt` ends up equal to the firstbyte sum.
		fpt:(item.firstbyte>> 0) + cur.firstbyte,
		tti:(item.tti>> 0) + cur.tti,
		ready:(item.ready>> 0) + cur.ready,
		load:(item.load>> 0) + cur.load}
		}, {dns: 0, tcp:0, ssl:0, ttfb:0, trans:0, dom:0, res:0,firstbyte:0,fpt:0,tti:0,ready:0, load:0})
		// return sum
		// Dead code: these per-metric averages are discarded by the redeclaration below.
		var sumTotal = {total: len, dns: financial(sum.dns/len), tcp:financial(sum.tcp/len), ssl:financial(sum.ssl/len), ttfb:financial(sum.ttfb/len), trans:financial(sum.trans/len), dom:financial(sum.dom/len),
		res:financial(sum.res/len),firstbyte:financial(sum.firstbyte/len),fpt:financial(sum.fpt/len),tti:financial(sum.tti/len),ready:financial(sum.ready/len), load:financial(sum.load/len)}
		// What is actually returned: raw metric sums plus `total` (not averages).
		var sumTotal = sum
		sumTotal.total = len
		return sumTotal
	};
	
	
	    // Replace the output collection with the results, then read them back.
	    o.out = { replace: 'sumEachHour_performance_ForResults' }
	    o.verbose = true;
	    o.query = query
	    
	
	    let res = await model.mapReduce(o)
	    let r = await res.model.find()
	    return r
}

总样本数:433,但实际计入计算结果(各 key 的 total 之和)的样本只有 73 条

3. 原因

  • MongoDB can invoke the reduce function more than once for the same key. In this case, the previous output from the reduce function for that key will become one of the input values to the next reduce function invocation for that key.

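对同一个 key,reduce 函数可能会被分批次调用多次,上一次调用的返回值会作为下一次调用时 vals 中的一个元素传入。如果在 reduce 内直接用 vals.length 作为样本数进行汇总,前面批次已汇总的多条样本只会被计为 1 条,相当于后面调用得到的结果覆盖了前面的统计,导致样本数不全。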

4. 解决方案

利用 scope(在 map\reduce\finalize 方法作用域中可访问)暂存数据

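利用 finalize 在最终一步,对 scope 存储的数据进行汇总计算。

注意:MongoDB 文档对 reduce 有以下要求:
- 返回值的结构必须与 map emit 的值一致,并且可以重复归约;
- reduce 不应对外部产生副作用;
- 只有一个值的 key 不会调用 reduce。

因此借助 scope 暂存中间结果并不可靠,会带来两个问题:
- 每次调用(包括再次归约)的结果都会被 push 进 result,造成重复计数;
- 只有单条样本的 key 在 finalize 中会因 result[key] 不存在而报错。

更稳妥的做法是:
1. map 为每条记录 emit { total: 1, 各指标值 };
2. reduce 只对各字段(包括 total)求和;
3. finalize 中用各指标之和除以 total 求平均。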

async getPerformanceAvgByHour(query) {
	let model = this.trackingTmplDao.getModel()
	query.trackingtype = "performance";
	var o = {}
	o.map = function () {
		var d = new Date(this.localts)
		d.setMinutes(0)
		d.setMilliseconds(0)
		d.setSeconds(0)
		emit(d.getTime(), this.parameters)
	};
	o.reduce = function (k, vals) {
	var len = vals.length;
	function financial(x) {
	return Number.parseFloat(x).toFixed(2);
	}
	var sum = vals.reduce((cur,item) => {
	return {dns: (item.dns >> 0) + cur.dns,
	tcp:(item.tcp>> 0) + cur.tcp,
	ssl:(item.ssl>> 0) + cur.ssl,
	ttfb:(item.ttfb>> 0) + cur.ttfb,
	trans:(item.trans>> 0) + cur.trans,
	dom:(item.dom>> 0) + cur.dom,
	res:(item.res>> 0) + cur.res,
	firstbyte:(item.firstbyte>> 0) + cur.firstbyte,
	fpt:(item.firstbyte>> 0) + cur.firstbyte,
	tti:(item.tti>> 0) + cur.tti,
	ready:(item.ready>> 0) + cur.ready,
	load:(item.load>> 0) + cur.load}
	}, {dns: 0, tcp:0, ssl:0, ttfb:0, trans:0, dom:0, res:0,firstbyte:0,fpt:0,tti:0,ready:0, load:0})
	
	
	        var sumTotal = sum
	        sumTotal.total = len
	
	        if(result[k]) {
	            result[k].push(sumTotal)
	        }else {
	            result[k] = [sumTotal]
	        }
	
	        return sumTotal
	    };
	
	    o.out = { replace: 'sumEachHour_performance_ForResults' }
	    o.verbose = true;
	    o.query = query
	    o.scope = {result: {}}
	    o.finalize = function(key, rval) {
	        function financial(x) {
	            return Number.parseFloat(x).toFixed(2);
	        }
	        var sum = result[key].reduce((cur,item) => {
	           return {dns: (item.dns >> 0) + cur.dns,
	                tcp:(item.tcp>> 0) + cur.tcp,
	                ssl:(item.ssl>> 0) + cur.ssl,
	                ttfb:(item.ttfb>> 0) + cur.ttfb,
	                trans:(item.trans>> 0) + cur.trans,
	                dom:(item.dom>> 0) + cur.dom,
	                res:(item.res>> 0) + cur.res,
	                firstbyte:(item.firstbyte>> 0) + cur.firstbyte,
	                fpt:(item.firstbyte>> 0) + cur.firstbyte,
	                tti:(item.tti>> 0) + cur.tti,
	                ready:(item.ready>> 0) + cur.ready,
	                load:(item.load>> 0) + cur.load,
	                total: item.total + cur.total
	            }
	        }, {total:0, dns: 0, tcp:0, ssl:0, ttfb:0, trans:0, dom:0, res:0,firstbyte:0,fpt:0,tti:0,ready:0, load:0})
	        var len = sum.total;
	        var sumTotal = {total: len, dns: financial(sum.dns/len), tcp:financial(sum.tcp/len), ssl:financial(sum.ssl/len), ttfb:financial(sum.ttfb/len), trans:financial(sum.trans/len), dom:financial(sum.dom/len),
	               res:financial(sum.res/len),firstbyte:financial(sum.firstbyte/len),fpt:financial(sum.fpt/len),tti:financial(sum.tti/len),ready:financial(sum.ready/len), load:financial(sum.load/len)}
	
	        return sumTotal
	    }
	
	    let res = await model.mapReduce(o)
	    let r = await res.model.find()
	    logger.info(JSON.stringify(r))
	    return r
}

参考: https://docs.mongodb.com/manual/reference/command/mapReduce/#mapreduce-reduce-cmd


标题:mapreduce使用的坑 - map reduce对数据集进行汇总计算时,出现数据样本不全的问题
作者:hugh0524
地址:https://blog.uproject.cn/articles/2019/06/13/1560396099637.html