rabbit_async_rec.pm 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. #!/usr/bin/perl
  2. # Реконнект для rabbit_async
  3. # Юрий Жиловец, 06.04.2018
  4. package rabbit_async_rec;
  5. use Modern::Perl;
  6. use Mojo::IOLoop;
  7. use rabbit_async;
  8. use constant SCALE => 2;
  9. use constant MIN_TIMEOUT => 1;
  10. use constant MAX_TIMEOUT => 30;
  11. sub new
  12. {
  13. my $class = shift;
  14. my $arg = shift;
  15. my $callback = shift;
  16. my $self = bless {
  17. timeout => 0,
  18. arg => $arg,
  19. callback => $callback,
  20. delayed => [],
  21. };
  22. $arg->{on_close} = sub { $self->_reconnect_later };
  23. $arg->{on_error} = $arg->{on_connect_error} = sub {
  24. unless ($self->{timeout})
  25. {
  26. # если ошибка произошла при старте, это фатально
  27. say STDERR "RabbitMQ connect error: ", @_;
  28. exit();
  29. }
  30. else
  31. {
  32. say STDERR "RabbitMQ error: ",@_;
  33. $self->_reconnect_later;
  34. }
  35. };
  36. $self->_connect;
  37. return $self;
  38. }
  39. sub _reconnect_later
  40. {
  41. my $self = shift;
  42. # событие запросто может придти второй раз на соединении, которое теоретически уже закрылось
  43. $self->{rabbit}->{on_close} = $self->{rabbit}->{on_error} = sub {};
  44. $self->{rabbit} = undef;
  45. $self->{timeout} = $self->_calc_timeout;
  46. say STDERR "rabbit_async_rec: will try to reconnect in $self->{timeout} sec...";
  47. Mojo::IOLoop->timer($self->{timeout} => sub { $self->_connect });
  48. }
  49. sub _connect
  50. {
  51. my $self = shift;
  52. say STDERR "rabbit_async_rec: connecting to ", $self->{arg}->{host};
  53. my $rabbit;
  54. my $cb = sub
  55. {
  56. $self->{rabbit} = $rabbit;
  57. foreach (@{ $self->{delayed} })
  58. {
  59. my $method = $_->{method};
  60. $self->{rabbit}->$method(@{ $_->{args} });
  61. }
  62. $self->{delayed} = [];
  63. $self->{timeout} = 0;
  64. $self->{callback}->();
  65. };
  66. $rabbit = new rabbit_async($self->{arg}, $cb);
  67. }
  68. sub _calc_timeout
  69. {
  70. my $self = shift;
  71. my $timeout = MIN_TIMEOUT;
  72. if ($self->{timeout})
  73. {
  74. $timeout = $self->{timeout} * SCALE;
  75. $timeout = MAX_TIMEOUT if $timeout > MAX_TIMEOUT;
  76. }
  77. return $timeout;
  78. }
  79. sub AUTOLOAD
  80. {
  81. my $self = shift or return undef;
  82. my @args = @_;
  83. ( my $method = $rabbit_async_rec::AUTOLOAD ) =~ s{.*::}{};
  84. if ($self->{rabbit})
  85. {
  86. # делегируем запросы настоящему модулю
  87. $self->{rabbit}->$method(@args);
  88. }
  89. else
  90. {
  91. # запоминаем запрос для последующего выполнения
  92. push @{ $self->{delayed} }, { method=>$method, args=>\@args };
  93. }
  94. }
  95. 1;