user_activity: Use LoopQueueProcessingWorker strategy.

This should dramatically improve the queue processor's performance in
cases where there's a very high volume of requests on a given endpoint
by a given user, as described in the new docstring.

Until we test this more broadly in production, we won't know if this
is a full solution to the problem, but I think it's likely.  We've
never seen the UserActivityInterval worker end up backlogged without a
total queue processor outage, and it should have a similar workload.

Fixes #13180.
This commit is contained in:
Tim Abbott 2019-09-17 16:52:37 -07:00
parent 52a2b2c6a8
commit 1c73ce2450
3 changed files with 87 additions and 24 deletions

View File

@ -11,10 +11,17 @@ import subprocess
WARN_THRESHOLD_DEFAULT = 10
WARN_THRESHOLD = {
'missedmessage_emails': WARN_THRESHOLD_DEFAULT,
# The user_activity worker has high throughput and uses a
# LoopQueueProcessingWorker, so it's normal to have a moderate
# backlog.
'user_activity': 100,
}
CRIT_THRESHOLD_DEFAULT = 50
CRIT_THRESHOLD = {
'missedmessage_emails': CRIT_THRESHOLD_DEFAULT,
# A backlog of hundreds of events for user_activity likely
# indicates an outage of the processor.
'user_activity': 500,
}
states = {

View File

@ -128,16 +128,25 @@ class WorkerTest(ZulipTestCase):
)
fake_client.queue.append(('user_activity', data))
with simulated_queue_client(lambda: fake_client):
worker = queue_processors.UserActivityWorker()
worker.setup()
worker.start()
activity_records = UserActivity.objects.filter(
user_profile = user.id,
client = get_client('ios')
)
self.assertTrue(len(activity_records), 1)
self.assertTrue(activity_records[0].count, 1)
time_mock = patch(
'zerver.worker.queue_processors.time.sleep',
side_effect=AbortLoop,
)
with time_mock:
with simulated_queue_client(lambda: fake_client):
worker = queue_processors.UserActivityWorker()
worker.setup()
try:
worker.start()
except AbortLoop:
pass
activity_records = UserActivity.objects.filter(
user_profile = user.id,
client = get_client('ios')
)
self.assertTrue(len(activity_records), 1)
self.assertTrue(activity_records[0].count, 1)
def test_missed_message_worker(self) -> None:
cordelia = self.example_user('cordelia')

View File

@ -1,5 +1,5 @@
# Documented in https://zulip.readthedocs.io/en/latest/subsystems/queuing.html
from typing import Any, Callable, Dict, List, Mapping, Optional, cast, TypeVar, Type
from typing import Any, Callable, Dict, List, Mapping, Optional, cast, Tuple, TypeVar, Type
import copy
import signal
@ -163,6 +163,7 @@ class QueueProcessingWorker:
class LoopQueueProcessingWorker(QueueProcessingWorker):
sleep_delay = 0
sleep_only_if_empty = True
def start(self) -> None: # nocoverage
while True:
@ -172,7 +173,12 @@ class LoopQueueProcessingWorker(QueueProcessingWorker):
self.consume_batch(events)
finally:
reset_queries()
time.sleep(self.sleep_delay)
# To avoid spinning the CPU, we go to sleep if there's
# nothing in the queue, or for certain queues with
# sleep_only_if_empty=False, unconditionally.
if not self.sleep_only_if_empty or len(events) == 0:
time.sleep(self.sleep_delay)
def consume_batch(self, event: List[Dict[str, Any]]) -> None:
raise NotImplementedError
@ -241,8 +247,27 @@ class ConfirmationEmailWorker(QueueProcessingWorker):
context=context,
delay=datetime.timedelta(days=settings.INVITATION_LINK_VALIDITY_DAYS - 2))
@assign_queue('user_activity')
class UserActivityWorker(QueueProcessingWorker):
@assign_queue('user_activity', queue_type="loop")
class UserActivityWorker(LoopQueueProcessingWorker):
"""The UserActivity queue is perhaps our highest-traffic queue, and
requires some care to ensure it performes adequately.
We use a LoopQueueProcessingWorker as a performance optimization
for managing the queue. The structure of UserActivity records is
such that they are easily deduplicated before being sent to the
database; we take advantage of that to make this queue highly
effective at dealing with a backlog containing many similar
events. Such a backlog happen in a few ways:
* In abuse/DoS situations, if a client is sending huge numbers of
similar requests to the server.
* If the queue ends up with several minutes of backlog e.g. due to
downtime of the queue processor, many clients will have several
common events from doing an action multiple times.
"""
sleep_delay = 10
sleep_only_if_empty = True
client_id_map = {} # type: Dict[str, int]
def start(self) -> None:
@ -250,16 +275,36 @@ class UserActivityWorker(QueueProcessingWorker):
self.client_id_map = {}
super().start()
def consume(self, event: Mapping[str, Any]) -> None:
user_profile_id = event["user_profile_id"]
def consume_batch(self, user_activity_events: List[Dict[str, Any]]) -> None:
uncommitted_events = {} # type: Dict[Tuple[int, int, str], Tuple[int, float]]
if event["client"] not in self.client_id_map:
client = get_client(event["client"])
self.client_id_map[event["client"]] = client.id
client_id = self.client_id_map[event["client"]]
log_time = timestamp_to_datetime(event["time"])
query = event["query"]
do_update_user_activity(user_profile_id, client_id, query, 1, log_time)
# First, we drain the queue of all user_activity events and
# deduplicate them for insertion into the database.
for event in user_activity_events:
user_profile_id = event["user_profile_id"]
if event["client"] not in self.client_id_map:
client = get_client(event["client"])
self.client_id_map[event["client"]] = client.id
client_id = self.client_id_map[event["client"]]
key_tuple = (user_profile_id, client_id, event["query"])
if key_tuple not in uncommitted_events:
uncommitted_events[key_tuple] = (1, event['time'])
else:
count, time = uncommitted_events[key_tuple]
uncommitted_events[key_tuple] = (count + 1, max(time, event['time']))
# Then we insert the updates into the database.
#
# TODO: Doing these updates in sequence individually is likely
# inefficient; the idealized version would do some sort of
# bulk insert_or_update query.
for key_tuple in uncommitted_events:
(user_profile_id, client_id, query) = key_tuple
count, time = uncommitted_events[key_tuple]
log_time = timestamp_to_datetime(time)
do_update_user_activity(user_profile_id, client_id, query, count, log_time)
@assign_queue('user_activity_interval')
class UserActivityIntervalWorker(QueueProcessingWorker):
@ -400,8 +445,10 @@ class ErrorReporter(QueueProcessingWorker):
@assign_queue('slow_queries', queue_type="loop")
class SlowQueryWorker(LoopQueueProcessingWorker):
# Sleep 1 minute between checking the queue
# Sleep 1 minute between checking the queue unconditionally,
# regardless of whether anything is in the queue.
sleep_delay = 60 * 1
sleep_only_if_empty = False
def consume_batch(self, slow_query_events: List[Dict[str, Any]]) -> None:
for event in slow_query_events: