| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250 |
- #!/usr/bin/perl
- # Асинхронный доступ к RabbitMQ
- # Юрий Жиловец, 20.08.2015
- package rabbit_async;
- use Modern::Perl;
- use EV;
- use AnyEvent;
- use AnyEvent::RabbitMQ;
- use Data::Dumper;
- use Mojo::JSON qw/j/;
- sub new
- {
- my $class = shift;
- my $arg = shift;
- my $callback = shift;
-
- my $host = $arg->{host};
- my $user = $arg->{user};
- my $password = $arg->{password};
-
- my $rab = AnyEvent::RabbitMQ->new->load_xml_spec();
- # $rab->{verbose}= 1;
- my $on_error = $arg->{on_error} || sub { say "RabbitMQ error: ".shift(); exit() };
-
- my $self = bless {rabbit=>$rab, on_error=>$on_error, host=>$host, user=>$user, password=>$password};
- $self->_connect($callback);
-
- return $self;
- }
- sub _connect
- {
- my $self = shift;
- my $callback = shift;
- $self->{rabbit}->connect(
- host => $self->{host},
- port => 5672,
- user => $self->{user},
- pass => $self->{password},
- vhost => "/",
- timeout => 1,
- tls => 0,
- tune => { heartbeat => 30, channel_max => 1 },
-
- on_failure => sub { $self->{on_error}->("connect failure: ".$_[2]) },
-
- on_read_failure => sub { $self->{on_error}->(@_) },
-
- on_return => sub {
- my $frame = shift;
- $self->{on_error}->("Unable to deliver frame" . Dumper($frame));
- },
-
- on_close => sub {
- my $why = shift;
- if (ref($why)) {
- my $method_frame = $why->method_frame;
- $self->{on_error}->($method_frame->reply_code. ": ". $method_frame->reply_text);
- }
- else {
- $self->{on_error}->($why);
- }
- },
-
- on_success => sub
- {
- my $ar = shift;
- $ar->open_channel(
- on_success => sub
- {
- my $channel = shift;
- $self->{channel} = $channel;
- $callback->() if $callback;
- },
- on_failure => sub { $self->{on_error}->("channel failure: ".$_[2]) },
- on_close => sub
- {
- my $method_frame = shift->method_frame;
- $self->{on_error}->($method_frame->reply_code . $method_frame->reply_text);
- },
- );
- },
- );
- }
- sub send
- {
- my $self = shift;
- my $key = shift;
- my $obj = shift || {};
- my $header = shift || {};
-
- $header->{delivery_mode} = 2;
- $self->{channel}->publish(
- body => j($obj),
- exchange => "mol",
- routing_key => $key,
- header => $header,
- );
- }
- sub emit
- {
- my $self = shift;
- my $key = shift;
- my $obj = shift || {};
- my $header = shift || {};
-
- $header->{delivery_mode} = 1;
-
- $self->{channel}->publish(
- body => j($obj),
- exchange => "mol",
- routing_key => $key,
- header => $header,
- );
- }
- sub subscribe
- {
- my $self = shift;
- my $keys = shift;
- my $callback = shift;
-
- $self->{channel}->declare_queue(
- queue => "",
- no_ack => 1,
- durable => 0,
- exclusive => 1,
- on_failure => sub { $self->{on_error}->("declare queue failure: ".$_[2]) },
- on_success => sub
- {
- my $method = shift;
- my $name = $method->method_frame->queue;
-
- $keys = [ $keys] unless ref($keys) eq "ARRAY";
- foreach (@$keys)
- {
- $self->{channel}->bind_queue(
- queue => $name,
- exchange => "mol",
- routing_key => $_,
- on_failure => sub { $self->{on_error}->("bind queue failure: ".$_[0]) },
- on_success => sub
- {
- $self->{channel}->consume(
- queue => $name,
- no_ack => 1,
- on_failure => sub { $self->{on_error}->("consume failure: ".$_[0]) },
- on_consume => sub
- {
- my $msg = shift;
- $callback->({
- message => $msg,
- content => j($msg->{body}->{payload}),
- header => $msg->{header},
- routing_key => $msg->{deliver}->{method_frame}->{routing_key},
- });
- },
- );
- },
- );
- }
- }
- );
- }
- sub listen_queue
- {
- my $self = shift;
- my $queue = shift;
- my $bind = shift;
- my $callback = shift;
-
- $self->{channel}->declare_queue(
- queue => $queue,
- no_ack => 0,
- durable => 1,
- auto_delete => 0,
- on_failure => sub { $self->{on_error}->("declare queue failure: ".$_[2]) },
- on_success => sub
- {
- my $method = shift;
- my $name = $method->method_frame->queue;
-
- $self->{channel}->bind_queue(
- queue => $queue,
- exchange => "mol",
- routing_key => $bind,
- );
-
- $self->{channel}->consume(
- queue => $queue,
- no_ack => 0,
- on_failure => sub { $self->{on_error}->("consume failure: ".$_[2]) },
- on_consume => sub
- {
- my $msg = shift;
- $callback->({
- message => $msg,
- content => j($msg->{body}->{payload}),
- header => $msg->{header},
- routing_key => $msg->{deliver}->{method_frame}->{routing_key},
- });
- },
- );
- }
- );
- }
- sub ack
- {
- my $self = shift;
- my $m = shift;
-
- $self->{channel}->ack(
- delivery_tag => $m->{deliver}->{method_frame}->{delivery_tag},
- );
- }
- sub reject
- {
- my $self = shift;
- my $m = shift;
-
- $self->{channel}->reject(
- delivery_tag => $m->{deliver}->{method_frame}->{delivery_tag},
- requeue => 0,
- );
- }
- sub requeue
- {
- my $self = shift;
- my $m = shift;
- $self->{channel}->reject(
- delivery_tag => $m->{deliver}->{method_frame}->{delivery_tag},
- requeue => 1,
- );
- }
- 1;
|