Эх сурвалжийг харах

Обновлен модуль кролика

Sergey Korovin 7 жил өмнө
parent
commit
82e5fe37fe
3 өөрчлөгдсөн 402 нэмэгдсэн , 104 устгасан
  1. 277 103
      lib/rabbit_async.pm
  2. 124 0
      lib/rabbit_async_rec.pm
  3. 1 1
      telegram.pl

+ 277 - 103
lib/rabbit_async.pm

@@ -11,6 +11,10 @@ use AnyEvent;
 use AnyEvent::RabbitMQ;
 use Data::Dumper;
 use Mojo::JSON qw/j/;
+use Data::UUID;
+use Promises qw/deferred collect/;
+
+my $uuid_gen = new Data::UUID;
 
 sub new
 {
@@ -18,26 +22,48 @@ sub new
   my $arg = shift;
   my $callback = shift;
     
+  my $self = $class->create($arg);
+  $self->connect($callback)->done(sub { $callback->($self) if $callback });
+  
+  return $self;
+}
+
+sub create
+{
+  my $class = shift;
+  my $arg = shift;
+    
   my $host = $arg->{host};
   my $user = $arg->{user};
   my $password = $arg->{password};
   
   my $rab = AnyEvent::RabbitMQ->new->load_xml_spec();
-#  $rab->{verbose}= 1;
+  $rab->{verbose} = 1 if $arg->{verbose};
   my $on_error = $arg->{on_error} || sub { say "RabbitMQ error: ".shift(); exit() };
+  my $on_close = $arg->{on_close} || $on_error;
+  my $on_connect_error = $arg->{on_connect_error} || $on_error;
   
-  my $self = bless {rabbit=>$rab, on_error=>$on_error, host=>$host, user=>$user, password=>$password};
-  $self->_connect($callback);
-              
+  my $self = bless {
+    rabbit=>$rab, 
+    on_error=>$on_error, 
+    on_close => $on_close,
+    on_connect_error => $on_connect_error,
+    host=>$host, 
+    user=>$user, 
+    product => $arg->{product} || "rabbit_async",
+    password=>$password,
+    rpc_calls => {},
+  };
+
   return $self;
 }
 
 sub _connect
 {
   my $self = shift;
-  my $callback = shift;
-
-  $self->{rabbit}->connect(
+  
+  my $def = deferred;
+  my %params = (
     host => $self->{host},
     port => 5672,
     user => $self->{user},
@@ -45,47 +71,72 @@ sub _connect
     vhost => "/",
     timeout => 1,
     tls => 0,
-    tune => { heartbeat => 30, channel_max => 1 },
-    
-    on_failure => sub { $self->{on_error}->("connect failure: ".$_[2]) },
-    
+    tune => { heartbeat => 60, channel_max => 1 },
+    client_properties => {
+      product => $self->{product},
+    },
+    on_failure => sub { $def->reject(join(" ",@_)) },
     on_read_failure => sub { $self->{on_error}->(@_) },
-    
     on_return  => sub {
       my $frame = shift;
       $self->{on_error}->("Unable to deliver frame" . Dumper($frame));
     },
-    
     on_close   => sub {
       my $why = shift;
       if (ref($why)) {
         my $method_frame = $why->method_frame;
-        $self->{on_error}->($method_frame->reply_code. ": ". $method_frame->reply_text);
+        $self->{on_close}->("Connection closed:" . $method_frame->reply_text);
       }
       else {
-        $self->{on_error}->($why);
+        $self->{on_close}->("Connection closed: $why");
       }
     },
-    
-    on_success => sub 
-    {
-      my $ar = shift;
-      $ar->open_channel(
-        on_success => sub 
-        {
-          my $channel = shift;
-          $self->{channel} = $channel;
-          $callback->() if $callback;
-        },
-        on_failure =>  sub { $self->{on_error}->("channel failure: ".$_[2]) },
-        on_close   => sub 
-        {
-          my $method_frame = shift->method_frame;
-          $self->{on_error}->($method_frame->reply_code . $method_frame->reply_text);
-        },
-      );
-    },
+    on_success => sub { $def->resolve() },
   );
+  
+  $self->{rabbit}->connect(%params);
+  return $def->promise;
+}
+
+sub connect
+{
+  my $self = shift;
+  
+  return $self->_connect->then(sub
+  {
+    $self->_open_channel;
+  })->then(sub
+  {
+    my $channel = shift;
+    $self->{chan} = $channel;
+  })->catch(sub 
+  {
+    $self->{on_connect_error}->(@_);
+  });
+}
+
+sub _open_channel
+{
+  my $self = shift;
+        
+  my $deferred = deferred;
+  my %params = ();
+          
+  $params{on_success} = sub { $deferred->resolve(shift) };
+  $params{on_failure} = sub { $deferred->reject(@_) };
+  $params{on_close} = sub {
+      my $why = shift;
+      if (ref($why)) {
+        my $method_frame = $why->method_frame;
+        $self->{on_error}->("Channel closed: " . $method_frame->reply_text);
+      }
+      else {
+        $self->{on_error}->("Channel closed: $why");
+      }
+  };
+                          
+  $self->{rabbit}->open_channel(%params);
+  return $deferred->promise;
 }
 
 sub send
@@ -94,17 +145,79 @@ sub send
   my $key = shift;
   my $obj = shift || {};
   my $header = shift || {};
-  
+
+  $obj = (j($obj) || {}) if ref($obj);
+
   $header->{delivery_mode} = 2;
 
-  $self->{channel}->publish(
-    body => j($obj),
+  $self->{chan}->publish(
+    body => $obj,
     exchange => "mol",
     routing_key => $key,
     header => $header,
   );
 }
 
+sub call
+{
+  my $self = shift;
+  my $sub = pop;
+  my $key = shift;
+  my $obj = shift || {};
+  my $header = shift || {};
+
+  my $uniq = $uuid_gen->create_str();
+  $self->{rpc_calls}->{$uniq} = $sub;
+
+  $self->_reply_queue->then(sub {
+    my $queue = shift;
+    
+    $header->{reply_to} = $queue;
+    $header->{correlation_id} = $uniq;
+  
+    $self->send($key,$obj,$header);
+  });
+}
+
+sub _reply_queue
+{
+  my $self = shift;
+  return deferred->resolve($self->{reply_queue})->promise if $self->{reply_queue};
+  
+  return $self->_declare_queue(
+    queue => "",
+    no_ack => 1,
+    durable => 0,
+    exclusive => 1
+  )->then(sub {
+      my $queue = shift;
+      return $self->_bind_queue($queue,$queue);
+  })->then(sub {
+      my $queue = shift;
+      $self->{reply_queue} = $queue;
+      return $self->_consume(
+        queue => $queue,
+        no_ack => 1,
+        sub 
+        {
+          my $msg = shift;
+
+          my $corr_id = $msg->{header}->{correlation_id};
+          if (exists $self->{rpc_calls}->{$corr_id})
+          {
+            (delete $self->{rpc_calls}->{$corr_id})->({
+              message => $msg,
+              content => j($msg->{body}->{payload}) || {},
+              header =>  $msg->{header},
+              routing_key => $msg->{deliver}->{method_frame}->{routing_key},
+            });
+          }
+        });
+     })
+     ->then(sub { $self->{reply_queue} })
+     ->catch(sub { $self->{on_error}->(@_) });
+}
+
 sub emit
 {
   my $self = shift;
@@ -112,107 +225,142 @@ sub emit
   my $obj = shift || {};
   my $header = shift || {};
   
+  $obj = (j($obj) || {}) if ref($obj);
+  
   $header->{delivery_mode} = 1;
   
-  $self->{channel}->publish(
-    body => j($obj),
+  $self->{chan}->publish(
+    body => $obj,
     exchange => "mol",
     routing_key => $key,
     header => $header,
   );
 }
 
+sub _declare_queue
+{
+  my $self = shift;
+  my %params = @_;
+  my $deferred = deferred;
+
+  $params{on_success} = sub {
+    my $method = shift;
+    my $name   = $method->method_frame->queue;
+    $deferred->resolve($name);
+  };
+  
+  $params{on_failure} = sub { $deferred->reject("_declare_queue failure [".$params{queue}."]: ".$_[2]) };
+  
+  $self->{chan}->declare_queue(%params);
+  return $deferred->promise;
+}
+
+sub _bind_queue
+{
+  my $self = shift;
+  my $queue = shift;
+  my $key = shift;
+
+  my $deferred = deferred;
+  
+  $self->{chan}->bind_queue(
+    queue => $queue,
+    exchange => "mol",
+    routing_key => $key,
+    on_success => sub { $deferred->resolve($queue) },
+    on_failure => sub { $deferred->reject("_bind_queue failure [$queue/$key]: ".$_[0]) },
+  );
+
+  return $deferred->promise;
+}
+
+sub _bind_queue_to_many_keys
+{
+  my $self = shift;
+  my $queue = shift;
+  my @keys = @_;
+  
+  return collect(map { $self->_bind_queue($queue,$_) } @keys)->then(sub { $queue });
+}
+
+sub _consume
+{
+  my $self = shift;
+  my $sub = pop;
+  my %params = @_;
+
+  my $deferred = deferred;
+  
+  $params{on_failure} = sub { $deferred->reject("_consume failure [".$params{queue}."]: ".$_[2]) };
+  $params{on_success} = sub { $deferred->resolve() };
+  $params{on_consume} = $sub;
+  
+  $self->{chan}->consume(%params);
+  return $deferred->promise;
+}
+
 sub subscribe
 {
   my $self = shift;
   my $keys = shift;
   my $callback = shift;
   
-  $self->{channel}->declare_queue(
+  $self->_declare_queue(
     queue => "",
     no_ack => 1,
     durable => 0,
-    exclusive => 1,
-    on_failure => sub { $self->{on_error}->("declare queue failure: ".$_[2]) },
-    on_success => sub
-    {
-      my $method = shift;
-      my $name   = $method->method_frame->queue;
-      
+    exclusive => 1
+  )->then(sub {
+      my $queue = shift;
       $keys = [ $keys] unless ref($keys) eq "ARRAY";
-      foreach (@$keys)
-      {
-        $self->{channel}->bind_queue(
-          queue => $name,
-          exchange => "mol",
-          routing_key => $_,
-          on_failure => sub { $self->{on_error}->("bind queue failure: ".$_[0]) },
-          on_success => sub
-          {
-            $self->{channel}->consume(
-              queue => $name,
-              no_ack => 1,
-              on_failure => sub { $self->{on_error}->("consume failure: ".$_[0]) },
-              on_consume => sub
-              {
-                my $msg = shift;
-                $callback->({
-                  message => $msg,
-                  content => j($msg->{body}->{payload}),
-                  header =>  $msg->{header},
-                  routing_key => $msg->{deliver}->{method_frame}->{routing_key},
-               });
-              },
-            );
-          },
-        );
-      }
-    }
-  );
+      return $self->_bind_queue_to_many_keys($queue,@$keys);
+  })->then(sub {
+      my $queue = shift;
+      return $self->_consume(
+        queue => $queue,
+        no_ack => 1,
+        sub 
+        {
+          my $msg = shift;
+          $callback->({
+            message => $msg,
+            content => j($msg->{body}->{payload}) || {},
+            header =>  $msg->{header},
+            routing_key => $msg->{deliver}->{method_frame}->{routing_key},
+          });
+        });
+     })->catch(sub { $self->{on_error}->(@_) });
 }
 
-
 sub listen_queue
 {
   my $self = shift;
   my $queue = shift;
   my $bind = shift;
   my $callback = shift;
-  
-  $self->{channel}->declare_queue(
+
+  $self->_declare_queue(
     queue => $queue,
     no_ack => 0,
     durable => 1,
     auto_delete => 0,
-    on_failure => sub { $self->{on_error}->("declare queue failure: ".$_[2]) },
-    on_success => sub
-    {
-      my $method = shift;
-      my $name   = $method->method_frame->queue;
-      
-      $self->{channel}->bind_queue(
-          queue => $queue,
-          exchange => "mol",
-          routing_key => $bind,
-      );
-      
-      $self->{channel}->consume(
+  )->then(sub {
+      return $self->_bind_queue($queue,$bind)
+  })->then(sub {
+    return $self->_consume(
         queue => $queue,
         no_ack => 0,
-        on_failure => sub { $self->{on_error}->("consume failure: ".$_[2]) },
-        on_consume => sub
+        sub
         {
           my $msg = shift;
           $callback->({
             message => $msg,
-            content => j($msg->{body}->{payload}),
+            content => j($msg->{body}->{payload}) || {},
             header =>  $msg->{header},
             routing_key => $msg->{deliver}->{method_frame}->{routing_key},
           });
-        },
-      );
-    }
-  );
+        });
+  })->catch(sub { $self->{on_error}->(@_) });
 }
 
 sub ack
@@ -220,7 +368,7 @@ sub ack
   my $self = shift;
   my $m = shift;
   
-  $self->{channel}->ack(
+  $self->{chan}->ack(
     delivery_tag => $m->{message}->{deliver}->{method_frame}->{delivery_tag},
   );
 }
@@ -230,7 +378,7 @@ sub reject
   my $self = shift;
   my $m = shift;
   
-  $self->{channel}->reject(
+  $self->{chan}->reject(
     delivery_tag => $m->{message}->{deliver}->{method_frame}->{delivery_tag},
     requeue => 0,
   );
@@ -241,10 +389,36 @@ sub requeue
   my $self = shift;
   my $m = shift;
 
-  $self->{channel}->reject(
-    delivery_tag => $m->{deliver}->{method_frame}->{delivery_tag},
+  $self->{chan}->reject(
+    delivery_tag => $m->{message}->{deliver}->{method_frame}->{delivery_tag},
     requeue => 1,
   );
 }
 
+sub reply
+{
+  my $self = shift;
+  my $msg = shift;
+  my $obj = shift || {};
+  my $header = shift || {};
+
+  $header->{delivery_mode} = 2;
+  $header->{correlation_id} = $msg->{header}->{correlation_id};
+
+  $obj = (j($obj) || {}) if ref($obj);
+
+  $self->{chan}->publish(
+    body => $obj,
+    exchange => "mol",
+    routing_key => $msg->{header}->{reply_to},
+    header => $header,
+  );
+}
+
+sub DESTROY
+{
+  my $self = shift;
+  $self->{rabbit} = undef;
+}
+
 1;

+ 124 - 0
lib/rabbit_async_rec.pm

@@ -0,0 +1,124 @@
+#!/usr/bin/perl
+
+# Реконнект для rabbit_async
+# Юрий Жиловец, 06.04.2018
+
+package rabbit_async_rec;
+
+use Modern::Perl;
+use Mojo::IOLoop;
+
+use rabbit_async;
+
+use constant SCALE => 2;
+use constant MIN_TIMEOUT => 1;
+use constant MAX_TIMEOUT => 30;
+
+sub new
+{
+  my $class = shift;
+  my $arg = shift;
+  my $callback = shift;
+  
+  my $self = bless {
+    timeout => 0,
+    arg => $arg,
+    callback => $callback,
+    delayed => [],
+  };
+  
+  $arg->{on_close} = sub { $self->_reconnect_later };
+  $arg->{on_error} = sub { say STDERR "RabbitMQ error: " . shift };
+  $arg->{on_connect_error} = sub { 
+    unless ($self->{timeout})
+    {
+      # если ошибка произошла при старте, это фатально
+      say STDERR "RabbitMQ connect error: ", @_;
+      exit();
+    }
+    else
+    {
+      say STDERR "RabbitMQ error: ",@_;
+      $self->_reconnect_later;
+    }
+  };
+  
+  $self->_connect;
+            
+  return $self;
+}
+
+sub _reconnect_later
+{
+  my $self = shift;
+  
+  # событие запросто может придти второй раз на соединении, которое теоретически уже закрылось
+  $self->{rabbit}->{on_close} = $self->{rabbit}->{on_error} = sub {};
+  $self->{rabbit} = undef;
+  
+  $self->{timeout} = $self->_calc_timeout;
+  say STDERR "rabbit_async_rec: will try to reconnect in $self->{timeout} sec...";
+    
+  Mojo::IOLoop->timer($self->{timeout} => sub { $self->_connect });    
+}
+
+sub _connect
+{
+  my $self = shift;
+
+  say STDERR "rabit_async_rec: connecting to ", $self->{arg}->{host};
+  
+  my $rabbit;
+  
+  my $cb = sub
+  {
+    $self->{rabbit} = $rabbit;
+
+    foreach (@{ $self->{delayed} })
+    {
+      my $method = $_->{method};
+      $self->{rabbit}->$method(@{ $_->{args} });
+    }
+  
+    $self->{delayed} = [];
+    
+    $self->{callback}->();
+  };
+  
+  $rabbit = new rabbit_async($self->{arg}, $cb);
+}
+
+sub _calc_timeout
+{
+  my $self = shift;
+  
+  my $timeout = MIN_TIMEOUT;
+  if ($self->{timeout})
+  {
+    $timeout = $self->{timeout} * SCALE;
+    $timeout = MAX_TIMEOUT if $timeout > MAX_TIMEOUT;
+  }
+
+  return $timeout;  
+}
+
+sub AUTOLOAD
+{
+  my $self = shift or return undef;
+  my @args = @_;
+
+  ( my $method = $rabbit_async_rec::AUTOLOAD ) =~ s{.*::}{};
+  
+  if ($self->{rabbit})
+  {
+    # делегируем запросы настоящему модулю
+    $self->{rabbit}->$method(@args);
+  }
+  else
+  {
+    # запоминаем запрос для последующего выполнения
+    push @{ $self->{delayed} }, { method=>$method, args=>\@args };
+  }
+}
+
+1;

+ 1 - 1
telegram.pl

@@ -22,7 +22,7 @@ my $confdir = "config/".app->mode;
 use FindBin qw/$Bin/;
 use lib "$Bin/lib";
 
-use rabbit_async;
+use rabbit_async_rec;
 
 plugin yaml_config => {
   file      => "$confdir/$NAME.cfg",