| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124 | #!/usr/bin/perl# Реконнект для rabbit_async# Юрий Жиловец, 06.04.2018package rabbit_async_rec;use Modern::Perl;use Mojo::IOLoop;use rabbit_async;use constant SCALE => 2;use constant MIN_TIMEOUT => 1;use constant MAX_TIMEOUT => 180;sub new{  my $class = shift;  my $arg = shift;  my $callback = shift;    my $self = bless {    timeout => 0,    arg => $arg,    callback => $callback,    delayed => [],    was_started => 0,  };    $arg->{on_close} = sub { $self->_reconnect_later };  $arg->{on_error} = $arg->{on_connect_error} = sub {     if ($self->{was_started})    {      say STDERR "RabbitMQ error: ",@_;      $self->_reconnect_later;    }    else    {      # если ошибка произошла при старте, это фатально      say STDERR "RabbitMQ connect error: ", @_;      exit();    }  };    $self->_connect;              return $self;}sub _reconnect_later{  my $self = shift;    # событие запросто может придти второй раз на соединении, которое теоретически уже закрылось  $self->{rabbit}->{on_close} = $self->{rabbit}->{on_error} = sub {};  $self->{rabbit} = undef;    $self->{timeout} = $self->_calc_timeout;  say STDERR "rabbit_async_rec: will try to reconnect in $self->{timeout} sec...";      Mojo::IOLoop->timer($self->{timeout} => sub { $self->_connect });    }sub _connect{  my $self = shift;  say STDERR "rabbit_async_rec: connecting to ", $self->{arg}->{host};    my $rabbit;    my $cb = sub  {    $self->{rabbit} = $rabbit;    foreach (@{ $self->{delayed} })    {      my $method = $_->{method};      $self->{rabbit}->$method(@{ $_->{args} });    }      $self->{delayed} = [];    $self->{timeout} = 0;    $self->{was_started} = 1;        $self->{callback}->();  };    $rabbit = new rabbit_async($self->{arg}, $cb);}sub _calc_timeout{  my $self = shift;    return MIN_TIMEOUT if $self->{timeout} == 0;    my $timeout = $self->{timeout} * SCALE;  $timeout = MAX_TIMEOUT if $timeout > MAX_TIMEOUT;  return $timeout;  }sub AUTOLOAD{  my $self = shift or return undef;  my @args = @_;  ( my $method = $rabbit_async_rec::AUTOLOAD ) =~ s{.*::}{};    if ($self->{rabbit})  {    # делегируем запросы настоящему модулю    $self->{rabbit}->$method(@args);  }  else  {    # запоминаем запрос для последующего выполнения    push @{ $self->{delayed} }, { method=>$method, args=>\@args };  }}1;
 |