From cda7b2f5393b803f521f45f566b520ebcd9b8d20 Mon Sep 17 00:00:00 2001
From: Aman Agrawal
Date: Wed, 10 Jun 2020 17:17:08 +0530
Subject: [PATCH] deletion: Add support for bulk message deletion events.

This is designed to have no user-facing change unless the client
declares bulk_message_deletion in its client_capabilities.

Clients that do so will receive a single bulk event for bulk deletions
of messages within a single conversation (topic or PM thread).

Backend implementation of #15285.
---
 templates/zerver/api/changelog.md | 11 +++++
 version.py                        |  2 +-
 zerver/lib/actions.py             | 79 +++++++++++++++----------------
 zerver/lib/events.py              | 14 ++++--
 zerver/openapi/zulip.yaml         |  9 ++++
 zerver/tests/test_events.py       |  4 +-
 zerver/tests/test_retention.py    |  2 +-
 zerver/tornado/event_queue.py     | 59 ++++++++++++++++++-----
 zerver/tornado/views.py           |  5 +-
 zerver/views/events_register.py   |  1 +
 zerver/views/home.py              |  1 +
 11 files changed, 124 insertions(+), 63 deletions(-)

diff --git a/templates/zerver/api/changelog.md b/templates/zerver/api/changelog.md
index 7c46ef6385..d7b227520b 100644
--- a/templates/zerver/api/changelog.md
+++ b/templates/zerver/api/changelog.md
@@ -10,6 +10,17 @@ below features are supported.
 
 ## Changes in Zulip 2.2
 
+**Feature level 13**
+
+* [`POST /register`](/api/register-queue): Added
+  `bulk_message_deletion` to supported `client_capabilities`.
+* [`GET /events`](/api/get-events-from-queue): `delete_message`
+  events have new behavior. The `sender` and `sender_id` fields were
+  removed, and the `message_id` field was replaced by a `message_ids`
+  list for clients with the `bulk_message_deletion` client capability.
+  All clients should upgrade; we expect `bulk_message_deletion` to be
+  required in the future.
+
 **Feature level 12**
 
 * [`GET users/{user_id}/subscriptions/{stream_id}`](/api/get-subscription-status):
diff --git a/version.py b/version.py
index 9e0b04898a..6172bd686c 100644
--- a/version.py
+++ b/version.py
@@ -29,7 +29,7 @@ DESKTOP_WARNING_VERSION = "5.2.0"
 #
 # Changes should be accompanied by documentation explaining what the
 # new level means in templates/zerver/api/changelog.md.
-API_FEATURE_LEVEL = 12
+API_FEATURE_LEVEL = 13
 
 # Bump the minor PROVISION_VERSION to indicate that folks should provision
 # only when going from an old version of the code to a newer version. Bump
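For illustration only, and not part of the patch: a client without the
capability keeps receiving singleton events, while a bulk_message_deletion
client receives one event per conversation. The payloads below are
hypothetical sketches whose shapes follow the event construction in
zerver/lib/actions.py below.

    # Hypothetical example payloads (stream case; field values invented).

    # Fallback shape, one event per deleted message, sent to clients
    # that did not declare bulk_message_deletion:
    legacy_event = {
        'type': 'delete_message',
        'message_id': 101,
        'message_type': 'stream',
        'stream_id': 5,
        'topic': 'daylight savings',
    }

    # Bulk shape, one event for the whole topic, sent to clients that
    # declared the capability:
    bulk_event = {
        'type': 'delete_message',
        'message_ids': [101, 102, 103],
        'message_type': 'stream',
        'stream_id': 5,
        'topic': 'daylight savings',
    }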
diff --git a/zerver/lib/actions.py b/zerver/lib/actions.py
index bea0355a07..681541df6f 100644
--- a/zerver/lib/actions.py
+++ b/zerver/lib/actions.py
@@ -251,6 +251,12 @@ STREAM_ASSIGNMENT_COLORS = [
     "#9987e1", "#e4523d", "#c2c2c2", "#4f8de4",
     "#c6a8ad", "#e7cc4d", "#c8bebf", "#a47462"]
 
+def subscriber_info(user_id: int) -> Dict[str, Any]:
+    return {
+        'id': user_id,
+        'flags': ['read']
+    }
+
 # Store an event in the log for re-importing messages
 def log_event(event: MutableMapping[str, Any]) -> None:
     if settings.EVENT_LOG_DIR is None:
@@ -4540,12 +4546,6 @@ def do_update_message(user_profile: UserProfile, message: Message,
             'flags': um.flags_list(),
         }
 
-    def subscriber_info(user_id: int) -> Dict[str, Any]:
-        return {
-            'id': user_id,
-            'flags': ['read'],
-        }
-
     # The following block arranges that users who are subscribed to a
     # stream and can see history from before they subscribed get
     # live-update when old messages are edited (e.g. if the user does
@@ -4584,50 +4584,45 @@ def do_update_message(user_profile: UserProfile, message: Message,
     return len(changed_messages)
 
 def do_delete_messages(realm: Realm, messages: Iterable[Message]) -> None:
+    # The messages in a delete_message event all belong to the same
+    # topic, or are a single private message; no other behavior is
+    # possible with the current callers of this method.
+    messages = list(messages)
     message_ids = [message.id for message in messages]
     if not message_ids:
         return
 
-    usermessages = UserMessage.objects.filter(message_id__in=message_ids)
-    message_id_to_notifiable_users: Dict[int, List[int]] = {}
-    for um in usermessages:
-        if um.message_id not in message_id_to_notifiable_users:
-            message_id_to_notifiable_users[um.message_id] = []
-        message_id_to_notifiable_users[um.message_id].append(um.user_profile_id)
+    event: Dict[str, Any] = {
+        'type': 'delete_message',
+        'message_ids': message_ids,
+    }
 
-    events_and_users_to_notify = []
-    for message in messages:
-        message_type = "stream"
-        if not message.is_stream_message():
-            message_type = "private"
+    sample_message = messages[0]
+    message_type = "stream"
+    users_to_notify = []
+    if not sample_message.is_stream_message():
+        assert len(messages) == 1
+        message_type = "private"
+        ums = UserMessage.objects.filter(message_id__in=message_ids)
+        users_to_notify = [um.user_profile_id for um in ums]
+        # TODO: We should plan to remove `sender_id` here.
+        event['recipient_id'] = sample_message.recipient_id
+        event['sender_id'] = sample_message.sender_id
 
-        event: Dict[str, Any] = {
-            'type': 'delete_message',
-            'sender': message.sender.email,
-            'sender_id': message.sender_id,
-            'message_id': message.id,
-            'message_type': message_type,
-        }
-        if message_type == "stream":
-            event['stream_id'] = message.recipient.type_id
-            event['topic'] = message.topic_name()
-        else:
-            event['recipient_id'] = message.recipient_id
-
-        # In theory, it's possible for message_id_to_notifiable_users
-        # to not have a key for the message ID in some weird corner
-        # case where we've deleted the last user subscribed to the
-        # target stream before a bot sent a message to it, and thus
-        # there are no UserMessage objects associated with the
-        # message.
-        events_and_users_to_notify.append(
-            (event, message_id_to_notifiable_users.get(message.id, [])),
-        )
+    if message_type == "stream":
+        stream_id = sample_message.recipient.type_id
+        event['stream_id'] = stream_id
+        event['topic'] = sample_message.topic_name()
+        subscribers = get_active_subscriptions_for_stream_id(stream_id)
+        # We exclude long-term idle users, since they by definition have no active clients.
+        subscribers = subscribers.exclude(user_profile__long_term_idle=True)
+        subscribers_ids = [user.user_profile_id for user in subscribers]
+        users_to_notify = list(map(subscriber_info, subscribers_ids))
 
     move_messages_to_archive(message_ids, realm=realm)
-    for event, users_to_notify in events_and_users_to_notify:
-        # TODO: Figure out some kind of bulk event that we could send just one of?
-        send_event(realm, event, users_to_notify)
+
+    event['message_type'] = message_type
+    send_event(realm, event, users_to_notify)
 
 def do_delete_messages_by_sender(user: UserProfile) -> None:
     message_ids = list(Message.objects.filter(sender=user).values_list('id', flat=True).order_by('id'))
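A rough caller-side sketch of the rewritten do_delete_messages, under its
documented constraint that one call covers a single topic or a single
private message. The ORM filter here is schematic, not how Zulip's callers
actually resolve a topic:

    # Sketch; assumes `realm` and `stream` were already looked up.
    messages = Message.objects.filter(
        recipient__type_id=stream.id,  # schematic topic lookup
        subject='daylight savings',
    )
    do_delete_messages(realm, messages)
    # One bulk delete_message event is built from messages[0] and sent
    # to the stream's active subscribers; long-term idle users, who by
    # definition have no active clients, are skipped.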
diff --git a/zerver/lib/events.py b/zerver/lib/events.py
index 17642e1e0d..8b475ae1df 100644
--- a/zerver/lib/events.py
+++ b/zerver/lib/events.py
@@ -679,6 +679,10 @@ def apply_event(state: Dict[str, Any],
             if message_id in stream_dict:
                 stream_dict[message_id]['topic'] = topic
     elif event['type'] == "delete_message":
+        if 'message_id' in event:
+            message_ids = [event['message_id']]
+        else:
+            message_ids = event['message_ids']  # nocoverage
         max_message = Message.objects.filter(
             usermessage__user_profile=user_profile).order_by('-id').first()
         if max_message:
@@ -687,8 +691,8 @@ def apply_event(state: Dict[str, Any],
             state['max_message_id'] = -1
 
         if 'raw_unread_msgs' in state:
-            remove_id = event['message_id']
-            remove_message_id_from_unread_mgs(state['raw_unread_msgs'], remove_id)
+            for remove_id in message_ids:
+                remove_message_id_from_unread_mgs(state['raw_unread_msgs'], remove_id)
 
         # The remainder of this block is about maintaining recent_private_conversations
         if 'raw_recent_private_conversations' not in state or event['message_type'] != 'private':
@@ -705,7 +709,7 @@ def apply_event(state: Dict[str, Any],
             return
 
         old_max_message_id = state['raw_recent_private_conversations'][recipient_id]['max_message_id']
-        if old_max_message_id != event['message_id']:  # nocoverage
+        if old_max_message_id not in message_ids:  # nocoverage
             return
 
         # OK, we just deleted what had been the max_message_id for
@@ -839,6 +843,7 @@ def do_events_register(user_profile: UserProfile, user_client: Client,
         check_supported_events_narrow_filter(narrow)
 
     notification_settings_null = client_capabilities.get('notification_settings_null', False)
+    bulk_message_deletion = client_capabilities.get('bulk_message_deletion', False)
 
     if user_profile.realm.email_address_visibility != Realm.EMAIL_ADDRESS_VISIBILITY_EVERYONE:
         # If real email addresses are not available to the user, their
@@ -850,7 +855,8 @@ def do_events_register(user_profile: UserProfile, user_client: Client,
         queue_id = request_event_queue(user_profile, user_client,
                                        apply_markdown, client_gravatar, slim_presence,
                                        queue_lifespan_secs, event_types, all_public_streams,
-                                       narrow=narrow)
+                                       narrow=narrow,
+                                       bulk_message_deletion=bulk_message_deletion)
 
     if queue_id is None:
         raise JsonableError(_("Could not allocate event queue"))
diff --git a/zerver/openapi/zulip.yaml b/zerver/openapi/zulip.yaml
index 7cb3798a7a..36a2941ae5 100644
--- a/zerver/openapi/zulip.yaml
+++ b/zerver/openapi/zulip.yaml
@@ -3112,6 +3112,15 @@ paths:
                 notification settings for stream messages). New in Zulip
                 2.1.0; in earlier Zulip releases, stream-level notification
                 settings were simple booleans.
+
+              * `bulk_message_deletion`: Boolean for whether the client's
+                handler for the `delete_message` event type has been
+                updated to process the new bulk format (with a
+                `message_ids` list, rather than a singleton `message_id`).
+                Otherwise, the server will send `delete_message` events in
+                a loop, one per message. New in Zulip 2.2 (feature level
+                13). This capability is for backwards-compatibility; it
+                will be required in a future server release.
               schema:
                 type: object
                 example:
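From the client side, the capability is declared once, at queue
registration. A minimal sketch against the documented POST /register
endpoint; the server URL and credentials are placeholders:

    import json
    import requests

    resp = requests.post(
        'https://example.zulipchat.com/api/v1/register',
        auth=('bot@example.com', 'API_KEY'),  # placeholder credentials
        data={
            'event_types': json.dumps(['message', 'delete_message']),
            'client_capabilities': json.dumps({
                'notification_settings_null': True,
                'bulk_message_deletion': True,
            }),
        },
    )
    queue_id = resp.json()['queue_id']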
diff --git a/zerver/tests/test_events.py b/zerver/tests/test_events.py
index f2a817df4f..260a752ced 100644
--- a/zerver/tests/test_events.py
+++ b/zerver/tests/test_events.py
@@ -528,6 +528,7 @@ class EventsRegisterTest(ZulipTestCase):
                 last_connection_time = time.time(),
                 narrow = []),
         )
+
         # hybrid_state = initial fetch state + re-applying events triggered by our action
         # normal_state = do action then fetch at the end (the "normal" code path)
         hybrid_state = fetch_initial_state_data(
@@ -2732,8 +2733,6 @@ class EventsRegisterTest(ZulipTestCase):
         schema_checker = self.check_events_dict([
             ('type', equals('delete_message')),
             ('message_id', check_int),
-            ('sender', check_string),
-            ('sender_id', check_int),
             ('message_type', equals("stream")),
             ('stream_id', check_int),
             ('topic', check_string),
@@ -2752,7 +2751,6 @@ class EventsRegisterTest(ZulipTestCase):
         schema_checker = self.check_events_dict([
             ('type', equals('delete_message')),
             ('message_id', check_int),
-            ('sender', check_string),
             ('sender_id', check_int),
             ('message_type', equals("private")),
             ('recipient_id', check_int),
diff --git a/zerver/tests/test_retention.py b/zerver/tests/test_retention.py
index 192ab15455..6d8f4c60cc 100644
--- a/zerver/tests/test_retention.py
+++ b/zerver/tests/test_retention.py
@@ -895,7 +895,7 @@ class TestDoDeleteMessages(ZulipTestCase):
         with queries_captured() as queries:
             do_delete_messages(realm, messages)
         self.assertFalse(Message.objects.filter(id__in=message_ids).exists())
-        self.assert_length(queries, 37)
+        self.assert_length(queries, 18)
 
         archived_messages = ArchivedMessage.objects.filter(id__in=message_ids)
         self.assertEqual(archived_messages.count(), len(message_ids))
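The updated checkers above still describe the singleton format that
capability-less clients receive. A checker for the bulk format, if one
were added, would presumably mirror the same check_events_dict style
(hypothetical, not in this patch):

    schema_checker = self.check_events_dict([
        ('type', equals('delete_message')),
        ('message_ids', check_list(check_int)),
        ('message_type', equals("stream")),
        ('stream_id', check_int),
        ('topic', check_string),
    ])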
diff --git a/zerver/tornado/event_queue.py b/zerver/tornado/event_queue.py
index 2aea57b176..6fe5b0de80 100644
--- a/zerver/tornado/event_queue.py
+++ b/zerver/tornado/event_queue.py
@@ -90,7 +90,8 @@ class ClientDescriptor:
                  slim_presence: bool=False,
                  all_public_streams: bool=False,
                  lifespan_secs: int=0,
-                 narrow: Iterable[Sequence[str]]=[]) -> None:
+                 narrow: Iterable[Sequence[str]]=[],
+                 bulk_message_deletion: bool=False) -> None:
         # These objects are serialized on shutdown and restored on restart.
         # If fields are added or semantics are changed, temporary code must be
         # added to load_event_queues() to update the restored objects.
@@ -110,6 +111,7 @@ class ClientDescriptor:
         self._timeout_handle: Any = None  # TODO: should be return type of ioloop.call_later
         self.narrow = narrow
         self.narrow_filter = build_narrow_filter(narrow)
+        self.bulk_message_deletion = bulk_message_deletion
 
         # Default for lifespan_secs is DEFAULT_EVENT_QUEUE_TIMEOUT_SECS;
         # but users can set it as high as MAX_QUEUE_TIMEOUT_SECS.
@@ -132,7 +134,8 @@ class ClientDescriptor:
                     slim_presence=self.slim_presence,
                     all_public_streams=self.all_public_streams,
                     narrow=self.narrow,
-                    client_type_name=self.client_type_name)
+                    client_type_name=self.client_type_name,
+                    bulk_message_deletion=self.bulk_message_deletion)
 
     def __repr__(self) -> str:
         return f"ClientDescriptor<{self.event_queue.id}>"
@@ -161,6 +164,7 @@ class ClientDescriptor:
             d['all_public_streams'],
             d['queue_timeout'],
             d.get('narrow', []),
+            d.get('bulk_message_deletion', False),
         )
         ret.last_connection_time = d['last_connection_time']
         return ret
@@ -604,7 +608,9 @@ def request_event_queue(user_profile: UserProfile, user_client: Client, apply_ma
                         client_gravatar: bool, slim_presence: bool,
                         queue_lifespan_secs: int, event_types: Optional[Iterable[str]]=None,
                         all_public_streams: bool=False,
-                        narrow: Iterable[Sequence[str]]=[]) -> Optional[str]:
+                        narrow: Iterable[Sequence[str]]=[],
+                        bulk_message_deletion: bool=False) -> Optional[str]:
+
     if settings.TORNADO_SERVER:
         tornado_uri = get_tornado_uri(user_profile.realm)
         req = {'dont_block': 'true',
@@ -617,7 +623,9 @@ def request_event_queue(user_profile: UserProfile, user_client: Client, apply_ma
                'user_client': user_client.name,
                'narrow': ujson.dumps(narrow),
                'secret': settings.SHARED_SECRET,
-               'lifespan_secs': queue_lifespan_secs}
+               'lifespan_secs': queue_lifespan_secs,
+               'bulk_message_deletion': ujson.dumps(bulk_message_deletion)}
+
         if event_types is not None:
             req['event_types'] = ujson.dumps(event_types)
 
@@ -972,6 +980,29 @@ def process_event(event: Mapping[str, Any], users: Iterable[int]) -> None:
             if client.accepts_event(event):
                 client.add_event(event)
 
+def process_deletion_event(event: Mapping[str, Any], users: Iterable[int]) -> None:
+    for user_profile_id in users:
+        for client in get_client_descriptors_for_user(user_profile_id):
+            if not client.accepts_event(event):
+                continue
+
+            # For clients which support message deletion in bulk, we
+            # send a list of message_ids together; otherwise we send a
+            # delete event for each message.  All clients will be
+            # required to support bulk_message_deletion in the future;
+            # this logic is intended for backwards-compatibility only.
+            if client.bulk_message_deletion:
+                client.add_event(event)
+                continue
+
+            for message_id in event['message_ids']:
+                # We use the following rather than event.copy()
+                # because the read-only Mapping type doesn't support .copy().
+                compatibility_event = dict(event)
+                compatibility_event['message_id'] = message_id
+                del compatibility_event['message_ids']
+                client.add_event(compatibility_event)
+
 def process_message_update_event(event_template: Mapping[str, Any],
                                  users: Iterable[Mapping[str, Any]]) -> None:
     prior_mention_user_ids = set(event_template.get('prior_mention_user_ids', []))
@@ -1089,13 +1120,19 @@ def process_notification(notice: Mapping[str, Any]) -> None:
         process_message_event(event, cast(Iterable[Mapping[str, Any]], users))
     elif event['type'] == "update_message":
         process_message_update_event(event, cast(Iterable[Mapping[str, Any]], users))
-    elif event['type'] == "delete_message" and len(users) > 0 and isinstance(users[0], dict):
-        # do_delete_messages used to send events with users in dict format {"id": <int>}
-        # This block is here for compatibility with events in that format still in the queue
-        # at the time of upgrade.
-        # TODO: Remove this block in release >= 2.3.
-        user_ids = [user['id'] for user in cast(Iterable[Mapping[str, int]], users)]
-        process_event(event, user_ids)
+    elif event['type'] == "delete_message":
+        if len(users) > 0 and isinstance(users[0], dict):
+            # do_delete_messages used to send events with users in
+            # dict format {"id": <int>}.  This block is here for
+            # compatibility with events in that format still in the
+            # queue at the time of upgrade.
+            #
+            # TODO: Remove this block in release >= 2.3.
+            user_ids: List[int] = [user['id'] for user in
+                                   cast(List[Mapping[str, int]], users)]
+        else:
+            user_ids = cast(List[int], users)
+        process_deletion_event(event, user_ids)
     elif event['type'] == "presence":
         process_presence_event(event, cast(Iterable[int], users))
     else:
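Stripped of the queue plumbing, the backwards-compatibility path in
process_deletion_event reduces to this standalone sketch:

    from typing import Any, Dict, List, Mapping

    def expand_bulk_deletion(event: Mapping[str, Any]) -> List[Dict[str, Any]]:
        # One singleton delete_message event per id in message_ids,
        # mirroring the fallback loop in process_deletion_event.
        compatibility_events = []
        for message_id in event['message_ids']:
            compatibility_event = dict(event)  # Mapping has no .copy()
            compatibility_event['message_id'] = message_id
            del compatibility_event['message_ids']
            compatibility_events.append(compatibility_event)
        return compatibility_events

    # expand_bulk_deletion({'type': 'delete_message', 'message_ids': [1, 2]})
    # -> [{'type': 'delete_message', 'message_id': 1},
    #     {'type': 'delete_message', 'message_id': 2}]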
diff --git a/zerver/tornado/views.py b/zerver/tornado/views.py
index 9de1c1d3f2..04fef432dd 100644
--- a/zerver/tornado/views.py
+++ b/zerver/tornado/views.py
@@ -76,6 +76,8 @@ def get_events_backend(request: HttpRequest, user_profile: UserProfile,
                                                 intentionally_undocumented=True),
         lifespan_secs: int=REQ(default=0, converter=to_non_negative_int,
                                intentionally_undocumented=True),
+        bulk_message_deletion: bool=REQ(default=False, validator=check_bool,
+                                        intentionally_undocumented=True)
 ) -> HttpResponse:
     # Extract the Tornado handler from the request
     handler: AsyncDjangoHandler = request._tornado_handler
@@ -109,7 +111,8 @@ def get_events_backend(request: HttpRequest, user_profile: UserProfile,
                            all_public_streams = all_public_streams,
                            queue_timeout = lifespan_secs,
                            last_connection_time = time.time(),
-                           narrow = narrow)
+                           narrow = narrow,
+                           bulk_message_deletion = bulk_message_deletion)
 
     result = fetch_events(events_query)
     if "extra_log_data" in result:
diff --git a/zerver/views/events_register.py b/zerver/views/events_register.py
index cba94e6e9c..d48bd0f821 100644
--- a/zerver/views/events_register.py
+++ b/zerver/views/events_register.py
@@ -39,6 +39,7 @@ def events_register_backend(
             ("notification_settings_null", check_bool),
         ], [
             # Any new fields of `client_capabilities` should be optional. Add them here.
+            ("bulk_message_deletion", check_bool),
         ]), default=None),
         event_types: Optional[Iterable[str]]=REQ(validator=check_list(check_string), default=None),
         fetch_event_types: Optional[Iterable[str]]=REQ(validator=check_list(check_string), default=None),
diff --git a/zerver/views/home.py b/zerver/views/home.py
index a4765165c5..8a8efb2996 100644
--- a/zerver/views/home.py
+++ b/zerver/views/home.py
@@ -191,6 +191,7 @@ def home_real(request: HttpRequest) -> HttpResponse:
 
     client_capabilities = {
         'notification_settings_null': True,
+        'bulk_message_deletion': False,
     }
 
     register_ret = do_events_register(user_profile, request.client,
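On the receiving side, a client that declares the capability still has to
tolerate singleton events from servers below feature level 13. A minimal
handler sketch; remove_message stands in for whatever the client's actual
deletion routine is:

    from typing import Any, Callable, Mapping

    def handle_delete_message(event: Mapping[str, Any],
                              remove_message: Callable[[int], None]) -> None:
        # Bulk shape when bulk_message_deletion was declared and the
        # server supports it; singleton shape otherwise.
        if 'message_ids' in event:
            message_ids = event['message_ids']
        else:
            message_ids = [event['message_id']]
        for message_id in message_ids:
            remove_message(message_id)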