From f473cb5fbe335ce5056ae8d50ffbf468949fbd99 Mon Sep 17 00:00:00 2001 From: Tim Abbott Date: Fri, 28 Jun 2013 11:49:51 -0400 Subject: [PATCH] Use bulk operations in notify_subscription_{added_removed}. After fixing the high numbers of database queries earlier in this branch, I found that sending 500 RabbitMQ messages for a bulk change in subscriptions was consuming more than half the time for these (and then we'd end up with 500 events in a queue). To handle this, we create a "user X subscribed to these N streams" event, rather than sending one event for each individual subscription. (imported from commit 44a34a9fab9b67e9f0da6fee53335d8c5030392b) --- zephyr/lib/actions.py | 65 ++++++++++++++++++++++++++------------ zephyr/static/js/zephyr.js | 14 +++++--- 2 files changed, 54 insertions(+), 25 deletions(-) diff --git a/zephyr/lib/actions.py b/zephyr/lib/actions.py index cf0344af17..d83d71e68a 100644 --- a/zephyr/lib/actions.py +++ b/zephyr/lib/actions.py @@ -555,18 +555,20 @@ def set_stream_color(user_profile, stream_name, color=None): subscription.save(update_fields=["color"]) return color -def notify_new_subscription(user_profile, stream, subscription, no_log=False): +def notify_subscriptions_added(user_profile, sub_pairs, no_log=False): if not no_log: log_event({'type': 'subscription_added', 'user': user_profile.email, - 'name': stream.name, + 'names': [stream.name for sub, stream in sub_pairs], 'domain': stream.realm.domain}) - notice = dict(event=dict(type="subscription", op="add", - subscription=dict(name=stream.name, - in_home_view=subscription.in_home_view, - invite_only=stream.invite_only, - color=subscription.color)), + payload = [dict(name=stream.name, + in_home_view=subscription.in_home_view, + invite_only=stream.invite_only, + color=subscription.color) + for (subscription, stream) in sub_pairs] + notice = dict(event=dict(type="subscriptions", op="add", + subscriptions=payload), users=[user_profile.id]) tornado_callbacks.send_notification(notice) @@ -613,8 +615,15 @@ def bulk_add_subscriptions(streams, users): Subscription.objects.bulk_create([sub for (sub, stream) in subs_to_add]) Subscription.objects.filter(id__in=[sub.id for (sub, stream_name) in subs_to_activate]).update(active=True) + sub_tuples_by_user = defaultdict(list) for (sub, stream) in subs_to_add + subs_to_activate: - notify_new_subscription(sub.user_profile, stream, sub) + sub_tuples_by_user[sub.user_profile.id].append((sub, stream)) + + for user_profile in users: + if len(sub_tuples_by_user[user_profile.id]) == 0: + continue + notify_subscriptions_added(user_profile, sub_tuples_by_user[user_profile.id]) + return ([(user_profile, stream_name) for (user_profile, recipient_id, stream_name) in new_subs] + [(sub.user_profile, stream_name) for (sub, stream_name) in subs_to_activate], already_subscribed) @@ -632,18 +641,19 @@ def do_add_subscription(user_profile, stream, no_log=False): subscription.active = True subscription.save(update_fields=["active"]) if did_subscribe: - notify_new_subscription(user_profile, stream, subscription, no_log) + notify_subscriptions_added(user_profile, [(subscription, stream)], no_log) return did_subscribe -def notify_subscription_removed(user_profile, stream, no_log=False): +def notify_subscriptions_removed(user_profile, streams, no_log=False): if not no_log: log_event({'type': 'subscription_removed', 'user': user_profile.email, - 'name': stream.name, + 'names': [stream.name for stream in streams], 'domain': stream.realm.domain}) - notice = dict(event=dict(type="subscription", op="remove", - subscription=dict(name=stream.name)), + payload = [dict(name=stream.name) for stream in streams] + notice = dict(event=dict(type="subscriptions", op="remove", + subscriptions=payload), users=[user_profile.id]) tornado_callbacks.send_notification(notice) @@ -673,8 +683,14 @@ def bulk_remove_subscriptions(users, streams): Subscription.objects.filter(id__in=[sub.id for (sub, stream_name) in subs_to_deactivate]).update(active=False) + streams_by_user = defaultdict(list) for (sub, stream) in subs_to_deactivate: - notify_subscription_removed(sub.user_profile, stream) + streams_by_user[sub.user_profile_id].append(stream) + + for user_profile in users: + if len(streams_by_user[user_profile.id]) == 0: + continue + notify_subscriptions_removed(user_profile, streams_by_user[user_profile.id]) return ([(sub.user_profile, stream) for (sub, stream) in subs_to_deactivate], not_subscribed) @@ -690,7 +706,7 @@ def do_remove_subscription(user_profile, stream, no_log=False): subscription.active = False subscription.save(update_fields=["active"]) if did_remove: - notify_subscription_removed(user_profile, stream, no_log) + notify_subscriptions_removed(user_profile, [stream], no_log) return did_remove @@ -1129,14 +1145,23 @@ def do_events_register(user_profile, user_client, apply_markdown=True, person = event['person'] ret['realm_users'] = filter(lambda p: p['email'] != person['email'], ret['realm_users']) - elif event['type'] == "subscription": + elif event['type'] == "subscriptions": + subscriptions_to_filter = set(sub.name.lower() for sub in event["subscriptions"]) + # We add the new subscriptions to the list of streams the + # user is subscribed to, and also remove/add them from the + # list of streams the user is not subscribed to (which we + # are still sending on data about so that e.g. colors and + # the in_home_view bit are properly available for those streams) + # + # And we do the opposite filtering process for unsubscribe events. if event['op'] == "add": - ret['subscriptions'].append(event['subscription']) + ret['subscriptions'] += event['subscriptions'] + ret['unsubscribed'] = filter(lambda s: s['name'].lower() not in subscriptions_to_filter, + ret['unsubscribed']) elif event['op'] == "remove": - sub = event['subscription'] - ret['subscriptions'] = filter(lambda s: s['name'] != sub['name'], + ret['unsubscribed'] += event['subscriptions'] + ret['subscriptions'] = filter(lambda s: s['name'].lower() not in subscriptions_to_filter, ret['subscriptions']) - ret['unsubscribed'].append(sub) elif event['type'] == "presence": ret['presences'][event['email']] = event['presence'] elif event['type'] == "update_message": diff --git a/zephyr/static/js/zephyr.js b/zephyr/static/js/zephyr.js index ced414c655..5e06bcf8ff 100644 --- a/zephyr/static/js/zephyr.js +++ b/zephyr/static/js/zephyr.js @@ -872,13 +872,17 @@ function get_updates(options) { } typeahead_helper.autocomplete_needs_update(true); break; - case 'subscription': + case 'subscriptions': if (event.op === 'add') { - $(document).trigger($.Event('subscription_add.zephyr', - {subscription: event.subscription})); + $.each(event.subscriptions, function(index, subscription) { + $(document).trigger($.Event('subscription_add.zephyr', + {subscription: subscription})); + }); } else if (event.op === 'remove') { - $(document).trigger($.Event('subscription_remove.zephyr', - {subscription: event.subscription})); + $.each(event.subscriptions, function(index, subscription) { + $(document).trigger($.Event('subscription_remove.zephyr', + {subscription: subscription})); + }); } break; case 'presence':