Upgrade to irc_amqp_listener with new childspec.
authorMatt Mullins <mmullins@mmlx.us>
Sun, 5 May 2013 07:00:20 +0000 (00:00 -0700)
committerMatt Mullins <mmullins@mmlx.us>
Sun, 5 May 2013 07:30:36 +0000 (00:30 -0700)
Bump the irc_conn state version and use the code_change capablities to recreate
the irc_amqp_listener process with the appropriate childspec upon up/downgrade.

irc/irc_amqp_listener.erl
irc/irc_conn.erl
irc/irc_object_sup.erl

index fe4fc7f..785660f 100644 (file)
@@ -8,6 +8,7 @@
 
 -export([
           start_link/2,
+          stop/1,
           encode_routing_key/1,
           decode_routing_key/1,
           send_message/3
@@ -15,6 +16,7 @@
 
 -export([
           init/1,
+          handle_call/3,
           handle_info/2,
           code_change/3,
           terminate/2
@@ -31,6 +33,16 @@ init({ConnectionPid, RoutingKey}) ->
     {ok, _, ListeningChannel} = amqp_bot_listener:listen_for_events(RoutingKey),
     {ok, {ConnectionPid, ListeningChannel}}.
 
+stop(Pid) ->
+    gen_server:call(Pid, stop).
+
+%% Handle a public request to stop gracefully, since the previous version of
+%% the irc_object_sup was set up to brutal_kill this process.  The
+%% irc_amqp_listener is restarted in the upgrade process to update the
+%% childspec.
+handle_call(stop, From, {ConnectionPid, ListeningChannel}) ->
+    {stop, shutdown, {ConnectionPid, ListeningChannel, From}}.
+
 %% Ignore the message indicating that the listener is operating correctly
 handle_info(#'basic.consume_ok'{}, State) ->
     {noreply, State};
@@ -61,6 +73,10 @@ code_change(2, {ConnectionPid}, _) ->
 code_change({down, 2}, {ConnectionPid, _ListeningChannel}, _) ->
     {ok, {ConnectionPid}}.
 
+terminate(shutdown, {ConnectionPid, ListeningChannel, From}) ->
+    terminate(shutdown, {ConnectionPid, ListeningChannel}),
+    gen_server:reply(From, ok);
+
 terminate(Reason, {ConnectionPid, ListeningChannel}) ->
     error_logger:warning_msg("Tearing down channel ~p for reason ~p", [ListeningChannel, Reason]),
        amqp_channel:close(ListeningChannel).
index 05eff3c..2900037 100644 (file)
@@ -1,6 +1,6 @@
 -module(irc_conn).
 -behavior(gen_server).
--vsn(3).
+-vsn(4).
 
 -define(RECONNECT_TIME, 30000).
 
@@ -158,6 +158,31 @@ do_privmsg(_Command = #irc_command{middles = Middles, trailing = Text}) ->
 terminate(_Reason, _State) ->
     ok.
 
+code_change(3, State, ok) ->
+    RoutingKey = get_routing_key(State#irc_state.instance),
+    AmqpListener = irc_object_sup:get_amqp_listener(
+            State#irc_state.object_sup,
+            RoutingKey),
+    case AmqpListener of
+        {error, notfound} -> ok;
+        Pid -> irc_amqp_listener:stop(Pid)
+    end,
+    irc_object_sup:delete_amqp_listener(
+            State#irc_state.object_sup,
+            RoutingKey),
+    gen_server:cast(self(), create_object_sup),
+    {ok, State};
+
+code_change({down, 3}, State, ok) ->
+    RoutingKey = get_routing_key(State#irc_state.instance),
+    % Downgrade doesn't need to ask the irc_amqp_listener to stop gracefully,
+    % since the childspec actually allows it to terminate itself cleanly.
+    irc_object_sup:delete_amqp_listener(
+            State#irc_state.object_sup,
+            RoutingKey),
+    gen_server:cast(self(), create_object_sup),
+    {ok, State};
+
 code_change(_OldVsn, _State, _Extra) ->
        {error, unsupported}.
 
index dd4b44a..f76694d 100644 (file)
@@ -6,7 +6,9 @@
 
 -export([
           start_link/3,
-          add_amqp_listener/3
+          add_amqp_listener/3,
+          get_amqp_listener/2,
+          delete_amqp_listener/2
         ]).
 -export([init/1]).
 
@@ -27,3 +29,18 @@ add_amqp_listener(ObjectSup, ConnectionPid, RoutingKey) ->
              [irc_amqp_listener]
             },
     supervisor:start_child(ObjectSup, Child).
+
+get_amqp_listener(ObjectSup, RoutingKey) ->
+    ChildId = "amqp_" ++ RoutingKey,
+    Children = supervisor:which_children(ObjectSup),
+    case lists:filter(fun (X) -> element(1, X) == ChildId end, Children) of
+        [{ChildId, Pid, _Type, _Mods}] ->
+            Pid;
+        _ ->
+            {error, notfound}
+    end.
+
+delete_amqp_listener(ObjectSup, RoutingKey) ->
+    ChildId = "amqp_" ++ RoutingKey,
+    supervisor:terminate_child(ObjectSup, ChildId),
+    supervisor:delete_child(ObjectSup, ChildId).