當要觸發事件的時候, 只需要呼叫gen_event:notify, 並且在handle_event處理相對應的event即可.
在rabbitmq內部他有實做一套memory monitor的機制, 這個機制是可以運作在cluster node內部或是單一node的情況. 而這個memory monitr有兩個部份, 關於內部所有用到的queue, channel所使用的memory monitor下次在介紹, 這次的memory monitor 只監控整體的使用量, 當到達高水位的時候, 便會block新進來的connection, 並且通知cluster其他node也要開始節省使用.
所在檔案rabbit_alarm.erl & vm_memory_monitor.erl
rabbitmq 在啟動的時候, boot的階段會去spawn一個alarm process.
rabbit.erl 88 -rabbit_boot_step({rabbit_alarm, 89 [{description, "alarm handler"}, 90 {mfa, {rabbit_alarm, start, []}}, 91 {requires, kernel_ready}, 92 {enables, core_initialized}]}).
在rabbit_alarm 這個process啟動的時候做了以下的事情, 他會從ebin/rabbit.app 裡面你所設定的memory條件來設定並啟動另外一個monitor process 來monitor memory, 當memory到達那個條件後的時候給你alarm.
rabbit_alarm.erl 45 start() -> % 新增一個新的alarm handler, 名字是rabbit_alarm 46 ok = alarm_handler:add_alarm_handler(?MODULE, []), 47 {ok, MemoryWatermark} = application:get_env(vm_memory_high_watermark), 48 ok = case MemoryWatermark == 0 of % 如果沒設定就不監控 49 true -> ok; % 啟動另外一個monitor process 50 false -> rabbit_sup:start_restartable_child(vm_memory_monitor, 51 [MemoryWatermark]) 52 end, 53 ok. % rabbit_alarm本身有maintain兩個table % alertees 代表發生alarm他要通知的其他node % alarmed_nodes 代表已經發生過alarm的node 78 init([]) -> 79 {ok, #alarms{alertees = dict:new(), 80 alarmed_nodes = sets:new()}}.
接下來先來看他監控的方式.
vm_memory_monitor.erl 105 start_link(Args) -> 106 gen_server:start_link({local, ?SERVER}, ?MODULE, [Args], []). 107 108 init([MemFraction]) -> 109 TotalMemory = % get_total_memory 可以查看該檔案內部 % 針對不同OS的處理 110 case get_total_memory() of 111 unknown -> 112 error_logger:warning_msg( 113 "Unknown total memory size for your OS ~p. " 114 "Assuming memory size is ~pMB.~n", 115 [os:type(), trunc(?MEMORY_SIZE_FOR_UNKNOWN_OS/1048576)]), 116 ?MEMORY_SIZE_FOR_UNKNOWN_OS; 117 M -> M 118 end, 119 MemLimit = get_mem_limit(MemFraction, TotalMemory), 120 error_logger:info_msg("Memory limit set to ~pMB.~n", 121 [trunc(MemLimit/1048576)]), % 啟動timer, 固定時間檢查memory usage 122 TRef = start_timer(?DEFAULT_MEMORY_CHECK_INTERVAL), 123 State = #state { total_memory = TotalMemory, 124 memory_limit = MemLimit, 125 timeout = ?DEFAULT_MEMORY_CHECK_INTERVAL, 126 timer = TRef, 127 alarmed = false}, 128 {ok, internal_update(State)}. 171 internal_update(State = #state { memory_limit = MemLimit, 172 alarmed = Alarmed}) -> % erlang VM內部目前使用的 173 MemUsed = erlang:memory(total), 174 NewAlarmed = MemUsed > MemLimit, 175 case {Alarmed, NewAlarmed} of 176 {false, true} -> % 如果之前還沒發出過alarm % 就會發出alarm, 給alarm_handler 177 emit_update_info(set, MemUsed, MemLimit), 178 alarm_handler:set_alarm({{vm_memory_high_watermark, node()}, []}); 179 {true, false} -> % 反之則清理掉alarm 180 emit_update_info(clear, MemUsed, MemLimit), 181 alarm_handler:clear_alarm({vm_memory_high_watermark, node()}); 182 _ -> 183 ok 184 end, 185 State #state {alarmed = NewAlarmed}. 186 % 印出log 187 emit_update_info(State, MemUsed, MemLimit) -> 188 error_logger:info_msg( 189 "vm_memory_high_watermark ~p. Memory used:~p allowed:~p~n", 190 [State, MemUsed, MemLimit]). 191 % 定時update memory usage state 192 start_timer(Timeout) -> 193 {ok, TRef} = timer:apply_interval(Timeout, ?MODULE, update, []), 194 TRef.
現在回到rabbitm_alarm.erl 看他怎麼處理當收到high memory watermark的alarm.
rabbit_alarm.erl 89 handle_event({set_alarm, {{vm_memory_high_watermark, Node}, []}}, State) -> 90 {ok, maybe_alert(fun sets:add_element/2, Node, State)}; 91 92 handle_event({clear_alarm, {vm_memory_high_watermark, Node}}, State) -> 93 {ok, maybe_alert(fun sets:del_element/2, Node, State)}; 126 maybe_alert(SetFun, Node, State = #alarms{alarmed_nodes = AN, 127 alertees = Alertees}) -> 128 AN1 = SetFun(Node, AN), 129 BeforeSz = sets:size(AN), 130 AfterSz = sets:size(AN1), 131 %% If we have changed our alarm state, inform the remotes. 132 IsLocal = Node =:= node(), % 如果是local alarm % 就會通知其他的remote node 133 if IsLocal andalso BeforeSz < AfterSz -> ok = alert_remote(true, Alertees); 134 IsLocal andalso BeforeSz > AfterSz -> ok = alert_remote(false, Alertees); 135 true -> ok 136 end, 137 %% If the overall alarm state has changed, inform the locals. 138 case {BeforeSz, AfterSz} of 139 {0, 1} -> ok = alert_local(true, Alertees); 140 {1, 0} -> ok = alert_local(false, Alertees); 141 {_, _} -> ok 142 end, % 更新alarmed_node 143 State#alarms{alarmed_nodes = AN1}. 145 alert_local(Alert, Alertees) -> alert(Alert, Alertees, fun erlang:'=:='/2). 146 147 alert_remote(Alert, Alertees) -> alert(Alert, Alertees, fun erlang:'=/='/2). 148 % 找出所有需要通知的node, 並且執行Node之前register的callback 149 alert(Alert, Alertees, NodeComparator) -> 150 Node = node(), 151 dict:fold(fun (Pid, {M, F, A}, ok) -> 152 case NodeComparator(Node, node(Pid)) of 153 true -> apply(M, F, A ++ [Pid, Alert]); 154 false -> ok 155 end 156 end, ok, Alertees).
所以之後其他的module, 只需要register callback, 當memory用量過高, 要怎麼處理就行, 這部份的code 是放在rabbit_reader.erl, 也就是當connection coming的時候.
rabbit_reader.erl 691 State1 = internal_conserve_memory( % register callback % 當記憶體使用過量 % 便會呼叫 692 rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}), 693 State#v1{connection_state = running, 694 connection = NewConnection}), % 當使用過量, state改為blocking % 不在接收新的connection 348 internal_conserve_memory(true, State = #v1{connection_state = running}) -> 349 State#v1{connection_state = blocking}; 350 internal_conserve_memory(false, State = #v1{connection_state = blocking}) -> 351 State#v1{connection_state = running}; 352 internal_conserve_memory(false, State = #v1{connection_state = blocked, 353 heartbeater = Heartbeater}) -> 354 ok = rabbit_heartbeat:resume_monitor(Heartbeater), 355 State#v1{connection_state = running}; 356 internal_conserve_memory(_Conserve, State) -> 357 State
另外當新的cluster node 啟動的時候, 假設現在Node A收到Node B up的event, Node A會跟Node B註冊一個remote_conserve_memory callback, 當Node B 記憶體過大, 他就會呼叫Node A 開始節省memory, 直到原本的alarm被clear.
rabbit_alarm.erl 95 handle_event({node_up, Node}, State) -> 96 %% Must do this via notify and not call to avoid possible deadlock. 97 ok = gen_event:notify( 98 {alarm_handler, Node}, 99 {register, self(), {?MODULE, remote_conserve_memory, []}}), 100 {ok, State}; 69 remote_conserve_memory(Pid, true) -> % 叫自己開始節省記憶體 70 gen_event:notify({alarm_handler, node(Pid)}, 71 {set_alarm, {{vm_memory_high_watermark, node()}, []}});
沒有留言:
張貼留言