| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124 |
- #!/usr/bin/perl
- # Реконнект для rabbit_async
- # Юрий Жиловец, 06.04.2018
- package rabbit_async_rec;
- use Modern::Perl;
- use Mojo::IOLoop;
- use rabbit_async;
- use constant SCALE => 2;
- use constant MIN_TIMEOUT => 1;
- use constant MAX_TIMEOUT => 180;
- sub new
- {
- my $class = shift;
- my $arg = shift;
- my $callback = shift;
-
- my $self = bless {
- timeout => 0,
- arg => $arg,
- callback => $callback,
- delayed => [],
- was_started => 0,
- };
-
- $arg->{on_close} = sub { $self->_reconnect_later };
- $arg->{on_error} = $arg->{on_connect_error} = sub {
- if ($self->{was_started})
- {
- say STDERR "RabbitMQ error: ",@_;
- $self->_reconnect_later;
- }
- else
- {
- # если ошибка произошла при старте, это фатально
- say STDERR "RabbitMQ connect error: ", @_;
- exit();
- }
- };
-
- $self->_connect;
-
- return $self;
- }
- sub _reconnect_later
- {
- my $self = shift;
-
- # событие запросто может придти второй раз на соединении, которое теоретически уже закрылось
- $self->{rabbit}->{on_close} = $self->{rabbit}->{on_error} = sub {};
- $self->{rabbit} = undef;
-
- $self->{timeout} = $self->_calc_timeout;
- say STDERR "rabbit_async_rec: will try to reconnect in $self->{timeout} sec...";
-
- Mojo::IOLoop->timer($self->{timeout} => sub { $self->_connect });
- }
- sub _connect
- {
- my $self = shift;
- say STDERR "rabbit_async_rec: connecting to ", $self->{arg}->{host};
-
- my $rabbit;
-
- my $cb = sub
- {
- $self->{rabbit} = $rabbit;
- foreach (@{ $self->{delayed} })
- {
- my $method = $_->{method};
- $self->{rabbit}->$method(@{ $_->{args} });
- }
-
- $self->{delayed} = [];
- $self->{timeout} = 0;
- $self->{was_started} = 1;
-
- $self->{callback}->();
- };
-
- $rabbit = new rabbit_async($self->{arg}, $cb);
- }
- sub _calc_timeout
- {
- my $self = shift;
-
- return MIN_TIMEOUT if $self->{timeout} == 0;
-
- my $timeout = $self->{timeout} * SCALE;
- $timeout = MAX_TIMEOUT if $timeout > MAX_TIMEOUT;
- return $timeout;
- }
- sub AUTOLOAD
- {
- my $self = shift or return undef;
- my @args = @_;
- ( my $method = $rabbit_async_rec::AUTOLOAD ) =~ s{.*::}{};
-
- if ($self->{rabbit})
- {
- # делегируем запросы настоящему модулю
- $self->{rabbit}->$method(@args);
- }
- else
- {
- # запоминаем запрос для последующего выполнения
- push @{ $self->{delayed} }, { method=>$method, args=>\@args };
- }
- }
- 1;
|