deletion: Add support for bulk message deletion events.

This is designed to have no user-facing change unless the client
declares bulk_message_deletion in its client_capabilities.

Clients that do so will receive a single bulk event for bulk deletions
of messages within a single conversation (topic or PM thread).

Backend implementation of #15285.
Aman Agrawal 2020-06-10 17:17:08 +05:30 committed by Tim Abbott
parent 21ac1fb32c
commit cda7b2f539
11 changed files with 124 additions and 63 deletions
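
For context, a sketch of the two wire formats this commit distinguishes
(message and stream IDs are made up; the field set follows the
do_delete_messages change below). A client that declared
bulk_message_deletion receives one event per batch:

    bulk_event = {
        'type': 'delete_message',
        'message_ids': [101, 102, 103],
        'message_type': 'stream',
        'stream_id': 5,
        'topic': 'lunch',
    }

A legacy client receives the same deletion as three events, each shaped
like:

    legacy_event = {
        'type': 'delete_message',
        'message_id': 101,
        'message_type': 'stream',
        'stream_id': 5,
        'topic': 'lunch',
    }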


@@ -10,6 +10,17 @@ below features are supported.

 ## Changes in Zulip 2.2

+**Feature level 13**
+
+* [`POST /register`](/api/register-queue): Added
+  `bulk_message_deletion` to supported `client_capabilities`.
+* [`GET /events`](/api/get-events-from-queue): `delete_message`
+  events have new behavior. The `sender` and `sender_id` fields were
+  removed, and the `message_id` field was replaced by a `message_ids`
+  list for clients with the `bulk_message_deletion` client capability.
+  All clients should upgrade; we expect `bulk_message_deletion` to be
+  required in the future.
+
 **Feature level 12**

 * [`GET users/{user_id}/subscriptions/{stream_id}`](/api/get-subscription-status):

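As a usage sketch, a bot might opt in to the new format when registering
an event queue; the endpoint and parameters below are the documented
Zulip API, while the server URL and credentials are placeholders:

    import json
    import requests

    SITE = "https://chat.example.com"          # placeholder server
    AUTH = ("bot@example.com", "BOT_API_KEY")  # placeholder credentials

    response = requests.post(
        f"{SITE}/api/v1/register",
        auth=AUTH,
        data={
            "event_types": json.dumps(["message", "delete_message"]),
            # Declare that our delete_message handler understands the
            # bulk format introduced at feature level 13.
            "client_capabilities": json.dumps(
                {"bulk_message_deletion": True}),
        },
    )
    print(response.json()["queue_id"])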

@@ -29,7 +29,7 @@ DESKTOP_WARNING_VERSION = "5.2.0"
 #
 # Changes should be accompanied by documentation explaining what the
 # new level means in templates/zerver/api/changelog.md.
-API_FEATURE_LEVEL = 12
+API_FEATURE_LEVEL = 13

 # Bump the minor PROVISION_VERSION to indicate that folks should provision
 # only when going from an old version of the code to a newer version. Bump


@@ -251,6 +251,12 @@ STREAM_ASSIGNMENT_COLORS = [
     "#9987e1", "#e4523d", "#c2c2c2", "#4f8de4",
     "#c6a8ad", "#e7cc4d", "#c8bebf", "#a47462"]

+def subscriber_info(user_id: int) -> Dict[str, Any]:
+    return {
+        'id': user_id,
+        'flags': ['read']
+    }
+
 # Store an event in the log for re-importing messages
 def log_event(event: MutableMapping[str, Any]) -> None:
     if settings.EVENT_LOG_DIR is None:
@@ -4540,12 +4546,6 @@ def do_update_message(user_profile: UserProfile, message: Message,
             'flags': um.flags_list(),
         }

-    def subscriber_info(user_id: int) -> Dict[str, Any]:
-        return {
-            'id': user_id,
-            'flags': ['read'],
-        }
-
     # The following blocks arranges that users who are subscribed to a
     # stream and can see history from before they subscribed get
     # live-update when old messages are edited (e.g. if the user does
@@ -4584,50 +4584,45 @@ def do_update_message(user_profile: UserProfile, message: Message,
     return len(changed_messages)

 def do_delete_messages(realm: Realm, messages: Iterable[Message]) -> None:
+    # The messages in a delete_message event all belong to the same
+    # topic, or are a single private message, since no other behavior
+    # is possible with the current callers of this method.
+    messages = list(messages)
     message_ids = [message.id for message in messages]
     if not message_ids:
         return

-    usermessages = UserMessage.objects.filter(message_id__in=message_ids)
-    message_id_to_notifiable_users: Dict[int, List[int]] = {}
-    for um in usermessages:
-        if um.message_id not in message_id_to_notifiable_users:
-            message_id_to_notifiable_users[um.message_id] = []
-        message_id_to_notifiable_users[um.message_id].append(um.user_profile_id)
+    event: Dict[str, Any] = {
+        'type': 'delete_message',
+        'message_ids': message_ids,
+    }

-    events_and_users_to_notify = []
-    for message in messages:
-        message_type = "stream"
-        if not message.is_stream_message():
-            message_type = "private"
-
-        event: Dict[str, Any] = {
-            'type': 'delete_message',
-            'sender': message.sender.email,
-            'sender_id': message.sender_id,
-            'message_id': message.id,
-            'message_type': message_type,
-        }
-        if message_type == "stream":
-            event['stream_id'] = message.recipient.type_id
-            event['topic'] = message.topic_name()
-        else:
-            event['recipient_id'] = message.recipient_id
-
-        # In theory, it's possible for message_id_to_notifiable_users
-        # to not have a key for the message ID in some weird corner
-        # case where we've deleted the last user subscribed to the
-        # target stream before a bot sent a message to it, and thus
-        # there are no UserMessage objects associated with the
-        # message.
-        events_and_users_to_notify.append(
-            (event, message_id_to_notifiable_users.get(message.id, [])),
-        )
+    sample_message = messages[0]
+    message_type = "stream"
+    users_to_notify = []
+    if not sample_message.is_stream_message():
+        assert len(messages) == 1
+        message_type = "private"
+        ums = UserMessage.objects.filter(message_id__in=message_ids)
+        users_to_notify = [um.user_profile_id for um in ums]
+        # TODO: We should plan to remove `sender_id` here.
+        event['recipient_id'] = sample_message.recipient_id
+        event['sender_id'] = sample_message.sender_id
+
+    if message_type == "stream":
+        stream_id = sample_message.recipient.type_id
+        event['stream_id'] = stream_id
+        event['topic'] = sample_message.topic_name()
+        subscribers = get_active_subscriptions_for_stream_id(stream_id)
+        # We exclude long-term idle users, since they by definition
+        # have no active clients.
+        subscribers = subscribers.exclude(user_profile__long_term_idle=True)
+        subscribers_ids = [user.user_profile_id for user in subscribers]
+        users_to_notify = list(map(subscriber_info, subscribers_ids))

     move_messages_to_archive(message_ids, realm=realm)
-    for event, users_to_notify in events_and_users_to_notify:
-        # TODO: Figure out some kind of bulk event that we could send just one of?
-        send_event(realm, event, users_to_notify)
+
+    event['message_type'] = message_type
+    send_event(realm, event, users_to_notify)

 def do_delete_messages_by_sender(user: UserProfile) -> None:
     message_ids = list(Message.objects.filter(sender=user).values_list('id', flat=True).order_by('id'))

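To make the two notification audiences in the new code concrete, a small
sketch (user IDs are made up): the stream branch wraps every non-idle
subscriber with subscriber_info, while the private branch sends plain
user IDs taken from the UserMessage rows:

    # Stream deletion: one dict per active subscriber.
    users_to_notify = [
        {'id': 10, 'flags': ['read']},
        {'id': 11, 'flags': ['read']},
    ]

    # Private deletion: exactly one message (see the assert above), and
    # only the users who actually received it.
    users_to_notify = [8, 17]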

@@ -679,6 +679,10 @@ def apply_event(state: Dict[str, Any],
             if message_id in stream_dict:
                 stream_dict[message_id]['topic'] = topic
     elif event['type'] == "delete_message":
+        if 'message_id' in event:
+            message_ids = [event['message_id']]
+        else:
+            message_ids = event['message_ids']  # nocoverage
         max_message = Message.objects.filter(
             usermessage__user_profile=user_profile).order_by('-id').first()
         if max_message:
@@ -687,8 +691,8 @@ def apply_event(state: Dict[str, Any],
             state['max_message_id'] = -1

         if 'raw_unread_msgs' in state:
-            remove_id = event['message_id']
-            remove_message_id_from_unread_mgs(state['raw_unread_msgs'], remove_id)
+            for remove_id in message_ids:
+                remove_message_id_from_unread_mgs(state['raw_unread_msgs'], remove_id)

         # The remainder of this block is about maintaining recent_private_conversations
         if 'raw_recent_private_conversations' not in state or event['message_type'] != 'private':
@@ -705,7 +709,7 @@ def apply_event(state: Dict[str, Any],
             return

         old_max_message_id = state['raw_recent_private_conversations'][recipient_id]['max_message_id']
-        if old_max_message_id != event['message_id']:  # nocoverage
+        if old_max_message_id not in message_ids:  # nocoverage
             return

         # OK, we just deleted what had been the max_message_id for
@@ -839,6 +843,7 @@ def do_events_register(user_profile: UserProfile, user_client: Client,
         check_supported_events_narrow_filter(narrow)

     notification_settings_null = client_capabilities.get('notification_settings_null', False)
+    bulk_message_deletion = client_capabilities.get('bulk_message_deletion', False)

     if user_profile.realm.email_address_visibility != Realm.EMAIL_ADDRESS_VISIBILITY_EVERYONE:
         # If real email addresses are not available to the user, their
@@ -850,7 +855,8 @@ def do_events_register(user_profile: UserProfile, user_client: Client,
         queue_id = request_event_queue(user_profile, user_client,
                                        apply_markdown, client_gravatar, slim_presence,
                                        queue_lifespan_secs, event_types, all_public_streams,
-                                       narrow=narrow)
+                                       narrow=narrow,
+                                       bulk_message_deletion=bulk_message_deletion)

     if queue_id is None:
         raise JsonableError(_("Could not allocate event queue"))

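The normalization at the top of the apply_event hunk above is the same
dance any consumer of both formats needs. A minimal client-side sketch
(the function name and unread-set state are hypothetical):

    from typing import Any, Dict, List, Set

    def handle_delete_message(event: Dict[str, Any],
                              unread: Set[int]) -> None:
        # Accept both the legacy single-id and the bulk format.
        if 'message_id' in event:
            message_ids: List[int] = [event['message_id']]
        else:
            message_ids = event['message_ids']
        for message_id in message_ids:
            unread.discard(message_id)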

@@ -3112,6 +3112,15 @@ paths:
                       notification settings for stream messages). New in Zulip
                       2.1.0; in earlier Zulip releases, stream-level
                       notification settings were simple booleans.
+                    * `bulk_message_deletion`: Boolean for whether the
+                      client's handler for the `delete_message` event type
+                      has been updated to process the new bulk format (with
+                      a `message_ids` list, rather than a singleton
+                      `message_id`). Otherwise, the server will send
+                      `delete_message` events in a loop, one per deleted
+                      message. New in Zulip 2.2 (feature level 13). This
+                      capability exists for backwards-compatibility; it will
+                      be required in a future server release.
               schema:
                 type: object
               example:


@@ -528,6 +528,7 @@ class EventsRegisterTest(ZulipTestCase):
                  last_connection_time = time.time(),
                  narrow = []),
         )
+
         # hybrid_state = initial fetch state + re-applying events triggered by our action
         # normal_state = do action then fetch at the end (the "normal" code path)
         hybrid_state = fetch_initial_state_data(
@@ -2732,8 +2733,6 @@ class EventsRegisterTest(ZulipTestCase):
         schema_checker = self.check_events_dict([
             ('type', equals('delete_message')),
             ('message_id', check_int),
-            ('sender', check_string),
-            ('sender_id', check_int),
             ('message_type', equals("stream")),
             ('stream_id', check_int),
             ('topic', check_string),
@@ -2752,7 +2751,6 @@ class EventsRegisterTest(ZulipTestCase):
         schema_checker = self.check_events_dict([
             ('type', equals('delete_message')),
             ('message_id', check_int),
-            ('sender', check_string),
             ('sender_id', check_int),
             ('message_type', equals("private")),
             ('recipient_id', check_int),


@@ -895,7 +895,7 @@ class TestDoDeleteMessages(ZulipTestCase):
         with queries_captured() as queries:
             do_delete_messages(realm, messages)
         self.assertFalse(Message.objects.filter(id__in=message_ids).exists())
-        self.assert_length(queries, 37)
+        self.assert_length(queries, 18)

         archived_messages = ArchivedMessage.objects.filter(id__in=message_ids)
         self.assertEqual(archived_messages.count(), len(message_ids))


@@ -90,7 +90,8 @@ class ClientDescriptor:
                  slim_presence: bool=False,
                  all_public_streams: bool=False,
                  lifespan_secs: int=0,
-                 narrow: Iterable[Sequence[str]]=[]) -> None:
+                 narrow: Iterable[Sequence[str]]=[],
+                 bulk_message_deletion: bool=False) -> None:
         # These objects are serialized on shutdown and restored on restart.
         # If fields are added or semantics are changed, temporary code must be
         # added to load_event_queues() to update the restored objects.
@@ -110,6 +111,7 @@ class ClientDescriptor:
         self._timeout_handle: Any = None  # TODO: should be return type of ioloop.call_later
         self.narrow = narrow
         self.narrow_filter = build_narrow_filter(narrow)
+        self.bulk_message_deletion = bulk_message_deletion

         # Default for lifespan_secs is DEFAULT_EVENT_QUEUE_TIMEOUT_SECS;
         # but users can set it as high as MAX_QUEUE_TIMEOUT_SECS.
@@ -132,7 +134,8 @@ class ClientDescriptor:
             slim_presence=self.slim_presence,
             all_public_streams=self.all_public_streams,
             narrow=self.narrow,
-            client_type_name=self.client_type_name)
+            client_type_name=self.client_type_name,
+            bulk_message_deletion=self.bulk_message_deletion)

     def __repr__(self) -> str:
         return f"ClientDescriptor<{self.event_queue.id}>"
@@ -161,6 +164,7 @@ class ClientDescriptor:
             d['all_public_streams'],
             d['queue_timeout'],
             d.get('narrow', []),
+            d.get('bulk_message_deletion', False),
         )
         ret.last_connection_time = d['last_connection_time']
         return ret
@@ -604,7 +608,9 @@ def request_event_queue(user_profile: UserProfile, user_client: Client, apply_ma
                         client_gravatar: bool, slim_presence: bool, queue_lifespan_secs: int,
                         event_types: Optional[Iterable[str]]=None,
                         all_public_streams: bool=False,
-                        narrow: Iterable[Sequence[str]]=[]) -> Optional[str]:
+                        narrow: Iterable[Sequence[str]]=[],
+                        bulk_message_deletion: bool=False) -> Optional[str]:
+
     if settings.TORNADO_SERVER:
         tornado_uri = get_tornado_uri(user_profile.realm)
         req = {'dont_block': 'true',
@@ -617,7 +623,9 @@ def request_event_queue(user_profile: UserProfile, user_client: Client, apply_ma
             'user_client': user_client.name,
             'narrow': ujson.dumps(narrow),
             'secret': settings.SHARED_SECRET,
-            'lifespan_secs': queue_lifespan_secs}
+            'lifespan_secs': queue_lifespan_secs,
+            'bulk_message_deletion': ujson.dumps(bulk_message_deletion)}

     if event_types is not None:
         req['event_types'] = ujson.dumps(event_types)
@@ -972,6 +980,29 @@ def process_event(event: Mapping[str, Any], users: Iterable[int]) -> None:
             if client.accepts_event(event):
                 client.add_event(event)

+def process_deletion_event(event: Mapping[str, Any], users: Iterable[int]) -> None:
+    for user_profile_id in users:
+        for client in get_client_descriptors_for_user(user_profile_id):
+            if not client.accepts_event(event):
+                continue
+
+            # For clients which support message deletion in bulk, we
+            # send a list of message IDs together; otherwise we send a
+            # delete event for each message. All clients will be
+            # required to support bulk_message_deletion in the future;
+            # this logic is intended for backwards-compatibility only.
+            if client.bulk_message_deletion:
+                client.add_event(event)
+                continue
+
+            for message_id in event['message_ids']:
+                # We use the following rather than event.copy()
+                # because the read-only Mapping type doesn't support .copy().
+                compatibility_event = dict(event)
+                compatibility_event['message_id'] = message_id
+                del compatibility_event['message_ids']
+                client.add_event(compatibility_event)
+
 def process_message_update_event(event_template: Mapping[str, Any],
                                  users: Iterable[Mapping[str, Any]]) -> None:
     prior_mention_user_ids = set(event_template.get('prior_mention_user_ids', []))
@@ -1089,13 +1120,19 @@ def process_notification(notice: Mapping[str, Any]) -> None:
         process_message_event(event, cast(Iterable[Mapping[str, Any]], users))
     elif event['type'] == "update_message":
         process_message_update_event(event, cast(Iterable[Mapping[str, Any]], users))
-    elif event['type'] == "delete_message" and len(users) > 0 and isinstance(users[0], dict):
-        # do_delete_messages used to send events with users in dict format {"id": <int>}
-        # This block is here for compatibility with events in that format still in the queue
-        # at the time of upgrade.
-        # TODO: Remove this block in release >= 2.3.
-        user_ids = [user['id'] for user in cast(Iterable[Mapping[str, int]], users)]
-        process_event(event, user_ids)
+    elif event['type'] == "delete_message":
+        if len(users) > 0 and isinstance(users[0], dict):
+            # do_delete_messages used to send events with users in
+            # dict format {"id": <int>}. This block is here for
+            # compatibility with events in that format still in the
+            # queue at the time of upgrade.
+            #
+            # TODO: Remove this block in release >= 2.3.
+            user_ids: List[int] = [user['id'] for user in
+                                   cast(List[Mapping[str, int]], users)]
+        else:
+            user_ids = cast(List[int], users)
+        process_deletion_event(event, user_ids)
     elif event['type'] == "presence":
         process_presence_event(event, cast(Iterable[int], users))
     else:

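A self-contained sketch of the compatibility fan-out that
process_deletion_event performs for clients without the capability (the
helper name is hypothetical; the expansion logic mirrors the diff above):

    from typing import Any, Dict, List, Mapping

    def expand_bulk_deletion(event: Mapping[str, Any]) -> List[Dict[str, Any]]:
        # dict(event) rather than event.copy(): the read-only Mapping
        # type has no .copy() method.
        events = []
        for message_id in event['message_ids']:
            compat = dict(event)
            compat['message_id'] = message_id
            del compat['message_ids']
            events.append(compat)
        return events

    # A two-message bulk event becomes two legacy events.
    assert len(expand_bulk_deletion({
        'type': 'delete_message',
        'message_ids': [1, 2],
        'message_type': 'stream',
        'stream_id': 5,
        'topic': 'lunch',
    })) == 2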

@@ -76,6 +76,8 @@ def get_events_backend(request: HttpRequest, user_profile: UserProfile,
                                               intentionally_undocumented=True),
                        lifespan_secs: int=REQ(default=0, converter=to_non_negative_int,
                                               intentionally_undocumented=True),
+                       bulk_message_deletion: bool=REQ(default=False, validator=check_bool,
+                                                       intentionally_undocumented=True),
                        ) -> HttpResponse:
     # Extract the Tornado handler from the request
     handler: AsyncDjangoHandler = request._tornado_handler
@@ -109,7 +111,8 @@ def get_events_backend(request: HttpRequest, user_profile: UserProfile,
                          all_public_streams = all_public_streams,
                          queue_timeout = lifespan_secs,
                          last_connection_time = time.time(),
-                         narrow = narrow)
+                         narrow = narrow,
+                         bulk_message_deletion = bulk_message_deletion)

     result = fetch_events(events_query)
     if "extra_log_data" in result:


@@ -39,6 +39,7 @@ def events_register_backend(
             ("notification_settings_null", check_bool),
         ], [
             # Any new fields of `client_capabilities` should be optional. Add them here.
+            ("bulk_message_deletion", check_bool),
         ]), default=None),
         event_types: Optional[Iterable[str]]=REQ(validator=check_list(check_string), default=None),
         fetch_event_types: Optional[Iterable[str]]=REQ(validator=check_list(check_string), default=None),


@@ -191,6 +191,7 @@ def home_real(request: HttpRequest) -> HttpResponse:
     client_capabilities = {
         'notification_settings_null': True,
+        'bulk_message_deletion': False,
     }

     register_ret = do_events_register(user_profile, request.client,