2011年8月28日 星期日

[Erlang] RabbitMQ rabbit_alarm module

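在Erlang裡面, 我們可以很簡單地實作一個alarm handler (也就是掛在event manager上的gen_event handler), 它可以接收各種alarm event, 並根據event message來做出回應. 同時Erlang不限制你只能有一個alarm handler, 你可以根據不同功能實作相對應的handler, 以不同的module名稱註冊到同一個event manager上. 觸發事件時, event manager會把event送給所有已註冊的handler, 再由各個handler在handle_event中決定要處理哪些event.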

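當要觸發事件的時候, 只需要呼叫gen_event:notify送出event, 並且在handler的handle_event中處理相對應的event即可. (下面會看到的alarm_handler:set_alarm/1 與 alarm_handler:clear_alarm/1, 其實就是對alarm_handler這個event manager送出{set_alarm, ...} / {clear_alarm, ...}的notify.)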

rabbitmq內部實作了一套memory monitor的機制, 這個機制可以運作在cluster中的各個node上, 也可以運作在單一node的情況. 這個memory monitor分成兩個部份: 內部所有queue, channel所使用的memory monitor下次再介紹; 這次介紹的memory monitor只監控整體(整個Erlang VM)的記憶體使用量, 當到達高水位(high watermark)的時候, 便會block這個node上已註冊的connection, 並且通知cluster中其他node也要開始節省記憶體使用.

相關檔案: rabbit_alarm.erl 與 vm_memory_monitor.erl

rabbitmq在啟動(boot)的階段, 會執行rabbit_alarm這個boot step, 也就是呼叫rabbit_alarm:start/0.

rabbit.erl

 % Boot step declared in rabbit.erl: during boot RabbitMQ invokes
 % rabbit_alarm:start() (the `mfa` entry). `requires` / `enables` name the
 % neighbouring boot steps (kernel_ready, core_initialized) used to order it.
 88 -rabbit_boot_step({rabbit_alarm,
 89          [{description, "alarm handler"},
 90           {mfa,         {rabbit_alarm, start, []}},
 91           {requires,    kernel_ready},
 92           {enables,     core_initialized}]}).

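rabbit_alarm:start/0 做了以下的事情: 先把rabbit_alarm註冊成alarm_handler這個event manager的一個handler, 接著從application environment (ebin/rabbit.app 或設定檔) 讀取你設定的vm_memory_high_watermark. 如果值不是0, 就透過rabbit_sup啟動另一個vm_memory_monitor process來監控memory, 當memory用量超過這個條件時發出alarm.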


rabbit_alarm.erl

 45 start() ->
        % Install this module (rabbit_alarm) as a handler of the SASL
        % alarm_handler event manager; [] is passed to init/1.
 46     ok = alarm_handler:add_alarm_handler(?MODULE, []),
        % Read the configured memory high watermark from the app environment.
 47     {ok, MemoryWatermark} = application:get_env(vm_memory_high_watermark),
 48     ok = case MemoryWatermark == 0 of
            % A watermark of 0 disables memory monitoring entirely.
            % (`==` rather than `=:=`, so 0.0 is treated as 0 too.)
 49          true  -> ok;
            % Otherwise start vm_memory_monitor under rabbit_sup, passing it
            % the watermark.
 50          false -> rabbit_sup:start_restartable_child(vm_memory_monitor,
 51                [MemoryWatermark])
 52     end,
 53     ok.

    % gen_event init callback for rabbit_alarm. The handler state holds:
    % alertees      - dict of Pid => {M, F, A}: callbacks keyed by the
    %                 registering process, which may live on this node or on
    %                 a remote node; they are applied when the alarm changes.
    % alarmed_nodes - set of nodes that currently have a memory alarm set.
 78 init([]) ->
 79     {ok, #alarms{alertees      = dict:new(),
 80                  alarmed_nodes = sets:new()}}.

接下來先來看他監控的方式.

vm_memory_monitor.erl

105 start_link(Args) ->
        % Start the monitor as a locally registered gen_server (?SERVER).
        % Args (presumably the watermark fraction that rabbit_alarm:start/0
        % hands to rabbit_sup) is wrapped in a list, which is why init/1
        % matches [MemFraction].
106     gen_server:start_link({local, ?SERVER}, ?MODULE, [Args], []).
107
108 init([MemFraction]) ->
        % Work out the machine's total memory, then the byte limit at which
        % the alarm fires, start the periodic check, and run one check now.
109     TotalMemory =
        % get_total_memory/0 (defined in this file) contains the per-OS
        % logic; when it returns unknown, log a warning and fall back to
        % ?MEMORY_SIZE_FOR_UNKNOWN_OS.
110         case get_total_memory() of
111             unknown ->
112                 error_logger:warning_msg(
113                   "Unknown total memory size for your OS ~p. "
114                   "Assuming memory size is ~pMB.~n",
115                   [os:type(), trunc(?MEMORY_SIZE_FOR_UNKNOWN_OS/1048576)]),
116                 ?MEMORY_SIZE_FOR_UNKNOWN_OS;
117             M -> M
118         end,
        % Limit in bytes (it is logged as MB by dividing by 1048576).
119     MemLimit = get_mem_limit(MemFraction, TotalMemory),
120     error_logger:info_msg("Memory limit set to ~pMB.~n",
121                           [trunc(MemLimit/1048576)]),
        % Start the timer that re-checks memory usage every
        % ?DEFAULT_MEMORY_CHECK_INTERVAL.
122     TRef = start_timer(?DEFAULT_MEMORY_CHECK_INTERVAL),
123     State = #state { total_memory = TotalMemory,
124                      memory_limit = MemLimit,
125                      timeout = ?DEFAULT_MEMORY_CHECK_INTERVAL,
126                      timer = TRef,
127                      alarmed = false},
        % Check once immediately instead of waiting for the first tick.
128     {ok, internal_update(State)}.

171 internal_update(State = #state { memory_limit = MemLimit,
172                                  alarmed = Alarmed}) ->
        % Compare current VM memory use with the limit and raise or clear
        % this node's vm_memory_high_watermark alarm on a state transition.
        % Total memory currently allocated by the Erlang VM.
173     MemUsed = erlang:memory(total),
174     NewAlarmed = MemUsed > MemLimit,
        % Only transitions are acted on; an unchanged state sends nothing.
175     case {Alarmed, NewAlarmed} of
176         {false, true} ->
            % Just went over the limit: log it and set the alarm for node().
177             emit_update_info(set, MemUsed, MemLimit),
178             alarm_handler:set_alarm({{vm_memory_high_watermark, node()}, []});
179         {true, false} ->
            % Dropped back to or below the limit: log it and clear the alarm.
180             emit_update_info(clear, MemUsed, MemLimit),
181             alarm_handler:clear_alarm({vm_memory_high_watermark, node()});
182         _ ->
183             ok
184     end,
185     State #state {alarmed = NewAlarmed}.
186
    % Log an alarm transition. Despite its name, the State argument is the
    % atom set or clear (as passed by internal_update/1), not the #state
    % record.
187 emit_update_info(State, MemUsed, MemLimit) ->
188     error_logger:info_msg(
189       "vm_memory_high_watermark ~p. Memory used:~p allowed:~p~n",
190       [State, MemUsed, MemLimit]).
191
    % Start a repeating timer that calls ?MODULE:update() every Timeout
    % milliseconds and return its reference. Crashes with badmatch if
    % timer:apply_interval/4 returns an error.
192 start_timer(Timeout) ->
193     {ok, TRef} = timer:apply_interval(Timeout, ?MODULE, update, []),
194     TRef.

現在回到rabbit_alarm.erl, 看它收到vm_memory_high_watermark這個alarm時怎麼處理.

rabbit_alarm.erl

 89 handle_event({set_alarm, {{vm_memory_high_watermark, Node}, []}}, State) ->
        % Node (this node or a remote one) raised the memory alarm:
        % add it to alarmed_nodes and notify callbacks if that changed state.
 90     {ok, maybe_alert(fun sets:add_element/2, Node, State)};
 91
 92 handle_event({clear_alarm, {vm_memory_high_watermark, Node}}, State) ->
        % Node cleared its memory alarm: remove it from alarmed_nodes.
 93     {ok, maybe_alert(fun sets:del_element/2, Node, State)};


126 maybe_alert(SetFun, Node, State = #alarms{alarmed_nodes = AN,
127                      alertees      = Alertees}) ->
        % Apply SetFun (add or delete) for Node to the alarmed-node set.
        % Comparing set sizes detects whether anything actually changed:
        % adding a node already present, or deleting an absent one, is a
        % no-op and triggers no callbacks.
128     AN1 = SetFun(Node, AN),
129     BeforeSz = sets:size(AN),
130     AfterSz  = sets:size(AN1),
131     %% If we have changed our alarm state, inform the remotes.
132     IsLocal = Node =:= node(),
        % Only a change to THIS node's alarm is forwarded to callbacks
        % registered by processes on other nodes.
133     if IsLocal andalso BeforeSz < AfterSz -> ok = alert_remote(true,  Alertees);
134        IsLocal andalso BeforeSz > AfterSz -> ok = alert_remote(false, Alertees);
135        true                               -> ok
136     end,
137     %% If the overall alarm state has changed, inform the locals.
        % Local callbacks fire only when the first node becomes alarmed
        % (0 -> 1) or the last alarm clears (1 -> 0), whichever node it is.
138     case {BeforeSz, AfterSz} of
139         {0, 1} -> ok = alert_local(true,  Alertees);
140         {1, 0} -> ok = alert_local(false, Alertees);
141         {_, _} -> ok
142     end,
        % Store the updated alarmed-node set.
143     State#alarms{alarmed_nodes = AN1}.

% Run the registered callbacks whose Pid lives on this node.
145 alert_local(Alert, Alertees)  -> alert(Alert, Alertees, fun erlang:'=:='/2).
146
% Run the registered callbacks whose Pid lives on a different node.
147 alert_remote(Alert, Alertees) -> alert(Alert, Alertees, fun erlang:'=/='/2).
148
    % Fold over every registered {M, F, A} callback. For each Pid where
    % NodeComparator(node(), node(Pid)) is true, call M:F(A ++ [Pid, Alert]).
    % Callbacks are expected to return ok: the fun only matches an ok
    % accumulator, so a non-ok result crashes on the next iteration, and the
    % last result becomes the fold's return value.
149 alert(Alert, Alertees, NodeComparator) ->
150     Node = node(),
151     dict:fold(fun (Pid, {M, F, A}, ok) ->
152                       case NodeComparator(Node, node(Pid)) of
153                           true  -> apply(M, F, A ++ [Pid, Alert]);
154                           false -> ok
155                       end
156               end, ok, Alertees).

所以其他module只需要透過rabbit_alarm:register/2註冊一個callback, 定義memory用量過高(以及alarm解除)時要怎麼處理即可. 以connection為例, 這部份的code放在rabbit_reader.erl: 當connection建立完成、進入running狀態時, 便會註冊這個callback.

rabbit_reader.erl

691     State1 = internal_conserve_memory(
           % register callback
           % 當記憶體使用過量
           % 便會呼叫
692        rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}),
693        State#v1{connection_state = running,
694                connection = NewConnection}),

    % Apply a memory-alarm flag to this connection's state
    % (true = conserve memory, false = alarm cleared):
    %   true  + running  -> blocking
    %   false + blocking -> running
    %   false + blocked  -> resume the heartbeat monitor, then running
    %   anything else    -> state unchanged
    % This changes the state of an existing connection; it does not by
    % itself reject new connections.
348 internal_conserve_memory(true,  State = #v1{connection_state = running}) ->
349     State#v1{connection_state = blocking};
350 internal_conserve_memory(false, State = #v1{connection_state = blocking}) ->
351     State#v1{connection_state = running};
352 internal_conserve_memory(false, State = #v1{connection_state = blocked,
353                                             heartbeater      = Heartbeater}) ->
        % The heartbeat monitor was presumably paused when the connection
        % became blocked; resume it before returning to running.
354     ok = rabbit_heartbeat:resume_monitor(Heartbeater),
355     State#v1{connection_state = running};
356 internal_conserve_memory(_Conserve, State) ->
357     State

另外, 當cluster中有新的node啟動時, 假設Node A收到Node B的node_up event, Node A就會向Node B的alarm_handler註冊一個remote_conserve_memory callback. 之後當Node B的記憶體用量過高, Node B就會透過這個callback通知Node A (對Node A的alarm_handler送出set_alarm), 讓Node A也開始節省memory, 直到原本的alarm被clear為止.

rabbit_alarm.erl

 95 handle_event({node_up, Node}, State) ->
        % Another node came up: ask the alarm_handler on Node to register
        % self() (this node's event manager, where the handler runs) with a
        % remote_conserve_memory callback. Presumably Node stores it in its
        % alertees, so its own alarm changes reach this node.
 96     %% Must do this via notify and not call to avoid possible deadlock.
 97     ok = gen_event:notify(
 98            {alarm_handler, Node},
 99            {register, self(), {?MODULE, remote_conserve_memory, []}}),
100     {ok, State};

 69 remote_conserve_memory(Pid, true) ->
        % 叫自己開始節省記憶體
 70     gen_event:notify({alarm_handler, node(Pid)},
 71                      {set_alarm, {{vm_memory_high_watermark, node()}, []}});