Use one AMQP listener with wildcard, per ircnet.
authorMatt Mullins <mmullins@mmlx.us>
Mon, 14 Jan 2013 06:42:24 +0000 (22:42 -0800)
committerMatt Mullins <mmullins@mmlx.us>
Mon, 14 Jan 2013 06:42:24 +0000 (22:42 -0800)
irc/irc_amqp_listener.erl
irc/irc_conn.erl
irc/irc_object_sup.erl

index 2218955..125c3be 100644 (file)
@@ -5,7 +5,9 @@
 -vsn(1).
 
 -export([
-          start_link/3
+          start_link/2,
+          encode_routing_key/1,
+          decode_routing_key/1
         ]).
 
 -export([
 -include("irc_util.hrl").
 -include_lib("amqp_client/include/amqp_client.hrl").
 
-start_link(ConnectionPid, Destination, RoutingKey) ->
-    gen_server:start_link(?MODULE, {ConnectionPid, Destination, RoutingKey}, []).
+start_link(ConnectionPid, RoutingKey) ->
+    gen_server:start_link(?MODULE, {ConnectionPid, RoutingKey}, []).
 
-init({ConnectionPid, Destination, RoutingKey}) ->
+init({ConnectionPid, RoutingKey}) ->
     {ok, _} = amqp_bot_listener:listen_for_events(RoutingKey),
-    {ok, {ConnectionPid, Destination}}.
+    {ok, {ConnectionPid}}.
 
 %% Ignore the message indicating that the listener is operating correctly
 handle_info(#'basic.consume_ok'{}, State) ->
@@ -30,10 +32,61 @@ handle_info(#'basic.consume_ok'{}, State) ->
 handle_info(
         { #'basic.deliver'{routing_key = RoutingKey},
           #amqp_msg{payload = BinContent} },
-        State = { ConnectionPid, Destination }) ->
+        State = {ConnectionPid}) ->
+    error_logger:info_msg("Got a message ~p for routing key ~p~n",
+            [BinContent, RoutingKey]),
+    [_, _, DestinationKey] = string:tokens(binary_to_list(RoutingKey), "."),
+    Destination = decode_routing_key(DestinationKey),
     Content = binary_to_list(BinContent),
     Command = #irc_command{ command = "PRIVMSG",
                             middles = [ Destination ],
                             trailing = Content },
     irc_conn:send_command(ConnectionPid, Command),
     {noreply, State}.
+
+%% @doc Encode an IRC-domain string as an AMQP-compatible routing key -- that
+%% is, in the set [A-Za-z0-9}.  We encode them as follows:
+%%   * lower-case letters are unchanged.
+%%   * capital letters are encoded as "C" followed by the lower-case letter.
+%%   * anything else is encoded as "X" followed by the hex representation of
+%%     the character.
+%%
+%% Example:
+%%     "#my-AWEsome|channel" -> "X23myX2DCaCwCesomeX7Cchannel"
+%%
+%% Yes, this is ugly, but it's straightforward and channel names stay
+%% reasonably readable in this fashion.
+encode_routing_key(Text) ->
+       lists:flatmap(fun encode_character/1, Text).
+
+encode_character(Char) ->
+       if
+        Char >= $a, Char =< $z; Char >= $0, Char =< $9 ->
+            [Char];
+        Char >= $A, Char =< $Z ->
+            [$C, string:to_lower(Char)];
+        true ->
+            [$X] ++ integer_to_list(Char, 16)
+    end.
+
+%% @doc Decode a routing key, from the encoding specified for
+%% encode_routing_key/1.
+%%
+%% TODO: This function accepts some malformed strings.  Should probably handle
+%% them as errors instead.
+decode_routing_key(RoutingKey) ->
+    lists:reverse(decode_routing_key(RoutingKey, [])).
+
+decode_routing_key([], Acc) ->
+    Acc;
+
+decode_routing_key(RoutingKey, Acc) ->
+    This = case RoutingKey of
+        "X" ++ [H1, H2 | Tail] ->
+            list_to_integer([H1, H2], 16);
+        "C" ++ [Char | Tail] ->
+            string:to_upper(Char);
+        [Anything | Tail] ->
+            Anything
+    end,
+    decode_routing_key(Tail, [This | Acc]).
index 643f967..9b983c6 100644 (file)
@@ -92,6 +92,7 @@ handle_cast(create_object_sup, State) ->
                 supervisor = Supervisor,
                 table_id = TableId } = State,
     {ok, ObjectSupPid} = irc_net_sup:create_object_sup(Instance, Supervisor, TableId),
+       add_amqp_listener(Instance, ObjectSupPid),
     {noreply, State#irc_state{ object_sup = ObjectSupPid }}.
 
 %% Handle incoming data
@@ -193,19 +194,13 @@ send_command(Command) ->
 
 join_channel(Channel, State) ->
     Command = #irc_command{command = "JOIN", middles = [Channel]},
-    send_command(Command),
-    add_amqp_listener(Channel, State).
-
-add_amqp_listener(Channel, State) ->
-    #irc_state{object_sup = ObjectSup,
-               instance = Instance
-              } = State,
-    case lists:keyfind({Instance, Channel}, 1, config:get_config(irc_amqp_bind)) of
-        {{Instance, Channel}, RoutingKey} ->
-            {ok, _ChildPid} = irc_object_sup:add_amqp_listener(
-                    ObjectSup, self(), Channel, RoutingKey);
-        false -> ok
-    end.
+    send_command(Command).
+
+add_amqp_listener(Instance, ObjectSup) ->
+       InstanceKey = irc_amqp_listener:encode_routing_key(atom_to_list(Instance)),
+       RoutingKey = InstanceKey ++ ".output.*",
+       {ok, _ChildPid} = irc_object_sup:add_amqp_listener(
+                       ObjectSup, self(), RoutingKey).
 
 %% @doc Splits a buffer into full lines, keeping whatever is left over
 %% Returns either {ok, [Line], Remaining} or none.
index ab00a87..055de24 100644 (file)
@@ -6,7 +6,7 @@
 
 -export([
           start_link/3,
-          add_amqp_listener/4
+          add_amqp_listener/3
         ]).
 -export([init/1]).
 
@@ -18,10 +18,9 @@ init({_Instance, _Supervisor, _TableId}) ->
     RestartStrategy = {one_for_one, 5, 60},
     {ok, {RestartStrategy, Children}}.
 
-add_amqp_listener(ObjectSup, ConnectionPid, Channel, RoutingKey) ->
-    Child = {"amqp_" ++ Channel,
-             % TODO: ↓ fix the routing key
-             {irc_amqp_listener, start_link, [ConnectionPid, Channel, RoutingKey]},
+add_amqp_listener(ObjectSup, ConnectionPid, RoutingKey) ->
+    Child = {"amqp_" ++ RoutingKey,
+             {irc_amqp_listener, start_link, [ConnectionPid, RoutingKey]},
              transient,
              brutal_kill,
              worker,