在RabbitMQ啟動的時候, 會預先執行一些boot steps, 其中有一步就是會啟動rabbitnodemonitor.
rabbit.erl (program entry point)
% 先啟動node_monitor這個process
114 -rabbit_boot_step({rabbit_node_monitor,
115 [{description, "node monitor"},
116 {mfa, {rabbit_sup, start_restartable_child,
117 [rabbit_node_monitor]}},
118 {requires, kernel_ready},
119 {enables, core_initialized}]}).
所以實際在monitor cluster node的module是在rabbitnodemonitor.erl檔案裡面.
在erlang的世界當中, 其實要組成cluster是相當容易的, 每一個node只要在啟動的時候, 設定node的名字(cluster內部不重複), 並且搭配同樣的cookie, 彼此之間就可以互相溝通.
所以在nodemonitor 這個process起來的時候他會呼叫init function, 而netkernel:monitor_nodes, 代表他會接收所有他監控node的status message.
rabbitnodemonitor.erl
init([]) ->
ok = net_kernel:monitor_nodes(true),
{ok, no_state}.
到這時候, node_monitor就做好了初始化的動作, 接下來就要開始cluster內部node資料傳遞, 所以boot steps後面的動作就是notify cluster
rabbit.erl (program entry point)
155 -rabbit_boot_step({notify_cluster,
156 [{description, "notify cluster nodes"},
157 {mfa, {rabbit_node_monitor, notify_cluster, []}},
158 {requires, networking}]}).
在51行, 可以看到把自己的資訊multicast出去
rabbitnodemonitor.erl
% 接下來在收集整個cluster的information, 並且把自己的資料傳出去, 讓cluster的其他node知道
47 notify_cluster() ->
48 Node = node(), % 這是代表自己的node
% 這邊因為每個rabbitmq 起來, 都會有自己的mnesia, 所以可以藉由這個來收集已經起來的rabbitmq node
49 Nodes = rabbit_mnesia:running_clustered_nodes() -- [Node],
50 %% notify other rabbits of this rabbit
51 case rpc:multicall(Nodes, rabbit_node_monitor, rabbit_running_on,
52 [Node], ?RABBIT_UP_RPC_TIMEOUT) of
53 {_, [] } -> ok;
54 {_, Bad} -> rabbit_log:info("failed to contact nodes ~p~n", [Bad])
55 end,
56 %% register other active rabbits with this rabbit
57 [ rabbit_node_monitor:rabbit_running_on(N) || N <- Nodes ],
58 ok.
而接下來的程式就是當monitor_node這個process根據收到各種status message所做的處理
rabbitnodemonitor.erl
66 handle_call(_Request, _From, State) ->
67 {noreply, State}.
68
69 handle_cast({rabbit_running_on, Node}, State) ->
70 rabbit_log:info("node ~p up~n", [Node]),
% 收到新的node, 所以納入監控
71 erlang:monitor(process, {rabbit, Node}),
% 呼叫自己的event handler 處理新啟動的node
72 ok = rabbit_alarm:on_node_up(Node),
73 {noreply, State};
74 handle_cast(_Msg, State) ->
75 {noreply, State}.
76
77 handle_info({nodedown, Node}, State) ->
% 收到有監控的node 正常掛掉了
78 rabbit_log:info("node ~p down~n", [Node]),
79 ok = handle_dead_rabbit(Node),
80 {noreply, State};
81 handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason}, State) ->
% 收到突然掛掉的node
82 rabbit_log:info("node ~p lost 'rabbit'~n", [Node]),
83 ok = handle_dead_rabbit(Node),
84 {noreply, State};
96 %% TODO: This may turn out to be a performance hog when there are lots
97 %% of nodes. We really only need to execute some of these statements
98 %% on *one* node, rather than all of them.
99 handle_dead_rabbit(Node) ->
100 ok = rabbit_networking:on_node_down(Node),
101 ok = rabbit_amqqueue:on_node_down(Node),
102 ok = rabbit_alarm:on_node_down(Node).
接下來針對有新node啟動的情形來看一下他怎麼處理, 他會呼叫 rabbitalarm 這個module來處理, 在這裡的flow有點不太一樣, 要分成兩個部份來看, 現在的情形是Node A跟Node B, Node A收到Node B nodeup 的message, 所以Node A呼叫自己的 rabbit_alarm module 來處理, 進入這邊之後, Node A沒作甚饃特別的事情(反正Node A已經有monitor Node B了), 但是他會送一個{register, self(), ...} event給Node B去處理.
rabbit_alarm.erl on Node A
95 handle_event({node_up, Node}, State) ->
96 %% Must do this via notify and not call to avoid possible deadlock.
97 ok = gen_event:notify(
98 {alarm_handler, Node},
99 {register, self(), {?MODULE, remote_conserve_memory, []}}),
100 {ok, State};
接著我們來看Node B這邊做了甚饃事情~ 他接著也會monitor Node A, 並且看Node A是不是有在他的alarmednodes裡面, 如果有的話, 就會執行傳進來的Module, Function, Arguments, 就是rabbitalarm.erl 69-71行做的事情, 接著再把Node A加進alertees裡面(之後有alert發生要通知的對象之一), 這樣子就完成加入新Node的動作.
rabbit_alarm.erl on Node B
67 %% Can't use alarm_handler:{set,clear}_alarm because that doesn't
68 %% permit notifying a remote node.
% 在送set_alarm event 回給 Node A
69 remote_conserve_memory(Pid, true) ->
70 gen_event:notify({alarm_handler, node(Pid)},
71 {set_alarm, {{vm_memory_high_watermark, node()}, []}});
72 remote_conserve_memory(Pid, false) ->
73 gen_event:notify({alarm_handler, node(Pid)},
74 {clear_alarm, {vm_memory_high_watermark, node()}}).
% 收到Node A的 event
82 handle_call({register, Pid, HighMemMFA}, State) ->
83 {ok, 0 < sets:size(State#alarms.alarmed_nodes),
84 internal_register(Pid, HighMemMFA, State)};
% 首先M, F, A 分別是 ?MODULE, remote_conserve_memory, []
158 internal_register(Pid, {M, F, A} = HighMemMFA,
159 State = #alarms{alertees = Alertees}) ->
160 _MRef = erlang:monitor(process, Pid),
161 case sets:is_element(node(), State#alarms.alarmed_nodes) of
162 true -> ok = apply(M, F, A ++ [Pid, true]);
163 false -> ok
164 end,
165 NewAlertees = dict:store(Pid, HighMemMFA, Alertees),
166 State#alarms{alertees = NewAlertees}.
所以其實monitor cluster node的部份在erlang的實做是相當的簡單~
至於關於rabbit_alarm.erl的部份, 裡面有關於monitor memory部份, 留到下次再說~
沒有留言:
張貼留言