|
|
@@ -11,6 +11,10 @@ use AnyEvent;
|
|
|
use AnyEvent::RabbitMQ;
|
|
|
use Data::Dumper;
|
|
|
use Mojo::JSON qw/j/;
|
|
|
+use Data::UUID;
|
|
|
+use Promises qw/deferred collect/;
|
|
|
+
|
|
|
+my $uuid_gen = new Data::UUID;
|
|
|
|
|
|
sub new
|
|
|
{
|
|
|
@@ -18,26 +22,48 @@ sub new
|
|
|
my $arg = shift;
|
|
|
my $callback = shift;
|
|
|
|
|
|
+ my $self = $class->create($arg);
|
|
|
+ $self->connect($callback)->done(sub { $callback->($self) if $callback });
|
|
|
+
|
|
|
+ return $self;
|
|
|
+}
|
|
|
+
|
|
|
+sub create
|
|
|
+{
|
|
|
+ my $class = shift;
|
|
|
+ my $arg = shift;
|
|
|
+
|
|
|
my $host = $arg->{host};
|
|
|
my $user = $arg->{user};
|
|
|
my $password = $arg->{password};
|
|
|
|
|
|
my $rab = AnyEvent::RabbitMQ->new->load_xml_spec();
|
|
|
-# $rab->{verbose}= 1;
|
|
|
+ $rab->{verbose} = 1 if $arg->{verbose};
|
|
|
my $on_error = $arg->{on_error} || sub { say "RabbitMQ error: ".shift(); exit() };
|
|
|
+ my $on_close = $arg->{on_close} || $on_error;
|
|
|
+ my $on_connect_error = $arg->{on_connect_error} || $on_error;
|
|
|
|
|
|
- my $self = bless {rabbit=>$rab, on_error=>$on_error, host=>$host, user=>$user, password=>$password};
|
|
|
- $self->_connect($callback);
|
|
|
-
|
|
|
+ my $self = bless {
|
|
|
+ rabbit=>$rab,
|
|
|
+ on_error=>$on_error,
|
|
|
+ on_close => $on_close,
|
|
|
+ on_connect_error => $on_connect_error,
|
|
|
+ host=>$host,
|
|
|
+ user=>$user,
|
|
|
+ product => $arg->{product} || "rabbit_async",
|
|
|
+ password=>$password,
|
|
|
+ rpc_calls => {},
|
|
|
+ };
|
|
|
+
|
|
|
return $self;
|
|
|
}
|
|
|
|
|
|
sub _connect
|
|
|
{
|
|
|
my $self = shift;
|
|
|
- my $callback = shift;
|
|
|
-
|
|
|
- $self->{rabbit}->connect(
|
|
|
+
|
|
|
+ my $def = deferred;
|
|
|
+ my %params = (
|
|
|
host => $self->{host},
|
|
|
port => 5672,
|
|
|
user => $self->{user},
|
|
|
@@ -45,47 +71,72 @@ sub _connect
|
|
|
vhost => "/",
|
|
|
timeout => 1,
|
|
|
tls => 0,
|
|
|
- tune => { heartbeat => 30, channel_max => 1 },
|
|
|
-
|
|
|
- on_failure => sub { $self->{on_error}->("connect failure: ".$_[2]) },
|
|
|
-
|
|
|
+ tune => { heartbeat => 60, channel_max => 1 },
|
|
|
+ client_properties => {
|
|
|
+ product => $self->{product},
|
|
|
+ },
|
|
|
+ on_failure => sub { $def->reject(join(" ",@_)) },
|
|
|
on_read_failure => sub { $self->{on_error}->(@_) },
|
|
|
-
|
|
|
on_return => sub {
|
|
|
my $frame = shift;
|
|
|
$self->{on_error}->("Unable to deliver frame" . Dumper($frame));
|
|
|
},
|
|
|
-
|
|
|
on_close => sub {
|
|
|
my $why = shift;
|
|
|
if (ref($why)) {
|
|
|
my $method_frame = $why->method_frame;
|
|
|
- $self->{on_error}->($method_frame->reply_code. ": ". $method_frame->reply_text);
|
|
|
+ $self->{on_close}->("Connection closed:" . $method_frame->reply_text);
|
|
|
}
|
|
|
else {
|
|
|
- $self->{on_error}->($why);
|
|
|
+ $self->{on_close}->("Connection closed: $why");
|
|
|
}
|
|
|
},
|
|
|
-
|
|
|
- on_success => sub
|
|
|
- {
|
|
|
- my $ar = shift;
|
|
|
- $ar->open_channel(
|
|
|
- on_success => sub
|
|
|
- {
|
|
|
- my $channel = shift;
|
|
|
- $self->{channel} = $channel;
|
|
|
- $callback->() if $callback;
|
|
|
- },
|
|
|
- on_failure => sub { $self->{on_error}->("channel failure: ".$_[2]) },
|
|
|
- on_close => sub
|
|
|
- {
|
|
|
- my $method_frame = shift->method_frame;
|
|
|
- $self->{on_error}->($method_frame->reply_code . $method_frame->reply_text);
|
|
|
- },
|
|
|
- );
|
|
|
- },
|
|
|
+ on_success => sub { $def->resolve() },
|
|
|
);
|
|
|
+
|
|
|
+ $self->{rabbit}->connect(%params);
|
|
|
+ return $def->promise;
|
|
|
+}
|
|
|
+
|
|
|
+sub connect
|
|
|
+{
|
|
|
+ my $self = shift;
|
|
|
+
|
|
|
+ return $self->_connect->then(sub
|
|
|
+ {
|
|
|
+ $self->_open_channel;
|
|
|
+ })->then(sub
|
|
|
+ {
|
|
|
+ my $channel = shift;
|
|
|
+ $self->{chan} = $channel;
|
|
|
+ })->catch(sub
|
|
|
+ {
|
|
|
+ $self->{on_connect_error}->(@_);
|
|
|
+ });
|
|
|
+}
|
|
|
+
|
|
|
+sub _open_channel
|
|
|
+{
|
|
|
+ my $self = shift;
|
|
|
+
|
|
|
+ my $deferred = deferred;
|
|
|
+ my %params = ();
|
|
|
+
|
|
|
+ $params{on_success} = sub { $deferred->resolve(shift) };
|
|
|
+ $params{on_failure} = sub { $deferred->reject(@_) };
|
|
|
+ $params{on_close} = sub {
|
|
|
+ my $why = shift;
|
|
|
+ if (ref($why)) {
|
|
|
+ my $method_frame = $why->method_frame;
|
|
|
+ $self->{on_error}->("Channel closed: " . $method_frame->reply_text);
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ $self->{on_error}->("Channel closed: $why");
|
|
|
+ }
|
|
|
+ };
|
|
|
+
|
|
|
+ $self->{rabbit}->open_channel(%params);
|
|
|
+ return $deferred->promise;
|
|
|
}
|
|
|
|
|
|
sub send
|
|
|
@@ -94,17 +145,79 @@ sub send
|
|
|
my $key = shift;
|
|
|
my $obj = shift || {};
|
|
|
my $header = shift || {};
|
|
|
-
|
|
|
+
|
|
|
+ $obj = (j($obj) || {}) if ref($obj);
|
|
|
+
|
|
|
$header->{delivery_mode} = 2;
|
|
|
|
|
|
- $self->{channel}->publish(
|
|
|
- body => j($obj),
|
|
|
+ $self->{chan}->publish(
|
|
|
+ body => $obj,
|
|
|
exchange => "mol",
|
|
|
routing_key => $key,
|
|
|
header => $header,
|
|
|
);
|
|
|
}
|
|
|
|
|
|
+sub call
|
|
|
+{
|
|
|
+ my $self = shift;
|
|
|
+ my $sub = pop;
|
|
|
+ my $key = shift;
|
|
|
+ my $obj = shift || {};
|
|
|
+ my $header = shift || {};
|
|
|
+
|
|
|
+ my $uniq = $uuid_gen->create_str();
|
|
|
+ $self->{rpc_calls}->{$uniq} = $sub;
|
|
|
+
|
|
|
+ $self->_reply_queue->then(sub {
|
|
|
+ my $queue = shift;
|
|
|
+
|
|
|
+ $header->{reply_to} = $queue;
|
|
|
+ $header->{correlation_id} = $uniq;
|
|
|
+
|
|
|
+ $self->send($key,$obj,$header);
|
|
|
+ });
|
|
|
+}
|
|
|
+
|
|
|
+sub _reply_queue
|
|
|
+{
|
|
|
+ my $self = shift;
|
|
|
+ return deferred->resolve($self->{reply_queue})->promise if $self->{reply_queue};
|
|
|
+
|
|
|
+ return $self->_declare_queue(
|
|
|
+ queue => "",
|
|
|
+ no_ack => 1,
|
|
|
+ durable => 0,
|
|
|
+ exclusive => 1
|
|
|
+ )->then(sub {
|
|
|
+ my $queue = shift;
|
|
|
+ return $self->_bind_queue($queue,$queue);
|
|
|
+ })->then(sub {
|
|
|
+ my $queue = shift;
|
|
|
+ $self->{reply_queue} = $queue;
|
|
|
+ return $self->_consume(
|
|
|
+ queue => $queue,
|
|
|
+ no_ack => 1,
|
|
|
+ sub
|
|
|
+ {
|
|
|
+ my $msg = shift;
|
|
|
+
|
|
|
+ my $corr_id = $msg->{header}->{correlation_id};
|
|
|
+ if (exists $self->{rpc_calls}->{$corr_id})
|
|
|
+ {
|
|
|
+ (delete $self->{rpc_calls}->{$corr_id})->({
|
|
|
+ message => $msg,
|
|
|
+ content => j($msg->{body}->{payload}) || {},
|
|
|
+ header => $msg->{header},
|
|
|
+ routing_key => $msg->{deliver}->{method_frame}->{routing_key},
|
|
|
+ });
|
|
|
+ }
|
|
|
+ });
|
|
|
+ })
|
|
|
+ ->then(sub { $self->{reply_queue} })
|
|
|
+ ->catch(sub { $self->{on_error}->(@_) });
|
|
|
+}
|
|
|
+
|
|
|
sub emit
|
|
|
{
|
|
|
my $self = shift;
|
|
|
@@ -112,107 +225,142 @@ sub emit
|
|
|
my $obj = shift || {};
|
|
|
my $header = shift || {};
|
|
|
|
|
|
+ $obj = (j($obj) || {}) if ref($obj);
|
|
|
+
|
|
|
$header->{delivery_mode} = 1;
|
|
|
|
|
|
- $self->{channel}->publish(
|
|
|
- body => j($obj),
|
|
|
+ $self->{chan}->publish(
|
|
|
+ body => $obj,
|
|
|
exchange => "mol",
|
|
|
routing_key => $key,
|
|
|
header => $header,
|
|
|
);
|
|
|
}
|
|
|
|
|
|
+sub _declare_queue
|
|
|
+{
|
|
|
+ my $self = shift;
|
|
|
+ my %params = @_;
|
|
|
+ my $deferred = deferred;
|
|
|
+
|
|
|
+ $params{on_success} = sub {
|
|
|
+ my $method = shift;
|
|
|
+ my $name = $method->method_frame->queue;
|
|
|
+ $deferred->resolve($name);
|
|
|
+ };
|
|
|
+
|
|
|
+ $params{on_failure} = sub { $deferred->reject("_declare_queue failure [".$params{queue}."]: ".$_[2]) };
|
|
|
+
|
|
|
+ $self->{chan}->declare_queue(%params);
|
|
|
+ return $deferred->promise;
|
|
|
+}
|
|
|
+
|
|
|
+sub _bind_queue
|
|
|
+{
|
|
|
+ my $self = shift;
|
|
|
+ my $queue = shift;
|
|
|
+ my $key = shift;
|
|
|
+
|
|
|
+ my $deferred = deferred;
|
|
|
+
|
|
|
+ $self->{chan}->bind_queue(
|
|
|
+ queue => $queue,
|
|
|
+ exchange => "mol",
|
|
|
+ routing_key => $key,
|
|
|
+ on_success => sub { $deferred->resolve($queue) },
|
|
|
+ on_failure => sub { $deferred->reject("_bind_queue failure [$queue/$key]: ".$_[0]) },
|
|
|
+ );
|
|
|
+
|
|
|
+ return $deferred->promise;
|
|
|
+}
|
|
|
+
|
|
|
+sub _bind_queue_to_many_keys
|
|
|
+{
|
|
|
+ my $self = shift;
|
|
|
+ my $queue = shift;
|
|
|
+ my @keys = @_;
|
|
|
+
|
|
|
+ return collect(map { $self->_bind_queue($queue,$_) } @keys)->then(sub { $queue });
|
|
|
+}
|
|
|
+
|
|
|
+sub _consume
|
|
|
+{
|
|
|
+ my $self = shift;
|
|
|
+ my $sub = pop;
|
|
|
+ my %params = @_;
|
|
|
+
|
|
|
+ my $deferred = deferred;
|
|
|
+
|
|
|
+ $params{on_failure} = sub { $deferred->reject("_consume failure [".$params{queue}."]: ".$_[2]) };
|
|
|
+ $params{on_success} = sub { $deferred->resolve() };
|
|
|
+ $params{on_consume} = $sub;
|
|
|
+
|
|
|
+ $self->{chan}->consume(%params);
|
|
|
+ return $deferred->promise;
|
|
|
+}
|
|
|
+
|
|
|
sub subscribe
|
|
|
{
|
|
|
my $self = shift;
|
|
|
my $keys = shift;
|
|
|
my $callback = shift;
|
|
|
|
|
|
- $self->{channel}->declare_queue(
|
|
|
+ $self->_declare_queue(
|
|
|
queue => "",
|
|
|
no_ack => 1,
|
|
|
durable => 0,
|
|
|
- exclusive => 1,
|
|
|
- on_failure => sub { $self->{on_error}->("declare queue failure: ".$_[2]) },
|
|
|
- on_success => sub
|
|
|
- {
|
|
|
- my $method = shift;
|
|
|
- my $name = $method->method_frame->queue;
|
|
|
-
|
|
|
+ exclusive => 1
|
|
|
+ )->then(sub {
|
|
|
+ my $queue = shift;
|
|
|
$keys = [ $keys] unless ref($keys) eq "ARRAY";
|
|
|
- foreach (@$keys)
|
|
|
- {
|
|
|
- $self->{channel}->bind_queue(
|
|
|
- queue => $name,
|
|
|
- exchange => "mol",
|
|
|
- routing_key => $_,
|
|
|
- on_failure => sub { $self->{on_error}->("bind queue failure: ".$_[0]) },
|
|
|
- on_success => sub
|
|
|
- {
|
|
|
- $self->{channel}->consume(
|
|
|
- queue => $name,
|
|
|
- no_ack => 1,
|
|
|
- on_failure => sub { $self->{on_error}->("consume failure: ".$_[0]) },
|
|
|
- on_consume => sub
|
|
|
- {
|
|
|
- my $msg = shift;
|
|
|
- $callback->({
|
|
|
- message => $msg,
|
|
|
- content => j($msg->{body}->{payload}),
|
|
|
- header => $msg->{header},
|
|
|
- routing_key => $msg->{deliver}->{method_frame}->{routing_key},
|
|
|
- });
|
|
|
- },
|
|
|
- );
|
|
|
- },
|
|
|
- );
|
|
|
- }
|
|
|
- }
|
|
|
- );
|
|
|
+ return $self->_bind_queue_to_many_keys($queue,@$keys);
|
|
|
+ })->then(sub {
|
|
|
+ my $queue = shift;
|
|
|
+ return $self->_consume(
|
|
|
+ queue => $queue,
|
|
|
+ no_ack => 1,
|
|
|
+ sub
|
|
|
+ {
|
|
|
+ my $msg = shift;
|
|
|
+ $callback->({
|
|
|
+ message => $msg,
|
|
|
+ content => j($msg->{body}->{payload}) || {},
|
|
|
+ header => $msg->{header},
|
|
|
+ routing_key => $msg->{deliver}->{method_frame}->{routing_key},
|
|
|
+ });
|
|
|
+ });
|
|
|
+ })->catch(sub { $self->{on_error}->(@_) });
|
|
|
}
|
|
|
|
|
|
-
|
|
|
sub listen_queue
|
|
|
{
|
|
|
my $self = shift;
|
|
|
my $queue = shift;
|
|
|
my $bind = shift;
|
|
|
my $callback = shift;
|
|
|
-
|
|
|
- $self->{channel}->declare_queue(
|
|
|
+
|
|
|
+ $self->_declare_queue(
|
|
|
queue => $queue,
|
|
|
no_ack => 0,
|
|
|
durable => 1,
|
|
|
auto_delete => 0,
|
|
|
- on_failure => sub { $self->{on_error}->("declare queue failure: ".$_[2]) },
|
|
|
- on_success => sub
|
|
|
- {
|
|
|
- my $method = shift;
|
|
|
- my $name = $method->method_frame->queue;
|
|
|
-
|
|
|
- $self->{channel}->bind_queue(
|
|
|
- queue => $queue,
|
|
|
- exchange => "mol",
|
|
|
- routing_key => $bind,
|
|
|
- );
|
|
|
-
|
|
|
- $self->{channel}->consume(
|
|
|
+ )->then(sub {
|
|
|
+ return $self->_bind_queue($queue,$bind)
|
|
|
+ })->then(sub {
|
|
|
+ return $self->_consume(
|
|
|
queue => $queue,
|
|
|
no_ack => 0,
|
|
|
- on_failure => sub { $self->{on_error}->("consume failure: ".$_[2]) },
|
|
|
- on_consume => sub
|
|
|
+ sub
|
|
|
{
|
|
|
my $msg = shift;
|
|
|
$callback->({
|
|
|
message => $msg,
|
|
|
- content => j($msg->{body}->{payload}),
|
|
|
+ content => j($msg->{body}->{payload}) || {},
|
|
|
header => $msg->{header},
|
|
|
routing_key => $msg->{deliver}->{method_frame}->{routing_key},
|
|
|
});
|
|
|
- },
|
|
|
- );
|
|
|
- }
|
|
|
- );
|
|
|
+ });
|
|
|
+ })->catch(sub { $self->{on_error}->(@_) });
|
|
|
}
|
|
|
|
|
|
sub ack
|
|
|
@@ -220,7 +368,7 @@ sub ack
|
|
|
my $self = shift;
|
|
|
my $m = shift;
|
|
|
|
|
|
- $self->{channel}->ack(
|
|
|
+ $self->{chan}->ack(
|
|
|
delivery_tag => $m->{message}->{deliver}->{method_frame}->{delivery_tag},
|
|
|
);
|
|
|
}
|
|
|
@@ -230,7 +378,7 @@ sub reject
|
|
|
my $self = shift;
|
|
|
my $m = shift;
|
|
|
|
|
|
- $self->{channel}->reject(
|
|
|
+ $self->{chan}->reject(
|
|
|
delivery_tag => $m->{message}->{deliver}->{method_frame}->{delivery_tag},
|
|
|
requeue => 0,
|
|
|
);
|
|
|
@@ -241,10 +389,36 @@ sub requeue
|
|
|
my $self = shift;
|
|
|
my $m = shift;
|
|
|
|
|
|
- $self->{channel}->reject(
|
|
|
- delivery_tag => $m->{deliver}->{method_frame}->{delivery_tag},
|
|
|
+ $self->{chan}->reject(
|
|
|
+ delivery_tag => $m->{message}->{deliver}->{method_frame}->{delivery_tag},
|
|
|
requeue => 1,
|
|
|
);
|
|
|
}
|
|
|
|
|
|
+sub reply
|
|
|
+{
|
|
|
+ my $self = shift;
|
|
|
+ my $msg = shift;
|
|
|
+ my $obj = shift || {};
|
|
|
+ my $header = shift || {};
|
|
|
+
|
|
|
+ $header->{delivery_mode} = 2;
|
|
|
+ $header->{correlation_id} = $msg->{header}->{correlation_id};
|
|
|
+
|
|
|
+ $obj = (j($obj) || {}) if ref($obj);
|
|
|
+
|
|
|
+ $self->{chan}->publish(
|
|
|
+ body => $obj,
|
|
|
+ exchange => "mol",
|
|
|
+ routing_key => $msg->{header}->{reply_to},
|
|
|
+ header => $header,
|
|
|
+ );
|
|
|
+}
|
|
|
+
|
|
|
+sub DESTROY
|
|
|
+{
|
|
|
+ my $self = shift;
|
|
|
+ $self->{rabbit} = undef;
|
|
|
+}
|
|
|
+
|
|
|
1;
|