#!/usr/bin/perl

# Asynchronous access to RabbitMQ
# Yuri Zhilovets, 20.08.2015

package rabbit_async;

use Modern::Perl;
use EV;
use AnyEvent;
use AnyEvent::RabbitMQ;
use Data::Dumper;
use Mojo::JSON qw/j/;

# Constructor: rabbit_async->new(\%arg, $callback)
#
# Keys read from %arg:
#   host, user, password - broker address and credentials
#   port      - optional, defaults to 5672
#   vhost     - optional, defaults to "/"
#   exchange  - optional, exchange used for every publish and bind,
#               defaults to "mol"
#   on_error  - optional code ref called with an error description
#
# $callback (optional) is called without arguments from the open_channel
# success handler, i.e. once $self->{channel} has been stored.
# The connection attempt is started before the object is returned.
sub new {
    my $class    = shift;
    my $arg      = shift || {};
    my $callback = shift;

    my $rab = AnyEvent::RabbitMQ->new->load_xml_spec();
    # $rab->{verbose}= 1;

    # Default handler prints the message and stops the process.
    # NOTE: exit() without an argument terminates with status 0.
    my $on_error = $arg->{on_error} || sub { say "RabbitMQ error: ".shift(); exit() };

    # Two-argument bless so that subclasses get objects of their own class.
    my $self = bless {
        rabbit   => $rab,
        on_error => $on_error,
        host     => $arg->{host},
        user     => $arg->{user},
        password => $arg->{password},
        port     => $arg->{port}     // 5672,
        vhost    => $arg->{vhost}    // "/",
        exchange => $arg->{exchange} // "mol",
    }, $class;

    $self->_connect($callback);
    return $self;
}

# Picks the human-readable reason out of a failure callback's arguments.
# The original code read $_[2] in some places and $_[0] in others.
# Presumably AnyEvent::RabbitMQ calls these callbacks either as
# (handle, fatal, message) or as (message) - TODO confirm against the
# library; in both shapes the message is the last defined argument.
sub _failure_reason {
    my @defined = grep { defined } @_;
    return @defined ? $defined[-1] : "unknown error";
}

# Opens the connection and then a channel. Every failure path is routed
# to $self->{on_error}; on success the channel is stored in
# $self->{channel} and $callback (if given) is invoked.
sub _connect {
    my $self     = shift;
    my $callback = shift;

    $self->{rabbit}->connect(
        host    => $self->{host},
        port    => $self->{port},
        user    => $self->{user},
        pass    => $self->{password},
        vhost   => $self->{vhost},
        timeout => 1,
        tls     => 0,
        tune    => { heartbeat => 30, channel_max => 1 },

        on_failure => sub {
            $self->{on_error}->("connect failure: " . _failure_reason(@_));
        },
        on_read_failure => sub { $self->{on_error}->(@_) },
        on_return => sub {
            my $frame = shift;
            $self->{on_error}->("Unable to deliver frame" . Dumper($frame));
        },
        on_close => sub {
            my $why = shift;
            if (ref($why)) {
                my $method_frame = $why->method_frame;
                $self->{on_error}->($method_frame->reply_code . ": " . $method_frame->reply_text);
            }
            else {
                $self->{on_error}->($why);
            }
        },
        on_success => sub {
            my $ar = shift;
            $ar->open_channel(
                on_success => sub {
                    my $channel = shift;
                    $self->{channel} = $channel;
                    $callback->() if $callback;
                },
                on_failure => sub {
                    $self->{on_error}->("channel failure: " . _failure_reason(@_));
                },
                on_close => sub {
                    my $method_frame = shift->method_frame;
                    # Same "code: text" format as the connection on_close above
                    # (the separator was missing here).
                    $self->{on_error}->($method_frame->reply_code . ": " . $method_frame->reply_text);
                },
            );
        },
    );
}

# Shared implementation of send() and emit(): publishes $obj as JSON to
# the configured exchange under routing key $key.
# The caller's header hash is copied, never modified; delivery_mode is
# always overridden with $delivery_mode.
sub _publish {
    my ($self, $delivery_mode, $key, $obj, $header) = @_;

    return $self->{channel}->publish(
        body        => j($obj || {}),
        exchange    => $self->{exchange},
        routing_key => $key,
        header      => { %{ $header || {} }, delivery_mode => $delivery_mode },
    );
}

# $self->send($key, $obj, $header)
# Publishes with delivery_mode 2. $obj and $header default to {}.
sub send {
    my $self = shift;
    return $self->_publish(2, @_);
}

# $self->emit($key, $obj, $header)
# Publishes with delivery_mode 1. $obj and $header default to {}.
sub emit {
    my $self = shift;
    return $self->_publish(1, @_);
}

# Builds the on_consume handler used by subscribe() and listen_queue().
# The user callback receives one hash ref with:
#   message     - the raw delivery (pass this to ack/reject/requeue)
#   content     - j() applied to the body payload
#   header      - the delivery's header entry
#   routing_key - routing key taken from the deliver method frame
sub _delivery_handler {
    my $callback = shift;

    return sub {
        my $msg = shift;
        $callback->({
            message     => $msg,
            content     => j($msg->{body}->{payload}),
            header      => $msg->{header},
            routing_key => $msg->{deliver}->{method_frame}->{routing_key},
        });
    };
}

# $self->subscribe($keys, $callback)
# Declares a server-named exclusive, non-durable queue, binds it to the
# exchange for each routing key ($keys is one key or an array ref of keys)
# and consumes from it with no_ack => 1.
sub subscribe {
    my $self     = shift;
    my $keys     = shift;
    my $callback = shift;

    my @routing_keys = ref($keys) eq "ARRAY" ? @$keys : ($keys);

    # Guards against starting one consumer per routing key: the consumer is
    # started by the first successful bind only.
    my $consuming = 0;

    return $self->{channel}->declare_queue(
        queue     => "",
        no_ack    => 1,
        durable   => 0,
        exclusive => 1,
        on_failure => sub {
            $self->{on_error}->("declare queue failure: " . _failure_reason(@_));
        },
        on_success => sub {
            # Name the broker generated for the queue declared with queue => "".
            my $name = shift->method_frame->queue;

            for my $routing_key (@routing_keys) {
                $self->{channel}->bind_queue(
                    queue       => $name,
                    exchange    => $self->{exchange},
                    routing_key => $routing_key,
                    on_failure  => sub {
                        $self->{on_error}->("bind queue failure: " . _failure_reason(@_));
                    },
                    on_success => sub {
                        return if $consuming++;
                        $self->{channel}->consume(
                            queue      => $name,
                            no_ack     => 1,
                            on_failure => sub {
                                $self->{on_error}->("consume failure: " . _failure_reason(@_));
                            },
                            on_consume => _delivery_handler($callback),
                        );
                    },
                );
            }
        },
    );
}

# $self->listen_queue($queue, $bind, $callback)
# Declares the durable, non-auto-delete queue $queue, binds it to the
# exchange with routing key $bind and consumes from it with no_ack => 0,
# so each delivery has to be settled with ack(), reject() or requeue().
sub listen_queue {
    my $self     = shift;
    my $queue    = shift;
    my $bind     = shift;
    my $callback = shift;

    return $self->{channel}->declare_queue(
        queue       => $queue,
        no_ack      => 0,
        durable     => 1,
        auto_delete => 0,
        on_failure  => sub {
            $self->{on_error}->("declare queue failure: " . _failure_reason(@_));
        },
        on_success => sub {
            $self->{channel}->bind_queue(
                queue       => $queue,
                exchange    => $self->{exchange},
                routing_key => $bind,
                # Report bind failures through on_error like every other step.
                on_failure => sub {
                    $self->{on_error}->("bind queue failure: " . _failure_reason(@_));
                },
            );
            $self->{channel}->consume(
                queue      => $queue,
                no_ack     => 0,
                on_failure => sub {
                    $self->{on_error}->("consume failure: " . _failure_reason(@_));
                },
                on_consume => _delivery_handler($callback),
            );
        },
    );
}

# Extracts the delivery tag from a raw delivery, i.e. the "message" entry
# of the hash handed to a consumer callback.
sub _delivery_tag {
    my $m = shift;
    return $m->{deliver}->{method_frame}->{delivery_tag};
}

# $self->ack($m) - acknowledges the raw delivery $m.
sub ack {
    my $self = shift;
    my $m    = shift;

    return $self->{channel}->ack(
        delivery_tag => _delivery_tag($m),
    );
}

# $self->reject($m) - rejects the raw delivery $m with requeue => 0.
sub reject {
    my $self = shift;
    my $m    = shift;

    return $self->{channel}->reject(
        delivery_tag => _delivery_tag($m),
        requeue      => 0,
    );
}

# $self->requeue($m) - rejects the raw delivery $m with requeue => 1.
sub requeue {
    my $self = shift;
    my $m    = shift;

    return $self->{channel}->reject(
        delivery_tag => _delivery_tag($m),
        requeue      => 1,
    );
}

1;