subscriber events: Change schema for peer_add/peer_remove.

We now can send an implied matrix of user/stream tuples
for peer_add and peer_remove events.

The client code basically does this:

    for stream_id in event['stream_ids']:
        for user_id in event['user_ids']:
            update_sub(stream_id, user_id)

We used to send individual events, which gets real
expensive when you are creating new streams. For
the case of copy-to-stream case, we should see
events go from U to 1, where U is the number of users
added.

Note that we don't yet fully optimize the potential
of this schema.  For adding a new user with lots
of default streams, we still send S peer_add events.

And if you subscribe a bunch of users to a bunch of
private streams, we only go from U * S to S; we can't
optimize it down to one event easily.
This commit is contained in:
Steve Howell 2020-10-22 12:14:02 +00:00 committed by Tim Abbott
parent efa8dd3a47
commit 7ff3859136
8 changed files with 144 additions and 137 deletions

View File

@ -50,7 +50,7 @@ test("peer add/remove", (override) => {
stream_data.add_sub({
name: "devel",
stream_id: event.stream_id,
stream_id: event.stream_ids[0],
});
const subs_stub = global.make_stub();
@ -111,11 +111,13 @@ test("add error handling", (override) => {
});
});
test("peer event error handling (bad stream_ids)", () => {
test("peer event error handling (bad stream_ids)", (override) => {
override("compose_fade.update_faded_users", () => {});
const add_event = {
type: "subscription",
op: "peer_add",
stream_id: 99999,
stream_ids: [99999],
};
blueslip.expect("warn", "Cannot find stream for peer_add: 99999");
@ -125,7 +127,7 @@ test("peer event error handling (bad stream_ids)", () => {
const remove_event = {
type: "subscription",
op: "peer_remove",
stream_id: 99999,
stream_ids: [99999],
};
blueslip.expect("warn", "Cannot find stream for peer_remove: 99999");
@ -133,6 +135,9 @@ test("peer event error handling (bad stream_ids)", () => {
});
test("peer event error handling (add_subscriber)", (override) => {
override("compose_fade.update_faded_users", () => {});
override("subs.update_subscribers_ui", () => {});
stream_data.add_sub({
name: "devel",
stream_id: 1,
@ -143,8 +148,8 @@ test("peer event error handling (add_subscriber)", (override) => {
const add_event = {
type: "subscription",
op: "peer_add",
stream_id: 1,
user_id: 99999, // id is irrelevant
stream_ids: [1],
user_ids: [99999], // id is irrelevant
};
blueslip.expect("warn", "Cannot process peer_add event");
@ -156,8 +161,8 @@ test("peer event error handling (add_subscriber)", (override) => {
const remove_event = {
type: "subscription",
op: "peer_remove",
stream_id: 1,
user_id: 99999, // id is irrelevant
stream_ids: [1],
user_ids: [99999], // id is irrelevant
};
blueslip.expect("warn", "Cannot process peer_remove event.");

View File

@ -537,15 +537,15 @@ exports.fixtures = {
subscription__peer_add: {
type: "subscription",
op: "peer_add",
user_id: test_user.user_id,
stream_id: 42,
user_ids: [test_user.user_id],
stream_ids: [streams.devel.stream_id],
},
subscription__peer_remove: {
type: "subscription",
op: "peer_remove",
user_id: test_user.user_id,
stream_id: 42,
user_ids: [test_user.user_id],
stream_ids: [streams.devel.stream_id],
},
subscription__remove: {

View File

@ -343,7 +343,7 @@ exports.dispatch_normal_event = function dispatch_normal_event(event) {
}
}
} else if (event.op === "peer_add") {
function add_peer(stream_id, user_id) {
event.stream_ids.forEach((stream_id) => {
const sub = stream_data.get_sub_by_id(stream_id);
if (!sub) {
@ -351,17 +351,18 @@ exports.dispatch_normal_event = function dispatch_normal_event(event) {
return;
}
if (!stream_data.add_subscriber(stream_id, user_id)) {
blueslip.warn("Cannot process peer_add event");
return;
}
event.user_ids.forEach((user_id) => {
if (!stream_data.add_subscriber(stream_id, user_id)) {
blueslip.warn("Cannot process peer_add event");
return;
}
});
subs.update_subscribers_ui(sub);
compose_fade.update_faded_users();
}
add_peer(event.stream_id, event.user_id);
});
compose_fade.update_faded_users();
} else if (event.op === "peer_remove") {
function remove_peer(stream_id, user_id) {
event.stream_ids.forEach((stream_id) => {
const sub = stream_data.get_sub_by_id(stream_id);
if (!sub) {
@ -369,15 +370,16 @@ exports.dispatch_normal_event = function dispatch_normal_event(event) {
return;
}
if (!stream_data.remove_subscriber(sub.stream_id, user_id)) {
blueslip.warn("Cannot process peer_remove event.");
return;
}
event.user_ids.forEach((user_id) => {
if (!stream_data.remove_subscriber(sub.stream_id, user_id)) {
blueslip.warn("Cannot process peer_remove event.");
return;
}
});
subs.update_subscribers_ui(sub);
compose_fade.update_faded_users();
}
remove_peer(event.stream_id, event.user_id);
});
compose_fade.update_faded_users();
} else if (event.op === "remove") {
for (const rec of event.subscriptions) {
const sub = stream_data.get_sub_by_id(rec.stream_id);

View File

@ -2966,14 +2966,13 @@ def send_peer_subscriber_events(
peer_user_ids = private_peer_dict[stream_id] - altered_user_ids
if peer_user_ids:
for new_user_id in altered_user_ids:
event = dict(
type="subscription",
op=op,
stream_id=stream_id,
user_id=new_user_id,
)
send_event(realm, event, peer_user_ids)
event = dict(
type="subscription",
op=op,
stream_ids=[stream_id],
user_ids=sorted(list(altered_user_ids))
)
send_event(realm, event, peer_user_ids)
public_stream_ids = [
stream_id for stream_id in altered_user_dict
@ -2984,19 +2983,25 @@ def send_peer_subscriber_events(
if public_stream_ids:
public_peer_ids = set(active_non_guest_user_ids(realm.id))
# TODO:
#
# We eventually want a special optimization for a single user that
# subscribes to many streams. Right now we optimize for the other
# scenario, which is also kind of common--if we add/remove multiple
# users all for the same stream, we just send one event per stream.
for stream_id in public_stream_ids:
altered_user_ids = altered_user_dict[stream_id]
peer_user_ids = public_peer_ids - altered_user_ids
if peer_user_ids:
for new_user_id in altered_user_ids:
event = dict(
type="subscription",
op=op,
stream_id=stream_id,
user_id=new_user_id,
)
send_event(realm, event, peer_user_ids)
event = dict(
type="subscription",
op=op,
stream_ids=[stream_id],
user_ids=sorted(list(altered_user_ids)),
)
send_event(realm, event, peer_user_ids)
def send_peer_remove_events(
realm: Realm,

View File

@ -1184,8 +1184,8 @@ subscription_peer_add_event = event_dict_type(
required_keys=[
("type", Equals("subscription")),
("op", Equals("peer_add")),
("user_id", int),
("stream_id", int),
("user_ids", ListType(int)),
("stream_ids", ListType(int)),
]
)
check_subscription_peer_add = make_checker(subscription_peer_add_event)
@ -1194,8 +1194,8 @@ subscription_peer_remove_event = event_dict_type(
required_keys=[
("type", Equals("subscription")),
("op", Equals("peer_remove")),
("user_id", int),
("stream_id", int),
("user_ids", ListType(int)),
("stream_ids", ListType(int)),
]
)
check_subscription_peer_remove = make_checker(subscription_peer_remove_event)

View File

@ -717,23 +717,21 @@ def apply_event(state: Dict[str, Any],
if sub['name'].lower() == event['name'].lower():
sub[event['property']] = event['value']
elif event['op'] == 'peer_add':
stream_id = event['stream_id']
user_id = event['user_id']
for sub in state['subscriptions']:
if (sub['stream_id'] == stream_id and
user_id not in sub['subscribers']):
sub['subscribers'].append(user_id)
for sub in state['never_subscribed']:
if (sub['stream_id'] == stream_id and
user_id not in sub['subscribers']):
sub['subscribers'].append(user_id)
stream_ids = set(event["stream_ids"])
user_ids = set(event["user_ids"])
for sub_dict in [state["subscriptions"], state["never_subscribed"]]:
for sub in sub_dict:
if sub["stream_id"] in stream_ids:
subscribers = set(sub["subscribers"]) | user_ids
sub["subscribers"] = sorted(list(subscribers))
elif event['op'] == 'peer_remove':
stream_id = event['stream_id']
user_id = event['user_id']
for sub in state['subscriptions']:
if (sub['stream_id'] == stream_id and
user_id in sub['subscribers']):
sub['subscribers'].remove(user_id)
stream_ids = set(event["stream_ids"])
user_ids = set(event["user_ids"])
for sub_dict in [state["subscriptions"]]:
for sub in sub_dict:
if sub["stream_id"] in stream_ids:
subscribers = set(sub["subscribers"]) - user_ids
sub["subscribers"] = sorted(list(subscribers))
elif event['type'] == "presence":
if slim_presence:
user_key = str(event['user_id'])

View File

@ -589,8 +589,8 @@ paths:
}
- type: object
description: |
Event sent to other users when a user has been subscribed to a
stream. Sent to all users if the stream is public or to only
Event sent to other users when users have been subscribed to
streams. Sent to all users if the stream is public or to only
the existing subscribers if the stream is private.
properties:
id:
@ -603,14 +603,18 @@ paths:
type: string
enum:
- peer_add
stream_id:
type: integer
stream_ids:
type: array
description: |
The ID of the stream to which the user has subscribed.
user_id:
type: integer
The IDs of the streams to which the user has subscribed.
items:
type: integer
user_ids:
type: array
description: |
The ID of the user who subscribed.
The IDs of the users who subscribed.
items:
type: integer
additionalProperties: false
example:
{
@ -622,8 +626,8 @@ paths:
}
- type: object
description: |
Event sent to other users when a user has been unsubscribed
from a stream. Sent to all users if the stream is public or to only
Event sent to other users when users have been unsubscribed
from streams. Sent to all users if the stream is public or to only
the existing subscribers if the stream is private.
properties:
id:
@ -636,15 +640,19 @@ paths:
type: string
enum:
- peer_remove
stream_id:
type: integer
stream_ids:
type: array
description: |
The ID of the stream from which the user has been
The IDs of the streams from which the users have been
unsubscribed from.
user_id:
type: integer
items:
type: integer
user_ids:
type: array
description: |
The ID of the user who has been unsubscribed.
The IDs of the users who have been unsubscribed.
items:
type: integer
additionalProperties: false
example:
{

View File

@ -2973,7 +2973,7 @@ class SubscriptionAPITest(ZulipTestCase):
)
self.assert_length(queries, 37)
self.assert_length(events, 6)
self.assert_length(events, 5)
for ev in [x for x in events if x['event']['type'] not in ('message', 'stream')]:
if ev['event']['op'] == 'add':
self.assertEqual(
@ -3014,7 +3014,7 @@ class SubscriptionAPITest(ZulipTestCase):
self.assertEqual(len(add_peer_event['users']), 11)
self.assertEqual(add_peer_event['event']['type'], 'subscription')
self.assertEqual(add_peer_event['event']['op'], 'peer_add')
self.assertEqual(add_peer_event['event']['user_id'], self.user_profile.id)
self.assertEqual(add_peer_event['event']['user_ids'], [self.user_profile.id])
stream = get_stream('multi_user_stream', realm)
self.assertEqual(num_subscribers_for_stream_id(stream.id), 3)
@ -3046,7 +3046,7 @@ class SubscriptionAPITest(ZulipTestCase):
self.assertEqual(len(add_peer_event['users']), 11)
self.assertEqual(add_peer_event['event']['type'], 'subscription')
self.assertEqual(add_peer_event['event']['op'], 'peer_add')
self.assertEqual(add_peer_event['event']['user_id'], user_profile.id)
self.assertEqual(add_peer_event['event']['user_ids'], [user_profile.id])
def test_private_stream_subscription(self) -> None:
realm = get_realm("zulip")
@ -3087,7 +3087,7 @@ class SubscriptionAPITest(ZulipTestCase):
self.assertEqual(len(add_peer_event['users']), 3)
self.assertEqual(add_peer_event['event']['type'], 'subscription')
self.assertEqual(add_peer_event['event']['op'], 'peer_add')
self.assertEqual(add_peer_event['event']['user_id'], user_profile.id)
self.assertEqual(add_peer_event['event']['user_ids'], [user_profile.id])
# Do not send stream creation event to realm admin users
# even if realm admin is subscribed to stream cause realm admin already get
@ -3109,7 +3109,7 @@ class SubscriptionAPITest(ZulipTestCase):
self.assertEqual(len(add_peer_event['users']), 1)
self.assertEqual(add_peer_event['event']['type'], 'subscription')
self.assertEqual(add_peer_event['event']['op'], 'peer_add')
self.assertEqual(add_peer_event['event']['user_id'], self.example_user("iago").id)
self.assertEqual(add_peer_event['event']['user_ids'], [self.example_user("iago").id])
def test_subscribe_to_stream_post_policy_admins_stream(self) -> None:
"""
@ -3211,18 +3211,19 @@ class SubscriptionAPITest(ZulipTestCase):
dict(principals=orjson.dumps(new_user_ids_to_subscribe).decode()),
)
add_peer_events = [events[2], events[3]]
for add_peer_event in add_peer_events:
self.assertEqual(add_peer_event['event']['type'], 'subscription')
self.assertEqual(add_peer_event['event']['op'], 'peer_add')
event_sent_to_ids = add_peer_event['users']
for user_id in new_user_ids_to_subscribe:
# Make sure new users subscribed to stream is not in
# peer_add event recipient list
self.assertNotIn(user_id, event_sent_to_ids)
for old_user in orig_user_ids_to_subscribe:
# Check non new users are in peer_add event recipient list.
self.assertIn(old_user, event_sent_to_ids)
add_peer_events = [event for event in events if event["event"].get("op") == "peer_add"]
(add_peer_event,) = add_peer_events
self.assertEqual(add_peer_event["event"]["type"], "subscription")
self.assertEqual(add_peer_event["event"]["op"], "peer_add")
event_sent_to_ids = add_peer_event["users"]
for user_id in new_user_ids_to_subscribe:
# Make sure new users subscribed to stream is not in
# peer_add event recipient list
self.assertNotIn(user_id, event_sent_to_ids)
for old_user in orig_user_ids_to_subscribe:
# Check non new users are in peer_add event recipient list.
self.assertIn(old_user, event_sent_to_ids)
def test_users_getting_remove_peer_event(self) -> None:
"""
@ -3233,6 +3234,7 @@ class SubscriptionAPITest(ZulipTestCase):
user3 = self.example_user("hamlet")
user4 = self.example_user("iago")
user5 = self.example_user("AARON")
guest = self.example_user("polonius")
stream1 = self.make_stream('stream1')
stream2 = self.make_stream('stream2')
@ -3266,49 +3268,36 @@ class SubscriptionAPITest(ZulipTestCase):
peer_events = [e for e in events
if e['event'].get('op') == 'peer_remove']
notifications = set()
# We only care about a subset of users when we inspect
# peer_remove events.
our_user_ids = {
user1.id,
user2.id,
user3.id,
user4.id,
user5.id,
guest.id,
}
notifications = []
for event in peer_events:
for user_id in event['users']:
stream_id = event['event']['stream_id']
stream_name = Stream.objects.get(id=stream_id).name
removed_user_id = event['event']['user_id']
notifications.add((user_id, removed_user_id, stream_name))
(stream_id,) = event['event']['stream_ids']
stream_name = Stream.objects.get(id=stream_id).name
removed_user_ids = set(event['event']['user_ids'])
notified_user_ids = set(event['users']) & our_user_ids
notifications.append((stream_name, removed_user_ids, notified_user_ids))
# POSITIVE CASES FIRST
self.assertIn((user3.id, user1.id, 'stream1'), notifications)
self.assertIn((user4.id, user1.id, 'stream1'), notifications)
notifications.sort(key=lambda tup: tup[0])
self.assertIn((user3.id, user2.id, 'stream1'), notifications)
self.assertIn((user4.id, user2.id, 'stream1'), notifications)
self.assertIn((user1.id, user2.id, 'stream2'), notifications)
self.assertIn((user3.id, user2.id, 'stream2'), notifications)
self.assertIn((user4.id, user2.id, 'stream2'), notifications)
self.assertIn((user1.id, user2.id, 'stream3'), notifications)
self.assertIn((user3.id, user2.id, 'stream3'), notifications)
self.assertIn((user4.id, user2.id, 'stream3'), notifications)
self.assertIn((user3.id, user1.id, 'private_stream'), notifications)
self.assertIn((user3.id, user2.id, 'private_stream'), notifications)
self.assertIn((user4.id, user1.id, 'private_stream'), notifications)
self.assertIn((user4.id, user2.id, 'private_stream'), notifications)
# NEGATIVE
# don't be notified if you are being removed yourself
self.assertNotIn((user1.id, user1.id, 'stream1'), notifications)
# don't send false notifications for folks that weren't actually
# subscribed int he first place
self.assertNotIn((user3.id, user1.id, 'stream2'), notifications)
# don't send notifications for random people
self.assertNotIn((user3.id, user4.id, 'stream2'), notifications)
# don't send notifications to unsubscribed non realm admin users for private streams
self.assertNotIn((user5.id, user1.id, 'private_stream'), notifications)
self.assertEqual(
notifications,
[
("private_stream", {user1.id, user2.id}, {user3.id, user4.id}),
("stream1", {user1.id, user2.id}, {user3.id, user4.id, user5.id}),
("stream2", {user2.id}, {user1.id, user3.id, user4.id, user5.id}),
("stream3", {user2.id}, {user1.id, user3.id, user4.id, user5.id}),
],
)
def test_bulk_subscribe_MIT(self) -> None:
mit_user = self.mit_user('starnine')