#!/usr/bin/perl # Реконнект для rabbit_async # Юрий Жиловец, 06.04.2018 package rabbit_async_rec; use Modern::Perl; use Mojo::IOLoop; use rabbit_async; use constant SCALE => 2; use constant MIN_TIMEOUT => 1; use constant MAX_TIMEOUT => 180; sub new { my $class = shift; my $arg = shift; my $callback = shift; my $self = bless { timeout => 0, arg => $arg, callback => $callback, delayed => [], was_started => 0, }; $arg->{on_close} = sub { $self->_reconnect_later }; $arg->{on_error} = $arg->{on_connect_error} = sub { if ($self->{was_started}) { say STDERR "RabbitMQ error: ",@_; $self->_reconnect_later; } else { # если ошибка произошла при старте, это фатально say STDERR "RabbitMQ connect error: ", @_; exit(); } }; $self->_connect; return $self; } sub _reconnect_later { my $self = shift; # событие запросто может придти второй раз на соединении, которое теоретически уже закрылось $self->{rabbit}->{on_close} = $self->{rabbit}->{on_error} = sub {}; $self->{rabbit} = undef; $self->{timeout} = $self->_calc_timeout; say STDERR "rabbit_async_rec: will try to reconnect in $self->{timeout} sec..."; Mojo::IOLoop->timer($self->{timeout} => sub { $self->_connect }); } sub _connect { my $self = shift; say STDERR "rabbit_async_rec: connecting to ", $self->{arg}->{host}; my $rabbit; my $cb = sub { $self->{rabbit} = $rabbit; foreach (@{ $self->{delayed} }) { my $method = $_->{method}; $self->{rabbit}->$method(@{ $_->{args} }); } $self->{delayed} = []; $self->{timeout} = 0; $self->{was_started} = 1; $self->{callback}->(); }; $rabbit = new rabbit_async($self->{arg}, $cb); } sub _calc_timeout { my $self = shift; return MIN_TIMEOUT if $self->{timeout} == 0; my $timeout = $self->{timeout} * SCALE; $timeout = MAX_TIMEOUT if $timeout > MAX_TIMEOUT; return $timeout; } sub AUTOLOAD { my $self = shift or return undef; my @args = @_; ( my $method = $rabbit_async_rec::AUTOLOAD ) =~ s{.*::}{}; if ($self->{rabbit}) { # делегируем запросы настоящему модулю $self->{rabbit}->$method(@args); } else { # запоминаем запрос для последующего выполнения push @{ $self->{delayed} }, { method=>$method, args=>\@args }; } } 1;