From 953e195be17a039355f8d65044f813d12a37c016 Mon Sep 17 00:00:00 2001 From: Kevin Mehall Date: Tue, 25 Jun 2013 14:22:40 -0400 Subject: [PATCH] Refactor update_message_flags to do the query in the django process. Trac #1398. (imported from commit c001747d8e6d78a12fe535c36a81c12592976840) --- zephyr/lib/actions.py | 87 +++---------------- .../commands/process_user_activity.py | 4 +- 2 files changed, 11 insertions(+), 80 deletions(-) diff --git a/zephyr/lib/actions.py b/zephyr/lib/actions.py index 2bc2c964e8..f127de6a9f 100644 --- a/zephyr/lib/actions.py +++ b/zephyr/lib/actions.py @@ -836,44 +836,21 @@ def update_user_presence(user_profile, client, log_time, status): queue_json_publish("user_activity", event, process_user_presence_event) def update_message_flags(user_profile, operation, flag, messages, all): - rest_until = None + flagattr = getattr(UserMessage.flags, flag) if all: log_statsd_event('bankruptcy') + msgs = UserMessage.objects.filter(user_profile=user_profile) + else: + msgs = UserMessage.objects.filter(user_profile=user_profile, + message__id__in=messages) - # Do the first 450 message updates in-process, as this is a - # bankruptcy request and the user is about to reload. We don't - # want them to see a bunch of unread messages while we go about - # doing the work - first_batch = 450 - flagattr = getattr(UserMessage.flags, flag) + if operation == 'add': + msgs.update(flags=F('flags').bitor(flagattr)) + elif operation == 'remove': + msgs.update(flags=F('flags').bitand(~flagattr)) - all_ums = UserMessage.objects.filter(user_profile=user_profile) - if operation == "add": - umessages = all_ums.filter(flags=~flagattr) - elif operation == "remove": - umessages = all_ums.filter(flags=flagattr) - - mids = [m.id for m in umessages.order_by('-id')[:first_batch]] - to_update = UserMessage.objects.filter(id__in=mids) - - if operation == "add": - to_update.update(flags=F('flags').bitor(flagattr)) - elif operation == "remove": - to_update.update(flags=F('flags').bitand(~flagattr)) - - if len(mids) == 0: - return True - - rest_until = mids[len(mids) - 1] - - event = {'type': 'update_message', - 'user_profile_id': user_profile.id, - 'operation': operation, - 'flag': flag, - 'messages': messages, - 'until_id': rest_until} - queue_json_publish("user_activity", event, process_update_message_flags) + statsd.incr("flags.%s.%s" % (flag, operation), len(msgs)) def process_user_presence_event(event): user_profile = get_user_profile_by_id(event["user_profile_id"]) @@ -882,50 +859,6 @@ def process_user_presence_event(event): status = event["status"] return do_update_user_presence(user_profile, client, log_time, status) -def process_update_message_flags(event): - user_profile = get_user_profile_by_id(event["user_profile_id"]) - try: - until_id = event["until_id"] - messages = event["messages"] - flag = event["flag"] - op = event["operation"] - except (KeyError, AttributeError): - return False - - # Shell out bankruptcy requests as we split them up into many - # pieces to avoid swamping the db - if until_id and not settings.TEST_SUITE: - update_flags_externally(op, flag, user_profile, until_id) - return True - - flagattr = getattr(UserMessage.flags, flag) - msgs = UserMessage.objects.filter(user_profile=user_profile, - message__id__in=messages) - - # If we're running in the test suite, don't shell out to manage.py. - # Updates that the manage.py command makes don't seem to be immediately - # reflected in the next in-process sqlite queries. - # TODO(leo) remove when tests switch to postgres - if settings.TEST_SUITE and until_id: - msgs = UserMessage.objects.filter(user_profile=user_profile, - id__lte=until_id) - - if op == 'add': - msgs.update(flags=F('flags').bitor(flagattr)) - elif op == 'remove': - msgs.update(flags=F('flags').bitand(~flagattr)) - - statsd.incr("flags.%s.%s" % (flag, op), len(msgs)) - - return True - -def update_flags_externally(op, flag, user_profile, until_id): - args = ['python', os.path.join(os.path.dirname(__file__), '../..', 'manage.py'), - 'set_message_flags', '--for-real', '-o', op, '-f', flag, '-m', user_profile.email, - '-u', str(until_id)] - - subprocess.Popen(args, stdin=subprocess.PIPE, stdout=None, stderr=None) - def subscribed_to_stream(user_profile, stream): try: if Subscription.objects.get(user_profile=user_profile, diff --git a/zephyr/management/commands/process_user_activity.py b/zephyr/management/commands/process_user_activity.py index 527609b21a..f382bfc237 100644 --- a/zephyr/management/commands/process_user_activity.py +++ b/zephyr/management/commands/process_user_activity.py @@ -5,7 +5,7 @@ from django.core.management.base import BaseCommand import ujson import pika from zephyr.lib.actions import process_user_activity_event, \ - process_user_presence_event, process_update_message_flags + process_user_presence_event from zephyr.lib.queue import SimpleQueueClient import sys import signal @@ -24,8 +24,6 @@ class Command(BaseCommand): process_user_activity_event(event) elif msg_type == 'user_presence': process_user_presence_event(event) - elif msg_type == 'update_message': - process_update_message_flags(event) else: print("[*] Unknown message type: %s" (msg_type,))