rabbit_async.pm 8.8 KB


  1. #!/usr/bin/perl
  2. # Асинхронный доступ к RabbitMQ
  3. # Юрий Жиловец, 20.08.2015
  4. package rabbit_async;
  5. use Modern::Perl;
  6. use EV;
  7. use AnyEvent;
  8. use AnyEvent::RabbitMQ;
  9. use Data::Dumper;
  10. use Mojo::JSON qw/j/;
  11. use Data::UUID;
  12. use Promises qw/deferred collect/;
  # File-wide UUID generator; call() uses it to create a correlation id per RPC.
  # Direct method call instead of the ambiguous indirect-object form "new Class".
  my $uuid_gen = Data::UUID->new;
  # Constructor: builds the object and immediately starts connecting.
  #   $class    - class to instantiate
  #   $arg      - hashref of options, handed unchanged to create()
  #   $callback - optional coderef, called with the object once a channel
  #               has been opened
  # Returns the object right away; the connection completes asynchronously.
  sub new {
      my ($class, $arg, $callback) = @_;

      my $self = $class->create($arg);

      # connect() hands failures to on_connect_error and then settles
      # successfully, so a resolved promise alone does not prove that we are
      # connected. Announce readiness only when a channel really exists.
      $self->connect($callback)->done(sub {
          $callback->($self) if $callback && $self->{chan};
      });

      return $self;
  }
  # Builds an object around a fresh AnyEvent::RabbitMQ instance without
  # connecting it.
  #   $class - class name (or an existing object, whose class is reused)
  #   $arg   - hashref; keys read here: host, user, password, product,
  #            verbose, on_error, on_close, on_connect_error
  # Returns the blessed object.
  sub create {
      my ($class, $arg) = @_;

      my $rabbit = AnyEvent::RabbitMQ->new->load_xml_spec();
      $rabbit->{verbose} = 1 if $arg->{verbose};

      # Default policy is to print the error and exit; the close and
      # connect-error handlers fall back to on_error when not supplied.
      my $on_error = $arg->{on_error} || sub { say "RabbitMQ error: ".shift(); exit() };

      my $self = {
          rabbit           => $rabbit,
          on_error         => $on_error,
          on_close         => $arg->{on_close} || $on_error,
          on_connect_error => $arg->{on_connect_error} || $on_error,
          host             => $arg->{host},
          user             => $arg->{user},
          product          => $arg->{product} || "rabbit_async",
          password         => $arg->{password},
          rpc_calls        => {},    # correlation id => reply handler
      };

      # Two-argument bless: a subclass calling create() must get an instance
      # of itself, not of the package this sub happens to be compiled in.
      return bless $self, ref($class) || $class;
  }
  # Starts the AMQP connection with the host and credentials stored on the
  # object.
  # Returns a promise: resolved (with no value) when the connection succeeds,
  # rejected with the space-joined failure arguments when it does not.
  # Read failures and returned (undeliverable) frames are reported through
  # on_error; a closed connection is reported through on_close.
  sub _connect {
      my ($self) = @_;
      my $pending = deferred();

      # The close notification arrives either as an object exposing
      # method_frame or as a plain value.
      my $closed = sub {
          my ($why) = @_;
          my $text = ref($why)
              ? "Connection closed:" . $why->method_frame->reply_text
              : "Connection closed: $why";
          $self->{on_close}->($text);
      };

      my $returned = sub {
          my ($frame) = @_;
          $self->{on_error}->("Unable to deliver frame" . Dumper($frame));
      };

      $self->{rabbit}->connect(
          host              => $self->{host},
          port              => 5672,
          user              => $self->{user},
          pass              => $self->{password},
          vhost             => "/",
          timeout           => 1,
          tls               => 0,
          tune              => { heartbeat => 60, channel_max => 1 },
          client_properties => { product => $self->{product} },
          on_failure        => sub { $pending->reject(join(" ", @_)) },
          on_read_failure   => sub { $self->{on_error}->(@_) },
          on_return         => $returned,
          on_close          => $closed,
          on_success        => sub { $pending->resolve() },
      );

      return $pending->promise;
  }
  # Connects to the broker, opens a channel and stores it in $self->{chan}.
  # Any failure along the way is passed to the on_connect_error handler.
  # Returns the promise for the whole sequence.
  sub connect {
      my ($self) = @_;

      my $open_channel = sub { return $self->_open_channel };

      my $keep_channel = sub {
          my ($channel) = @_;
          return $self->{chan} = $channel;
      };

      my $report_failure = sub { return $self->{on_connect_error}->(@_) };

      my $connected = $self->_connect;
      my $opened    = $connected->then($open_channel);
      my $stored    = $opened->then($keep_channel);

      return $stored->catch($report_failure);
  }
  # Opens a channel on the established connection.
  # Returns a promise resolved with the first argument of the success
  # callback (connect() stores it as the channel) and rejected with whatever
  # the failure callback receives. A channel that closes later is reported
  # through on_error.
  sub _open_channel {
      my ($self) = @_;
      my $pending = deferred();

      $self->{rabbit}->open_channel(
          on_success => sub { $pending->resolve(shift) },
          on_failure => sub { $pending->reject(@_) },
          on_close   => sub {
              my ($why) = @_;
              # Either an object exposing method_frame or a plain value.
              my $reason = ref($why) ? $why->method_frame->reply_text : $why;
              $self->{on_error}->("Channel closed: " . $reason);
          },
      );

      return $pending->promise;
  }
  # Publishes a message with delivery_mode 2 to the "mol" exchange.
  #   $key    - routing key
  #   $obj    - body; a reference is serialised with j(), a plain scalar is
  #             sent as is (defaults to an empty hash)
  #   $header - optional hashref of message properties; delivery_mode is
  #             written into it
  # Returns whatever the channel's publish() returns.
  sub send {
      my ($self, $key, $obj, $header) = @_;
      $obj    ||= {};
      $header ||= {};

      if (ref $obj) {
          $obj = j($obj) || {};
      }
      $header->{delivery_mode} = 2;

      my %message = (
          body        => $obj,
          exchange    => "mol",
          routing_key => $key,
          header      => $header,
      );
      return $self->{chan}->publish(%message);
  }
  # RPC-style request: publishes a message and registers a handler for the
  # reply.
  #   call($key, [$obj, [$header,]] $sub)
  #   $key    - routing key of the request
  #   $obj    - request body (defaults to an empty hash)
  #   $header - optional hashref of properties; reply_to and correlation_id
  #             are written into it
  #   $sub    - always the LAST argument; stored under a fresh correlation id
  #             and invoked by the reply-queue consumer
  # Returns the promise of the send that follows reply-queue setup.
  sub call {
      my $self     = shift;
      my $on_reply = pop;
      my ($key, $obj, $header) = @_;
      $obj    ||= {};
      $header ||= {};

      my $correlation_id = $uuid_gen->create_str();
      $self->{rpc_calls}{$correlation_id} = $on_reply;

      return $self->_reply_queue->then(sub {
          my ($reply_to) = @_;
          @{$header}{qw(reply_to correlation_id)} = ($reply_to, $correlation_id);
          return $self->send($key, $obj, $header);
      });
  }
  # Lazily creates the private queue on which RPC replies arrive and starts
  # consuming from it.
  # Returns a promise for the queue name. Errors are passed to on_error; the
  # promise then settles with whatever that handler returns.
  #
  # The queue name is cached in $self->{reply_queue} only after the consumer
  # has been confirmed, and calls made while setup is still in flight share
  # one setup promise ($self->{reply_queue_setup}) instead of each declaring
  # a queue and consumer of their own.
  sub _reply_queue {
      my $self = shift;

      # Fast path: queue and consumer already exist.
      if ($self->{reply_queue}) {
          return deferred()->resolve($self->{reply_queue})->promise;
      }

      # Setup already running: hand out the same promise.
      return $self->{reply_queue_setup} if $self->{reply_queue_setup};

      # Routes a reply to the handler registered under its correlation id.
      # Each handler is removed before it runs, so it fires at most once;
      # replies with a missing or unknown id are ignored.
      my $dispatch_reply = sub {
          my $msg = shift;
          my $corr_id = $msg->{header}->{correlation_id};
          return unless defined $corr_id && exists $self->{rpc_calls}->{$corr_id};

          my $handler = delete $self->{rpc_calls}->{$corr_id};
          $handler->({
              message     => $msg,
              content     => j($msg->{body}->{payload}) || {},
              header      => $msg->{header},
              routing_key => $msg->{deliver}->{method_frame}->{routing_key},
          });
      };

      # The chain may settle before we get to store it (e.g. when a callback
      # fires synchronously); the flag stops us caching a finished promise.
      my $settled = 0;

      my $setup = $self->_declare_queue(
          queue     => "",
          no_ack    => 1,
          durable   => 0,
          exclusive => 1,
      )->then(sub {
          my $queue = shift;
          # Bound under its own name, which call() sends as reply_to.
          return $self->_bind_queue($queue, $queue);
      })->then(sub {
          my $queue = shift;
          return $self->_consume(
              queue  => $queue,
              no_ack => 1,
              $dispatch_reply,
          )->then(sub { $queue });
      })->then(sub {
          my $queue = shift;
          $settled = 1;
          delete $self->{reply_queue_setup};
          $self->{reply_queue} = $queue;
          return $queue;
      })->catch(sub {
          $settled = 1;
          # Forget the failed attempt so that the next call retries.
          delete $self->{reply_queue_setup};
          $self->{on_error}->(@_);
      });

      $self->{reply_queue_setup} = $setup unless $settled;
      return $setup;
  }
  # Publishes a message with delivery_mode 1 to the "mol" exchange.
  #   $key    - routing key
  #   $obj    - body; a reference is serialised with j(), a plain scalar is
  #             sent as is (defaults to an empty hash)
  #   $header - optional hashref of message properties; delivery_mode is
  #             written into it
  # Returns whatever the channel's publish() returns.
  sub emit {
      my ($self, $key, $obj, $header) = @_;
      $obj    = {} unless $obj;
      $header = {} unless $header;

      my $body = ref($obj) ? (j($obj) || {}) : $obj;
      $header->{delivery_mode} = 1;

      return $self->{chan}->publish(
          body        => $body,
          exchange    => "mol",
          routing_key => $key,
          header      => $header,
      );
  }
  # Declares a queue on the channel.
  #   %params - passed to the channel's declare_queue(); on_success and
  #             on_failure are supplied here and override any given
  # Returns a promise resolved with the queue name reported by the broker
  # and rejected with a descriptive message on failure.
  sub _declare_queue {
      my ($self, %params) = @_;
      my $deferred = deferred();
      my $requested = $params{queue} // '';

      $params{on_success} = sub {
          my $method = shift;
          $deferred->resolve($method->method_frame->queue);
      };

      # Report every defined argument of the failure callback rather than one
      # fixed position: a lone message would otherwise be dropped and the
      # rejection would end in an undefined value.
      $params{on_failure} = sub {
          my $reason = join(" ", grep { defined } @_);
          $deferred->reject("_declare_queue failure [$requested]: $reason");
      };

      $self->{chan}->declare_queue(%params);
      return $deferred->promise;
  }
  # Binds a queue to the "mol" exchange under one routing key.
  #   $queue - queue name
  #   $key   - routing key
  # Returns a promise resolved with the queue name and rejected with a
  # message built from the first failure argument.
  sub _bind_queue {
      my ($self, $queue, $key) = @_;
      my $pending = deferred();

      my $bound  = sub { $pending->resolve($queue) };
      my $failed = sub { $pending->reject("_bind_queue failure [$queue/$key]: ".$_[0]) };

      $self->{chan}->bind_queue(
          queue       => $queue,
          exchange    => "mol",
          routing_key => $key,
          on_success  => $bound,
          on_failure  => $failed,
      );

      return $pending->promise;
  }
  # Binds one queue under several routing keys.
  #   $queue - queue name
  #   @keys  - routing keys
  # Returns a promise that yields the queue name once all bindings are done.
  sub _bind_queue_to_many_keys {
      my ($self, $queue, @keys) = @_;

      my @bindings;
      for my $key (@keys) {
          push @bindings, $self->_bind_queue($queue, $key);
      }

      return collect(@bindings)->then(sub { return $queue });
  }
  # Starts consuming from a queue.
  #   _consume(%params, $sub)
  #   %params - passed to the channel's consume(); on_success, on_failure and
  #             on_consume are supplied here and override any given
  #   $sub    - always the LAST argument; installed as the on_consume handler
  # Returns a promise resolved (with no value) when the consumer is accepted
  # and rejected with a descriptive message on failure.
  sub _consume {
      my $self       = shift;
      my $on_message = pop;
      my %params     = @_;
      my $deferred   = deferred();
      my $queue      = $params{queue} // '';

      # Report every defined argument of the failure callback rather than one
      # fixed position: a lone message would otherwise be dropped and the
      # rejection would end in an undefined value.
      $params{on_failure} = sub {
          my $reason = join(" ", grep { defined } @_);
          $deferred->reject("_consume failure [$queue]: $reason");
      };
      $params{on_success} = sub { $deferred->resolve() };
      $params{on_consume} = $on_message;

      $self->{chan}->consume(%params);
      return $deferred->promise;
  }
  # Subscribes to one or more routing keys through a server-named queue
  # declared with exclusive => 1, durable => 0 and consumed with no_ack => 1.
  #   $keys     - a routing key or an arrayref of routing keys
  #   $callback - coderef called for every message with a hashref:
  #               message (raw message), content (payload decoded by j(), or
  #               an empty hash), header, routing_key
  # Returns the promise of the whole setup; failures go to on_error.
  sub subscribe {
      my ($self, $keys, $callback) = @_;

      my $deliver = sub {
          my ($msg) = @_;
          $callback->({
              message     => $msg,
              content     => j($msg->{body}->{payload}) || {},
              header      => $msg->{header},
              routing_key => $msg->{deliver}->{method_frame}->{routing_key},
          });
      };

      my $declared = $self->_declare_queue(
          queue     => "",
          no_ack    => 1,
          durable   => 0,
          exclusive => 1,
      );

      my $bound = $declared->then(sub {
          my ($queue) = @_;
          # A single key is accepted as well as a list of keys.
          $keys = [$keys] unless ref($keys) eq "ARRAY";
          return $self->_bind_queue_to_many_keys($queue, @{$keys});
      });

      my $consuming = $bound->then(sub {
          my ($queue) = @_;
          return $self->_consume(queue => $queue, no_ack => 1, $deliver);
      });

      return $consuming->catch(sub { $self->{on_error}->(@_) });
  }
  # Consumes from a named queue declared with durable => 1, auto_delete => 0.
  # Messages are consumed with no_ack => 0, so the callback is expected to
  # settle each one through ack(), reject() or requeue().
  #   $queue    - queue name
  #   $bind     - routing key the queue is bound under
  #   $callback - coderef called for every message with a hashref:
  #               message (raw message), content (payload decoded by j(), or
  #               an empty hash), header, routing_key
  # Returns the promise of the whole setup; failures go to on_error.
  sub listen_queue {
      my ($self, $queue, $bind, $callback) = @_;

      my $deliver = sub {
          my ($msg) = @_;
          $callback->({
              message     => $msg,
              content     => j($msg->{body}->{payload}) || {},
              header      => $msg->{header},
              routing_key => $msg->{deliver}->{method_frame}->{routing_key},
          });
      };

      my $declared = $self->_declare_queue(
          queue       => $queue,
          no_ack      => 0,
          durable     => 1,
          auto_delete => 0,
      );

      # The requested name is used from here on, not the value the
      # declaration resolves with.
      my $bound = $declared->then(sub {
          return $self->_bind_queue($queue, $bind);
      });

      my $consuming = $bound->then(sub {
          return $self->_consume(queue => $queue, no_ack => 0, $deliver);
      });

      return $consuming->catch(sub { $self->{on_error}->(@_) });
  }
  # Acknowledges a message.
  #   $m - the hashref handed to a consumer callback; the delivery tag is read
  #        from its raw message
  # Returns whatever the channel's ack() returns.
  sub ack {
      my ($self, $m) = @_;

      my $frame = $m->{message}->{deliver}->{method_frame};
      my $tag   = $frame->{delivery_tag};

      return $self->{chan}->ack(delivery_tag => $tag);
  }
  # Rejects a message without requeueing it (requeue => 0).
  #   $m - the hashref handed to a consumer callback; the delivery tag is read
  #        from its raw message
  # Returns whatever the channel's reject() returns.
  sub reject {
      my ($self, $m) = @_;

      my $frame = $m->{message}->{deliver}->{method_frame};
      my %verdict = (
          delivery_tag => $frame->{delivery_tag},
          requeue      => 0,
      );

      return $self->{chan}->reject(%verdict);
  }
  # Rejects a message and asks for it to be requeued (requeue => 1).
  #   $m - the hashref handed to a consumer callback; the delivery tag is read
  #        from its raw message
  # Returns whatever the channel's reject() returns.
  sub requeue {
      my ($self, $m) = @_;

      my $frame = $m->{message}->{deliver}->{method_frame};
      my %verdict = (
          delivery_tag => $frame->{delivery_tag},
          requeue      => 1,
      );

      return $self->{chan}->reject(%verdict);
  }
  # Answers a request received through a consumer callback.
  #   $msg    - the hashref handed to the callback; its header supplies
  #             reply_to (used as routing key) and correlation_id
  #   $obj    - reply body; a reference is serialised with j(), a plain
  #             scalar is sent as is (defaults to an empty hash)
  #   $header - optional hashref of properties; delivery_mode (2) and
  #             correlation_id are written into it
  # Publishes to the "mol" exchange and returns whatever publish() returns.
  sub reply {
      my ($self, $msg, $obj, $header) = @_;
      $obj    ||= {};
      $header ||= {};

      my $request = $msg->{header};
      @{$header}{qw(delivery_mode correlation_id)} = (2, $request->{correlation_id});

      my $body = ref($obj) ? (j($obj) || {}) : $obj;

      return $self->{chan}->publish(
          body        => $body,
          exchange    => "mol",
          routing_key => $request->{reply_to},
          header      => $header,
      );
  }
  # Destructor: lets go of the AnyEvent::RabbitMQ handle held by the object.
  sub DESTROY {
      my ($self) = @_;
      undef $self->{rabbit};
  }
  357. 1;