Friday, August 26, 2011

[Erlang] RabbitMQ cluster node monitor module

When RabbitMQ starts, it first runs a series of boot steps, and one of those steps starts rabbit_node_monitor.


rabbit.erl (program entry point)

% First, start the node_monitor process
-rabbit_boot_step({rabbit_node_monitor,
                   [{description, "node monitor"},
                    {mfa,         {rabbit_sup, start_restartable_child,
                                   [rabbit_node_monitor]}},
                    {requires,    kernel_ready},
                    {enables,     core_initialized}]}).

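So the module that actually monitors the cluster nodes lives in rabbit_node_monitor.erl.

In the Erlang world, forming a cluster is quite easy: each node only needs to be started with a node name (unique within the cluster) and the same cookie, and the nodes can then talk to each other.

When the node_monitor process starts, its init function is called. The call to net_kernel:monitor_nodes(true) subscribes the process to node status messages (nodeup and nodedown), so it is told whenever a node joins or leaves.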


rabbit_node_monitor.erl

init([]) ->
ok = net_kernel:monitor_nodes(true),
{ok, no_state}.

At this point node_monitor has finished initializing. Next, the nodes in the cluster need to exchange information with each other, so a later boot step is notify_cluster.


rabbit.erl (program entry point)

-rabbit_boot_step({notify_cluster,
                   [{description, "notify cluster nodes"},
                    {mfa,         {rabbit_node_monitor, notify_cluster, []}},
                    {requires,    networking}]}).

On line 51 you can see the node multicasting its own information to the other nodes.


rabbit_node_monitor.erl

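% Next, collect information about the whole cluster and send our own data out so the other nodes know about us
47 notify_cluster() ->
48 Node = node(), % this is our own node
% Every running RabbitMQ node has its own Mnesia, so Mnesia can be used to find the RabbitMQ nodes that are already up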
49 Nodes = rabbit_mnesia:running_clustered_nodes() -- [Node],
50 %% notify other rabbits of this rabbit
51 case rpc:multicall(Nodes, rabbit_node_monitor, rabbit_running_on,
52 [Node], ?RABBIT_UP_RPC_TIMEOUT) of
53 {_, [] } -> ok;
54 {_, Bad} -> rabbit_log:info("failed to contact nodes ~p~n", [Bad])
55 end,
56 %% register other active rabbits with this rabbit
57 [ rabbit_node_monitor:rabbit_running_on(N) || N <- Nodes ],
58 ok.

The code below shows how the node_monitor process handles the various status messages it receives.


rabbit_node_monitor.erl

handle_call(_Request, _From, State) ->
    {noreply, State}.

handle_cast({rabbit_running_on, Node}, State) ->
    rabbit_log:info("node ~p up~n", [Node]),
    % A new node was reported, so start monitoring its 'rabbit' process
    erlang:monitor(process, {rabbit, Node}),
    % Call our own event handler to deal with the newly started node
    ok = rabbit_alarm:on_node_up(Node),
    {noreply, State};
handle_cast(_Msg, State) ->
    {noreply, State}.

handle_info({nodedown, Node}, State) ->
    % A monitored Erlang node went down or became unreachable
    rabbit_log:info("node ~p down~n", [Node]),
    ok = handle_dead_rabbit(Node),
    {noreply, State};
handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason}, State) ->
    % The 'rabbit' process on a monitored node died
    rabbit_log:info("node ~p lost 'rabbit'~n", [Node]),
    ok = handle_dead_rabbit(Node),
    {noreply, State};
%% ... (the remaining handle_info clauses are omitted from this excerpt)

%% TODO: This may turn out to be a performance hog when there are lots
%% of nodes. We really only need to execute some of these statements
%% on *one* node, rather than all of them.
handle_dead_rabbit(Node) ->
    ok = rabbit_networking:on_node_down(Node),
    ok = rabbit_amqqueue:on_node_down(Node),
    ok = rabbit_alarm:on_node_down(Node).

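Now let's look at what happens when a new node starts. The handler calls the rabbit_alarm module, and the flow here is a little different, so it has to be read in two parts. Suppose there are two nodes, Node A and Node B. Node A receives the message that Node B is up, so Node A calls its own rabbit_alarm module. Node A does nothing special at this point (it is already monitoring Node B), but it sends a {register, self(), ...} event to Node B for processing.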


rabbit_alarm.erl on Node A

handle_event({node_up, Node}, State) ->
    %% Must do this via notify and not call to avoid possible deadlock.
    ok = gen_event:notify(
           {alarm_handler, Node},
           {register, self(), {?MODULE, remote_conserve_memory, []}}),
    {ok, State};

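Now let's see what Node B does. It in turn starts monitoring the registering process on Node A, then checks whether its own node (node()) is in its alarmed_nodes set. If it is, it applies the Module, Function and Arguments that were passed in, which is what lines 69-71 of rabbit_alarm.erl do. Finally it adds Node A's process to alertees (the parties to notify when an alert fires later), and that completes the handling of the new node.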


rabbit_alarm.erl on Node B

67 %% Can't use alarm_handler:{set,clear}_alarm because that doesn't
68 %% permit notifying a remote node.
% Send a set_alarm event back to Node A
69 remote_conserve_memory(Pid, true) ->
70 gen_event:notify({alarm_handler, node(Pid)},
71 {set_alarm, {{vm_memory_high_watermark, node()}, []}});
72 remote_conserve_memory(Pid, false) ->
73 gen_event:notify({alarm_handler, node(Pid)},
74 {clear_alarm, {vm_memory_high_watermark, node()}}).

% Receive the event from Node A
82 handle_call({register, Pid, HighMemMFA}, State) ->
83 {ok, 0 < sets:size(State#alarms.alarmed_nodes),
84 internal_register(Pid, HighMemMFA, State)};

% Here M, F and A are ?MODULE, remote_conserve_memory and [] respectively
158 internal_register(Pid, {M, F, A} = HighMemMFA,
159 State = #alarms{alertees = Alertees}) ->
160 _MRef = erlang:monitor(process, Pid),
161 case sets:is_element(node(), State#alarms.alarmed_nodes) of
162 true -> ok = apply(M, F, A ++ [Pid, true]);
163 false -> ok
164 end,
165 NewAlertees = dict:store(Pid, HighMemMFA, Alertees),
166 State#alarms{alertees = NewAlertees}.

So monitoring cluster nodes is actually quite simple to implement in Erlang.

As for rabbit_alarm.erl, it also contains the memory monitoring logic, which I will leave for next time.

[Linux] system info

Use sudo dmidecode -t memory to see how much memory is installed in your machine. The same command can also report other hardware information.

Tuesday, August 23, 2011

[Erlang] Prime generator

It has been a while since I wrote any Erlang and I am a bit rusty. I happened to see someone talking about prime generators, so I wrote one. :P


-module(prime).

-export([generate_prime/1]).

filter_number(Pid, P) ->
receive
N when N rem P /= 0 ->
% In this case, the N can not be divided by P,
% so pass it to next prime process
Pid ! N;
_N -> ok
end,
filter_number(Pid, P).

prime() ->
Prime = receive P -> P end,
io:format("Prime ~p~n", [Prime]),
Pid = spawn(fun prime/0),
filter_number(Pid, P).

generate_prime(N) ->
% create the first prime process
Pid = spawn(fun prime/0),
lists:foreach(fun(X) -> Pid ! X end, lists:seq(2, N)),
ok.
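For comparison, the same chain-of-filters idea can be sketched sequentially in C++. This is only an illustration and not a translation of the Erlang program: generate_primes is a hypothetical name, and a plain vector stands in for the chain of prime processes, each entry playing the role of one filter.

```cpp
#include <cassert>
#include <vector>

// Hypothetical helper (not from the post): returns all primes in [2, n].
// Each element of `filters` corresponds to one Erlang prime process.
std::vector<int> generate_primes(int n)
{
    std::vector<int> filters;
    for (int x = 2; x <= n; ++x) {
        bool survived = true;
        // Pass x down the chain; any filter that divides x drops it.
        for (int p : filters) {
            if (x % p == 0) {
                survived = false;
                break;
            }
        }
        // A number that reaches the end of the chain is a new prime,
        // so it becomes the next filter.
        if (survived)
            filters.push_back(x);
    }
    return filters;
}
```

Calling generate_primes(20) walks the numbers 2 through 20 through the chain in the same order the Erlang version sends them.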

Thursday, August 18, 2011

[GDB] Reverse step for GDB

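Since version 7.0, GDB supports process record, also known as "reverse debugging". Normally in gdb we can run commands such as step, continue and next, but once execution has moved forward there is no way to return to the previous step. Reverse debugging lets you go back one step, or back to the previous breakpoint. The feature is currently platform-limited and only supports the following platforms: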

  • i386-linux
  • amd64-linux
  • moxie-elf / moxie-linux

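Under the hood, gdb logs every instruction it executes together with the memory and register state, which is how it can restore the previous step.

Suppose I have the following program: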

#include <stdio.h>
#include <stdlib.h>

int main()
{
    int i;

    for(i=0;i<100;i++) {
        int t = random();
        printf("i = %d, t = %d\n", i, t);
    }
    return 0;
}

Now start debugging with gdb:

$ gcc -g tests.c # compile code
$ gdb a.out # start to debug
(gdb) r
The program being debugged has been started already.
Start it from the beginning? (y or n) y
Starting program: /home/ytshen/a.out

Breakpoint 1, main () at tests.c:9
9 int t = random();
2: t = 0
1: i = 0
(gdb) record # start recording; reverse-step and related commands only work after this
(gdb) c
Continuing.
i = 0, t = 1804289383

Breakpoint 1, main () at tests.c:9
9 int t = random();
2: t = 1804289383
1: i = 1
(gdb) c
Continuing.
i = 1, t = 846930886

Breakpoint 1, main () at tests.c:9
9 int t = random();
2: t = 846930886
1: i = 2
(gdb) c
Continuing.
i = 2, t = 1681692777

Breakpoint 1, main () at tests.c:9
9 int t = random();
2: t = 1681692777
1: i = 3
(gdb) c
Continuing.
i = 3, t = 1714636915

Breakpoint 1, main () at tests.c:9
9 int t = random();
2: t = 1714636915
1: i = 4
# now start going in reverse
(gdb) reverse-continue
Continuing.

Breakpoint 1, main () at tests.c:9
9 int t = random();
2: t = 1681692777
1: i = 3
# you can see that all of the state has been preserved
(gdb) reverse-continue
Continuing.

Breakpoint 1, main () at tests.c:9
9 int t = random();
2: t = 846930886
1: i = 2
(gdb) reverse-continue
Continuing.

Breakpoint 1, main () at tests.c:9
9 int t = random();
2: t = 1804289383
1: i = 1
(gdb)

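The above is a simple example of reverse debugging with gdb. There are a few other simple commands, for example:

  • "record stop": stop recording
  • "record delete": delete the existing record log
  • "info record": show the status and statistics of the current recording
  • "set record stop-at-limit": set the behavior when the record buffer is full. If on, gdb stops when the buffer is full; if off, the oldest recorded instructions are overwritten
  • "set record insn-number-max": set the maximum number of instructions to record; the default is 200000
  • "reverse-step": step backward; if the previous line was a function call, step backward into that function, stopping at its last statement
  • "reverse-continue": run backward to the previous breakpoint
  • "reverse-next": step backward, but a function call is stepped over as a whole instead of being entered
  • "set exec-direction [forward | reverse]": once the direction is set, continue, next and step run in that direction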


Tuesday, August 2, 2011

[C] Low level IO primitives

With low-level I/O such as open/write/close, call fdatasync() when you need to be sure the data has been written to disk, because write() can return while the data is still buffered by the kernel.

With the stdio library (fopen/fwrite/fclose), call fflush() before fdatasync(), because stdio keeps its own buffer in the FILE structure and that data must first be handed over to the kernel.
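As a rough illustration of the two cases, here is a minimal sketch for a POSIX system. The function names write_durably_fd and write_durably_stdio are made up for this example, and a short write is simply treated as an error to keep the code small.

```cpp
#include <cassert>
#include <stdio.h>
#include <string.h>
#include <fcntl.h>
#include <unistd.h>

// Low-level I/O: write() followed by fdatasync().
// Returns 0 on success, -1 on any error (hypothetical helper).
int write_durably_fd(const char *path, const char *data)
{
    int fd = open(path, O_WRONLY | O_CREAT | O_TRUNC, 0644);
    if (fd < 0)
        return -1;
    size_t len = strlen(data);
    ssize_t written = write(fd, data, len);
    // fdatasync() asks the kernel to push the file data to the device.
    bool ok = (written == (ssize_t)len) && fdatasync(fd) == 0;
    close(fd);
    return ok ? 0 : -1;
}

// stdio: fflush() hands the FILE buffer to the kernel first,
// then fdatasync() is called on the underlying descriptor.
int write_durably_stdio(const char *path, const char *data)
{
    FILE *fp = fopen(path, "w");
    if (!fp)
        return -1;
    size_t len = strlen(data);
    bool ok = fwrite(data, 1, len, fp) == len
           && fflush(fp) == 0
           && fdatasync(fileno(fp)) == 0;
    fclose(fp);
    return ok ? 0 : -1;
}
```

The order matters in the second function: calling fdatasync() before fflush() would sync the file while part of the data is still sitting in the FILE buffer.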

Thursday, July 28, 2011

[C++] NULL reference and NULL pointer dereference

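Normally, when a function takes a reference parameter there is no need to check for NULL the way you would with a pointer, but the following case still causes a segmentation fault.
The reason is that dereferencing a NULL pointer (e.g. *p) is itself undefined behavior, and a NULL reference can only appear where undefined behavior has already happened.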


#include <iostream>
struct obj
{
int t;
};

void foo(obj &a)
{
std::cout << a.t;
}

int main()
{
obj *p = NULL;

foo(*p);
return 0;

}

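In the program below I commented out the std::cout in foo and compiled with -O0. It does not crash, even though one might expect *p to crash right away (it is still undefined behavior; it just happens not to fault here).
Looking at the generated assembly makes this understandable: underneath, a reference is also implemented with a pointer, so as long as the NULL pointer is never actually accessed there is no crash.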


#include <stdio.h>

struct obj
{
int t;
};

void foo(obj &a)
{
}

int main()
{
obj *p = NULL;

foo(*p);
return 0;
}

In the assembly for main you can see that it simply passes the pointer as the argument:


movq $0, -8(%rbp)
movq -8(%rbp), %rax
movq %rax, %rdi
call _Z3fooR3obj

You could check for it in the way shown below, but I find this odd to write, since the problem is not in foo.


#include <iostream>

struct obj
{
int t;
};

void foo(obj &a)
{
if( &a == NULL) {
std::cout << "null\n";
return;
}
std::cout << a.t;
}

int main()
{
obj *p = NULL;

foo(*p);
return 0;
}

So it is the caller who has to check whether the pointer is NULL before dereferencing it.
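A minimal sketch of a caller-side check might look like this. The names read_t and try_read_t are hypothetical and only serve the example; the point is that the pointer is tested before *p is ever evaluated, so no NULL reference is formed.

```cpp
#include <cassert>
#include <cstddef>

struct obj
{
    int t;
};

// Takes a reference and may assume it refers to a valid object.
int read_t(const obj &a)
{
    return a.t;
}

// Hypothetical caller-side wrapper: checks the pointer first and only
// dereferences it when it is not NULL. Returns false if p is NULL.
bool try_read_t(const obj *p, int &out)
{
    if (p == NULL)
        return false;
    out = read_t(*p);
    return true;
}
```

With this shape, the function that takes the reference stays free of NULL checks, and the check lives where the pointer actually comes from.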

Reference: 
http://stackoverflow.com/questions/4364536/c-null-reference
http://www.nicollet.net/2009/11/can-references-be-null/
http://discuss.fogcreek.com/joelonsoftware/default.asp?cmd=show&ixPost=17154 

Wednesday, July 27, 2011

[C++] Diffie-Hellman Key Exchange (Client API)

DHKeyCrypt.h


#if !defined(dhkeycrypt_include)

#define dhkeycrypt_include

#include <openssl/dh.h>
#include <openssl/bn.h>
#include <openssl/crypto.h>

/*
* Example:
*
* // initial steps
* const char *p, *g, *public_key;
* char *buf;
* int buf_size;
* DHKeyCrypt *key = new DHKeyCrypt(256); // generate a 256-bit prime
*
* // p, g, public_key will point to the hex representation string
* key->GenerateKey(&p, &g, &public_key);
*
* // use the 3rd party public key to compute session key
* key->ComputeKey(other_pubkey);
*
* // usage
* key->Encode(buf, buf_size); // encode buf
*
*/
class DHKeyCrypt
{
private:
char *m_g[2];
DH *m_dh;
char *m_p;
char *m_pubkey;
unsigned char *m_session_key;

int m_key_len;
int m_gindex;

public:
DHKeyCrypt(int size);
void GenerateKey(const char **p, const char **g, const char **public_key);
int ComputeKey(const char *pubkey_3rd);
void Encode(char *buf, int buf_len);
~DHKeyCrypt();
};

#endif

DHKeyCrypt.cpp


#include "DHKeyCrypt.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <new>

/*
* generate size bits prime
*/
DHKeyCrypt::DHKeyCrypt(int size) : m_session_key(NULL)
{
m_g[0] = strdup("2");
m_g[1] = strdup("5");
m_gindex = time(NULL) & 1;
m_dh = DH_generate_parameters(size, m_g[m_gindex][0] - '0', NULL, NULL);
DH_generate_key(m_dh);
m_p = BN_bn2hex(m_dh->p);
m_pubkey = BN_bn2hex(m_dh->pub_key);
}

void DHKeyCrypt::GenerateKey(const char **p, const char **g, const char **public_key)
{
*p = m_p;
*g = m_g[m_gindex];
*public_key = m_pubkey;
}

int DHKeyCrypt::ComputeKey(const char *pubkey_3rd)
{
    BIGNUM *bn = NULL;

    if(!BN_hex2bn(&bn, pubkey_3rd))
        return -1;
    /* drop a session key left over from a previous call */
    delete [] m_session_key;
    m_session_key = new (std::nothrow) unsigned char [DH_size(m_dh)];
    if(!m_session_key) {
        BN_free(bn);
        return -1;
    }
    m_key_len = DH_compute_key(m_session_key, bn, m_dh);
    /* the peer's public key is no longer needed */
    BN_free(bn);
    if(m_key_len == -1) return -1;
    /*
    printf("SessionKey: ");
    for(int i=0; i < m_key_len; i++)
        printf("%02X", m_session_key[i]);
    printf("\n");
    */
    return 0;
}

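void DHKeyCrypt::Encode(char *buf, int buf_len)
{
    /* nothing to do until ComputeKey() has produced a session key */
    if(!m_session_key || m_key_len <= 0)
        return;
    for(int i=0; i < buf_len; i++)
        buf[i] = buf[i] ^ m_session_key[i % m_key_len];
}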

DHKeyCrypt::~DHKeyCrypt()
{
OPENSSL_free(m_p);
OPENSSL_free(m_pubkey);
free(m_g[0]);
free(m_g[1]);
delete [] m_session_key;
DH_free(m_dh);
}
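To make the arithmetic behind the class easier to follow, here is a toy sketch that does not use OpenSSL at all. The helper names mod_pow and xor_encode are made up for this example, and the numbers (p = 23, g = 5, private keys 6 and 15) are tiny textbook values chosen only so the result can be checked by hand. It shows that both sides arrive at the same shared secret, and that XOR-ing with a repeating key twice restores the original buffer, which is why Encode can serve for both directions.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>

// (base^exp) mod m by repeated squaring.
// Only safe for moduli below 2^32, which is enough for a toy example.
uint64_t mod_pow(uint64_t base, uint64_t exp, uint64_t m)
{
    uint64_t result = 1 % m;
    base %= m;
    while (exp > 0) {
        if (exp & 1)
            result = (result * base) % m;
        base = (base * base) % m;
        exp >>= 1;
    }
    return result;
}

// XOR buf in place with a repeating key, in the same way Encode() does.
void xor_encode(std::string &buf, const std::string &key)
{
    if (key.empty())
        return;
    for (std::size_t i = 0; i < buf.size(); i++)
        buf[i] ^= key[i % key.size()];
}
```

With p = 23 and g = 5, one side computes its public value as mod_pow(5, 6, 23) and the other as mod_pow(5, 15, 23). Each side then raises the other's public value to its own private key, and both results are the same shared secret.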