diff --git a/frontend_tests/node_tests/dispatch_subs.js b/frontend_tests/node_tests/dispatch_subs.js index 4dd573dfcd..5c439afcfe 100644 --- a/frontend_tests/node_tests/dispatch_subs.js +++ b/frontend_tests/node_tests/dispatch_subs.js @@ -50,7 +50,7 @@ test("peer add/remove", (override) => { stream_data.add_sub({ name: "devel", - stream_id: event.stream_id, + stream_id: event.stream_ids[0], }); const subs_stub = global.make_stub(); @@ -111,11 +111,13 @@ test("add error handling", (override) => { }); }); -test("peer event error handling (bad stream_ids)", () => { +test("peer event error handling (bad stream_ids)", (override) => { + override("compose_fade.update_faded_users", () => {}); + const add_event = { type: "subscription", op: "peer_add", - stream_id: 99999, + stream_ids: [99999], }; blueslip.expect("warn", "Cannot find stream for peer_add: 99999"); @@ -125,7 +127,7 @@ test("peer event error handling (bad stream_ids)", () => { const remove_event = { type: "subscription", op: "peer_remove", - stream_id: 99999, + stream_ids: [99999], }; blueslip.expect("warn", "Cannot find stream for peer_remove: 99999"); @@ -133,6 +135,9 @@ test("peer event error handling (bad stream_ids)", () => { }); test("peer event error handling (add_subscriber)", (override) => { + override("compose_fade.update_faded_users", () => {}); + override("subs.update_subscribers_ui", () => {}); + stream_data.add_sub({ name: "devel", stream_id: 1, @@ -143,8 +148,8 @@ test("peer event error handling (add_subscriber)", (override) => { const add_event = { type: "subscription", op: "peer_add", - stream_id: 1, - user_id: 99999, // id is irrelevant + stream_ids: [1], + user_ids: [99999], // id is irrelevant }; blueslip.expect("warn", "Cannot process peer_add event"); @@ -156,8 +161,8 @@ test("peer event error handling (add_subscriber)", (override) => { const remove_event = { type: "subscription", op: "peer_remove", - stream_id: 1, - user_id: 99999, // id is irrelevant + stream_ids: [1], + user_ids: [99999], // id is irrelevant }; blueslip.expect("warn", "Cannot process peer_remove event."); diff --git a/frontend_tests/node_tests/lib/events.js b/frontend_tests/node_tests/lib/events.js index 824784d4e8..25526de894 100644 --- a/frontend_tests/node_tests/lib/events.js +++ b/frontend_tests/node_tests/lib/events.js @@ -537,15 +537,15 @@ exports.fixtures = { subscription__peer_add: { type: "subscription", op: "peer_add", - user_id: test_user.user_id, - stream_id: 42, + user_ids: [test_user.user_id], + stream_ids: [streams.devel.stream_id], }, subscription__peer_remove: { type: "subscription", op: "peer_remove", - user_id: test_user.user_id, - stream_id: 42, + user_ids: [test_user.user_id], + stream_ids: [streams.devel.stream_id], }, subscription__remove: { diff --git a/static/js/server_events_dispatch.js b/static/js/server_events_dispatch.js index 6a9f1fdb4e..ff32476212 100644 --- a/static/js/server_events_dispatch.js +++ b/static/js/server_events_dispatch.js @@ -343,7 +343,7 @@ exports.dispatch_normal_event = function dispatch_normal_event(event) { } } } else if (event.op === "peer_add") { - function add_peer(stream_id, user_id) { + event.stream_ids.forEach((stream_id) => { const sub = stream_data.get_sub_by_id(stream_id); if (!sub) { @@ -351,17 +351,18 @@ exports.dispatch_normal_event = function dispatch_normal_event(event) { return; } - if (!stream_data.add_subscriber(stream_id, user_id)) { - blueslip.warn("Cannot process peer_add event"); - return; - } + event.user_ids.forEach((user_id) => { + if (!stream_data.add_subscriber(stream_id, user_id)) { + blueslip.warn("Cannot process peer_add event"); + return; + } + }); subs.update_subscribers_ui(sub); - compose_fade.update_faded_users(); - } - add_peer(event.stream_id, event.user_id); + }); + compose_fade.update_faded_users(); } else if (event.op === "peer_remove") { - function remove_peer(stream_id, user_id) { + event.stream_ids.forEach((stream_id) => { const sub = stream_data.get_sub_by_id(stream_id); if (!sub) { @@ -369,15 +370,16 @@ exports.dispatch_normal_event = function dispatch_normal_event(event) { return; } - if (!stream_data.remove_subscriber(sub.stream_id, user_id)) { - blueslip.warn("Cannot process peer_remove event."); - return; - } + event.user_ids.forEach((user_id) => { + if (!stream_data.remove_subscriber(sub.stream_id, user_id)) { + blueslip.warn("Cannot process peer_remove event."); + return; + } + }); subs.update_subscribers_ui(sub); - compose_fade.update_faded_users(); - } - remove_peer(event.stream_id, event.user_id); + }); + compose_fade.update_faded_users(); } else if (event.op === "remove") { for (const rec of event.subscriptions) { const sub = stream_data.get_sub_by_id(rec.stream_id); diff --git a/zerver/lib/actions.py b/zerver/lib/actions.py index e2bf6c82bd..d4cc19ba0d 100644 --- a/zerver/lib/actions.py +++ b/zerver/lib/actions.py @@ -2966,14 +2966,13 @@ def send_peer_subscriber_events( peer_user_ids = private_peer_dict[stream_id] - altered_user_ids if peer_user_ids: - for new_user_id in altered_user_ids: - event = dict( - type="subscription", - op=op, - stream_id=stream_id, - user_id=new_user_id, - ) - send_event(realm, event, peer_user_ids) + event = dict( + type="subscription", + op=op, + stream_ids=[stream_id], + user_ids=sorted(list(altered_user_ids)) + ) + send_event(realm, event, peer_user_ids) public_stream_ids = [ stream_id for stream_id in altered_user_dict @@ -2984,19 +2983,25 @@ def send_peer_subscriber_events( if public_stream_ids: public_peer_ids = set(active_non_guest_user_ids(realm.id)) + # TODO: + # + # We eventually want a special optimization for a single user that + # subscribes to many streams. Right now we optimize for the other + # scenario, which is also kind of common--if we add/remove multiple + # users all for the same stream, we just send one event per stream. + for stream_id in public_stream_ids: altered_user_ids = altered_user_dict[stream_id] peer_user_ids = public_peer_ids - altered_user_ids if peer_user_ids: - for new_user_id in altered_user_ids: - event = dict( - type="subscription", - op=op, - stream_id=stream_id, - user_id=new_user_id, - ) - send_event(realm, event, peer_user_ids) + event = dict( + type="subscription", + op=op, + stream_ids=[stream_id], + user_ids=sorted(list(altered_user_ids)), + ) + send_event(realm, event, peer_user_ids) def send_peer_remove_events( realm: Realm, diff --git a/zerver/lib/event_schema.py b/zerver/lib/event_schema.py index aea61f25f6..8ff7d5fff3 100644 --- a/zerver/lib/event_schema.py +++ b/zerver/lib/event_schema.py @@ -1184,8 +1184,8 @@ subscription_peer_add_event = event_dict_type( required_keys=[ ("type", Equals("subscription")), ("op", Equals("peer_add")), - ("user_id", int), - ("stream_id", int), + ("user_ids", ListType(int)), + ("stream_ids", ListType(int)), ] ) check_subscription_peer_add = make_checker(subscription_peer_add_event) @@ -1194,8 +1194,8 @@ subscription_peer_remove_event = event_dict_type( required_keys=[ ("type", Equals("subscription")), ("op", Equals("peer_remove")), - ("user_id", int), - ("stream_id", int), + ("user_ids", ListType(int)), + ("stream_ids", ListType(int)), ] ) check_subscription_peer_remove = make_checker(subscription_peer_remove_event) diff --git a/zerver/lib/events.py b/zerver/lib/events.py index dec25fe42d..7ee60af21b 100644 --- a/zerver/lib/events.py +++ b/zerver/lib/events.py @@ -717,23 +717,21 @@ def apply_event(state: Dict[str, Any], if sub['name'].lower() == event['name'].lower(): sub[event['property']] = event['value'] elif event['op'] == 'peer_add': - stream_id = event['stream_id'] - user_id = event['user_id'] - for sub in state['subscriptions']: - if (sub['stream_id'] == stream_id and - user_id not in sub['subscribers']): - sub['subscribers'].append(user_id) - for sub in state['never_subscribed']: - if (sub['stream_id'] == stream_id and - user_id not in sub['subscribers']): - sub['subscribers'].append(user_id) + stream_ids = set(event["stream_ids"]) + user_ids = set(event["user_ids"]) + for sub_dict in [state["subscriptions"], state["never_subscribed"]]: + for sub in sub_dict: + if sub["stream_id"] in stream_ids: + subscribers = set(sub["subscribers"]) | user_ids + sub["subscribers"] = sorted(list(subscribers)) elif event['op'] == 'peer_remove': - stream_id = event['stream_id'] - user_id = event['user_id'] - for sub in state['subscriptions']: - if (sub['stream_id'] == stream_id and - user_id in sub['subscribers']): - sub['subscribers'].remove(user_id) + stream_ids = set(event["stream_ids"]) + user_ids = set(event["user_ids"]) + for sub_dict in [state["subscriptions"]]: + for sub in sub_dict: + if sub["stream_id"] in stream_ids: + subscribers = set(sub["subscribers"]) - user_ids + sub["subscribers"] = sorted(list(subscribers)) elif event['type'] == "presence": if slim_presence: user_key = str(event['user_id']) diff --git a/zerver/openapi/zulip.yaml b/zerver/openapi/zulip.yaml index 708493a547..9cb960f3cb 100644 --- a/zerver/openapi/zulip.yaml +++ b/zerver/openapi/zulip.yaml @@ -589,8 +589,8 @@ paths: } - type: object description: | - Event sent to other users when a user has been subscribed to a - stream. Sent to all users if the stream is public or to only + Event sent to other users when users have been subscribed to + streams. Sent to all users if the stream is public or to only the existing subscribers if the stream is private. properties: id: @@ -603,14 +603,18 @@ paths: type: string enum: - peer_add - stream_id: - type: integer + stream_ids: + type: array description: | - The ID of the stream to which the user has subscribed. - user_id: - type: integer + The IDs of the streams to which the user has subscribed. + items: + type: integer + user_ids: + type: array description: | - The ID of the user who subscribed. + The IDs of the users who subscribed. + items: + type: integer additionalProperties: false example: { @@ -622,8 +626,8 @@ paths: } - type: object description: | - Event sent to other users when a user has been unsubscribed - from a stream. Sent to all users if the stream is public or to only + Event sent to other users when users have been unsubscribed + from streams. Sent to all users if the stream is public or to only the existing subscribers if the stream is private. properties: id: @@ -636,15 +640,19 @@ paths: type: string enum: - peer_remove - stream_id: - type: integer + stream_ids: + type: array description: | - The ID of the stream from which the user has been + The IDs of the streams from which the users have been unsubscribed from. - user_id: - type: integer + items: + type: integer + user_ids: + type: array description: | - The ID of the user who has been unsubscribed. + The IDs of the users who have been unsubscribed. + items: + type: integer additionalProperties: false example: { diff --git a/zerver/tests/test_subs.py b/zerver/tests/test_subs.py index c63be7357f..cf9e395e39 100644 --- a/zerver/tests/test_subs.py +++ b/zerver/tests/test_subs.py @@ -2973,7 +2973,7 @@ class SubscriptionAPITest(ZulipTestCase): ) self.assert_length(queries, 37) - self.assert_length(events, 6) + self.assert_length(events, 5) for ev in [x for x in events if x['event']['type'] not in ('message', 'stream')]: if ev['event']['op'] == 'add': self.assertEqual( @@ -3014,7 +3014,7 @@ class SubscriptionAPITest(ZulipTestCase): self.assertEqual(len(add_peer_event['users']), 11) self.assertEqual(add_peer_event['event']['type'], 'subscription') self.assertEqual(add_peer_event['event']['op'], 'peer_add') - self.assertEqual(add_peer_event['event']['user_id'], self.user_profile.id) + self.assertEqual(add_peer_event['event']['user_ids'], [self.user_profile.id]) stream = get_stream('multi_user_stream', realm) self.assertEqual(num_subscribers_for_stream_id(stream.id), 3) @@ -3046,7 +3046,7 @@ class SubscriptionAPITest(ZulipTestCase): self.assertEqual(len(add_peer_event['users']), 11) self.assertEqual(add_peer_event['event']['type'], 'subscription') self.assertEqual(add_peer_event['event']['op'], 'peer_add') - self.assertEqual(add_peer_event['event']['user_id'], user_profile.id) + self.assertEqual(add_peer_event['event']['user_ids'], [user_profile.id]) def test_private_stream_subscription(self) -> None: realm = get_realm("zulip") @@ -3087,7 +3087,7 @@ class SubscriptionAPITest(ZulipTestCase): self.assertEqual(len(add_peer_event['users']), 3) self.assertEqual(add_peer_event['event']['type'], 'subscription') self.assertEqual(add_peer_event['event']['op'], 'peer_add') - self.assertEqual(add_peer_event['event']['user_id'], user_profile.id) + self.assertEqual(add_peer_event['event']['user_ids'], [user_profile.id]) # Do not send stream creation event to realm admin users # even if realm admin is subscribed to stream cause realm admin already get @@ -3109,7 +3109,7 @@ class SubscriptionAPITest(ZulipTestCase): self.assertEqual(len(add_peer_event['users']), 1) self.assertEqual(add_peer_event['event']['type'], 'subscription') self.assertEqual(add_peer_event['event']['op'], 'peer_add') - self.assertEqual(add_peer_event['event']['user_id'], self.example_user("iago").id) + self.assertEqual(add_peer_event['event']['user_ids'], [self.example_user("iago").id]) def test_subscribe_to_stream_post_policy_admins_stream(self) -> None: """ @@ -3211,18 +3211,19 @@ class SubscriptionAPITest(ZulipTestCase): dict(principals=orjson.dumps(new_user_ids_to_subscribe).decode()), ) - add_peer_events = [events[2], events[3]] - for add_peer_event in add_peer_events: - self.assertEqual(add_peer_event['event']['type'], 'subscription') - self.assertEqual(add_peer_event['event']['op'], 'peer_add') - event_sent_to_ids = add_peer_event['users'] - for user_id in new_user_ids_to_subscribe: - # Make sure new users subscribed to stream is not in - # peer_add event recipient list - self.assertNotIn(user_id, event_sent_to_ids) - for old_user in orig_user_ids_to_subscribe: - # Check non new users are in peer_add event recipient list. - self.assertIn(old_user, event_sent_to_ids) + add_peer_events = [event for event in events if event["event"].get("op") == "peer_add"] + (add_peer_event,) = add_peer_events + + self.assertEqual(add_peer_event["event"]["type"], "subscription") + self.assertEqual(add_peer_event["event"]["op"], "peer_add") + event_sent_to_ids = add_peer_event["users"] + for user_id in new_user_ids_to_subscribe: + # Make sure new users subscribed to stream is not in + # peer_add event recipient list + self.assertNotIn(user_id, event_sent_to_ids) + for old_user in orig_user_ids_to_subscribe: + # Check non new users are in peer_add event recipient list. + self.assertIn(old_user, event_sent_to_ids) def test_users_getting_remove_peer_event(self) -> None: """ @@ -3233,6 +3234,7 @@ class SubscriptionAPITest(ZulipTestCase): user3 = self.example_user("hamlet") user4 = self.example_user("iago") user5 = self.example_user("AARON") + guest = self.example_user("polonius") stream1 = self.make_stream('stream1') stream2 = self.make_stream('stream2') @@ -3266,49 +3268,36 @@ class SubscriptionAPITest(ZulipTestCase): peer_events = [e for e in events if e['event'].get('op') == 'peer_remove'] - notifications = set() + # We only care about a subset of users when we inspect + # peer_remove events. + our_user_ids = { + user1.id, + user2.id, + user3.id, + user4.id, + user5.id, + guest.id, + } + + notifications = [] for event in peer_events: - for user_id in event['users']: - stream_id = event['event']['stream_id'] - stream_name = Stream.objects.get(id=stream_id).name - removed_user_id = event['event']['user_id'] - notifications.add((user_id, removed_user_id, stream_name)) + (stream_id,) = event['event']['stream_ids'] + stream_name = Stream.objects.get(id=stream_id).name + removed_user_ids = set(event['event']['user_ids']) + notified_user_ids = set(event['users']) & our_user_ids + notifications.append((stream_name, removed_user_ids, notified_user_ids)) - # POSITIVE CASES FIRST - self.assertIn((user3.id, user1.id, 'stream1'), notifications) - self.assertIn((user4.id, user1.id, 'stream1'), notifications) + notifications.sort(key=lambda tup: tup[0]) - self.assertIn((user3.id, user2.id, 'stream1'), notifications) - self.assertIn((user4.id, user2.id, 'stream1'), notifications) - - self.assertIn((user1.id, user2.id, 'stream2'), notifications) - self.assertIn((user3.id, user2.id, 'stream2'), notifications) - self.assertIn((user4.id, user2.id, 'stream2'), notifications) - - self.assertIn((user1.id, user2.id, 'stream3'), notifications) - self.assertIn((user3.id, user2.id, 'stream3'), notifications) - self.assertIn((user4.id, user2.id, 'stream3'), notifications) - - self.assertIn((user3.id, user1.id, 'private_stream'), notifications) - self.assertIn((user3.id, user2.id, 'private_stream'), notifications) - - self.assertIn((user4.id, user1.id, 'private_stream'), notifications) - self.assertIn((user4.id, user2.id, 'private_stream'), notifications) - - # NEGATIVE - - # don't be notified if you are being removed yourself - self.assertNotIn((user1.id, user1.id, 'stream1'), notifications) - - # don't send false notifications for folks that weren't actually - # subscribed int he first place - self.assertNotIn((user3.id, user1.id, 'stream2'), notifications) - - # don't send notifications for random people - self.assertNotIn((user3.id, user4.id, 'stream2'), notifications) - - # don't send notifications to unsubscribed non realm admin users for private streams - self.assertNotIn((user5.id, user1.id, 'private_stream'), notifications) + self.assertEqual( + notifications, + [ + ("private_stream", {user1.id, user2.id}, {user3.id, user4.id}), + ("stream1", {user1.id, user2.id}, {user3.id, user4.id, user5.id}), + ("stream2", {user2.id}, {user1.id, user3.id, user4.id, user5.id}), + ("stream3", {user2.id}, {user1.id, user3.id, user4.id, user5.id}), + ], + ) def test_bulk_subscribe_MIT(self) -> None: mit_user = self.mit_user('starnine')