From 151385d7187b3b2cc0c12ad91601e390d6228d4f Mon Sep 17 00:00:00 2001 From: Matt Mullins Date: Thu, 25 Aug 2011 00:02:26 -0500 Subject: [PATCH] AMQP supports attaching a process to handle messages sent to an exchange. --- amqp/amqp.app | 7 +++++++ amqp/amqp_app.erl | 8 ++++++++ amqp/amqp_bot_connection.erl | 39 +++++++++++++++++++++++++++++++++++++++ amqp/amqp_bot_listener.erl | 38 ++++++++++++++++++++++++++++++++++++++ amqp/amqp_bot_sup.erl | 37 +++++++++++++++++++++++++++++++++++++ 5 files changed, 129 insertions(+) create mode 100644 amqp/amqp.app create mode 100644 amqp/amqp_app.erl create mode 100644 amqp/amqp_bot_connection.erl create mode 100644 amqp/amqp_bot_listener.erl create mode 100644 amqp/amqp_bot_sup.erl diff --git a/amqp/amqp.app b/amqp/amqp.app new file mode 100644 index 0000000..1562c04 --- /dev/null +++ b/amqp/amqp.app @@ -0,0 +1,7 @@ +{application, amqp, + [{description, "AMQP client operations"}, + {vsn, "1"}, + {modules, [amqp_app, amqp_bot_sup, amqp_bot_connection, amqp_bot_listener]}, + {applications, [core]}, + {mod, {amqp_app, []}} + ]}. diff --git a/amqp/amqp_app.erl b/amqp/amqp_app.erl new file mode 100644 index 0000000..ee3d030 --- /dev/null +++ b/amqp/amqp_app.erl @@ -0,0 +1,8 @@ +%% @doc AMQP client call-back module +-module(amqp_app). +-behavior(application). + +-export([start/2]). + +start(normal, _Args) -> + amqp_bot_sup:start_link(). diff --git a/amqp/amqp_bot_connection.erl b/amqp/amqp_bot_connection.erl new file mode 100644 index 0000000..e0744eb --- /dev/null +++ b/amqp/amqp_bot_connection.erl @@ -0,0 +1,39 @@ +-module(amqp_bot_connection). +-behavior(gen_server). +-vsn(1). + +-export([ + start_link/0, + get_connection/0, + open_channel/0 + ]). + +-export([ + init/1, + handle_call/3 + ]). + +-include_lib("amqp_client/include/amqp_client.hrl"). + +% Public-facing +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, none, []). + +get_connection() -> + gen_server:call(?MODULE, get_connection). + +open_channel() -> + Connection = get_connection(), + amqp_connection:open_channel(Connection). + +% Callbacks +init(_Args) -> + start_link_connection(). + +handle_call(get_connection, _From, Connection) -> + {reply, Connection, Connection}. + +start_link_connection() -> + {ok, Pid} = amqp_connection:start(#amqp_params_network{}), + link(Pid), + {ok, Pid}. diff --git a/amqp/amqp_bot_listener.erl b/amqp/amqp_bot_listener.erl new file mode 100644 index 0000000..8d24d8f --- /dev/null +++ b/amqp/amqp_bot_listener.erl @@ -0,0 +1,38 @@ +%% @doc Utilities to create a queue and listen to AMQP events +-module(amqp_bot_listener). +-vsn(1). + +-define(EXCHANGE, <<"irc">>). + +-export([ + listen_for_events/1 + ]). + +-include_lib("amqp_client/include/amqp_client.hrl"). + +%% @doc Sets up the current PID as an AMQP consumer on the given routing key. +%% Currently, this creates the channel to use for this -- eventually, I would +%% like to be able to support allowing the client to undo this action, but +%% since we're throwing away the channel reference. +listen_for_events(RoutingKeyString) -> + {ok, Channel} = amqp_bot_connection:open_channel(), + RoutingKeyBinary = list_to_binary(RoutingKeyString), + Pid = self(), + #'exchange.declare_ok'{} = amqp_channel:call(Channel, + #'exchange.declare'{ exchange = ?EXCHANGE, + type = <<"topic">> + }), + #'queue.declare_ok'{ queue = Queue } = amqp_channel:call(Channel, + #'queue.declare'{ exclusive = true }), + #'queue.bind_ok'{} = amqp_channel:call(Channel, + #'queue.bind'{ exchange = ?EXCHANGE, + queue = Queue, + routing_key = RoutingKeyBinary + }), + #'basic.consume_ok'{ consumer_tag = ConsumerTag } = + amqp_channel:subscribe(Channel, + #'basic.consume'{ queue = Queue, + no_ack = true + }, + Pid), + {ok, ConsumerTag}. diff --git a/amqp/amqp_bot_sup.erl b/amqp/amqp_bot_sup.erl new file mode 100644 index 0000000..a1a69c4 --- /dev/null +++ b/amqp/amqp_bot_sup.erl @@ -0,0 +1,37 @@ +%% @doc Top-level supervisor for the AMQP application +-module(amqp_bot_sup). +-behavior(supervisor). + +%% Public functions +-export([start_link/0]). + +%% Callbacks for supervisor behavior +-export([init/1]). +-export([add_child/4]). + +-compile(export_all). + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +init(_) -> + Children = [ + {amqp_bot_connection, % Id + {amqp_bot_connection, start_link, []}, % MFA + permanent, + 5, + worker, + [amqp_bot_connection] + }], + Restart = {one_for_all, 5, 60}, + {ok, {Restart, Children}}. + +add_child(Id, M, F, A) -> + ChildSpec = {Id, % Id + {M, F, A}, + permanent, + 5, + worker, + [M] + }, + supervisor:start_child(?MODULE, ChildSpec). -- 2.11.0