From 975066e3f0e3d4c3a356da3dfc1d4472f72717b7 Mon Sep 17 00:00:00 2001 From: Anders Kaseorg Date: Thu, 14 Apr 2022 14:50:10 -0700 Subject: [PATCH] actions: Split out zerver.actions.message_send. Signed-off-by: Anders Kaseorg --- corporate/lib/stripe.py | 3 +- docs/subsystems/sending-messages.md | 4 +- stubs/taint/false_positives.pysa | 2 +- tools/linter_lib/custom_check.py | 2 +- zerver/actions/message_send.py | 1660 +++++++++++++++++ zerver/lib/actions.py | 1640 +--------------- zerver/lib/bot_lib.py | 2 +- zerver/lib/email_mirror.py | 2 +- zerver/lib/error_notify.py | 2 +- zerver/lib/onboarding.py | 5 +- zerver/lib/outgoing_webhook.py | 2 +- zerver/lib/test_classes.py | 3 +- zerver/lib/webhooks/common.py | 2 +- .../commands/deliver_scheduled_messages.py | 2 +- zerver/tests/test_event_system.py | 3 +- zerver/tests/test_link_embed.py | 28 +- zerver/tests/test_message_send.py | 16 +- zerver/tests/test_messages.py | 2 +- zerver/tests/test_mirror_users.py | 4 +- zerver/tests/test_presence.py | 2 +- zerver/tests/test_retention.py | 8 +- zerver/tests/test_service_bot_system.py | 17 +- zerver/tests/test_tutorial.py | 2 +- zerver/tests/test_upload.py | 8 +- zerver/tests/test_users.py | 2 +- zerver/views/message_send.py | 2 +- zerver/views/streams.py | 8 +- zerver/webhooks/dialogflow/view.py | 2 +- zerver/webhooks/freshstatus/view.py | 2 +- zerver/webhooks/slack/view.py | 2 +- zerver/webhooks/teamcity/view.py | 4 +- zerver/webhooks/uptimerobot/view.py | 2 +- zerver/webhooks/yo/view.py | 2 +- zerver/webhooks/zabbix/view.py | 2 +- zerver/worker/queue_processors.py | 8 +- .../commands/add_mock_conversation.py | 9 +- zilencer/management/commands/populate_db.py | 3 +- zproject/default_settings.py | 2 +- 38 files changed, 1755 insertions(+), 1716 deletions(-) create mode 100644 zerver/actions/message_send.py diff --git a/corporate/lib/stripe.py b/corporate/lib/stripe.py index ce4079ca6f..81caa58d1c 100644 --- a/corporate/lib/stripe.py +++ b/corporate/lib/stripe.py @@ -971,7 +971,8 @@ def update_sponsorship_status( def approve_sponsorship(realm: Realm, *, acting_user: Optional[UserProfile]) -> None: - from zerver.lib.actions import do_change_realm_plan_type, internal_send_private_message + from zerver.actions.message_send import internal_send_private_message + from zerver.lib.actions import do_change_realm_plan_type do_change_realm_plan_type(realm, Realm.PLAN_TYPE_STANDARD_FREE, acting_user=acting_user) customer = get_customer_by_realm(realm) diff --git a/docs/subsystems/sending-messages.md b/docs/subsystems/sending-messages.md index d26165fb3d..ff79f8ef1a 100644 --- a/docs/subsystems/sending-messages.md +++ b/docs/subsystems/sending-messages.md @@ -71,12 +71,12 @@ This section details the ways in which it is different: `apply_markdown` and `client_gravatar` features in our [events API docs](https://zulip.com/api/register-queue)). - Following our standard naming convention, input validation is done - inside the `check_message` function in `zerver/lib/actions.py`, which is responsible for + inside the `check_message` function in `zerver/actions/message_send.py`, which is responsible for validating the user can send to the recipient, [rendering the Markdown](markdown.md), etc. -- basically everything that can fail due to bad user input. - The core `do_send_messages` function (which handles actually sending - the message) in `zerver/lib/actions.py` is one of the most optimized and thus complex parts of + the message) in `zerver/actions/message_send.py` is one of the most optimized and thus complex parts of the system. But in short, its job is to atomically do a few key things: - Store a `Message` row in the database. diff --git a/stubs/taint/false_positives.pysa b/stubs/taint/false_positives.pysa index 75fc9531fe..ce4d5715bb 100644 --- a/stubs/taint/false_positives.pysa +++ b/stubs/taint/false_positives.pysa @@ -23,7 +23,7 @@ def zerver.lib.avatar_hash.user_avatar_path_from_ids(user_profile_id, realm_id) # This function creates a list of 'UserMessageLite' objects, which contain only # integral IDs and flags. These should safe for use with SQL and other # operations. -def zerver.lib.actions.create_user_messages( +def zerver.actions.message_send.create_user_messages( message, um_eligible_user_ids, long_term_idle_user_ids, diff --git a/tools/linter_lib/custom_check.py b/tools/linter_lib/custom_check.py index 08379e33a0..c38d07883a 100644 --- a/tools/linter_lib/custom_check.py +++ b/tools/linter_lib/custom_check.py @@ -378,7 +378,7 @@ python_rules = RuleList( "exclude_line": { # This one in check_message is kinda terrible, since it's # how most instances are written, but better to exclude something than nothing - ("zerver/lib/actions.py", "stream = get_stream(stream_name, realm)"), + ("zerver/actions/message_send.py", "stream = get_stream(stream_name, realm)"), }, "description": "Please use access_stream_by_*() to fetch Stream objects", }, diff --git a/zerver/actions/message_send.py b/zerver/actions/message_send.py new file mode 100644 index 0000000000..3a31c3c708 --- /dev/null +++ b/zerver/actions/message_send.py @@ -0,0 +1,1660 @@ +import datetime +import logging +from collections import defaultdict +from typing import ( + AbstractSet, + Any, + Callable, + Collection, + Dict, + List, + Optional, + Sequence, + Set, + Tuple, + Union, +) + +import orjson +from django.conf import settings +from django.core.exceptions import ValidationError +from django.db import IntegrityError, transaction +from django.db.models import F +from django.utils.html import escape +from django.utils.timezone import now as timezone_now +from django.utils.translation import gettext as _ +from django.utils.translation import override as override_language +from typing_extensions import TypedDict + +from zerver.actions.uploads import do_claim_attachments +from zerver.lib.addressee import Addressee +from zerver.lib.alert_words import get_alert_word_automaton +from zerver.lib.cache import cache_with_key, user_profile_delivery_email_cache_key +from zerver.lib.create_user import create_user +from zerver.lib.exceptions import ( + JsonableError, + MarkdownRenderingException, + StreamDoesNotExistError, + StreamWithIDDoesNotExistError, + ZephyrMessageAlreadySentException, +) +from zerver.lib.markdown import MessageRenderingResult +from zerver.lib.markdown import version as markdown_version +from zerver.lib.mention import MentionBackend, MentionData +from zerver.lib.message import ( + MessageDict, + SendMessageRequest, + normalize_body, + render_markdown, + truncate_topic, + wildcard_mention_allowed, +) +from zerver.lib.notification_data import UserMessageNotificationsData, get_user_group_mentions_data +from zerver.lib.queue import queue_json_publish +from zerver.lib.recipient_users import recipient_for_user_profiles +from zerver.lib.stream_subscription import ( + get_subscriptions_for_send_message, + num_subscribers_for_stream_id, +) +from zerver.lib.stream_topic import StreamTopicTarget +from zerver.lib.streams import access_stream_for_send_message, ensure_stream +from zerver.lib.string_validation import check_stream_name +from zerver.lib.timestamp import timestamp_to_datetime +from zerver.lib.topic import filter_by_exact_message_topic +from zerver.lib.user_message import UserMessageLite, bulk_insert_ums +from zerver.lib.user_mutes import get_muting_users +from zerver.lib.validator import check_widget_content +from zerver.lib.widget import do_widget_post_save_actions +from zerver.models import ( + Client, + Message, + Realm, + Recipient, + ScheduledMessage, + Stream, + UserMessage, + UserPresence, + UserProfile, + get_client, + get_huddle_user_ids, + get_stream, + get_stream_by_id_in_realm, + get_system_bot, + get_user_by_delivery_email, + query_for_ids, +) +from zerver.tornado.django_api import send_event + + +def compute_irc_user_fullname(email: str) -> str: + return email.split("@")[0] + " (IRC)" + + +def compute_jabber_user_fullname(email: str) -> str: + return email.split("@")[0] + " (XMPP)" + + +def get_user_profile_delivery_email_cache_key( + realm: Realm, email: str, email_to_fullname: Callable[[str], str] +) -> str: + return user_profile_delivery_email_cache_key(email, realm) + + +@cache_with_key( + get_user_profile_delivery_email_cache_key, + timeout=3600 * 24 * 7, +) +def create_mirror_user_if_needed( + realm: Realm, email: str, email_to_fullname: Callable[[str], str] +) -> UserProfile: + try: + return get_user_by_delivery_email(email, realm) + except UserProfile.DoesNotExist: + try: + # Forge a user for this person + return create_user( + email=email, + password=None, + realm=realm, + full_name=email_to_fullname(email), + active=False, + is_mirror_dummy=True, + ) + except IntegrityError: + return get_user_by_delivery_email(email, realm) + + +def render_incoming_message( + message: Message, + content: str, + user_ids: Set[int], + realm: Realm, + mention_data: Optional[MentionData] = None, + email_gateway: bool = False, +) -> MessageRenderingResult: + realm_alert_words_automaton = get_alert_word_automaton(realm) + try: + rendering_result = render_markdown( + message=message, + content=content, + realm=realm, + realm_alert_words_automaton=realm_alert_words_automaton, + mention_data=mention_data, + email_gateway=email_gateway, + ) + except MarkdownRenderingException: + raise JsonableError(_("Unable to render message")) + return rendering_result + + +class RecipientInfoResult(TypedDict): + active_user_ids: Set[int] + online_push_user_ids: Set[int] + pm_mention_email_disabled_user_ids: Set[int] + pm_mention_push_disabled_user_ids: Set[int] + stream_email_user_ids: Set[int] + stream_push_user_ids: Set[int] + wildcard_mention_user_ids: Set[int] + muted_sender_user_ids: Set[int] + um_eligible_user_ids: Set[int] + long_term_idle_user_ids: Set[int] + default_bot_user_ids: Set[int] + service_bot_tuples: List[Tuple[int, int]] + all_bot_user_ids: Set[int] + + +def get_recipient_info( + *, + realm_id: int, + recipient: Recipient, + sender_id: int, + stream_topic: Optional[StreamTopicTarget], + possibly_mentioned_user_ids: AbstractSet[int] = set(), + possible_wildcard_mention: bool = True, +) -> RecipientInfoResult: + stream_push_user_ids: Set[int] = set() + stream_email_user_ids: Set[int] = set() + wildcard_mention_user_ids: Set[int] = set() + muted_sender_user_ids: Set[int] = get_muting_users(sender_id) + + if recipient.type == Recipient.PERSONAL: + # The sender and recipient may be the same id, so + # de-duplicate using a set. + message_to_user_ids = list({recipient.type_id, sender_id}) + assert len(message_to_user_ids) in [1, 2] + + elif recipient.type == Recipient.STREAM: + # Anybody calling us w/r/t a stream message needs to supply + # stream_topic. We may eventually want to have different versions + # of this function for different message types. + assert stream_topic is not None + user_ids_muting_topic = stream_topic.user_ids_muting_topic() + + subscription_rows = ( + get_subscriptions_for_send_message( + realm_id=realm_id, + stream_id=stream_topic.stream_id, + possible_wildcard_mention=possible_wildcard_mention, + possibly_mentioned_user_ids=possibly_mentioned_user_ids, + ) + .annotate( + user_profile_email_notifications=F( + "user_profile__enable_stream_email_notifications" + ), + user_profile_push_notifications=F("user_profile__enable_stream_push_notifications"), + user_profile_wildcard_mentions_notify=F("user_profile__wildcard_mentions_notify"), + ) + .values( + "user_profile_id", + "push_notifications", + "email_notifications", + "wildcard_mentions_notify", + "user_profile_email_notifications", + "user_profile_push_notifications", + "user_profile_wildcard_mentions_notify", + "is_muted", + ) + .order_by("user_profile_id") + ) + + message_to_user_ids = [row["user_profile_id"] for row in subscription_rows] + + def should_send(setting: str, row: Dict[str, Any]) -> bool: + # This implements the structure that the UserProfile stream notification settings + # are defaults, which can be overridden by the stream-level settings (if those + # values are not null). + if row["is_muted"]: + return False + if row["user_profile_id"] in user_ids_muting_topic: + return False + if row[setting] is not None: + return row[setting] + return row["user_profile_" + setting] + + stream_push_user_ids = { + row["user_profile_id"] + for row in subscription_rows + # Note: muting a stream overrides stream_push_notify + if should_send("push_notifications", row) + } + + stream_email_user_ids = { + row["user_profile_id"] + for row in subscription_rows + # Note: muting a stream overrides stream_email_notify + if should_send("email_notifications", row) + } + + if possible_wildcard_mention: + # If there's a possible wildcard mention, we need to + # determine the set of users who have enabled the + # "wildcard_mentions_notify" setting (that is, the set of + # users for whom wildcard mentions should be treated like + # personal mentions for notifications). This setting + # applies to both email and push notifications. + wildcard_mention_user_ids = { + row["user_profile_id"] + for row in subscription_rows + if should_send("wildcard_mentions_notify", row) + } + + elif recipient.type == Recipient.HUDDLE: + message_to_user_ids = get_huddle_user_ids(recipient) + + else: + raise ValueError("Bad recipient type") + + message_to_user_id_set = set(message_to_user_ids) + + user_ids = set(message_to_user_id_set) + # Important note: Because we haven't rendered Markdown yet, we + # don't yet know which of these possibly-mentioned users was + # actually mentioned in the message (in other words, the + # mention syntax might have been in a code block or otherwise + # escaped). `get_ids_for` will filter these extra user rows + # for our data structures not related to bots + user_ids |= possibly_mentioned_user_ids + + if user_ids: + query = UserProfile.objects.filter(is_active=True).values( + "id", + "enable_online_push_notifications", + "enable_offline_email_notifications", + "enable_offline_push_notifications", + "is_bot", + "bot_type", + "long_term_idle", + ) + + # query_for_ids is fast highly optimized for large queries, and we + # need this codepath to be fast (it's part of sending messages) + query = query_for_ids( + query=query, + user_ids=sorted(user_ids), + field="id", + ) + rows = list(query) + else: + # TODO: We should always have at least one user_id as a recipient + # of any message we send. Right now the exception to this + # rule is `notify_new_user`, which, at least in a possibly + # contrived test scenario, can attempt to send messages + # to an inactive bot. When we plug that hole, we can avoid + # this `else` clause and just `assert(user_ids)`. + # + # UPDATE: It's February 2020 (and a couple years after the above + # comment was written). We have simplified notify_new_user + # so that it should be a little easier to reason about. + # There is currently some cleanup to how we handle cross + # realm bots that is still under development. Once that + # effort is complete, we should be able to address this + # to-do. + rows = [] + + def get_ids_for(f: Callable[[Dict[str, Any]], bool]) -> Set[int]: + """Only includes users on the explicit message to line""" + return {row["id"] for row in rows if f(row)} & message_to_user_id_set + + def is_service_bot(row: Dict[str, Any]) -> bool: + return row["is_bot"] and (row["bot_type"] in UserProfile.SERVICE_BOT_TYPES) + + active_user_ids = get_ids_for(lambda r: True) + online_push_user_ids = get_ids_for( + lambda r: r["enable_online_push_notifications"], + ) + + # We deal with only the users who have disabled this setting, since that + # will usually be much smaller a set than those who have enabled it (which + # is the default) + pm_mention_email_disabled_user_ids = get_ids_for( + lambda r: not r["enable_offline_email_notifications"] + ) + pm_mention_push_disabled_user_ids = get_ids_for( + lambda r: not r["enable_offline_push_notifications"] + ) + + # Service bots don't get UserMessage rows. + um_eligible_user_ids = get_ids_for( + lambda r: not is_service_bot(r), + ) + + long_term_idle_user_ids = get_ids_for( + lambda r: r["long_term_idle"], + ) + + # These three bot data structures need to filter from the full set + # of users who either are receiving the message or might have been + # mentioned in it, and so can't use get_ids_for. + # + # Further in the do_send_messages code path, once + # `mentioned_user_ids` has been computed via Markdown, we'll filter + # these data structures for just those users who are either a + # direct recipient or were mentioned; for now, we're just making + # sure we have the data we need for that without extra database + # queries. + default_bot_user_ids = { + row["id"] for row in rows if row["is_bot"] and row["bot_type"] == UserProfile.DEFAULT_BOT + } + + service_bot_tuples = [(row["id"], row["bot_type"]) for row in rows if is_service_bot(row)] + + # We also need the user IDs of all bots, to avoid trying to send push/email + # notifications to them. This set will be directly sent to the event queue code + # where we determine notifiability of the message for users. + all_bot_user_ids = {row["id"] for row in rows if row["is_bot"]} + + info: RecipientInfoResult = dict( + active_user_ids=active_user_ids, + online_push_user_ids=online_push_user_ids, + pm_mention_email_disabled_user_ids=pm_mention_email_disabled_user_ids, + pm_mention_push_disabled_user_ids=pm_mention_push_disabled_user_ids, + stream_push_user_ids=stream_push_user_ids, + stream_email_user_ids=stream_email_user_ids, + wildcard_mention_user_ids=wildcard_mention_user_ids, + muted_sender_user_ids=muted_sender_user_ids, + um_eligible_user_ids=um_eligible_user_ids, + long_term_idle_user_ids=long_term_idle_user_ids, + default_bot_user_ids=default_bot_user_ids, + service_bot_tuples=service_bot_tuples, + all_bot_user_ids=all_bot_user_ids, + ) + return info + + +def get_service_bot_events( + sender: UserProfile, + service_bot_tuples: List[Tuple[int, int]], + mentioned_user_ids: Set[int], + active_user_ids: Set[int], + recipient_type: int, +) -> Dict[str, List[Dict[str, Any]]]: + + event_dict: Dict[str, List[Dict[str, Any]]] = defaultdict(list) + + # Avoid infinite loops by preventing messages sent by bots from generating + # Service events. + if sender.is_bot: + return event_dict + + def maybe_add_event(user_profile_id: int, bot_type: int) -> None: + if bot_type == UserProfile.OUTGOING_WEBHOOK_BOT: + queue_name = "outgoing_webhooks" + elif bot_type == UserProfile.EMBEDDED_BOT: + queue_name = "embedded_bots" + else: + logging.error( + "Unexpected bot_type for Service bot id=%s: %s", + user_profile_id, + bot_type, + ) + return + + is_stream = recipient_type == Recipient.STREAM + + # Important note: service_bot_tuples may contain service bots + # who were not actually mentioned in the message (e.g. if + # mention syntax for that bot appeared in a code block). + # Thus, it is important to filter any users who aren't part of + # either mentioned_user_ids (the actual mentioned users) or + # active_user_ids (the actual recipients). + # + # So even though this is implied by the logic below, we filter + # these not-actually-mentioned users here, to help keep this + # function future-proof. + if user_profile_id not in mentioned_user_ids and user_profile_id not in active_user_ids: + return + + # Mention triggers, for stream messages + if is_stream and user_profile_id in mentioned_user_ids: + trigger = "mention" + # PM triggers for personal and huddle messages + elif (not is_stream) and (user_profile_id in active_user_ids): + trigger = "private_message" + else: + return + + event_dict[queue_name].append( + { + "trigger": trigger, + "user_profile_id": user_profile_id, + } + ) + + for user_profile_id, bot_type in service_bot_tuples: + maybe_add_event( + user_profile_id=user_profile_id, + bot_type=bot_type, + ) + + return event_dict + + +def do_schedule_messages(send_message_requests: Sequence[SendMessageRequest]) -> List[int]: + scheduled_messages: List[ScheduledMessage] = [] + + for send_request in send_message_requests: + scheduled_message = ScheduledMessage() + scheduled_message.sender = send_request.message.sender + scheduled_message.recipient = send_request.message.recipient + topic_name = send_request.message.topic_name() + scheduled_message.set_topic_name(topic_name=topic_name) + scheduled_message.content = send_request.message.content + scheduled_message.sending_client = send_request.message.sending_client + scheduled_message.stream = send_request.stream + scheduled_message.realm = send_request.realm + assert send_request.deliver_at is not None + scheduled_message.scheduled_timestamp = send_request.deliver_at + if send_request.delivery_type == "send_later": + scheduled_message.delivery_type = ScheduledMessage.SEND_LATER + elif send_request.delivery_type == "remind": + scheduled_message.delivery_type = ScheduledMessage.REMIND + + scheduled_messages.append(scheduled_message) + + ScheduledMessage.objects.bulk_create(scheduled_messages) + return [scheduled_message.id for scheduled_message in scheduled_messages] + + +def build_message_send_dict( + message: Message, + stream: Optional[Stream] = None, + local_id: Optional[str] = None, + sender_queue_id: Optional[str] = None, + realm: Optional[Realm] = None, + widget_content_dict: Optional[Dict[str, Any]] = None, + email_gateway: bool = False, + mention_backend: Optional[MentionBackend] = None, + limit_unread_user_ids: Optional[Set[int]] = None, +) -> SendMessageRequest: + """Returns a dictionary that can be passed into do_send_messages. In + production, this is always called by check_message, but some + testing code paths call it directly. + """ + if realm is None: + realm = message.sender.realm + + if mention_backend is None: + mention_backend = MentionBackend(realm.id) + + mention_data = MentionData( + mention_backend=mention_backend, + content=message.content, + ) + + if message.is_stream_message(): + stream_id = message.recipient.type_id + stream_topic: Optional[StreamTopicTarget] = StreamTopicTarget( + stream_id=stream_id, + topic_name=message.topic_name(), + ) + else: + stream_topic = None + + info = get_recipient_info( + realm_id=realm.id, + recipient=message.recipient, + sender_id=message.sender_id, + stream_topic=stream_topic, + possibly_mentioned_user_ids=mention_data.get_user_ids(), + possible_wildcard_mention=mention_data.message_has_wildcards(), + ) + + # Render our message_dicts. + assert message.rendered_content is None + + rendering_result = render_incoming_message( + message, + message.content, + info["active_user_ids"], + realm, + mention_data=mention_data, + email_gateway=email_gateway, + ) + message.rendered_content = rendering_result.rendered_content + message.rendered_content_version = markdown_version + links_for_embed = rendering_result.links_for_preview + + mentioned_user_groups_map = get_user_group_mentions_data( + mentioned_user_ids=rendering_result.mentions_user_ids, + mentioned_user_group_ids=list(rendering_result.mentions_user_group_ids), + mention_data=mention_data, + ) + + # For single user as well as user group mentions, we set the `mentioned` + # flag on `UserMessage` + for group_id in rendering_result.mentions_user_group_ids: + members = mention_data.get_group_members(group_id) + rendering_result.mentions_user_ids.update(members) + + # Only send data to Tornado about wildcard mentions if message + # rendering determined the message had an actual wildcard + # mention in it (and not e.g. wildcard mention syntax inside a + # code block). + if rendering_result.mentions_wildcard: + wildcard_mention_user_ids = info["wildcard_mention_user_ids"] + else: + wildcard_mention_user_ids = set() + + """ + Once we have the actual list of mentioned ids from message + rendering, we can patch in "default bots" (aka normal bots) + who were directly mentioned in this message as eligible to + get UserMessage rows. + """ + mentioned_user_ids = rendering_result.mentions_user_ids + default_bot_user_ids = info["default_bot_user_ids"] + mentioned_bot_user_ids = default_bot_user_ids & mentioned_user_ids + info["um_eligible_user_ids"] |= mentioned_bot_user_ids + + message_send_dict = SendMessageRequest( + stream=stream, + local_id=local_id, + sender_queue_id=sender_queue_id, + realm=realm, + mention_data=mention_data, + mentioned_user_groups_map=mentioned_user_groups_map, + message=message, + rendering_result=rendering_result, + active_user_ids=info["active_user_ids"], + online_push_user_ids=info["online_push_user_ids"], + pm_mention_email_disabled_user_ids=info["pm_mention_email_disabled_user_ids"], + pm_mention_push_disabled_user_ids=info["pm_mention_push_disabled_user_ids"], + stream_push_user_ids=info["stream_push_user_ids"], + stream_email_user_ids=info["stream_email_user_ids"], + muted_sender_user_ids=info["muted_sender_user_ids"], + um_eligible_user_ids=info["um_eligible_user_ids"], + long_term_idle_user_ids=info["long_term_idle_user_ids"], + default_bot_user_ids=info["default_bot_user_ids"], + service_bot_tuples=info["service_bot_tuples"], + all_bot_user_ids=info["all_bot_user_ids"], + wildcard_mention_user_ids=wildcard_mention_user_ids, + links_for_embed=links_for_embed, + widget_content=widget_content_dict, + limit_unread_user_ids=limit_unread_user_ids, + ) + + return message_send_dict + + +def create_user_messages( + message: Message, + rendering_result: MessageRenderingResult, + um_eligible_user_ids: AbstractSet[int], + long_term_idle_user_ids: AbstractSet[int], + stream_push_user_ids: AbstractSet[int], + stream_email_user_ids: AbstractSet[int], + mentioned_user_ids: AbstractSet[int], + mark_as_read_user_ids: Set[int], + limit_unread_user_ids: Optional[Set[int]], +) -> List[UserMessageLite]: + # These properties on the Message are set via + # render_markdown by code in the Markdown inline patterns + ids_with_alert_words = rendering_result.user_ids_with_alert_words + sender_id = message.sender.id + is_stream_message = message.is_stream_message() + + base_flags = 0 + if rendering_result.mentions_wildcard: + base_flags |= UserMessage.flags.wildcard_mentioned + if message.recipient.type in [Recipient.HUDDLE, Recipient.PERSONAL]: + base_flags |= UserMessage.flags.is_private + + # For long_term_idle (aka soft-deactivated) users, we are allowed + # to optimize by lazily not creating UserMessage rows that would + # have the default 0 flag set (since the soft-reactivation logic + # knows how to create those when the user comes back). We need to + # create the UserMessage rows for these long_term_idle users + # non-lazily in a few cases: + # + # * There are nonzero flags (e.g. the user was mentioned), since + # that case is rare and this saves a lot of complexity in + # soft-reactivation. + # + # * If the user is going to be notified (e.g. they get push/email + # notifications for every message on a stream), since in that + # case the notifications code will call `access_message` on the + # message to re-verify permissions, and for private streams, + # will get an error if the UserMessage row doesn't exist yet. + # + # See https://zulip.readthedocs.io/en/latest/subsystems/sending-messages.html#soft-deactivation + # for details on this system. + user_messages = [] + for user_profile_id in um_eligible_user_ids: + flags = base_flags + if ( + (user_profile_id == sender_id and message.sent_by_human()) + or user_profile_id in mark_as_read_user_ids + or (limit_unread_user_ids is not None and user_profile_id not in limit_unread_user_ids) + ): + flags |= UserMessage.flags.read + if user_profile_id in mentioned_user_ids: + flags |= UserMessage.flags.mentioned + if user_profile_id in ids_with_alert_words: + flags |= UserMessage.flags.has_alert_word + + if ( + user_profile_id in long_term_idle_user_ids + and user_profile_id not in stream_push_user_ids + and user_profile_id not in stream_email_user_ids + and is_stream_message + and int(flags) == 0 + ): + continue + + um = UserMessageLite( + user_profile_id=user_profile_id, + message_id=message.id, + flags=flags, + ) + user_messages.append(um) + + return user_messages + + +class ActivePresenceIdleUserData(TypedDict): + alerted: bool + notifications_data: UserMessageNotificationsData + + +def filter_presence_idle_user_ids(user_ids: Set[int]) -> List[int]: + # Given a set of user IDs (the recipients of a message), accesses + # the UserPresence table to determine which of these users are + # currently idle and should potentially get email notifications + # (and push notifications with with + # user_profile.enable_online_push_notifications=False). + # + # We exclude any presence data from ZulipMobile for the purpose of + # triggering these notifications; the mobile app can more + # effectively do its own client-side filtering of notification + # sounds/etc. for the case that the user is actively doing a PM + # conversation in the app. + + if not user_ids: + return [] + + # Matches presence.js constant + OFFLINE_THRESHOLD_SECS = 140 + + recent = timezone_now() - datetime.timedelta(seconds=OFFLINE_THRESHOLD_SECS) + rows = ( + UserPresence.objects.filter( + user_profile_id__in=user_ids, + status=UserPresence.ACTIVE, + timestamp__gte=recent, + ) + .exclude(client__name="ZulipMobile") + .distinct("user_profile_id") + .values("user_profile_id") + ) + active_user_ids = {row["user_profile_id"] for row in rows} + idle_user_ids = user_ids - active_user_ids + return sorted(idle_user_ids) + + +def get_active_presence_idle_user_ids( + realm: Realm, + sender_id: int, + active_users_data: List[ActivePresenceIdleUserData], +) -> List[int]: + """ + Given a list of active_user_ids, we build up a subset + of those users who fit these criteria: + + * They are likely to need notifications. + * They are no longer "present" according to the + UserPresence table. + """ + + if realm.presence_disabled: + return [] + + user_ids = set() + for user_data in active_users_data: + user_notifications_data: UserMessageNotificationsData = user_data["notifications_data"] + alerted = user_data["alerted"] + + # We only need to know the presence idle state for a user if this message would be notifiable + # for them if they were indeed idle. Only including those users in the calculation below is a + # very important optimization for open communities with many inactive users. + if user_notifications_data.is_notifiable(sender_id, idle=True) or alerted: + user_ids.add(user_notifications_data.user_id) + + return filter_presence_idle_user_ids(user_ids) + + +def do_send_messages( + send_message_requests_maybe_none: Sequence[Optional[SendMessageRequest]], + email_gateway: bool = False, + mark_as_read: Sequence[int] = [], +) -> List[int]: + """See + https://zulip.readthedocs.io/en/latest/subsystems/sending-messages.html + for high-level documentation on this subsystem. + """ + + # Filter out messages which didn't pass internal_prep_message properly + send_message_requests = [ + send_request + for send_request in send_message_requests_maybe_none + if send_request is not None + ] + + # Save the message receipts in the database + user_message_flags: Dict[int, Dict[int, List[str]]] = defaultdict(dict) + with transaction.atomic(): + Message.objects.bulk_create(send_request.message for send_request in send_message_requests) + + # Claim attachments in message + for send_request in send_message_requests: + if do_claim_attachments( + send_request.message, send_request.rendering_result.potential_attachment_path_ids + ): + send_request.message.has_attachment = True + send_request.message.save(update_fields=["has_attachment"]) + + ums: List[UserMessageLite] = [] + for send_request in send_message_requests: + # Service bots (outgoing webhook bots and embedded bots) don't store UserMessage rows; + # they will be processed later. + mentioned_user_ids = send_request.rendering_result.mentions_user_ids + + # Extend the set with users who have muted the sender. + mark_as_read_user_ids = send_request.muted_sender_user_ids + mark_as_read_user_ids.update(mark_as_read) + + user_messages = create_user_messages( + message=send_request.message, + rendering_result=send_request.rendering_result, + um_eligible_user_ids=send_request.um_eligible_user_ids, + long_term_idle_user_ids=send_request.long_term_idle_user_ids, + stream_push_user_ids=send_request.stream_push_user_ids, + stream_email_user_ids=send_request.stream_email_user_ids, + mentioned_user_ids=mentioned_user_ids, + mark_as_read_user_ids=mark_as_read_user_ids, + limit_unread_user_ids=send_request.limit_unread_user_ids, + ) + + for um in user_messages: + user_message_flags[send_request.message.id][um.user_profile_id] = um.flags_list() + + ums.extend(user_messages) + + send_request.message.service_queue_events = get_service_bot_events( + sender=send_request.message.sender, + service_bot_tuples=send_request.service_bot_tuples, + mentioned_user_ids=mentioned_user_ids, + active_user_ids=send_request.active_user_ids, + recipient_type=send_request.message.recipient.type, + ) + + bulk_insert_ums(ums) + + for send_request in send_message_requests: + do_widget_post_save_actions(send_request) + + # This next loop is responsible for notifying other parts of the + # Zulip system about the messages we just committed to the database: + # * Notifying clients via send_event + # * Triggering outgoing webhooks via the service event queue. + # * Updating the `first_message_id` field for streams without any message history. + # * Implementing the Welcome Bot reply hack + # * Adding links to the embed_links queue for open graph processing. + for send_request in send_message_requests: + realm_id: Optional[int] = None + if send_request.message.is_stream_message(): + if send_request.stream is None: + stream_id = send_request.message.recipient.type_id + send_request.stream = Stream.objects.select_related().get(id=stream_id) + # assert needed because stubs for django are missing + assert send_request.stream is not None + realm_id = send_request.stream.realm_id + + # Deliver events to the real-time push system, as well as + # enqueuing any additional processing triggered by the message. + wide_message_dict = MessageDict.wide_dict(send_request.message, realm_id) + + user_flags = user_message_flags.get(send_request.message.id, {}) + + """ + TODO: We may want to limit user_ids to only those users who have + UserMessage rows, if only for minor performance reasons. + + For now we queue events for all subscribers/sendees of the + message, since downstream code may still do notifications + that don't require UserMessage rows. + + Our automated tests have gotten better on this codepath, + but we may have coverage gaps, so we should be careful + about changing the next line. + """ + user_ids = send_request.active_user_ids | set(user_flags.keys()) + sender_id = send_request.message.sender_id + + # We make sure the sender is listed first in the `users` list; + # this results in the sender receiving the message first if + # there are thousands of recipients, decreasing perceived latency. + if sender_id in user_ids: + user_list = [sender_id] + list(user_ids - {sender_id}) + else: + user_list = list(user_ids) + + class UserData(TypedDict): + id: int + flags: List[str] + mentioned_user_group_id: Optional[int] + + users: List[UserData] = [] + for user_id in user_list: + flags = user_flags.get(user_id, []) + user_data: UserData = dict(id=user_id, flags=flags, mentioned_user_group_id=None) + + if user_id in send_request.mentioned_user_groups_map: + user_data["mentioned_user_group_id"] = send_request.mentioned_user_groups_map[ + user_id + ] + + users.append(user_data) + + sender = send_request.message.sender + message_type = wide_message_dict["type"] + active_users_data = [ + ActivePresenceIdleUserData( + alerted="has_alert_word" in user_flags.get(user_id, []), + notifications_data=UserMessageNotificationsData.from_user_id_sets( + user_id=user_id, + flags=user_flags.get(user_id, []), + private_message=(message_type == "private"), + online_push_user_ids=send_request.online_push_user_ids, + pm_mention_push_disabled_user_ids=send_request.pm_mention_push_disabled_user_ids, + pm_mention_email_disabled_user_ids=send_request.pm_mention_email_disabled_user_ids, + stream_push_user_ids=send_request.stream_push_user_ids, + stream_email_user_ids=send_request.stream_email_user_ids, + wildcard_mention_user_ids=send_request.wildcard_mention_user_ids, + muted_sender_user_ids=send_request.muted_sender_user_ids, + all_bot_user_ids=send_request.all_bot_user_ids, + ), + ) + for user_id in send_request.active_user_ids + ] + + presence_idle_user_ids = get_active_presence_idle_user_ids( + realm=sender.realm, + sender_id=sender.id, + active_users_data=active_users_data, + ) + + event = dict( + type="message", + message=send_request.message.id, + message_dict=wide_message_dict, + presence_idle_user_ids=presence_idle_user_ids, + online_push_user_ids=list(send_request.online_push_user_ids), + pm_mention_push_disabled_user_ids=list(send_request.pm_mention_push_disabled_user_ids), + pm_mention_email_disabled_user_ids=list( + send_request.pm_mention_email_disabled_user_ids + ), + stream_push_user_ids=list(send_request.stream_push_user_ids), + stream_email_user_ids=list(send_request.stream_email_user_ids), + wildcard_mention_user_ids=list(send_request.wildcard_mention_user_ids), + muted_sender_user_ids=list(send_request.muted_sender_user_ids), + all_bot_user_ids=list(send_request.all_bot_user_ids), + ) + + if send_request.message.is_stream_message(): + # Note: This is where authorization for single-stream + # get_updates happens! We only attach stream data to the + # notify new_message request if it's a public stream, + # ensuring that in the tornado server, non-public stream + # messages are only associated to their subscribed users. + + # assert needed because stubs for django are missing + assert send_request.stream is not None + if send_request.stream.is_public(): + event["realm_id"] = send_request.stream.realm_id + event["stream_name"] = send_request.stream.name + if send_request.stream.invite_only: + event["invite_only"] = True + if send_request.stream.first_message_id is None: + send_request.stream.first_message_id = send_request.message.id + send_request.stream.save(update_fields=["first_message_id"]) + if send_request.local_id is not None: + event["local_id"] = send_request.local_id + if send_request.sender_queue_id is not None: + event["sender_queue_id"] = send_request.sender_queue_id + send_event(send_request.realm, event, users) + + if send_request.links_for_embed: + event_data = { + "message_id": send_request.message.id, + "message_content": send_request.message.content, + "message_realm_id": send_request.realm.id, + "urls": list(send_request.links_for_embed), + } + queue_json_publish("embed_links", event_data) + + if send_request.message.recipient.type == Recipient.PERSONAL: + welcome_bot_id = get_system_bot( + settings.WELCOME_BOT, send_request.message.sender.realm_id + ).id + if ( + welcome_bot_id in send_request.active_user_ids + and welcome_bot_id != send_request.message.sender_id + ): + from zerver.lib.onboarding import send_welcome_bot_response + + send_welcome_bot_response(send_request) + + for queue_name, events in send_request.message.service_queue_events.items(): + for event in events: + queue_json_publish( + queue_name, + { + "message": wide_message_dict, + "trigger": event["trigger"], + "user_profile_id": event["user_profile_id"], + }, + ) + + return [send_request.message.id for send_request in send_message_requests] + + +def already_sent_mirrored_message_id(message: Message) -> Optional[int]: + if message.recipient.type == Recipient.HUDDLE: + # For huddle messages, we use a 10-second window because the + # timestamps aren't guaranteed to actually match between two + # copies of the same message. + time_window = datetime.timedelta(seconds=10) + else: + time_window = datetime.timedelta(seconds=0) + + query = Message.objects.filter( + sender=message.sender, + recipient=message.recipient, + content=message.content, + sending_client=message.sending_client, + date_sent__gte=message.date_sent - time_window, + date_sent__lte=message.date_sent + time_window, + ) + + messages = filter_by_exact_message_topic( + query=query, + message=message, + ) + + if messages.exists(): + return messages[0].id + return None + + +def extract_stream_indicator(s: str) -> Union[str, int]: + # Users can pass stream name as either an id or a name, + # and if they choose to pass a name, they may JSON encode + # it for legacy reasons. + + try: + data = orjson.loads(s) + except orjson.JSONDecodeError: + # If there was no JSON encoding, then we just + # have a raw stream name. + return s + + # We should stop supporting this odd use case + # once we improve our documentation. + if isinstance(data, list): + if len(data) != 1: # nocoverage + raise JsonableError(_("Expected exactly one stream")) + data = data[0] + + if isinstance(data, str): + # We had a JSON-encoded stream name. + return data + + if isinstance(data, int): + # We had a stream id. + return data + + raise JsonableError(_("Invalid data type for stream")) + + +def extract_private_recipients(s: str) -> Union[List[str], List[int]]: + # We try to accept multiple incoming formats for recipients. + # See test_extract_recipients() for examples of what we allow. + + try: + data = orjson.loads(s) + except orjson.JSONDecodeError: + data = s + + if isinstance(data, str): + data = data.split(",") + + if not isinstance(data, list): + raise JsonableError(_("Invalid data type for recipients")) + + if not data: + # We don't complain about empty message recipients here + return data + + if isinstance(data[0], str): + return get_validated_emails(data) + + if not isinstance(data[0], int): + raise JsonableError(_("Invalid data type for recipients")) + + return get_validated_user_ids(data) + + +def get_validated_user_ids(user_ids: Collection[int]) -> List[int]: + for user_id in user_ids: + if not isinstance(user_id, int): + raise JsonableError(_("Recipient lists may contain emails or user IDs, but not both.")) + + return list(set(user_ids)) + + +def get_validated_emails(emails: Collection[str]) -> List[str]: + for email in emails: + if not isinstance(email, str): + raise JsonableError(_("Recipient lists may contain emails or user IDs, but not both.")) + + return list(filter(bool, {email.strip() for email in emails})) + + +def check_send_stream_message( + sender: UserProfile, + client: Client, + stream_name: str, + topic: str, + body: str, + realm: Optional[Realm] = None, +) -> int: + addressee = Addressee.for_stream_name(stream_name, topic) + message = check_message(sender, client, addressee, body, realm) + + return do_send_messages([message])[0] + + +def check_send_stream_message_by_id( + sender: UserProfile, + client: Client, + stream_id: int, + topic: str, + body: str, + realm: Optional[Realm] = None, +) -> int: + addressee = Addressee.for_stream_id(stream_id, topic) + message = check_message(sender, client, addressee, body, realm) + + return do_send_messages([message])[0] + + +def check_send_private_message( + sender: UserProfile, client: Client, receiving_user: UserProfile, body: str +) -> int: + addressee = Addressee.for_user_profile(receiving_user) + message = check_message(sender, client, addressee, body) + + return do_send_messages([message])[0] + + +# check_send_message: +# Returns the id of the sent message. Has same argspec as check_message. +def check_send_message( + sender: UserProfile, + client: Client, + message_type_name: str, + message_to: Union[Sequence[int], Sequence[str]], + topic_name: Optional[str], + message_content: str, + realm: Optional[Realm] = None, + forged: bool = False, + forged_timestamp: Optional[float] = None, + forwarder_user_profile: Optional[UserProfile] = None, + local_id: Optional[str] = None, + sender_queue_id: Optional[str] = None, + widget_content: Optional[str] = None, + *, + skip_stream_access_check: bool = False, +) -> int: + + addressee = Addressee.legacy_build(sender, message_type_name, message_to, topic_name) + try: + message = check_message( + sender, + client, + addressee, + message_content, + realm, + forged, + forged_timestamp, + forwarder_user_profile, + local_id, + sender_queue_id, + widget_content, + skip_stream_access_check=skip_stream_access_check, + ) + except ZephyrMessageAlreadySentException as e: + return e.message_id + return do_send_messages([message])[0] + + +def check_schedule_message( + sender: UserProfile, + client: Client, + message_type_name: str, + message_to: Union[Sequence[str], Sequence[int]], + topic_name: Optional[str], + message_content: str, + delivery_type: str, + deliver_at: datetime.datetime, + realm: Optional[Realm] = None, + forwarder_user_profile: Optional[UserProfile] = None, +) -> int: + addressee = Addressee.legacy_build(sender, message_type_name, message_to, topic_name) + + send_request = check_message( + sender, + client, + addressee, + message_content, + realm=realm, + forwarder_user_profile=forwarder_user_profile, + ) + send_request.deliver_at = deliver_at + send_request.delivery_type = delivery_type + + recipient = send_request.message.recipient + if delivery_type == "remind" and ( + recipient.type != Recipient.STREAM and recipient.type_id != sender.id + ): + raise JsonableError(_("Reminders can only be set for streams.")) + + return do_schedule_messages([send_request])[0] + + +def send_rate_limited_pm_notification_to_bot_owner( + sender: UserProfile, realm: Realm, content: str +) -> None: + """ + Sends a PM error notification to a bot's owner if one hasn't already + been sent in the last 5 minutes. + """ + if sender.realm.is_zephyr_mirror_realm or sender.realm.deactivated: + return + + if not sender.is_bot or sender.bot_owner is None: + return + + # Don't send these notifications for cross-realm bot messages + # (e.g. from EMAIL_GATEWAY_BOT) since the owner for + # EMAIL_GATEWAY_BOT is probably the server administrator, not + # the owner of the bot who could potentially fix the problem. + if sender.realm != realm: + return + + # We warn the user once every 5 minutes to avoid a flood of + # PMs on a misconfigured integration, re-using the + # UserProfile.last_reminder field, which is not used for bots. + last_reminder = sender.last_reminder + waitperiod = datetime.timedelta(minutes=UserProfile.BOT_OWNER_STREAM_ALERT_WAITPERIOD) + if last_reminder and timezone_now() - last_reminder <= waitperiod: + return + + internal_send_private_message( + get_system_bot(settings.NOTIFICATION_BOT, sender.bot_owner.realm_id), + sender.bot_owner, + content, + ) + + sender.last_reminder = timezone_now() + sender.save(update_fields=["last_reminder"]) + + +def send_pm_if_empty_stream( + stream: Optional[Stream], + realm: Realm, + sender: UserProfile, + stream_name: Optional[str] = None, + stream_id: Optional[int] = None, +) -> None: + """If a bot sends a message to a stream that doesn't exist or has no + subscribers, sends a notification to the bot owner (if not a + cross-realm bot) so that the owner can correct the issue.""" + if not sender.is_bot or sender.bot_owner is None: + return + + arg_dict = { + "bot_identity": f"`{sender.delivery_email}`", + "stream_id": stream_id, + "stream_name": f"#**{stream_name}**", + "new_stream_link": "#streams/new", + } + if sender.bot_owner is not None: + with override_language(sender.bot_owner.default_language): + if stream is None: + if stream_id is not None: + content = _( + "Your bot {bot_identity} tried to send a message to stream ID " + "{stream_id}, but there is no stream with that ID." + ).format(**arg_dict) + else: + assert stream_name is not None + content = _( + "Your bot {bot_identity} tried to send a message to stream " + "{stream_name}, but that stream does not exist. " + "Click [here]({new_stream_link}) to create it." + ).format(**arg_dict) + else: + if num_subscribers_for_stream_id(stream.id) > 0: + return + content = _( + "Your bot {bot_identity} tried to send a message to " + "stream {stream_name}. The stream exists but " + "does not have any subscribers." + ).format(**arg_dict) + + send_rate_limited_pm_notification_to_bot_owner(sender, realm, content) + + +def validate_stream_name_with_pm_notification( + stream_name: str, realm: Realm, sender: UserProfile +) -> Stream: + stream_name = stream_name.strip() + check_stream_name(stream_name) + + try: + stream = get_stream(stream_name, realm) + send_pm_if_empty_stream(stream, realm, sender) + except Stream.DoesNotExist: + send_pm_if_empty_stream(None, realm, sender, stream_name=stream_name) + raise StreamDoesNotExistError(escape(stream_name)) + + return stream + + +def validate_stream_id_with_pm_notification( + stream_id: int, realm: Realm, sender: UserProfile +) -> Stream: + try: + stream = get_stream_by_id_in_realm(stream_id, realm) + send_pm_if_empty_stream(stream, realm, sender) + except Stream.DoesNotExist: + send_pm_if_empty_stream(None, realm, sender, stream_id=stream_id) + raise StreamWithIDDoesNotExistError(stream_id) + + return stream + + +def check_private_message_policy( + realm: Realm, sender: UserProfile, user_profiles: Sequence[UserProfile] +) -> None: + if realm.private_message_policy == Realm.PRIVATE_MESSAGE_POLICY_DISABLED: + if sender.is_bot or (len(user_profiles) == 1 and user_profiles[0].is_bot): + # We allow PMs only between users and bots, to avoid + # breaking the tutorial as well as automated + # notifications from system bots to users. + return + + raise JsonableError(_("Private messages are disabled in this organization.")) + + +# check_message: +# Returns message ready for sending with do_send_message on success or the error message (string) on error. +def check_message( + sender: UserProfile, + client: Client, + addressee: Addressee, + message_content_raw: str, + realm: Optional[Realm] = None, + forged: bool = False, + forged_timestamp: Optional[float] = None, + forwarder_user_profile: Optional[UserProfile] = None, + local_id: Optional[str] = None, + sender_queue_id: Optional[str] = None, + widget_content: Optional[str] = None, + email_gateway: bool = False, + *, + skip_stream_access_check: bool = False, + mention_backend: Optional[MentionBackend] = None, + limit_unread_user_ids: Optional[Set[int]] = None, +) -> SendMessageRequest: + """See + https://zulip.readthedocs.io/en/latest/subsystems/sending-messages.html + for high-level documentation on this subsystem. + """ + stream = None + + message_content = normalize_body(message_content_raw) + + if realm is None: + realm = sender.realm + + if addressee.is_stream(): + topic_name = addressee.topic() + topic_name = truncate_topic(topic_name) + + stream_name = addressee.stream_name() + stream_id = addressee.stream_id() + + if stream_name is not None: + stream = validate_stream_name_with_pm_notification(stream_name, realm, sender) + elif stream_id is not None: + stream = validate_stream_id_with_pm_notification(stream_id, realm, sender) + else: + stream = addressee.stream() + assert stream is not None + + # To save a database round trip, we construct the Recipient + # object for the Stream rather than fetching it from the + # database using the stream.recipient foreign key. + # + # This is simpler than ensuring that code paths that fetch a + # Stream that will be used for sending a message have a + # `select_related("recipient"), which would also needlessly + # expand Stream objects in memory (all the fields of Recipient + # are already known given the Stream object). + recipient = Recipient( + id=stream.recipient_id, + type_id=stream.id, + type=Recipient.STREAM, + ) + + if not skip_stream_access_check: + access_stream_for_send_message( + sender=sender, stream=stream, forwarder_user_profile=forwarder_user_profile + ) + else: + # Defensive assertion - the only currently supported use case + # for this option is for outgoing webhook bots and since this + # is security-sensitive code, it's beneficial to ensure nothing + # else can sneak past the access check. + assert sender.bot_type == sender.OUTGOING_WEBHOOK_BOT + + if realm.mandatory_topics and topic_name == "(no topic)": + raise JsonableError(_("Topics are required in this organization")) + + elif addressee.is_private(): + user_profiles = addressee.user_profiles() + mirror_message = client and client.name in [ + "zephyr_mirror", + "irc_mirror", + "jabber_mirror", + "JabberMirror", + ] + + check_private_message_policy(realm, sender, user_profiles) + + # API super-users who set the `forged` flag are allowed to + # forge messages sent by any user, so we disable the + # `forwarded_mirror_message` security check in that case. + forwarded_mirror_message = mirror_message and not forged + try: + recipient = recipient_for_user_profiles( + user_profiles, forwarded_mirror_message, forwarder_user_profile, sender + ) + except ValidationError as e: + assert isinstance(e.messages[0], str) + raise JsonableError(e.messages[0]) + else: + # This is defensive code--Addressee already validates + # the message type. + raise AssertionError("Invalid message type") + + message = Message() + message.sender = sender + message.content = message_content + message.recipient = recipient + if addressee.is_stream(): + message.set_topic_name(topic_name) + if forged and forged_timestamp is not None: + # Forged messages come with a timestamp + message.date_sent = timestamp_to_datetime(forged_timestamp) + else: + message.date_sent = timezone_now() + message.sending_client = client + + # We render messages later in the process. + assert message.rendered_content is None + + if client.name == "zephyr_mirror": + id = already_sent_mirrored_message_id(message) + if id is not None: + raise ZephyrMessageAlreadySentException(id) + + widget_content_dict = None + if widget_content is not None: + try: + widget_content_dict = orjson.loads(widget_content) + except orjson.JSONDecodeError: + raise JsonableError(_("Widgets: API programmer sent invalid JSON content")) + + try: + check_widget_content(widget_content_dict) + except ValidationError as error: + raise JsonableError( + _("Widgets: {error_msg}").format( + error_msg=error.message, + ) + ) + + message_send_dict = build_message_send_dict( + message=message, + stream=stream, + local_id=local_id, + sender_queue_id=sender_queue_id, + realm=realm, + widget_content_dict=widget_content_dict, + email_gateway=email_gateway, + mention_backend=mention_backend, + limit_unread_user_ids=limit_unread_user_ids, + ) + + if stream is not None and message_send_dict.rendering_result.mentions_wildcard: + if not wildcard_mention_allowed(sender, stream): + raise JsonableError( + _("You do not have permission to use wildcard mentions in this stream.") + ) + return message_send_dict + + +def _internal_prep_message( + realm: Realm, + sender: UserProfile, + addressee: Addressee, + content: str, + email_gateway: bool = False, + mention_backend: Optional[MentionBackend] = None, + limit_unread_user_ids: Optional[Set[int]] = None, +) -> Optional[SendMessageRequest]: + """ + Create a message object and checks it, but doesn't send it or save it to the database. + The internal function that calls this can therefore batch send a bunch of created + messages together as one database query. + Call do_send_messages with a list of the return values of this method. + """ + # Remove any null bytes from the content + if len(content) > settings.MAX_MESSAGE_LENGTH: + content = content[0:3900] + "\n\n[message was too long and has been truncated]" + + # If we have a stream name, and the stream doesn't exist, we + # create it here (though this code path should probably be removed + # eventually, moving that responsibility to the caller). If + # addressee.stream_name() is None (i.e. we're sending to a stream + # by ID), we skip this, as the stream object must already exist. + if addressee.is_stream(): + stream_name = addressee.stream_name() + if stream_name is not None: + ensure_stream(realm, stream_name, acting_user=sender) + + try: + return check_message( + sender, + get_client("Internal"), + addressee, + content, + realm=realm, + email_gateway=email_gateway, + mention_backend=mention_backend, + limit_unread_user_ids=limit_unread_user_ids, + ) + except JsonableError as e: + logging.exception( + "Error queueing internal message by %s: %s", + sender.delivery_email, + e.msg, + stack_info=True, + ) + + return None + + +def internal_prep_stream_message( + sender: UserProfile, + stream: Stream, + topic: str, + content: str, + email_gateway: bool = False, + limit_unread_user_ids: Optional[Set[int]] = None, +) -> Optional[SendMessageRequest]: + """ + See _internal_prep_message for details of how this works. + """ + realm = stream.realm + addressee = Addressee.for_stream(stream, topic) + + return _internal_prep_message( + realm=realm, + sender=sender, + addressee=addressee, + content=content, + email_gateway=email_gateway, + limit_unread_user_ids=limit_unread_user_ids, + ) + + +def internal_prep_stream_message_by_name( + realm: Realm, + sender: UserProfile, + stream_name: str, + topic: str, + content: str, +) -> Optional[SendMessageRequest]: + """ + See _internal_prep_message for details of how this works. + """ + addressee = Addressee.for_stream_name(stream_name, topic) + + return _internal_prep_message( + realm=realm, + sender=sender, + addressee=addressee, + content=content, + ) + + +def internal_prep_private_message( + realm: Realm, + sender: UserProfile, + recipient_user: UserProfile, + content: str, + mention_backend: Optional[MentionBackend] = None, +) -> Optional[SendMessageRequest]: + """ + See _internal_prep_message for details of how this works. + """ + addressee = Addressee.for_user_profile(recipient_user) + + return _internal_prep_message( + realm=realm, + sender=sender, + addressee=addressee, + content=content, + mention_backend=mention_backend, + ) + + +def internal_send_private_message( + sender: UserProfile, recipient_user: UserProfile, content: str +) -> Optional[int]: + realm = recipient_user.realm + message = internal_prep_private_message(realm, sender, recipient_user, content) + if message is None: + return None + message_ids = do_send_messages([message]) + return message_ids[0] + + +def internal_send_stream_message( + sender: UserProfile, + stream: Stream, + topic: str, + content: str, + email_gateway: bool = False, + limit_unread_user_ids: Optional[Set[int]] = None, +) -> Optional[int]: + + message = internal_prep_stream_message( + sender, stream, topic, content, email_gateway, limit_unread_user_ids=limit_unread_user_ids + ) + + if message is None: + return None + message_ids = do_send_messages([message]) + return message_ids[0] + + +def internal_send_stream_message_by_name( + realm: Realm, + sender: UserProfile, + stream_name: str, + topic: str, + content: str, +) -> Optional[int]: + message = internal_prep_stream_message_by_name( + realm, + sender, + stream_name, + topic, + content, + ) + + if message is None: + return None + message_ids = do_send_messages([message]) + return message_ids[0] + + +def internal_send_huddle_message( + realm: Realm, sender: UserProfile, emails: List[str], content: str +) -> Optional[int]: + addressee = Addressee.for_private(emails, realm) + message = _internal_prep_message( + realm=realm, + sender=sender, + addressee=addressee, + content=content, + ) + if message is None: + return None + message_ids = do_send_messages([message]) + return message_ids[0] diff --git a/zerver/lib/actions.py b/zerver/lib/actions.py index 5356a45184..8fe6320740 100644 --- a/zerver/lib/actions.py +++ b/zerver/lib/actions.py @@ -3,28 +3,13 @@ import hashlib import logging from collections import defaultdict from dataclasses import asdict, dataclass, field -from typing import ( - AbstractSet, - Any, - Callable, - Collection, - Dict, - Iterable, - List, - Mapping, - Optional, - Sequence, - Set, - Tuple, - Union, -) +from typing import Any, Collection, Dict, Iterable, List, Mapping, Optional, Sequence, Set, Tuple import orjson from django.conf import settings from django.core.exceptions import ValidationError -from django.db import IntegrityError, transaction +from django.db import transaction from django.db.models import F -from django.utils.html import escape from django.utils.timezone import now as timezone_now from django.utils.translation import gettext as _ from django.utils.translation import gettext_lazy @@ -41,7 +26,14 @@ from zerver.actions.default_streams import ( get_default_streams_for_realm, ) from zerver.actions.invites import notify_invites_changed -from zerver.actions.uploads import check_attachment_reference_change, do_claim_attachments +from zerver.actions.message_send import ( + filter_presence_idle_user_ids, + get_recipient_info, + internal_send_private_message, + internal_send_stream_message, + render_incoming_message, +) +from zerver.actions.uploads import check_attachment_reference_change from zerver.actions.user_groups import ( do_send_user_group_members_update_event, update_users_in_full_members_system_group, @@ -50,54 +42,39 @@ from zerver.actions.user_settings import do_delete_avatar_image, send_user_email from zerver.actions.user_topics import do_mute_topic, do_unmute_topic from zerver.actions.users import change_user_is_active, get_service_dicts_for_bot from zerver.lib import retention as retention -from zerver.lib.addressee import Addressee -from zerver.lib.alert_words import get_alert_word_automaton from zerver.lib.avatar import avatar_url from zerver.lib.bulk_create import create_users from zerver.lib.cache import ( cache_delete, cache_delete_many, cache_set, - cache_with_key, display_recipient_cache_key, flush_user_profile, get_stream_cache_key, to_dict_cache_key_id, - user_profile_delivery_email_cache_key, ) from zerver.lib.create_user import create_user, get_display_email_address from zerver.lib.email_mirror_helpers import encode_email_address from zerver.lib.email_notifications import enqueue_welcome_emails from zerver.lib.email_validation import email_reserved_for_system_bots_error from zerver.lib.emoji import check_emoji_request, emoji_name_to_emoji_code -from zerver.lib.exceptions import ( - JsonableError, - MarkdownRenderingException, - StreamDoesNotExistError, - StreamWithIDDoesNotExistError, - ZephyrMessageAlreadySentException, -) +from zerver.lib.exceptions import JsonableError from zerver.lib.markdown import MessageRenderingResult, topic_links from zerver.lib.markdown import version as markdown_version from zerver.lib.mention import MentionBackend, MentionData, silent_mention_syntax_for_user from zerver.lib.message import ( - MessageDict, - SendMessageRequest, access_message, bulk_access_messages, format_unread_message_details, get_last_message_id, get_raw_unread_data, normalize_body, - render_markdown, truncate_topic, update_first_visible_message_id, update_to_dict_cache, wildcard_mention_allowed, ) -from zerver.lib.notification_data import UserMessageNotificationsData, get_user_group_mentions_data from zerver.lib.queue import queue_json_publish -from zerver.lib.recipient_users import recipient_for_user_profiles from zerver.lib.retention import move_messages_to_archive from zerver.lib.send_email import ( FromAddress, @@ -113,16 +90,13 @@ from zerver.lib.stream_subscription import ( bulk_get_subscriber_peer_info, get_active_subscriptions_for_stream_id, get_bulk_stream_subscriber_info, - get_subscriptions_for_send_message, get_used_colors_for_user_ids, - num_subscribers_for_stream_id, subscriber_ids_with_stream_history_access, ) from zerver.lib.stream_topic import StreamTopicTarget from zerver.lib.stream_traffic import get_average_weekly_stream_traffic, get_streams_traffic from zerver.lib.streams import ( access_stream_by_id, - access_stream_for_send_message, can_access_stream_user_ids, check_stream_access_based_on_stream_post_policy, ensure_stream, @@ -133,15 +107,14 @@ from zerver.lib.streams import ( render_stream_description, send_stream_creation_event, ) -from zerver.lib.string_validation import check_stream_name, check_stream_topic +from zerver.lib.string_validation import check_stream_topic from zerver.lib.subscription_info import get_subscribers_query -from zerver.lib.timestamp import datetime_to_timestamp, timestamp_to_datetime +from zerver.lib.timestamp import datetime_to_timestamp from zerver.lib.topic import ( ORIG_TOPIC, RESOLVED_TOPIC_PREFIX, TOPIC_LINKS, TOPIC_NAME, - filter_by_exact_message_topic, filter_by_topic_name_via_message, messages_for_topic, save_message_for_edit_use_case, @@ -155,16 +128,14 @@ from zerver.lib.user_groups import ( get_system_user_group_for_user, ) from zerver.lib.user_message import UserMessageLite, bulk_insert_ums -from zerver.lib.user_mutes import add_user_mute, get_muting_users, get_user_mutes +from zerver.lib.user_mutes import add_user_mute, get_user_mutes from zerver.lib.user_topics import get_users_muting_topic, remove_topic_mute from zerver.lib.users import format_user_row, get_api_key, user_profile_to_user_row from zerver.lib.utils import log_statsd_event -from zerver.lib.validator import check_widget_content -from zerver.lib.widget import do_widget_post_save_actions, is_widget_message +from zerver.lib.widget import is_widget_message from zerver.models import ( ArchivedAttachment, Attachment, - Client, DefaultStream, DefaultStreamGroup, Message, @@ -177,27 +148,20 @@ from zerver.models import ( RealmUserDefault, Recipient, ScheduledEmail, - ScheduledMessage, Stream, Subscription, UserGroup, UserGroupMembership, UserMessage, - UserPresence, UserProfile, active_non_guest_user_ids, active_user_ids, bot_owner_user_ids, - get_client, - get_huddle_user_ids, get_realm, get_realm_domains, - get_stream, get_stream_by_id_in_realm, get_system_bot, - get_user_by_delivery_email, is_cross_realm_bot_email, - query_for_ids, ) from zerver.tornado.django_api import send_event @@ -1183,827 +1147,6 @@ def do_deactivate_stream( ) -def compute_irc_user_fullname(email: str) -> str: - return email.split("@")[0] + " (IRC)" - - -def compute_jabber_user_fullname(email: str) -> str: - return email.split("@")[0] + " (XMPP)" - - -def get_user_profile_delivery_email_cache_key( - realm: Realm, email: str, email_to_fullname: Callable[[str], str] -) -> str: - return user_profile_delivery_email_cache_key(email, realm) - - -@cache_with_key( - get_user_profile_delivery_email_cache_key, - timeout=3600 * 24 * 7, -) -def create_mirror_user_if_needed( - realm: Realm, email: str, email_to_fullname: Callable[[str], str] -) -> UserProfile: - try: - return get_user_by_delivery_email(email, realm) - except UserProfile.DoesNotExist: - try: - # Forge a user for this person - return create_user( - email=email, - password=None, - realm=realm, - full_name=email_to_fullname(email), - active=False, - is_mirror_dummy=True, - ) - except IntegrityError: - return get_user_by_delivery_email(email, realm) - - -def render_incoming_message( - message: Message, - content: str, - user_ids: Set[int], - realm: Realm, - mention_data: Optional[MentionData] = None, - email_gateway: bool = False, -) -> MessageRenderingResult: - realm_alert_words_automaton = get_alert_word_automaton(realm) - try: - rendering_result = render_markdown( - message=message, - content=content, - realm=realm, - realm_alert_words_automaton=realm_alert_words_automaton, - mention_data=mention_data, - email_gateway=email_gateway, - ) - except MarkdownRenderingException: - raise JsonableError(_("Unable to render message")) - return rendering_result - - -class RecipientInfoResult(TypedDict): - active_user_ids: Set[int] - online_push_user_ids: Set[int] - pm_mention_email_disabled_user_ids: Set[int] - pm_mention_push_disabled_user_ids: Set[int] - stream_email_user_ids: Set[int] - stream_push_user_ids: Set[int] - wildcard_mention_user_ids: Set[int] - muted_sender_user_ids: Set[int] - um_eligible_user_ids: Set[int] - long_term_idle_user_ids: Set[int] - default_bot_user_ids: Set[int] - service_bot_tuples: List[Tuple[int, int]] - all_bot_user_ids: Set[int] - - -def get_recipient_info( - *, - realm_id: int, - recipient: Recipient, - sender_id: int, - stream_topic: Optional[StreamTopicTarget], - possibly_mentioned_user_ids: AbstractSet[int] = set(), - possible_wildcard_mention: bool = True, -) -> RecipientInfoResult: - stream_push_user_ids: Set[int] = set() - stream_email_user_ids: Set[int] = set() - wildcard_mention_user_ids: Set[int] = set() - muted_sender_user_ids: Set[int] = get_muting_users(sender_id) - - if recipient.type == Recipient.PERSONAL: - # The sender and recipient may be the same id, so - # de-duplicate using a set. - message_to_user_ids = list({recipient.type_id, sender_id}) - assert len(message_to_user_ids) in [1, 2] - - elif recipient.type == Recipient.STREAM: - # Anybody calling us w/r/t a stream message needs to supply - # stream_topic. We may eventually want to have different versions - # of this function for different message types. - assert stream_topic is not None - user_ids_muting_topic = stream_topic.user_ids_muting_topic() - - subscription_rows = ( - get_subscriptions_for_send_message( - realm_id=realm_id, - stream_id=stream_topic.stream_id, - possible_wildcard_mention=possible_wildcard_mention, - possibly_mentioned_user_ids=possibly_mentioned_user_ids, - ) - .annotate( - user_profile_email_notifications=F( - "user_profile__enable_stream_email_notifications" - ), - user_profile_push_notifications=F("user_profile__enable_stream_push_notifications"), - user_profile_wildcard_mentions_notify=F("user_profile__wildcard_mentions_notify"), - ) - .values( - "user_profile_id", - "push_notifications", - "email_notifications", - "wildcard_mentions_notify", - "user_profile_email_notifications", - "user_profile_push_notifications", - "user_profile_wildcard_mentions_notify", - "is_muted", - ) - .order_by("user_profile_id") - ) - - message_to_user_ids = [row["user_profile_id"] for row in subscription_rows] - - def should_send(setting: str, row: Dict[str, Any]) -> bool: - # This implements the structure that the UserProfile stream notification settings - # are defaults, which can be overridden by the stream-level settings (if those - # values are not null). - if row["is_muted"]: - return False - if row["user_profile_id"] in user_ids_muting_topic: - return False - if row[setting] is not None: - return row[setting] - return row["user_profile_" + setting] - - stream_push_user_ids = { - row["user_profile_id"] - for row in subscription_rows - # Note: muting a stream overrides stream_push_notify - if should_send("push_notifications", row) - } - - stream_email_user_ids = { - row["user_profile_id"] - for row in subscription_rows - # Note: muting a stream overrides stream_email_notify - if should_send("email_notifications", row) - } - - if possible_wildcard_mention: - # If there's a possible wildcard mention, we need to - # determine the set of users who have enabled the - # "wildcard_mentions_notify" setting (that is, the set of - # users for whom wildcard mentions should be treated like - # personal mentions for notifications). This setting - # applies to both email and push notifications. - wildcard_mention_user_ids = { - row["user_profile_id"] - for row in subscription_rows - if should_send("wildcard_mentions_notify", row) - } - - elif recipient.type == Recipient.HUDDLE: - message_to_user_ids = get_huddle_user_ids(recipient) - - else: - raise ValueError("Bad recipient type") - - message_to_user_id_set = set(message_to_user_ids) - - user_ids = set(message_to_user_id_set) - # Important note: Because we haven't rendered Markdown yet, we - # don't yet know which of these possibly-mentioned users was - # actually mentioned in the message (in other words, the - # mention syntax might have been in a code block or otherwise - # escaped). `get_ids_for` will filter these extra user rows - # for our data structures not related to bots - user_ids |= possibly_mentioned_user_ids - - if user_ids: - query = UserProfile.objects.filter(is_active=True).values( - "id", - "enable_online_push_notifications", - "enable_offline_email_notifications", - "enable_offline_push_notifications", - "is_bot", - "bot_type", - "long_term_idle", - ) - - # query_for_ids is fast highly optimized for large queries, and we - # need this codepath to be fast (it's part of sending messages) - query = query_for_ids( - query=query, - user_ids=sorted(user_ids), - field="id", - ) - rows = list(query) - else: - # TODO: We should always have at least one user_id as a recipient - # of any message we send. Right now the exception to this - # rule is `notify_new_user`, which, at least in a possibly - # contrived test scenario, can attempt to send messages - # to an inactive bot. When we plug that hole, we can avoid - # this `else` clause and just `assert(user_ids)`. - # - # UPDATE: It's February 2020 (and a couple years after the above - # comment was written). We have simplified notify_new_user - # so that it should be a little easier to reason about. - # There is currently some cleanup to how we handle cross - # realm bots that is still under development. Once that - # effort is complete, we should be able to address this - # to-do. - rows = [] - - def get_ids_for(f: Callable[[Dict[str, Any]], bool]) -> Set[int]: - """Only includes users on the explicit message to line""" - return {row["id"] for row in rows if f(row)} & message_to_user_id_set - - def is_service_bot(row: Dict[str, Any]) -> bool: - return row["is_bot"] and (row["bot_type"] in UserProfile.SERVICE_BOT_TYPES) - - active_user_ids = get_ids_for(lambda r: True) - online_push_user_ids = get_ids_for( - lambda r: r["enable_online_push_notifications"], - ) - - # We deal with only the users who have disabled this setting, since that - # will usually be much smaller a set than those who have enabled it (which - # is the default) - pm_mention_email_disabled_user_ids = get_ids_for( - lambda r: not r["enable_offline_email_notifications"] - ) - pm_mention_push_disabled_user_ids = get_ids_for( - lambda r: not r["enable_offline_push_notifications"] - ) - - # Service bots don't get UserMessage rows. - um_eligible_user_ids = get_ids_for( - lambda r: not is_service_bot(r), - ) - - long_term_idle_user_ids = get_ids_for( - lambda r: r["long_term_idle"], - ) - - # These three bot data structures need to filter from the full set - # of users who either are receiving the message or might have been - # mentioned in it, and so can't use get_ids_for. - # - # Further in the do_send_messages code path, once - # `mentioned_user_ids` has been computed via Markdown, we'll filter - # these data structures for just those users who are either a - # direct recipient or were mentioned; for now, we're just making - # sure we have the data we need for that without extra database - # queries. - default_bot_user_ids = { - row["id"] for row in rows if row["is_bot"] and row["bot_type"] == UserProfile.DEFAULT_BOT - } - - service_bot_tuples = [(row["id"], row["bot_type"]) for row in rows if is_service_bot(row)] - - # We also need the user IDs of all bots, to avoid trying to send push/email - # notifications to them. This set will be directly sent to the event queue code - # where we determine notifiability of the message for users. - all_bot_user_ids = {row["id"] for row in rows if row["is_bot"]} - - info: RecipientInfoResult = dict( - active_user_ids=active_user_ids, - online_push_user_ids=online_push_user_ids, - pm_mention_email_disabled_user_ids=pm_mention_email_disabled_user_ids, - pm_mention_push_disabled_user_ids=pm_mention_push_disabled_user_ids, - stream_push_user_ids=stream_push_user_ids, - stream_email_user_ids=stream_email_user_ids, - wildcard_mention_user_ids=wildcard_mention_user_ids, - muted_sender_user_ids=muted_sender_user_ids, - um_eligible_user_ids=um_eligible_user_ids, - long_term_idle_user_ids=long_term_idle_user_ids, - default_bot_user_ids=default_bot_user_ids, - service_bot_tuples=service_bot_tuples, - all_bot_user_ids=all_bot_user_ids, - ) - return info - - -def get_service_bot_events( - sender: UserProfile, - service_bot_tuples: List[Tuple[int, int]], - mentioned_user_ids: Set[int], - active_user_ids: Set[int], - recipient_type: int, -) -> Dict[str, List[Dict[str, Any]]]: - - event_dict: Dict[str, List[Dict[str, Any]]] = defaultdict(list) - - # Avoid infinite loops by preventing messages sent by bots from generating - # Service events. - if sender.is_bot: - return event_dict - - def maybe_add_event(user_profile_id: int, bot_type: int) -> None: - if bot_type == UserProfile.OUTGOING_WEBHOOK_BOT: - queue_name = "outgoing_webhooks" - elif bot_type == UserProfile.EMBEDDED_BOT: - queue_name = "embedded_bots" - else: - logging.error( - "Unexpected bot_type for Service bot id=%s: %s", - user_profile_id, - bot_type, - ) - return - - is_stream = recipient_type == Recipient.STREAM - - # Important note: service_bot_tuples may contain service bots - # who were not actually mentioned in the message (e.g. if - # mention syntax for that bot appeared in a code block). - # Thus, it is important to filter any users who aren't part of - # either mentioned_user_ids (the actual mentioned users) or - # active_user_ids (the actual recipients). - # - # So even though this is implied by the logic below, we filter - # these not-actually-mentioned users here, to help keep this - # function future-proof. - if user_profile_id not in mentioned_user_ids and user_profile_id not in active_user_ids: - return - - # Mention triggers, for stream messages - if is_stream and user_profile_id in mentioned_user_ids: - trigger = "mention" - # PM triggers for personal and huddle messages - elif (not is_stream) and (user_profile_id in active_user_ids): - trigger = "private_message" - else: - return - - event_dict[queue_name].append( - { - "trigger": trigger, - "user_profile_id": user_profile_id, - } - ) - - for user_profile_id, bot_type in service_bot_tuples: - maybe_add_event( - user_profile_id=user_profile_id, - bot_type=bot_type, - ) - - return event_dict - - -def do_schedule_messages(send_message_requests: Sequence[SendMessageRequest]) -> List[int]: - scheduled_messages: List[ScheduledMessage] = [] - - for send_request in send_message_requests: - scheduled_message = ScheduledMessage() - scheduled_message.sender = send_request.message.sender - scheduled_message.recipient = send_request.message.recipient - topic_name = send_request.message.topic_name() - scheduled_message.set_topic_name(topic_name=topic_name) - scheduled_message.content = send_request.message.content - scheduled_message.sending_client = send_request.message.sending_client - scheduled_message.stream = send_request.stream - scheduled_message.realm = send_request.realm - assert send_request.deliver_at is not None - scheduled_message.scheduled_timestamp = send_request.deliver_at - if send_request.delivery_type == "send_later": - scheduled_message.delivery_type = ScheduledMessage.SEND_LATER - elif send_request.delivery_type == "remind": - scheduled_message.delivery_type = ScheduledMessage.REMIND - - scheduled_messages.append(scheduled_message) - - ScheduledMessage.objects.bulk_create(scheduled_messages) - return [scheduled_message.id for scheduled_message in scheduled_messages] - - -def build_message_send_dict( - message: Message, - stream: Optional[Stream] = None, - local_id: Optional[str] = None, - sender_queue_id: Optional[str] = None, - realm: Optional[Realm] = None, - widget_content_dict: Optional[Dict[str, Any]] = None, - email_gateway: bool = False, - mention_backend: Optional[MentionBackend] = None, - limit_unread_user_ids: Optional[Set[int]] = None, -) -> SendMessageRequest: - """Returns a dictionary that can be passed into do_send_messages. In - production, this is always called by check_message, but some - testing code paths call it directly. - """ - if realm is None: - realm = message.sender.realm - - if mention_backend is None: - mention_backend = MentionBackend(realm.id) - - mention_data = MentionData( - mention_backend=mention_backend, - content=message.content, - ) - - if message.is_stream_message(): - stream_id = message.recipient.type_id - stream_topic: Optional[StreamTopicTarget] = StreamTopicTarget( - stream_id=stream_id, - topic_name=message.topic_name(), - ) - else: - stream_topic = None - - info = get_recipient_info( - realm_id=realm.id, - recipient=message.recipient, - sender_id=message.sender_id, - stream_topic=stream_topic, - possibly_mentioned_user_ids=mention_data.get_user_ids(), - possible_wildcard_mention=mention_data.message_has_wildcards(), - ) - - # Render our message_dicts. - assert message.rendered_content is None - - rendering_result = render_incoming_message( - message, - message.content, - info["active_user_ids"], - realm, - mention_data=mention_data, - email_gateway=email_gateway, - ) - message.rendered_content = rendering_result.rendered_content - message.rendered_content_version = markdown_version - links_for_embed = rendering_result.links_for_preview - - mentioned_user_groups_map = get_user_group_mentions_data( - mentioned_user_ids=rendering_result.mentions_user_ids, - mentioned_user_group_ids=list(rendering_result.mentions_user_group_ids), - mention_data=mention_data, - ) - - # For single user as well as user group mentions, we set the `mentioned` - # flag on `UserMessage` - for group_id in rendering_result.mentions_user_group_ids: - members = mention_data.get_group_members(group_id) - rendering_result.mentions_user_ids.update(members) - - # Only send data to Tornado about wildcard mentions if message - # rendering determined the message had an actual wildcard - # mention in it (and not e.g. wildcard mention syntax inside a - # code block). - if rendering_result.mentions_wildcard: - wildcard_mention_user_ids = info["wildcard_mention_user_ids"] - else: - wildcard_mention_user_ids = set() - - """ - Once we have the actual list of mentioned ids from message - rendering, we can patch in "default bots" (aka normal bots) - who were directly mentioned in this message as eligible to - get UserMessage rows. - """ - mentioned_user_ids = rendering_result.mentions_user_ids - default_bot_user_ids = info["default_bot_user_ids"] - mentioned_bot_user_ids = default_bot_user_ids & mentioned_user_ids - info["um_eligible_user_ids"] |= mentioned_bot_user_ids - - message_send_dict = SendMessageRequest( - stream=stream, - local_id=local_id, - sender_queue_id=sender_queue_id, - realm=realm, - mention_data=mention_data, - mentioned_user_groups_map=mentioned_user_groups_map, - message=message, - rendering_result=rendering_result, - active_user_ids=info["active_user_ids"], - online_push_user_ids=info["online_push_user_ids"], - pm_mention_email_disabled_user_ids=info["pm_mention_email_disabled_user_ids"], - pm_mention_push_disabled_user_ids=info["pm_mention_push_disabled_user_ids"], - stream_push_user_ids=info["stream_push_user_ids"], - stream_email_user_ids=info["stream_email_user_ids"], - muted_sender_user_ids=info["muted_sender_user_ids"], - um_eligible_user_ids=info["um_eligible_user_ids"], - long_term_idle_user_ids=info["long_term_idle_user_ids"], - default_bot_user_ids=info["default_bot_user_ids"], - service_bot_tuples=info["service_bot_tuples"], - all_bot_user_ids=info["all_bot_user_ids"], - wildcard_mention_user_ids=wildcard_mention_user_ids, - links_for_embed=links_for_embed, - widget_content=widget_content_dict, - limit_unread_user_ids=limit_unread_user_ids, - ) - - return message_send_dict - - -def do_send_messages( - send_message_requests_maybe_none: Sequence[Optional[SendMessageRequest]], - email_gateway: bool = False, - mark_as_read: Sequence[int] = [], -) -> List[int]: - """See - https://zulip.readthedocs.io/en/latest/subsystems/sending-messages.html - for high-level documentation on this subsystem. - """ - - # Filter out messages which didn't pass internal_prep_message properly - send_message_requests = [ - send_request - for send_request in send_message_requests_maybe_none - if send_request is not None - ] - - # Save the message receipts in the database - user_message_flags: Dict[int, Dict[int, List[str]]] = defaultdict(dict) - with transaction.atomic(): - Message.objects.bulk_create(send_request.message for send_request in send_message_requests) - - # Claim attachments in message - for send_request in send_message_requests: - if do_claim_attachments( - send_request.message, send_request.rendering_result.potential_attachment_path_ids - ): - send_request.message.has_attachment = True - send_request.message.save(update_fields=["has_attachment"]) - - ums: List[UserMessageLite] = [] - for send_request in send_message_requests: - # Service bots (outgoing webhook bots and embedded bots) don't store UserMessage rows; - # they will be processed later. - mentioned_user_ids = send_request.rendering_result.mentions_user_ids - - # Extend the set with users who have muted the sender. - mark_as_read_user_ids = send_request.muted_sender_user_ids - mark_as_read_user_ids.update(mark_as_read) - - user_messages = create_user_messages( - message=send_request.message, - rendering_result=send_request.rendering_result, - um_eligible_user_ids=send_request.um_eligible_user_ids, - long_term_idle_user_ids=send_request.long_term_idle_user_ids, - stream_push_user_ids=send_request.stream_push_user_ids, - stream_email_user_ids=send_request.stream_email_user_ids, - mentioned_user_ids=mentioned_user_ids, - mark_as_read_user_ids=mark_as_read_user_ids, - limit_unread_user_ids=send_request.limit_unread_user_ids, - ) - - for um in user_messages: - user_message_flags[send_request.message.id][um.user_profile_id] = um.flags_list() - - ums.extend(user_messages) - - send_request.message.service_queue_events = get_service_bot_events( - sender=send_request.message.sender, - service_bot_tuples=send_request.service_bot_tuples, - mentioned_user_ids=mentioned_user_ids, - active_user_ids=send_request.active_user_ids, - recipient_type=send_request.message.recipient.type, - ) - - bulk_insert_ums(ums) - - for send_request in send_message_requests: - do_widget_post_save_actions(send_request) - - # This next loop is responsible for notifying other parts of the - # Zulip system about the messages we just committed to the database: - # * Notifying clients via send_event - # * Triggering outgoing webhooks via the service event queue. - # * Updating the `first_message_id` field for streams without any message history. - # * Implementing the Welcome Bot reply hack - # * Adding links to the embed_links queue for open graph processing. - for send_request in send_message_requests: - realm_id: Optional[int] = None - if send_request.message.is_stream_message(): - if send_request.stream is None: - stream_id = send_request.message.recipient.type_id - send_request.stream = Stream.objects.select_related().get(id=stream_id) - # assert needed because stubs for django are missing - assert send_request.stream is not None - realm_id = send_request.stream.realm_id - - # Deliver events to the real-time push system, as well as - # enqueuing any additional processing triggered by the message. - wide_message_dict = MessageDict.wide_dict(send_request.message, realm_id) - - user_flags = user_message_flags.get(send_request.message.id, {}) - - """ - TODO: We may want to limit user_ids to only those users who have - UserMessage rows, if only for minor performance reasons. - - For now we queue events for all subscribers/sendees of the - message, since downstream code may still do notifications - that don't require UserMessage rows. - - Our automated tests have gotten better on this codepath, - but we may have coverage gaps, so we should be careful - about changing the next line. - """ - user_ids = send_request.active_user_ids | set(user_flags.keys()) - sender_id = send_request.message.sender_id - - # We make sure the sender is listed first in the `users` list; - # this results in the sender receiving the message first if - # there are thousands of recipients, decreasing perceived latency. - if sender_id in user_ids: - user_list = [sender_id] + list(user_ids - {sender_id}) - else: - user_list = list(user_ids) - - class UserData(TypedDict): - id: int - flags: List[str] - mentioned_user_group_id: Optional[int] - - users: List[UserData] = [] - for user_id in user_list: - flags = user_flags.get(user_id, []) - user_data: UserData = dict(id=user_id, flags=flags, mentioned_user_group_id=None) - - if user_id in send_request.mentioned_user_groups_map: - user_data["mentioned_user_group_id"] = send_request.mentioned_user_groups_map[ - user_id - ] - - users.append(user_data) - - sender = send_request.message.sender - message_type = wide_message_dict["type"] - active_users_data = [ - ActivePresenceIdleUserData( - alerted="has_alert_word" in user_flags.get(user_id, []), - notifications_data=UserMessageNotificationsData.from_user_id_sets( - user_id=user_id, - flags=user_flags.get(user_id, []), - private_message=(message_type == "private"), - online_push_user_ids=send_request.online_push_user_ids, - pm_mention_push_disabled_user_ids=send_request.pm_mention_push_disabled_user_ids, - pm_mention_email_disabled_user_ids=send_request.pm_mention_email_disabled_user_ids, - stream_push_user_ids=send_request.stream_push_user_ids, - stream_email_user_ids=send_request.stream_email_user_ids, - wildcard_mention_user_ids=send_request.wildcard_mention_user_ids, - muted_sender_user_ids=send_request.muted_sender_user_ids, - all_bot_user_ids=send_request.all_bot_user_ids, - ), - ) - for user_id in send_request.active_user_ids - ] - - presence_idle_user_ids = get_active_presence_idle_user_ids( - realm=sender.realm, - sender_id=sender.id, - active_users_data=active_users_data, - ) - - event = dict( - type="message", - message=send_request.message.id, - message_dict=wide_message_dict, - presence_idle_user_ids=presence_idle_user_ids, - online_push_user_ids=list(send_request.online_push_user_ids), - pm_mention_push_disabled_user_ids=list(send_request.pm_mention_push_disabled_user_ids), - pm_mention_email_disabled_user_ids=list( - send_request.pm_mention_email_disabled_user_ids - ), - stream_push_user_ids=list(send_request.stream_push_user_ids), - stream_email_user_ids=list(send_request.stream_email_user_ids), - wildcard_mention_user_ids=list(send_request.wildcard_mention_user_ids), - muted_sender_user_ids=list(send_request.muted_sender_user_ids), - all_bot_user_ids=list(send_request.all_bot_user_ids), - ) - - if send_request.message.is_stream_message(): - # Note: This is where authorization for single-stream - # get_updates happens! We only attach stream data to the - # notify new_message request if it's a public stream, - # ensuring that in the tornado server, non-public stream - # messages are only associated to their subscribed users. - - # assert needed because stubs for django are missing - assert send_request.stream is not None - if send_request.stream.is_public(): - event["realm_id"] = send_request.stream.realm_id - event["stream_name"] = send_request.stream.name - if send_request.stream.invite_only: - event["invite_only"] = True - if send_request.stream.first_message_id is None: - send_request.stream.first_message_id = send_request.message.id - send_request.stream.save(update_fields=["first_message_id"]) - if send_request.local_id is not None: - event["local_id"] = send_request.local_id - if send_request.sender_queue_id is not None: - event["sender_queue_id"] = send_request.sender_queue_id - send_event(send_request.realm, event, users) - - if send_request.links_for_embed: - event_data = { - "message_id": send_request.message.id, - "message_content": send_request.message.content, - "message_realm_id": send_request.realm.id, - "urls": list(send_request.links_for_embed), - } - queue_json_publish("embed_links", event_data) - - if send_request.message.recipient.type == Recipient.PERSONAL: - welcome_bot_id = get_system_bot( - settings.WELCOME_BOT, send_request.message.sender.realm_id - ).id - if ( - welcome_bot_id in send_request.active_user_ids - and welcome_bot_id != send_request.message.sender_id - ): - from zerver.lib.onboarding import send_welcome_bot_response - - send_welcome_bot_response(send_request) - - for queue_name, events in send_request.message.service_queue_events.items(): - for event in events: - queue_json_publish( - queue_name, - { - "message": wide_message_dict, - "trigger": event["trigger"], - "user_profile_id": event["user_profile_id"], - }, - ) - - return [send_request.message.id for send_request in send_message_requests] - - -def create_user_messages( - message: Message, - rendering_result: MessageRenderingResult, - um_eligible_user_ids: AbstractSet[int], - long_term_idle_user_ids: AbstractSet[int], - stream_push_user_ids: AbstractSet[int], - stream_email_user_ids: AbstractSet[int], - mentioned_user_ids: AbstractSet[int], - mark_as_read_user_ids: Set[int], - limit_unread_user_ids: Optional[Set[int]], -) -> List[UserMessageLite]: - # These properties on the Message are set via - # render_markdown by code in the Markdown inline patterns - ids_with_alert_words = rendering_result.user_ids_with_alert_words - sender_id = message.sender.id - is_stream_message = message.is_stream_message() - - base_flags = 0 - if rendering_result.mentions_wildcard: - base_flags |= UserMessage.flags.wildcard_mentioned - if message.recipient.type in [Recipient.HUDDLE, Recipient.PERSONAL]: - base_flags |= UserMessage.flags.is_private - - # For long_term_idle (aka soft-deactivated) users, we are allowed - # to optimize by lazily not creating UserMessage rows that would - # have the default 0 flag set (since the soft-reactivation logic - # knows how to create those when the user comes back). We need to - # create the UserMessage rows for these long_term_idle users - # non-lazily in a few cases: - # - # * There are nonzero flags (e.g. the user was mentioned), since - # that case is rare and this saves a lot of complexity in - # soft-reactivation. - # - # * If the user is going to be notified (e.g. they get push/email - # notifications for every message on a stream), since in that - # case the notifications code will call `access_message` on the - # message to re-verify permissions, and for private streams, - # will get an error if the UserMessage row doesn't exist yet. - # - # See https://zulip.readthedocs.io/en/latest/subsystems/sending-messages.html#soft-deactivation - # for details on this system. - user_messages = [] - for user_profile_id in um_eligible_user_ids: - flags = base_flags - if ( - (user_profile_id == sender_id and message.sent_by_human()) - or user_profile_id in mark_as_read_user_ids - or (limit_unread_user_ids is not None and user_profile_id not in limit_unread_user_ids) - ): - flags |= UserMessage.flags.read - if user_profile_id in mentioned_user_ids: - flags |= UserMessage.flags.mentioned - if user_profile_id in ids_with_alert_words: - flags |= UserMessage.flags.has_alert_word - - if ( - user_profile_id in long_term_idle_user_ids - and user_profile_id not in stream_push_user_ids - and user_profile_id not in stream_email_user_ids - and is_stream_message - and int(flags) == 0 - ): - continue - - um = UserMessageLite( - user_profile_id=user_profile_id, - message_id=message.id, - flags=flags, - ) - user_messages.append(um) - - return user_messages - - def notify_reaction_update( user_profile: UserProfile, message: Message, reaction: Reaction, op: str ) -> None: @@ -2162,220 +1305,6 @@ def do_remove_reaction( notify_reaction_update(user_profile, message, reaction, "remove") -def already_sent_mirrored_message_id(message: Message) -> Optional[int]: - if message.recipient.type == Recipient.HUDDLE: - # For huddle messages, we use a 10-second window because the - # timestamps aren't guaranteed to actually match between two - # copies of the same message. - time_window = datetime.timedelta(seconds=10) - else: - time_window = datetime.timedelta(seconds=0) - - query = Message.objects.filter( - sender=message.sender, - recipient=message.recipient, - content=message.content, - sending_client=message.sending_client, - date_sent__gte=message.date_sent - time_window, - date_sent__lte=message.date_sent + time_window, - ) - - messages = filter_by_exact_message_topic( - query=query, - message=message, - ) - - if messages.exists(): - return messages[0].id - return None - - -def extract_stream_indicator(s: str) -> Union[str, int]: - # Users can pass stream name as either an id or a name, - # and if they choose to pass a name, they may JSON encode - # it for legacy reasons. - - try: - data = orjson.loads(s) - except orjson.JSONDecodeError: - # If there was no JSON encoding, then we just - # have a raw stream name. - return s - - # We should stop supporting this odd use case - # once we improve our documentation. - if isinstance(data, list): - if len(data) != 1: # nocoverage - raise JsonableError(_("Expected exactly one stream")) - data = data[0] - - if isinstance(data, str): - # We had a JSON-encoded stream name. - return data - - if isinstance(data, int): - # We had a stream id. - return data - - raise JsonableError(_("Invalid data type for stream")) - - -def extract_private_recipients(s: str) -> Union[List[str], List[int]]: - # We try to accept multiple incoming formats for recipients. - # See test_extract_recipients() for examples of what we allow. - - try: - data = orjson.loads(s) - except orjson.JSONDecodeError: - data = s - - if isinstance(data, str): - data = data.split(",") - - if not isinstance(data, list): - raise JsonableError(_("Invalid data type for recipients")) - - if not data: - # We don't complain about empty message recipients here - return data - - if isinstance(data[0], str): - return get_validated_emails(data) - - if not isinstance(data[0], int): - raise JsonableError(_("Invalid data type for recipients")) - - return get_validated_user_ids(data) - - -def get_validated_user_ids(user_ids: Collection[int]) -> List[int]: - for user_id in user_ids: - if not isinstance(user_id, int): - raise JsonableError(_("Recipient lists may contain emails or user IDs, but not both.")) - - return list(set(user_ids)) - - -def get_validated_emails(emails: Collection[str]) -> List[str]: - for email in emails: - if not isinstance(email, str): - raise JsonableError(_("Recipient lists may contain emails or user IDs, but not both.")) - - return list(filter(bool, {email.strip() for email in emails})) - - -def check_send_stream_message( - sender: UserProfile, - client: Client, - stream_name: str, - topic: str, - body: str, - realm: Optional[Realm] = None, -) -> int: - addressee = Addressee.for_stream_name(stream_name, topic) - message = check_message(sender, client, addressee, body, realm) - - return do_send_messages([message])[0] - - -def check_send_stream_message_by_id( - sender: UserProfile, - client: Client, - stream_id: int, - topic: str, - body: str, - realm: Optional[Realm] = None, -) -> int: - addressee = Addressee.for_stream_id(stream_id, topic) - message = check_message(sender, client, addressee, body, realm) - - return do_send_messages([message])[0] - - -def check_send_private_message( - sender: UserProfile, client: Client, receiving_user: UserProfile, body: str -) -> int: - addressee = Addressee.for_user_profile(receiving_user) - message = check_message(sender, client, addressee, body) - - return do_send_messages([message])[0] - - -# check_send_message: -# Returns the id of the sent message. Has same argspec as check_message. -def check_send_message( - sender: UserProfile, - client: Client, - message_type_name: str, - message_to: Union[Sequence[int], Sequence[str]], - topic_name: Optional[str], - message_content: str, - realm: Optional[Realm] = None, - forged: bool = False, - forged_timestamp: Optional[float] = None, - forwarder_user_profile: Optional[UserProfile] = None, - local_id: Optional[str] = None, - sender_queue_id: Optional[str] = None, - widget_content: Optional[str] = None, - *, - skip_stream_access_check: bool = False, -) -> int: - - addressee = Addressee.legacy_build(sender, message_type_name, message_to, topic_name) - try: - message = check_message( - sender, - client, - addressee, - message_content, - realm, - forged, - forged_timestamp, - forwarder_user_profile, - local_id, - sender_queue_id, - widget_content, - skip_stream_access_check=skip_stream_access_check, - ) - except ZephyrMessageAlreadySentException as e: - return e.message_id - return do_send_messages([message])[0] - - -def check_schedule_message( - sender: UserProfile, - client: Client, - message_type_name: str, - message_to: Union[Sequence[str], Sequence[int]], - topic_name: Optional[str], - message_content: str, - delivery_type: str, - deliver_at: datetime.datetime, - realm: Optional[Realm] = None, - forwarder_user_profile: Optional[UserProfile] = None, -) -> int: - addressee = Addressee.legacy_build(sender, message_type_name, message_to, topic_name) - - send_request = check_message( - sender, - client, - addressee, - message_content, - realm=realm, - forwarder_user_profile=forwarder_user_profile, - ) - send_request.deliver_at = deliver_at - send_request.delivery_type = delivery_type - - recipient = send_request.message.recipient - if delivery_type == "remind" and ( - recipient.type != Recipient.STREAM and recipient.type_id != sender.id - ): - raise JsonableError(_("Reminders can only be set for streams.")) - - return do_schedule_messages([send_request])[0] - - def validate_message_edit_payload( message: Message, stream_id: Optional[int], @@ -2591,474 +1520,6 @@ def check_update_message( return number_changed -def send_rate_limited_pm_notification_to_bot_owner( - sender: UserProfile, realm: Realm, content: str -) -> None: - """ - Sends a PM error notification to a bot's owner if one hasn't already - been sent in the last 5 minutes. - """ - if sender.realm.is_zephyr_mirror_realm or sender.realm.deactivated: - return - - if not sender.is_bot or sender.bot_owner is None: - return - - # Don't send these notifications for cross-realm bot messages - # (e.g. from EMAIL_GATEWAY_BOT) since the owner for - # EMAIL_GATEWAY_BOT is probably the server administrator, not - # the owner of the bot who could potentially fix the problem. - if sender.realm != realm: - return - - # We warn the user once every 5 minutes to avoid a flood of - # PMs on a misconfigured integration, re-using the - # UserProfile.last_reminder field, which is not used for bots. - last_reminder = sender.last_reminder - waitperiod = datetime.timedelta(minutes=UserProfile.BOT_OWNER_STREAM_ALERT_WAITPERIOD) - if last_reminder and timezone_now() - last_reminder <= waitperiod: - return - - internal_send_private_message( - get_system_bot(settings.NOTIFICATION_BOT, sender.bot_owner.realm_id), - sender.bot_owner, - content, - ) - - sender.last_reminder = timezone_now() - sender.save(update_fields=["last_reminder"]) - - -def send_pm_if_empty_stream( - stream: Optional[Stream], - realm: Realm, - sender: UserProfile, - stream_name: Optional[str] = None, - stream_id: Optional[int] = None, -) -> None: - """If a bot sends a message to a stream that doesn't exist or has no - subscribers, sends a notification to the bot owner (if not a - cross-realm bot) so that the owner can correct the issue.""" - if not sender.is_bot or sender.bot_owner is None: - return - - arg_dict = { - "bot_identity": f"`{sender.delivery_email}`", - "stream_id": stream_id, - "stream_name": f"#**{stream_name}**", - "new_stream_link": "#streams/new", - } - if sender.bot_owner is not None: - with override_language(sender.bot_owner.default_language): - if stream is None: - if stream_id is not None: - content = _( - "Your bot {bot_identity} tried to send a message to stream ID " - "{stream_id}, but there is no stream with that ID." - ).format(**arg_dict) - else: - assert stream_name is not None - content = _( - "Your bot {bot_identity} tried to send a message to stream " - "{stream_name}, but that stream does not exist. " - "Click [here]({new_stream_link}) to create it." - ).format(**arg_dict) - else: - if num_subscribers_for_stream_id(stream.id) > 0: - return - content = _( - "Your bot {bot_identity} tried to send a message to " - "stream {stream_name}. The stream exists but " - "does not have any subscribers." - ).format(**arg_dict) - - send_rate_limited_pm_notification_to_bot_owner(sender, realm, content) - - -def validate_stream_name_with_pm_notification( - stream_name: str, realm: Realm, sender: UserProfile -) -> Stream: - stream_name = stream_name.strip() - check_stream_name(stream_name) - - try: - stream = get_stream(stream_name, realm) - send_pm_if_empty_stream(stream, realm, sender) - except Stream.DoesNotExist: - send_pm_if_empty_stream(None, realm, sender, stream_name=stream_name) - raise StreamDoesNotExistError(escape(stream_name)) - - return stream - - -def validate_stream_id_with_pm_notification( - stream_id: int, realm: Realm, sender: UserProfile -) -> Stream: - try: - stream = get_stream_by_id_in_realm(stream_id, realm) - send_pm_if_empty_stream(stream, realm, sender) - except Stream.DoesNotExist: - send_pm_if_empty_stream(None, realm, sender, stream_id=stream_id) - raise StreamWithIDDoesNotExistError(stream_id) - - return stream - - -def check_private_message_policy( - realm: Realm, sender: UserProfile, user_profiles: Sequence[UserProfile] -) -> None: - if realm.private_message_policy == Realm.PRIVATE_MESSAGE_POLICY_DISABLED: - if sender.is_bot or (len(user_profiles) == 1 and user_profiles[0].is_bot): - # We allow PMs only between users and bots, to avoid - # breaking the tutorial as well as automated - # notifications from system bots to users. - return - - raise JsonableError(_("Private messages are disabled in this organization.")) - - -# check_message: -# Returns message ready for sending with do_send_message on success or the error message (string) on error. -def check_message( - sender: UserProfile, - client: Client, - addressee: Addressee, - message_content_raw: str, - realm: Optional[Realm] = None, - forged: bool = False, - forged_timestamp: Optional[float] = None, - forwarder_user_profile: Optional[UserProfile] = None, - local_id: Optional[str] = None, - sender_queue_id: Optional[str] = None, - widget_content: Optional[str] = None, - email_gateway: bool = False, - *, - skip_stream_access_check: bool = False, - mention_backend: Optional[MentionBackend] = None, - limit_unread_user_ids: Optional[Set[int]] = None, -) -> SendMessageRequest: - """See - https://zulip.readthedocs.io/en/latest/subsystems/sending-messages.html - for high-level documentation on this subsystem. - """ - stream = None - - message_content = normalize_body(message_content_raw) - - if realm is None: - realm = sender.realm - - if addressee.is_stream(): - topic_name = addressee.topic() - topic_name = truncate_topic(topic_name) - - stream_name = addressee.stream_name() - stream_id = addressee.stream_id() - - if stream_name is not None: - stream = validate_stream_name_with_pm_notification(stream_name, realm, sender) - elif stream_id is not None: - stream = validate_stream_id_with_pm_notification(stream_id, realm, sender) - else: - stream = addressee.stream() - assert stream is not None - - # To save a database round trip, we construct the Recipient - # object for the Stream rather than fetching it from the - # database using the stream.recipient foreign key. - # - # This is simpler than ensuring that code paths that fetch a - # Stream that will be used for sending a message have a - # `select_related("recipient"), which would also needlessly - # expand Stream objects in memory (all the fields of Recipient - # are already known given the Stream object). - recipient = Recipient( - id=stream.recipient_id, - type_id=stream.id, - type=Recipient.STREAM, - ) - - if not skip_stream_access_check: - access_stream_for_send_message( - sender=sender, stream=stream, forwarder_user_profile=forwarder_user_profile - ) - else: - # Defensive assertion - the only currently supported use case - # for this option is for outgoing webhook bots and since this - # is security-sensitive code, it's beneficial to ensure nothing - # else can sneak past the access check. - assert sender.bot_type == sender.OUTGOING_WEBHOOK_BOT - - if realm.mandatory_topics and topic_name == "(no topic)": - raise JsonableError(_("Topics are required in this organization")) - - elif addressee.is_private(): - user_profiles = addressee.user_profiles() - mirror_message = client and client.name in [ - "zephyr_mirror", - "irc_mirror", - "jabber_mirror", - "JabberMirror", - ] - - check_private_message_policy(realm, sender, user_profiles) - - # API super-users who set the `forged` flag are allowed to - # forge messages sent by any user, so we disable the - # `forwarded_mirror_message` security check in that case. - forwarded_mirror_message = mirror_message and not forged - try: - recipient = recipient_for_user_profiles( - user_profiles, forwarded_mirror_message, forwarder_user_profile, sender - ) - except ValidationError as e: - assert isinstance(e.messages[0], str) - raise JsonableError(e.messages[0]) - else: - # This is defensive code--Addressee already validates - # the message type. - raise AssertionError("Invalid message type") - - message = Message() - message.sender = sender - message.content = message_content - message.recipient = recipient - if addressee.is_stream(): - message.set_topic_name(topic_name) - if forged and forged_timestamp is not None: - # Forged messages come with a timestamp - message.date_sent = timestamp_to_datetime(forged_timestamp) - else: - message.date_sent = timezone_now() - message.sending_client = client - - # We render messages later in the process. - assert message.rendered_content is None - - if client.name == "zephyr_mirror": - id = already_sent_mirrored_message_id(message) - if id is not None: - raise ZephyrMessageAlreadySentException(id) - - widget_content_dict = None - if widget_content is not None: - try: - widget_content_dict = orjson.loads(widget_content) - except orjson.JSONDecodeError: - raise JsonableError(_("Widgets: API programmer sent invalid JSON content")) - - try: - check_widget_content(widget_content_dict) - except ValidationError as error: - raise JsonableError( - _("Widgets: {error_msg}").format( - error_msg=error.message, - ) - ) - - message_send_dict = build_message_send_dict( - message=message, - stream=stream, - local_id=local_id, - sender_queue_id=sender_queue_id, - realm=realm, - widget_content_dict=widget_content_dict, - email_gateway=email_gateway, - mention_backend=mention_backend, - limit_unread_user_ids=limit_unread_user_ids, - ) - - if stream is not None and message_send_dict.rendering_result.mentions_wildcard: - if not wildcard_mention_allowed(sender, stream): - raise JsonableError( - _("You do not have permission to use wildcard mentions in this stream.") - ) - return message_send_dict - - -def _internal_prep_message( - realm: Realm, - sender: UserProfile, - addressee: Addressee, - content: str, - email_gateway: bool = False, - mention_backend: Optional[MentionBackend] = None, - limit_unread_user_ids: Optional[Set[int]] = None, -) -> Optional[SendMessageRequest]: - """ - Create a message object and checks it, but doesn't send it or save it to the database. - The internal function that calls this can therefore batch send a bunch of created - messages together as one database query. - Call do_send_messages with a list of the return values of this method. - """ - # Remove any null bytes from the content - if len(content) > settings.MAX_MESSAGE_LENGTH: - content = content[0:3900] + "\n\n[message was too long and has been truncated]" - - # If we have a stream name, and the stream doesn't exist, we - # create it here (though this code path should probably be removed - # eventually, moving that responsibility to the caller). If - # addressee.stream_name() is None (i.e. we're sending to a stream - # by ID), we skip this, as the stream object must already exist. - if addressee.is_stream(): - stream_name = addressee.stream_name() - if stream_name is not None: - ensure_stream(realm, stream_name, acting_user=sender) - - try: - return check_message( - sender, - get_client("Internal"), - addressee, - content, - realm=realm, - email_gateway=email_gateway, - mention_backend=mention_backend, - limit_unread_user_ids=limit_unread_user_ids, - ) - except JsonableError as e: - logging.exception( - "Error queueing internal message by %s: %s", - sender.delivery_email, - e.msg, - stack_info=True, - ) - - return None - - -def internal_prep_stream_message( - sender: UserProfile, - stream: Stream, - topic: str, - content: str, - email_gateway: bool = False, - limit_unread_user_ids: Optional[Set[int]] = None, -) -> Optional[SendMessageRequest]: - """ - See _internal_prep_message for details of how this works. - """ - realm = stream.realm - addressee = Addressee.for_stream(stream, topic) - - return _internal_prep_message( - realm=realm, - sender=sender, - addressee=addressee, - content=content, - email_gateway=email_gateway, - limit_unread_user_ids=limit_unread_user_ids, - ) - - -def internal_prep_stream_message_by_name( - realm: Realm, - sender: UserProfile, - stream_name: str, - topic: str, - content: str, -) -> Optional[SendMessageRequest]: - """ - See _internal_prep_message for details of how this works. - """ - addressee = Addressee.for_stream_name(stream_name, topic) - - return _internal_prep_message( - realm=realm, - sender=sender, - addressee=addressee, - content=content, - ) - - -def internal_prep_private_message( - realm: Realm, - sender: UserProfile, - recipient_user: UserProfile, - content: str, - mention_backend: Optional[MentionBackend] = None, -) -> Optional[SendMessageRequest]: - """ - See _internal_prep_message for details of how this works. - """ - addressee = Addressee.for_user_profile(recipient_user) - - return _internal_prep_message( - realm=realm, - sender=sender, - addressee=addressee, - content=content, - mention_backend=mention_backend, - ) - - -def internal_send_private_message( - sender: UserProfile, recipient_user: UserProfile, content: str -) -> Optional[int]: - realm = recipient_user.realm - message = internal_prep_private_message(realm, sender, recipient_user, content) - if message is None: - return None - message_ids = do_send_messages([message]) - return message_ids[0] - - -def internal_send_stream_message( - sender: UserProfile, - stream: Stream, - topic: str, - content: str, - email_gateway: bool = False, - limit_unread_user_ids: Optional[Set[int]] = None, -) -> Optional[int]: - - message = internal_prep_stream_message( - sender, stream, topic, content, email_gateway, limit_unread_user_ids=limit_unread_user_ids - ) - - if message is None: - return None - message_ids = do_send_messages([message]) - return message_ids[0] - - -def internal_send_stream_message_by_name( - realm: Realm, - sender: UserProfile, - stream_name: str, - topic: str, - content: str, -) -> Optional[int]: - message = internal_prep_stream_message_by_name( - realm, - sender, - stream_name, - topic, - content, - ) - - if message is None: - return None - message_ids = do_send_messages([message]) - return message_ids[0] - - -def internal_send_huddle_message( - realm: Realm, sender: UserProfile, emails: List[str], content: str -) -> Optional[int]: - addressee = Addressee.for_private(emails, realm) - message = _internal_prep_message( - realm=realm, - sender=sender, - addressee=addressee, - content=content, - ) - if message is None: - return None - message_ids = do_send_messages([message]) - return message_ids[0] - - def get_subscriber_ids(stream: Stream, requesting_user: Optional[UserProfile] = None) -> List[str]: subscriptions_query = get_subscribers_query(stream, requesting_user) return subscriptions_query.values_list("user_profile_id", flat=True) @@ -5617,77 +4078,6 @@ def do_delete_messages_by_sender(user: UserProfile) -> None: move_messages_to_archive(message_ids, chunk_size=retention.STREAM_MESSAGE_BATCH_SIZE) -class ActivePresenceIdleUserData(TypedDict): - alerted: bool - notifications_data: UserMessageNotificationsData - - -def get_active_presence_idle_user_ids( - realm: Realm, - sender_id: int, - active_users_data: List[ActivePresenceIdleUserData], -) -> List[int]: - """ - Given a list of active_user_ids, we build up a subset - of those users who fit these criteria: - - * They are likely to need notifications. - * They are no longer "present" according to the - UserPresence table. - """ - - if realm.presence_disabled: - return [] - - user_ids = set() - for user_data in active_users_data: - user_notifications_data: UserMessageNotificationsData = user_data["notifications_data"] - alerted = user_data["alerted"] - - # We only need to know the presence idle state for a user if this message would be notifiable - # for them if they were indeed idle. Only including those users in the calculation below is a - # very important optimization for open communities with many inactive users. - if user_notifications_data.is_notifiable(sender_id, idle=True) or alerted: - user_ids.add(user_notifications_data.user_id) - - return filter_presence_idle_user_ids(user_ids) - - -def filter_presence_idle_user_ids(user_ids: Set[int]) -> List[int]: - # Given a set of user IDs (the recipients of a message), accesses - # the UserPresence table to determine which of these users are - # currently idle and should potentially get email notifications - # (and push notifications with with - # user_profile.enable_online_push_notifications=False). - # - # We exclude any presence data from ZulipMobile for the purpose of - # triggering these notifications; the mobile app can more - # effectively do its own client-side filtering of notification - # sounds/etc. for the case that the user is actively doing a PM - # conversation in the app. - - if not user_ids: - return [] - - # Matches presence.js constant - OFFLINE_THRESHOLD_SECS = 140 - - recent = timezone_now() - datetime.timedelta(seconds=OFFLINE_THRESHOLD_SECS) - rows = ( - UserPresence.objects.filter( - user_profile_id__in=user_ids, - status=UserPresence.ACTIVE, - timestamp__gte=recent, - ) - .exclude(client__name="ZulipMobile") - .distinct("user_profile_id") - .values("user_profile_id") - ) - active_user_ids = {row["user_profile_id"] for row in rows} - idle_user_ids = user_ids - active_user_ids - return sorted(idle_user_ids) - - def email_not_system_bot(email: str) -> None: if is_cross_realm_bot_email(email): msg = email_reserved_for_system_bots_error(email) diff --git a/zerver/lib/bot_lib.py b/zerver/lib/bot_lib.py index e708f43fe8..47750d9e2b 100644 --- a/zerver/lib/bot_lib.py +++ b/zerver/lib/bot_lib.py @@ -6,7 +6,7 @@ from django.conf import settings from django.utils.translation import gettext as _ from zulip_bots.lib import BotIdentity, RateLimit -from zerver.lib.actions import ( +from zerver.actions.message_send import ( internal_send_huddle_message, internal_send_private_message, internal_send_stream_message_by_name, diff --git a/zerver/lib/email_mirror.py b/zerver/lib/email_mirror.py index c62c297052..a380e4518e 100644 --- a/zerver/lib/email_mirror.py +++ b/zerver/lib/email_mirror.py @@ -9,7 +9,7 @@ from django.conf import settings from django.utils.timezone import now as timezone_now from django.utils.timezone import timedelta -from zerver.lib.actions import ( +from zerver.actions.message_send import ( check_send_message, internal_send_huddle_message, internal_send_private_message, diff --git a/zerver/lib/error_notify.py b/zerver/lib/error_notify.py index c647c462d7..3e923c728c 100644 --- a/zerver/lib/error_notify.py +++ b/zerver/lib/error_notify.py @@ -5,8 +5,8 @@ from typing import Any, Dict from django.conf import settings from django.core.mail import mail_admins +from zerver.actions.message_send import internal_send_stream_message from zerver.filters import clean_data_from_query_parameters -from zerver.lib.actions import internal_send_stream_message from zerver.models import get_realm, get_stream, get_system_bot diff --git a/zerver/lib/onboarding.py b/zerver/lib/onboarding.py index aa05cc2f83..62bb1952a2 100644 --- a/zerver/lib/onboarding.py +++ b/zerver/lib/onboarding.py @@ -5,13 +5,12 @@ from django.db import transaction from django.db.models import Count from django.utils.translation import gettext as _ -from zerver.lib.actions import ( - do_add_reaction, +from zerver.actions.message_send import ( do_send_messages, internal_prep_stream_message_by_name, internal_send_private_message, - setup_realm_internal_bots, ) +from zerver.lib.actions import do_add_reaction, setup_realm_internal_bots from zerver.lib.emoji import emoji_name_to_emoji_code from zerver.lib.message import SendMessageRequest from zerver.models import Message, Realm, UserProfile, get_system_bot diff --git a/zerver/lib/outgoing_webhook.py b/zerver/lib/outgoing_webhook.py index c553f367cd..c0d4916da2 100644 --- a/zerver/lib/outgoing_webhook.py +++ b/zerver/lib/outgoing_webhook.py @@ -10,7 +10,7 @@ from django.utils.translation import gettext as _ from requests import Response from version import ZULIP_VERSION -from zerver.lib.actions import check_send_message +from zerver.actions.message_send import check_send_message from zerver.lib.exceptions import JsonableError from zerver.lib.message import MessageDict from zerver.lib.outgoing_http import OutgoingSession diff --git a/zerver/lib/test_classes.py b/zerver/lib/test_classes.py index fc2326bf56..33561f5e82 100644 --- a/zerver/lib/test_classes.py +++ b/zerver/lib/test_classes.py @@ -47,12 +47,11 @@ from fakeldap import MockLDAP from two_factor.models import PhoneDevice from corporate.models import Customer, CustomerPlan, LicenseLedger +from zerver.actions.message_send import check_send_message, check_send_stream_message from zerver.decorator import do_two_factor_login from zerver.lib.actions import ( bulk_add_subscriptions, bulk_remove_subscriptions, - check_send_message, - check_send_stream_message, do_set_realm_property, ) from zerver.lib.cache import bounce_key_prefix_for_testing diff --git a/zerver/lib/webhooks/common.py b/zerver/lib/webhooks/common.py index 5a7d41b9e3..4f220cb048 100644 --- a/zerver/lib/webhooks/common.py +++ b/zerver/lib/webhooks/common.py @@ -7,7 +7,7 @@ from urllib.parse import unquote from django.http import HttpRequest from django.utils.translation import gettext as _ -from zerver.lib.actions import ( +from zerver.actions.message_send import ( check_send_private_message, check_send_stream_message, check_send_stream_message_by_id, diff --git a/zerver/management/commands/deliver_scheduled_messages.py b/zerver/management/commands/deliver_scheduled_messages.py index 91882de4c8..87f6418442 100644 --- a/zerver/management/commands/deliver_scheduled_messages.py +++ b/zerver/management/commands/deliver_scheduled_messages.py @@ -8,7 +8,7 @@ from django.core.management.base import BaseCommand from django.db import transaction from django.utils.timezone import now as timezone_now -from zerver.lib.actions import build_message_send_dict, do_send_messages +from zerver.actions.message_send import build_message_send_dict, do_send_messages from zerver.lib.logging_util import log_to_file from zerver.lib.message import SendMessageRequest from zerver.models import Message, ScheduledMessage, get_user_by_delivery_email diff --git a/zerver/tests/test_event_system.py b/zerver/tests/test_event_system.py index 8f96ed92f7..8769060cf3 100644 --- a/zerver/tests/test_event_system.py +++ b/zerver/tests/test_event_system.py @@ -8,9 +8,10 @@ from django.http import HttpRequest, HttpResponse from django.utils.timezone import now as timezone_now from version import API_FEATURE_LEVEL, ZULIP_MERGE_BASE, ZULIP_VERSION +from zerver.actions.message_send import check_send_message from zerver.actions.presence import do_update_user_presence from zerver.actions.users import do_change_user_role -from zerver.lib.actions import check_send_message, do_set_realm_property +from zerver.lib.actions import do_set_realm_property from zerver.lib.event_schema import check_restart_event from zerver.lib.events import fetch_initial_state_data from zerver.lib.exceptions import AccessDeniedError diff --git a/zerver/tests/test_link_embed.py b/zerver/tests/test_link_embed.py index 521ec907dd..be5f7f2216 100644 --- a/zerver/tests/test_link_embed.py +++ b/zerver/tests/test_link_embed.py @@ -420,7 +420,7 @@ class PreviewTestCase(ZulipTestCase): self, sender: UserProfile, queue_should_run: bool = True, relative_url: bool = False ) -> Message: url = "http://test.org/" - with mock_queue_publish("zerver.lib.actions.queue_json_publish") as patched: + with mock_queue_publish("zerver.actions.message_send.queue_json_publish") as patched: msg_id = self.send_personal_message( sender, self.example_user("cordelia"), @@ -462,7 +462,7 @@ class PreviewTestCase(ZulipTestCase): self.login_user(user) original_url = "http://test.org/" edited_url = "http://edited.org/" - with mock_queue_publish("zerver.lib.actions.queue_json_publish") as patched: + with mock_queue_publish("zerver.actions.message_send.queue_json_publish") as patched: msg_id = self.send_stream_message( user, "Denmark", topic_name="foo", content=original_url ) @@ -573,7 +573,7 @@ class PreviewTestCase(ZulipTestCase): user = self.example_user("hamlet") self.login_user(user) url = "http://test.org/" - with mock_queue_publish("zerver.lib.actions.queue_json_publish") as patched: + with mock_queue_publish("zerver.actions.message_send.queue_json_publish") as patched: msg_id = self.send_stream_message(user, "Denmark", topic_name="foo", content=url) patched.assert_called_once() queue = patched.call_args[0][0] @@ -606,7 +606,7 @@ class PreviewTestCase(ZulipTestCase): @override_settings(INLINE_URL_EMBED_PREVIEW=True) def test_inline_relative_url_embed_preview(self) -> None: # Relative URLs should not be sent for URL preview. - with mock_queue_publish("zerver.lib.actions.queue_json_publish") as patched: + with mock_queue_publish("zerver.actions.message_send.queue_json_publish") as patched: self.send_personal_message( self.example_user("prospero"), self.example_user("cordelia"), @@ -677,7 +677,7 @@ class PreviewTestCase(ZulipTestCase): user = self.example_user("hamlet") self.login_user(user) url = "http://test.org/audio.mp3" - with mock_queue_publish("zerver.lib.actions.queue_json_publish") as patched: + with mock_queue_publish("zerver.actions.message_send.queue_json_publish") as patched: msg_id = self.send_stream_message(user, "Denmark", topic_name="foo", content=url) patched.assert_called_once() queue = patched.call_args[0][0] @@ -709,7 +709,7 @@ class PreviewTestCase(ZulipTestCase): user = self.example_user("hamlet") self.login_user(user) url = "http://test.org/foo.html" - with mock_queue_publish("zerver.lib.actions.queue_json_publish") as patched: + with mock_queue_publish("zerver.actions.message_send.queue_json_publish") as patched: msg_id = self.send_stream_message(user, "Denmark", topic_name="foo", content=url) patched.assert_called_once() queue = patched.call_args[0][0] @@ -744,7 +744,7 @@ class PreviewTestCase(ZulipTestCase): user = self.example_user("hamlet") self.login_user(user) url = "http://test.org/foo.html" - with mock_queue_publish("zerver.lib.actions.queue_json_publish") as patched: + with mock_queue_publish("zerver.actions.message_send.queue_json_publish") as patched: msg_id = self.send_stream_message(user, "Denmark", topic_name="foo", content=url) patched.assert_called_once() queue = patched.call_args[0][0] @@ -782,7 +782,7 @@ class PreviewTestCase(ZulipTestCase): user = self.example_user("hamlet") self.login_user(user) url = "http://test.org/foo.html" - with mock_queue_publish("zerver.lib.actions.queue_json_publish") as patched: + with mock_queue_publish("zerver.actions.message_send.queue_json_publish") as patched: msg_id = self.send_stream_message(user, "Denmark", topic_name="foo", content=url) patched.assert_called_once() queue = patched.call_args[0][0] @@ -819,7 +819,7 @@ class PreviewTestCase(ZulipTestCase): user = self.example_user("hamlet") self.login_user(user) url = "http://test.org/" - with mock_queue_publish("zerver.lib.actions.queue_json_publish") as patched: + with mock_queue_publish("zerver.actions.message_send.queue_json_publish") as patched: msg_id = self.send_stream_message(user, "Denmark", topic_name="foo", content=url) patched.assert_called_once() queue = patched.call_args[0][0] @@ -847,7 +847,7 @@ class PreviewTestCase(ZulipTestCase): @override_settings(INLINE_URL_EMBED_PREVIEW=True) def test_valid_content_type_error_get_data(self) -> None: url = "http://test.org/" - with mock_queue_publish("zerver.lib.actions.queue_json_publish"): + with mock_queue_publish("zerver.actions.message_send.queue_json_publish"): msg_id = self.send_personal_message( self.example_user("hamlet"), self.example_user("cordelia"), @@ -891,7 +891,7 @@ class PreviewTestCase(ZulipTestCase): def test_invalid_url(self) -> None: url = "http://test.org/" error_url = "http://test.org/x" - with mock_queue_publish("zerver.lib.actions.queue_json_publish"): + with mock_queue_publish("zerver.actions.message_send.queue_json_publish"): msg_id = self.send_personal_message( self.example_user("hamlet"), self.example_user("cordelia"), @@ -927,7 +927,7 @@ class PreviewTestCase(ZulipTestCase): @override_settings(INLINE_URL_EMBED_PREVIEW=True) def test_safe_oembed_html_url(self) -> None: url = "http://test.org/" - with mock_queue_publish("zerver.lib.actions.queue_json_publish"): + with mock_queue_publish("zerver.actions.message_send.queue_json_publish"): msg_id = self.send_personal_message( self.example_user("hamlet"), self.example_user("cordelia"), @@ -969,7 +969,7 @@ class PreviewTestCase(ZulipTestCase): @override_settings(INLINE_URL_EMBED_PREVIEW=True) def test_youtube_url_title_replaces_url(self) -> None: url = "https://www.youtube.com/watch?v=eSJTXC7Ixgg" - with mock_queue_publish("zerver.lib.actions.queue_json_publish"): + with mock_queue_publish("zerver.actions.message_send.queue_json_publish"): msg_id = self.send_personal_message( self.example_user("hamlet"), self.example_user("cordelia"), @@ -1005,7 +1005,7 @@ class PreviewTestCase(ZulipTestCase): @override_settings(INLINE_URL_EMBED_PREVIEW=True) def test_custom_title_replaces_youtube_url_title(self) -> None: url = "[YouTube link](https://www.youtube.com/watch?v=eSJTXC7Ixgg)" - with mock_queue_publish("zerver.lib.actions.queue_json_publish"): + with mock_queue_publish("zerver.actions.message_send.queue_json_publish"): msg_id = self.send_personal_message( self.example_user("hamlet"), self.example_user("cordelia"), diff --git a/zerver/tests/test_message_send.py b/zerver/tests/test_message_send.py index 5d561e29af..04528f7cdb 100644 --- a/zerver/tests/test_message_send.py +++ b/zerver/tests/test_message_send.py @@ -10,17 +10,11 @@ from django.http import HttpResponse from django.test import override_settings from django.utils.timezone import now as timezone_now -from zerver.actions.users import do_change_can_forge_sender, do_deactivate_user -from zerver.lib.actions import ( +from zerver.actions.message_send import ( build_message_send_dict, check_message, check_send_stream_message, - do_add_realm_domain, - do_change_stream_post_policy, - do_create_realm, - do_create_user, do_send_messages, - do_set_realm_property, extract_private_recipients, extract_stream_indicator, internal_prep_private_message, @@ -31,6 +25,14 @@ from zerver.lib.actions import ( internal_send_stream_message_by_name, send_rate_limited_pm_notification_to_bot_owner, ) +from zerver.actions.users import do_change_can_forge_sender, do_deactivate_user +from zerver.lib.actions import ( + do_add_realm_domain, + do_change_stream_post_policy, + do_create_realm, + do_create_user, + do_set_realm_property, +) from zerver.lib.addressee import Addressee from zerver.lib.cache import cache_delete, get_stream_cache_key from zerver.lib.exceptions import JsonableError diff --git a/zerver/tests/test_messages.py b/zerver/tests/test_messages.py index 003394063f..7adfde139a 100644 --- a/zerver/tests/test_messages.py +++ b/zerver/tests/test_messages.py @@ -3,7 +3,7 @@ from typing import List from django.utils.timezone import now as timezone_now -from zerver.lib.actions import get_active_presence_idle_user_ids +from zerver.actions.message_send import get_active_presence_idle_user_ids from zerver.lib.test_classes import ZulipTestCase from zerver.models import ( Message, diff --git a/zerver/tests/test_mirror_users.py b/zerver/tests/test_mirror_users.py index bb2363875b..dc2390d891 100644 --- a/zerver/tests/test_mirror_users.py +++ b/zerver/tests/test_mirror_users.py @@ -4,7 +4,7 @@ from unittest import mock from django.db import IntegrityError from django.utils.timezone import now as timezone_now -from zerver.lib.actions import create_mirror_user_if_needed +from zerver.actions.message_send import create_mirror_user_if_needed from zerver.lib.create_user import create_user_profile from zerver.lib.test_classes import ZulipTestCase from zerver.lib.test_helpers import reset_emails_in_zulip_realm @@ -172,7 +172,7 @@ class MirroredMessageUsersTest(ZulipTestCase): create_user_profile(**kwargs).save() raise IntegrityError() - with mock.patch("zerver.lib.actions.create_user", side_effect=create_user) as m: + with mock.patch("zerver.actions.message_send.create_user", side_effect=create_user) as m: mirror_fred_user = create_mirror_user_if_needed( realm, email, diff --git a/zerver/tests/test_presence.py b/zerver/tests/test_presence.py index 95d1be5ff6..64fc5cd641 100644 --- a/zerver/tests/test_presence.py +++ b/zerver/tests/test_presence.py @@ -254,7 +254,7 @@ class UserPresenceTests(ZulipTestCase): def test_filter_presence_idle_user_ids(self) -> None: user_profile = self.example_user("hamlet") - from zerver.lib.actions import filter_presence_idle_user_ids + from zerver.actions.message_send import filter_presence_idle_user_ids self.login("hamlet") diff --git a/zerver/tests/test_retention.py b/zerver/tests/test_retention.py index 0a49c908c0..71a03bf90c 100644 --- a/zerver/tests/test_retention.py +++ b/zerver/tests/test_retention.py @@ -5,13 +5,9 @@ from unittest import mock from django.conf import settings from django.utils.timezone import now as timezone_now +from zerver.actions.message_send import internal_send_private_message from zerver.actions.submessage import do_add_submessage -from zerver.lib.actions import ( - do_create_realm, - do_delete_messages, - do_set_realm_property, - internal_send_private_message, -) +from zerver.lib.actions import do_create_realm, do_delete_messages, do_set_realm_property from zerver.lib.retention import ( archive_messages, clean_archived_data, diff --git a/zerver/tests/test_service_bot_system.py b/zerver/tests/test_service_bot_system.py index 3a364a8342..01e02b06fe 100644 --- a/zerver/tests/test_service_bot_system.py +++ b/zerver/tests/test_service_bot_system.py @@ -6,7 +6,8 @@ import orjson from django.conf import settings from django.test import override_settings -from zerver.lib.actions import do_create_user, get_service_bot_events +from zerver.actions.message_send import get_service_bot_events +from zerver.lib.actions import do_create_user from zerver.lib.bot_config import ConfigError, load_bot_config_template, set_bot_config from zerver.lib.bot_lib import EmbeddedBotEmptyRecipientsList, EmbeddedBotHandler, StateHandler from zerver.lib.bot_storage import StateError @@ -452,7 +453,7 @@ class TestServiceBotEventTriggers(ZulipTestCase): ) @for_all_bot_types - @patch_queue_publish("zerver.lib.actions.queue_json_publish") + @patch_queue_publish("zerver.actions.message_send.queue_json_publish") def test_trigger_on_stream_mention_from_user(self, mock_queue_json_publish: mock.Mock) -> None: content = "@**FooBot** foo bar!!!" recipient = "Denmark" @@ -478,7 +479,7 @@ class TestServiceBotEventTriggers(ZulipTestCase): self.send_stream_message(self.user_profile, "Denmark", content) self.assertTrue(mock_queue_json_publish.called) - @patch_queue_publish("zerver.lib.actions.queue_json_publish") + @patch_queue_publish("zerver.actions.message_send.queue_json_publish") def test_no_trigger_on_stream_message_without_mention( self, mock_queue_json_publish: mock.Mock ) -> None: @@ -487,7 +488,7 @@ class TestServiceBotEventTriggers(ZulipTestCase): self.assertFalse(mock_queue_json_publish.called) @for_all_bot_types - @patch_queue_publish("zerver.lib.actions.queue_json_publish") + @patch_queue_publish("zerver.actions.message_send.queue_json_publish") def test_no_trigger_on_stream_mention_from_bot( self, mock_queue_json_publish: mock.Mock ) -> None: @@ -495,7 +496,7 @@ class TestServiceBotEventTriggers(ZulipTestCase): self.assertFalse(mock_queue_json_publish.called) @for_all_bot_types - @patch_queue_publish("zerver.lib.actions.queue_json_publish") + @patch_queue_publish("zerver.actions.message_send.queue_json_publish") def test_trigger_on_personal_message_from_user( self, mock_queue_json_publish: mock.Mock ) -> None: @@ -525,7 +526,7 @@ class TestServiceBotEventTriggers(ZulipTestCase): self.assertTrue(mock_queue_json_publish.called) @for_all_bot_types - @patch_queue_publish("zerver.lib.actions.queue_json_publish") + @patch_queue_publish("zerver.actions.message_send.queue_json_publish") def test_no_trigger_on_personal_message_from_bot( self, mock_queue_json_publish: mock.Mock ) -> None: @@ -535,7 +536,7 @@ class TestServiceBotEventTriggers(ZulipTestCase): self.assertFalse(mock_queue_json_publish.called) @for_all_bot_types - @patch_queue_publish("zerver.lib.actions.queue_json_publish") + @patch_queue_publish("zerver.actions.message_send.queue_json_publish") def test_trigger_on_huddle_message_from_user(self, mock_queue_json_publish: mock.Mock) -> None: self.second_bot_profile.bot_type = self.bot_profile.bot_type self.second_bot_profile.save() @@ -563,7 +564,7 @@ class TestServiceBotEventTriggers(ZulipTestCase): self.assertEqual(mock_queue_json_publish.call_count, 2) @for_all_bot_types - @patch_queue_publish("zerver.lib.actions.queue_json_publish") + @patch_queue_publish("zerver.actions.message_send.queue_json_publish") def test_no_trigger_on_huddle_message_from_bot( self, mock_queue_json_publish: mock.Mock ) -> None: diff --git a/zerver/tests/test_tutorial.py b/zerver/tests/test_tutorial.py index cf9716c1f5..b35cf10fc7 100644 --- a/zerver/tests/test_tutorial.py +++ b/zerver/tests/test_tutorial.py @@ -1,6 +1,6 @@ from django.conf import settings -from zerver.lib.actions import internal_send_private_message +from zerver.actions.message_send import internal_send_private_message from zerver.lib.test_classes import ZulipTestCase from zerver.lib.test_helpers import message_stream_count, most_recent_message from zerver.models import UserProfile, get_system_bot diff --git a/zerver/tests/test_upload.py b/zerver/tests/test_upload.py index 8393afc938..a769f1c695 100644 --- a/zerver/tests/test_upload.py +++ b/zerver/tests/test_upload.py @@ -19,16 +19,12 @@ from django_sendfile.utils import _get_sendfile from PIL import Image import zerver.lib.upload +from zerver.actions.message_send import internal_send_private_message from zerver.actions.realm_icon import do_change_icon_source from zerver.actions.realm_logo import do_change_logo_source from zerver.actions.uploads import do_delete_old_unclaimed_attachments from zerver.actions.user_settings import do_delete_avatar_image -from zerver.lib.actions import ( - do_change_realm_plan_type, - do_create_realm, - do_set_realm_property, - internal_send_private_message, -) +from zerver.lib.actions import do_change_realm_plan_type, do_create_realm, do_set_realm_property from zerver.lib.avatar import avatar_url, get_avatar_field from zerver.lib.avatar_hash import user_avatar_path from zerver.lib.cache import cache_get, get_realm_used_upload_space_cache_key diff --git a/zerver/tests/test_users.py b/zerver/tests/test_users.py index 684d94f405..67ff8950ba 100644 --- a/zerver/tests/test_users.py +++ b/zerver/tests/test_users.py @@ -13,6 +13,7 @@ from django.utils.timezone import now as timezone_now from confirmation.models import Confirmation from zerver.actions.invites import do_create_multiuse_invite_link, do_invite_users +from zerver.actions.message_send import get_recipient_info from zerver.actions.users import ( change_user_is_active, do_change_can_create_users, @@ -25,7 +26,6 @@ from zerver.lib.actions import ( do_mute_user, do_reactivate_user, do_set_realm_property, - get_recipient_info, ) from zerver.lib.avatar import avatar_url, get_gravatar_url from zerver.lib.bulk_create import create_users diff --git a/zerver/views/message_send.py b/zerver/views/message_send.py index 7e1babdb78..49d0d7a437 100644 --- a/zerver/views/message_send.py +++ b/zerver/views/message_send.py @@ -8,7 +8,7 @@ from django.http import HttpRequest, HttpResponse from django.utils.timezone import now as timezone_now from django.utils.translation import gettext as _ -from zerver.lib.actions import ( +from zerver.actions.message_send import ( check_schedule_message, check_send_message, compute_irc_user_fullname, diff --git a/zerver/views/streams.py b/zerver/views/streams.py index 316b38ea1d..daa8b8bcc6 100644 --- a/zerver/views/streams.py +++ b/zerver/views/streams.py @@ -21,6 +21,11 @@ from zerver.actions.default_streams import ( do_remove_streams_from_default_stream_group, get_default_streams_for_realm, ) +from zerver.actions.message_send import ( + do_send_messages, + internal_prep_private_message, + internal_prep_stream_message, +) from zerver.context_processors import get_valid_realm_from_request from zerver.decorator import ( authenticated_json_view, @@ -39,10 +44,7 @@ from zerver.lib.actions import ( do_deactivate_stream, do_delete_messages, do_rename_stream, - do_send_messages, get_subscriber_ids, - internal_prep_private_message, - internal_prep_stream_message, ) from zerver.lib.exceptions import ( ErrorCode, diff --git a/zerver/webhooks/dialogflow/view.py b/zerver/webhooks/dialogflow/view.py index 74fa022f90..005f74a1a7 100644 --- a/zerver/webhooks/dialogflow/view.py +++ b/zerver/webhooks/dialogflow/view.py @@ -3,8 +3,8 @@ from typing import Any, Dict from django.http import HttpRequest, HttpResponse +from zerver.actions.message_send import check_send_private_message from zerver.decorator import webhook_view -from zerver.lib.actions import check_send_private_message from zerver.lib.request import REQ, RequestNotes, has_request_variables from zerver.lib.response import json_success from zerver.models import UserProfile, get_user diff --git a/zerver/webhooks/freshstatus/view.py b/zerver/webhooks/freshstatus/view.py index f42bdb19fa..750244a0c3 100644 --- a/zerver/webhooks/freshstatus/view.py +++ b/zerver/webhooks/freshstatus/view.py @@ -4,8 +4,8 @@ import dateutil.parser from django.http import HttpRequest, HttpResponse from django.utils.translation import gettext as _ +from zerver.actions.message_send import send_rate_limited_pm_notification_to_bot_owner from zerver.decorator import webhook_view -from zerver.lib.actions import send_rate_limited_pm_notification_to_bot_owner from zerver.lib.exceptions import JsonableError from zerver.lib.request import REQ, has_request_variables from zerver.lib.response import json_success diff --git a/zerver/webhooks/slack/view.py b/zerver/webhooks/slack/view.py index 64628a5914..e5e9e8af2f 100644 --- a/zerver/webhooks/slack/view.py +++ b/zerver/webhooks/slack/view.py @@ -2,8 +2,8 @@ from django.http import HttpRequest from django.http.response import HttpResponse from django.utils.translation import gettext as _ +from zerver.actions.message_send import check_send_stream_message from zerver.decorator import webhook_view -from zerver.lib.actions import check_send_stream_message from zerver.lib.exceptions import JsonableError from zerver.lib.request import REQ, RequestNotes, has_request_variables from zerver.lib.response import json_success diff --git a/zerver/webhooks/teamcity/view.py b/zerver/webhooks/teamcity/view.py index 03cb86e688..596447f954 100644 --- a/zerver/webhooks/teamcity/view.py +++ b/zerver/webhooks/teamcity/view.py @@ -5,11 +5,11 @@ from typing import Any, Dict, List, Optional from django.db.models import Q from django.http import HttpRequest, HttpResponse -from zerver.decorator import webhook_view -from zerver.lib.actions import ( +from zerver.actions.message_send import ( check_send_private_message, send_rate_limited_pm_notification_to_bot_owner, ) +from zerver.decorator import webhook_view from zerver.lib.request import REQ, RequestNotes, has_request_variables from zerver.lib.response import json_success from zerver.lib.send_email import FromAddress diff --git a/zerver/webhooks/uptimerobot/view.py b/zerver/webhooks/uptimerobot/view.py index 3cc1824c43..7889d96c5c 100644 --- a/zerver/webhooks/uptimerobot/view.py +++ b/zerver/webhooks/uptimerobot/view.py @@ -3,8 +3,8 @@ from typing import Any, Dict from django.http import HttpRequest, HttpResponse from django.utils.translation import gettext as _ +from zerver.actions.message_send import send_rate_limited_pm_notification_to_bot_owner from zerver.decorator import webhook_view -from zerver.lib.actions import send_rate_limited_pm_notification_to_bot_owner from zerver.lib.exceptions import JsonableError from zerver.lib.request import REQ, has_request_variables from zerver.lib.response import json_success diff --git a/zerver/webhooks/yo/view.py b/zerver/webhooks/yo/view.py index 0a80c1214b..6556deaab5 100644 --- a/zerver/webhooks/yo/view.py +++ b/zerver/webhooks/yo/view.py @@ -3,8 +3,8 @@ from typing import Optional from django.http import HttpRequest, HttpResponse +from zerver.actions.message_send import check_send_private_message from zerver.decorator import webhook_view -from zerver.lib.actions import check_send_private_message from zerver.lib.request import REQ, RequestNotes, has_request_variables from zerver.lib.response import json_success from zerver.models import UserProfile, get_user diff --git a/zerver/webhooks/zabbix/view.py b/zerver/webhooks/zabbix/view.py index 94b1d4f0d7..be40a4f3de 100644 --- a/zerver/webhooks/zabbix/view.py +++ b/zerver/webhooks/zabbix/view.py @@ -3,8 +3,8 @@ from typing import Any, Dict from django.http import HttpRequest, HttpResponse from django.utils.translation import gettext as _ +from zerver.actions.message_send import send_rate_limited_pm_notification_to_bot_owner from zerver.decorator import webhook_view -from zerver.lib.actions import send_rate_limited_pm_notification_to_bot_owner from zerver.lib.exceptions import JsonableError from zerver.lib.request import REQ, has_request_variables from zerver.lib.response import json_success diff --git a/zerver/worker/queue_processors.py b/zerver/worker/queue_processors.py index 9efd781dfd..0548746306 100644 --- a/zerver/worker/queue_processors.py +++ b/zerver/worker/queue_processors.py @@ -47,16 +47,12 @@ from sentry_sdk import add_breadcrumb, configure_scope from zulip_bots.lib import extract_query_without_mention from zerver.actions.invites import do_send_confirmation_email +from zerver.actions.message_send import internal_send_private_message, render_incoming_message from zerver.actions.presence import do_update_user_presence from zerver.actions.realm_export import notify_realm_export from zerver.actions.user_activity import do_update_user_activity, do_update_user_activity_interval from zerver.context_processors import common_context -from zerver.lib.actions import ( - do_mark_stream_messages_as_read, - do_update_embedded_data, - internal_send_private_message, - render_incoming_message, -) +from zerver.lib.actions import do_mark_stream_messages_as_read, do_update_embedded_data from zerver.lib.bot_lib import EmbeddedBotHandler, EmbeddedBotQuitException, get_bot_handler from zerver.lib.context_managers import lockfile from zerver.lib.db import reset_queries diff --git a/zilencer/management/commands/add_mock_conversation.py b/zilencer/management/commands/add_mock_conversation.py index 9455bb9f0e..7f041079af 100644 --- a/zilencer/management/commands/add_mock_conversation.py +++ b/zilencer/management/commands/add_mock_conversation.py @@ -2,14 +2,9 @@ from typing import Any, Dict, List from django.core.management.base import BaseCommand +from zerver.actions.message_send import do_send_messages, internal_prep_stream_message from zerver.actions.user_settings import do_change_avatar_fields -from zerver.lib.actions import ( - bulk_add_subscriptions, - do_add_reaction, - do_create_user, - do_send_messages, - internal_prep_stream_message, -) +from zerver.lib.actions import bulk_add_subscriptions, do_add_reaction, do_create_user from zerver.lib.emoji import emoji_name_to_emoji_code from zerver.lib.streams import ensure_stream from zerver.lib.upload import upload_avatar_image diff --git a/zilencer/management/commands/populate_db.py b/zilencer/management/commands/populate_db.py index d98eaaad6b..b4a9a71dac 100644 --- a/zilencer/management/commands/populate_db.py +++ b/zilencer/management/commands/populate_db.py @@ -23,9 +23,10 @@ from zerver.actions.custom_profile_fields import ( try_add_realm_custom_profile_field, try_add_realm_default_custom_profile_field, ) +from zerver.actions.message_send import build_message_send_dict, do_send_messages from zerver.actions.realm_emoji import check_add_realm_emoji from zerver.actions.users import do_change_user_role -from zerver.lib.actions import build_message_send_dict, do_create_realm, do_send_messages +from zerver.lib.actions import do_create_realm from zerver.lib.bulk_create import bulk_create_streams from zerver.lib.cache import cache_set from zerver.lib.generate_test_data import create_test_data, generate_topics diff --git a/zproject/default_settings.py b/zproject/default_settings.py index e89c296767..679b1a9921 100644 --- a/zproject/default_settings.py +++ b/zproject/default_settings.py @@ -469,7 +469,7 @@ OUTGOING_WEBHOOK_TIMEOUT_SECONDS = 10 # Maximum length of message content allowed. # Any message content exceeding this limit will be truncated. -# See: `_internal_prep_message` function in zerver/lib/actions.py. +# See: `_internal_prep_message` function in zerver/actions/message_send.py. MAX_MESSAGE_LENGTH = 10000 # The maximum number of drafts to send in the response to /register.