diff --git a/zerver/lib/queue.py b/zerver/lib/queue.py index 39831ebe9a..ed5a316a99 100644 --- a/zerver/lib/queue.py +++ b/zerver/lib/queue.py @@ -305,7 +305,7 @@ def get_queue_client() -> SimpleQueueClient: queue_lock = threading.RLock() def queue_json_publish(queue_name: str, - event: Union[Dict[str, Any], str], + event: Dict[str, Any], processor: Callable[[Any], None]=None) -> None: # most events are dicts, but zerver.middleware.write_log_line uses a str with queue_lock: diff --git a/zerver/middleware.py b/zerver/middleware.py index 8dfe0fe4eb..c77799433b 100644 --- a/zerver/middleware.py +++ b/zerver/middleware.py @@ -211,7 +211,8 @@ def write_log_line(log_data: MutableMapping[str, Any], path: str, method: str, r logger.info(logger_line) if (is_slow_query(time_delta, path)): - queue_json_publish("slow_queries", "%s (%s)" % (logger_line, email)) + queue_json_publish("slow_queries", dict( + query="%s (%s)" % (logger_line, email))) if settings.PROFILE_ALL_REQUESTS: log_data["prof"].disable() diff --git a/zerver/tests/test_queue_worker.py b/zerver/tests/test_queue_worker.py index a6cc0e4116..80b6bdaaec 100644 --- a/zerver/tests/test_queue_worker.py +++ b/zerver/tests/test_queue_worker.py @@ -71,9 +71,11 @@ class WorkerTest(ZulipTestCase): def test_slow_queries_worker(self) -> None: error_bot = get_system_bot(settings.ERROR_BOT) fake_client = self.FakeClient() + # TODO: Rewrite this set part of the test by just mocking + # `is_slow_query` to generate the events. events = [ - 'test query (data)', - 'second test query (data)', + {'query': 'test query (data)'}, + {'query': 'second test query (data)'}, ] for event in events: fake_client.queue.append(('slow_queries', event)) diff --git a/zerver/worker/queue_processors.py b/zerver/worker/queue_processors.py index 74dd2a30f5..cbded28ed2 100644 --- a/zerver/worker/queue_processors.py +++ b/zerver/worker/queue_processors.py @@ -390,11 +390,9 @@ class SlowQueryWorker(LoopQueueProcessingWorker): # Sleep 1 minute between checking the queue sleep_delay = 60 * 1 - # TODO: The type annotation here should be List[str], but that - # creates conflicts with other users in the file. - def consume_batch(self, slow_queries: List[Any]) -> None: - for query in slow_queries: - logging.info("Slow query: %s" % (query,)) + def consume_batch(self, slow_query_events: List[Dict[str, Any]]) -> None: + for event in slow_query_events: + logging.info("Slow query: %s" % (event["query"],)) if settings.SLOW_QUERY_LOGS_STREAM is None: return @@ -402,12 +400,12 @@ class SlowQueryWorker(LoopQueueProcessingWorker): if settings.ERROR_BOT is None: return - if len(slow_queries) > 0: + if len(slow_query_events) > 0: topic = "%s: slow queries" % (settings.EXTERNAL_HOST,) content = "" - for query in slow_queries: - content += " %s\n" % (query,) + for event in slow_query_events: + content += " %s\n" % (event["query"],) error_bot_realm = get_system_bot(settings.ERROR_BOT).realm internal_send_message(error_bot_realm, settings.ERROR_BOT,