From eb4e9fe1e73067de3cfc3b95b061763815169ec6 Mon Sep 17 00:00:00 2001 From: Anders Kaseorg Date: Thu, 14 Apr 2022 14:54:53 -0700 Subject: [PATCH] actions: Split out zerver.actions.message_flags. Signed-off-by: Anders Kaseorg --- analytics/tests/test_counts.py | 8 +- zerver/actions/message_flags.py | 287 ++++++++++++++++++++++++ zerver/lib/actions.py | 283 +---------------------- zerver/tests/test_events.py | 2 +- zerver/tests/test_message_flags.py | 2 +- zerver/tests/test_push_notifications.py | 9 +- zerver/views/message_flags.py | 2 +- zerver/worker/queue_processors.py | 3 +- 8 files changed, 304 insertions(+), 292 deletions(-) create mode 100644 zerver/actions/message_flags.py diff --git a/analytics/tests/test_counts.py b/analytics/tests/test_counts.py index cb9c8fbafe..cfab734b43 100644 --- a/analytics/tests/test_counts.py +++ b/analytics/tests/test_counts.py @@ -42,14 +42,14 @@ from zerver.actions.invites import ( do_resend_user_invite_email, do_revoke_user_invite, ) -from zerver.actions.user_activity import update_user_activity_interval -from zerver.actions.users import do_deactivate_user -from zerver.lib.actions import ( - do_create_realm, +from zerver.actions.message_flags import ( do_mark_all_as_read, do_mark_stream_messages_as_read, do_update_message_flags, ) +from zerver.actions.user_activity import update_user_activity_interval +from zerver.actions.users import do_deactivate_user +from zerver.lib.actions import do_create_realm from zerver.lib.create_user import create_user from zerver.lib.exceptions import InvitationError from zerver.lib.test_classes import ZulipTestCase diff --git a/zerver/actions/message_flags.py b/zerver/actions/message_flags.py new file mode 100644 index 0000000000..ad3666d65d --- /dev/null +++ b/zerver/actions/message_flags.py @@ -0,0 +1,287 @@ +from collections import defaultdict +from dataclasses import asdict, dataclass, field +from typing import List, Optional, Set + +from django.db.models import F +from django.utils.timezone import now as timezone_now +from django.utils.translation import gettext as _ + +from analytics.lib.counts import COUNT_STATS, do_increment_logging_stat +from zerver.actions.create_user import create_historical_user_messages +from zerver.lib.exceptions import JsonableError +from zerver.lib.message import access_message, format_unread_message_details, get_raw_unread_data +from zerver.lib.queue import queue_json_publish +from zerver.lib.topic import filter_by_topic_name_via_message +from zerver.lib.utils import log_statsd_event +from zerver.models import Message, UserMessage, UserProfile +from zerver.tornado.django_api import send_event + + +@dataclass +class ReadMessagesEvent: + messages: List[int] + all: bool + type: str = field(default="update_message_flags", init=False) + op: str = field(default="add", init=False) + operation: str = field(default="add", init=False) + flag: str = field(default="read", init=False) + + +def do_mark_all_as_read(user_profile: UserProfile) -> int: + log_statsd_event("bankruptcy") + + # First, we clear mobile push notifications. This is safer in the + # event that the below logic times out and we're killed. + all_push_message_ids = ( + UserMessage.objects.filter( + user_profile=user_profile, + ) + .extra( + where=[UserMessage.where_active_push_notification()], + ) + .values_list("message_id", flat=True)[0:10000] + ) + do_clear_mobile_push_notifications_for_ids([user_profile.id], all_push_message_ids) + + msgs = UserMessage.objects.filter(user_profile=user_profile).extra( + where=[UserMessage.where_unread()], + ) + + count = msgs.update( + flags=F("flags").bitor(UserMessage.flags.read), + ) + + event = asdict( + ReadMessagesEvent( + messages=[], # we don't send messages, since the client reloads anyway + all=True, + ) + ) + event_time = timezone_now() + + send_event(user_profile.realm, event, [user_profile.id]) + + do_increment_logging_stat( + user_profile, COUNT_STATS["messages_read::hour"], None, event_time, increment=count + ) + do_increment_logging_stat( + user_profile, + COUNT_STATS["messages_read_interactions::hour"], + None, + event_time, + increment=min(1, count), + ) + + return count + + +def do_mark_stream_messages_as_read( + user_profile: UserProfile, stream_recipient_id: int, topic_name: Optional[str] = None +) -> int: + log_statsd_event("mark_stream_as_read") + + msgs = UserMessage.objects.filter( + user_profile=user_profile, + ) + + msgs = msgs.filter(message__recipient_id=stream_recipient_id) + + if topic_name: + msgs = filter_by_topic_name_via_message( + query=msgs, + topic_name=topic_name, + ) + + msgs = msgs.extra( + where=[UserMessage.where_unread()], + ) + + message_ids = list(msgs.values_list("message_id", flat=True)) + + count = msgs.update( + flags=F("flags").bitor(UserMessage.flags.read), + ) + + event = asdict( + ReadMessagesEvent( + messages=message_ids, + all=False, + ) + ) + event_time = timezone_now() + + send_event(user_profile.realm, event, [user_profile.id]) + do_clear_mobile_push_notifications_for_ids([user_profile.id], message_ids) + + do_increment_logging_stat( + user_profile, COUNT_STATS["messages_read::hour"], None, event_time, increment=count + ) + do_increment_logging_stat( + user_profile, + COUNT_STATS["messages_read_interactions::hour"], + None, + event_time, + increment=min(1, count), + ) + return count + + +def do_mark_muted_user_messages_as_read( + user_profile: UserProfile, + muted_user: UserProfile, +) -> int: + messages = UserMessage.objects.filter( + user_profile=user_profile, message__sender=muted_user + ).extra(where=[UserMessage.where_unread()]) + + message_ids = list(messages.values_list("message_id", flat=True)) + + count = messages.update( + flags=F("flags").bitor(UserMessage.flags.read), + ) + + event = asdict( + ReadMessagesEvent( + messages=message_ids, + all=False, + ) + ) + event_time = timezone_now() + + send_event(user_profile.realm, event, [user_profile.id]) + do_clear_mobile_push_notifications_for_ids([user_profile.id], message_ids) + + do_increment_logging_stat( + user_profile, COUNT_STATS["messages_read::hour"], None, event_time, increment=count + ) + do_increment_logging_stat( + user_profile, + COUNT_STATS["messages_read_interactions::hour"], + None, + event_time, + increment=min(1, count), + ) + return count + + +def do_update_mobile_push_notification( + message: Message, + prior_mention_user_ids: Set[int], + mentions_user_ids: Set[int], + stream_push_user_ids: Set[int], +) -> None: + # Called during the message edit code path to remove mobile push + # notifications for users who are no longer mentioned following + # the edit. See #15428 for details. + # + # A perfect implementation would also support updating the message + # in a sent notification if a message was edited to mention a + # group rather than a user (or vice versa), though it is likely + # not worth the effort to do such a change. + if not message.is_stream_message(): + return + + remove_notify_users = prior_mention_user_ids - mentions_user_ids - stream_push_user_ids + do_clear_mobile_push_notifications_for_ids(list(remove_notify_users), [message.id]) + + +def do_clear_mobile_push_notifications_for_ids( + user_profile_ids: List[int], message_ids: List[int] +) -> None: + if len(message_ids) == 0: + return + + # This function supports clearing notifications for several users + # only for the message-edit use case where we'll have a single message_id. + assert len(user_profile_ids) == 1 or len(message_ids) == 1 + + messages_by_user = defaultdict(list) + notifications_to_update = list( + UserMessage.objects.filter( + message_id__in=message_ids, + user_profile_id__in=user_profile_ids, + ) + .extra( + where=[UserMessage.where_active_push_notification()], + ) + .values_list("user_profile_id", "message_id") + ) + + for (user_id, message_id) in notifications_to_update: + messages_by_user[user_id].append(message_id) + + for (user_profile_id, event_message_ids) in messages_by_user.items(): + queue_json_publish( + "missedmessage_mobile_notifications", + { + "type": "remove", + "user_profile_id": user_profile_id, + "message_ids": event_message_ids, + }, + ) + + +def do_update_message_flags( + user_profile: UserProfile, operation: str, flag: str, messages: List[int] +) -> int: + valid_flags = [item for item in UserMessage.flags if item not in UserMessage.NON_API_FLAGS] + if flag not in valid_flags: + raise JsonableError(_("Invalid flag: '{}'").format(flag)) + if flag in UserMessage.NON_EDITABLE_FLAGS: + raise JsonableError(_("Flag not editable: '{}'").format(flag)) + if operation not in ("add", "remove"): + raise JsonableError(_("Invalid message flag operation: '{}'").format(operation)) + flagattr = getattr(UserMessage.flags, flag) + + msgs = UserMessage.objects.filter(user_profile=user_profile, message_id__in=messages) + um_message_ids = {um.message_id for um in msgs} + historical_message_ids = list(set(messages) - um_message_ids) + + # Users can mutate flags for messages that don't have a UserMessage yet. + # First, validate that the user is even allowed to access these message_ids. + for message_id in historical_message_ids: + access_message(user_profile, message_id) + + # And then create historical UserMessage records. See the called function for more context. + create_historical_user_messages(user_id=user_profile.id, message_ids=historical_message_ids) + + if operation == "add": + count = msgs.update(flags=F("flags").bitor(flagattr)) + elif operation == "remove": + count = msgs.update(flags=F("flags").bitand(~flagattr)) + + event = { + "type": "update_message_flags", + "op": operation, + "operation": operation, + "flag": flag, + "messages": messages, + "all": False, + } + + if flag == "read" and operation == "remove": + # When removing the read flag (i.e. marking messages as + # unread), extend the event with an additional object with + # details on the messages required to update the client's + # `unread_msgs` data structure. + raw_unread_data = get_raw_unread_data(user_profile, messages) + event["message_details"] = format_unread_message_details(user_profile.id, raw_unread_data) + + send_event(user_profile.realm, event, [user_profile.id]) + + if flag == "read" and operation == "add": + event_time = timezone_now() + do_clear_mobile_push_notifications_for_ids([user_profile.id], messages) + + do_increment_logging_stat( + user_profile, COUNT_STATS["messages_read::hour"], None, event_time, increment=count + ) + do_increment_logging_stat( + user_profile, + COUNT_STATS["messages_read_interactions::hour"], + None, + event_time, + increment=min(1, count), + ) + + return count diff --git a/zerver/lib/actions.py b/zerver/lib/actions.py index 756b7ecb2d..21b303ca2f 100644 --- a/zerver/lib/actions.py +++ b/zerver/lib/actions.py @@ -1,24 +1,24 @@ import datetime import logging -from collections import defaultdict -from dataclasses import asdict, dataclass, field from typing import Any, Dict, Iterable, List, Optional, Sequence, Set import orjson from django.conf import settings from django.core.exceptions import ValidationError from django.db import transaction -from django.db.models import F from django.utils.timezone import now as timezone_now from django.utils.translation import gettext as _ from django.utils.translation import gettext_lazy from django.utils.translation import override as override_language from typing_extensions import TypedDict -from analytics.lib.counts import COUNT_STATS, do_increment_logging_stat from confirmation.models import Confirmation, create_confirmation_link, generate_key -from zerver.actions.create_user import create_historical_user_messages, created_bot_event +from zerver.actions.create_user import created_bot_event from zerver.actions.custom_profile_fields import do_remove_realm_custom_profile_fields +from zerver.actions.message_flags import ( + do_mark_muted_user_messages_as_read, + do_update_mobile_push_notification, +) from zerver.actions.message_send import ( filter_presence_idle_user_ids, get_recipient_info, @@ -41,8 +41,6 @@ from zerver.lib.mention import MentionBackend, MentionData, silent_mention_synta from zerver.lib.message import ( access_message, bulk_access_messages, - format_unread_message_details, - get_raw_unread_data, normalize_body, truncate_topic, update_first_visible_message_id, @@ -81,7 +79,6 @@ from zerver.lib.user_groups import create_system_user_groups_for_realm from zerver.lib.user_message import UserMessageLite, bulk_insert_ums from zerver.lib.user_mutes import add_user_mute, get_user_mutes from zerver.lib.user_topics import get_users_muting_topic, remove_topic_mute -from zerver.lib.utils import log_statsd_event from zerver.lib.widget import is_widget_message from zerver.models import ( ArchivedAttachment, @@ -1171,276 +1168,6 @@ def do_create_realm( return realm -@dataclass -class ReadMessagesEvent: - messages: List[int] - all: bool - type: str = field(default="update_message_flags", init=False) - op: str = field(default="add", init=False) - operation: str = field(default="add", init=False) - flag: str = field(default="read", init=False) - - -def do_mark_all_as_read(user_profile: UserProfile) -> int: - log_statsd_event("bankruptcy") - - # First, we clear mobile push notifications. This is safer in the - # event that the below logic times out and we're killed. - all_push_message_ids = ( - UserMessage.objects.filter( - user_profile=user_profile, - ) - .extra( - where=[UserMessage.where_active_push_notification()], - ) - .values_list("message_id", flat=True)[0:10000] - ) - do_clear_mobile_push_notifications_for_ids([user_profile.id], all_push_message_ids) - - msgs = UserMessage.objects.filter(user_profile=user_profile).extra( - where=[UserMessage.where_unread()], - ) - - count = msgs.update( - flags=F("flags").bitor(UserMessage.flags.read), - ) - - event = asdict( - ReadMessagesEvent( - messages=[], # we don't send messages, since the client reloads anyway - all=True, - ) - ) - event_time = timezone_now() - - send_event(user_profile.realm, event, [user_profile.id]) - - do_increment_logging_stat( - user_profile, COUNT_STATS["messages_read::hour"], None, event_time, increment=count - ) - do_increment_logging_stat( - user_profile, - COUNT_STATS["messages_read_interactions::hour"], - None, - event_time, - increment=min(1, count), - ) - - return count - - -def do_mark_stream_messages_as_read( - user_profile: UserProfile, stream_recipient_id: int, topic_name: Optional[str] = None -) -> int: - log_statsd_event("mark_stream_as_read") - - msgs = UserMessage.objects.filter( - user_profile=user_profile, - ) - - msgs = msgs.filter(message__recipient_id=stream_recipient_id) - - if topic_name: - msgs = filter_by_topic_name_via_message( - query=msgs, - topic_name=topic_name, - ) - - msgs = msgs.extra( - where=[UserMessage.where_unread()], - ) - - message_ids = list(msgs.values_list("message_id", flat=True)) - - count = msgs.update( - flags=F("flags").bitor(UserMessage.flags.read), - ) - - event = asdict( - ReadMessagesEvent( - messages=message_ids, - all=False, - ) - ) - event_time = timezone_now() - - send_event(user_profile.realm, event, [user_profile.id]) - do_clear_mobile_push_notifications_for_ids([user_profile.id], message_ids) - - do_increment_logging_stat( - user_profile, COUNT_STATS["messages_read::hour"], None, event_time, increment=count - ) - do_increment_logging_stat( - user_profile, - COUNT_STATS["messages_read_interactions::hour"], - None, - event_time, - increment=min(1, count), - ) - return count - - -def do_mark_muted_user_messages_as_read( - user_profile: UserProfile, - muted_user: UserProfile, -) -> int: - messages = UserMessage.objects.filter( - user_profile=user_profile, message__sender=muted_user - ).extra(where=[UserMessage.where_unread()]) - - message_ids = list(messages.values_list("message_id", flat=True)) - - count = messages.update( - flags=F("flags").bitor(UserMessage.flags.read), - ) - - event = asdict( - ReadMessagesEvent( - messages=message_ids, - all=False, - ) - ) - event_time = timezone_now() - - send_event(user_profile.realm, event, [user_profile.id]) - do_clear_mobile_push_notifications_for_ids([user_profile.id], message_ids) - - do_increment_logging_stat( - user_profile, COUNT_STATS["messages_read::hour"], None, event_time, increment=count - ) - do_increment_logging_stat( - user_profile, - COUNT_STATS["messages_read_interactions::hour"], - None, - event_time, - increment=min(1, count), - ) - return count - - -def do_update_mobile_push_notification( - message: Message, - prior_mention_user_ids: Set[int], - mentions_user_ids: Set[int], - stream_push_user_ids: Set[int], -) -> None: - # Called during the message edit code path to remove mobile push - # notifications for users who are no longer mentioned following - # the edit. See #15428 for details. - # - # A perfect implementation would also support updating the message - # in a sent notification if a message was edited to mention a - # group rather than a user (or vice versa), though it is likely - # not worth the effort to do such a change. - if not message.is_stream_message(): - return - - remove_notify_users = prior_mention_user_ids - mentions_user_ids - stream_push_user_ids - do_clear_mobile_push_notifications_for_ids(list(remove_notify_users), [message.id]) - - -def do_clear_mobile_push_notifications_for_ids( - user_profile_ids: List[int], message_ids: List[int] -) -> None: - if len(message_ids) == 0: - return - - # This function supports clearing notifications for several users - # only for the message-edit use case where we'll have a single message_id. - assert len(user_profile_ids) == 1 or len(message_ids) == 1 - - messages_by_user = defaultdict(list) - notifications_to_update = list( - UserMessage.objects.filter( - message_id__in=message_ids, - user_profile_id__in=user_profile_ids, - ) - .extra( - where=[UserMessage.where_active_push_notification()], - ) - .values_list("user_profile_id", "message_id") - ) - - for (user_id, message_id) in notifications_to_update: - messages_by_user[user_id].append(message_id) - - for (user_profile_id, event_message_ids) in messages_by_user.items(): - queue_json_publish( - "missedmessage_mobile_notifications", - { - "type": "remove", - "user_profile_id": user_profile_id, - "message_ids": event_message_ids, - }, - ) - - -def do_update_message_flags( - user_profile: UserProfile, operation: str, flag: str, messages: List[int] -) -> int: - valid_flags = [item for item in UserMessage.flags if item not in UserMessage.NON_API_FLAGS] - if flag not in valid_flags: - raise JsonableError(_("Invalid flag: '{}'").format(flag)) - if flag in UserMessage.NON_EDITABLE_FLAGS: - raise JsonableError(_("Flag not editable: '{}'").format(flag)) - if operation not in ("add", "remove"): - raise JsonableError(_("Invalid message flag operation: '{}'").format(operation)) - flagattr = getattr(UserMessage.flags, flag) - - msgs = UserMessage.objects.filter(user_profile=user_profile, message_id__in=messages) - um_message_ids = {um.message_id for um in msgs} - historical_message_ids = list(set(messages) - um_message_ids) - - # Users can mutate flags for messages that don't have a UserMessage yet. - # First, validate that the user is even allowed to access these message_ids. - for message_id in historical_message_ids: - access_message(user_profile, message_id) - - # And then create historical UserMessage records. See the called function for more context. - create_historical_user_messages(user_id=user_profile.id, message_ids=historical_message_ids) - - if operation == "add": - count = msgs.update(flags=F("flags").bitor(flagattr)) - elif operation == "remove": - count = msgs.update(flags=F("flags").bitand(~flagattr)) - - event = { - "type": "update_message_flags", - "op": operation, - "operation": operation, - "flag": flag, - "messages": messages, - "all": False, - } - - if flag == "read" and operation == "remove": - # When removing the read flag (i.e. marking messages as - # unread), extend the event with an additional object with - # details on the messages required to update the client's - # `unread_msgs` data structure. - raw_unread_data = get_raw_unread_data(user_profile, messages) - event["message_details"] = format_unread_message_details(user_profile.id, raw_unread_data) - - send_event(user_profile.realm, event, [user_profile.id]) - - if flag == "read" and operation == "add": - event_time = timezone_now() - do_clear_mobile_push_notifications_for_ids([user_profile.id], messages) - - do_increment_logging_stat( - user_profile, COUNT_STATS["messages_read::hour"], None, event_time, increment=count - ) - do_increment_logging_stat( - user_profile, - COUNT_STATS["messages_read_interactions::hour"], - None, - event_time, - increment=min(1, count), - ) - - return count - - class MessageUpdateUserInfoResult(TypedDict): message_user_ids: Set[int] mention_user_ids: Set[int] diff --git a/zerver/tests/test_events.py b/zerver/tests/test_events.py index 5a28959ed8..a7b32dd077 100644 --- a/zerver/tests/test_events.py +++ b/zerver/tests/test_events.py @@ -39,6 +39,7 @@ from zerver.actions.invites import ( do_revoke_multi_use_invite, do_revoke_user_invite, ) +from zerver.actions.message_flags import do_update_message_flags from zerver.actions.presence import do_update_user_presence, do_update_user_status from zerver.actions.reactions import do_add_reaction, do_remove_reaction from zerver.actions.realm_emoji import check_add_realm_emoji, do_remove_realm_emoji @@ -107,7 +108,6 @@ from zerver.lib.actions import ( do_unmute_user, do_update_embedded_data, do_update_message, - do_update_message_flags, ) from zerver.lib.drafts import do_create_drafts, do_delete_draft, do_edit_draft from zerver.lib.event_schema import ( diff --git a/zerver/tests/test_message_flags.py b/zerver/tests/test_message_flags.py index 68e72475f1..b163394c21 100644 --- a/zerver/tests/test_message_flags.py +++ b/zerver/tests/test_message_flags.py @@ -5,8 +5,8 @@ import orjson from django.db import connection from django.http import HttpResponse +from zerver.actions.message_flags import do_update_message_flags from zerver.actions.streams import do_change_stream_permission -from zerver.lib.actions import do_update_message_flags from zerver.lib.fix_unreads import fix, fix_unsubscribed from zerver.lib.message import ( MessageDetailsDict, diff --git a/zerver/tests/test_push_notifications.py b/zerver/tests/test_push_notifications.py index 4d962ff247..2782c8b76c 100644 --- a/zerver/tests/test_push_notifications.py +++ b/zerver/tests/test_push_notifications.py @@ -23,12 +23,9 @@ from requests.models import PreparedRequest from analytics.lib.counts import CountStat, LoggingCountStat from analytics.models import InstallationCount, RealmCount +from zerver.actions.message_flags import do_mark_stream_messages_as_read, do_update_message_flags from zerver.actions.user_settings import do_regenerate_api_key -from zerver.lib.actions import ( - do_delete_messages, - do_mark_stream_messages_as_read, - do_update_message_flags, -) +from zerver.lib.actions import do_delete_messages from zerver.lib.avatar import absolute_avatar_url from zerver.lib.exceptions import JsonableError from zerver.lib.push_notifications import ( @@ -2499,7 +2496,7 @@ class TestClearOnRead(ZulipTestCase): message_id__in=message_ids, ).update(flags=F("flags").bitor(UserMessage.flags.active_mobile_push_notification)) - with mock_queue_publish("zerver.lib.actions.queue_json_publish") as mock_publish: + with mock_queue_publish("zerver.actions.message_flags.queue_json_publish") as mock_publish: assert stream.recipient_id is not None do_mark_stream_messages_as_read(hamlet, stream.recipient_id) queue_items = [c[0][1] for c in mock_publish.call_args_list] diff --git a/zerver/views/message_flags.py b/zerver/views/message_flags.py index 7ca7436910..8670baca1e 100644 --- a/zerver/views/message_flags.py +++ b/zerver/views/message_flags.py @@ -3,7 +3,7 @@ from typing import List, Optional from django.http import HttpRequest, HttpResponse from django.utils.translation import gettext as _ -from zerver.lib.actions import ( +from zerver.actions.message_flags import ( do_mark_all_as_read, do_mark_stream_messages_as_read, do_update_message_flags, diff --git a/zerver/worker/queue_processors.py b/zerver/worker/queue_processors.py index 0548746306..ae1f0ac0f1 100644 --- a/zerver/worker/queue_processors.py +++ b/zerver/worker/queue_processors.py @@ -47,12 +47,13 @@ from sentry_sdk import add_breadcrumb, configure_scope from zulip_bots.lib import extract_query_without_mention from zerver.actions.invites import do_send_confirmation_email +from zerver.actions.message_flags import do_mark_stream_messages_as_read from zerver.actions.message_send import internal_send_private_message, render_incoming_message from zerver.actions.presence import do_update_user_presence from zerver.actions.realm_export import notify_realm_export from zerver.actions.user_activity import do_update_user_activity, do_update_user_activity_interval from zerver.context_processors import common_context -from zerver.lib.actions import do_mark_stream_messages_as_read, do_update_embedded_data +from zerver.lib.actions import do_update_embedded_data from zerver.lib.bot_lib import EmbeddedBotHandler, EmbeddedBotQuitException, get_bot_handler from zerver.lib.context_managers import lockfile from zerver.lib.db import reset_queries