rabbit_async.pm 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480
  1. #!/usr/bin/perl
  2. # Асинхронный доступ к RabbitMQ
  3. # Юрий Жиловец, 20.08.2015
  4. package rabbit_async;
  5. use Modern::Perl;
  6. use EV;
  7. use AnyEvent;
  8. use AnyEvent::RabbitMQ;
  9. use Data::Dumper;
  10. use Mojo::JSON qw/j/;
  11. use Data::UUID;
  12. use Mojo::Promise;
  13. use constant DEFAULT_CALL_TIMEOUT => 15;
  14. my $uuid_gen = new Data::UUID;
  15. sub new
  16. {
  17. my $class = shift;
  18. my $arg = shift;
  19. my $callback = shift;
  20. my $self = $class->create($arg);
  21. $self->connect($callback)->then(sub { $callback->($self) if $callback });
  22. return $self;
  23. }
  24. sub create
  25. {
  26. my $class = shift;
  27. my $arg = shift;
  28. my $host = $arg->{host};
  29. my $user = $arg->{user};
  30. my $password = $arg->{password};
  31. my $rab = AnyEvent::RabbitMQ->new->load_xml_spec();
  32. $rab->{verbose} = 1 if $arg->{verbose};
  33. my $on_error = $arg->{on_error} || sub { say "RabbitMQ error: ".shift(); exit() };
  34. my $on_close = $arg->{on_close} || $on_error;
  35. my $on_connect_error = $arg->{on_connect_error} || $on_error;
  36. my $self = bless {
  37. rabbit=>$rab,
  38. on_error=>$on_error,
  39. on_close => $on_close,
  40. on_connect_error => $on_connect_error,
  41. host=>$host,
  42. user=>$user,
  43. product => $arg->{product} || "rabbit_async",
  44. password=>$password,
  45. rpc_calls => {},
  46. delayed => [],
  47. };
  48. return $self;
  49. }
  50. sub _connect
  51. {
  52. my $self = shift;
  53. my $def = Mojo::Promise->new;
  54. my %params = (
  55. host => $self->{host},
  56. port => 5672,
  57. user => $self->{user},
  58. pass => $self->{password},
  59. vhost => "/",
  60. timeout => 1,
  61. tls => 0,
  62. tune => { heartbeat => 60, channel_max => 1 },
  63. client_properties => {
  64. product => $self->{product},
  65. },
  66. on_failure => sub { $def->reject(join(" ",@_)) },
  67. on_read_failure => sub { $self->{on_error}->(@_) },
  68. on_return => sub {
  69. my $frame = shift;
  70. $self->{on_error}->("Unable to deliver frame" . Dumper($frame));
  71. },
  72. on_close => sub {
  73. my $why = shift;
  74. if (ref($why)) {
  75. my $method_frame = $why->method_frame;
  76. $self->{on_close}->("Connection closed:" . $method_frame->reply_text);
  77. }
  78. else {
  79. $self->{on_close}->("Connection closed: $why");
  80. }
  81. },
  82. on_success => sub { $def->resolve() },
  83. );
  84. $self->{rabbit}->connect(%params);
  85. return $def;
  86. }
  87. sub connect
  88. {
  89. my $self = shift;
  90. return $self->_connect->then(sub
  91. {
  92. $self->_open_channel;
  93. })->then(sub
  94. {
  95. my $channel = shift;
  96. $self->{chan} = $channel;
  97. })->catch(sub
  98. {
  99. $self->{on_connect_error}->(@_);
  100. die @_;
  101. });
  102. }
  103. sub _open_channel
  104. {
  105. my $self = shift;
  106. my $deferred = Mojo::Promise->new;
  107. my %params = ();
  108. $params{on_success} = sub { $deferred->resolve(shift) };
  109. $params{on_failure} = sub { $deferred->reject(@_) };
  110. $params{on_close} = sub {
  111. my $why = shift;
  112. if (ref($why)) {
  113. my $method_frame = $why->method_frame;
  114. $self->{on_error}->("Channel closed: " . $method_frame->reply_text);
  115. }
  116. else {
  117. $self->{on_error}->("Channel closed: $why");
  118. }
  119. };
  120. $self->{rabbit}->open_channel(%params);
  121. return $deferred;
  122. }
  123. sub send
  124. {
  125. shift->_send(2, @_);
  126. }
  127. sub emit
  128. {
  129. shift->_send(1, @_);
  130. }
  131. sub _send
  132. {
  133. my $self = shift;
  134. if ($self->{chan}->is_active)
  135. {
  136. $self->_publish(0, @_);
  137. foreach (@{ $self->{delayed} })
  138. {
  139. $self->_publish(@$_);
  140. }
  141. }
  142. else
  143. {
  144. push @{ $self->{delayed} }, [ 0.1, @_];
  145. }
  146. }
  147. sub _publish
  148. {
  149. my $self = shift;
  150. my $delay = shift;
  151. my $mode = shift;
  152. my $key = shift;
  153. my $obj = shift || {};
  154. my $header = shift || {};
  155. Mojo::IOLoop->timer($delay => sub
  156. {
  157. $obj = (j($obj) || {}) if ref($obj);
  158. $header->{delivery_mode} = $mode;
  159. $self->{chan}->publish(
  160. body => $obj,
  161. exchange => "mol",
  162. routing_key => $key,
  163. header => $header,
  164. );
  165. });
  166. }
  167. my $ttt;
  168. sub call
  169. {
  170. my $self = shift;
  171. my $sub = pop;
  172. my $key = shift;
  173. my $obj = shift || {};
  174. my $header = shift || {};
  175. my $timeout = shift || DEFAULT_CALL_TIMEOUT;
  176. my $uniq = $uuid_gen->create_str();
  177. $self->{rpc_calls}->{$uniq} = $sub;
  178. my $timer = AnyEvent->timer(after=>$timeout, cb => sub
  179. {
  180. my $sub = delete $self->{rpc_calls}->{$uniq};
  181. $sub->({error=>"Timeout"}) if $sub;
  182. delete $self->{rpc_calls}->{"$uniq-timer"};
  183. });
  184. $ttt=$timer;
  185. $self->{rpc_calls}->{"$uniq-timer"} = $timer;
  186. $self->_reply_queue->then(sub {
  187. my $queue = shift;
  188. $header->{reply_to} = $queue;
  189. $header->{correlation_id} = $uniq;
  190. $self->send($key,$obj,$header);
  191. });
  192. }
  193. sub _reply_queue
  194. {
  195. my $self = shift;
  196. return Mojo::Promise->new->resolve($self->{reply_queue}) if $self->{reply_queue};
  197. return $self->_declare_queue(
  198. queue => "",
  199. no_ack => 1,
  200. durable => 0,
  201. exclusive => 1
  202. )->then(sub {
  203. my $queue = shift;
  204. return $self->_bind_queue($queue,$queue);
  205. })->then(sub {
  206. my $queue = shift;
  207. $self->{reply_queue} = $queue;
  208. return $self->_consume(
  209. queue => $queue,
  210. no_ack => 1,
  211. sub
  212. {
  213. my $msg = shift;
  214. my $corr_id = $msg->{header}->{correlation_id};
  215. my $sub = delete $self->{rpc_calls}->{$corr_id};
  216. $sub->({
  217. message => $msg,
  218. content => j($msg->{body}->{payload}) || {},
  219. header => $msg->{header},
  220. routing_key => $msg->{deliver}->{method_frame}->{routing_key},
  221. }) if $sub;
  222. });
  223. })
  224. ->then(sub { $self->{reply_queue} })
  225. ->catch(sub { $self->{on_error}->(@_) });
  226. }
  227. sub _declare_queue
  228. {
  229. my $self = shift;
  230. my %params = @_;
  231. my $deferred = Mojo::Promise->new;
  232. $params{on_success} = sub {
  233. my $method = shift;
  234. my $name = $method->method_frame->queue;
  235. $deferred->resolve($name);
  236. };
  237. $params{on_failure} = sub { $deferred->reject("_declare_queue failure [".$params{queue}."]: ".$_[2]) };
  238. $self->{chan}->declare_queue(%params);
  239. return $deferred;
  240. }
  241. sub _bind_queue
  242. {
  243. my $self = shift;
  244. my $queue = shift;
  245. my $key = shift;
  246. my $deferred = Mojo::Promise->new;
  247. $self->{chan}->bind_queue(
  248. queue => $queue,
  249. exchange => "mol",
  250. routing_key => $key,
  251. on_success => sub { $deferred->resolve($queue) },
  252. on_failure => sub { $deferred->reject("_bind_queue failure [$queue/$key]: ".$_[0]) },
  253. );
  254. return $deferred;
  255. }
  256. sub _bind_queue_to_many_keys
  257. {
  258. my $self = shift;
  259. my $queue = shift;
  260. my @keys = @_;
  261. return Mojo::Promise->all(map { $self->_bind_queue($queue,$_) } @keys)->then(sub { $queue });
  262. }
  263. sub _consume
  264. {
  265. my $self = shift;
  266. my $sub = pop;
  267. my %params = @_;
  268. my $deferred = Mojo::Promise->new;
  269. $params{on_failure} = sub { $deferred->reject("_consume failure [".$params{queue}."]: ".$_[2]) };
  270. $params{on_success} = sub { $deferred->resolve() };
  271. $params{on_consume} = $sub;
  272. $self->{chan}->consume(%params);
  273. return $deferred;
  274. }
  275. sub subscribe
  276. {
  277. my $self = shift;
  278. my $keys = shift;
  279. my $callback = shift;
  280. $self->subscribe_queue("", $keys, $callback);
  281. }
  282. sub subscribe_queue
  283. {
  284. my $self = shift;
  285. my $queue = shift;
  286. my $keys = shift;
  287. my $callback = shift;
  288. $self->_declare_queue(
  289. queue => $queue,
  290. no_ack => 1,
  291. durable => 0,
  292. exclusive => 1
  293. )->then(sub {
  294. my $queue = shift;
  295. $keys = [ $keys] unless ref($keys) eq "ARRAY";
  296. return $self->_bind_queue_to_many_keys($queue,@$keys);
  297. })->then(sub {
  298. my $queue = shift;
  299. return $self->_consume(
  300. queue => $queue,
  301. no_ack => 1,
  302. sub
  303. {
  304. my $msg = shift;
  305. $callback->({
  306. message => $msg,
  307. content => j($msg->{body}->{payload}) || {},
  308. header => $msg->{header},
  309. routing_key => $msg->{deliver}->{method_frame}->{routing_key},
  310. });
  311. });
  312. })->catch(sub { $self->{on_error}->(@_) });
  313. }
  314. sub listen_queue
  315. {
  316. my $self = shift;
  317. my $queue = shift;
  318. my $bind = shift;
  319. my $callback = shift;
  320. $self->_declare_queue(
  321. queue => $queue,
  322. no_ack => 0,
  323. durable => 0,
  324. auto_delete => 0,
  325. )->then(sub {
  326. return $self->_bind_queue_to_many_keys($queue, ref($bind) ? @$bind : $bind)
  327. })->then(sub {
  328. return $self->_consume(
  329. queue => $queue,
  330. no_ack => 0,
  331. sub
  332. {
  333. my $msg = shift;
  334. $callback->({
  335. message => $msg,
  336. content => j($msg->{body}->{payload}) || {},
  337. header => $msg->{header},
  338. routing_key => $msg->{deliver}->{method_frame}->{routing_key},
  339. });
  340. });
  341. })->catch(sub { $self->{on_error}->(@_) });
  342. }
  343. sub ack
  344. {
  345. my $self = shift;
  346. my $m = shift;
  347. $self->{chan}->ack(
  348. delivery_tag => $m->{message}->{deliver}->{method_frame}->{delivery_tag},
  349. );
  350. }
  351. sub reject
  352. {
  353. my $self = shift;
  354. my $m = shift;
  355. $self->{chan}->reject(
  356. delivery_tag => $m->{message}->{deliver}->{method_frame}->{delivery_tag},
  357. requeue => 0,
  358. );
  359. }
  360. sub requeue
  361. {
  362. my $self = shift;
  363. my $m = shift;
  364. $self->{chan}->reject(
  365. delivery_tag => $m->{message}->{deliver}->{method_frame}->{delivery_tag},
  366. requeue => 1,
  367. );
  368. }
  369. sub reply
  370. {
  371. my $self = shift;
  372. my $msg = shift;
  373. my $obj = shift || {};
  374. my $header = shift || {};
  375. $header->{delivery_mode} = 2;
  376. $header->{correlation_id} = $msg->{header}->{correlation_id};
  377. $obj = (j($obj) || {}) if ref($obj);
  378. $self->{chan}->publish(
  379. body => $obj,
  380. exchange => "mol",
  381. routing_key => $msg->{header}->{reply_to},
  382. header => $header,
  383. );
  384. }
  385. sub prefetch
  386. {
  387. my $self = shift;
  388. my $cnt = shift;
  389. my $deferred = Mojo::Promise->new;
  390. $self->{chan}->qos(
  391. on_failure => sub { $deferred->reject(join(" ",@_)) },
  392. on_success => sub { $deferred->resolve() },
  393. prefetch_count => $cnt,
  394. global => 0, # только для канала, хотя особой разницы нет - у нас нету мультиплексированных соединений
  395. );
  396. return $deferred;
  397. }
  398. sub DESTROY
  399. {
  400. my $self = shift;
  401. $self->{rabbit} = undef;
  402. }
  403. 1;