| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480 |
- #!/usr/bin/perl
- # Асинхронный доступ к RabbitMQ
- # Юрий Жиловец, 20.08.2015
- package rabbit_async;
- use Modern::Perl;
- use EV;
- use AnyEvent;
- use AnyEvent::RabbitMQ;
- use Data::Dumper;
- use Mojo::JSON qw/j/;
- use Data::UUID;
- use Mojo::Promise;
# Number of seconds call() waits for an RPC reply when the caller gives no timeout.
use constant DEFAULT_CALL_TIMEOUT => 15;

# Generator shared by all clients; call() uses it to build a unique
# correlation id for every RPC request.
# Arrow-style constructor call instead of the indirect-object form
# ("new Data::UUID"), which is ambiguous to parse and rejected by newer
# feature bundles.
my $uuid_gen = Data::UUID->new;
# Constructor: builds the client with create() and immediately starts
# connecting.
#   $arg      - hashref of options, handed to create() unchanged
#   $callback - optional coderef; invoked with the client once the promise
#               returned by connect() resolves
# Returns the client object right away; the connection itself is
# established asynchronously.
sub new
{
    my ($class, $arg, $callback) = @_;

    my $self = $class->create($arg);

    my $connected = $self->connect($callback);
    $connected->then(sub
    {
        return unless $callback;
        $callback->($self);
    });

    return $self;
}
# Builds a client object without connecting.
#   $arg - hashref of options:
#            host, user, password - broker address and credentials
#            product              - client product name, default "rabbit_async"
#            verbose              - true to set the verbose flag on the
#                                   underlying AnyEvent::RabbitMQ object
#            on_error             - coderef called with an error text; the
#                                   default prints the text and exits
#            on_close             - coderef for connection close, defaults
#                                   to on_error
#            on_connect_error     - coderef for connect failures, defaults
#                                   to on_error
# Returns the new object, blessed into the class it was invoked on.
sub create
{
    my ($class, $arg) = @_;
    $arg ||= {};

    my $rab = AnyEvent::RabbitMQ->new->load_xml_spec();
    $rab->{verbose} = 1 if $arg->{verbose};

    my $on_error = $arg->{on_error} || sub { say "RabbitMQ error: ".shift(); exit() };
    my $on_close = $arg->{on_close} || $on_error;
    my $on_connect_error = $arg->{on_connect_error} || $on_error;

    my $self = {
        rabbit           => $rab,
        on_error         => $on_error,
        on_close         => $on_close,
        on_connect_error => $on_connect_error,
        host             => $arg->{host},
        user             => $arg->{user},
        product          => $arg->{product} || "rabbit_async",
        password         => $arg->{password},
        rpc_calls        => {},    # correlation id => callback (plus "<id>-timer" entries)
        delayed          => [],    # messages queued while the channel is not active
    };

    # Two-argument bless: the one-argument form always blesses into the
    # package this sub was compiled in, which breaks subclassing.
    return bless $self, $class;
}
# Opens the connection to the broker using the settings stored on the
# object (host, user, password, product); port, vhost, timeout and tuning
# values are fixed here.
# Returns a Mojo::Promise that resolves (without a value) when the
# connection succeeds and is rejected with the space-joined failure
# arguments otherwise. Read failures and returned frames go to the
# on_error handler, connection close goes to the on_close handler.
sub _connect
{
    my $self = shift;

    my $promise = Mojo::Promise->new;

    # A close notification is either an object exposing method_frame or a plain text.
    my $closed = sub
    {
        my ($why) = @_;
        if (!ref($why))
        {
            $self->{on_close}->("Connection closed: $why");
        }
        else
        {
            $self->{on_close}->("Connection closed:" . $why->method_frame->reply_text);
        }
    };

    my $undeliverable = sub
    {
        my ($frame) = @_;
        $self->{on_error}->("Unable to deliver frame" . Dumper($frame));
    };

    $self->{rabbit}->connect(
        host              => $self->{host},
        port              => 5672,
        user              => $self->{user},
        pass              => $self->{password},
        vhost             => "/",
        timeout           => 1,
        tls               => 0,
        tune              => { heartbeat => 60, channel_max => 1 },
        client_properties => { product => $self->{product} },
        on_failure        => sub { $promise->reject(join(" ",@_)) },
        on_read_failure   => sub { $self->{on_error}->(@_) },
        on_return         => $undeliverable,
        on_close          => $closed,
        on_success        => sub { $promise->resolve() },
    );

    return $promise;
}
# Connects to the broker and opens a channel.
# On success the channel is stored in $self->{chan} and the returned
# promise resolves with it. On failure the on_connect_error handler is
# called with the failure arguments and the same arguments are re-thrown,
# so the returned promise stays rejected.
sub connect
{
    my $self = shift;

    my $connected = $self->_connect;

    my $opened = $connected->then(sub { $self->_open_channel });

    my $stored = $opened->then(sub { $self->{chan} = shift });

    return $stored->catch(sub
    {
        $self->{on_connect_error}->(@_);
        die @_;
    });
}
# Asks the connection object for a new channel.
# Returns a Mojo::Promise resolved with the first argument of the success
# callback, or rejected with the arguments of the failure callback.
# A later close of the channel is reported through the on_error handler.
sub _open_channel
{
    my $self = shift;

    my $promise = Mojo::Promise->new;

    # The close reason is either an object exposing method_frame or a plain text.
    my $closed = sub
    {
        my ($why) = @_;
        my $detail = ref($why) ? $why->method_frame->reply_text : $why;
        $self->{on_error}->("Channel closed: $detail");
    };

    $self->{rabbit}->open_channel(
        on_success => sub { $promise->resolve(shift) },
        on_failure => sub { $promise->reject(@_) },
        on_close   => $closed,
    );

    return $promise;
}
# Publishes a message with delivery mode 2.
# Arguments after the invocant (routing key, body, header) are handed to
# _send() unchanged; the result of _send() is returned.
sub send
{
    my ($self, @message) = @_;
    return $self->_send(2, @message);
}
# Publishes a message with delivery mode 1.
# Arguments after the invocant (routing key, body, header) are handed to
# _send() unchanged; the result of _send() is returned.
sub emit
{
    my ($self, @message) = @_;
    return $self->_send(1, @message);
}
# Publishes a message now, or queues it while the channel is not usable.
#   @_ after the invocant: delivery mode, routing key, body, header
#   (the argument list _publish() expects after its delay).
# When the channel is active the message is scheduled with delay 0 and
# every message queued earlier is scheduled with the delay it was queued
# with (0.1). Otherwise the message is appended to $self->{delayed}.
# Returns nothing.
sub _send
{
    my $self = shift;

    # The channel does not exist until connect() has finished, so a missing
    # channel is treated like an inactive one instead of dying on undef.
    my $chan = $self->{chan};
    unless ($chan && $chan->is_active)
    {
        push @{ $self->{delayed} }, [ 0.1, @_ ];
        return;
    }

    $self->_publish(0, @_);

    # Take the backlog out of the object before publishing it: leaving it in
    # place would re-publish the same messages on every later send.
    my @backlog = splice @{ $self->{delayed} };
    $self->_publish(@$_) for @backlog;

    return;
}
# Schedules publication of one message to the "mol" exchange.
#   $delay  - seconds to wait before publishing (Mojo::IOLoop timer)
#   $mode   - value written to the delivery_mode header field
#   $key    - routing key
#   $obj    - body; a reference is serialized with j(), anything else is
#             sent as is; defaults to {}
#   $header - hashref of message properties; defaults to {}
# Returns whatever Mojo::IOLoop->timer returns.
sub _publish
{
    my $self = shift;
    my $delay = shift;
    my $mode = shift;
    my $key = shift;
    my $obj = shift || {};
    my $header = shift || {};

    return Mojo::IOLoop->timer($delay => sub
    {
        my $body = ref($obj) ? (j($obj) || {}) : $obj;

        # Work on a private copy: the caller's hash must not pick up
        # delivery_mode as a side effect of publishing.
        my %properties = (%$header, delivery_mode => $mode);

        $self->{chan}->publish(
            body        => $body,
            exchange    => "mol",
            routing_key => $key,
            header      => \%properties,
        );
    });
}
# Sends an RPC request and arranges for $sub to be called with the answer.
# Usage: $client->call($key, [$obj, [$header, [$timeout]]], $sub)
#   $key     - routing key of the request
#   $obj     - request body, defaults to {}
#   $header  - hashref of message properties, defaults to {}; not modified
#   $timeout - seconds to wait for the reply, defaults to DEFAULT_CALL_TIMEOUT
#   $sub     - callback (always the last argument); it is called with
#              {error=>"Timeout"} when no reply arrives in time
# The callback and its timeout timer are registered in $self->{rpc_calls}
# under a freshly generated correlation id.
# Returns the promise of the chain that publishes the request once the
# reply queue is known.
sub call
{
    my $self = shift;
    my $sub = pop;
    my $key = shift;
    my $obj = shift || {};
    my $header = shift || {};
    my $timeout = shift || DEFAULT_CALL_TIMEOUT;

    my $uniq = $uuid_gen->create_str();

    # Private copy of the properties: the request is published later, from
    # a promise callback, so two calls sharing one header hash would
    # otherwise overwrite each other's reply_to/correlation_id.
    my %properties = %$header;

    $self->{rpc_calls}->{$uniq} = $sub;

    # The timer lives only in rpc_calls (no file-level debug copy), so it is
    # released as soon as its entry is deleted.
    $self->{rpc_calls}->{"$uniq-timer"} = AnyEvent->timer(after=>$timeout, cb => sub
    {
        # Drop both entries first so they are gone even if the callback dies.
        delete $self->{rpc_calls}->{"$uniq-timer"};
        my $pending = delete $self->{rpc_calls}->{$uniq};
        $pending->({error=>"Timeout"}) if $pending;
    });

    return $self->_reply_queue->then(sub {
        my $queue = shift;

        $properties{reply_to} = $queue;
        $properties{correlation_id} = $uniq;

        $self->send($key, $obj, \%properties);
    });
}
# Returns a promise for the name of the queue RPC replies arrive on.
# The queue is created on first use: declared with a server-chosen name,
# bound to the "mol" exchange under its own name and consumed without
# acknowledgements. Incoming replies are matched to the callbacks in
# $self->{rpc_calls} by correlation id.
# Failures are reported through the on_error handler.
sub _reply_queue
{
    my $self = shift;
    return Mojo::Promise->new->resolve($self->{reply_queue}) if $self->{reply_queue};

    # A setup already in flight is shared, so several calls issued before
    # the queue exists do not each declare a queue and consumer of their own.
    return $self->{reply_queue_promise} if $self->{reply_queue_promise};

    my $on_reply = sub
    {
        my $msg = shift;
        my $corr_id = $msg->{header}->{correlation_id};
        return unless defined $corr_id;    # nothing to match the message against

        # The answer is here: release the timeout timer call() stored under
        # "<id>-timer" instead of keeping it alive until it fires.
        delete $self->{rpc_calls}->{"$corr_id-timer"};

        my $sub = delete $self->{rpc_calls}->{$corr_id};
        return unless $sub;                # unknown id or already timed out

        $sub->({
            message     => $msg,
            content     => j($msg->{body}->{payload}) || {},
            header      => $msg->{header},
            routing_key => $msg->{deliver}->{method_frame}->{routing_key},
        });
    };

    my $setup = $self->_declare_queue(
        queue => "",
        no_ack => 1,
        durable => 0,
        exclusive => 1
    )->then(sub {
        my $queue = shift;
        return $self->_bind_queue($queue,$queue);
    })->then(sub {
        my $queue = shift;
        $self->{reply_queue} = $queue;
        return $self->_consume(
            queue => $queue,
            no_ack => 1,
            $on_reply);
    })
    ->then(sub {
        delete $self->{reply_queue_promise};
        $self->{reply_queue};
    })
    ->catch(sub {
        # Forget the failed attempt so a later call can try again.
        delete $self->{reply_queue_promise};
        $self->{on_error}->(@_);
    });

    $self->{reply_queue_promise} = $setup;
    return $setup;
}
# Declares a queue on the channel.
#   %params - arguments for the channel's declare_queue(); on_success and
#             on_failure are supplied here and override any passed in
# Returns a Mojo::Promise resolved with the queue name reported by the
# broker, or rejected with "_declare_queue failure [<queue>]: <reason>".
sub _declare_queue
{
    my $self = shift;
    my %params = @_;
    my $deferred = Mojo::Promise->new;

    my $queue = defined $params{queue} ? $params{queue} : "";

    $params{on_success} = sub {
        my $method = shift;
        $deferred->resolve($method->method_frame->queue);
    };

    # Report every defined failure argument instead of one fixed position:
    # when the callback gets a single message, $_[2] is undef and the
    # reason would be lost (with an "uninitialized" warning on top).
    $params{on_failure} = sub {
        my $reason = join(" ", grep { defined } @_);
        $deferred->reject("_declare_queue failure [$queue]: $reason");
    };

    $self->{chan}->declare_queue(%params);
    return $deferred;
}
# Binds a queue to the "mol" exchange under one routing key.
#   $queue - queue name
#   $key   - routing key
# Returns a Mojo::Promise resolved with the queue name, or rejected with
# "_bind_queue failure [<queue>/<key>]: " followed by the first argument of
# the failure callback.
sub _bind_queue
{
    my ($self, $queue, $key) = @_;

    my $promise = Mojo::Promise->new;

    my $bound = sub { $promise->resolve($queue) };
    my $failed = sub
    {
        my $message = "_bind_queue failure [$queue/$key]: ".$_[0];
        $promise->reject($message);
    };

    $self->{chan}->bind_queue(
        queue       => $queue,
        exchange    => "mol",
        routing_key => $key,
        on_success  => $bound,
        on_failure  => $failed,
    );

    return $promise;
}
# Binds one queue under several routing keys.
#   $queue - queue name
#   @keys  - routing keys, each bound through _bind_queue()
# Returns the promise built from Mojo::Promise->all over the individual
# bindings, mapped to the queue name.
sub _bind_queue_to_many_keys
{
    my ($self, $queue, @keys) = @_;

    my @bindings;
    for my $key (@keys)
    {
        push @bindings, $self->_bind_queue($queue, $key);
    }

    my $all_bound = Mojo::Promise->all(@bindings);
    return $all_bound->then(sub { $queue });
}
# Starts consuming from a queue.
# Usage: $self->_consume(%params, $sub)
#   %params - arguments for the channel's consume(); on_success,
#             on_failure and on_consume are supplied here
#   $sub    - coderef (always the last argument) installed as on_consume
# Returns a Mojo::Promise resolved without a value once consuming has
# started, or rejected with "_consume failure [<queue>]: <reason>".
sub _consume
{
    my $self = shift;
    my $sub = pop;
    my %params = @_;
    my $deferred = Mojo::Promise->new;

    my $queue = defined $params{queue} ? $params{queue} : "";

    # Report every defined failure argument instead of one fixed position:
    # when the callback gets a single message, $_[2] is undef and the
    # reason would be lost (with an "uninitialized" warning on top).
    $params{on_failure} = sub {
        my $reason = join(" ", grep { defined } @_);
        $deferred->reject("_consume failure [$queue]: $reason");
    };
    $params{on_success} = sub { $deferred->resolve() };
    $params{on_consume} = $sub;

    $self->{chan}->consume(%params);
    return $deferred;
}
# Subscribes to routing keys through a queue with an empty name.
#   $keys     - one routing key or an arrayref of keys
#   $callback - coderef receiving each message
# Shorthand for subscribe_queue("", $keys, $callback); returns its result.
sub subscribe
{
    my ($self, $keys, $callback) = @_;
    return $self->subscribe_queue("", $keys, $callback);
}
# Declares an exclusive, non-durable queue, binds it to the given routing
# keys and consumes from it without acknowledgements.
#   $queue    - queue name to declare
#   $keys     - one routing key or an arrayref of keys
#   $callback - coderef called for every message with a hashref holding
#               message (the raw message), content (payload decoded with
#               j(), or {} when that yields a false value), header and
#               routing_key
# Failures anywhere in the chain are handed to the on_error handler.
# Returns the promise of the whole chain.
sub subscribe_queue
{
    my ($self, $queue, $keys, $callback) = @_;

    my $deliver = sub
    {
        my ($msg) = @_;
        $callback->({
            message     => $msg,
            content     => j($msg->{body}->{payload}) || {},
            header      => $msg->{header},
            routing_key => $msg->{deliver}->{method_frame}->{routing_key},
        });
    };

    my $declared = $self->_declare_queue(
        queue     => $queue,
        no_ack    => 1,
        durable   => 0,
        exclusive => 1,
    );

    my $bound = $declared->then(sub
    {
        my $name = shift;    # queue name as resolved by _declare_queue
        $keys = [ $keys ] unless ref($keys) eq "ARRAY";
        return $self->_bind_queue_to_many_keys($name, @$keys);
    });

    my $consuming = $bound->then(sub
    {
        my $name = shift;
        return $self->_consume(
            queue  => $name,
            no_ack => 1,
            $deliver,
        );
    });

    return $consuming->catch(sub { $self->{on_error}->(@_) });
}
# Declares a named queue (durable and auto_delete switched off), binds it
# and consumes from it with acknowledgements enabled (no_ack => 0).
#   $queue    - queue name
#   $bind     - one routing key or an arrayref of keys
#   $callback - coderef called for every message with a hashref holding
#               message (the raw message), content (payload decoded with
#               j(), or {} when that yields a false value), header and
#               routing_key
# Failures anywhere in the chain are handed to the on_error handler.
# Returns the promise of the whole chain.
sub listen_queue
{
    my ($self, $queue, $bind, $callback) = @_;

    my $deliver = sub
    {
        my ($msg) = @_;
        $callback->({
            message     => $msg,
            content     => j($msg->{body}->{payload}) || {},
            header      => $msg->{header},
            routing_key => $msg->{deliver}->{method_frame}->{routing_key},
        });
    };

    my $declared = $self->_declare_queue(
        queue       => $queue,
        no_ack      => 0,
        durable     => 0,
        auto_delete => 0,
    );

    my $bound = $declared->then(sub
    {
        return $self->_bind_queue_to_many_keys($queue, ref($bind) ? @$bind : $bind)
    });

    my $consuming = $bound->then(sub
    {
        return $self->_consume(
            queue  => $queue,
            no_ack => 0,
            $deliver,
        );
    });

    return $consuming->catch(sub { $self->{on_error}->(@_) });
}
# Acknowledges a message.
#   $m - the hashref a listen_queue()/subscribe callback received; its
#        delivery tag is read from message->deliver->method_frame
# Returns the result of the channel's ack().
sub ack
{
    my ($self, $m) = @_;

    my $tag = $m->{message}->{deliver}->{method_frame}->{delivery_tag};

    return $self->{chan}->ack(delivery_tag => $tag);
}
# Rejects a message without putting it back on the queue (requeue => 0).
#   $m - the hashref a listen_queue()/subscribe callback received; its
#        delivery tag is read from message->deliver->method_frame
# Returns the result of the channel's reject().
sub reject
{
    my ($self, $m) = @_;

    my $tag = $m->{message}->{deliver}->{method_frame}->{delivery_tag};

    return $self->{chan}->reject(
        delivery_tag => $tag,
        requeue      => 0,
    );
}
# Rejects a message and asks the broker to put it back on the queue
# (requeue => 1).
#   $m - the hashref a listen_queue()/subscribe callback received; its
#        delivery tag is read from message->deliver->method_frame
# Returns the result of the channel's reject().
sub requeue
{
    my ($self, $m) = @_;

    my $tag = $m->{message}->{deliver}->{method_frame}->{delivery_tag};

    return $self->{chan}->reject(
        delivery_tag => $tag,
        requeue      => 1,
    );
}
# Answers an RPC request.
#   $msg    - the hashref a consumer callback received; its header supplies
#             the reply_to routing key and the correlation_id
#   $obj    - reply body; a reference is serialized with j(), anything else
#             is sent as is; defaults to {}
#   $header - hashref of extra message properties, defaults to {}; not
#             modified
# The reply is published immediately to the "mol" exchange with delivery
# mode 2. Returns the result of the channel's publish().
sub reply
{
    my $self = shift;
    my $msg = shift;
    my $obj = shift || {};
    my $header = shift || {};

    # Build the properties in a private hash so the caller's header is not
    # changed by sending a reply.
    my %properties = (
        %$header,
        delivery_mode  => 2,
        correlation_id => $msg->{header}->{correlation_id},
    );

    my $body = ref($obj) ? (j($obj) || {}) : $obj;

    return $self->{chan}->publish(
        body        => $body,
        exchange    => "mol",
        routing_key => $msg->{header}->{reply_to},
        header      => \%properties,
    );
}
# Sets the prefetch count on the channel through qos().
#   $cnt - value passed as prefetch_count
# Returns a Mojo::Promise resolved without a value on success, or rejected
# with the space-joined failure arguments.
sub prefetch
{
    my ($self, $cnt) = @_;

    my $promise = Mojo::Promise->new;

    my $applied = sub { $promise->resolve() };
    my $failed = sub { $promise->reject(join(" ",@_)) };

    $self->{chan}->qos(
        prefetch_count => $cnt,
        global         => 0, # channel only; it makes little difference, as we have no multiplexed connections
        on_success     => $applied,
        on_failure     => $failed,
    );

    return $promise;
}
# Destructor: gives up the reference to the connection object.
# The "rabbit" key stays in the hash with an undefined value.
sub DESTROY
{
    my ($self) = @_;
    undef $self->{rabbit};
}
- 1;
|