Yuriy Zhilovets 7 жил өмнө
parent
commit
aaad2136b5

+ 46 - 28
lib/rabbit_async.pm

@@ -53,6 +53,7 @@ sub create
     product => $arg->{product} || "rabbit_async",
     password=>$password,
     rpc_calls => {},
+    delayed => [],
   };
 
   return $self;
@@ -112,6 +113,7 @@ sub connect
   })->catch(sub 
   {
     $self->{on_connect_error}->(@_);
+    die @_;
   });
 }
 
@@ -134,28 +136,62 @@ sub _open_channel
         $self->{on_error}->("Channel closed: $why");
       }
   };
-                          
+  
   $self->{rabbit}->open_channel(%params);
   return $deferred->promise;
 }
 
 sub send
+{
+  shift->_send(2, @_);
+}
+
+sub emit
+{
+  shift->_send(1, @_);
+}
+
+sub _send
+{
+  my $self = shift;
+  
+  if ($self->{chan}->is_active)
+  {
+    $self->_publish(0, @_);
+    
+    foreach (@{ $self->{delayed} })
+    {
+      $self->_publish(@$_);
+    }
+  }
+  else
+  {
+    push @{ $self->{delayed} }, [ 0.1, @_];
+  }
+}
+
+sub _publish
 {
   my $self = shift;
+  my $delay = shift;
+  my $mode = shift;
   my $key = shift;
   my $obj = shift || {};
   my $header = shift || {};
 
-  $obj = (j($obj) || {}) if ref($obj);
+  Mojo::IOLoop->timer($delay => sub
+  {
+      $obj = (j($obj) || {}) if ref($obj);
 
-  $header->{delivery_mode} = 2;
+      $header->{delivery_mode} = $mode;
 
-  $self->{chan}->publish(
-    body => $obj,
-    exchange => "mol",
-    routing_key => $key,
-    header => $header,
-  );
+      $self->{chan}->publish(
+        body => $obj,
+        exchange => "mol",
+        routing_key => $key,
+        header => $header,
+      );
+  });
 }
 
 sub call
@@ -218,24 +254,6 @@ sub _reply_queue
      ->catch(sub { $self->{on_error}->(@_) });
 }
 
-sub emit
-{
-  my $self = shift;
-  my $key = shift;
-  my $obj = shift || {};
-  my $header = shift || {};
-  
-  $obj = (j($obj) || {}) if ref($obj);
-  
-  $header->{delivery_mode} = 1;
-  
-  $self->{chan}->publish(
-    body => $obj,
-    exchange => "mol",
-    routing_key => $key,
-    header => $header,
-  );
-}
 
 sub _declare_queue
 {
@@ -342,7 +360,7 @@ sub listen_queue
   $self->_declare_queue(
     queue => $queue,
     no_ack => 0,
-    durable => 1,
+    durable => 0,
     auto_delete => 0,
   )->then(sub {
       return $self->_bind_queue($queue,$bind)

+ 3 - 3
lib/rabbit_async_rec.pm

@@ -28,8 +28,7 @@ sub new
   };
   
   $arg->{on_close} = sub { $self->_reconnect_later };
-  $arg->{on_error} = sub { say STDERR "RabbitMQ error: " . shift };
-  $arg->{on_connect_error} = sub { 
+  $arg->{on_error} = $arg->{on_connect_error} = sub { 
     unless ($self->{timeout})
     {
       # если ошибка произошла при старте, это фатально
@@ -66,7 +65,7 @@ sub _connect
 {
   my $self = shift;
 
-  say STDERR "rabit_async_rec: connecting to ", $self->{arg}->{host};
+  say STDERR "rabbit_async_rec: connecting to ", $self->{arg}->{host};
   
   my $rabbit;
   
@@ -81,6 +80,7 @@ sub _connect
     }
   
     $self->{delayed} = [];
+    $self->{timeout} = 0;
     
     $self->{callback}->();
   };