Use bulk operations in notify_subscription_{added_removed}.

After fixing the high numbers of database queries earlier in this
branch, I found that sending 500 RabbitMQ messages for a bulk change
in subscriptions was consuming more than half the time for these (and
then we'd end up with 500 events in a queue).  To handle this, we
create a "user X subscribed to these N streams" event, rather than
sending one event for each individual subscription.

(imported from commit 44a34a9fab9b67e9f0da6fee53335d8c5030392b)
This commit is contained in:
Tim Abbott 2013-06-28 11:49:51 -04:00
parent 74fd508b2f
commit f473cb5fbe
2 changed files with 54 additions and 25 deletions

View File

@ -555,18 +555,20 @@ def set_stream_color(user_profile, stream_name, color=None):
subscription.save(update_fields=["color"])
return color
def notify_new_subscription(user_profile, stream, subscription, no_log=False):
def notify_subscriptions_added(user_profile, sub_pairs, no_log=False):
if not no_log:
log_event({'type': 'subscription_added',
'user': user_profile.email,
'name': stream.name,
'names': [stream.name for sub, stream in sub_pairs],
'domain': stream.realm.domain})
notice = dict(event=dict(type="subscription", op="add",
subscription=dict(name=stream.name,
in_home_view=subscription.in_home_view,
invite_only=stream.invite_only,
color=subscription.color)),
payload = [dict(name=stream.name,
in_home_view=subscription.in_home_view,
invite_only=stream.invite_only,
color=subscription.color)
for (subscription, stream) in sub_pairs]
notice = dict(event=dict(type="subscriptions", op="add",
subscriptions=payload),
users=[user_profile.id])
tornado_callbacks.send_notification(notice)
@ -613,8 +615,15 @@ def bulk_add_subscriptions(streams, users):
Subscription.objects.bulk_create([sub for (sub, stream) in subs_to_add])
Subscription.objects.filter(id__in=[sub.id for (sub, stream_name) in subs_to_activate]).update(active=True)
sub_tuples_by_user = defaultdict(list)
for (sub, stream) in subs_to_add + subs_to_activate:
notify_new_subscription(sub.user_profile, stream, sub)
sub_tuples_by_user[sub.user_profile.id].append((sub, stream))
for user_profile in users:
if len(sub_tuples_by_user[user_profile.id]) == 0:
continue
notify_subscriptions_added(user_profile, sub_tuples_by_user[user_profile.id])
return ([(user_profile, stream_name) for (user_profile, recipient_id, stream_name) in new_subs] +
[(sub.user_profile, stream_name) for (sub, stream_name) in subs_to_activate],
already_subscribed)
@ -632,18 +641,19 @@ def do_add_subscription(user_profile, stream, no_log=False):
subscription.active = True
subscription.save(update_fields=["active"])
if did_subscribe:
notify_new_subscription(user_profile, stream, subscription, no_log)
notify_subscriptions_added(user_profile, [(subscription, stream)], no_log)
return did_subscribe
def notify_subscription_removed(user_profile, stream, no_log=False):
def notify_subscriptions_removed(user_profile, streams, no_log=False):
if not no_log:
log_event({'type': 'subscription_removed',
'user': user_profile.email,
'name': stream.name,
'names': [stream.name for stream in streams],
'domain': stream.realm.domain})
notice = dict(event=dict(type="subscription", op="remove",
subscription=dict(name=stream.name)),
payload = [dict(name=stream.name) for stream in streams]
notice = dict(event=dict(type="subscriptions", op="remove",
subscriptions=payload),
users=[user_profile.id])
tornado_callbacks.send_notification(notice)
@ -673,8 +683,14 @@ def bulk_remove_subscriptions(users, streams):
Subscription.objects.filter(id__in=[sub.id for (sub, stream_name) in
subs_to_deactivate]).update(active=False)
streams_by_user = defaultdict(list)
for (sub, stream) in subs_to_deactivate:
notify_subscription_removed(sub.user_profile, stream)
streams_by_user[sub.user_profile_id].append(stream)
for user_profile in users:
if len(streams_by_user[user_profile.id]) == 0:
continue
notify_subscriptions_removed(user_profile, streams_by_user[user_profile.id])
return ([(sub.user_profile, stream) for (sub, stream) in subs_to_deactivate],
not_subscribed)
@ -690,7 +706,7 @@ def do_remove_subscription(user_profile, stream, no_log=False):
subscription.active = False
subscription.save(update_fields=["active"])
if did_remove:
notify_subscription_removed(user_profile, stream, no_log)
notify_subscriptions_removed(user_profile, [stream], no_log)
return did_remove
@ -1129,14 +1145,23 @@ def do_events_register(user_profile, user_client, apply_markdown=True,
person = event['person']
ret['realm_users'] = filter(lambda p: p['email'] != person['email'],
ret['realm_users'])
elif event['type'] == "subscription":
elif event['type'] == "subscriptions":
subscriptions_to_filter = set(sub.name.lower() for sub in event["subscriptions"])
# We add the new subscriptions to the list of streams the
# user is subscribed to, and also remove/add them from the
# list of streams the user is not subscribed to (which we
# are still sending on data about so that e.g. colors and
# the in_home_view bit are properly available for those streams)
#
# And we do the opposite filtering process for unsubscribe events.
if event['op'] == "add":
ret['subscriptions'].append(event['subscription'])
ret['subscriptions'] += event['subscriptions']
ret['unsubscribed'] = filter(lambda s: s['name'].lower() not in subscriptions_to_filter,
ret['unsubscribed'])
elif event['op'] == "remove":
sub = event['subscription']
ret['subscriptions'] = filter(lambda s: s['name'] != sub['name'],
ret['unsubscribed'] += event['subscriptions']
ret['subscriptions'] = filter(lambda s: s['name'].lower() not in subscriptions_to_filter,
ret['subscriptions'])
ret['unsubscribed'].append(sub)
elif event['type'] == "presence":
ret['presences'][event['email']] = event['presence']
elif event['type'] == "update_message":

View File

@ -872,13 +872,17 @@ function get_updates(options) {
}
typeahead_helper.autocomplete_needs_update(true);
break;
case 'subscription':
case 'subscriptions':
if (event.op === 'add') {
$(document).trigger($.Event('subscription_add.zephyr',
{subscription: event.subscription}));
$.each(event.subscriptions, function(index, subscription) {
$(document).trigger($.Event('subscription_add.zephyr',
{subscription: subscription}));
});
} else if (event.op === 'remove') {
$(document).trigger($.Event('subscription_remove.zephyr',
{subscription: event.subscription}));
$.each(event.subscriptions, function(index, subscription) {
$(document).trigger($.Event('subscription_remove.zephyr',
{subscription: subscription}));
});
}
break;
case 'presence':