2011年5月20日 星期五

[Javascript] Node.js CPU bound action

Node.js 是一個base在 V8 javascript engine 上的event-driven server framework, 因為目前V8的限制, 所以他目前只有single thread 在處理所有event.

所以如果我寫出以下的code, 就會造成單一request block後面的request, 這是一個echo server, 當有資料進來的時候, 會呼叫function(data), 如果我在這個function裡面作一些time consuming的動作, 就會造成其他的request delay.

var net = require("net");
// Echo server example that deliberately blocks the event loop.
var TCPServer = net.createServer(function (socket) {
    // Runs for every chunk of data received on this connection.
    socket.on("data", function (data) {
        // Declared with `var` so they stay local to this handler instead of
        // leaking as implicit globals.
        var request = data.toString('utf8');
        var now = new Date().getTime();
        // Busy-wait for 5 seconds on purpose: nothing else can run on the
        // single thread during this loop, so following requests are delayed.
        while (new Date().getTime() < now + 5000) {
            // do nothing
        }
        // Echo the received text back to the client.
        socket.write(request);
    });
});

TCPServer.listen(7777, "127.0.0.1");


所以針對一些cpu bound的運算, 目前我查到的解法有兩種, 第一種就是普通的fork,
fork出child process去執行你要的command, 等到執行結束, 再利用callback繼續下去, 這樣就可以讓CPU bound的運算不至於影響到其他的request.
var net = require("net");
var sys = require('sys');
var exec = require('child_process').exec;


// Echo server example: the slow job runs in a child process, so the
// server keeps accepting and handling other requests while it waits.
var TCPServer = net.createServer(function (socket) {
    sys.debug('new socket coming!');
    socket.on('data', function (d) {
        sys.debug('new data coming ' + d);
        // `sleep 5` stands in for a time-consuming command; the callback
        // fires once the child process has exited.
        exec('sleep 5', function (error, stdout, stderr) {
            if (error) {
                // Do not report success when the command failed.
                sys.debug('exec failed: ' + error);
            }
            // The client may have disconnected while the job was running.
            if (!socket.writable) {
                sys.debug('socket closed before job finished');
                return;
            }
            socket.write((error ? 'job failed: ' : 'finish job: ') + d);
        });
    });
});

TCPServer.listen(7777, "127.0.0.1");



第二種是Webworker, 這是node.js的某一個ext module, 他是利用unix domain socket來達到IPC (inter process communication), 他提供比較抽象的API, 透過postMessage, onmessage這種API,我下面的實作只提供message傳遞, 當worker事情做完就把結果回傳給server. postMessage 的參數需要以JSON的格式傳過去, 在postMessage的時候可以把file descriptor當第二個參數傳給worker process, 但是這部份我一直無法成功實驗出來...

Server.js
=================
var net = require("net");
var path = require('path');
var sys = require('sys');
var Worker = require('webworker').Worker;
var index = 0;      // which worker receives the next message (round-robin)
var workers = [];
var socks = {};     // open client sockets, keyed by their connection number
var sock_num = 0;   // connection number handed to the next client

for (var i = 0; i < 8; i++) {
    // NOTE(review): the original passed an empty script path. This assumes
    // the worker script is saved as Worker.js next to this file — adjust
    // the path if it lives elsewhere.
    workers.push(new Worker(path.join(__dirname, 'Worker.js')));
    // when some result message return from worker process
    workers[i].onmessage = function (msg) {
        var sock_index = msg.data.index;
        var result = msg.data.result;
        var sock = socks[parseInt(sock_index, 10)];

        sys.debug('master: ' + sock_index);
        // The client may have closed the connection (and been removed from
        // `socks`) while the worker was still busy.
        if (!sock) {
            sys.debug('socket ' + sock_index + ' already closed, drop result');
            return;
        }
        sock.write(result);
    };
}

// echo server example
var TCPServer = net.createServer(function (socket) {
    // Connection number of this client; captured by the handlers below.
    var l = sock_num;

    sock_num++;
    socks[l] = socket;
    // when client send data
    socket.on('data', function (d) {
        var data = d.toString('utf8').trim();

        sys.debug(index + ' worker data: ' + data);
        sys.debug('socks: ' + l);
        workers[index].postMessage({'text': data, 'index': index, sock_index: l});
        // Wrap around so `index` never runs past the end of the worker pool.
        index = (index + 1) % workers.length;
    });
    // when client close connection
    socket.on('close', function () {
        sys.debug('delete socks: ' + l);
        delete socks[l];
    });
});

TCPServer.listen(7777, "127.0.0.1");


Worker.js
=================
var http = require('http');
var net = require('net');
var sys = require('sys');

// Simulates a time-consuming operation by spinning for 5 seconds.
function sleep() {
    var deadline = new Date().getTime() + 5000;
    while (new Date().getTime() < deadline) {
        // spin until the deadline passes
    }
}

// Handles one job message: logs its fields, runs the slow operation, then
// posts the processed text back together with the sender's socket index.
onmessage = function (msg) {
    var job = msg.data;

    sys.debug('worker index: ' + job.index);
    sys.debug('worker sock_index: ' + job.sock_index);
    sys.debug('worker text: ' + job.text);

    var reply = {
        index: job.sock_index,
        result: job.text + ' processed\n'
    };

    // time consuming operation
    sleep();
    sys.debug('worker after sleep');
    postMessage(reply);
};

// Logs a message when the close handler is invoked.
onclose = function () {
    sys.debug('Worker shuttting down.');
};


下面這是後來gibson試出來的用法~
master.js

=====================
var http = require('http');
var sys = require('sys');
var path = require('path');
var net = require('net');
var Worker = require('webworker').Worker;
var workers = [];
var wid = 0;

// Start 8 workers, all running the worker.js that sits next to this file.
while (workers.length < 8) {
    workers.push(new Worker(path.join(__dirname, 'worker.js')));
}

// Connection handler: picks the next worker in rotation and passes the
// connection's file descriptor to it along with the worker id.
function handOff(socket) {
    // presumably paused so this process does not read data meant for the
    // worker — confirm against the webworker module's fd-passing notes
    socket.pause();
    wid = (wid + 1) % 8;
    sys.debug('pass to worker '+ wid + ' fd:' + socket.fd);

    var target = workers[wid];
    target.postMessage({'wid': wid}, socket.fd);
}

net.createServer(handOff).listen(8080);

worker.js
=====================
var http = require('http');
var sys = require('sys');
var net = require('net');

// Server object that is never bound to a port here; connections are
// injected into it by emitting 'connection' from onmessage below.
var srv = net.createServer(function (socket) {
    socket.on('data', function (data) {
        // Declared with `var` so it does not leak as an implicit global.
        var buf = data.toString('utf8');
        // Echo the data back, tagged with this worker's process id.
        socket.write('[' + process.pid + '] received: ' + buf);
    });
});

// Receives a message carrying a file descriptor, wraps the descriptor in a
// stream and hands it to `srv` as if it were a new connection.
onmessage = function (msg) {
    sys.debug('worker received msg.data.wid: ' + msg.data.wid);
    sys.debug('worker received msg.fd: ' + msg.fd);
    var socket = net.Stream(msg.fd);
    // presumably these make the stream look like one accepted by `srv` —
    // confirm against the net module of the Node version in use
    socket.type = srv.type;
    socket.server = srv;
    socket.resume();
    srv.emit('connection', socket);
};

Reference: http://blog.mixu.net/2011/02/01/understanding-the-node-js-event-loop/ http://blog.std.in/2010/07/08/nodejs-webworker-design/ http://developer.yahoo.com/blogs/ydn/posts/2010/07/multicore_http_server_with_nodejs/ http://nodejs.org/docs/v0.4.7/api/child_processes.html