queue_processors: Remove the slow_queries queue.

While this functionality to post slow queries to a Zulip stream was
very useful in the early days of Zulip, when there were only a few
hundred accounts, it's long since been useless, since (1) the total
request volume on larger Zulip servers run by Zulip developers is far
too high for posting slow queries to a stream to be usable, and (2)
other server operators don't want real-time notifications of slow
backend queries.  The right structure for this is just a log file.

We get rid of the queue and replace it with a "zulip.slow_queries"
logger, which will still log to /var/log/zulip/slow_queries.log for
ease of access to this information and propagate to the other logging
handlers.  Reducing the number of queues is good for lowering Zulip's
memory footprint and improving restart performance, since we run at
least one dedicated queue worker process for each queue in most
configurations.
This commit is contained in:
Mateusz Mandera 2020-05-08 16:37:58 +02:00 committed by Tim Abbott
parent 180c16c80e
commit dd40649e04
12 changed files with 33 additions and 138 deletions

View File

@ -82,7 +82,6 @@ class zulip::base {
'missedmessage_mobile_notifications', 'missedmessage_mobile_notifications',
'outgoing_webhooks', 'outgoing_webhooks',
'signups', 'signups',
'slow_queries',
'user_activity', 'user_activity',
'user_activity_interval', 'user_activity_interval',
'user_presence', 'user_presence',

View File

@ -463,7 +463,7 @@ define service {
# The following queue workers batch-process events and thus can't be # The following queue workers batch-process events and thus can't be
# monitored by checking for running consumers: # monitored by checking for running consumers:
# #
# user_activity, slow_queries, missedmessage_emails # user_activity, missedmessage_emails
define service { define service {
use generic-service use generic-service
@ -501,15 +501,6 @@ define service {
contact_groups admins contact_groups admins
} }
define service {
use generic-service
service_description Check slow_queries queue processor
check_command check_remote_arg_string!manage.py process_queue --queue_name=slow_queries!1:1!1:1
max_check_attempts 3
hostgroup_name frontends
contact_groups admins
}
define service { define service {
use generic-service use generic-service
service_description Check deferred_work queue processor service_description Check deferred_work queue processor

View File

@ -22,7 +22,6 @@ normal_queues = [
'missedmessage_mobile_notifications', 'missedmessage_mobile_notifications',
'outgoing_webhooks', 'outgoing_webhooks',
'signups', 'signups',
'slow_queries',
'user_activity', 'user_activity',
'user_activity_interval', 'user_activity_interval',
'user_presence', 'user_presence',
@ -43,24 +42,20 @@ states = {
MAX_SECONDS_TO_CLEAR_FOR_BURSTS: DefaultDict[str, int] = defaultdict( MAX_SECONDS_TO_CLEAR_FOR_BURSTS: DefaultDict[str, int] = defaultdict(
lambda: 120, lambda: 120,
digest_emails=600, digest_emails=600,
slow_queries=600,
) )
MAX_SECONDS_TO_CLEAR_NORMAL: DefaultDict[str, int] = defaultdict( MAX_SECONDS_TO_CLEAR_NORMAL: DefaultDict[str, int] = defaultdict(
lambda: 30, lambda: 30,
digest_emails=1200, digest_emails=1200,
slow_queries=120,
missedmessage_mobile_notifications=120, missedmessage_mobile_notifications=120,
) )
CRITICAL_SECONDS_TO_CLEAR_FOR_BURSTS: DefaultDict[str, int] = defaultdict( CRITICAL_SECONDS_TO_CLEAR_FOR_BURSTS: DefaultDict[str, int] = defaultdict(
lambda: 240, lambda: 240,
digest_emails=1200, digest_emails=1200,
slow_queries=1200,
) )
CRITICAL_SECONDS_TO_CLEAR_NORMAL: DefaultDict[str, int] = defaultdict( CRITICAL_SECONDS_TO_CLEAR_NORMAL: DefaultDict[str, int] = defaultdict(
lambda: 60, lambda: 60,
missedmessage_mobile_notifications=180, missedmessage_mobile_notifications=180,
digest_emails=600, digest_emails=600,
slow_queries=600,
) )
def analyze_queue_stats(queue_name: str, stats: Dict[str, Any], def analyze_queue_stats(queue_name: str, stats: Dict[str, Any],

View File

@ -14,7 +14,7 @@ sanity_check.check_venv(__file__)
# TODO: Convert this to use scripts/lib/queue_workers.py # TODO: Convert this to use scripts/lib/queue_workers.py
TOOLS_DIR = os.path.dirname(os.path.abspath(__file__)) TOOLS_DIR = os.path.dirname(os.path.abspath(__file__))
successful_worker_launch = '[process_queue] 17 queue worker threads were launched\n' successful_worker_launch = '[process_queue] 16 queue worker threads were launched\n'
def check_worker_launch(run_dev: "subprocess.Popen[str]") -> bool: def check_worker_launch(run_dev: "subprocess.Popen[str]") -> bool:
failed = False failed = False

View File

@ -21,7 +21,6 @@ from zerver.lib.debug import maybe_tracemalloc_listen
from zerver.lib.db import reset_queries from zerver.lib.db import reset_queries
from zerver.lib.exceptions import ErrorCode, JsonableError, RateLimited from zerver.lib.exceptions import ErrorCode, JsonableError, RateLimited
from zerver.lib.html_to_text import get_content_description from zerver.lib.html_to_text import get_content_description
from zerver.lib.queue import queue_json_publish
from zerver.lib.rate_limiter import RateLimitResult from zerver.lib.rate_limiter import RateLimitResult
from zerver.lib.response import json_error, json_response_from_error from zerver.lib.response import json_error, json_response_from_error
from zerver.lib.subdomains import get_subdomain from zerver.lib.subdomains import get_subdomain
@ -30,6 +29,7 @@ from zerver.lib.types import ViewFuncT
from zerver.models import Realm, flush_per_request_caches, get_realm from zerver.models import Realm, flush_per_request_caches, get_realm
logger = logging.getLogger('zulip.requests') logger = logging.getLogger('zulip.requests')
slow_query_logger = logging.getLogger('zulip.slow_queries')
def record_request_stop_data(log_data: MutableMapping[str, Any]) -> None: def record_request_stop_data(log_data: MutableMapping[str, Any]) -> None:
log_data['time_stopped'] = time.time() log_data['time_stopped'] = time.time()
@ -210,8 +210,7 @@ def write_log_line(log_data: MutableMapping[str, Any], path: str, method: str, r
logger.info(logger_line) logger.info(logger_line)
if (is_slow_query(time_delta, path)): if (is_slow_query(time_delta, path)):
queue_json_publish("slow_queries", dict( slow_query_logger.info(logger_line)
query="%s (%s)" % (logger_line, requestor_for_logs)))
if settings.PROFILE_ALL_REQUESTS: if settings.PROFILE_ALL_REQUESTS:
log_data["prof"].disable() log_data["prof"].disable()

View File

@ -2,14 +2,11 @@ import time
from typing import List from typing import List
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
from django.conf import settings from unittest.mock import patch
from django.test import override_settings
from unittest.mock import Mock, patch
from zerver.lib.realm_icon import get_realm_icon_url from zerver.lib.realm_icon import get_realm_icon_url
from zerver.lib.streams import create_stream_if_needed
from zerver.lib.test_classes import ZulipTestCase from zerver.lib.test_classes import ZulipTestCase
from zerver.middleware import is_slow_query, write_log_line from zerver.middleware import is_slow_query, write_log_line
from zerver.models import get_realm, get_system_bot from zerver.models import get_realm
class SlowQueryTest(ZulipTestCase): class SlowQueryTest(ZulipTestCase):
SLOW_QUERY_TIME = 10 SLOW_QUERY_TIME = 10
@ -32,32 +29,21 @@ class SlowQueryTest(ZulipTestCase):
self.assertFalse(is_slow_query(9, '/accounts/webathena_kerberos_login/')) self.assertFalse(is_slow_query(9, '/accounts/webathena_kerberos_login/'))
self.assertTrue(is_slow_query(11, '/accounts/webathena_kerberos_login/')) self.assertTrue(is_slow_query(11, '/accounts/webathena_kerberos_login/'))
@override_settings(SLOW_QUERY_LOGS_STREAM="logs") def test_slow_query_log(self) -> None:
@patch('logging.info')
def test_slow_query_log(self, mock_logging_info: Mock) -> None:
error_bot = get_system_bot(settings.ERROR_BOT)
create_stream_if_needed(error_bot.realm, settings.SLOW_QUERY_LOGS_STREAM)
self.log_data['time_started'] = time.time() - self.SLOW_QUERY_TIME self.log_data['time_started'] = time.time() - self.SLOW_QUERY_TIME
write_log_line(self.log_data, path='/socket/open', method='SOCKET', with patch("zerver.middleware.slow_query_logger") as mock_slow_query_logger, \
remote_ip='123.456.789.012', requestor_for_logs='unknown', client_name='?') patch("zerver.middleware.logger") as mock_normal_logger:
last_message = self.get_last_message()
self.assertEqual(last_message.sender.email, "error-bot@zulip.com")
self.assertIn("logs", str(last_message.recipient))
self.assertEqual(last_message.topic_name(), "testserver: slow queries")
self.assertRegexpMatches(last_message.content,
r"123\.456\.789\.012 SOCKET 200 10\.\ds .*")
@override_settings(ERROR_BOT=None) write_log_line(self.log_data, path='/some/endpoint/', method='GET',
@patch('logging.info') remote_ip='123.456.789.012', requestor_for_logs='unknown', client_name='?')
@patch('zerver.lib.actions.internal_send_stream_message') mock_slow_query_logger.info.assert_called_once()
def test_slow_query_log_without_error_bot(self, mock_normal_logger.info.assert_called_once()
mock_internal_send_stream_message: Mock,
mock_logging_info: Mock) -> None: logged_line = mock_slow_query_logger.info.call_args_list[0][0][0]
self.log_data['time_started'] = time.time() - self.SLOW_QUERY_TIME self.assertRegexpMatches(
write_log_line(self.log_data, path='/socket/open', method='SOCKET', logged_line,
remote_ip='123.456.789.012', requestor_for_logs='unknown', client_name='?') r"123\.456\.789\.012 GET 200 10\.\ds .* \(unknown via \?\)"
mock_internal_send_stream_message.assert_not_called() )
class OpenGraphTest(ZulipTestCase): class OpenGraphTest(ZulipTestCase):
def check_title_and_description(self, path: str, title: str, def check_title_and_description(self, path: str, title: str,

View File

@ -2,7 +2,6 @@ import os
import time import time
import ujson import ujson
import smtplib import smtplib
import re
from django.conf import settings from django.conf import settings
from django.test import override_settings from django.test import override_settings
@ -15,11 +14,10 @@ from zerver.lib.queue import MAX_REQUEST_RETRIES
from zerver.lib.rate_limiter import RateLimiterLockingException from zerver.lib.rate_limiter import RateLimiterLockingException
from zerver.lib.remote_server import PushNotificationBouncerRetryLaterError from zerver.lib.remote_server import PushNotificationBouncerRetryLaterError
from zerver.lib.send_email import FromAddress from zerver.lib.send_email import FromAddress
from zerver.lib.streams import create_stream_if_needed
from zerver.lib.test_helpers import simulated_queue_client from zerver.lib.test_helpers import simulated_queue_client
from zerver.lib.test_classes import ZulipTestCase from zerver.lib.test_classes import ZulipTestCase
from zerver.models import get_client, UserActivity, PreregistrationUser, \ from zerver.models import get_client, UserActivity, PreregistrationUser, \
get_system_bot, get_stream, get_realm get_stream, get_realm
from zerver.tornado.event_queue import build_offline_notification from zerver.tornado.event_queue import build_offline_notification
from zerver.worker import queue_processors from zerver.worker import queue_processors
from zerver.worker.queue_processors import ( from zerver.worker.queue_processors import (
@ -28,11 +26,8 @@ from zerver.worker.queue_processors import (
EmailSendingWorker, EmailSendingWorker,
LoopQueueProcessingWorker, LoopQueueProcessingWorker,
MissedMessageWorker, MissedMessageWorker,
SlowQueryWorker,
) )
from zerver.middleware import write_log_line
Event = Dict[str, Any] Event = Dict[str, Any]
# This is used for testing LoopQueueProcessingWorker, which # This is used for testing LoopQueueProcessingWorker, which
@ -81,44 +76,6 @@ class WorkerTest(ZulipTestCase):
def queue_size(self) -> int: def queue_size(self) -> int:
return len(self.queue) return len(self.queue)
@override_settings(SLOW_QUERY_LOGS_STREAM="errors")
def test_slow_queries_worker(self) -> None:
error_bot = get_system_bot(settings.ERROR_BOT)
fake_client = self.FakeClient()
worker = SlowQueryWorker()
create_stream_if_needed(error_bot.realm, 'errors')
send_mock = patch(
'zerver.worker.queue_processors.internal_send_stream_message'
)
with send_mock as sm, loopworker_sleep_mock as tm:
with simulated_queue_client(lambda: fake_client):
try:
worker.setup()
# `write_log_line` is where we publish slow queries to the queue.
with patch('zerver.middleware.is_slow_query', return_value=True):
write_log_line(log_data=dict(test='data'), requestor_for_logs='test@zulip.com',
remote_ip='127.0.0.1', client_name='website', path='/test/',
method='GET')
worker.start()
except AbortLoop:
pass
self.assertEqual(tm.call_args[0][0], 60) # should sleep 60 seconds
sm.assert_called_once()
args = [c[0] for c in sm.call_args_list][0]
self.assertEqual(args[0], error_bot.realm)
self.assertEqual(args[1].email, error_bot.email)
self.assertEqual(args[2].name, "errors")
self.assertEqual(args[3], "testserver: slow queries")
# Testing for specific query times can lead to test discrepancies.
logging_info = re.sub(r'\(db: [0-9]+ms/\d+q\)', '', args[4])
self.assertEqual(logging_info, ' 127.0.0.1 GET 200 -1000ms '
' /test/ (test@zulip.com via website) (test@zulip.com)\n')
def test_UserActivityWorker(self) -> None: def test_UserActivityWorker(self) -> None:
fake_client = self.FakeClient() fake_client = self.FakeClient()

View File

@ -27,7 +27,7 @@ from zerver.lib.push_notifications import handle_push_notification, handle_remov
initialize_push_notifications, clear_push_device_tokens initialize_push_notifications, clear_push_device_tokens
from zerver.lib.actions import do_send_confirmation_email, \ from zerver.lib.actions import do_send_confirmation_email, \
do_update_user_activity, do_update_user_activity_interval, do_update_user_presence, \ do_update_user_activity, do_update_user_activity_interval, do_update_user_presence, \
internal_send_stream_message, internal_send_private_message, notify_realm_export, \ internal_send_private_message, notify_realm_export, \
render_incoming_message, do_update_embedded_data, do_mark_stream_messages_as_read render_incoming_message, do_update_embedded_data, do_mark_stream_messages_as_read
from zerver.lib.url_preview import preview as url_preview from zerver.lib.url_preview import preview as url_preview
from zerver.lib.digest import handle_digest_email from zerver.lib.digest import handle_digest_email
@ -39,7 +39,7 @@ from zerver.lib.streams import access_stream_by_id
from zerver.lib.db import reset_queries from zerver.lib.db import reset_queries
from zerver.context_processors import common_context from zerver.context_processors import common_context
from zerver.lib.outgoing_webhook import do_rest_call, get_outgoing_webhook_service_handler from zerver.lib.outgoing_webhook import do_rest_call, get_outgoing_webhook_service_handler
from zerver.models import get_bot_services, get_stream, RealmAuditLog from zerver.models import get_bot_services, RealmAuditLog
from zulip_bots.lib import ExternalBotHandler, extract_query_without_mention from zulip_bots.lib import ExternalBotHandler, extract_query_without_mention
from zerver.lib.bot_lib import EmbeddedBotHandler, get_bot_handler, EmbeddedBotQuitException from zerver.lib.bot_lib import EmbeddedBotHandler, get_bot_handler, EmbeddedBotQuitException
from zerver.lib.exceptions import RateLimited from zerver.lib.exceptions import RateLimited
@ -494,44 +494,6 @@ class ErrorReporter(QueueProcessingWorker):
if settings.ERROR_REPORTING: if settings.ERROR_REPORTING:
do_report_error(event['report']['host'], event['type'], event['report']) do_report_error(event['report']['host'], event['type'], event['report'])
@assign_queue('slow_queries', queue_type="loop")
class SlowQueryWorker(LoopQueueProcessingWorker):
# Sleep 1 minute between checking the queue unconditionally,
# regardless of whether anything is in the queue.
sleep_delay = 60 * 1
sleep_only_if_empty = False
def consume_batch(self, slow_query_events: List[Dict[str, Any]]) -> None:
for event in slow_query_events:
logging.info("Slow query: %s", event["query"])
if settings.SLOW_QUERY_LOGS_STREAM is None:
return
if settings.ERROR_BOT is None:
return
if len(slow_query_events) > 0:
topic = "%s: slow queries" % (settings.EXTERNAL_HOST,)
content = ""
for event in slow_query_events:
content += " %s\n" % (event["query"],)
error_bot = get_system_bot(settings.ERROR_BOT)
realm = error_bot.realm
errors_stream = get_stream(
settings.SLOW_QUERY_LOGS_STREAM,
realm
)
internal_send_stream_message(
realm,
error_bot,
errors_stream,
topic,
content
)
@assign_queue('digest_emails') @assign_queue('digest_emails')
class DigestWorker(QueueProcessingWorker): # nocoverage class DigestWorker(QueueProcessingWorker): # nocoverage
# Who gets a digest is entirely determined by the enqueue_digest_emails # Who gets a digest is entirely determined by the enqueue_digest_emails

View File

@ -86,7 +86,6 @@ ERROR_REPORTING = True
BROWSER_ERROR_REPORTING = False BROWSER_ERROR_REPORTING = False
LOGGING_SHOW_MODULE = False LOGGING_SHOW_MODULE = False
LOGGING_SHOW_PID = False LOGGING_SHOW_PID = False
SLOW_QUERY_LOGS_STREAM: Optional[str] = None
# File uploads and avatars # File uploads and avatars
DEFAULT_AVATAR_URI = '/static/images/default-avatar.png' DEFAULT_AVATAR_URI = '/static/images/default-avatar.png'

View File

@ -56,7 +56,6 @@ EXTERNAL_URI_SCHEME = "http://"
EMAIL_GATEWAY_PATTERN = "%s@" + EXTERNAL_HOST.split(':')[0] EMAIL_GATEWAY_PATTERN = "%s@" + EXTERNAL_HOST.split(':')[0]
NOTIFICATION_BOT = "notification-bot@zulip.com" NOTIFICATION_BOT = "notification-bot@zulip.com"
ERROR_BOT = "error-bot@zulip.com" ERROR_BOT = "error-bot@zulip.com"
# SLOW_QUERY_LOGS_STREAM = "errors"
EMAIL_GATEWAY_BOT = "emailgateway@zulip.com" EMAIL_GATEWAY_BOT = "emailgateway@zulip.com"
PHYSICAL_ADDRESS = "Zulip Headquarters, 123 Octo Stream, South Pacific Ocean" PHYSICAL_ADDRESS = "Zulip Headquarters, 123 Octo Stream, South Pacific Ocean"
EXTRA_INSTALLED_APPS = ["zilencer", "analytics", "corporate"] EXTRA_INSTALLED_APPS = ["zilencer", "analytics", "corporate"]

View File

@ -654,6 +654,7 @@ SERVER_LOG_PATH = zulip_path("/var/log/zulip/server.log")
ERROR_FILE_LOG_PATH = zulip_path("/var/log/zulip/errors.log") ERROR_FILE_LOG_PATH = zulip_path("/var/log/zulip/errors.log")
MANAGEMENT_LOG_PATH = zulip_path("/var/log/zulip/manage.log") MANAGEMENT_LOG_PATH = zulip_path("/var/log/zulip/manage.log")
WORKER_LOG_PATH = zulip_path("/var/log/zulip/workers.log") WORKER_LOG_PATH = zulip_path("/var/log/zulip/workers.log")
SLOW_QUERIES_LOG_PATH = zulip_path("/var/log/zulip/slow_queries.log")
JSON_PERSISTENT_QUEUE_FILENAME_PATTERN = zulip_path("/home/zulip/tornado/event_queues%s.json") JSON_PERSISTENT_QUEUE_FILENAME_PATTERN = zulip_path("/home/zulip/tornado/event_queues%s.json")
EMAIL_LOG_PATH = zulip_path("/var/log/zulip/send_email.log") EMAIL_LOG_PATH = zulip_path("/var/log/zulip/send_email.log")
EMAIL_MIRROR_LOG_PATH = zulip_path("/var/log/zulip/email_mirror.log") EMAIL_MIRROR_LOG_PATH = zulip_path("/var/log/zulip/email_mirror.log")
@ -768,6 +769,12 @@ LOGGING: Dict[str, Any] = {
'formatter': 'default', 'formatter': 'default',
'filename': LDAP_LOG_PATH, 'filename': LDAP_LOG_PATH,
}, },
'slow_queries_file': {
'level': 'INFO',
'class': 'logging.handlers.WatchedFileHandler',
'formatter': 'default',
'filename': SLOW_QUERIES_LOG_PATH,
},
}, },
'loggers': { 'loggers': {
# The Python logging module uses a hierarchy of logger names for config: # The Python logging module uses a hierarchy of logger names for config:
@ -892,6 +899,10 @@ LOGGING: Dict[str, Any] = {
'handlers': ['file', 'errors_file'], 'handlers': ['file', 'errors_file'],
'propagate': False, 'propagate': False,
}, },
'zulip.slow_queries': {
'level': 'INFO',
'handlers': ['slow_queries_file'],
},
'zulip.soft_deactivation': { 'zulip.soft_deactivation': {
'handlers': ['file', 'errors_file'], 'handlers': ['file', 'errors_file'],
'propagate': False, 'propagate': False,

View File

@ -179,9 +179,6 @@ SOCIAL_AUTH_SUBDOMAIN = 'auth'
TWO_FACTOR_AUTHENTICATION_ENABLED = False TWO_FACTOR_AUTHENTICATION_ENABLED = False
PUSH_NOTIFICATION_BOUNCER_URL = None PUSH_NOTIFICATION_BOUNCER_URL = None
# Disable messages from slow queries as they affect backend tests.
SLOW_QUERY_LOGS_STREAM = None
THUMBOR_URL = 'http://127.0.0.1:9995' THUMBOR_URL = 'http://127.0.0.1:9995'
THUMBNAIL_IMAGES = True THUMBNAIL_IMAGES = True
THUMBOR_SERVES_CAMO = True THUMBOR_SERVES_CAMO = True