rabbit_async.pm 5.4 KB


  1. #!/usr/bin/perl
  2. # Asynchronous access to RabbitMQ
  3. # Юрий Жиловец, 20.08.2015
  4. package rabbit_async;
  5. use Modern::Perl;
  6. use EV;
  7. use AnyEvent;
  8. use AnyEvent::RabbitMQ;
  9. use Data::Dumper;
  10. use Mojo::JSON qw/j/;
  # Constructor: builds the client object and immediately starts connecting.
  #
  # Arguments:
  #   $class    - class to bless the new object into
  #   $arg      - hashref with the keys host, user, password and, optionally,
  #               on_error (code ref called with an error description)
  #   $callback - optional code ref, handed to _connect(), which calls it with
  #               no arguments once the channel has been opened
  #
  # Returns the new object. Connecting is asynchronous, so $self->{channel}
  # is only set after the event loop has run and $callback has fired.
  sub new
  {
      my ($class, $arg, $callback) = @_;
      $arg //= {};

      # Bless into the class we were invoked on so subclasses work; fall back
      # to this package so unusual call styles behave as they did before.
      my $package = ref($class) || $class || __PACKAGE__;

      my $rab = AnyEvent::RabbitMQ->new->load_xml_spec();
      # $rab->{verbose} = 1;

      # Without a user-supplied handler any error is printed and the process
      # is terminated.
      my $on_error = $arg->{on_error}
          || sub { say "RabbitMQ error: " . shift(); exit() };

      my $self = bless {
          rabbit   => $rab,
          on_error => $on_error,
          host     => $arg->{host},
          user     => $arg->{user},
          password => $arg->{password},
      }, $package;

      $self->_connect($callback);
      return $self;
  }
  # Opens the connection to the broker and then a single channel on it.
  #
  # Arguments:
  #   $self     - the rabbit_async object (reads host, user, password,
  #               on_error and rabbit; stores the opened channel in
  #               $self->{channel})
  #   $callback - optional code ref, called with no arguments after the
  #               channel has been opened
  #
  # Every failure is reported through $self->{on_error}.
  sub _connect
  {
      my ($self, $callback) = @_;

      # Picks the human-readable part of a failure callback's arguments: the
      # last defined, non-reference value. This covers both a lone message
      # string and a (handle, fatal, message) triple.
      # NOTE(review): the exact argument list depends on the failure path
      # inside AnyEvent::RabbitMQ - confirm against the installed version.
      my $reason = sub {
          my ($text) = grep { defined($_) && !ref($_) } reverse @_;
          return $text // "unknown error";
      };

      $self->{rabbit}->connect(
          host    => $self->{host},
          port    => 5672,
          user    => $self->{user},
          pass    => $self->{password},
          vhost   => "/",
          timeout => 1,
          tls     => 0,
          tune    => { heartbeat => 30, channel_max => 1 },

          on_failure => sub {
              $self->{on_error}->("connect failure: " . $reason->(@_));
          },
          on_read_failure => sub { $self->{on_error}->(@_) },
          on_return => sub {
              my $frame = shift;
              $self->{on_error}->("Unable to deliver frame" . Dumper($frame));
          },
          on_close => sub {
              my $why = shift;
              if (ref($why)) {
                  # A reference is treated as a frame carrying code and text.
                  my $method_frame = $why->method_frame;
                  $self->{on_error}->(
                      $method_frame->reply_code . ": " . $method_frame->reply_text
                  );
              }
              else {
                  $self->{on_error}->($why);
              }
          },
          on_success => sub {
              my $ar = shift;
              $ar->open_channel(
                  on_success => sub {
                      $self->{channel} = shift;
                      $callback->() if $callback;
                  },
                  on_failure => sub {
                      $self->{on_error}->("channel failure: " . $reason->(@_));
                  },
                  on_close => sub {
                      my $method_frame = shift->method_frame;
                      # Same "code: text" layout as the connection-level
                      # on_close handler above.
                      $self->{on_error}->(
                          $method_frame->reply_code . ": " . $method_frame->reply_text
                      );
                  },
              );
          },
      );
  }
  # Publishes a message to the "mol" exchange with delivery_mode 2
  # (persistent delivery in AMQP).
  #
  # Arguments:
  #   $key    - routing key
  #   $obj    - data passed to j() to build the body; defaults to {}
  #   $header - optional hashref of message properties; defaults to {}
  #
  # Returns whatever the channel's publish() returns.
  sub send
  {
      my ($self, $key, $obj, $header) = @_;
      $obj    ||= {};
      $header ||= {};

      return $self->{channel}->publish(
          body        => j($obj),
          exchange    => "mol",
          routing_key => $key,
          # Publish a copy so the caller's header hash is left untouched.
          header      => { %$header, delivery_mode => 2 },
      );
  }
  # Publishes a message to the "mol" exchange with delivery_mode 1
  # (non-persistent delivery in AMQP).
  #
  # Arguments:
  #   $key    - routing key
  #   $obj    - data passed to j() to build the body; defaults to {}
  #   $header - optional hashref of message properties; defaults to {}
  #
  # Returns whatever the channel's publish() returns.
  sub emit
  {
      my ($self, $key, $obj, $header) = @_;
      $obj    ||= {};
      $header ||= {};

      return $self->{channel}->publish(
          body        => j($obj),
          exchange    => "mol",
          routing_key => $key,
          # Publish a copy so the caller's header hash is left untouched.
          header      => { %$header, delivery_mode => 1 },
      );
  }
  # Subscribes to messages from the "mol" exchange through a private,
  # server-named, exclusive, non-durable queue.
  #
  # Arguments:
  #   $keys     - one routing key, or an array ref of routing keys
  #   $callback - code ref called for every delivery with a hashref:
  #                 message     - the raw delivery as handed over by consume
  #                 content     - j() applied to the body payload
  #                 header      - the message header
  #                 routing_key - routing key the message was delivered with
  #
  # The queue is bound to every key first; a single consumer (no_ack mode) is
  # started once all bindings have succeeded. Failures go to
  # $self->{on_error}.
  sub subscribe
  {
      my ($self, $keys, $callback) = @_;

      # Accept a lone key as well as a list of keys.
      my @routing_keys = ref($keys) eq "ARRAY" ? @$keys : ($keys);

      # Picks the human-readable part of a failure callback's arguments: the
      # last defined, non-reference value.
      # NOTE(review): the argument list of on_failure depends on the failure
      # path inside AnyEvent::RabbitMQ - confirm against the installed version.
      my $reason = sub {
          my ($text) = grep { defined($_) && !ref($_) } reverse @_;
          return $text // "unknown error";
      };

      # Starts the one and only consumer on the freshly declared queue.
      my $start_consuming = sub {
          my $name = shift;
          $self->{channel}->consume(
              queue      => $name,
              no_ack     => 1,
              on_failure => sub {
                  $self->{on_error}->("consume failure: " . $reason->(@_));
              },
              on_consume => sub {
                  my $msg = shift;
                  $callback->({
                      message     => $msg,
                      # scalar() keeps the key/value pairs aligned even if
                      # j() were to return an empty list.
                      content     => scalar(j($msg->{body}->{payload})),
                      header      => $msg->{header},
                      routing_key => $msg->{deliver}->{method_frame}->{routing_key},
                  });
              },
          );
      };

      $self->{channel}->declare_queue(
          queue      => "",
          no_ack     => 1,
          durable    => 0,
          exclusive  => 1,
          on_failure => sub {
              $self->{on_error}->("declare queue failure: " . $reason->(@_));
          },
          on_success => sub {
              my $method = shift;
              my $name   = $method->method_frame->queue;

              # Count outstanding bindings so that consuming starts exactly
              # once, after the last bind has been confirmed.
              my $pending = scalar @routing_keys;

              for my $routing_key (@routing_keys) {
                  $self->{channel}->bind_queue(
                      queue       => $name,
                      exchange    => "mol",
                      routing_key => $routing_key,
                      on_failure  => sub {
                          $self->{on_error}->("bind queue failure: " . $reason->(@_));
                      },
                      on_success  => sub {
                          $start_consuming->($name) if --$pending == 0;
                      },
                  );
              }
          },
      );
  }
  # Consumes from a named, durable queue bound to the "mol" exchange.
  # Deliveries are consumed with no_ack => 0, so each one has to be settled
  # with ack(), reject() or requeue().
  #
  # Arguments:
  #   $queue    - queue name
  #   $bind     - routing key used to bind the queue to the exchange
  #   $callback - code ref called for every delivery with a hashref:
  #                 message     - the raw delivery (pass it to ack/reject/requeue)
  #                 content     - j() applied to the body payload
  #                 header      - the message header
  #                 routing_key - routing key the message was delivered with
  #
  # The steps run in order: declare, bind, consume. A failure in any of them
  # is reported through $self->{on_error}.
  sub listen_queue
  {
      my ($self, $queue, $bind, $callback) = @_;

      # Picks the human-readable part of a failure callback's arguments: the
      # last defined, non-reference value.
      # NOTE(review): the argument list of on_failure depends on the failure
      # path inside AnyEvent::RabbitMQ - confirm against the installed version.
      my $reason = sub {
          my ($text) = grep { defined($_) && !ref($_) } reverse @_;
          return $text // "unknown error";
      };

      $self->{channel}->declare_queue(
          queue       => $queue,
          no_ack      => 0,
          durable     => 1,
          auto_delete => 0,
          on_failure  => sub {
              $self->{on_error}->("declare queue failure: " . $reason->(@_));
          },
          on_success  => sub {
              $self->{channel}->bind_queue(
                  queue       => $queue,
                  exchange    => "mol",
                  routing_key => $bind,
                  on_failure  => sub {
                      $self->{on_error}->("bind queue failure: " . $reason->(@_));
                  },
                  # Start consuming only after the binding is confirmed.
                  on_success  => sub {
                      $self->{channel}->consume(
                          queue      => $queue,
                          no_ack     => 0,
                          on_failure => sub {
                              $self->{on_error}->("consume failure: " . $reason->(@_));
                          },
                          on_consume => sub {
                              my $msg = shift;
                              $callback->({
                                  message     => $msg,
                                  # scalar() keeps the key/value pairs aligned
                                  # even if j() were to return an empty list.
                                  content     => scalar(j($msg->{body}->{payload})),
                                  header      => $msg->{header},
                                  routing_key => $msg->{deliver}->{method_frame}->{routing_key},
                              });
                          },
                      );
                  },
              );
          },
      );
  }
  # Acknowledges a delivery.
  #
  # Arguments:
  #   $m - the raw message, i.e. the "message" entry of the hashref passed to
  #        a listen_queue() callback
  #
  # Returns whatever the channel's ack() returns.
  sub ack
  {
      my ($self, $m) = @_;

      my $tag = $m->{deliver}{method_frame}{delivery_tag};

      return $self->{channel}->ack(delivery_tag => $tag);
  }
  # Rejects a delivery without putting it back on the queue (requeue => 0).
  #
  # Arguments:
  #   $m - the raw message, i.e. the "message" entry of the hashref passed to
  #        a listen_queue() callback
  #
  # Returns whatever the channel's reject() returns.
  sub reject
  {
      my ($self, $m) = @_;

      my @reject_args = (
          delivery_tag => $m->{deliver}{method_frame}{delivery_tag},
          requeue      => 0,
      );

      return $self->{channel}->reject(@reject_args);
  }
  # Rejects a delivery and asks the broker to put it back on the queue
  # (requeue => 1).
  #
  # Arguments:
  #   $m - the raw message, i.e. the "message" entry of the hashref passed to
  #        a listen_queue() callback
  #
  # Returns whatever the channel's reject() returns.
  sub requeue
  {
      my ($self, $m) = @_;

      my @reject_args = (
          delivery_tag => $m->{deliver}{method_frame}{delivery_tag},
          requeue      => 1,
      );

      return $self->{channel}->reject(@reject_args);
  }
  214. 1;