#!/usr/bin/perl # Асинхронный доступ к RabbitMQ # Юрий Жиловец, 20.08.2015 package rabbit_async; use Modern::Perl; use EV; use AnyEvent; use AnyEvent::RabbitMQ; use Data::Dumper; use Mojo::JSON qw/j/; use Data::UUID; use Mojo::Promise; use constant DEFAULT_CALL_TIMEOUT => 15; my $uuid_gen = new Data::UUID; sub new { my $class = shift; my $arg = shift; my $callback = shift; my $self = $class->create($arg); $self->connect($callback)->then(sub { $callback->($self) if $callback }); return $self; } sub create { my $class = shift; my $arg = shift; my $host = $arg->{host}; my $user = $arg->{user}; my $password = $arg->{password}; my $rab = AnyEvent::RabbitMQ->new->load_xml_spec(); $rab->{verbose} = 1 if $arg->{verbose}; my $on_error = $arg->{on_error} || sub { say "RabbitMQ error: ".shift(); exit() }; my $on_close = $arg->{on_close} || $on_error; my $on_connect_error = $arg->{on_connect_error} || $on_error; my $self = bless { rabbit=>$rab, on_error=>$on_error, on_close => $on_close, on_connect_error => $on_connect_error, host=>$host, user=>$user, product => $arg->{product} || "rabbit_async", password=>$password, rpc_calls => {}, delayed => [], }; return $self; } sub _connect { my $self = shift; my $def = Mojo::Promise->new; my %params = ( host => $self->{host}, port => 5672, user => $self->{user}, pass => $self->{password}, vhost => "/", timeout => 1, tls => 0, tune => { heartbeat => 60, channel_max => 1 }, client_properties => { product => $self->{product}, }, on_failure => sub { $def->reject(join(" ",@_)) }, on_read_failure => sub { $self->{on_error}->(@_) }, on_return => sub { my $frame = shift; $self->{on_error}->("Unable to deliver frame" . Dumper($frame)); }, on_close => sub { my $why = shift; if (ref($why)) { my $method_frame = $why->method_frame; $self->{on_close}->("Connection closed:" . $method_frame->reply_text); } else { $self->{on_close}->("Connection closed: $why"); } }, on_success => sub { $def->resolve() }, ); $self->{rabbit}->connect(%params); return $def; } sub connect { my $self = shift; return $self->_connect->then(sub { $self->_open_channel; })->then(sub { my $channel = shift; $self->{chan} = $channel; })->catch(sub { $self->{on_connect_error}->(@_); die @_; }); } sub _open_channel { my $self = shift; my $deferred = Mojo::Promise->new; my %params = (); $params{on_success} = sub { $deferred->resolve(shift) }; $params{on_failure} = sub { $deferred->reject(@_) }; $params{on_close} = sub { my $why = shift; if (ref($why)) { my $method_frame = $why->method_frame; $self->{on_error}->("Channel closed: " . $method_frame->reply_text); } else { $self->{on_error}->("Channel closed: $why"); } }; $self->{rabbit}->open_channel(%params); return $deferred; } sub send { shift->_send(2, @_); } sub emit { shift->_send(1, @_); } sub _send { my $self = shift; if ($self->{chan}->is_active) { $self->_publish(0, @_); foreach (@{ $self->{delayed} }) { $self->_publish(@$_); } } else { push @{ $self->{delayed} }, [ 0.1, @_]; } } sub _publish { my $self = shift; my $delay = shift; my $mode = shift; my $key = shift; my $obj = shift || {}; my $header = shift || {}; Mojo::IOLoop->timer($delay => sub { $obj = (j($obj) || {}) if ref($obj); $header->{delivery_mode} = $mode; $self->{chan}->publish( body => $obj, exchange => "mol", routing_key => $key, header => $header, ); }); } my $ttt; sub call { my $self = shift; my $sub = pop; my $key = shift; my $obj = shift || {}; my $header = shift || {}; my $timeout = shift || DEFAULT_CALL_TIMEOUT; my $uniq = $uuid_gen->create_str(); $self->{rpc_calls}->{$uniq} = $sub; my $timer = AnyEvent->timer(after=>$timeout, cb => sub { my $sub = delete $self->{rpc_calls}->{$uniq}; $sub->({error=>"Timeout"}) if $sub; delete $self->{rpc_calls}->{"$uniq-timer"}; }); $ttt=$timer; $self->{rpc_calls}->{"$uniq-timer"} = $timer; $self->_reply_queue->then(sub { my $queue = shift; $header->{reply_to} = $queue; $header->{correlation_id} = $uniq; $self->send($key,$obj,$header); }); } sub _reply_queue { my $self = shift; return Mojo::Promise->new->resolve($self->{reply_queue}) if $self->{reply_queue}; return $self->_declare_queue( queue => "", no_ack => 1, durable => 0, exclusive => 1 )->then(sub { my $queue = shift; return $self->_bind_queue($queue,$queue); })->then(sub { my $queue = shift; $self->{reply_queue} = $queue; return $self->_consume( queue => $queue, no_ack => 1, sub { my $msg = shift; my $corr_id = $msg->{header}->{correlation_id}; my $sub = delete $self->{rpc_calls}->{$corr_id}; $sub->({ message => $msg, content => j($msg->{body}->{payload}) || {}, header => $msg->{header}, routing_key => $msg->{deliver}->{method_frame}->{routing_key}, }) if $sub; }); }) ->then(sub { $self->{reply_queue} }) ->catch(sub { $self->{on_error}->(@_) }); } sub _declare_queue { my $self = shift; my %params = @_; my $deferred = Mojo::Promise->new; $params{on_success} = sub { my $method = shift; my $name = $method->method_frame->queue; $deferred->resolve($name); }; $params{on_failure} = sub { $deferred->reject("_declare_queue failure [".$params{queue}."]: ".$_[2]) }; $self->{chan}->declare_queue(%params); return $deferred; } sub _bind_queue { my $self = shift; my $queue = shift; my $key = shift; my $deferred = Mojo::Promise->new; $self->{chan}->bind_queue( queue => $queue, exchange => "mol", routing_key => $key, on_success => sub { $deferred->resolve($queue) }, on_failure => sub { $deferred->reject("_bind_queue failure [$queue/$key]: ".$_[0]) }, ); return $deferred; } sub _bind_queue_to_many_keys { my $self = shift; my $queue = shift; my @keys = @_; return Mojo::Promise->all(map { $self->_bind_queue($queue,$_) } @keys)->then(sub { $queue }); } sub _consume { my $self = shift; my $sub = pop; my %params = @_; my $deferred = Mojo::Promise->new; $params{on_failure} = sub { $deferred->reject("_consume failure [".$params{queue}."]: ".$_[2]) }; $params{on_success} = sub { $deferred->resolve() }; $params{on_consume} = $sub; $self->{chan}->consume(%params); return $deferred; } sub subscribe { my $self = shift; my $keys = shift; my $callback = shift; $self->subscribe_queue("", $keys, $callback); } sub subscribe_queue { my $self = shift; my $queue = shift; my $keys = shift; my $callback = shift; $self->_declare_queue( queue => $queue, no_ack => 1, durable => 0, exclusive => 1 )->then(sub { my $queue = shift; $keys = [ $keys] unless ref($keys) eq "ARRAY"; return $self->_bind_queue_to_many_keys($queue,@$keys); })->then(sub { my $queue = shift; return $self->_consume( queue => $queue, no_ack => 1, sub { my $msg = shift; $callback->({ message => $msg, content => j($msg->{body}->{payload}) || {}, header => $msg->{header}, routing_key => $msg->{deliver}->{method_frame}->{routing_key}, }); }); })->catch(sub { $self->{on_error}->(@_) }); } sub listen_queue { my $self = shift; my $queue = shift; my $bind = shift; my $callback = shift; $self->_declare_queue( queue => $queue, no_ack => 0, durable => 0, auto_delete => 0, )->then(sub { return $self->_bind_queue_to_many_keys($queue, ref($bind) ? @$bind : $bind) })->then(sub { return $self->_consume( queue => $queue, no_ack => 0, sub { my $msg = shift; $callback->({ message => $msg, content => j($msg->{body}->{payload}) || {}, header => $msg->{header}, routing_key => $msg->{deliver}->{method_frame}->{routing_key}, }); }); })->catch(sub { $self->{on_error}->(@_) }); } sub ack { my $self = shift; my $m = shift; $self->{chan}->ack( delivery_tag => $m->{message}->{deliver}->{method_frame}->{delivery_tag}, ); } sub reject { my $self = shift; my $m = shift; $self->{chan}->reject( delivery_tag => $m->{message}->{deliver}->{method_frame}->{delivery_tag}, requeue => 0, ); } sub requeue { my $self = shift; my $m = shift; $self->{chan}->reject( delivery_tag => $m->{message}->{deliver}->{method_frame}->{delivery_tag}, requeue => 1, ); } sub reply { my $self = shift; my $msg = shift; my $obj = shift || {}; my $header = shift || {}; $header->{delivery_mode} = 2; $header->{correlation_id} = $msg->{header}->{correlation_id}; $obj = (j($obj) || {}) if ref($obj); $self->{chan}->publish( body => $obj, exchange => "mol", routing_key => $msg->{header}->{reply_to}, header => $header, ); } sub prefetch { my $self = shift; my $cnt = shift; my $deferred = Mojo::Promise->new; $self->{chan}->qos( on_failure => sub { $deferred->reject(join(" ",@_)) }, on_success => sub { $deferred->resolve() }, prefetch_count => $cnt, global => 0, # только для канала, хотя особой разницы нет - у нас нету мультиплексированных соединений ); return $deferred; } sub DESTROY { my $self = shift; $self->{rabbit} = undef; } 1;