rabbit_async_rec.pm 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. #!/usr/bin/perl
  2. # Реконнект для rabbit_async
  3. # Юрий Жиловец, 06.04.2018
  4. package rabbit_async_rec;
  5. use Modern::Perl;
  6. use Mojo::IOLoop;
  7. use rabbit_async;
  8. use constant SCALE => 2;
  9. use constant MIN_TIMEOUT => 1;
  10. use constant MAX_TIMEOUT => 30;
  11. sub new
  12. {
  13. my $class = shift;
  14. my $arg = shift;
  15. my $callback = shift;
  16. my $self = bless {
  17. timeout => 0,
  18. arg => $arg,
  19. callback => $callback,
  20. delayed => [],
  21. };
  22. $arg->{on_close} = sub { $self->_reconnect_later };
  23. $arg->{on_error} = sub { say STDERR "RabbitMQ error: " . shift };
  24. $arg->{on_connect_error} = sub {
  25. unless ($self->{timeout})
  26. {
  27. # если ошибка произошла при старте, это фатально
  28. say STDERR "RabbitMQ connect error: ", @_;
  29. exit();
  30. }
  31. else
  32. {
  33. say STDERR "RabbitMQ error: ",@_;
  34. $self->_reconnect_later;
  35. }
  36. };
  37. $self->_connect;
  38. return $self;
  39. }
  40. sub _reconnect_later
  41. {
  42. my $self = shift;
  43. # событие запросто может придти второй раз на соединении, которое теоретически уже закрылось
  44. $self->{rabbit}->{on_close} = $self->{rabbit}->{on_error} = sub {};
  45. $self->{rabbit} = undef;
  46. $self->{timeout} = $self->_calc_timeout;
  47. say STDERR "rabbit_async_rec: will try to reconnect in $self->{timeout} sec...";
  48. Mojo::IOLoop->timer($self->{timeout} => sub { $self->_connect });
  49. }
  50. sub _connect
  51. {
  52. my $self = shift;
  53. say STDERR "rabit_async_rec: connecting to ", $self->{arg}->{host};
  54. my $rabbit;
  55. my $cb = sub
  56. {
  57. $self->{rabbit} = $rabbit;
  58. foreach (@{ $self->{delayed} })
  59. {
  60. my $method = $_->{method};
  61. $self->{rabbit}->$method(@{ $_->{args} });
  62. }
  63. $self->{delayed} = [];
  64. $self->{callback}->();
  65. };
  66. $rabbit = new rabbit_async($self->{arg}, $cb);
  67. }
  68. sub _calc_timeout
  69. {
  70. my $self = shift;
  71. my $timeout = MIN_TIMEOUT;
  72. if ($self->{timeout})
  73. {
  74. $timeout = $self->{timeout} * SCALE;
  75. $timeout = MAX_TIMEOUT if $timeout > MAX_TIMEOUT;
  76. }
  77. return $timeout;
  78. }
  79. sub AUTOLOAD
  80. {
  81. my $self = shift or return undef;
  82. my @args = @_;
  83. ( my $method = $rabbit_async_rec::AUTOLOAD ) =~ s{.*::}{};
  84. if ($self->{rabbit})
  85. {
  86. # делегируем запросы настоящему модулю
  87. $self->{rabbit}->$method(@args);
  88. }
  89. else
  90. {
  91. # запоминаем запрос для последующего выполнения
  92. push @{ $self->{delayed} }, { method=>$method, args=>\@args };
  93. }
  94. }
  95. 1;