From 3681b77f22d7f0990e762a93eee3c80aa3638cbb Mon Sep 17 00:00:00 2001 From: Leo Franchi Date: Wed, 17 Apr 2013 10:11:43 -0400 Subject: [PATCH] Patch TornadoConnection to catch exceptions and continue reconnection (imported from commit 6bf9086b6bdc35321b23bb92b35679e2a21f6333) --- zephyr/lib/queue.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/zephyr/lib/queue.py b/zephyr/lib/queue.py index 799130280b..499a466db1 100644 --- a/zephyr/lib/queue.py +++ b/zephyr/lib/queue.py @@ -103,6 +103,19 @@ class SimpleQueueClient(object): def stop_consuming(self): self.channel.stop_consuming() +# Patch pika.adapters.TornadoConnection so that a socket error doesn't +# throw an exception and disconnect the tornado process from the rabbitmq +# queue. Instead, just re-connect as usual +class ExceptionFreeTornadoConnection(pika.adapters.TornadoConnection): + def _adapter_disconnect(self): + try: + super(ExceptionFreeTornadoConnection, self)._adapter_disconnect() + except (pika.exceptions.ProbableAuthenticationError, + pika.exceptions.ProbableAccessDeniedError) as e: + logging.warning("Caught exception '%r' in ExceptionFreeTornadoConnection when \ +calling _adapter_disconnect, ignoring" % (e,)) + + class TornadoQueueClient(SimpleQueueClient): # Based on: # https://pika.readthedocs.org/en/0.9.8/examples/asynchronous_consumer_example.html @@ -112,7 +125,7 @@ class TornadoQueueClient(SimpleQueueClient): self._on_open_cbs = [] if on_open_cb: self._on_open_cbs.append(on_open_cb) - self.connection = pika.adapters.TornadoConnection( + self.connection = ExceptionFreeTornadoConnection( self._get_parameters(), on_open_callback = self._on_open, stop_ioloop_on_close = False)