#!/usr/bin/perl

# Asynchronous access to RabbitMQ
# Yuri Zhilovets, 20.08.2015

package rabbit_async;

use Modern::Perl;
use EV;
use AnyEvent;
use AnyEvent::RabbitMQ;
use Data::Dumper;
use Mojo::JSON qw/j/;

# Create a client object and immediately start connecting.
#
#   $arg      - hashref of settings:
#                 host, user, password - connection credentials
#                 on_error  - optional coderef called with an error description;
#                             the default prints the text and exits
#                 port      - optional, defaults to 5672
#                 vhost     - optional, defaults to "/"
#                 timeout   - optional connect timeout, defaults to 1
#                 exchange  - optional exchange used for publish/bind,
#                             defaults to "mol"
#   $callback - optional coderef, called without arguments once the
#               channel has been opened
#
# Returns the new object; the connection is established asynchronously.
sub new
{
  my $class = shift;
  my $arg = shift || {};
  my $callback = shift;

  my $rab = AnyEvent::RabbitMQ->new->load_xml_spec();
#  $rab->{verbose}= 1;

  # NOTE(review): the default handler calls exit() without a status, so the
  # process ends with exit code 0 even though an error occurred.
  my $on_error = $arg->{on_error} || sub { say "RabbitMQ error: ".shift(); exit() };

  # Two-argument bless so that subclasses get objects of their own class.
  my $self = bless {
    rabbit   => $rab,
    on_error => $on_error,
    host     => $arg->{host},
    user     => $arg->{user},
    password => $arg->{password},
    port     => $arg->{port} // 5672,
    vhost    => $arg->{vhost} // "/",
    timeout  => $arg->{timeout} // 1,
    exchange => $arg->{exchange} // "mol",
  }, ref($class) || $class;

  $self->_connect($callback);

  return $self;
}

# Forward an error to the handler configured for this object.  The handler
# is looked up at call time, so replacing $self->{on_error} takes effect.
sub _report
{
  my $self = shift;
  return $self->{on_error}->(@_);
}

# Pick the human-readable message out of the arguments of an on_failure
# callback.
# NOTE(review): AnyEvent::RabbitMQ appears to call on_failure either with a
# single message or with AnyEvent::Handle's (handle, fatal, message) triple;
# the original code read $_[2] in some places and $_[0] in others - confirm
# against the library version in use.
sub _failure_reason
{
  my @args = @_;
  my $reason = defined $args[2] ? $args[2] : $args[0];
  return defined $reason ? $reason : "unknown";
}

# Turn the argument of an on_close callback into text: "code: text" when it
# is a frame object, the value itself otherwise.
sub _close_reason
{
  my $why = shift;
  return $why unless ref($why);

  my $method_frame = $why->method_frame;
  return $method_frame->reply_code . ": " . $method_frame->reply_text;
}

# Return the open channel or die with a readable message when a method is
# used before the channel has been opened.
sub _channel
{
  my $self = shift;
  my $channel = $self->{channel}
    or die "rabbit_async: channel is not open yet";
  return $channel;
}

# Connect to the broker and open a channel; $callback (optional) is called
# without arguments once the channel is stored in $self->{channel}.
# All failures are reported through the on_error handler.
sub _connect
{
  my $self = shift;
  my $callback = shift;

  $self->{rabbit}->connect(
    host => $self->{host},
    port => $self->{port} // 5672,
    user => $self->{user},
    pass => $self->{password},
    vhost => $self->{vhost} // "/",
    timeout => $self->{timeout} // 1,
    tls => 0,
    tune => { heartbeat => 30, channel_max => 1 },

    on_failure => sub { $self->_report("connect failure: " . _failure_reason(@_)) },

    on_read_failure => sub { $self->_report(@_) },

    on_return  => sub {
      my $frame = shift;
      $self->_report("Unable to deliver frame" . Dumper($frame));
    },

    on_close   => sub { $self->_report(_close_reason(shift)) },

    on_success => sub
    {
      my $ar = shift;
      $ar->open_channel(
        on_success => sub
        {
          my $channel = shift;
          $self->{channel} = $channel;
          $callback->() if $callback;
        },
        on_failure => sub { $self->_report("channel failure: " . _failure_reason(@_)) },
        # Same "code: text" format as the connection-level close handler.
        on_close   => sub { $self->_report(_close_reason(shift)) },
      );
    },
  );
}

# Publish $obj (passed through Mojo::JSON's j()) with routing key $key and
# the given delivery mode.  The caller's header hash is copied, not modified.
sub _publish
{
  my ($self, $mode, $key, $obj, $header) = @_;

  return $self->_channel->publish(
    body => j($obj || {}),
    exchange => $self->{exchange} // "mol",
    routing_key => $key,
    header => { %{ $header || {} }, delivery_mode => $mode },
  );
}

# Publish a message with delivery_mode 2.
#   $key    - routing key
#   $obj    - data for the body, defaults to {}
#   $header - optional hashref of message headers
sub send
{
  my $self = shift;
  my $key = shift;
  my $obj = shift;
  my $header = shift;

  return $self->_publish(2, $key, $obj, $header);
}

# Publish a message with delivery_mode 1.  Arguments are the same as for
# send().
sub emit
{
  my $self = shift;
  my $key = shift;
  my $obj = shift;
  my $header = shift;

  return $self->_publish(1, $key, $obj, $header);
}

# Start consuming $queue and hand every message to $callback as a hashref
# with the keys message (raw message), content (body payload passed through
# j()), header and routing_key.
sub _consume
{
  my ($self, $queue, $no_ack, $callback) = @_;

  $self->{channel}->consume(
    queue => $queue,
    no_ack => $no_ack,
    on_failure => sub { $self->_report("consume failure: " . _failure_reason(@_)) },
    on_consume => sub
    {
      my $msg = shift;
      $callback->({
        message => $msg,
        content => j($msg->{body}->{payload}),
        header =>  $msg->{header},
        routing_key => $msg->{deliver}->{method_frame}->{routing_key},
      });
    },
  );
}

# Declare a server-named exclusive queue, bind it to the exchange with one
# or several routing keys and consume it without acknowledgements.
#   $keys     - a routing key or an arrayref of routing keys
#   $callback - coderef receiving the message hashref (see _consume)
sub subscribe
{
  my $self = shift;
  my $keys = shift;
  my $callback = shift;

  my @keys = ref($keys) eq "ARRAY" ? @$keys : ($keys);

  $self->_channel->declare_queue(
    queue => "",
    no_ack => 1,
    durable => 0,
    exclusive => 1,
    on_failure => sub { $self->_report("declare queue failure: " . _failure_reason(@_)) },
    on_success => sub
    {
      my $method = shift;
      my $name   = $method->method_frame->queue;

      # Register a single consumer after the last binding is confirmed;
      # consuming from every bind callback would create one consumer per key.
      my $pending = @keys;

      foreach my $key (@keys)
      {
        $self->{channel}->bind_queue(
          queue => $name,
          exchange => $self->{exchange} // "mol",
          routing_key => $key,
          on_failure => sub { $self->_report("bind queue failure: " . _failure_reason(@_)) },
          on_success => sub
          {
            return if --$pending;
            $self->_consume($name, 1, $callback);
          },
        );
      }
    }
  );
}

# Declare the durable queue $queue, bind it to the exchange with routing key
# $bind and consume it with acknowledgements enabled (see ack, reject and
# requeue).
#   $callback - coderef receiving the message hashref (see _consume)
sub listen_queue
{
  my $self = shift;
  my $queue = shift;
  my $bind = shift;
  my $callback = shift;

  $self->_channel->declare_queue(
    queue => $queue,
    no_ack => 0,
    durable => 1,
    auto_delete => 0,
    on_failure => sub { $self->_report("declare queue failure: " . _failure_reason(@_)) },
    on_success => sub
    {
      $self->{channel}->bind_queue(
        queue => $queue,
        exchange => $self->{exchange} // "mol",
        routing_key => $bind,
        # Bind errors used to be dropped silently.
        on_failure => sub { $self->_report("bind queue failure: " . _failure_reason(@_)) },
        on_success => sub { $self->_consume($queue, 0, $callback) },
      );
    }
  );
}

# Extract the delivery tag from either the hashref passed to a consumer
# callback ({ message => $msg, ... }) or from the raw message itself.
sub _delivery_tag
{
  my $m = shift;
  my $msg = $m->{message} || $m;
  return $msg->{deliver}->{method_frame}->{delivery_tag};
}

# Acknowledge the message $m received by a listen_queue callback.
sub ack
{
  my $self = shift;
  my $m = shift;

  return $self->_channel->ack(
    delivery_tag => _delivery_tag($m),
  );
}

# Reject the message $m without putting it back into the queue.
sub reject
{
  my $self = shift;
  my $m = shift;

  return $self->_channel->reject(
    delivery_tag => _delivery_tag($m),
    requeue => 0,
  );
}

# Reject the message $m and ask the broker to requeue it.  Accepts the same
# callback hashref as ack/reject; a raw message is still accepted as before.
sub requeue
{
  my $self = shift;
  my $m = shift;

  return $self->_channel->reject(
    delivery_tag => _delivery_tag($m),
    requeue => 1,
  );
}

1;