From: Matt Mullins Date: Mon, 14 Jan 2013 06:42:24 +0000 (-0800) Subject: Use one AMQP listener with wildcard, per ircnet. X-Git-Tag: v8~1 X-Git-Url: http://git.mmlx.us/?a=commitdiff_plain;h=0d2be5fb1836b2d2d5e79bdeedccb0134a73fed4;p=erlbot.git Use one AMQP listener with wildcard, per ircnet. --- diff --git a/irc/irc_amqp_listener.erl b/irc/irc_amqp_listener.erl index 2218955..125c3be 100644 --- a/irc/irc_amqp_listener.erl +++ b/irc/irc_amqp_listener.erl @@ -5,7 +5,9 @@ -vsn(1). -export([ - start_link/3 + start_link/2, + encode_routing_key/1, + decode_routing_key/1 ]). -export([ @@ -16,12 +18,12 @@ -include("irc_util.hrl"). -include_lib("amqp_client/include/amqp_client.hrl"). -start_link(ConnectionPid, Destination, RoutingKey) -> - gen_server:start_link(?MODULE, {ConnectionPid, Destination, RoutingKey}, []). +start_link(ConnectionPid, RoutingKey) -> + gen_server:start_link(?MODULE, {ConnectionPid, RoutingKey}, []). -init({ConnectionPid, Destination, RoutingKey}) -> +init({ConnectionPid, RoutingKey}) -> {ok, _} = amqp_bot_listener:listen_for_events(RoutingKey), - {ok, {ConnectionPid, Destination}}. + {ok, {ConnectionPid}}. %% Ignore the message indicating that the listener is operating correctly handle_info(#'basic.consume_ok'{}, State) -> @@ -30,10 +32,61 @@ handle_info(#'basic.consume_ok'{}, State) -> handle_info( { #'basic.deliver'{routing_key = RoutingKey}, #amqp_msg{payload = BinContent} }, - State = { ConnectionPid, Destination }) -> + State = {ConnectionPid}) -> + error_logger:info_msg("Got a message ~p for routing key ~p~n", + [BinContent, RoutingKey]), + [_, _, DestinationKey] = string:tokens(binary_to_list(RoutingKey), "."), + Destination = decode_routing_key(DestinationKey), Content = binary_to_list(BinContent), Command = #irc_command{ command = "PRIVMSG", middles = [ Destination ], trailing = Content }, irc_conn:send_command(ConnectionPid, Command), {noreply, State}. + +%% @doc Encode an IRC-domain string as an AMQP-compatible routing key -- that +%% is, in the set [A-Za-z0-9}. We encode them as follows: +%% * lower-case letters are unchanged. +%% * capital letters are encoded as "C" followed by the lower-case letter. +%% * anything else is encoded as "X" followed by the hex representation of +%% the character. +%% +%% Example: +%% "#my-AWEsome|channel" -> "X23myX2DCaCwCesomeX7Cchannel" +%% +%% Yes, this is ugly, but it's straightforward and channel names stay +%% reasonably readable in this fashion. +encode_routing_key(Text) -> + lists:flatmap(fun encode_character/1, Text). + +encode_character(Char) -> + if + Char >= $a, Char =< $z; Char >= $0, Char =< $9 -> + [Char]; + Char >= $A, Char =< $Z -> + [$C, string:to_lower(Char)]; + true -> + [$X] ++ integer_to_list(Char, 16) + end. + +%% @doc Decode a routing key, from the encoding specified for +%% encode_routing_key/1. +%% +%% TODO: This function accepts some malformed strings. Should probably handle +%% them as errors instead. +decode_routing_key(RoutingKey) -> + lists:reverse(decode_routing_key(RoutingKey, [])). + +decode_routing_key([], Acc) -> + Acc; + +decode_routing_key(RoutingKey, Acc) -> + This = case RoutingKey of + "X" ++ [H1, H2 | Tail] -> + list_to_integer([H1, H2], 16); + "C" ++ [Char | Tail] -> + string:to_upper(Char); + [Anything | Tail] -> + Anything + end, + decode_routing_key(Tail, [This | Acc]). diff --git a/irc/irc_conn.erl b/irc/irc_conn.erl index 643f967..9b983c6 100644 --- a/irc/irc_conn.erl +++ b/irc/irc_conn.erl @@ -92,6 +92,7 @@ handle_cast(create_object_sup, State) -> supervisor = Supervisor, table_id = TableId } = State, {ok, ObjectSupPid} = irc_net_sup:create_object_sup(Instance, Supervisor, TableId), + add_amqp_listener(Instance, ObjectSupPid), {noreply, State#irc_state{ object_sup = ObjectSupPid }}. %% Handle incoming data @@ -193,19 +194,13 @@ send_command(Command) -> join_channel(Channel, State) -> Command = #irc_command{command = "JOIN", middles = [Channel]}, - send_command(Command), - add_amqp_listener(Channel, State). - -add_amqp_listener(Channel, State) -> - #irc_state{object_sup = ObjectSup, - instance = Instance - } = State, - case lists:keyfind({Instance, Channel}, 1, config:get_config(irc_amqp_bind)) of - {{Instance, Channel}, RoutingKey} -> - {ok, _ChildPid} = irc_object_sup:add_amqp_listener( - ObjectSup, self(), Channel, RoutingKey); - false -> ok - end. + send_command(Command). + +add_amqp_listener(Instance, ObjectSup) -> + InstanceKey = irc_amqp_listener:encode_routing_key(atom_to_list(Instance)), + RoutingKey = InstanceKey ++ ".output.*", + {ok, _ChildPid} = irc_object_sup:add_amqp_listener( + ObjectSup, self(), RoutingKey). %% @doc Splits a buffer into full lines, keeping whatever is left over %% Returns either {ok, [Line], Remaining} or none. diff --git a/irc/irc_object_sup.erl b/irc/irc_object_sup.erl index ab00a87..055de24 100644 --- a/irc/irc_object_sup.erl +++ b/irc/irc_object_sup.erl @@ -6,7 +6,7 @@ -export([ start_link/3, - add_amqp_listener/4 + add_amqp_listener/3 ]). -export([init/1]). @@ -18,10 +18,9 @@ init({_Instance, _Supervisor, _TableId}) -> RestartStrategy = {one_for_one, 5, 60}, {ok, {RestartStrategy, Children}}. -add_amqp_listener(ObjectSup, ConnectionPid, Channel, RoutingKey) -> - Child = {"amqp_" ++ Channel, - % TODO: ↓ fix the routing key - {irc_amqp_listener, start_link, [ConnectionPid, Channel, RoutingKey]}, +add_amqp_listener(ObjectSup, ConnectionPid, RoutingKey) -> + Child = {"amqp_" ++ RoutingKey, + {irc_amqp_listener, start_link, [ConnectionPid, RoutingKey]}, transient, brutal_kill, worker,