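在 Erlang 裡, 我們可以很簡單地實作一個 alarm handler: 它可以接收各種 alarm event, 並根據 event message 做出回應. Erlang 也不限制你只能有一個 alarm handler, 你可以依不同功能實作相對應的 handler 模組, 各自以不同的模組名稱註冊到 event manager (例如 alarm_handler) 上. 觸發事件 (gen_event:notify) 時, 該 event manager 上的每個 handler 都會收到這個事件, 由各自的 handle_event 決定是否處理; 若只想交給某個特定的 handler, 則可以用 gen_event:call 指定 handler.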
要觸發事件時, 只需要呼叫 gen_event:notify (對 alarm_handler 而言也可以用 alarm_handler:set_alarm / clear_alarm), 並在 handle_event 中處理相對應的 event 即可.
RabbitMQ 內部實作了一套 memory monitor 機制, 無論是在 cluster (多個 node) 還是單一 node 的環境下都能運作. 這個 memory monitor 分成兩個部分: 針對內部各個 queue、channel 記憶體用量的監控下次再介紹; 這次介紹的部分只監控整個 Erlang VM 的記憶體總用量. 當用量超過高水位 (high watermark) 時, 便會讓 connection (包含之後新建立的 connection) 進入 blocking 狀態, 並且通知 cluster 中的其他 node 也開始節省記憶體.
相關原始檔: rabbit_alarm.erl 與 vm_memory_monitor.erl
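RabbitMQ 啟動時, 會在 boot 階段透過下面這個 rabbit_boot_step 呼叫 rabbit_alarm:start/0 來安裝 alarm handler (注意: rabbit_alarm 本身是掛在 alarm_handler 這個 event manager 上的 handler, 並不是另外 spawn 出來的 process).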
rabbit.erl
% Boot step: during RabbitMQ startup, call rabbit_alarm:start/0 to install
% the memory alarm handler. Ordering is declared via requires/enables:
% it requires the kernel_ready step and enables core_initialized
% (i.e. presumably it runs after the former and before the latter).
88 -rabbit_boot_step({rabbit_alarm,
89 [{description, "alarm handler"},
90 {mfa, {rabbit_alarm, start, []}},
91 {requires, kernel_ready},
92 {enables, core_initialized}]}).
rabbit_alarm:start/0 做了以下幾件事: 先把 rabbit_alarm 註冊為 alarm_handler 的一個 handler, 接著讀取你在 ebin/rabbit.app 中設定的 vm_memory_high_watermark. 若這個值不為 0, 便透過 rabbit_sup 啟動另一個 process (vm_memory_monitor) 來監控記憶體, 當記憶體用量超過門檻時發出 alarm.
rabbit_alarm.erl
% Install the memory alarm machinery at boot time.
% Returns ok; any failure surfaces as a badmatch crash.
45 start() ->
% Install this module (rabbit_alarm) as a handler on the alarm_handler
% event manager; the handler is identified by the module name.
46 ok = alarm_handler:add_alarm_handler(?MODULE, []),
% The watermark comes from the application environment and must be set:
% an unset key returns undefined and this {ok, _} match would crash.
47 {ok, MemoryWatermark} = application:get_env(vm_memory_high_watermark),
% Note: == (not =:=) means both 0 and 0.0 count as "disabled".
48 ok = case MemoryWatermark == 0 of
% A watermark of 0 disables memory monitoring entirely.
49 true -> ok;
% Otherwise start vm_memory_monitor as a restartable child of rabbit_sup,
% passing the watermark as its argument.
50 false -> rabbit_sup:start_restartable_child(vm_memory_monitor,
51 [MemoryWatermark])
52 end,
53 ok.
% gen_event init/1 for the rabbit_alarm handler. The handler state
% (#alarms{}) holds two collections, both initially empty:
%   alertees      - a dict mapping a registered Pid to an {M, F, A}
%                   callback; when the alarm state changes the callback
%                   is invoked as apply(M, F, A ++ [Pid, Alert]).
%   alarmed_nodes - a set of the nodes currently raising
%                   vm_memory_high_watermark (added on set_alarm,
%                   removed on clear_alarm).
78 init([]) ->
79 {ok, #alarms{alertees = dict:new(),
80 alarmed_nodes = sets:new()}}.
接下來先看 vm_memory_monitor 是怎麼監控記憶體的.
vm_memory_monitor.erl
% Start the vm_memory_monitor gen_server, registered locally as ?SERVER.
% Args is wrapped in a one-element list and delivered to init/1 as
% [MemFraction] (presumably the watermark passed from rabbit_alarm:start/0).
105 start_link(Args) ->
106 gen_server:start_link({local, ?SERVER}, ?MODULE, [Args], []).
107
% gen_server init/1: work out the memory limit, start the periodic check
% timer, and perform one immediate check.
% MemFraction is combined with the total memory by get_mem_limit/2 (not
% shown here) to produce MemLimit, which is logged in MB (bytes / 1048576).
108 init([MemFraction]) ->
109 TotalMemory =
% get_total_memory/0 (defined elsewhere in vm_memory_monitor.erl) holds the
% OS-specific logic; it returns unknown when the OS is not supported.
110 case get_total_memory() of
111 unknown ->
% Unsupported OS: warn and fall back to a fixed assumed size.
112 error_logger:warning_msg(
113 "Unknown total memory size for your OS ~p. "
114 "Assuming memory size is ~pMB.~n",
115 [os:type(), trunc(?MEMORY_SIZE_FOR_UNKNOWN_OS/1048576)]),
116 ?MEMORY_SIZE_FOR_UNKNOWN_OS;
117 M -> M
118 end,
119 MemLimit = get_mem_limit(MemFraction, TotalMemory),
120 error_logger:info_msg("Memory limit set to ~pMB.~n",
121 [trunc(MemLimit/1048576)]),
% Start the interval timer that re-checks memory usage periodically.
122 TRef = start_timer(?DEFAULT_MEMORY_CHECK_INTERVAL),
123 State = #state { total_memory = TotalMemory,
124 memory_limit = MemLimit,
125 timeout = ?DEFAULT_MEMORY_CHECK_INTERVAL,
126 timer = TRef,
127 alarmed = false},
% Start from alarmed = false and check once right away, so an alarm is
% raised immediately if usage is already above the limit.
128 {ok, internal_update(State)}.
% Compare the VM's current total memory against the limit and raise or
% clear the alarm only when the alarmed flag actually changes; repeated
% checks in the same state do nothing. Returns State with alarmed updated.
171 internal_update(State = #state { memory_limit = MemLimit,
172 alarmed = Alarmed}) ->
% Total memory currently allocated by the Erlang VM.
173 MemUsed = erlang:memory(total),
174 NewAlarmed = MemUsed > MemLimit,
175 case {Alarmed, NewAlarmed} of
176 {false, true} ->
% Crossed above the limit: log it and raise an alarm on alarm_handler,
% tagged with this node.
177 emit_update_info(set, MemUsed, MemLimit),
178 alarm_handler:set_alarm({{vm_memory_high_watermark, node()}, []});
179 {true, false} ->
% Dropped back below the limit: log it and clear this node's alarm.
180 emit_update_info(clear, MemUsed, MemLimit),
181 alarm_handler:clear_alarm({vm_memory_high_watermark, node()});
182 _ ->
183 ok
184 end,
185 State #state {alarmed = NewAlarmed}.
186
% Log an info message describing an alarm transition.
% Despite its name, the State parameter here is the transition atom
% (set | clear) passed by internal_update/1, not the #state{} record.
187 emit_update_info(State, MemUsed, MemLimit) ->
188 error_logger:info_msg(
189 "vm_memory_high_watermark ~p. Memory used:~p allowed:~p~n",
190 [State, MemUsed, MemLimit]).
191
% Call ?MODULE:update() every Timeout milliseconds via
% timer:apply_interval/4 and return the timer reference (a failure to
% start the timer crashes on the {ok, _} match).
% NOTE(review): update/0 is not shown here; presumably it makes the server
% run internal_update/1 — confirm in vm_memory_monitor.erl.
192 start_timer(Timeout) ->
193 {ok, TRef} = timer:apply_interval(Timeout, ?MODULE, update, []),
194 TRef.
現在回到 rabbit_alarm.erl, 看它收到 vm_memory_high_watermark alarm 時如何處理.
rabbit_alarm.erl
% set_alarm for vm_memory_high_watermark: add Node to alarmed_nodes.
% Node is this node when raised by the local vm_memory_monitor, or a remote
% node when forwarded by remote_conserve_memory/2.
89 handle_event({set_alarm, {{vm_memory_high_watermark, Node}, []}}, State) ->
90 {ok, maybe_alert(fun sets:add_element/2, Node, State)};
91
% clear_alarm for vm_memory_high_watermark: remove Node from alarmed_nodes.
% (Excerpt: further handle_event clauses follow in the original file.)
92 handle_event({clear_alarm, {vm_memory_high_watermark, Node}}, State) ->
93 {ok, maybe_alert(fun sets:del_element/2, Node, State)};
% Apply SetFun (sets:add_element/2 or sets:del_element/2) to alarmed_nodes,
% notify the interested alertees, and return State with the updated set.
% Changes are detected by comparing set sizes, so a duplicate set/clear
% (no size change) notifies no-one.
126 maybe_alert(SetFun, Node, State = #alarms{alarmed_nodes = AN,
127 alertees = Alertees}) ->
128 AN1 = SetFun(Node, AN),
129 BeforeSz = sets:size(AN),
130 AfterSz = sets:size(AN1),
131 %% If we have changed our alarm state, inform the remotes.
% Only an alarm about this node is propagated: callbacks registered by
% processes on other nodes are told whether this node is now alarmed.
132 IsLocal = Node =:= node(),
133 if IsLocal andalso BeforeSz < AfterSz -> ok = alert_remote(true, Alertees);
134 IsLocal andalso BeforeSz > AfterSz -> ok = alert_remote(false, Alertees);
135 true -> ok
136 end,
137 %% If the overall alarm state has changed, inform the locals.
% Local callbacks fire only when the set goes empty -> non-empty or
% non-empty -> empty, i.e. when *any* node starts or stops being alarmed.
138 case {BeforeSz, AfterSz} of
139 {0, 1} -> ok = alert_local(true, Alertees);
140 {1, 0} -> ok = alert_local(false, Alertees);
141 {_, _} -> ok
142 end,
% Store the updated alarmed_nodes set.
143 State#alarms{alarmed_nodes = AN1}.
% Notify only the alertees whose Pid lives on this node.
145 alert_local(Alert, Alertees) -> alert(Alert, Alertees, fun erlang:'=:='/2).
146
% Notify only the alertees whose Pid lives on some other node.
147 alert_remote(Alert, Alertees) -> alert(Alert, Alertees, fun erlang:'=/='/2).
148
% Run the registered callback of every alertee whose Pid's node satisfies
% NodeComparator(node(), node(Pid)). Each callback is called as
% apply(M, F, A ++ [Pid, Alert]), where Alert is true (alarm set) or
% false (alarm cleared).
% The fold accumulator is matched against ok in the fun head, so every
% callback must return ok; any other return value crashes the next
% iteration with function_clause. Returns ok.
149 alert(Alert, Alertees, NodeComparator) ->
150 Node = node(),
151 dict:fold(fun (Pid, {M, F, A}, ok) ->
152 case NodeComparator(Node, node(Pid)) of
153 true -> apply(M, F, A ++ [Pid, Alert]);
154 false -> ok
155 end
156 end, ok, Alertees).
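因此其他 module 只需要透過 rabbit_alarm:register 註冊一個 callback, 定義記憶體用量過高 (或恢復正常) 時要做什麼即可. 以 connection 為例, 這部分的程式碼在 rabbit_reader.erl: connection 建立完成、進入 running 狀態時會註冊 callback, 並把 register 的回傳值 (目前是否處於 alarm 狀態) 直接交給 internal_conserve_memory, 立刻調整 connection 的狀態.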
rabbit_reader.erl
691 State1 = internal_conserve_memory(
% Register this reader process with rabbit_alarm: on alarm changes
% rabbit_alarm will call ?MODULE:conserve_memory(Pid, Alert).
% The result of register/2 is fed directly into internal_conserve_memory/2
% together with the new running state.
% NOTE(review): this implies register/2 returns the current alarm state as
% a boolean (what internal_conserve_memory/2 matches on) — confirm in
% rabbit_alarm.erl.
692 rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}),
693 State#v1{connection_state = running,
694 connection = NewConnection}),
% Adjust connection_state when the memory alarm is raised (true) or
% cleared (false):
%   true  + running  -> blocking
%   false + blocking -> running
%   false + blocked  -> resume the heartbeat monitor, then running
%   anything else    -> state unchanged
% NOTE(review): what the blocking/blocked states do to frame processing is
% defined elsewhere in rabbit_reader.erl and is not shown in this excerpt.
348 internal_conserve_memory(true, State = #v1{connection_state = running}) ->
349 State#v1{connection_state = blocking};
350 internal_conserve_memory(false, State = #v1{connection_state = blocking}) ->
351 State#v1{connection_state = running};
352 internal_conserve_memory(false, State = #v1{connection_state = blocked,
353 heartbeater = Heartbeater}) ->
354 ok = rabbit_heartbeat:resume_monitor(Heartbeater),
355 State#v1{connection_state = running};
356 internal_conserve_memory(_Conserve, State) ->
357 State
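另外, 當 cluster 中有新的 node 啟動時, 假設 Node A 收到 Node B 的 node_up event, Node A 就會向 Node B 的 alarm_handler 註冊一個 remote_conserve_memory callback. 之後當 Node B 的記憶體用量超過高水位, Node B 會透過這個 callback 通知 Node A 的 alarm_handler, 設定一個屬於 Node B 的 vm_memory_high_watermark alarm. Node A 上的 connection 也因此開始節省記憶體, 直到 Node B 的 alarm 被 clear 為止.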
rabbit_alarm.erl
% When Node comes up, ask the alarm_handler on Node to register self()
% (the event manager process this handler runs in) with the callback
% {?MODULE, remote_conserve_memory, []}. Node's own alarm changes are then
% forwarded back to this node. The handler state is left unchanged.
95 handle_event({node_up, Node}, State) ->
96 %% Must do this via notify and not call to avoid possible deadlock.
97 ok = gen_event:notify(
98 {alarm_handler, Node},
99 {register, self(), {?MODULE, remote_conserve_memory, []}}),
100 {ok, State};
69 remote_conserve_memory(Pid, true) ->
% 叫自己開始節省記憶體
70 gen_event:notify({alarm_handler, node(Pid)},
71 {set_alarm, {{vm_memory_high_watermark, node()}, []}});