queue_processors: Extract a duplicated logic block into do_consume.

This commit is contained in:
Mateusz Mandera 2020-03-18 20:04:20 +01:00 committed by Tim Abbott
parent 27c19b081b
commit 5da2f80140
1 changed files with 9 additions and 11 deletions

View File

@ -125,15 +125,20 @@ class QueueProcessingWorker(ABC):
def consume(self, data: Dict[str, Any]) -> None:
pass
def consume_wrapper(self, data: Dict[str, Any]) -> None:
def do_consume(self, consume_func: Callable[[List[Dict[str, Any]]], None],
events: List[Dict[str, Any]]) -> None:
try:
self.consume(data)
consume_func(events)
except Exception:
self._handle_consume_exception([data])
self._handle_consume_exception(events)
finally:
flush_per_request_caches()
reset_queries()
def consume_wrapper(self, data: Dict[str, Any]) -> None:
consume_func = lambda events: self.consume(events[0])
self.do_consume(consume_func, [data])
def _handle_consume_exception(self, events: List[Dict[str, Any]]) -> None:
self._log_problem()
if not os.path.exists(settings.QUEUE_ERROR_DIR):
@ -167,14 +172,7 @@ class LoopQueueProcessingWorker(QueueProcessingWorker):
def start(self) -> None: # nocoverage
while True:
events = self.q.drain_queue(self.queue_name, json=True)
try:
self.consume_batch(events)
except Exception:
self._handle_consume_exception(events)
finally:
flush_per_request_caches()
reset_queries()
self.do_consume(self.consume_batch, events)
# To avoid spinning the CPU, we go to sleep if there's
# nothing in the queue, or for certain queues with
# sleep_only_if_empty=False, unconditionally.