AMQP supports attaching a process to handle messages sent to an exchange.
authorMatt Mullins <mmullins@mmlx.us>
Thu, 25 Aug 2011 05:02:26 +0000 (00:02 -0500)
committerMatt Mullins <mmullins@mmlx.us>
Sun, 28 Aug 2011 19:55:49 +0000 (14:55 -0500)
amqp/amqp.app [new file with mode: 0644]
amqp/amqp_app.erl [new file with mode: 0644]
amqp/amqp_bot_connection.erl [new file with mode: 0644]
amqp/amqp_bot_listener.erl [new file with mode: 0644]
amqp/amqp_bot_sup.erl [new file with mode: 0644]

diff --git a/amqp/amqp.app b/amqp/amqp.app
new file mode 100644 (file)
index 0000000..1562c04
--- /dev/null
@@ -0,0 +1,7 @@
+{application, amqp,
+ [{description, "AMQP client operations"},
+  {vsn, "1"},
+  {modules, [amqp_app, amqp_bot_sup, amqp_bot_connection, amqp_bot_listener]},
+  {applications, [core]},
+  {mod, {amqp_app, []}}
+ ]}.
diff --git a/amqp/amqp_app.erl b/amqp/amqp_app.erl
new file mode 100644 (file)
index 0000000..ee3d030
--- /dev/null
@@ -0,0 +1,8 @@
+%% @doc AMQP client call-back module
+-module(amqp_app).
+-behavior(application).
+
+-export([start/2]).
+
+start(normal, _Args) ->
+    amqp_bot_sup:start_link().
diff --git a/amqp/amqp_bot_connection.erl b/amqp/amqp_bot_connection.erl
new file mode 100644 (file)
index 0000000..e0744eb
--- /dev/null
@@ -0,0 +1,39 @@
+-module(amqp_bot_connection).
+-behavior(gen_server).
+-vsn(1).
+
+-export([
+          start_link/0,
+          get_connection/0,
+                 open_channel/0
+        ]).
+
+-export([
+          init/1,
+          handle_call/3
+        ]).
+
+-include_lib("amqp_client/include/amqp_client.hrl").
+
+% Public-facing
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, none, []).
+
+get_connection() ->
+    gen_server:call(?MODULE, get_connection).
+
+open_channel() ->
+    Connection = get_connection(),
+    amqp_connection:open_channel(Connection).
+
+% Callbacks
+init(_Args) ->
+    start_link_connection().
+
+handle_call(get_connection, _From, Connection) ->
+    {reply, Connection, Connection}.
+
+start_link_connection() ->
+    {ok, Pid} = amqp_connection:start(#amqp_params_network{}),
+    link(Pid),
+    {ok, Pid}.
diff --git a/amqp/amqp_bot_listener.erl b/amqp/amqp_bot_listener.erl
new file mode 100644 (file)
index 0000000..8d24d8f
--- /dev/null
@@ -0,0 +1,38 @@
+%% @doc Utilities to create a queue and listen to AMQP events
+-module(amqp_bot_listener).
+-vsn(1).
+
+-define(EXCHANGE, <<"irc">>).
+
+-export([
+           listen_for_events/1
+        ]).
+
+-include_lib("amqp_client/include/amqp_client.hrl").
+
+%% @doc Sets up the current PID as an AMQP consumer on the given routing key.
+%% Currently, this creates the channel to use for this -- eventually, I would
+%% like to be able to support allowing the client to undo this action, but
+%% since we're throwing away the channel reference.
+listen_for_events(RoutingKeyString) ->
+    {ok, Channel} = amqp_bot_connection:open_channel(),
+    RoutingKeyBinary = list_to_binary(RoutingKeyString),
+    Pid = self(),
+    #'exchange.declare_ok'{} = amqp_channel:call(Channel,
+        #'exchange.declare'{ exchange = ?EXCHANGE,
+                             type = <<"topic">>
+                           }),
+    #'queue.declare_ok'{ queue = Queue } = amqp_channel:call(Channel,
+        #'queue.declare'{ exclusive = true }),
+    #'queue.bind_ok'{} = amqp_channel:call(Channel,
+        #'queue.bind'{ exchange = ?EXCHANGE,
+                       queue = Queue,
+                       routing_key = RoutingKeyBinary
+                     }),
+    #'basic.consume_ok'{ consumer_tag = ConsumerTag } =
+        amqp_channel:subscribe(Channel,
+            #'basic.consume'{ queue = Queue,
+                              no_ack = true
+                            },
+            Pid),
+    {ok, ConsumerTag}.
diff --git a/amqp/amqp_bot_sup.erl b/amqp/amqp_bot_sup.erl
new file mode 100644 (file)
index 0000000..a1a69c4
--- /dev/null
@@ -0,0 +1,37 @@
+%% @doc Top-level supervisor for the AMQP application
+-module(amqp_bot_sup).
+-behavior(supervisor).
+
+%% Public functions
+-export([start_link/0]).
+
+%% Callbacks for supervisor behavior
+-export([init/1]).
+-export([add_child/4]).
+
+-compile(export_all).
+
+start_link() ->
+    supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+init(_) ->
+    Children = [
+                 {amqp_bot_connection, % Id
+                  {amqp_bot_connection, start_link, []}, % MFA
+                  permanent,
+                  5,
+                  worker,
+                  [amqp_bot_connection]
+                 }],
+    Restart = {one_for_all, 5, 60},
+    {ok, {Restart, Children}}.
+
+add_child(Id, M, F, A) ->
+    ChildSpec = {Id, % Id
+                 {M, F, A},
+                 permanent,
+                 5,
+                 worker,
+                 [M]
+                },
+    supervisor:start_child(?MODULE, ChildSpec).