Move IRC->AMQP message sending into AMQP process
authorMatt Mullins <mmullins@mmlx.us>
Mon, 4 Jan 2016 07:38:43 +0000 (23:38 -0800)
committerMatt Mullins <mmullins@mmlx.us>
Mon, 4 Jan 2016 07:38:43 +0000 (23:38 -0800)
This allows the message to be dropped if, for example, the AMQP process is
sleeping on the rate limit timer and causes a timeout instead of handling the
get_connection call.  Previously, this would crash the init process of
irc_amqp_listener, and since the gen_server timeout is only 5 seconds, the
irc_object_sup would reach its max restart intensity and exit.

As an added bonus: this is actually more reasonable message passing anyway.

amqp/amqp_bot_connection.erl
irc/irc_amqp_listener.erl

index 2bc3189..42fa65b 100644 (file)
@@ -4,13 +4,14 @@
 
 -export([
           start_link/1,
-          get_connection/0,
-                 open_channel/0
+          open_channel/0, % TODO: get rid of open_channel as an external call
+          send_message/2
         ]).
 
 -export([
           init/1,
-          handle_call/3
+          handle_call/3,
+          handle_cast/2
         ]).
 
 -define(RESTART, timer:seconds(30)).
@@ -28,6 +29,9 @@ open_channel() ->
     Connection = get_connection(),
     amqp_connection:open_channel(Connection).
 
+send_message(#'basic.publish'{} = Headers, Content) ->
+    gen_server:cast(?MODULE, {send_message, Headers, Content}).
+
 % Callbacks
 init({RatelimitTableId}) ->
     rate_limit:wait_and_reset(RatelimitTableId, amqp_connection, ?RESTART),
@@ -36,6 +40,15 @@ init({RatelimitTableId}) ->
 handle_call(get_connection, _From, Connection) ->
     {reply, Connection, Connection}.
 
+handle_cast({send_message, Headers, Content}, Connection) ->
+    % Currently only support basic.publish requests.
+    #'basic.publish'{} = Headers,
+
+    {ok, Channel} = amqp_connection:open_channel(Connection),
+    amqp_channel:call(Channel, Headers, Content),
+    amqp_channel:close(Channel),
+    {noreply, Connection}.
+
 start_link_connection() ->
     Username = get_config(username, "guest"),
     Password = get_config(password, "guest"),
index a10f2db..35e6f5f 100644 (file)
@@ -88,15 +88,14 @@ send_message(RoutingKey, ReplyTo, Body) ->
     BinKey = list_to_binary(RoutingKey),
     BinReplyKey = list_to_binary(ReplyTo),
     BinBody = list_to_binary(Body),
-    {ok, Channel} = amqp_bot_connection:open_channel(),
-    amqp_channel:call(Channel, #'basic.publish'{ routing_key = BinKey,
-                                                 exchange = ?EXCHANGE },
-                               #amqp_msg{ payload = BinBody,
-                                          props = #'P_basic' {
-                                              reply_to = BinReplyKey
-                                          }
-                                        }),
-    amqp_channel:close(Channel).
+    Headers = #'basic.publish'{ routing_key = BinKey,
+                                exchange = ?EXCHANGE },
+    Content = #amqp_msg{ payload = BinBody,
+                         props = #'P_basic' {
+                              reply_to = BinReplyKey
+                         }
+                       },
+    amqp_bot_connection:send_message(Headers, Content).
 
 %% @doc Encode an IRC-domain string as an AMQP-compatible routing key -- that
 %% is, in the set [A-Za-z0-9}.  We encode them as follows: