-export([
start_link/2,
+ stop/1,
encode_routing_key/1,
decode_routing_key/1,
send_message/3
-export([
init/1,
+ handle_call/3,
handle_info/2,
code_change/3,
terminate/2
{ok, _, ListeningChannel} = amqp_bot_listener:listen_for_events(RoutingKey),
{ok, {ConnectionPid, ListeningChannel}}.
+stop(Pid) ->
+ gen_server:call(Pid, stop).
+
+%% Handle a public request to stop gracefully, since the previous version of
+%% the irc_object_sup was set up to brutal_kill this process. The
+%% irc_amqp_listener is restarted in the upgrade process to update the
+%% childspec.
+handle_call(stop, From, {ConnectionPid, ListeningChannel}) ->
+ {stop, shutdown, {ConnectionPid, ListeningChannel, From}}.
+
%% Ignore the message indicating that the listener is operating correctly
handle_info(#'basic.consume_ok'{}, State) ->
{noreply, State};
code_change({down, 2}, {ConnectionPid, _ListeningChannel}, _) ->
{ok, {ConnectionPid}}.
+terminate(shutdown, {ConnectionPid, ListeningChannel, From}) ->
+ terminate(shutdown, {ConnectionPid, ListeningChannel}),
+ gen_server:reply(From, ok);
+
terminate(Reason, {ConnectionPid, ListeningChannel}) ->
error_logger:warning_msg("Tearing down channel ~p for reason ~p", [ListeningChannel, Reason]),
amqp_channel:close(ListeningChannel).
-module(irc_conn).
-behavior(gen_server).
--vsn(3).
+-vsn(4).
-define(RECONNECT_TIME, 30000).
terminate(_Reason, _State) ->
ok.
+code_change(3, State, ok) ->
+ RoutingKey = get_routing_key(State#irc_state.instance),
+ AmqpListener = irc_object_sup:get_amqp_listener(
+ State#irc_state.object_sup,
+ RoutingKey),
+ case AmqpListener of
+ {error, notfound} -> ok;
+ Pid -> irc_amqp_listener:stop(Pid)
+ end,
+ irc_object_sup:delete_amqp_listener(
+ State#irc_state.object_sup,
+ RoutingKey),
+ gen_server:cast(self(), create_object_sup),
+ {ok, State};
+
+code_change({down, 3}, State, ok) ->
+ RoutingKey = get_routing_key(State#irc_state.instance),
+ % Downgrade doesn't need to ask the irc_amqp_listener to stop gracefully,
+ % since the childspec actually allows it to terminate itself cleanly.
+ irc_object_sup:delete_amqp_listener(
+ State#irc_state.object_sup,
+ RoutingKey),
+ gen_server:cast(self(), create_object_sup),
+ {ok, State};
+
code_change(_OldVsn, _State, _Extra) ->
{error, unsupported}.
-export([
start_link/3,
- add_amqp_listener/3
+ add_amqp_listener/3,
+ get_amqp_listener/2,
+ delete_amqp_listener/2
]).
-export([init/1]).
[irc_amqp_listener]
},
supervisor:start_child(ObjectSup, Child).
+
+get_amqp_listener(ObjectSup, RoutingKey) ->
+ ChildId = "amqp_" ++ RoutingKey,
+ Children = supervisor:which_children(ObjectSup),
+ case lists:filter(fun (X) -> element(1, X) == ChildId end, Children) of
+ [{ChildId, Pid, _Type, _Mods}] ->
+ Pid;
+ _ ->
+ {error, notfound}
+ end.
+
+delete_amqp_listener(ObjectSup, RoutingKey) ->
+ ChildId = "amqp_" ++ RoutingKey,
+ supervisor:terminate_child(ObjectSup, ChildId),
+ supervisor:delete_child(ObjectSup, ChildId).