rabbit_async_rec.pm 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. #!/usr/bin/perl
  2. # Реконнект для rabbit_async
  3. # Юрий Жиловец, 06.04.2018
  4. package rabbit_async_rec;
  5. use Modern::Perl;
  6. use Mojo::IOLoop;
  7. use rabbit_async;
  8. use constant SCALE => 2;
  9. use constant MIN_TIMEOUT => 1;
  10. use constant MAX_TIMEOUT => 180;
  11. sub new
  12. {
  13. my $class = shift;
  14. my $arg = shift;
  15. my $callback = shift;
  16. my $self = bless {
  17. timeout => 0,
  18. arg => $arg,
  19. callback => $callback,
  20. delayed => [],
  21. was_started => 0,
  22. };
  23. $arg->{on_close} = sub { $self->_reconnect_later };
  24. $arg->{on_error} = $arg->{on_connect_error} = sub {
  25. if ($self->{was_started})
  26. {
  27. say STDERR "RabbitMQ error: ",@_;
  28. $self->_reconnect_later;
  29. }
  30. else
  31. {
  32. # если ошибка произошла при старте, это фатально
  33. say STDERR "RabbitMQ connect error: ", @_;
  34. exit();
  35. }
  36. };
  37. $self->_connect;
  38. return $self;
  39. }
  40. sub _reconnect_later
  41. {
  42. my $self = shift;
  43. # событие запросто может придти второй раз на соединении, которое теоретически уже закрылось
  44. $self->{rabbit}->{on_close} = $self->{rabbit}->{on_error} = sub {};
  45. $self->{rabbit} = undef;
  46. $self->{timeout} = $self->_calc_timeout;
  47. say STDERR "rabbit_async_rec: will try to reconnect in $self->{timeout} sec...";
  48. Mojo::IOLoop->timer($self->{timeout} => sub { $self->_connect });
  49. }
  50. sub _connect
  51. {
  52. my $self = shift;
  53. say STDERR "rabbit_async_rec: connecting to ", $self->{arg}->{host};
  54. my $rabbit;
  55. my $cb = sub
  56. {
  57. $self->{rabbit} = $rabbit;
  58. foreach (@{ $self->{delayed} })
  59. {
  60. my $method = $_->{method};
  61. $self->{rabbit}->$method(@{ $_->{args} });
  62. }
  63. $self->{delayed} = [];
  64. $self->{timeout} = 0;
  65. $self->{was_started} = 1;
  66. $self->{callback}->();
  67. };
  68. $rabbit = new rabbit_async($self->{arg}, $cb);
  69. }
  70. sub _calc_timeout
  71. {
  72. my $self = shift;
  73. return MIN_TIMEOUT if $self->{timeout} == 0;
  74. my $timeout = $self->{timeout} * SCALE;
  75. $timeout = MAX_TIMEOUT if $timeout > MAX_TIMEOUT;
  76. return $timeout;
  77. }
  78. sub AUTOLOAD
  79. {
  80. my $self = shift or return undef;
  81. my @args = @_;
  82. ( my $method = $rabbit_async_rec::AUTOLOAD ) =~ s{.*::}{};
  83. if ($self->{rabbit})
  84. {
  85. # делегируем запросы настоящему модулю
  86. $self->{rabbit}->$method(@args);
  87. }
  88. else
  89. {
  90. # запоминаем запрос для последующего выполнения
  91. push @{ $self->{delayed} }, { method=>$method, args=>\@args };
  92. }
  93. }
  94. 1;