#!/usr/bin/perl

# Asynchronous access to RabbitMQ
# Yuri Zhilovets, 20.08.2015

package rabbit_async;

use Modern::Perl;
use EV;
use AnyEvent;
use AnyEvent::RabbitMQ;
use Data::Dumper;
use Mojo::JSON qw/j/;
use Data::UUID;
use Promises qw/deferred collect/;

# Generator for the correlation ids used by call().
my $uuid_gen = Data::UUID->new;

# new($arg, $callback)
#
# Builds the object with create() and immediately starts connecting.
# $callback (optional) is invoked with the object once the promise returned
# by connect() is fulfilled.  The object itself is returned right away,
# before the connection is established.
#
# NOTE(review): connect() ends with a catch handler, so when a custom
# on_connect_error handler returns normally the chain looks fulfilled and
# $callback still appears to be called with an object that has no channel.
# Confirm whether callers rely on this before changing it.
sub new {
    my ($class, $arg, $callback) = @_;

    my $self = $class->create($arg);
    $self->connect($callback)->done(sub { $callback->($self) if $callback });

    return $self;
}

# create($arg)
#
# Builds the object without connecting.  Keys read from the $arg hash ref:
#   host, user, password - connection credentials
#   port, vhost, timeout - optional; default to 5672, "/" and 1
#   product              - client product name, defaults to "rabbit_async"
#   verbose              - when true, sets the verbose flag on the
#                          AnyEvent::RabbitMQ object
#   on_error             - error handler; the default prints the message
#                          and exits the process
#   on_close             - connection-close handler, defaults to on_error
#   on_connect_error     - connect-failure handler, defaults to on_error
sub create {
    my ($class, $arg) = @_;
    $arg ||= {};

    my $rab = AnyEvent::RabbitMQ->new->load_xml_spec();
    $rab->{verbose} = 1 if $arg->{verbose};

    my $on_error = $arg->{on_error}
        || sub { say "RabbitMQ error: " . shift(); exit() };
    my $on_close         = $arg->{on_close}         || $on_error;
    my $on_connect_error = $arg->{on_connect_error} || $on_error;

    my $self = {
        rabbit           => $rab,
        on_error         => $on_error,
        on_close         => $on_close,
        on_connect_error => $on_connect_error,
        host             => $arg->{host},
        port             => $arg->{port},
        vhost            => $arg->{vhost},
        timeout          => $arg->{timeout},
        user             => $arg->{user},
        product          => $arg->{product} || "rabbit_async",
        password         => $arg->{password},
        rpc_calls        => {},    # correlation id => reply callback
    };

    # Bless into the invoking class so that subclasses get their own type;
    # fall back to this package if create() is called without a class.
    return bless $self, (ref($class) || $class || __PACKAGE__);
}

# _close_reason($prefix, $why)
#
# Formats the argument of an on_close callback.  When $why is a reference
# the text is taken from $why->method_frame->reply_text, otherwise $why is
# used as the text itself.
sub _close_reason {
    my ($prefix, $why) = @_;

    my $text = ref($why) ? $why->method_frame->reply_text : $why;
    return "$prefix: $text";
}

# _failure_text(@callback_args)
#
# Joins every defined argument of a failure callback into one string, the
# same way _connect reports its own failures, so that the message does not
# depend on the position the library puts it at.
sub _failure_text {
    return join(" ", grep { defined } @_);
}

# _wrap_message($msg)
#
# Builds the hash ref handed to user callbacks from a consumed message:
#   message     - the message as received by the on_consume callback
#   content     - the result of j() on the body payload, or {} if it is false
#   header      - $msg->{header}
#   routing_key - routing key read from the deliver method frame
sub _wrap_message {
    my $msg = shift;

    return {
        message     => $msg,
        content     => j($msg->{body}->{payload}) || {},
        header      => $msg->{header},
        routing_key => $msg->{deliver}->{method_frame}->{routing_key},
    };
}

# _delivery_tag($m)
#
# Extracts the delivery tag from a hash built by _wrap_message.
sub _delivery_tag {
    my $m = shift;
    return $m->{message}->{deliver}->{method_frame}->{delivery_tag};
}

# _connect()
#
# Starts the connection and returns a promise that is resolved by the
# on_success callback and rejected (with the space-joined callback
# arguments) by on_failure.  Read failures, returned frames and connection
# close events are routed to the on_error / on_close handlers.
sub _connect {
    my $self = shift;

    my $def    = deferred;
    my %params = (
        host    => $self->{host},
        port    => $self->{port} // 5672,
        user    => $self->{user},
        pass    => $self->{password},
        vhost   => $self->{vhost} // "/",
        timeout => $self->{timeout} // 1,
        tls     => 0,
        tune    => { heartbeat => 60, channel_max => 1 },
        client_properties => {
            product => $self->{product},
        },
        on_failure      => sub { $def->reject(join(" ", @_)) },
        on_read_failure => sub { $self->{on_error}->(@_) },
        on_return       => sub {
            my $frame = shift;
            $self->{on_error}->("Unable to deliver frame" . Dumper($frame));
        },
        on_close => sub {
            $self->{on_close}->(_close_reason("Connection closed", shift));
        },
        on_success => sub { $def->resolve() },
    );

    $self->{rabbit}->connect(%params);
    return $def->promise;
}

# connect()
#
# Connects, opens a channel and stores it in $self->{chan}.  Returns the
# promise of the whole chain; a rejection anywhere in it is passed to the
# on_connect_error handler.
sub connect {
    my $self = shift;

    return $self->_connect
        ->then(sub { $self->_open_channel })
        ->then(sub {
            my $channel = shift;
            $self->{chan} = $channel;
        })
        ->catch(sub { $self->{on_connect_error}->(@_) });
}

# _open_channel()
#
# Opens a channel on the connection.  Returns a promise resolved with the
# first argument of the on_success callback and rejected with the arguments
# of on_failure.  A later close of the channel is reported through on_error.
sub _open_channel {
    my $self = shift;

    my $deferred = deferred;
    my %params   = (
        on_success => sub { $deferred->resolve(shift) },
        on_failure => sub { $deferred->reject(@_) },
        on_close   => sub {
            $self->{on_error}->(_close_reason("Channel closed", shift));
        },
    );

    $self->{rabbit}->open_channel(%params);
    return $deferred->promise;
}

# _publish($key, $obj, $header)
#
# Publishes $obj to the "mol" exchange with routing key $key.  A reference
# is passed through j() first; anything else is sent as is.  The header is
# copied so the hash supplied by the caller is never modified.
sub _publish {
    my ($self, $key, $obj, $header) = @_;

    $obj = (j($obj) || {}) if ref($obj);

    return $self->{chan}->publish(
        body        => $obj,
        exchange    => "mol",
        routing_key => $key,
        header      => { %{ $header || {} } },
    );
}

# send($key, $obj, $header)
#
# Publishes a message with delivery_mode 2.  $obj and $header default to
# empty hashes.
sub send {
    my ($self, $key, $obj, $header) = @_;
    $obj    ||= {};
    $header ||= {};

    return $self->_publish($key, $obj, { %$header, delivery_mode => 2 });
}

# call($key, [$obj, [$header,]] $sub)
#
# RPC-style request.  The last argument is the reply callback; it is stored
# under a fresh correlation id and invoked by the reply-queue consumer with
# a hash built by _wrap_message.  The request is sent with send() once the
# reply queue is available.  Returns the promise of that chain.
sub call {
    my $self = shift;
    my $sub  = pop;
    my ($key, $obj, $header) = @_;
    $obj    ||= {};
    $header ||= {};

    my $uniq = $uuid_gen->create_str();
    $self->{rpc_calls}->{$uniq} = $sub;

    return $self->_reply_queue->then(sub {
        my $queue = shift;

        my %rpc_header = (
            %$header,
            reply_to       => $queue,
            correlation_id => $uniq,
        );
        $self->send($key, $obj, \%rpc_header);
    });
}

# _reply_queue()
#
# Returns a promise for the name of the queue that receives RPC replies.
# On first use a queue is declared with an empty name, bound to the "mol"
# exchange under its own name and consumed; the name is remembered in
# $self->{reply_queue} and reused afterwards.  Failures go to on_error.
sub _reply_queue {
    my $self = shift;

    return deferred->resolve($self->{reply_queue})->promise
        if $self->{reply_queue};

    # Dispatches a reply to the callback registered under its correlation
    # id; replies with an unknown or missing id are ignored.
    my $on_reply = sub {
        my $msg     = shift;
        my $corr_id = $msg->{header}->{correlation_id};
        return unless defined $corr_id
            && exists $self->{rpc_calls}->{$corr_id};

        my $handler = delete $self->{rpc_calls}->{$corr_id};
        $handler->(_wrap_message($msg));
    };

    return $self->_declare_queue(
        queue     => "",
        no_ack    => 1,
        durable   => 0,
        exclusive => 1,
    )->then(sub {
        my $queue = shift;
        return $self->_bind_queue($queue, $queue);
    })->then(sub {
        my $queue = shift;
        $self->{reply_queue} = $queue;
        return $self->_consume(
            queue  => $queue,
            no_ack => 1,
            $on_reply,
        );
    })
    ->then(sub { $self->{reply_queue} })
    ->catch(sub { $self->{on_error}->(@_) });
}

# emit($key, $obj, $header)
#
# Same as send() but publishes with delivery_mode 1.
sub emit {
    my ($self, $key, $obj, $header) = @_;
    $obj    ||= {};
    $header ||= {};

    return $self->_publish($key, $obj, { %$header, delivery_mode => 1 });
}

# _declare_queue(%params)
#
# Declares a queue with the given parameters.  Returns a promise resolved
# with the queue name taken from the method frame of the success callback,
# or rejected with a description of the failure.
sub _declare_queue {
    my $self   = shift;
    my %params = @_;

    my $deferred = deferred;
    my $queue    = $params{queue};

    $params{on_success} = sub {
        my $method = shift;
        $deferred->resolve($method->method_frame->queue);
    };
    $params{on_failure} = sub {
        $deferred->reject(
            "_declare_queue failure [" . $queue . "]: " . _failure_text(@_));
    };

    $self->{chan}->declare_queue(%params);
    return $deferred->promise;
}

# _bind_queue($queue, $key)
#
# Binds $queue to the "mol" exchange under routing key $key.  Returns a
# promise resolved with $queue or rejected with a description of the failure.
sub _bind_queue {
    my ($self, $queue, $key) = @_;

    my $deferred = deferred;

    $self->{chan}->bind_queue(
        queue       => $queue,
        exchange    => "mol",
        routing_key => $key,
        on_success  => sub { $deferred->resolve($queue) },
        on_failure  => sub {
            $deferred->reject(
                "_bind_queue failure [$queue/$key]: " . _failure_text(@_));
        },
    );
    return $deferred->promise;
}

# _bind_queue_to_many_keys($queue, @keys)
#
# Binds $queue under every key in @keys.  Returns a promise resolved with
# $queue once the collected bind promises are fulfilled.
sub _bind_queue_to_many_keys {
    my ($self, $queue, @keys) = @_;

    my @bindings = map { $self->_bind_queue($queue, $_) } @keys;
    return collect(@bindings)->then(sub { $queue });
}

# _consume(%params, $sub)
#
# Starts consuming with the given parameters; the last argument is the
# on_consume callback.  Returns a promise resolved (without a value) on
# success or rejected with a description of the failure.
sub _consume {
    my $self   = shift;
    my $sub    = pop;
    my %params = @_;

    my $deferred = deferred;
    my $queue    = $params{queue};

    $params{on_failure} = sub {
        $deferred->reject(
            "_consume failure [" . $queue . "]: " . _failure_text(@_));
    };
    $params{on_success} = sub { $deferred->resolve() };
    $params{on_consume} = $sub;

    $self->{chan}->consume(%params);
    return $deferred->promise;
}

# subscribe($keys, $callback)
#
# Declares a queue with an empty name (non-durable, exclusive), binds it
# under $keys (a single key or an array ref of keys) and calls $callback
# with a hash built by _wrap_message for every message.  Consumes with
# no_ack => 1.  Failures go to on_error.  Returns the promise of the chain.
sub subscribe {
    my ($self, $keys, $callback) = @_;

    return $self->_declare_queue(
        queue     => "",
        no_ack    => 1,
        durable   => 0,
        exclusive => 1,
    )->then(sub {
        my $queue = shift;
        $keys = [$keys] unless ref($keys) eq "ARRAY";
        return $self->_bind_queue_to_many_keys($queue, @$keys);
    })->then(sub {
        my $queue = shift;
        return $self->_consume(
            queue  => $queue,
            no_ack => 1,
            sub { $callback->(_wrap_message(shift)) },
        );
    })->catch(sub { $self->{on_error}->(@_) });
}

# listen_queue($queue, $bind, $callback)
#
# Declares the durable, non-auto-delete queue $queue, binds it under
# routing key $bind and calls $callback with a hash built by _wrap_message
# for every message.  Consumes with no_ack => 0; the ack(), reject() and
# requeue() methods take that hash.  Failures go to on_error.  Returns the
# promise of the chain.
sub listen_queue {
    my ($self, $queue, $bind, $callback) = @_;

    return $self->_declare_queue(
        queue       => $queue,
        no_ack      => 0,
        durable     => 1,
        auto_delete => 0,
    )->then(sub {
        return $self->_bind_queue($queue, $bind);
    })->then(sub {
        return $self->_consume(
            queue  => $queue,
            no_ack => 0,
            sub { $callback->(_wrap_message(shift)) },
        );
    })->catch(sub { $self->{on_error}->(@_) });
}

# ack($m)
#
# Acknowledges the message wrapped in $m.
sub ack {
    my ($self, $m) = @_;

    return $self->{chan}->ack(
        delivery_tag => _delivery_tag($m),
    );
}

# reject($m)
#
# Rejects the message wrapped in $m with requeue => 0.
sub reject {
    my ($self, $m) = @_;

    return $self->{chan}->reject(
        delivery_tag => _delivery_tag($m),
        requeue      => 0,
    );
}

# requeue($m)
#
# Rejects the message wrapped in $m with requeue => 1.
sub requeue {
    my ($self, $m) = @_;

    return $self->{chan}->reject(
        delivery_tag => _delivery_tag($m),
        requeue      => 1,
    );
}

# reply($msg, $obj, $header)
#
# Publishes $obj with delivery_mode 2 to the routing key found in
# $msg->{header}->{reply_to}, copying the correlation id from the same
# header.
sub reply {
    my ($self, $msg, $obj, $header) = @_;
    $obj    ||= {};
    $header ||= {};

    my %reply_header = (
        %$header,
        delivery_mode  => 2,
        correlation_id => $msg->{header}->{correlation_id},
    );

    return $self->_publish($msg->{header}->{reply_to}, $obj, \%reply_header);
}

# Drops the reference to the AnyEvent::RabbitMQ object.
sub DESTROY {
    my $self = shift;
    $self->{rabbit} = undef;
}

1;