#!/usr/bin/perl # Асинхронный доступ к RabbitMQ # Юрий Жиловец, 20.08.2015 package rabbit_async; use Modern::Perl; use EV; use AnyEvent; use AnyEvent::RabbitMQ; use Data::Dumper; use Mojo::JSON qw/j/; use Data::UUID; use Promises qw/deferred collect/; my $uuid_gen = new Data::UUID; sub new { my $class = shift; my $arg = shift; my $callback = shift; my $self = $class->create($arg); $self->connect($callback)->done(sub { $callback->($self) if $callback }); return $self; } sub create { my $class = shift; my $arg = shift; my $host = $arg->{host}; my $user = $arg->{user}; my $password = $arg->{password}; my $rab = AnyEvent::RabbitMQ->new->load_xml_spec(); $rab->{verbose} = 1 if $arg->{verbose}; my $on_error = $arg->{on_error} || sub { say "RabbitMQ error: ".shift(); exit() }; my $on_close = $arg->{on_close} || $on_error; my $on_connect_error = $arg->{on_connect_error} || $on_error; my $self = bless { rabbit=>$rab, on_error=>$on_error, on_close => $on_close, on_connect_error => $on_connect_error, host=>$host, user=>$user, product => $arg->{product} || "rabbit_async", password=>$password, rpc_calls => {}, }; return $self; } sub _connect { my $self = shift; my $def = deferred; my %params = ( host => $self->{host}, port => 5672, user => $self->{user}, pass => $self->{password}, vhost => "/", timeout => 1, tls => 0, tune => { heartbeat => 60, channel_max => 1 }, client_properties => { product => $self->{product}, }, on_failure => sub { $def->reject(join(" ",@_)) }, on_read_failure => sub { $self->{on_error}->(@_) }, on_return => sub { my $frame = shift; $self->{on_error}->("Unable to deliver frame" . Dumper($frame)); }, on_close => sub { my $why = shift; if (ref($why)) { my $method_frame = $why->method_frame; $self->{on_close}->("Connection closed:" . $method_frame->reply_text); } else { $self->{on_close}->("Connection closed: $why"); } }, on_success => sub { $def->resolve() }, ); $self->{rabbit}->connect(%params); return $def->promise; } sub connect { my $self = shift; return $self->_connect->then(sub { $self->_open_channel; })->then(sub { my $channel = shift; $self->{chan} = $channel; })->catch(sub { $self->{on_connect_error}->(@_); }); } sub _open_channel { my $self = shift; my $deferred = deferred; my %params = (); $params{on_success} = sub { $deferred->resolve(shift) }; $params{on_failure} = sub { $deferred->reject(@_) }; $params{on_close} = sub { my $why = shift; if (ref($why)) { my $method_frame = $why->method_frame; $self->{on_error}->("Channel closed: " . $method_frame->reply_text); } else { $self->{on_error}->("Channel closed: $why"); } }; $self->{rabbit}->open_channel(%params); return $deferred->promise; } sub send { my $self = shift; my $key = shift; my $obj = shift || {}; my $header = shift || {}; $obj = (j($obj) || {}) if ref($obj); $header->{delivery_mode} = 2; $self->{chan}->publish( body => $obj, exchange => "mol", routing_key => $key, header => $header, ); } sub call { my $self = shift; my $sub = pop; my $key = shift; my $obj = shift || {}; my $header = shift || {}; my $uniq = $uuid_gen->create_str(); $self->{rpc_calls}->{$uniq} = $sub; $self->_reply_queue->then(sub { my $queue = shift; $header->{reply_to} = $queue; $header->{correlation_id} = $uniq; $self->send($key,$obj,$header); }); } sub _reply_queue { my $self = shift; return deferred->resolve($self->{reply_queue})->promise if $self->{reply_queue}; return $self->_declare_queue( queue => "", no_ack => 1, durable => 0, exclusive => 1 )->then(sub { my $queue = shift; return $self->_bind_queue($queue,$queue); })->then(sub { my $queue = shift; $self->{reply_queue} = $queue; return $self->_consume( queue => $queue, no_ack => 1, sub { my $msg = shift; my $corr_id = $msg->{header}->{correlation_id}; if (exists $self->{rpc_calls}->{$corr_id}) { (delete $self->{rpc_calls}->{$corr_id})->({ message => $msg, content => j($msg->{body}->{payload}) || {}, header => $msg->{header}, routing_key => $msg->{deliver}->{method_frame}->{routing_key}, }); } }); }) ->then(sub { $self->{reply_queue} }) ->catch(sub { $self->{on_error}->(@_) }); } sub emit { my $self = shift; my $key = shift; my $obj = shift || {}; my $header = shift || {}; $obj = (j($obj) || {}) if ref($obj); $header->{delivery_mode} = 1; $self->{chan}->publish( body => $obj, exchange => "mol", routing_key => $key, header => $header, ); } sub _declare_queue { my $self = shift; my %params = @_; my $deferred = deferred; $params{on_success} = sub { my $method = shift; my $name = $method->method_frame->queue; $deferred->resolve($name); }; $params{on_failure} = sub { $deferred->reject("_declare_queue failure [".$params{queue}."]: ".$_[2]) }; $self->{chan}->declare_queue(%params); return $deferred->promise; } sub _bind_queue { my $self = shift; my $queue = shift; my $key = shift; my $deferred = deferred; $self->{chan}->bind_queue( queue => $queue, exchange => "mol", routing_key => $key, on_success => sub { $deferred->resolve($queue) }, on_failure => sub { $deferred->reject("_bind_queue failure [$queue/$key]: ".$_[0]) }, ); return $deferred->promise; } sub _bind_queue_to_many_keys { my $self = shift; my $queue = shift; my @keys = @_; return collect(map { $self->_bind_queue($queue,$_) } @keys)->then(sub { $queue }); } sub _consume { my $self = shift; my $sub = pop; my %params = @_; my $deferred = deferred; $params{on_failure} = sub { $deferred->reject("_consume failure [".$params{queue}."]: ".$_[2]) }; $params{on_success} = sub { $deferred->resolve() }; $params{on_consume} = $sub; $self->{chan}->consume(%params); return $deferred->promise; } sub subscribe { my $self = shift; my $keys = shift; my $callback = shift; $self->_declare_queue( queue => "", no_ack => 1, durable => 0, exclusive => 1 )->then(sub { my $queue = shift; $keys = [ $keys] unless ref($keys) eq "ARRAY"; return $self->_bind_queue_to_many_keys($queue,@$keys); })->then(sub { my $queue = shift; return $self->_consume( queue => $queue, no_ack => 1, sub { my $msg = shift; $callback->({ message => $msg, content => j($msg->{body}->{payload}) || {}, header => $msg->{header}, routing_key => $msg->{deliver}->{method_frame}->{routing_key}, }); }); })->catch(sub { $self->{on_error}->(@_) }); } sub listen_queue { my $self = shift; my $queue = shift; my $bind = shift; my $callback = shift; $self->_declare_queue( queue => $queue, no_ack => 0, durable => 1, auto_delete => 0, )->then(sub { return $self->_bind_queue($queue,$bind) })->then(sub { return $self->_consume( queue => $queue, no_ack => 0, sub { my $msg = shift; $callback->({ message => $msg, content => j($msg->{body}->{payload}) || {}, header => $msg->{header}, routing_key => $msg->{deliver}->{method_frame}->{routing_key}, }); }); })->catch(sub { $self->{on_error}->(@_) }); } sub ack { my $self = shift; my $m = shift; $self->{chan}->ack( delivery_tag => $m->{message}->{deliver}->{method_frame}->{delivery_tag}, ); } sub reject { my $self = shift; my $m = shift; $self->{chan}->reject( delivery_tag => $m->{message}->{deliver}->{method_frame}->{delivery_tag}, requeue => 0, ); } sub requeue { my $self = shift; my $m = shift; $self->{chan}->reject( delivery_tag => $m->{message}->{deliver}->{method_frame}->{delivery_tag}, requeue => 1, ); } sub reply { my $self = shift; my $msg = shift; my $obj = shift || {}; my $header = shift || {}; $header->{delivery_mode} = 2; $header->{correlation_id} = $msg->{header}->{correlation_id}; $obj = (j($obj) || {}) if ref($obj); $self->{chan}->publish( body => $obj, exchange => "mol", routing_key => $msg->{header}->{reply_to}, header => $header, ); } sub DESTROY { my $self = shift; $self->{rabbit} = undef; } 1;