subscriber events: Change schema for peer_add/peer_remove.

We now can send an implied matrix of user/stream tuples
for peer_add and peer_remove events.

The client code basically does this:

    for stream_id in event['stream_ids']:
        for user_id in event['user_ids']:
            update_sub(stream_id, user_id)

We used to send individual events, which gets real
expensive when you are creating new streams. For
the case of copy-to-stream case, we should see
events go from U to 1, where U is the number of users
added.

Note that we don't yet fully optimize the potential
of this schema.  For adding a new user with lots
of default streams, we still send S peer_add events.

And if you subscribe a bunch of users to a bunch of
private streams, we only go from U * S to S; we can't
optimize it down to one event easily.
This commit is contained in:
Steve Howell 2020-10-22 12:14:02 +00:00 committed by Tim Abbott
parent efa8dd3a47
commit 7ff3859136
8 changed files with 144 additions and 137 deletions

View File

@ -50,7 +50,7 @@ test("peer add/remove", (override) => {
stream_data.add_sub({ stream_data.add_sub({
name: "devel", name: "devel",
stream_id: event.stream_id, stream_id: event.stream_ids[0],
}); });
const subs_stub = global.make_stub(); const subs_stub = global.make_stub();
@ -111,11 +111,13 @@ test("add error handling", (override) => {
}); });
}); });
test("peer event error handling (bad stream_ids)", () => { test("peer event error handling (bad stream_ids)", (override) => {
override("compose_fade.update_faded_users", () => {});
const add_event = { const add_event = {
type: "subscription", type: "subscription",
op: "peer_add", op: "peer_add",
stream_id: 99999, stream_ids: [99999],
}; };
blueslip.expect("warn", "Cannot find stream for peer_add: 99999"); blueslip.expect("warn", "Cannot find stream for peer_add: 99999");
@ -125,7 +127,7 @@ test("peer event error handling (bad stream_ids)", () => {
const remove_event = { const remove_event = {
type: "subscription", type: "subscription",
op: "peer_remove", op: "peer_remove",
stream_id: 99999, stream_ids: [99999],
}; };
blueslip.expect("warn", "Cannot find stream for peer_remove: 99999"); blueslip.expect("warn", "Cannot find stream for peer_remove: 99999");
@ -133,6 +135,9 @@ test("peer event error handling (bad stream_ids)", () => {
}); });
test("peer event error handling (add_subscriber)", (override) => { test("peer event error handling (add_subscriber)", (override) => {
override("compose_fade.update_faded_users", () => {});
override("subs.update_subscribers_ui", () => {});
stream_data.add_sub({ stream_data.add_sub({
name: "devel", name: "devel",
stream_id: 1, stream_id: 1,
@ -143,8 +148,8 @@ test("peer event error handling (add_subscriber)", (override) => {
const add_event = { const add_event = {
type: "subscription", type: "subscription",
op: "peer_add", op: "peer_add",
stream_id: 1, stream_ids: [1],
user_id: 99999, // id is irrelevant user_ids: [99999], // id is irrelevant
}; };
blueslip.expect("warn", "Cannot process peer_add event"); blueslip.expect("warn", "Cannot process peer_add event");
@ -156,8 +161,8 @@ test("peer event error handling (add_subscriber)", (override) => {
const remove_event = { const remove_event = {
type: "subscription", type: "subscription",
op: "peer_remove", op: "peer_remove",
stream_id: 1, stream_ids: [1],
user_id: 99999, // id is irrelevant user_ids: [99999], // id is irrelevant
}; };
blueslip.expect("warn", "Cannot process peer_remove event."); blueslip.expect("warn", "Cannot process peer_remove event.");

View File

@ -537,15 +537,15 @@ exports.fixtures = {
subscription__peer_add: { subscription__peer_add: {
type: "subscription", type: "subscription",
op: "peer_add", op: "peer_add",
user_id: test_user.user_id, user_ids: [test_user.user_id],
stream_id: 42, stream_ids: [streams.devel.stream_id],
}, },
subscription__peer_remove: { subscription__peer_remove: {
type: "subscription", type: "subscription",
op: "peer_remove", op: "peer_remove",
user_id: test_user.user_id, user_ids: [test_user.user_id],
stream_id: 42, stream_ids: [streams.devel.stream_id],
}, },
subscription__remove: { subscription__remove: {

View File

@ -343,7 +343,7 @@ exports.dispatch_normal_event = function dispatch_normal_event(event) {
} }
} }
} else if (event.op === "peer_add") { } else if (event.op === "peer_add") {
function add_peer(stream_id, user_id) { event.stream_ids.forEach((stream_id) => {
const sub = stream_data.get_sub_by_id(stream_id); const sub = stream_data.get_sub_by_id(stream_id);
if (!sub) { if (!sub) {
@ -351,17 +351,18 @@ exports.dispatch_normal_event = function dispatch_normal_event(event) {
return; return;
} }
if (!stream_data.add_subscriber(stream_id, user_id)) { event.user_ids.forEach((user_id) => {
blueslip.warn("Cannot process peer_add event"); if (!stream_data.add_subscriber(stream_id, user_id)) {
return; blueslip.warn("Cannot process peer_add event");
} return;
}
});
subs.update_subscribers_ui(sub); subs.update_subscribers_ui(sub);
compose_fade.update_faded_users(); });
} compose_fade.update_faded_users();
add_peer(event.stream_id, event.user_id);
} else if (event.op === "peer_remove") { } else if (event.op === "peer_remove") {
function remove_peer(stream_id, user_id) { event.stream_ids.forEach((stream_id) => {
const sub = stream_data.get_sub_by_id(stream_id); const sub = stream_data.get_sub_by_id(stream_id);
if (!sub) { if (!sub) {
@ -369,15 +370,16 @@ exports.dispatch_normal_event = function dispatch_normal_event(event) {
return; return;
} }
if (!stream_data.remove_subscriber(sub.stream_id, user_id)) { event.user_ids.forEach((user_id) => {
blueslip.warn("Cannot process peer_remove event."); if (!stream_data.remove_subscriber(sub.stream_id, user_id)) {
return; blueslip.warn("Cannot process peer_remove event.");
} return;
}
});
subs.update_subscribers_ui(sub); subs.update_subscribers_ui(sub);
compose_fade.update_faded_users(); });
} compose_fade.update_faded_users();
remove_peer(event.stream_id, event.user_id);
} else if (event.op === "remove") { } else if (event.op === "remove") {
for (const rec of event.subscriptions) { for (const rec of event.subscriptions) {
const sub = stream_data.get_sub_by_id(rec.stream_id); const sub = stream_data.get_sub_by_id(rec.stream_id);

View File

@ -2966,14 +2966,13 @@ def send_peer_subscriber_events(
peer_user_ids = private_peer_dict[stream_id] - altered_user_ids peer_user_ids = private_peer_dict[stream_id] - altered_user_ids
if peer_user_ids: if peer_user_ids:
for new_user_id in altered_user_ids: event = dict(
event = dict( type="subscription",
type="subscription", op=op,
op=op, stream_ids=[stream_id],
stream_id=stream_id, user_ids=sorted(list(altered_user_ids))
user_id=new_user_id, )
) send_event(realm, event, peer_user_ids)
send_event(realm, event, peer_user_ids)
public_stream_ids = [ public_stream_ids = [
stream_id for stream_id in altered_user_dict stream_id for stream_id in altered_user_dict
@ -2984,19 +2983,25 @@ def send_peer_subscriber_events(
if public_stream_ids: if public_stream_ids:
public_peer_ids = set(active_non_guest_user_ids(realm.id)) public_peer_ids = set(active_non_guest_user_ids(realm.id))
# TODO:
#
# We eventually want a special optimization for a single user that
# subscribes to many streams. Right now we optimize for the other
# scenario, which is also kind of common--if we add/remove multiple
# users all for the same stream, we just send one event per stream.
for stream_id in public_stream_ids: for stream_id in public_stream_ids:
altered_user_ids = altered_user_dict[stream_id] altered_user_ids = altered_user_dict[stream_id]
peer_user_ids = public_peer_ids - altered_user_ids peer_user_ids = public_peer_ids - altered_user_ids
if peer_user_ids: if peer_user_ids:
for new_user_id in altered_user_ids: event = dict(
event = dict( type="subscription",
type="subscription", op=op,
op=op, stream_ids=[stream_id],
stream_id=stream_id, user_ids=sorted(list(altered_user_ids)),
user_id=new_user_id, )
) send_event(realm, event, peer_user_ids)
send_event(realm, event, peer_user_ids)
def send_peer_remove_events( def send_peer_remove_events(
realm: Realm, realm: Realm,

View File

@ -1184,8 +1184,8 @@ subscription_peer_add_event = event_dict_type(
required_keys=[ required_keys=[
("type", Equals("subscription")), ("type", Equals("subscription")),
("op", Equals("peer_add")), ("op", Equals("peer_add")),
("user_id", int), ("user_ids", ListType(int)),
("stream_id", int), ("stream_ids", ListType(int)),
] ]
) )
check_subscription_peer_add = make_checker(subscription_peer_add_event) check_subscription_peer_add = make_checker(subscription_peer_add_event)
@ -1194,8 +1194,8 @@ subscription_peer_remove_event = event_dict_type(
required_keys=[ required_keys=[
("type", Equals("subscription")), ("type", Equals("subscription")),
("op", Equals("peer_remove")), ("op", Equals("peer_remove")),
("user_id", int), ("user_ids", ListType(int)),
("stream_id", int), ("stream_ids", ListType(int)),
] ]
) )
check_subscription_peer_remove = make_checker(subscription_peer_remove_event) check_subscription_peer_remove = make_checker(subscription_peer_remove_event)

View File

@ -717,23 +717,21 @@ def apply_event(state: Dict[str, Any],
if sub['name'].lower() == event['name'].lower(): if sub['name'].lower() == event['name'].lower():
sub[event['property']] = event['value'] sub[event['property']] = event['value']
elif event['op'] == 'peer_add': elif event['op'] == 'peer_add':
stream_id = event['stream_id'] stream_ids = set(event["stream_ids"])
user_id = event['user_id'] user_ids = set(event["user_ids"])
for sub in state['subscriptions']: for sub_dict in [state["subscriptions"], state["never_subscribed"]]:
if (sub['stream_id'] == stream_id and for sub in sub_dict:
user_id not in sub['subscribers']): if sub["stream_id"] in stream_ids:
sub['subscribers'].append(user_id) subscribers = set(sub["subscribers"]) | user_ids
for sub in state['never_subscribed']: sub["subscribers"] = sorted(list(subscribers))
if (sub['stream_id'] == stream_id and
user_id not in sub['subscribers']):
sub['subscribers'].append(user_id)
elif event['op'] == 'peer_remove': elif event['op'] == 'peer_remove':
stream_id = event['stream_id'] stream_ids = set(event["stream_ids"])
user_id = event['user_id'] user_ids = set(event["user_ids"])
for sub in state['subscriptions']: for sub_dict in [state["subscriptions"]]:
if (sub['stream_id'] == stream_id and for sub in sub_dict:
user_id in sub['subscribers']): if sub["stream_id"] in stream_ids:
sub['subscribers'].remove(user_id) subscribers = set(sub["subscribers"]) - user_ids
sub["subscribers"] = sorted(list(subscribers))
elif event['type'] == "presence": elif event['type'] == "presence":
if slim_presence: if slim_presence:
user_key = str(event['user_id']) user_key = str(event['user_id'])

View File

@ -589,8 +589,8 @@ paths:
} }
- type: object - type: object
description: | description: |
Event sent to other users when a user has been subscribed to a Event sent to other users when users have been subscribed to
stream. Sent to all users if the stream is public or to only streams. Sent to all users if the stream is public or to only
the existing subscribers if the stream is private. the existing subscribers if the stream is private.
properties: properties:
id: id:
@ -603,14 +603,18 @@ paths:
type: string type: string
enum: enum:
- peer_add - peer_add
stream_id: stream_ids:
type: integer type: array
description: | description: |
The ID of the stream to which the user has subscribed. The IDs of the streams to which the user has subscribed.
user_id: items:
type: integer type: integer
user_ids:
type: array
description: | description: |
The ID of the user who subscribed. The IDs of the users who subscribed.
items:
type: integer
additionalProperties: false additionalProperties: false
example: example:
{ {
@ -622,8 +626,8 @@ paths:
} }
- type: object - type: object
description: | description: |
Event sent to other users when a user has been unsubscribed Event sent to other users when users have been unsubscribed
from a stream. Sent to all users if the stream is public or to only from streams. Sent to all users if the stream is public or to only
the existing subscribers if the stream is private. the existing subscribers if the stream is private.
properties: properties:
id: id:
@ -636,15 +640,19 @@ paths:
type: string type: string
enum: enum:
- peer_remove - peer_remove
stream_id: stream_ids:
type: integer type: array
description: | description: |
The ID of the stream from which the user has been The IDs of the streams from which the users have been
unsubscribed from. unsubscribed from.
user_id: items:
type: integer type: integer
user_ids:
type: array
description: | description: |
The ID of the user who has been unsubscribed. The IDs of the users who have been unsubscribed.
items:
type: integer
additionalProperties: false additionalProperties: false
example: example:
{ {

View File

@ -2973,7 +2973,7 @@ class SubscriptionAPITest(ZulipTestCase):
) )
self.assert_length(queries, 37) self.assert_length(queries, 37)
self.assert_length(events, 6) self.assert_length(events, 5)
for ev in [x for x in events if x['event']['type'] not in ('message', 'stream')]: for ev in [x for x in events if x['event']['type'] not in ('message', 'stream')]:
if ev['event']['op'] == 'add': if ev['event']['op'] == 'add':
self.assertEqual( self.assertEqual(
@ -3014,7 +3014,7 @@ class SubscriptionAPITest(ZulipTestCase):
self.assertEqual(len(add_peer_event['users']), 11) self.assertEqual(len(add_peer_event['users']), 11)
self.assertEqual(add_peer_event['event']['type'], 'subscription') self.assertEqual(add_peer_event['event']['type'], 'subscription')
self.assertEqual(add_peer_event['event']['op'], 'peer_add') self.assertEqual(add_peer_event['event']['op'], 'peer_add')
self.assertEqual(add_peer_event['event']['user_id'], self.user_profile.id) self.assertEqual(add_peer_event['event']['user_ids'], [self.user_profile.id])
stream = get_stream('multi_user_stream', realm) stream = get_stream('multi_user_stream', realm)
self.assertEqual(num_subscribers_for_stream_id(stream.id), 3) self.assertEqual(num_subscribers_for_stream_id(stream.id), 3)
@ -3046,7 +3046,7 @@ class SubscriptionAPITest(ZulipTestCase):
self.assertEqual(len(add_peer_event['users']), 11) self.assertEqual(len(add_peer_event['users']), 11)
self.assertEqual(add_peer_event['event']['type'], 'subscription') self.assertEqual(add_peer_event['event']['type'], 'subscription')
self.assertEqual(add_peer_event['event']['op'], 'peer_add') self.assertEqual(add_peer_event['event']['op'], 'peer_add')
self.assertEqual(add_peer_event['event']['user_id'], user_profile.id) self.assertEqual(add_peer_event['event']['user_ids'], [user_profile.id])
def test_private_stream_subscription(self) -> None: def test_private_stream_subscription(self) -> None:
realm = get_realm("zulip") realm = get_realm("zulip")
@ -3087,7 +3087,7 @@ class SubscriptionAPITest(ZulipTestCase):
self.assertEqual(len(add_peer_event['users']), 3) self.assertEqual(len(add_peer_event['users']), 3)
self.assertEqual(add_peer_event['event']['type'], 'subscription') self.assertEqual(add_peer_event['event']['type'], 'subscription')
self.assertEqual(add_peer_event['event']['op'], 'peer_add') self.assertEqual(add_peer_event['event']['op'], 'peer_add')
self.assertEqual(add_peer_event['event']['user_id'], user_profile.id) self.assertEqual(add_peer_event['event']['user_ids'], [user_profile.id])
# Do not send stream creation event to realm admin users # Do not send stream creation event to realm admin users
# even if realm admin is subscribed to stream cause realm admin already get # even if realm admin is subscribed to stream cause realm admin already get
@ -3109,7 +3109,7 @@ class SubscriptionAPITest(ZulipTestCase):
self.assertEqual(len(add_peer_event['users']), 1) self.assertEqual(len(add_peer_event['users']), 1)
self.assertEqual(add_peer_event['event']['type'], 'subscription') self.assertEqual(add_peer_event['event']['type'], 'subscription')
self.assertEqual(add_peer_event['event']['op'], 'peer_add') self.assertEqual(add_peer_event['event']['op'], 'peer_add')
self.assertEqual(add_peer_event['event']['user_id'], self.example_user("iago").id) self.assertEqual(add_peer_event['event']['user_ids'], [self.example_user("iago").id])
def test_subscribe_to_stream_post_policy_admins_stream(self) -> None: def test_subscribe_to_stream_post_policy_admins_stream(self) -> None:
""" """
@ -3211,18 +3211,19 @@ class SubscriptionAPITest(ZulipTestCase):
dict(principals=orjson.dumps(new_user_ids_to_subscribe).decode()), dict(principals=orjson.dumps(new_user_ids_to_subscribe).decode()),
) )
add_peer_events = [events[2], events[3]] add_peer_events = [event for event in events if event["event"].get("op") == "peer_add"]
for add_peer_event in add_peer_events: (add_peer_event,) = add_peer_events
self.assertEqual(add_peer_event['event']['type'], 'subscription')
self.assertEqual(add_peer_event['event']['op'], 'peer_add') self.assertEqual(add_peer_event["event"]["type"], "subscription")
event_sent_to_ids = add_peer_event['users'] self.assertEqual(add_peer_event["event"]["op"], "peer_add")
for user_id in new_user_ids_to_subscribe: event_sent_to_ids = add_peer_event["users"]
# Make sure new users subscribed to stream is not in for user_id in new_user_ids_to_subscribe:
# peer_add event recipient list # Make sure new users subscribed to stream is not in
self.assertNotIn(user_id, event_sent_to_ids) # peer_add event recipient list
for old_user in orig_user_ids_to_subscribe: self.assertNotIn(user_id, event_sent_to_ids)
# Check non new users are in peer_add event recipient list. for old_user in orig_user_ids_to_subscribe:
self.assertIn(old_user, event_sent_to_ids) # Check non new users are in peer_add event recipient list.
self.assertIn(old_user, event_sent_to_ids)
def test_users_getting_remove_peer_event(self) -> None: def test_users_getting_remove_peer_event(self) -> None:
""" """
@ -3233,6 +3234,7 @@ class SubscriptionAPITest(ZulipTestCase):
user3 = self.example_user("hamlet") user3 = self.example_user("hamlet")
user4 = self.example_user("iago") user4 = self.example_user("iago")
user5 = self.example_user("AARON") user5 = self.example_user("AARON")
guest = self.example_user("polonius")
stream1 = self.make_stream('stream1') stream1 = self.make_stream('stream1')
stream2 = self.make_stream('stream2') stream2 = self.make_stream('stream2')
@ -3266,49 +3268,36 @@ class SubscriptionAPITest(ZulipTestCase):
peer_events = [e for e in events peer_events = [e for e in events
if e['event'].get('op') == 'peer_remove'] if e['event'].get('op') == 'peer_remove']
notifications = set() # We only care about a subset of users when we inspect
# peer_remove events.
our_user_ids = {
user1.id,
user2.id,
user3.id,
user4.id,
user5.id,
guest.id,
}
notifications = []
for event in peer_events: for event in peer_events:
for user_id in event['users']: (stream_id,) = event['event']['stream_ids']
stream_id = event['event']['stream_id'] stream_name = Stream.objects.get(id=stream_id).name
stream_name = Stream.objects.get(id=stream_id).name removed_user_ids = set(event['event']['user_ids'])
removed_user_id = event['event']['user_id'] notified_user_ids = set(event['users']) & our_user_ids
notifications.add((user_id, removed_user_id, stream_name)) notifications.append((stream_name, removed_user_ids, notified_user_ids))
# POSITIVE CASES FIRST notifications.sort(key=lambda tup: tup[0])
self.assertIn((user3.id, user1.id, 'stream1'), notifications)
self.assertIn((user4.id, user1.id, 'stream1'), notifications)
self.assertIn((user3.id, user2.id, 'stream1'), notifications) self.assertEqual(
self.assertIn((user4.id, user2.id, 'stream1'), notifications) notifications,
[
self.assertIn((user1.id, user2.id, 'stream2'), notifications) ("private_stream", {user1.id, user2.id}, {user3.id, user4.id}),
self.assertIn((user3.id, user2.id, 'stream2'), notifications) ("stream1", {user1.id, user2.id}, {user3.id, user4.id, user5.id}),
self.assertIn((user4.id, user2.id, 'stream2'), notifications) ("stream2", {user2.id}, {user1.id, user3.id, user4.id, user5.id}),
("stream3", {user2.id}, {user1.id, user3.id, user4.id, user5.id}),
self.assertIn((user1.id, user2.id, 'stream3'), notifications) ],
self.assertIn((user3.id, user2.id, 'stream3'), notifications) )
self.assertIn((user4.id, user2.id, 'stream3'), notifications)
self.assertIn((user3.id, user1.id, 'private_stream'), notifications)
self.assertIn((user3.id, user2.id, 'private_stream'), notifications)
self.assertIn((user4.id, user1.id, 'private_stream'), notifications)
self.assertIn((user4.id, user2.id, 'private_stream'), notifications)
# NEGATIVE
# don't be notified if you are being removed yourself
self.assertNotIn((user1.id, user1.id, 'stream1'), notifications)
# don't send false notifications for folks that weren't actually
# subscribed int he first place
self.assertNotIn((user3.id, user1.id, 'stream2'), notifications)
# don't send notifications for random people
self.assertNotIn((user3.id, user4.id, 'stream2'), notifications)
# don't send notifications to unsubscribed non realm admin users for private streams
self.assertNotIn((user5.id, user1.id, 'private_stream'), notifications)
def test_bulk_subscribe_MIT(self) -> None: def test_bulk_subscribe_MIT(self) -> None:
mit_user = self.mit_user('starnine') mit_user = self.mit_user('starnine')