rabbit_async.pm 8.9 KB


  1. #!/usr/bin/perl
  2. # Асинхронный доступ к RabbitMQ
  3. # Юрий Жиловец, 20.08.2015
  4. package rabbit_async;
  5. use Modern::Perl;
  6. use EV;
  7. use AnyEvent;
  8. use AnyEvent::RabbitMQ;
  9. use Data::Dumper;
  10. use Mojo::JSON qw/j/;
  11. use Data::UUID;
  12. use Promises qw/deferred collect/;
  13. my $uuid_gen = new Data::UUID;
  14. sub new
  15. {
  16. my $class = shift;
  17. my $arg = shift;
  18. my $callback = shift;
  19. my $self = $class->create($arg);
  20. $self->connect($callback)->done(sub { $callback->($self) if $callback });
  21. return $self;
  22. }
  23. sub create
  24. {
  25. my $class = shift;
  26. my $arg = shift;
  27. my $host = $arg->{host};
  28. my $user = $arg->{user};
  29. my $password = $arg->{password};
  30. my $rab = AnyEvent::RabbitMQ->new->load_xml_spec();
  31. $rab->{verbose} = 1 if $arg->{verbose};
  32. my $on_error = $arg->{on_error} || sub { say "RabbitMQ error: ".shift(); exit() };
  33. my $on_close = $arg->{on_close} || $on_error;
  34. my $on_connect_error = $arg->{on_connect_error} || $on_error;
  35. my $self = bless {
  36. rabbit=>$rab,
  37. on_error=>$on_error,
  38. on_close => $on_close,
  39. on_connect_error => $on_connect_error,
  40. host=>$host,
  41. user=>$user,
  42. product => $arg->{product} || "rabbit_async",
  43. password=>$password,
  44. rpc_calls => {},
  45. delayed => [],
  46. };
  47. return $self;
  48. }
  49. sub _connect
  50. {
  51. my $self = shift;
  52. my $def = deferred;
  53. my %params = (
  54. host => $self->{host},
  55. port => 5672,
  56. user => $self->{user},
  57. pass => $self->{password},
  58. vhost => "/",
  59. timeout => 1,
  60. tls => 0,
  61. tune => { heartbeat => 60, channel_max => 1 },
  62. client_properties => {
  63. product => $self->{product},
  64. },
  65. on_failure => sub { $def->reject(join(" ",@_)) },
  66. on_read_failure => sub { $self->{on_error}->(@_) },
  67. on_return => sub {
  68. my $frame = shift;
  69. $self->{on_error}->("Unable to deliver frame" . Dumper($frame));
  70. },
  71. on_close => sub {
  72. my $why = shift;
  73. if (ref($why)) {
  74. my $method_frame = $why->method_frame;
  75. $self->{on_close}->("Connection closed:" . $method_frame->reply_text);
  76. }
  77. else {
  78. $self->{on_close}->("Connection closed: $why");
  79. }
  80. },
  81. on_success => sub { $def->resolve() },
  82. );
  83. $self->{rabbit}->connect(%params);
  84. return $def->promise;
  85. }
  86. sub connect
  87. {
  88. my $self = shift;
  89. return $self->_connect->then(sub
  90. {
  91. $self->_open_channel;
  92. })->then(sub
  93. {
  94. my $channel = shift;
  95. $self->{chan} = $channel;
  96. })->catch(sub
  97. {
  98. $self->{on_connect_error}->(@_);
  99. die @_;
  100. });
  101. }
  102. sub _open_channel
  103. {
  104. my $self = shift;
  105. my $deferred = deferred;
  106. my %params = ();
  107. $params{on_success} = sub { $deferred->resolve(shift) };
  108. $params{on_failure} = sub { $deferred->reject(@_) };
  109. $params{on_close} = sub {
  110. my $why = shift;
  111. if (ref($why)) {
  112. my $method_frame = $why->method_frame;
  113. $self->{on_error}->("Channel closed: " . $method_frame->reply_text);
  114. }
  115. else {
  116. $self->{on_error}->("Channel closed: $why");
  117. }
  118. };
  119. $self->{rabbit}->open_channel(%params);
  120. return $deferred->promise;
  121. }
  122. sub send
  123. {
  124. shift->_send(2, @_);
  125. }
  126. sub emit
  127. {
  128. shift->_send(1, @_);
  129. }
  130. sub _send
  131. {
  132. my $self = shift;
  133. if ($self->{chan}->is_active)
  134. {
  135. $self->_publish(0, @_);
  136. foreach (@{ $self->{delayed} })
  137. {
  138. $self->_publish(@$_);
  139. }
  140. }
  141. else
  142. {
  143. push @{ $self->{delayed} }, [ 0.1, @_];
  144. }
  145. }
  146. sub _publish
  147. {
  148. my $self = shift;
  149. my $delay = shift;
  150. my $mode = shift;
  151. my $key = shift;
  152. my $obj = shift || {};
  153. my $header = shift || {};
  154. Mojo::IOLoop->timer($delay => sub
  155. {
  156. $obj = (j($obj) || {}) if ref($obj);
  157. $header->{delivery_mode} = $mode;
  158. $self->{chan}->publish(
  159. body => $obj,
  160. exchange => "mol",
  161. routing_key => $key,
  162. header => $header,
  163. );
  164. });
  165. }
  166. sub call
  167. {
  168. my $self = shift;
  169. my $sub = pop;
  170. my $key = shift;
  171. my $obj = shift || {};
  172. my $header = shift || {};
  173. my $uniq = $uuid_gen->create_str();
  174. $self->{rpc_calls}->{$uniq} = $sub;
  175. $self->_reply_queue->then(sub {
  176. my $queue = shift;
  177. $header->{reply_to} = $queue;
  178. $header->{correlation_id} = $uniq;
  179. $self->send($key,$obj,$header);
  180. });
  181. }
  182. sub _reply_queue
  183. {
  184. my $self = shift;
  185. return deferred->resolve($self->{reply_queue})->promise if $self->{reply_queue};
  186. return $self->_declare_queue(
  187. queue => "",
  188. no_ack => 1,
  189. durable => 0,
  190. exclusive => 1
  191. )->then(sub {
  192. my $queue = shift;
  193. return $self->_bind_queue($queue,$queue);
  194. })->then(sub {
  195. my $queue = shift;
  196. $self->{reply_queue} = $queue;
  197. return $self->_consume(
  198. queue => $queue,
  199. no_ack => 1,
  200. sub
  201. {
  202. my $msg = shift;
  203. my $corr_id = $msg->{header}->{correlation_id};
  204. if (exists $self->{rpc_calls}->{$corr_id})
  205. {
  206. (delete $self->{rpc_calls}->{$corr_id})->({
  207. message => $msg,
  208. content => j($msg->{body}->{payload}) || {},
  209. header => $msg->{header},
  210. routing_key => $msg->{deliver}->{method_frame}->{routing_key},
  211. });
  212. }
  213. });
  214. })
  215. ->then(sub { $self->{reply_queue} })
  216. ->catch(sub { $self->{on_error}->(@_) });
  217. }
  218. sub _declare_queue
  219. {
  220. my $self = shift;
  221. my %params = @_;
  222. my $deferred = deferred;
  223. $params{on_success} = sub {
  224. my $method = shift;
  225. my $name = $method->method_frame->queue;
  226. $deferred->resolve($name);
  227. };
  228. $params{on_failure} = sub { $deferred->reject("_declare_queue failure [".$params{queue}."]: ".$_[2]) };
  229. $self->{chan}->declare_queue(%params);
  230. return $deferred->promise;
  231. }
  232. sub _bind_queue
  233. {
  234. my $self = shift;
  235. my $queue = shift;
  236. my $key = shift;
  237. my $deferred = deferred;
  238. $self->{chan}->bind_queue(
  239. queue => $queue,
  240. exchange => "mol",
  241. routing_key => $key,
  242. on_success => sub { $deferred->resolve($queue) },
  243. on_failure => sub { $deferred->reject("_bind_queue failure [$queue/$key]: ".$_[0]) },
  244. );
  245. return $deferred->promise;
  246. }
  247. sub _bind_queue_to_many_keys
  248. {
  249. my $self = shift;
  250. my $queue = shift;
  251. my @keys = @_;
  252. return collect(map { $self->_bind_queue($queue,$_) } @keys)->then(sub { $queue });
  253. }
  254. sub _consume
  255. {
  256. my $self = shift;
  257. my $sub = pop;
  258. my %params = @_;
  259. my $deferred = deferred;
  260. $params{on_failure} = sub { $deferred->reject("_consume failure [".$params{queue}."]: ".$_[2]) };
  261. $params{on_success} = sub { $deferred->resolve() };
  262. $params{on_consume} = $sub;
  263. $self->{chan}->consume(%params);
  264. return $deferred->promise;
  265. }
  266. sub subscribe
  267. {
  268. my $self = shift;
  269. my $keys = shift;
  270. my $callback = shift;
  271. $self->_declare_queue(
  272. queue => "",
  273. no_ack => 1,
  274. durable => 0,
  275. exclusive => 1
  276. )->then(sub {
  277. my $queue = shift;
  278. $keys = [ $keys] unless ref($keys) eq "ARRAY";
  279. return $self->_bind_queue_to_many_keys($queue,@$keys);
  280. })->then(sub {
  281. my $queue = shift;
  282. return $self->_consume(
  283. queue => $queue,
  284. no_ack => 1,
  285. sub
  286. {
  287. my $msg = shift;
  288. $callback->({
  289. message => $msg,
  290. content => j($msg->{body}->{payload}) || {},
  291. header => $msg->{header},
  292. routing_key => $msg->{deliver}->{method_frame}->{routing_key},
  293. });
  294. });
  295. })->catch(sub { $self->{on_error}->(@_) });
  296. }
  297. sub listen_queue
  298. {
  299. my $self = shift;
  300. my $queue = shift;
  301. my $bind = shift;
  302. my $callback = shift;
  303. $self->_declare_queue(
  304. queue => $queue,
  305. no_ack => 0,
  306. durable => 0,
  307. auto_delete => 0,
  308. )->then(sub {
  309. return $self->_bind_queue($queue,$bind)
  310. })->then(sub {
  311. return $self->_consume(
  312. queue => $queue,
  313. no_ack => 0,
  314. sub
  315. {
  316. my $msg = shift;
  317. $callback->({
  318. message => $msg,
  319. content => j($msg->{body}->{payload}) || {},
  320. header => $msg->{header},
  321. routing_key => $msg->{deliver}->{method_frame}->{routing_key},
  322. });
  323. });
  324. })->catch(sub { $self->{on_error}->(@_) });
  325. }
  326. sub ack
  327. {
  328. my $self = shift;
  329. my $m = shift;
  330. $self->{chan}->ack(
  331. delivery_tag => $m->{message}->{deliver}->{method_frame}->{delivery_tag},
  332. );
  333. }
  334. sub reject
  335. {
  336. my $self = shift;
  337. my $m = shift;
  338. $self->{chan}->reject(
  339. delivery_tag => $m->{message}->{deliver}->{method_frame}->{delivery_tag},
  340. requeue => 0,
  341. );
  342. }
  343. sub requeue
  344. {
  345. my $self = shift;
  346. my $m = shift;
  347. $self->{chan}->reject(
  348. delivery_tag => $m->{message}->{deliver}->{method_frame}->{delivery_tag},
  349. requeue => 1,
  350. );
  351. }
  352. sub reply
  353. {
  354. my $self = shift;
  355. my $msg = shift;
  356. my $obj = shift || {};
  357. my $header = shift || {};
  358. $header->{delivery_mode} = 2;
  359. $header->{correlation_id} = $msg->{header}->{correlation_id};
  360. $obj = (j($obj) || {}) if ref($obj);
  361. $self->{chan}->publish(
  362. body => $obj,
  363. exchange => "mol",
  364. routing_key => $msg->{header}->{reply_to},
  365. header => $header,
  366. );
  367. }
  368. sub DESTROY
  369. {
  370. my $self = shift;
  371. $self->{rabbit} = undef;
  372. }
  373. 1;