2011年12月27日 星期二

[C++] template class instantiation

A quick experiment: in a template class, even when a method does not depend on the template type parameter T, two separate copies of its code are still generated, because each instantiation of the template is treated as a distinct class.


#include <iostream>
template <typename T>
class Foo
{
public:
    T goo(T a) { return a + a; }
    int foo(int a) { return a; }
};

int main()
{
Foo<int> a;
Foo<double> b;

a.goo(12);
b.goo(1.2);
a.foo(1);
b.foo(1);
return 0;
}

In main you can see that _ZN3FooIiE3fooEi and _ZN3FooIdE3fooEi are two different functions being called, even though their bodies are exactly the same.


.file "template_gen.cpp"
.local _ZStL8__ioinit
.comm _ZStL8__ioinit,1,1
.text
.globl main
.type main, @function
main:
.LFB959:
.cfi_startproc
.cfi_personality 0x3,__gxx_personality_v0
pushq %rbp
.cfi_def_cfa_offset 16
movq %rsp, %rbp
.cfi_offset 6, -16
.cfi_def_cfa_register 6
subq $16, %rsp
leaq -1(%rbp), %rax
movl $12, %esi
movq %rax, %rdi
call _ZN3FooIiE3gooEi
movsd .LC0(%rip), %xmm0
leaq -2(%rbp), %rax
movq %rax, %rdi
call _ZN3FooIdE3gooEd
leaq -1(%rbp), %rax
movl $1, %esi
movq %rax, %rdi
call _ZN3FooIiE3fooEi
leaq -2(%rbp), %rax
movl $1, %esi
movq %rax, %rdi
call _ZN3FooIdE3fooEi
movl $0, %eax
leave
ret
.cfi_endproc

... (omitted)

_ZN3FooIiE3fooEi:
.LFB962:
.cfi_startproc
.cfi_personality 0x3,__gxx_personality_v0
pushq %rbp
.cfi_def_cfa_offset 16
movq %rsp, %rbp
.cfi_offset 6, -16
.cfi_def_cfa_register 6
movq %rdi, -8(%rbp)
movl %esi, -12(%rbp)
movl -12(%rbp), %eax
leave
ret
.cfi_endproc
.LFE962:
.size _ZN3FooIiE3fooEi, .-_ZN3FooIiE3fooEi
.section .text._ZN3FooIdE3fooEi,"axG",@progbits,_ZN3FooIdE3fooEi,comdat
.align 2
.weak _ZN3FooIdE3fooEi
.type _ZN3FooIdE3fooEi, @function
_ZN3FooIdE3fooEi:
.LFB963:
.cfi_startproc
.cfi_personality 0x3,__gxx_personality_v0
pushq %rbp
.cfi_def_cfa_offset 16
movq %rsp, %rbp
.cfi_offset 6, -16
.cfi_def_cfa_register 6
movq %rdi, -8(%rbp)
movl %esi, -12(%rbp)
movl -12(%rbp), %eax
leave
ret
.cfi_endproc

... (omitted)

2011年12月15日 星期四

[Linux] process use multi-core CPU



/*
 * The following code uses sched_setaffinity() to set the CPU affinity mask.
 * It forks one child per core so that each process runs on only one core.
 *
 * You can read /proc/cpuinfo to get the number of processors.
 */
#define _GNU_SOURCE
#include <sched.h>
#include <unistd.h>

int setup_multicore(int n)
{
    cpu_set_t *cpuset = CPU_ALLOC(n);
    size_t size = CPU_ALLOC_SIZE(n);
    CPU_ZERO_S(size, cpuset);
    CPU_SET_S(0, size, cpuset);
    if (sched_setaffinity(getpid(), size, cpuset) < 0) {
        CPU_FREE(cpuset);
        return -1;
    }
    pid_t pid = 0;
    // fork a child for each core
    for (int i = 1; i < n; ++i) {
        pid = fork();
        if (pid > 0)    /* parent keeps forking the remaining children */
            continue;
        CPU_ZERO_S(size, cpuset);
        CPU_SET_S(i, size, cpuset);
        if (sched_setaffinity(getpid(), size, cpuset) < 0) {
            CPU_FREE(cpuset);
            return -1;
        }
        break;
    }
    CPU_FREE(cpuset);
    return 0;
}

2011年12月7日 星期三

[Linux] Signals that should not be blocked

The following signals should be delivered on the thread that generated the original error. Blocking them interferes with proper recovery.


sigdelset(sig_mask, SIGABRT);
sigdelset(sig_mask, SIGBUS);
sigdelset(sig_mask, SIGEMT);
sigdelset(sig_mask, SIGFPE);
sigdelset(sig_mask, SIGILL);
sigdelset(sig_mask, SIGIOT);
sigdelset(sig_mask, SIGPIPE);
sigdelset(sig_mask, SIGSEGV);
sigdelset(sig_mask, SIGSYS);
sigdelset(sig_mask, SIGTRAP);
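
A minimal sketch of how a list like this is typically applied (using a subset of the signals above that exist on Linux): start from a full mask, remove the error signals, and install the mask on the calling thread.

#include <pthread.h>
#include <signal.h>

/* Sketch only: block everything except the error signals that must stay
 * deliverable on the faulting thread. Compile with -pthread. */
static void block_all_but_error_signals(void)
{
    sigset_t sig_mask;

    sigfillset(&sig_mask);          /* start from "block everything" */
    sigdelset(&sig_mask, SIGABRT);  /* ... then punch out the signals */
    sigdelset(&sig_mask, SIGBUS);   /*     listed above               */
    sigdelset(&sig_mask, SIGFPE);
    sigdelset(&sig_mask, SIGILL);
    sigdelset(&sig_mask, SIGPIPE);
    sigdelset(&sig_mask, SIGSEGV);
    sigdelset(&sig_mask, SIGTRAP);
    pthread_sigmask(SIG_SETMASK, &sig_mask, NULL);
}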

To be continued...

2011年12月1日 星期四

[Pthread] signal handler on other thread

In a multithreaded program, no matter whether you register the signal handler from a child thread or from the main thread, the signal is handled on the main thread.
You can see this from the example below.


#include <signal.h>
#include <stdio.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>
#include <sys/time.h>

static void hdl (int sig, siginfo_t *siginfo, void *context)
{
pthread_t ret = pthread_self();
sleep(2);
printf ("Sending PID: %ld, UID: %ld, self=%u\n",
(long)siginfo->si_pid, (long)siginfo->si_uid, pthread_self());
}

void *child1_fun(void *data)
{
struct sigaction act;

memset (&act, '\0', sizeof(act));
printf("child1 pid=%u\n", pthread_self());
/* Use the sa_sigaction field because the handler has two additional parameters */
act.sa_sigaction = &hdl;
/* The SA_SIGINFO flag tells sigaction() to use the sa_sigaction field, not sa_handler. */
act.sa_flags = SA_SIGINFO;
if (sigaction(SIGALRM, &act, NULL) < 0) {
perror ("sigaction");
return NULL;
}
pause();
}

void *child2_fun(void *data)
{
pthread_t *child1 = (pthread_t *) data;
printf("child2 pid=%u\n", pthread_self());
//alarm(2);
struct itimerval timer;

timer.it_value.tv_sec = 5 ;
timer.it_value.tv_usec = 0;
timer.it_interval.tv_sec = 5;
timer.it_interval.tv_usec = 0;
setitimer(ITIMER_REAL, &timer, NULL);
// pthread_kill(*child1, SIGALRM);
}
int main (int argc, char *argv[])
{
pthread_t child1, child2;
sigset_t newSet;
sigset_t oset;
sigemptyset(&newSet);
sigaddset(&newSet, SIGALRM);
pthread_sigmask(SIG_BLOCK, &newSet, NULL);

printf("main pid=%u\n", pthread_self());
pthread_create(&child1, NULL, child1_fun, NULL);
pthread_create(&child2, NULL, child2_fun, &child1);
pthread_join(child1, NULL);

return 0;
}

You can see that the signal is always handled on the main thread.
Output:


ytshen@ytshen-ThinkCentre-A58:~/temp$ ./a.out
main pid=256317184
child1 pid=248338176
child2 pid=239945472
Sending PID: 0, UID: 0, self=256317184
Sending PID: 0, UID: 0, self=256317184
^C
ytshen@ytshen-ThinkCentre-A58:~/temp$

When we want some other child thread to handle the signal, we have to change the code as follows. First, in the main thread, use the signal mask to SIG_BLOCK the signals we do not want delivered there (very important! otherwise, when the signal arrives it behaves as if no handler had been registered, and the program is stopped). Then, when the child thread calls sigwait, it waits for exactly the signals you want.


#include <signal.h>
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
#include <sys/time.h>

void *child1_fun(void *data)
{
sigset_t newSet;
sigemptyset(&newSet);
sigaddset(&newSet, SIGALRM);
int signum;
while(1) {
printf("child1 pid=%u start to wait\n", pthread_self());
sigwait(&newSet, &signum);
printf("child1 pid=%u get signal = %d!\n", pthread_self(), signum);
}
}

void *child2_fun(void *data)
{
pthread_t *child1 = (pthread_t *) data;
printf("child2 pid=%u\n", pthread_self());
//alarm(2);
struct itimerval timer;

timer.it_value.tv_sec = 5 ;
timer.it_value.tv_usec = 0;
timer.it_interval.tv_sec = 5;
timer.it_interval.tv_usec = 0;
setitimer(ITIMER_REAL, &timer, NULL);
// pthread_kill(*child1, SIGALRM);
}

int main (int argc, char *argv[])
{
pthread_t child1, child2;
sigset_t newSet;
sigset_t oset;
sigemptyset(&newSet);
sigaddset(&newSet, SIGALRM);
pthread_sigmask(SIG_BLOCK, &newSet, NULL);

printf("main pid=%u\n", pthread_self());
pthread_create(&child1, NULL, child1_fun, NULL);
pthread_create(&child2, NULL, child2_fun, &child1);
pthread_join(child1, NULL);

}

Output:


ytshen@ytshen-ThinkCentre-A58:~/temp$ ./a.out
main pid=1378113280
child1 pid=1370134272 start to wait
child2 pid=1361741568
child1 pid=1370134272 get signal = 14!
child1 pid=1370134272 start to wait
child1 pid=1370134272 get signal = 14!
child1 pid=1370134272 start to wait
^C
ytshen@ytshen-ThinkCentre-A58:~/temp$

Updated!!
The safe approach is: in the main thread, before creating any other threads, block the signal with sigprocmask/pthread_sigmask, then create the child threads (so every child thread inherits the main thread's signal mask), and finally call sigwait in the child thread that should handle the signal. That way no other child thread that has left the signal unblocked can get terminated just because nobody handles it!

Reference:
http://www.cognitus.net/html/howto/pthreadSemiFAQ_8.html
http://stackoverflow.com/questions/5282099/signal-handling-in-pthreads
http://blog.csdn.net/wozaiwogu/article/details/4361456
http://blog.csdn.net/fytzzh/article/details/660457

2011年11月9日 星期三

[Python] property decorator

The normal usage is documented at http://docs.python.org/library/functions.html#property

What follows is the way to go around in circles...

The right fix would be to rename started inside Dgram.

It seems to work on older versions of Python.


class Base(object):
a = 1
def __init__(self, listener, handle=None, backlog=None, spawn='default'):
self._xxx = 'xxx'
# the failure is because the name 'started' is also defined as a property by the derived class Dgram
self.started = None

class Dgram(Base):
b = 2
def __init__(self, listener, handle=None, backlog=None, spawn='default', **args):
Base.__init__(self, listener, handle=handle, backlog=backlog, spawn=spawn)
self.delay = 123

@property
def started(self):
return self._recv_event is not None or self._start_receving_timer is not None

def echo(msg, addr):
print 'echo ' + msg

if __name__ == '__main__':
s = Dgram('123', echo)

ytshen@ytshen-ThinkCentre-A58:~/temp$ python test_obj.py
Traceback (most recent call last):
File "test_obj.py", line 23, in <module>
s = Dgram('123', echo)
File "test_obj.py", line 12, in __init__
Base.__init__(self, listener, handle=handle, backlog=backlog, spawn=spawn)
File "test_obj.py", line 6, in __init__
self.started = None
AttributeError: can't set attribute

2011年11月2日 星期三

[RabbitMQ] priority queue

RabbitMQ currently has no built-in priority queue support, so for now the only option is to work around it on the client side.

Build a logical queue out of one physical queue per priority level (e.g. with only high & low priority, create two queues).

Method 1
The client polls, fetching messages from the corresponding physical queues in priority order.

Method 2
The client subscribes to the queues with a limited channel prefetch count (to prevent a subscription flood), keeps a local in-memory priority queue, and only sends the ack to the RabbitMQ server once a message has been processed.

Reference
http://dougbarth.github.com/2011/07/01/approximating-priority-with-rabbitmq.html 
http://dougbarth.github.com/2011/06/10/keeping-the-rabbit-on-a-leash.html 

2011年10月31日 星期一

[C++] wrapper C++ method in C (inline)

Conclusion first: this is not feasible; it would amount to embedding C++ directly inside C. An inline function is normally written in a header file so that users can simply include it.

In this case that would mean letting a C program include the C++ library directly, which cannot work.

What if we write it in a .cpp, compile it to a .o, and then compile that together with the C program?


// this is foo.cpp
#ifdef __cplusplus
extern "C" {
#endif

#include "foo.h"

int foo()
{
int i = 0;

return i++;
}

#ifdef __cplusplus
}
#endif

// this is foo.h
inline int foo();

Next, main.c:


#include <stdio.h>
#include "foo.h"

int main()
{
int t = foo();

printf("%d\n", t);
return 0;
}

But linking main.o with foo.o cannot find the symbol foo, and strings cannot find it either: because foo is declared inline (foo.h is included by foo.cpp), no out-of-line definition of foo is emitted into foo.o after compilation.

So wrapping a C++ library in C while keeping it inline is not feasible; it can, however, be done by having a wrapper function return a function pointer, as in the sketch below.
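
A minimal sketch of that last idea, with hypothetical names (foo_impl, get_foo): the inline definition stays on the C++ side, and C code only ever receives a plain function pointer through a non-inline extern "C" getter, so the call from C is always indirect and never inlined.

// foo_wrap.cpp -- sketch only; foo_impl and get_foo are made-up names
extern "C" {

typedef int (*foo_fn)(void);

inline int foo_impl(void) { return 42; }   // freely inlined inside C++ callers

// Not inline, so this symbol is emitted into the .o for the C side to link against.
foo_fn get_foo(void)
{
    return &foo_impl;   // taking the address forces an out-of-line copy of foo_impl
}

}   /* extern "C" */

/* On the C side (main.c), roughly:
 *     foo_fn f = get_foo();
 *     printf("%d\n", f());
 */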

2011年10月24日 星期一

[C++] pthread_cancel issue

Ran into a very odd situation today: the program actually aborts when calling pthread_cancel. Below is the simplified code: create a thread, and cancel it afterwards.


#include <pthread.h>

#include <unistd.h>

#include <iostream>
using namespace std;

void* sleepyThread(void*)
{
try
{
cerr << "enter sleep" << endl;
sleep(20);
}
catch(...)
{
cerr <<"catch all";
}
}

int main()
{
pthread_t thread;
int id=pthread_create(&thread, NULL, &sleepyThread, NULL);

cerr<<"lets try to cancel it..."<< id << endl;
sleep(1);
pthread_cancel(thread);
pthread_join(thread, NULL);
}

Experimenting a bit: the program runs fine if sleepyThread does not catch all exceptions, or if pthread_join is not called; as soon as the child thread has a catch-all, it aborts.
From http://stackoverflow.com/questions/4766768/unhandled-forced-unwind-causes-abort we learn that pthread_cancel raises a forced-unwind exception which must be re-thrown, otherwise there is trouble.
pthread_cancel itself works asynchronously: it only marks the thread's state as cancelled and returns. That is why removing pthread_join in the earlier experiment avoided the abort: the thread simply had not reached a cancellation point yet.

Let's look at the backtrace when the abort happens; in gdb you can see it ends up calling unwind_cleanup.


(gdb) r
Starting program: /home/ytshen/a.out
[Thread debugging using libthread_db enabled]
[New Thread 0x7ffff709c700 (LWP 17360)]
lets try to cancel it...0
enter sleep
helloFATAL: exception not rethrown

Program received signal SIGABRT, Aborted.
[Switching to Thread 0x7ffff709c700 (LWP 17360)]
0x00007ffff70d0ba5 in raise () from /lib/libc.so.6
(gdb) bt
#0 0x00007ffff70d0ba5 in raise () from /lib/libc.so.6
#1 0x00007ffff70d46b0 in abort () from /lib/libc.so.6
#2 0x00007ffff7bcd311 in unwind_cleanup () from /lib/libpthread.so.0
#3 0x0000000000400b81 in sleepyThread(void*) ()
#4 0x00007ffff7bc6971 in start_thread () from /lib/libpthread.so.0
#5 0x00007ffff718392d in clone () from /lib/libc.so.6
#6 0x0000000000000000 in ?? ()

So next let's look at the pthread code directly. It registers unwind_cleanup, which gets called if the exception is caught (and not re-thrown). Looking at the pthread_cancel function, you can see that underneath it simply finds the thread id and sends it a signal. (From http://sourceware.org/git/?p=glibc.git;a=blobplain;f=nptl/pthread_cancel.c;hb=HEAD)

(pthread itself is pure C, but with C++ the implementation uses a signal handler that throws an exception from inside it, presumably because C++ requires local variables to be destroyed when a scope is left, hence the exception-based approach. You can see from the pthread_create snippet below that it resets the cancellation signal mask inherited from the parent.)


/* If the parent was running cancellation handlers while creating
   the thread the new thread inherited the signal mask. Reset the
   cancellation signal mask. */
if (__builtin_expect (pd->parent_cancelhandling & CANCELING_BITMASK, 0))
...
__sigemptyset (&mask);
__sigaddset (&mask, SIGCANCEL);

Ans: 

http://kolpackov.net/projects/glibc/cxx-unwind/

http://groups.google.com/group/comp.programming.threads/browse_thread/thread/652bcf186fbbf697/f63757846514e5e5?pli=1

 

From the code below you can see that when the exception is caught (and not re-thrown), abort is called:


// From http://sourceware.org/git/?p=glibc.git;a=blob;f=nptl/unwind.c
103 unwind_cleanup (_Unwind_Reason_Code reason, struct _Unwind_Exception *exc)
104 {
105 /* When we get here a C++ catch block didn't rethrow the object. We
106 cannot handle this case and therefore abort. */
107 # define STR_N_LEN(str) str, strlen (str)
108 INTERNAL_SYSCALL_DECL (err);
109 INTERNAL_SYSCALL (write, err, 3, STDERR_FILENO,
110 STR_N_LEN ("FATAL: exception not rethrown\n"));
111 abort ();
112 }
...
117 void
118 __cleanup_fct_attribute __attribute ((noreturn))
119 __pthread_unwind (__pthread_unwind_buf_t *buf)
120 {
121 struct pthread_unwind_buf *ibuf = (struct pthread_unwind_buf *) buf;
122 struct pthread *self = THREAD_SELF;
123
124 #ifdef HAVE_FORCED_UNWIND
125 /* This is not a catchable exception, so don't provide any details about
126 the exception type. We do need to initialize the field though. */
127 THREAD_SETMEM (self, exc.exception_class, 0);
128 THREAD_SETMEM (self, exc.exception_cleanup, unwind_cleanup);
129
130 _Unwind_ForcedUnwind (&self->exc, unwind_stop, ibuf);
131 #else

Conclusion:
When writing C++ code, if you have a catch-all exception handler you must re-throw at the end, or simply never cancel threads from C++; otherwise, in a multithreaded program, unexpected behavior can occur!! (A minimal sketch follows.)
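
A minimal sketch of the conclusion above, assuming the thread can be cancelled while it sleeps: the catch-all does its cleanup and then re-throws so the forced-unwind exception keeps propagating.

#include <pthread.h>
#include <unistd.h>
#include <iostream>

void* worker(void*)
{
    try {
        sleep(20);                      // cancellation point
    }
    catch (...) {
        std::cerr << "cleaning up\n";   // per-thread cleanup goes here
        throw;                          // MUST re-throw, or glibc aborts with
                                        // "FATAL: exception not rethrown"
    }
    return NULL;
}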

Open question:
Why does the abort only happen at pthread_join? From the code, pthread_cancel should already be what triggers the catch-all, so why does it only show up at pthread_join...
I did not see similar code in pthread_join, but either way, nobody recommends cancelling threads in C++!

Reference:

  • http://stackoverflow.com/questions/4766768/unhandled-forced-unwind-causes-abort
  • http://stackoverflow.com/questions/4760687/cancelling-a-thread-using-pthread-cancel-good-practice-or-bad
  • http://udrepper.livejournal.com/21541.html
  • http://sourceware.org/git/?p=glibc.git;a=blob;f=nptl/pthread_cancel.c;h=55bb0da922ba1ed1c4bd33478075e1b41f2baaff;hb=3a33e487eeb65e2f1f633581c56bee2c60d0ca43
  • http://skaark.wordpress.com/2010/08/26/pthread_cancel-considered-harmful/

2011年10月16日 星期日

[RabbitMQ] rabbit_mnesia module

All of RabbitMQ's metadata (e.g. exchanges, users, virtual hosts), i.e. the server state, is stored in mnesia, Erlang's built-in distributed database, with a copy on every node in the cluster. This module mainly performs the mnesia initialization when a node starts: if other cluster nodes already exist, it synchronizes with them; otherwise it creates new tables itself.

It also checks that the Erlang versions match, and handles what needs to be done when a node is reset or switched between RAM mode and disc mode.

2011年10月15日 星期六

[RabbitMQ] HA on cluster

It turns out that when RabbitMQ runs as a cluster, all metadata is stored on every node except the queues, so when a node dies, all the messages sitting in the queues on that node are lost.

Since RabbitMQ 2.6, active-active HA is supported, and using it is simple: just add the "x-ha-policy" argument when declaring the queue.

HA is achieved with mirrored queues, i.e. you can choose which other nodes a given queue is mirrored to.

Mirroring goes from the master node to the slave nodes, and syncing only starts after a slave node joins; messages that were already on the master are not synced.

If the master dies, the longest-lived slave node is chosen as the next master.

As of 2.6.1, "x-ha-policy" supports two modes:

1. all

  Mirror the queue's messages from now on to every other node in the cluster!

2. nodes

  You can pick which nodes to mirror to, even nodes not currently in the cluster; mirroring waits until that node joins the cluster.
  This mode needs an extra argument, "x-ha-policy-params", whose value is the node list.
  !!This is currently broken and will only be fixed in the next release!!

Here is an example using pika, a Python AMQP client API:

    channel.exchange_declare(exchange='test.topic', type='topic')
channel.queue_declare(queue="test_topic", durable=True,
exclusive=False, auto_delete=False,
arguments={'x-ha-policy': 'all'},
callback=on_queue_declared)

Reference:
http://www.rabbitmq.com/ha.html

2011年9月28日 星期三

[Linux] Fork & exec

fork copies the entire program to the child process, everything except the process id (ignoring copy-on-write here). Once the child process calls exec, the original program's data is gone, completely replaced by the new program, but file descriptors are not closed automatically: unless you set close-on-exec, you either have to close them by hand or use them as a channel to talk to the parent process. A small close-on-exec sketch follows the postscript below.

P.S. when a child process has to close an fd that refers to something like NFS, closing it can sometimes take a very long time (NFS server unreachable, or the ethernet address changed); the process seems to sit in the D (uninterruptible) state for a very long while... In short, take care with fds on network filesystems like NFS, or the watchdog may bite.
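
A minimal sketch of marking an already-open fd close-on-exec with fcntl, so it is closed automatically when the child calls exec:

#include <fcntl.h>

/* Returns 0 on success, -1 on error. */
int set_cloexec(int fd)
{
    int flags = fcntl(fd, F_GETFD);
    if (flags < 0)
        return -1;
    return fcntl(fd, F_SETFD, flags | FD_CLOEXEC);
}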

Reference
http://en.wikipedia.org/wiki/Fork-exec 
http://stackoverflow.com/questions/1643304/how-to-set-close-on-exec-by-default 

2011年9月27日 星期二

[XML] The difference between value-of and copy-of

The difference:

  • xsl:value-of returns all the TEXT within the selected tag(s).
  • xsl:copy-of returns all the ELEMENTS (both tags and text) of the selected tag(s).

In a nutshell, value-of returns only the text inside the node you selected, whereas copy-of returns the entire node structure.

Example:

<Name>
 <Family>Smith</Family>
 <Given>John</Given>
</Name>

XSLT:
<xsl:value-of select="Name"/> 

<!-- outputs all the text under element Name -->

Output:
Smith
John

 

XSLT:

<xsl:copy-of select="Name"/> 

Outputs all elements under Name (tags included)

Output:

<Name>

 <Family>Smith</Family>

 <Given>John</Given>

</Name>

 

XSLT:
<xsl:copy-of select="Name/node()"/> 

Outputs all elements of Name's child nodes (tags included), without Name itself

Output:

 <Family>Smith</Family>

 <Given>John</Given>

 

XSLT:

<xsl:value-of select="Name/text()"/> 
 Outputs the text directly under Name; there is none here, so nothing is output

Output:

Reference:
http://dev.ektron.com/blogs.aspx?id=10472

2011年9月26日 星期一

[C++] Constructor call other constructor


#include <iostream>

class Foo
{
public:
Foo() { Foo(1); };
Foo(int a) { m = a; std::cout <<"Foo " << a << std::endl; }

private:
int m;
};

int main()
{
Foo a, b(2);
return 0;
}

It is OK to call another constructor inside the constructor body (note, though, that Foo(1) above only constructs and discards a temporary Foo; it does not initialize this object's m). Calling another constructor from the initializer list, however, is not allowed.
The following example does not compile.


#include <iostream>

class Foo
{
public:
// this will cause compile error!!
Foo() : this(1) { };
Foo(int a) { m = a; std::cout <<"Foo " << a << std::endl; }

private:
int m;
};

int main()
{
Foo a, b(2);
return 0;
}

Reference:
http://stackoverflow.com/questions/308276/c-call-constructor-from-constructor

2011年9月16日 星期五

[RabbitMQ] cluster setting

Installing on an Ubuntu machine is easy: install erlang first, plus erlang-nox, then simply install the downloaded rabbitmq-server deb.


$ sudo apt-get install erlang
$ sudo apt-get install erlang-nox
$ dpkg -i rabbitmq-server_2.6.1-1_all.deb
$

RabbitMQ also supports plugins. If you installed from the deb, you only have to drop the plugin files (.ez) into /usr/lib/rabbitmq/lib/rabbitmq_server-2.6.1/plugins.
I installed the rabbitmq-management plugin; within a cluster only one node needs the full management plugin, the others only need the management agent.


$ ls /usr/lib/rabbitmq/lib/rabbitmq_server-2.6.1/plugins/
amqp_client-2.6.1.ez
rabbitmq_mochiweb-2.6.1.ez
mochiweb-1.3-rmq2.6.1-git9a53dbd.ez
README
rabbitmq_management-2.6.1.ez
webmachine-1.7.0-rmq2.6.1-hg0c4b60a.ez
rabbitmq_management_agent-2.6.1.ez

In the Erlang world, different nodes can only talk to each other if they are configured with the same cookie. So suppose we have three machines lab1, lab2, lab3: just copy /var/lib/rabbitmq/.erlang.cookie from lab1 to the other two machines and they can communicate; after that the cluster can be built with rabbitmqctl. A few things to pay special attention to:

  • After replacing .erlang.cookie, remember to rm -rf /var/lib/rabbitmq/mnesia
  • Remember to update the cluster ip/host mappings in /etc/hosts so Erlang can resolve the domain names
  • To have the settings applied automatically later on, write config files under /etc/rabbitmq

$ rm -rf /var/lib/rabbitmq/mnesia
$ rabbitmqctl cluster_status
...
$ rabbitmqctl stop_app
...
$ rabbitmqctl reset
...
$ rabbitmqctl cluster rabbit@lab001 # add lab002 as a RAM node
...
$ rabbitmqctl cluster rabbit@lab001 rabbit@lab003 # add lab003 as a disc node
...
$ rabbitmqctl cluster_status
...
$ cat /etc/rabbitmq/rabbitmq-env.conf
RABBITMQ_CONFIG_FILE=/etc/rabbitmq/rabbitmq.conf
$ cat /etc/rabbitmq/rabbitmq.conf
[ {rabbit, [ {cluster_nodes, ['rabbit@lab001', 'rabbit@lab003']} ]} ].
$

One more note: the Erlang shell is handy for debugging. Once you have picked a cookie for the rabbitmq server, say running on lab001, you can use the Erlang shell to verify the connection.
If the ping returns pang, the connection is broken. Also make sure that epmd, Erlang's built-in port-mapper daemon, is running.


lab002 $ erl -sname ooo -setcookie
Erlang R13B03 (erts-5.7.4) [source] [64-bit] [smp:2:2] [rq:2] [async-threads:0] [hipe] [kernel-poll:false]

Eshell V5.7.4 (abort with ^G)
(oo@lab002)1> net_adm:ping('rabbit@lab001').
pong
(oo@lab002)2>

$ epmd -names
epmd: up and running on port 4369 with data:
name rabbit at port 58909

Reference
http://www.rabbitmq.com/clustering.html

[Network] TCP hole punching under NAT

A NAT (network address translation) box can currently use one of four address-mapping behaviors:

  • Full Cone mapping
  • Restricted Cone mapping
  • Port Restricted Cone mapping
  • Symmetric mapping

Suppose we go from Addr1:Port1 through NAT1:Port0 out to Addr2:Port2.

Full Cone mapping:
NAT1:Port0 forwards any traffic destined for Addr1:Port1; even Addr3:Port3 can talk to Addr1:Port1, as long as it can guess NAT1:Port0.

Restricted Cone mapping:
Addr3 can reach Addr1:Port1 through NAT1:Port0 only if Addr1:Port1 has previously connected out to Addr3.

Port Restricted Cone mapping:
Addr3:Port3 can reach Addr1:Port1 through NAT1:Port0 only if Addr1:Port1 has previously connected out to Addr3:Port3.

Symmetric mapping:
The NAT allocates an independent port mapping for every destination IP:Port that Addr1:Port1 connects out to, for example
Addr1:Port1 -> NAT:Port0 -> Addr2:Port2 
Addr1:Port1 -> NAT:Port4 -> Addr2:Port3

These four mapping behaviors determine how you can punch a hole through the NAT over TCP (TCP hole punching). The first three are relatively easy to handle: as long as you can somehow learn the NAT:Port0 that Addr1:Port1 goes out on, you can talk to it. The fourth can probably only be attacked by port prediction.

2011年9月6日 星期二

[C++] Derived class no need to write virtual when base class already has

From
http://stackoverflow.com/questions/4895294/c-virtual-keyword-for-functions-in-derived-classes-is-it-necessary

But I think it is more readable and clearer to still write virtual explicitly...

2011年8月30日 星期二

[C++] Effective C++ Item 42: Understand the two meanings of typename

Ran into a small problem today; I had just been leafing through Effective C++, and a quick look in the book found the answer, so here it is to reinforce the memory.

Besides being used inside a template to declare template parameters, as in the case below, typename has another meaning.

template <typename T>
class Foo
{
public:
    void foo(T a);
};

When a name inside the code depends on a template parameter, it is called a dependent name. By default C++ does not assume such a name is a type, unless you tell the parser explicitly.

So in a case like the following you have to put typename in front, so that the compiler treats the declared T::iterator as a type too; otherwise there is a compile error.


template <typename T>
class Foo
{
public:
    void foo(T a);
    void goo(typename T::iterator i);
private:
    typename std::vector<T>::iterator a;
};

2011年8月28日 星期日

[Erlang] RabbitMQ rabbit_alarm module

In Erlang it is easy to implement an alarm handler: it receives alarm events of all kinds and reacts according to the event message. Erlang also does not restrict you to a single alarm handler: you can implement one per purpose, register them under different names, and, when you raise an event, choose which alarm handlers should process it.

To raise an event you just call gen_event:notify and handle the corresponding event in handle_event.

Internally RabbitMQ implements a memory-monitoring mechanism that works both across cluster nodes and on a single node. This memory monitor has two parts: the per-queue/per-channel memory monitoring will be introduced next time; the part covered here only watches overall usage, and when usage reaches the high watermark it blocks new incoming connections and tells the other cluster nodes to start conserving as well.

The relevant files are rabbit_alarm.erl & vm_memory_monitor.erl.

When rabbitmq starts, one of the boot steps spawns an alarm process.

rabbit.erl

 88 -rabbit_boot_step({rabbit_alarm,           
 89          [{description, "alarm handler"},
 90           {mfa,         {rabbit_alarm, start, []}},
 91           {requires,    kernel_ready},
 92           {enables,     core_initialized}]}).

When the rabbit_alarm process starts it does the following: it takes the memory threshold you configured in ebin/rabbit.app, and starts another monitor process to watch memory and raise an alarm once that threshold is reached.


rabbit_alarm.erl

 45 start() ->
        % add a new alarm handler named rabbit_alarm
 46     ok = alarm_handler:add_alarm_handler(?MODULE, []),
 47     {ok, MemoryWatermark} = application:get_env(vm_memory_high_watermark),
 48     ok = case MemoryWatermark == 0 of
            % if not configured, do not monitor
 49          true  -> ok;
            % start a separate monitor process
 50          false -> rabbit_sup:start_restartable_child(vm_memory_monitor,
 51                [MemoryWatermark])
 52     end,
 53     ok.

    % rabbit_alarm itself maintains two tables:
    % alertees      - the other parties to notify when an alarm fires
    % alarmed_nodes - the nodes where an alarm has already fired
 78 init([]) ->
 79     {ok, #alarms{alertees      = dict:new(),
 80                  alarmed_nodes = sets:new()}}.

Next, let's look at how the monitoring itself is done.

vm_memory_monitor.erl

105 start_link(Args) ->
106     gen_server:start_link({local, ?SERVER}, ?MODULE, [Args], []).
107  
108 init([MemFraction]) ->
109     TotalMemory =
        % get_total_memory handles the per-OS
        % differences; see the rest of this file
110         case get_total_memory() of
111             unknown ->
112                 error_logger:warning_msg(
113                   "Unknown total memory size for your OS ~p. "
114                   "Assuming memory size is ~pMB.~n",
115                   [os:type(), trunc(?MEMORY_SIZE_FOR_UNKNOWN_OS/1048576)]),
116                 ?MEMORY_SIZE_FOR_UNKNOWN_OS;
117             M -> M
118         end,
119     MemLimit = get_mem_limit(MemFraction, TotalMemory),
120     error_logger:info_msg("Memory limit set to ~pMB.~n",
121                           [trunc(MemLimit/1048576)]),
        % start a timer that checks memory usage at a fixed interval
122     TRef = start_timer(?DEFAULT_MEMORY_CHECK_INTERVAL),
123     State = #state { total_memory = TotalMemory,
124                      memory_limit = MemLimit,
125                      timeout = ?DEFAULT_MEMORY_CHECK_INTERVAL,
126                      timer = TRef,
127                      alarmed = false},
128     {ok, internal_update(State)}.

171 internal_update(State = #state { memory_limit = MemLimit,
172                                  alarmed = Alarmed}) ->
        % memory currently in use inside the Erlang VM
173     MemUsed = erlang:memory(total),
174     NewAlarmed = MemUsed > MemLimit,
175     case {Alarmed, NewAlarmed} of
176         {false, true} ->
            % if no alarm was raised before,
            % raise one and hand it to alarm_handler
177             emit_update_info(set, MemUsed, MemLimit),
178             alarm_handler:set_alarm({{vm_memory_high_watermark, node()}, []});
179         {true, false} ->
            % otherwise clear the alarm
180             emit_update_info(clear, MemUsed, MemLimit),
181             alarm_handler:clear_alarm({vm_memory_high_watermark, node()});
182         _ ->
183             ok
184     end,
185     State #state {alarmed = NewAlarmed}.
186  
    % print the log message
187 emit_update_info(State, MemUsed, MemLimit) ->
188     error_logger:info_msg(
189       "vm_memory_high_watermark ~p. Memory used:~p allowed:~p~n",
190       [State, MemUsed, MemLimit]).
191  
    % periodically update the memory usage state
192 start_timer(Timeout) ->
193     {ok, TRef} = timer:apply_interval(Timeout, ?MODULE, update, []),
194     TRef.

Now back to rabbit_alarm.erl, to see how it handles the high-memory-watermark alarm when it arrives.

rabbit_alarm.erl

 89 handle_event({set_alarm, {{vm_memory_high_watermark, Node}, []}}, State) ->
 90     {ok, maybe_alert(fun sets:add_element/2, Node, State)};
 91  
 92 handle_event({clear_alarm, {vm_memory_high_watermark, Node}}, State) ->
 93     {ok, maybe_alert(fun sets:del_element/2, Node, State)};


126 maybe_alert(SetFun, Node, State = #alarms{alarmed_nodes = AN,
127                      alertees      = Alertees}) ->
128     AN1 = SetFun(Node, AN),
129     BeforeSz = sets:size(AN),
130     AfterSz  = sets:size(AN1),
131     %% If we have changed our alarm state, inform the remotes.
132     IsLocal = Node =:= node(),
        % if this is a local alarm,
        % notify the other remote nodes
133     if IsLocal andalso BeforeSz < AfterSz -> ok = alert_remote(true,  Alertees);
134        IsLocal andalso BeforeSz > AfterSz -> ok = alert_remote(false, Alertees);
135        true                               -> ok
136     end,
137     %% If the overall alarm state has changed, inform the locals.
138     case {BeforeSz, AfterSz} of
139         {0, 1} -> ok = alert_local(true,  Alertees);
140         {1, 0} -> ok = alert_local(false, Alertees);
141         {_, _} -> ok
142     end,
        % update alarmed_nodes
143     State#alarms{alarmed_nodes = AN1}.

145 alert_local(Alert, Alertees)  -> alert(Alert, Alertees, fun erlang:'=:='/2).
146      
147 alert_remote(Alert, Alertees) -> alert(Alert, Alertees, fun erlang:'=/='/2).
148      
    % find every node that needs to be notified and run the callback that node registered earlier
149 alert(Alert, Alertees, NodeComparator) ->
150     Node = node(),                                                                                                                                                     
151     dict:fold(fun (Pid, {M, F, A}, ok) ->
152                       case NodeComparator(Node, node(Pid)) of
153                           true  -> apply(M, F, A ++ [Pid, Alert]);
154                           false -> ok
155                       end
156               end, ok, Alertees).

So afterwards any other module only needs to register a callback describing what to do when memory usage is too high. That code lives in rabbit_reader.erl, i.e. where incoming connections are handled.

rabbit_reader.erl

691     State1 = internal_conserve_memory(
           % register the callback that will be
           % called when memory usage
           % goes over the limit
692        rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}),
693        State#v1{connection_state = running,
694                connection = NewConnection}),

    % when usage is excessive the state switches to blocking
    % and new connections are no longer accepted
348 internal_conserve_memory(true,  State = #v1{connection_state = running}) ->
349     State#v1{connection_state = blocking};
350 internal_conserve_memory(false, State = #v1{connection_state = blocking}) ->                                                                                           
351     State#v1{connection_state = running};
352 internal_conserve_memory(false, State = #v1{connection_state = blocked,
353                                             heartbeater      = Heartbeater}) ->
354     ok = rabbit_heartbeat:resume_monitor(Heartbeater),
355     State#v1{connection_state = running};
356 internal_conserve_memory(_Conserve, State) ->
357     State

Also, when a new cluster node starts up, say node A receives a node-B-up event, node A registers a remote_conserve_memory callback with node B; when node B's memory usage grows too large, it calls node A to start conserving memory, until the original alarm is cleared.

rabbit_alarm.erl

 95 handle_event({node_up, Node}, State) ->         
 96     %% Must do this via notify and not call to avoid possible deadlock.
 97     ok = gen_event:notify(                      
 98            {alarm_handler, Node},               
 99            {register, self(), {?MODULE, remote_conserve_memory, []}}),
100     {ok, State};

 69 remote_conserve_memory(Pid, true) ->
        % tell itself to start conserving memory
 70     gen_event:notify({alarm_handler, node(Pid)},
 71                      {set_alarm, {{vm_memory_high_watermark, node()}, []}});

2011年8月26日 星期五

[Erlang] RabbitMQ cluster node monitor module

When RabbitMQ starts it first runs a number of boot steps, and one of them starts rabbit_node_monitor.


rabbit.erl (program entry point)

% first start the node_monitor process
114 -rabbit_boot_step({rabbit_node_monitor,
115 [{description, "node monitor"},
116 {mfa, {rabbit_sup, start_restartable_child,
117 [rabbit_node_monitor]}},
118 {requires, kernel_ready},
119 {enables, core_initialized}]}).

So the module that actually monitors the cluster nodes lives in the file rabbit_node_monitor.erl.

In the Erlang world, forming a cluster is quite easy: each node just has to be started with a node name (unique within the cluster) and the same cookie, and then the nodes can talk to each other.

So when the node_monitor process comes up it calls its init function, and net_kernel:monitor_nodes means it will receive status messages for all the nodes it monitors.


rabbit_node_monitor.erl

init([]) ->
ok = net_kernel:monitor_nodes(true),
{ok, no_state}.

At this point node_monitor is fully initialized. Next, node information has to be passed around inside the cluster, so a later boot step notifies the cluster.


rabbit.erl (program entry point)

155 -rabbit_boot_step({notify_cluster,
156 [{description, "notify cluster nodes"},
157 {mfa, {rabbit_node_monitor, notify_cluster, []}},
158 {requires, networking}]}).

At line 51 you can see it multicasts its own information out.


rabbit_node_monitor.erl

% now collect the information for the whole cluster, and send our own data out so the other nodes in the cluster know about us
47 notify_cluster() ->
48 Node = node(), % this is our own node
% every rabbitmq node that comes up has its own mnesia, so this is used to collect the rabbitmq nodes that are already running
49 Nodes = rabbit_mnesia:running_clustered_nodes() -- [Node],
50 %% notify other rabbits of this rabbit
51 case rpc:multicall(Nodes, rabbit_node_monitor, rabbit_running_on,
52 [Node], ?RABBIT_UP_RPC_TIMEOUT) of
53 {_, [] } -> ok;
54 {_, Bad} -> rabbit_log:info("failed to contact nodes ~p~n", [Bad])
55 end,
56 %% register other active rabbits with this rabbit
57 [ rabbit_node_monitor:rabbit_running_on(N) || N <- Nodes ],
58 ok.

And the code below is what the node_monitor process does with the various status messages it receives.


rabbit_node_monitor.erl

66 handle_call(_Request, _From, State) ->
67 {noreply, State}.
68
69 handle_cast({rabbit_running_on, Node}, State) ->
70 rabbit_log:info("node ~p up~n", [Node]),
% a new node showed up, so start monitoring it
71 erlang:monitor(process, {rabbit, Node}),
% call our own event handler to deal with the newly started node
72 ok = rabbit_alarm:on_node_up(Node),
73 {noreply, State};
74 handle_cast(_Msg, State) ->
75 {noreply, State}.
76
77 handle_info({nodedown, Node}, State) ->
% a monitored node went down normally
78 rabbit_log:info("node ~p down~n", [Node]),
79 ok = handle_dead_rabbit(Node),
80 {noreply, State};
81 handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason}, State) ->
% a node died unexpectedly
82 rabbit_log:info("node ~p lost 'rabbit'~n", [Node]),
83 ok = handle_dead_rabbit(Node),
84 {noreply, State};

96 %% TODO: This may turn out to be a performance hog when there are lots
97 %% of nodes. We really only need to execute some of these statements
98 %% on *one* node, rather than all of them.
99 handle_dead_rabbit(Node) ->
100 ok = rabbit_networking:on_node_down(Node),
101 ok = rabbit_amqqueue:on_node_down(Node),
102 ok = rabbit_alarm:on_node_down(Node).

Next let's look at how a newly started node is handled: rabbit_alarm is called, and the flow here is a bit different and has to be read in two halves. The situation is node A and node B, where node A receives the node_up message for node B, so node A calls its own rabbit_alarm module. Node A does nothing special there (it is already monitoring node B anyway), but it sends a {register, self(), ...} event for node B to handle.


rabbit_alarm.erl on Node A

95 handle_event({node_up, Node}, State) ->
96 %% Must do this via notify and not call to avoid possible deadlock.
97 ok = gen_event:notify(
98 {alarm_handler, Node},
99 {register, self(), {?MODULE, remote_conserve_memory, []}}),
100 {ok, State};

Now let's look at what node B does: it also starts monitoring node A, checks its alarmed_nodes set, and if an alarm is active it applies the Module, Function, Arguments that were passed in (that is what lines 69-71 of rabbit_alarm.erl do), then adds node A to alertees (one of the parties that will be notified when an alarm fires later). That completes adding the new node.


rabbit_alarm.erl on Node B

67 %% Can't use alarm_handler:{set,clear}_alarm because that doesn't
68 %% permit notifying a remote node.
% send the set_alarm event back to Node A
69 remote_conserve_memory(Pid, true) ->
70 gen_event:notify({alarm_handler, node(Pid)},
71 {set_alarm, {{vm_memory_high_watermark, node()}, []}});
72 remote_conserve_memory(Pid, false) ->
73 gen_event:notify({alarm_handler, node(Pid)},
74 {clear_alarm, {vm_memory_high_watermark, node()}}).

% receive the event from Node A
82 handle_call({register, Pid, HighMemMFA}, State) ->
83 {ok, 0 < sets:size(State#alarms.alarmed_nodes),
84 internal_register(Pid, HighMemMFA, State)};

% here M, F, A are ?MODULE, remote_conserve_memory, []
158 internal_register(Pid, {M, F, A} = HighMemMFA,
159 State = #alarms{alertees = Alertees}) ->
160 _MRef = erlang:monitor(process, Pid),
161 case sets:is_element(node(), State#alarms.alarmed_nodes) of
162 true -> ok = apply(M, F, A ++ [Pid, true]);
163 false -> ok
164 end,
165 NewAlertees = dict:store(Pid, HighMemMFA, Alertees),
166 State#alarms{alertees = NewAlertees}.

So the cluster-node-monitoring part is really quite simple in the Erlang implementation.

As for rabbit_alarm.erl, the memory-monitoring part inside it is left for next time.

[Linux] system info

Use sudo dmidecode -t memory to see how much memory your machine has. Quite a lot of other hardware information can also be found with this command.

2011年8月23日 星期二

[Erlang] Prime generator

It has been a while since I wrote any Erlang and I am a bit rusty. I happened to see someone talking about prime generators, so I wrote one. :P


-module(prime).

-export([generate_prime/1]).

filter_number(Pid, P) ->
receive
N when N rem P /= 0 ->
% In this case, the N can not be divided by P,
% so pass it to next prime process
Pid ! N;
_N -> ok
end,
filter_number(Pid, P).

prime() ->
Prime = receive P -> P end,
io:format("Prime ~p~n", [Prime]),
Pid = spawn(fun prime/0),
filter_number(Pid, P).

generate_prime(N) ->
% create the first prime process
Pid = spawn(fun prime/0),
lists:map(fun(X) -> Pid ! X end, lists:seq(2, N)),
ok.

2011年8月18日 星期四

[GDB] Reverse step for GDB

Starting with GDB 7.0, Process Record, i.e. "reverse debugging", is supported. Normally in gdb we can run step, continue, next and so on, but once we have moved on to the next step there is no way to go back. "Reverse debugging" lets you restore the previous step, or go back to the previous breakpoint. The feature is currently limited to the following platforms:

  • i386-linux
  • amd64-linux
  • moxie-elf / moxie-linux

Under the hood, gdb logs every instruction you execute together with the memory and register state, which is how it can restore the previous step.

Suppose I have the following program:

#include <stdio.h>
#include <stdlib.h>

int main()
{
int i;

for(i=0;i<100;i++) {
int t = random();
printf("i = %d, t = %d\n", i, t);
}
return 0;
}

Now let's debug it with gdb:

$ gcc -g tests.c # compile code
$ gdb a.out # start to debug
(gdb) r
The program being debugged has been started already.
Start it from the beginning? (y or n) y
Starting program: /home/ytshen/a.out

Breakpoint 1, main () at tests.c:9
9 int t = random();
2: t = 0
1: i = 0
(gdb) record # start recording; only then can reverse-step and the related commands be used
(gdb) c
Continuing.
ci = 0, t = 1804289383

Breakpoint 1, main () at tests.c:9
9 int t = random();
2: t = 1804289383
1: i = 1
(gdb) c
Continuing.
i = 1, t = 846930886

Breakpoint 1, main () at tests.c:9
9 int t = random();
2: t = 846930886
1: i = 2
(gdb) c
Continuing.
i = 2, t = 1681692777

Breakpoint 1, main () at tests.c:9
9 int t = random();
2: t = 1681692777
1: i = 3
(gdb) c
Continuing.
i = 3, t = 1714636915

Breakpoint 1, main () at tests.c:9
9 int t = random();
2: t = 1714636915
1: i = 4
# now let's start going in reverse
(gdb) reverse-continue
Continuing.

Breakpoint 1, main () at tests.c:9
9 int t = random();
2: t = 1681692777
1: i = 3
# you can see that all the state has been preserved
(gdb) reverse-continue
Continuing.

Breakpoint 1, main () at tests.c:9
9 int t = random();
2: t = 846930886
1: i = 2
(gdb) reverse-continue
Continuing.

Breakpoint 1, main () at tests.c:9
9 int t = random();
2: t = 1804289383
1: i = 1
(gdb)

The above is a simple example of reverse debugging with gdb. There are a few more simple commands,
e.g.:

  • "record stop": stop recording instructions
  • "record delete": delete the existing recording
  • "info record": show information about the recording
  • "set record stop-at-limit": what to do when the logging buffer is full; with on, recording stops once the buffer is full, with off the oldest logged instructions are overwritten
  • "set record insn-number-max": set how many instructions are recorded; the default is 3292
  • "reverse-step": go back one step; if it is a function call, stop at the beginning of the function
  • "reverse-continue": go back to the previous breakpoint
  • "reverse-next": go back one step, but a function call is undone as a whole back to its call site
  • "set exec-direction [forward | reverse]": after setting the direction you can use continue, next, step directly


2011年8月2日 星期二

[C] Low level IO primitives

Low-level I/O like open/write/close should be followed by fdatasync() when you want to make sure the data has reached the disk, because even after write() returns, the kernel may still be buffering the data.

For the stdio library (fopen/fwrite/fclose), call fflush() before fdatasync(), because stdio keeps data buffered in the FILE structure.
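
A minimal sketch combining the two points above (error handling omitted): flush the stdio buffer into the kernel first, then ask the kernel to push the file's data to disk.

#include <stdio.h>
#include <unistd.h>

void flush_to_disk(FILE *fp)
{
    fflush(fp);               /* FILE buffer       -> kernel page cache */
    fdatasync(fileno(fp));    /* kernel page cache -> disk (file data)  */
}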

2011年7月28日 星期四

[C++] NULL reference and NULL pointer deference

Normally a function that takes a reference parameter does not need to check for NULL the way one taking a pointer does, but the situation below still ends in a segmentation fault.
The reason is that dereferencing a NULL pointer (e.g. *p) is itself undefined behavior, and a NULL reference can only ever arise from undefined behavior.


#include <iostream>
struct obj
{
int t;
};

void foo(obj &a)
{
std::cout << a.t;
}

int main()
{
obj *p = NULL;

foo(*p);
return 0;

}

In the program below, even with the std::cout inside foo commented out and compiled with -O0, there is no crash, although in principle *p should crash.
Looking at the generated assembly makes it clear: underneath, a reference is implemented with a pointer, so as long as the NULL pointer is never actually accessed, there is no crash.


#include <stdio.h>

struct obj
{
int t;
};

void foo(obj &a)
{
}

int main()
{
obj *p = NULL;

foo(*p);
return 0;
}

You can see from main's assembly that it really just passes the pointer in as the argument:


movq $0, -8(%rbp)
movq -8(%rbp), %rax
movq %rax, %rdi
call _Z3fooR3obj

You could guard against it as shown below, but I find that awkward, since the problem really is not in foo:


#include <iostream>

struct obj
{
int t;
};

void foo(obj &a)
{
if( &a == NULL) {
std::cout << "null\n";
return;
}
std::cout << a.t;
}

int main()
{
obj *p = NULL;

foo(*p);
return 0;
}

So it is only the caller that has to check whether the pointer it passes in is NULL.

Reference: 
http://stackoverflow.com/questions/4364536/c-null-reference
http://www.nicollet.net/2009/11/can-references-be-null/
http://discuss.fogcreek.com/joelonsoftware/default.asp?cmd=show&ixPost=17154 

2011年7月27日 星期三

[C++] Diffie-Hellman Key Exchange (Client API)

DHKeyCrypt.h


#if !defined(dhkeycrypt_include)

#define dhkeycrypt_include

#include <openssl/dh.h>
#include <openssl/bn.h>

/*
* Example:
*
* // initial steps
* const char *p, *g, *public_key;
* char *buf;
* int buf_size;
* DHKeyCrypt *key = new DHKeyCrypt(256); // mean generate 256 bits prime
*
* // p, g, public_key will point to the hex representation string
* key->GenerateKey(&p, &g, &public_key);
*
* // use the 3rd party public key to compute session key
* key->ComputeKey(other_pubkey);
*
* // usage
* key->Encode(buf, buf_size); // encode buf
*
*/
class DHKeyCrypt
{
private:
char *m_g[2];
DH *m_dh;
char *m_p;
char *m_pubkey;
unsigned char *m_session_key;

int m_key_len;
int m_gindex;

public:
DHKeyCrypt(int size);
void GenerateKey(const char **p, const char **g, const char **public_key);
int ComputeKey(const char *pubkey_3rd);
void Encode(char *buf, int buf_len);
~DHKeyCrypt();
};

#endif

DHKeyCrypt.cpp


#include "DHKeyCrypt.h"
#include <cstring>
#include <ctime>
#include <new>

/*
* generate size bits prime
*/
DHKeyCrypt::DHKeyCrypt(int size) : m_session_key(NULL)
{
m_g[0] = strdup("2");
m_g[1] = strdup("5");
m_gindex = time(NULL) & 1;
m_dh = DH_generate_parameters(size, m_g[m_gindex][0] - '0', NULL, NULL);
DH_generate_key(m_dh);
m_p = BN_bn2hex(m_dh->p);
m_pubkey = BN_bn2hex(m_dh->pub_key);
}

void DHKeyCrypt::GenerateKey(const char **p, const char **g, const char **public_key)
{
*p = m_p;
*g = m_g[m_gindex];
*public_key = m_pubkey;
}

int DHKeyCrypt::ComputeKey(const char *pubkey_3rd)
{
BIGNUM *bn = NULL;

if(!BN_hex2bn(&bn, pubkey_3rd))
return -1;
m_session_key = new (std::nothrow) unsigned char [DH_size(m_dh)];
if(!m_session_key) return -1;
m_key_len = DH_compute_key(m_session_key, bn, m_dh);
if(m_key_len == -1) return -1;
/*
printf("SessionKey: ");
for(int i=0; i < m_key_len; i++)
printf("%02X", m_session_key[i]);
printf("\n");
*/
return 0;
}

void DHKeyCrypt::Encode(char *buf, int buf_len)
{
for(int i=0; i < buf_len; i++)
buf[i] = buf[i] ^ m_session_key[i % m_key_len];
}

DHKeyCrypt::~DHKeyCrypt()
{
OPENSSL_free(m_p);
OPENSSL_free(m_pubkey);
free(m_g[0]);
free(m_g[1]);
delete [] m_session_key;
DH_free(m_dh);
}

2011年6月29日 星期三

[Linux] driver's log

dmesg: if syslogd & klogd are running, the messages printed by printk are written to /var/log/kern.log.
There are several ways the machine can be shut down.
With shutdown -h, the scripts under /etc/rc0.d/ are executed in order. In short, if you are looking for errors that happened during shutdown, /var/log/kern.log is the place to check.

2011年6月10日 星期五

[Linux] vfork & waitpid

On Linux there are two ways, vfork & fork, to create a child process to do work; for the difference between vfork & fork see http://csie-tw.blogspot.com/2009/04/vforkuclinux.html

I recently hit a very strange situation. Normally, after the parent process forks a child, the parent either has to waitpid for the child to finish, or it registers a SIGCHLD signal handler (SIGCHLD is sent to the parent when the child terminates). Yet in my parent process waitpid returned -1 with the message "no child process"! So my guess was that the child process had already finished before the parent even reached waitpid.

So I wrote a small test program:


  • version 1: vfork + execv with no signal handler (the parent sleeps before calling waitpid)
    • waitpid returns 0, and WEXITSTATUS(status) correctly returns the child process exit code
  • version 2: vfork + execv with a SIGCHLD signal handler (the parent sleeps before calling waitpid)
    • waitpid returns -1, no child process; WEXITSTATUS(status) cannot get the child process exit code
  • version 3: vfork + execv with SIGCHLD ignored (the parent sleeps before calling waitpid)
    • waitpid returns -1, no child process; WEXITSTATUS(status) cannot get the child process exit code

So in the end the problem is: as soon as you register any SIGCHLD handling, you can no longer get the child process exit code! (A fork-based sketch of the ignore case follows.)
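
A minimal fork-based sketch of the SIG_IGN case above: once SIGCHLD is set to SIG_IGN, the kernel reaps the child by itself and waitpid() fails with ECHILD.

#include <signal.h>
#include <stdio.h>
#include <sys/wait.h>
#include <unistd.h>

int main(void)
{
    signal(SIGCHLD, SIG_IGN);       /* comment this out and waitpid() succeeds */

    pid_t pid = fork();
    if (pid == 0)
        _exit(7);                   /* child exits immediately */

    sleep(1);                       /* make sure the child is already gone */

    int status;
    if (waitpid(pid, &status, 0) < 0)
        perror("waitpid");          /* prints: waitpid: No child processes */
    else
        printf("exit code = %d\n", WEXITSTATUS(status));
    return 0;
}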

There is one more situation: Linux system calls can sometimes be interrupted by the system, and the usual fix is simply to retry (see the sketch after the link):
http://book.chinaunix.net/special/ebook/addisonWesley/APUE2/0201433079/ch10lev1sec5.html
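
A minimal sketch of the usual retry-on-EINTR pattern around waitpid:

#include <errno.h>
#include <sys/types.h>
#include <sys/wait.h>

pid_t waitpid_retry(pid_t pid, int *status, int options)
{
    pid_t r;
    do {
        r = waitpid(pid, status, options);
    } while (r < 0 && errno == EINTR);   /* interrupted by a signal: just retry */
    return r;
}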

2011年5月20日 星期五

[Javascript] Node.js CPU bound action

Node.js is an event-driven server framework built on the V8 javascript engine; because of V8's current limitations, a single thread handles all events.

So if I write the code below, one request will block the requests behind it. This is an echo server: when data arrives, function(data) is called, and if I do something time-consuming inside that function, every other request is delayed.

var net = require("net");          
// echo server example             
var TCPServer = net.createServer(function (socket){
    // on "data" event             
    socket.on("data", function(data){
        request = data.toString('utf8');
        now = new Date().getTime();
        // it will wait for 5 seconds to block following request                                            
        while(new Date().getTime() < now + 5000) {
           // do nothing           
        }                          
        socket.write(request);     
    });                            
});                                
                                   
TCPServer.listen(7777, "127.0.0.1");


So for CPU-bound work, I have found two solutions so far. The first is an ordinary fork:
fork a child process to run the command you want, and when it finishes, carry on in the callback; that way CPU-bound work does not affect the other requests.
var net = require("net");         
var sys = require('sys');         
var exec = require('child_process').exec;
                                  
                                  
// echo server example            
var TCPServer = net.createServer(function (socket){              
    sys.debug('new socket coming!');
    socket.on('data', function(d) {
        sys.debug('new data coming ' + d);
        var sleep = exec('sleep 5', function(error, stdout, stderr) {                                                                                                     
            socket.write('finish job: ' + d);
        });                       
    });                           
});                               
                                  
TCPServer.listen(7777, "127.0.0.1");



The second is Webworker, a node.js extension module that uses unix domain sockets for IPC (inter-process communication). It offers a more abstract API via postMessage/onmessage. My implementation below only passes messages: when the worker is done, it sends the result back to the server. The postMessage argument must be in JSON format, and postMessage can take a file descriptor as its second parameter to hand an fd to the worker process, but I never managed to get that part working...

Server.js
=================
var net = require("net");                                                                                                                                                 
var sys = require('sys');
var Worker = require('webworker').Worker;
var index = 0;
var workers = [];
var socks = {};
var sock_num = 0;
 
for(var i = 0; i < 8; i++) {
    workers.push(new Worker(''));
    // when some result message return from worker process
    workers[i].onmessage = function(msg) {
        var sock_index = msg.data.index;
        var result = msg.data.result;
        sys.debug('master: ' + sock_index);
        socks[parseInt(sock_index)].write(result);
    };
}
 
// echo server example
var TCPServer = net.createServer(function (socket){
    var l = sock_num;
 
    sock_num++;
    socks[l] = socket;
    // when client send data
    socket.on('data', function(d) {
        var data = d.toString('utf8').trim();
 
        sys.debug(index + ' worker data: ' + data);
        sys.debug('socks: ' + l);
        workers[index].postMessage({'text': data, 'index': index, sock_index: l});
        index++;
    });
    // when client close connection
    socket.on('close', function() {
        sys.debug('delete socks: ' + l);
        delete socks[l];
    });
});
 
TCPServer.listen(7777, "127.0.0.1");


Worker.js
=================
var http = require('http');    
var net = require('net');      
var sys = require('sys');      
    
function sleep() {
    var now = new Date().getTime();
    // it will wait for 5 seconds to block following request
    while(new Date().getTime() < now + 5000) {
        ;
    }
};  
    
onmessage = function(msg) {    
    sys.debug('worker index: ' + msg.data.index);
    sys.debug('worker sock_index: ' + msg.data.sock_index);
    sys.debug('worker text: ' + msg.data.text);
    
    var result_text = msg.data.text + ' processed\n';
                                                                                                                                                                          
    // time consuming operation
    sleep();
    sys.debug('worker after sleep');
    postMessage({index: msg.data.sock_index, result: result_text});
};  
    
onclose = function() {
    sys.debug('Worker shuttting down.');
};  


Below is the usage that Gibson later figured out:
master.js

=====================
var http = require('http');    
var sys = require('sys');                                                                                                                                                  
var path = require('path');
var net = require('net');
var Worker = require('webworker').Worker;
var workers = [];
var wid=0;
 
for (var i = 0; i < 8; i++) {
    workers[i] = new Worker(path.join(__dirname, 'worker.js'));
}
 
net.createServer(function(socket){      
    socket.pause();
    wid = (++wid) % 8;
    sys.debug('pass to worker '+ wid + ' fd:' + socket.fd);
    workers[wid].postMessage({'wid':wid}, socket.fd);
}).listen(8080);

worker.js
=====================
var http = require('http');    
var sys = require('sys');                                                                                                                                                  
var net = require('net');
 
var srv = net.createServer(function(socket){
    socket.on('data', function(data){
        buf = data.toString('utf8');
        socket.write('['+process.pid+'] received: ' + buf); 
    });
});
 
onmessage = function(msg) {
    sys.debug('worker received msg.data.wid: ' + msg.data.wid);
    sys.debug('worker received msg.fd: ' + msg.fd);
    var socket = net.Stream(msg.fd);
    socket.type = srv.type;
    socket.server = srv;
    socket.resume();
    srv.emit('connection', socket);
};

Reference:
http://blog.mixu.net/2011/02/01/understanding-the-node-js-event-loop/
http://blog.std.in/2010/07/08/nodejs-webworker-design/
http://developer.yahoo.com/blogs/ydn/posts/2010/07/multicore_http_server_with_nodejs/
http://nodejs.org/docs/v0.4.7/api/child_processes.html

2011年5月5日 星期四

[vim] Display Ansi in vim

Download from http://www.vim.org/scripts/script.php?script_id=302

After download and extract.

$ vim AnsiEsc.vba
:so %
:q
Then you can use :AnsiEsc to toggle ansi escape sequence highlighting!

:h ansiesc for more information.

2011年5月4日 星期三

[Linux] setuid

When you want to execute a program with root privileges, here is what you can do.
First, program.c:
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <unistd.h>

int main()
{
  setuid( 0 );
  system( "<program>" );

  return 0;
}
Then in the shell enter the following commands:
$ gcc -o program program.c
$ sudo chown root program
$ sudo chmod u+s program
Then you can run program to execute your program with root privileges!

2011年4月23日 星期六

[book] Beautiful Code - population count

An interview question that comes up from time to time: given a number, how many 1 bits are there in its binary representation?

ex:

5 (decimal) = 101 (binary), so the answer is 2 (there are two 1 bits)

Skipping the most direct solution (counting bit by bit), the following is what anyone who has seen this problem before will propose:

 


int pop(unsigned int n)
{
int count = 0;
while(n) {
n = n & (n - 1);
count++;
}
return count;
}

The table-lookup approach:


int pop(unsigned int n)
{
char table[256] = {0, 1, 1, ...};
return table[n & 0xff] + table[(n>>8)&0xff] + table[(n>>16)&0xff] + table[n>>24];
}

But there are other rather interesting approaches too, such as the familiar divide & conquer:

 


int pop2(unsigned int n)
{
n = (n & 0x55555555) + ((n >> 1) & 0x55555555);
n = (n & 0x33333333) + ((n >> 2) & 0x33333333);
n = (n & 0x0f0f0f0f) + ((n >> 4) & 0x0f0f0f0f);
n = (n & 0x00ff00ff) + ((n >> 8) & 0x00ff00ff);
n = (n & 0x0000ffff) + ((n >> 16) & 0x0000ffff);
return n;
}

int pop3(unsigned int n)
{
n = (n & 0x55555555) + ((n >> 1) & 0x55555555);
n = (n & 0x33333333) + ((n >> 2) & 0x33333333);
// adding two 4-bit fields cannot disturb the neighbouring field, i.e. there is no carry into the previous 4 bits
// this also clears the top 4 bits of each resulting 8-bit field
n = (n + (n >> 4)) & 0x0f0f0f0f;
n = (n + (n >> 8));
n = (n + (n >> 16));
// the result is at most 32, so at the end we only keep the bits we need (6 bits)
return (n & 0x3f);
}

With these in hand, if we want to compare which of x and y has more 1 bits, we can of course compute the two counts and compare them.
If we want the total number of 1s across both numbers, we can also exploit the fact that the 4-bit additions above cannot overflow and merge the two values at the third step.
For the difference, we can use pop(x) - pop(y) = pop(x) - (32 - pop(y')) = pop(x) + pop(y') - 32,
where y' is the one's complement of y; a small sketch follows.
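
A small sketch of that identity, reusing the pop() defined at the top of this post (32-bit unsigned int assumed):

int pop_diff(unsigned int x, unsigned int y)
{
    /* pop(x) - pop(y), using pop(~y) == 32 - pop(y) */
    return pop(x) + pop(~y) - 32;
}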

The book offers yet another approach for the comparison: first cancel out the 1 bits the two numbers have in common, then count what is left together:


int pop_comp(unsigned int x, unsigned int y)
{
unsigned int x1 = x & (~y);
unsigned int y1 = y & (~x);

while(1) {
if(x1 == 0) return y1?-1:0;
if(y1 == 0) return 1;
x1 = x1 & (x1 - 1);
y1 = y1 & (y1 - 1);
}
}

And from here, computing the Hamming distance (how many bit positions differ between two numbers) is easy as well:
take the xor of the two numbers and count the 1 bits in it, as in the sketch below.
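
A small sketch, again reusing pop() from above:

int hamming_distance(unsigned int x, unsigned int y)
{
    return pop(x ^ y);   /* count the bit positions where x and y differ */
}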