mirror of https://github.com/zulip/zulip.git
1644 lines
57 KiB
Python
1644 lines
57 KiB
Python
import copy
|
|
import datetime
|
|
import zlib
|
|
from dataclasses import dataclass, field
|
|
from typing import (
|
|
TYPE_CHECKING,
|
|
Any,
|
|
Collection,
|
|
Dict,
|
|
List,
|
|
Mapping,
|
|
Optional,
|
|
Sequence,
|
|
Set,
|
|
Tuple,
|
|
TypedDict,
|
|
Union,
|
|
)
|
|
|
|
import ahocorasick
|
|
import orjson
|
|
from django.conf import settings
|
|
from django.db import connection
|
|
from django.db.models import Max, Sum
|
|
from django.utils.timezone import now as timezone_now
|
|
from django.utils.translation import gettext as _
|
|
from psycopg2.sql import SQL
|
|
|
|
from analytics.lib.counts import COUNT_STATS
|
|
from analytics.models import RealmCount
|
|
from zerver.lib.avatar import get_avatar_field
|
|
from zerver.lib.cache import (
|
|
cache_set_many,
|
|
cache_with_key,
|
|
generic_bulk_cached_fetch,
|
|
to_dict_cache_key,
|
|
to_dict_cache_key_id,
|
|
)
|
|
from zerver.lib.display_recipient import bulk_fetch_display_recipients
|
|
from zerver.lib.exceptions import JsonableError, MissingAuthenticationError
|
|
from zerver.lib.markdown import MessageRenderingResult, markdown_convert, topic_links
|
|
from zerver.lib.markdown import version as markdown_version
|
|
from zerver.lib.mention import MentionData
|
|
from zerver.lib.request import RequestVariableConversionError
|
|
from zerver.lib.stream_subscription import (
|
|
get_stream_subscriptions_for_user,
|
|
get_subscribed_stream_recipient_ids_for_user,
|
|
num_subscribers_for_stream_id,
|
|
)
|
|
from zerver.lib.streams import get_web_public_streams_queryset
|
|
from zerver.lib.timestamp import datetime_to_timestamp
|
|
from zerver.lib.topic import DB_TOPIC_NAME, MESSAGE__TOPIC, TOPIC_LINKS, TOPIC_NAME
|
|
from zerver.lib.types import DisplayRecipientT, EditHistoryEvent, UserDisplayRecipient
|
|
from zerver.lib.url_preview.types import UrlEmbedData
|
|
from zerver.lib.user_topics import build_topic_mute_checker, topic_is_muted
|
|
from zerver.models import (
|
|
MAX_TOPIC_NAME_LENGTH,
|
|
Message,
|
|
Reaction,
|
|
Realm,
|
|
Recipient,
|
|
Stream,
|
|
SubMessage,
|
|
Subscription,
|
|
UserMessage,
|
|
UserProfile,
|
|
get_display_recipient_by_id,
|
|
get_usermessage_by_message_id,
|
|
query_for_ids,
|
|
)
|
|
|
|
if TYPE_CHECKING:
|
|
from django.db.models.query import _QuerySet as ValuesQuerySet
|
|
|
|
|
|
class MessageDetailsDict(TypedDict, total=False):
|
|
type: str
|
|
mentioned: bool
|
|
user_ids: List[int]
|
|
stream_id: int
|
|
topic: str
|
|
unmuted_stream_msg: bool
|
|
|
|
|
|
class RawReactionRow(TypedDict):
|
|
emoji_code: str
|
|
emoji_name: str
|
|
message_id: int
|
|
reaction_type: str
|
|
user_profile__email: str
|
|
user_profile__full_name: str
|
|
user_profile_id: int
|
|
|
|
|
|
class RawUnreadStreamDict(TypedDict):
|
|
stream_id: int
|
|
topic: str
|
|
|
|
|
|
class RawUnreadPrivateMessageDict(TypedDict):
|
|
other_user_id: int
|
|
|
|
|
|
class RawUnreadHuddleDict(TypedDict):
|
|
user_ids_string: str
|
|
|
|
|
|
class RawUnreadMessagesResult(TypedDict):
|
|
pm_dict: Dict[int, RawUnreadPrivateMessageDict]
|
|
stream_dict: Dict[int, RawUnreadStreamDict]
|
|
huddle_dict: Dict[int, RawUnreadHuddleDict]
|
|
mentions: Set[int]
|
|
muted_stream_ids: List[int]
|
|
unmuted_stream_msgs: Set[int]
|
|
old_unreads_missing: bool
|
|
|
|
|
|
class UnreadStreamInfo(TypedDict):
|
|
stream_id: int
|
|
topic: str
|
|
unread_message_ids: List[int]
|
|
|
|
|
|
class UnreadPrivateMessageInfo(TypedDict):
|
|
other_user_id: int
|
|
# Deprecated and misleading synonym for other_user_id
|
|
sender_id: int
|
|
unread_message_ids: List[int]
|
|
|
|
|
|
class UnreadHuddleInfo(TypedDict):
|
|
user_ids_string: str
|
|
unread_message_ids: List[int]
|
|
|
|
|
|
class UnreadMessagesResult(TypedDict):
|
|
pms: List[UnreadPrivateMessageInfo]
|
|
streams: List[UnreadStreamInfo]
|
|
huddles: List[UnreadHuddleInfo]
|
|
mentions: List[int]
|
|
count: int
|
|
old_unreads_missing: bool
|
|
|
|
|
|
@dataclass
|
|
class SendMessageRequest:
|
|
message: Message
|
|
rendering_result: MessageRenderingResult
|
|
stream: Optional[Stream]
|
|
local_id: Optional[str]
|
|
sender_queue_id: Optional[str]
|
|
realm: Realm
|
|
mention_data: MentionData
|
|
mentioned_user_groups_map: Dict[int, int]
|
|
active_user_ids: Set[int]
|
|
online_push_user_ids: Set[int]
|
|
pm_mention_push_disabled_user_ids: Set[int]
|
|
pm_mention_email_disabled_user_ids: Set[int]
|
|
stream_push_user_ids: Set[int]
|
|
stream_email_user_ids: Set[int]
|
|
muted_sender_user_ids: Set[int]
|
|
um_eligible_user_ids: Set[int]
|
|
long_term_idle_user_ids: Set[int]
|
|
default_bot_user_ids: Set[int]
|
|
service_bot_tuples: List[Tuple[int, int]]
|
|
all_bot_user_ids: Set[int]
|
|
wildcard_mention_user_ids: Set[int]
|
|
links_for_embed: Set[str]
|
|
widget_content: Optional[Dict[str, Any]]
|
|
submessages: List[Dict[str, Any]] = field(default_factory=list)
|
|
deliver_at: Optional[datetime.datetime] = None
|
|
delivery_type: Optional[str] = None
|
|
limit_unread_user_ids: Optional[Set[int]] = None
|
|
|
|
|
|
# We won't try to fetch more unread message IDs from the database than
|
|
# this limit. The limit is super high, in large part because it means
|
|
# client-side code mostly doesn't need to think about the case that a
|
|
# user has more older unread messages that were cut off.
|
|
MAX_UNREAD_MESSAGES = 50000
|
|
|
|
|
|
def truncate_content(content: str, max_length: int, truncation_message: str) -> str:
|
|
if len(content) > max_length:
|
|
content = content[: max_length - len(truncation_message)] + truncation_message
|
|
return content
|
|
|
|
|
|
def normalize_body(body: str) -> str:
|
|
body = body.rstrip().lstrip("\n")
|
|
if len(body) == 0:
|
|
raise JsonableError(_("Message must not be empty"))
|
|
if "\x00" in body:
|
|
raise JsonableError(_("Message must not contain null bytes"))
|
|
return truncate_content(body, settings.MAX_MESSAGE_LENGTH, "\n[message truncated]")
|
|
|
|
|
|
def truncate_topic(topic: str) -> str:
|
|
return truncate_content(topic, MAX_TOPIC_NAME_LENGTH, "...")
|
|
|
|
|
|
def messages_for_ids(
|
|
message_ids: List[int],
|
|
user_message_flags: Dict[int, List[str]],
|
|
search_fields: Dict[int, Dict[str, str]],
|
|
apply_markdown: bool,
|
|
client_gravatar: bool,
|
|
allow_edit_history: bool,
|
|
) -> List[Dict[str, Any]]:
|
|
|
|
cache_transformer = MessageDict.build_dict_from_raw_db_row
|
|
id_fetcher = lambda row: row["id"]
|
|
|
|
message_dicts = generic_bulk_cached_fetch(
|
|
to_dict_cache_key_id,
|
|
MessageDict.get_raw_db_rows,
|
|
message_ids,
|
|
id_fetcher=id_fetcher,
|
|
cache_transformer=cache_transformer,
|
|
extractor=extract_message_dict,
|
|
setter=stringify_message_dict,
|
|
)
|
|
|
|
message_list: List[Dict[str, Any]] = []
|
|
|
|
for message_id in message_ids:
|
|
msg_dict = message_dicts[message_id]
|
|
msg_dict.update(flags=user_message_flags[message_id])
|
|
if message_id in search_fields:
|
|
msg_dict.update(search_fields[message_id])
|
|
# Make sure that we never send message edit history to clients
|
|
# in realms with allow_edit_history disabled.
|
|
if "edit_history" in msg_dict and not allow_edit_history:
|
|
del msg_dict["edit_history"]
|
|
message_list.append(msg_dict)
|
|
|
|
MessageDict.post_process_dicts(message_list, apply_markdown, client_gravatar)
|
|
|
|
return message_list
|
|
|
|
|
|
def sew_messages_and_reactions(
|
|
messages: List[Dict[str, Any]], reactions: List[Dict[str, Any]]
|
|
) -> List[Dict[str, Any]]:
|
|
"""Given a iterable of messages and reactions stitch reactions
|
|
into messages.
|
|
"""
|
|
# Add all messages with empty reaction item
|
|
for message in messages:
|
|
message["reactions"] = []
|
|
|
|
# Convert list of messages into dictionary to make reaction stitching easy
|
|
converted_messages = {message["id"]: message for message in messages}
|
|
|
|
for reaction in reactions:
|
|
converted_messages[reaction["message_id"]]["reactions"].append(reaction)
|
|
|
|
return list(converted_messages.values())
|
|
|
|
|
|
def sew_messages_and_submessages(
|
|
messages: List[Dict[str, Any]], submessages: List[Dict[str, Any]]
|
|
) -> None:
|
|
# This is super similar to sew_messages_and_reactions.
|
|
for message in messages:
|
|
message["submessages"] = []
|
|
|
|
message_dict = {message["id"]: message for message in messages}
|
|
|
|
for submessage in submessages:
|
|
message_id = submessage["message_id"]
|
|
if message_id in message_dict:
|
|
message = message_dict[message_id]
|
|
message["submessages"].append(submessage)
|
|
|
|
|
|
def extract_message_dict(message_bytes: bytes) -> Dict[str, Any]:
|
|
return orjson.loads(zlib.decompress(message_bytes))
|
|
|
|
|
|
def stringify_message_dict(message_dict: Dict[str, Any]) -> bytes:
|
|
return zlib.compress(orjson.dumps(message_dict))
|
|
|
|
|
|
@cache_with_key(to_dict_cache_key, timeout=3600 * 24)
|
|
def message_to_dict_json(message: Message, realm_id: Optional[int] = None) -> bytes:
|
|
return MessageDict.to_dict_uncached([message], realm_id)[message.id]
|
|
|
|
|
|
def save_message_rendered_content(message: Message, content: str) -> str:
|
|
rendering_result = render_markdown(message, content, realm=message.get_realm())
|
|
rendered_content = None
|
|
if rendering_result is not None:
|
|
rendered_content = rendering_result.rendered_content
|
|
message.rendered_content = rendered_content
|
|
message.rendered_content_version = markdown_version
|
|
message.save_rendered_content()
|
|
return rendered_content
|
|
|
|
|
|
class MessageDict:
|
|
"""MessageDict is the core class responsible for marshalling Message
|
|
objects obtained from the database into a format that can be sent
|
|
to clients via the Zulip API, whether via `GET /messages`,
|
|
outgoing webhooks, or other code paths. There are two core flows through
|
|
which this class is used:
|
|
|
|
* For just-sent messages, we construct a single `wide_dict` object
|
|
containing all the data for the message and the related
|
|
UserProfile models (sender_info and recipient_info); this object
|
|
can be stored in queues, caches, etc., and then later turned
|
|
into an API-format JSONable dictionary via finalize_payload.
|
|
|
|
* When fetching messages from the database, we fetch their data in
|
|
bulk using messages_for_ids, which makes use of caching, bulk
|
|
fetches that skip the Django ORM, etc., to provide an optimized
|
|
interface for fetching hundreds of thousands of messages from
|
|
the database and then turning them into API-format JSON
|
|
dictionaries.
|
|
|
|
"""
|
|
|
|
@staticmethod
|
|
def wide_dict(message: Message, realm_id: Optional[int] = None) -> Dict[str, Any]:
|
|
"""
|
|
The next two lines get the cacheable field related
|
|
to our message object, with the side effect of
|
|
populating the cache.
|
|
"""
|
|
json = message_to_dict_json(message, realm_id)
|
|
obj = extract_message_dict(json)
|
|
|
|
"""
|
|
The steps below are similar to what we do in
|
|
post_process_dicts(), except we don't call finalize_payload(),
|
|
since that step happens later in the queue
|
|
processor.
|
|
"""
|
|
MessageDict.bulk_hydrate_sender_info([obj])
|
|
MessageDict.bulk_hydrate_recipient_info([obj])
|
|
|
|
return obj
|
|
|
|
@staticmethod
|
|
def post_process_dicts(
|
|
objs: List[Dict[str, Any]], apply_markdown: bool, client_gravatar: bool
|
|
) -> None:
|
|
"""
|
|
NOTE: This function mutates the objects in
|
|
the `objs` list, rather than making
|
|
shallow copies. It might be safer to
|
|
make shallow copies here, but performance
|
|
is somewhat important here, as we are
|
|
often fetching hundreds of messages.
|
|
"""
|
|
MessageDict.bulk_hydrate_sender_info(objs)
|
|
MessageDict.bulk_hydrate_recipient_info(objs)
|
|
|
|
for obj in objs:
|
|
MessageDict.finalize_payload(obj, apply_markdown, client_gravatar, skip_copy=True)
|
|
|
|
@staticmethod
|
|
def finalize_payload(
|
|
obj: Dict[str, Any],
|
|
apply_markdown: bool,
|
|
client_gravatar: bool,
|
|
keep_rendered_content: bool = False,
|
|
skip_copy: bool = False,
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
By default, we make a shallow copy of the incoming dict to avoid
|
|
mutation-related bugs. Code paths that are passing a unique object
|
|
can pass skip_copy=True to avoid this extra work.
|
|
"""
|
|
if not skip_copy:
|
|
obj = copy.copy(obj)
|
|
|
|
MessageDict.set_sender_avatar(obj, client_gravatar)
|
|
if apply_markdown:
|
|
obj["content_type"] = "text/html"
|
|
obj["content"] = obj["rendered_content"]
|
|
else:
|
|
obj["content_type"] = "text/x-markdown"
|
|
|
|
if not keep_rendered_content:
|
|
del obj["rendered_content"]
|
|
del obj["sender_realm_id"]
|
|
del obj["sender_avatar_source"]
|
|
del obj["sender_delivery_email"]
|
|
del obj["sender_avatar_version"]
|
|
|
|
del obj["recipient_type"]
|
|
del obj["recipient_type_id"]
|
|
del obj["sender_is_mirror_dummy"]
|
|
return obj
|
|
|
|
@staticmethod
|
|
def sew_submessages_and_reactions_to_msgs(
|
|
messages: List[Dict[str, Any]]
|
|
) -> List[Dict[str, Any]]:
|
|
msg_ids = [msg["id"] for msg in messages]
|
|
submessages = SubMessage.get_raw_db_rows(msg_ids)
|
|
sew_messages_and_submessages(messages, submessages)
|
|
|
|
reactions = Reaction.get_raw_db_rows(msg_ids)
|
|
return sew_messages_and_reactions(messages, reactions)
|
|
|
|
@staticmethod
|
|
def to_dict_uncached(
|
|
messages: List[Message], realm_id: Optional[int] = None
|
|
) -> Dict[int, bytes]:
|
|
messages_dict = MessageDict.to_dict_uncached_helper(messages, realm_id)
|
|
encoded_messages = {msg["id"]: stringify_message_dict(msg) for msg in messages_dict}
|
|
return encoded_messages
|
|
|
|
@staticmethod
|
|
def to_dict_uncached_helper(
|
|
messages: List[Message], realm_id: Optional[int] = None
|
|
) -> List[Dict[str, Any]]:
|
|
# Near duplicate of the build_message_dict + get_raw_db_rows
|
|
# code path that accepts already fetched Message objects
|
|
# rather than message IDs.
|
|
|
|
def get_rendering_realm_id(message: Message) -> int:
|
|
# realm_id can differ among users, currently only possible
|
|
# with cross realm bots.
|
|
if realm_id is not None:
|
|
return realm_id
|
|
if message.recipient.type == Recipient.STREAM:
|
|
return Stream.objects.get(id=message.recipient.type_id).realm_id
|
|
return message.sender.realm_id
|
|
|
|
message_rows = [
|
|
{
|
|
"id": message.id,
|
|
DB_TOPIC_NAME: message.topic_name(),
|
|
"date_sent": message.date_sent,
|
|
"last_edit_time": message.last_edit_time,
|
|
"edit_history": message.edit_history,
|
|
"content": message.content,
|
|
"rendered_content": message.rendered_content,
|
|
"rendered_content_version": message.rendered_content_version,
|
|
"recipient_id": message.recipient.id,
|
|
"recipient__type": message.recipient.type,
|
|
"recipient__type_id": message.recipient.type_id,
|
|
"rendering_realm_id": get_rendering_realm_id(message),
|
|
"sender_id": message.sender.id,
|
|
"sending_client__name": message.sending_client.name,
|
|
"sender__realm_id": message.sender.realm_id,
|
|
}
|
|
for message in messages
|
|
]
|
|
|
|
MessageDict.sew_submessages_and_reactions_to_msgs(message_rows)
|
|
return [MessageDict.build_dict_from_raw_db_row(row) for row in message_rows]
|
|
|
|
@staticmethod
|
|
def get_raw_db_rows(needed_ids: List[int]) -> List[Dict[str, Any]]:
|
|
# This is a special purpose function optimized for
|
|
# callers like get_messages_backend().
|
|
fields = [
|
|
"id",
|
|
DB_TOPIC_NAME,
|
|
"date_sent",
|
|
"last_edit_time",
|
|
"edit_history",
|
|
"content",
|
|
"rendered_content",
|
|
"rendered_content_version",
|
|
"recipient_id",
|
|
"recipient__type",
|
|
"recipient__type_id",
|
|
"sender_id",
|
|
"sending_client__name",
|
|
"sender__realm_id",
|
|
]
|
|
messages = Message.objects.filter(id__in=needed_ids).values(*fields)
|
|
return MessageDict.sew_submessages_and_reactions_to_msgs(messages)
|
|
|
|
@staticmethod
|
|
def build_dict_from_raw_db_row(row: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""
|
|
row is a row from a .values() call, and it needs to have
|
|
all the relevant fields populated
|
|
"""
|
|
return MessageDict.build_message_dict(
|
|
message_id=row["id"],
|
|
last_edit_time=row["last_edit_time"],
|
|
edit_history_json=row["edit_history"],
|
|
content=row["content"],
|
|
topic_name=row[DB_TOPIC_NAME],
|
|
date_sent=row["date_sent"],
|
|
rendered_content=row["rendered_content"],
|
|
rendered_content_version=row["rendered_content_version"],
|
|
sender_id=row["sender_id"],
|
|
sender_realm_id=row["sender__realm_id"],
|
|
sending_client_name=row["sending_client__name"],
|
|
rendering_realm_id=row.get("rendering_realm_id", row["sender__realm_id"]),
|
|
recipient_id=row["recipient_id"],
|
|
recipient_type=row["recipient__type"],
|
|
recipient_type_id=row["recipient__type_id"],
|
|
reactions=row["reactions"],
|
|
submessages=row["submessages"],
|
|
)
|
|
|
|
@staticmethod
|
|
def build_message_dict(
|
|
message_id: int,
|
|
last_edit_time: Optional[datetime.datetime],
|
|
edit_history_json: Optional[str],
|
|
content: str,
|
|
topic_name: str,
|
|
date_sent: datetime.datetime,
|
|
rendered_content: Optional[str],
|
|
rendered_content_version: Optional[int],
|
|
sender_id: int,
|
|
sender_realm_id: int,
|
|
sending_client_name: str,
|
|
rendering_realm_id: int,
|
|
recipient_id: int,
|
|
recipient_type: int,
|
|
recipient_type_id: int,
|
|
reactions: List[RawReactionRow],
|
|
submessages: List[Dict[str, Any]],
|
|
) -> Dict[str, Any]:
|
|
|
|
obj = dict(
|
|
id=message_id,
|
|
sender_id=sender_id,
|
|
content=content,
|
|
recipient_type_id=recipient_type_id,
|
|
recipient_type=recipient_type,
|
|
recipient_id=recipient_id,
|
|
timestamp=datetime_to_timestamp(date_sent),
|
|
client=sending_client_name,
|
|
)
|
|
|
|
obj[TOPIC_NAME] = topic_name
|
|
obj["sender_realm_id"] = sender_realm_id
|
|
|
|
# Render topic_links with the stream's realm instead of the
|
|
# sender's realm; this is important for messages sent by
|
|
# cross-realm bots like NOTIFICATION_BOT.
|
|
obj[TOPIC_LINKS] = topic_links(rendering_realm_id, topic_name)
|
|
|
|
if last_edit_time is not None:
|
|
obj["last_edit_timestamp"] = datetime_to_timestamp(last_edit_time)
|
|
assert edit_history_json is not None
|
|
edit_history: List[EditHistoryEvent] = orjson.loads(edit_history_json)
|
|
obj["edit_history"] = edit_history
|
|
|
|
if Message.need_to_render_content(
|
|
rendered_content, rendered_content_version, markdown_version
|
|
):
|
|
# We really shouldn't be rendering objects in this method, but there is
|
|
# a scenario where we upgrade the version of Markdown and fail to run
|
|
# management commands to re-render historical messages, and then we
|
|
# need to have side effects. This method is optimized to not need full
|
|
# blown ORM objects, but the Markdown renderer is unfortunately highly
|
|
# coupled to Message, and we also need to persist the new rendered content.
|
|
# If we don't have a message object passed in, we get one here. The cost
|
|
# of going to the DB here should be overshadowed by the cost of rendering
|
|
# and updating the row.
|
|
# TODO: see #1379 to eliminate Markdown dependencies
|
|
message = Message.objects.select_related().get(id=message_id)
|
|
|
|
assert message is not None # Hint for mypy.
|
|
# It's unfortunate that we need to have side effects on the message
|
|
# in some cases.
|
|
rendered_content = save_message_rendered_content(message, content)
|
|
|
|
if rendered_content is not None:
|
|
obj["rendered_content"] = rendered_content
|
|
else:
|
|
obj["rendered_content"] = (
|
|
"<p>[Zulip note: Sorry, we could not "
|
|
+ "understand the formatting of your message]</p>"
|
|
)
|
|
|
|
if rendered_content is not None:
|
|
obj["is_me_message"] = Message.is_status_message(content, rendered_content)
|
|
else:
|
|
obj["is_me_message"] = False
|
|
|
|
obj["reactions"] = [
|
|
ReactionDict.build_dict_from_raw_db_row(reaction) for reaction in reactions
|
|
]
|
|
obj["submessages"] = submessages
|
|
return obj
|
|
|
|
@staticmethod
|
|
def bulk_hydrate_sender_info(objs: List[Dict[str, Any]]) -> None:
|
|
|
|
sender_ids = list({obj["sender_id"] for obj in objs})
|
|
|
|
if not sender_ids:
|
|
return
|
|
|
|
query = UserProfile.objects.values(
|
|
"id",
|
|
"full_name",
|
|
"delivery_email",
|
|
"email",
|
|
"realm__string_id",
|
|
"avatar_source",
|
|
"avatar_version",
|
|
"is_mirror_dummy",
|
|
)
|
|
|
|
rows = query_for_ids(query, sender_ids, "zerver_userprofile.id")
|
|
|
|
sender_dict = {row["id"]: row for row in rows}
|
|
|
|
for obj in objs:
|
|
sender_id = obj["sender_id"]
|
|
user_row = sender_dict[sender_id]
|
|
obj["sender_full_name"] = user_row["full_name"]
|
|
obj["sender_email"] = user_row["email"]
|
|
obj["sender_delivery_email"] = user_row["delivery_email"]
|
|
obj["sender_realm_str"] = user_row["realm__string_id"]
|
|
obj["sender_avatar_source"] = user_row["avatar_source"]
|
|
obj["sender_avatar_version"] = user_row["avatar_version"]
|
|
obj["sender_is_mirror_dummy"] = user_row["is_mirror_dummy"]
|
|
|
|
@staticmethod
|
|
def hydrate_recipient_info(obj: Dict[str, Any], display_recipient: DisplayRecipientT) -> None:
|
|
"""
|
|
This method hyrdrates recipient info with things
|
|
like full names and emails of senders. Eventually
|
|
our clients should be able to hyrdrate these fields
|
|
themselves with info they already have on users.
|
|
"""
|
|
|
|
recipient_type = obj["recipient_type"]
|
|
recipient_type_id = obj["recipient_type_id"]
|
|
sender_is_mirror_dummy = obj["sender_is_mirror_dummy"]
|
|
sender_email = obj["sender_email"]
|
|
sender_full_name = obj["sender_full_name"]
|
|
sender_id = obj["sender_id"]
|
|
|
|
if recipient_type == Recipient.STREAM:
|
|
display_type = "stream"
|
|
elif recipient_type in (Recipient.HUDDLE, Recipient.PERSONAL):
|
|
assert not isinstance(display_recipient, str)
|
|
display_type = "private"
|
|
if len(display_recipient) == 1:
|
|
# add the sender in if this isn't a message between
|
|
# someone and themself, preserving ordering
|
|
recip: UserDisplayRecipient = {
|
|
"email": sender_email,
|
|
"full_name": sender_full_name,
|
|
"id": sender_id,
|
|
"is_mirror_dummy": sender_is_mirror_dummy,
|
|
}
|
|
if recip["email"] < display_recipient[0]["email"]:
|
|
display_recipient = [recip, display_recipient[0]]
|
|
elif recip["email"] > display_recipient[0]["email"]:
|
|
display_recipient = [display_recipient[0], recip]
|
|
else:
|
|
raise AssertionError(f"Invalid recipient type {recipient_type}")
|
|
|
|
obj["display_recipient"] = display_recipient
|
|
obj["type"] = display_type
|
|
if obj["type"] == "stream":
|
|
obj["stream_id"] = recipient_type_id
|
|
|
|
@staticmethod
|
|
def bulk_hydrate_recipient_info(objs: List[Dict[str, Any]]) -> None:
|
|
recipient_tuples = { # We use set to eliminate duplicate tuples.
|
|
(
|
|
obj["recipient_id"],
|
|
obj["recipient_type"],
|
|
obj["recipient_type_id"],
|
|
)
|
|
for obj in objs
|
|
}
|
|
display_recipients = bulk_fetch_display_recipients(recipient_tuples)
|
|
|
|
for obj in objs:
|
|
MessageDict.hydrate_recipient_info(obj, display_recipients[obj["recipient_id"]])
|
|
|
|
@staticmethod
|
|
def set_sender_avatar(obj: Dict[str, Any], client_gravatar: bool) -> None:
|
|
sender_id = obj["sender_id"]
|
|
sender_realm_id = obj["sender_realm_id"]
|
|
sender_delivery_email = obj["sender_delivery_email"]
|
|
sender_avatar_source = obj["sender_avatar_source"]
|
|
sender_avatar_version = obj["sender_avatar_version"]
|
|
|
|
obj["avatar_url"] = get_avatar_field(
|
|
user_id=sender_id,
|
|
realm_id=sender_realm_id,
|
|
email=sender_delivery_email,
|
|
avatar_source=sender_avatar_source,
|
|
avatar_version=sender_avatar_version,
|
|
medium=False,
|
|
client_gravatar=client_gravatar,
|
|
)
|
|
|
|
|
|
class ReactionDict:
|
|
@staticmethod
|
|
def build_dict_from_raw_db_row(row: RawReactionRow) -> Dict[str, Any]:
|
|
return {
|
|
"emoji_name": row["emoji_name"],
|
|
"emoji_code": row["emoji_code"],
|
|
"reaction_type": row["reaction_type"],
|
|
# TODO: We plan to remove this redundant user dictionary once
|
|
# clients are updated to support accessing use user_id. See
|
|
# https://github.com/zulip/zulip/pull/14711 for details.
|
|
#
|
|
# When we do that, we can likely update the `.values()` query to
|
|
# not fetch the extra user_profile__* fields from the database
|
|
# as a small performance optimization.
|
|
"user": {
|
|
"email": row["user_profile__email"],
|
|
"id": row["user_profile_id"],
|
|
"full_name": row["user_profile__full_name"],
|
|
},
|
|
"user_id": row["user_profile_id"],
|
|
}
|
|
|
|
|
|
def access_message(
|
|
user_profile: UserProfile,
|
|
message_id: int,
|
|
lock_message: bool = False,
|
|
) -> Tuple[Message, Optional[UserMessage]]:
|
|
"""You can access a message by ID in our APIs that either:
|
|
(1) You received or have previously accessed via starring
|
|
(aka have a UserMessage row for).
|
|
(2) Was sent to a public stream in your realm.
|
|
|
|
We produce consistent, boring error messages to avoid leaking any
|
|
information from a security perspective.
|
|
|
|
The lock_message parameter should be passed by callers that are
|
|
planning to modify the Message object. This will use the SQL
|
|
`SELECT FOR UPDATE` feature to ensure that other processes cannot
|
|
delete the message during the current transaction, which is
|
|
important to prevent rare race conditions. Callers must only
|
|
pass lock_message when inside a @transaction.atomic block.
|
|
"""
|
|
try:
|
|
base_query = Message.objects.select_related()
|
|
if lock_message:
|
|
# We want to lock only the `Message` row, and not the related fields
|
|
# because the `Message` row only has a possibility of races.
|
|
base_query = base_query.select_for_update(of=("self",))
|
|
message = base_query.get(id=message_id)
|
|
except Message.DoesNotExist:
|
|
raise JsonableError(_("Invalid message(s)"))
|
|
|
|
user_message = get_usermessage_by_message_id(user_profile, message_id)
|
|
|
|
if has_message_access(user_profile, message, has_user_message=user_message is not None):
|
|
return (message, user_message)
|
|
raise JsonableError(_("Invalid message(s)"))
|
|
|
|
|
|
def access_web_public_message(
|
|
realm: Realm,
|
|
message_id: int,
|
|
) -> Message:
|
|
"""Access control method for unauthenticated requests interacting
|
|
with a message in web-public streams.
|
|
"""
|
|
|
|
# We throw a MissingAuthenticationError for all errors in this
|
|
# code path, to avoid potentially leaking information on whether a
|
|
# message with the provided ID exists on the server if the client
|
|
# shouldn't have access to it.
|
|
if not realm.web_public_streams_enabled():
|
|
raise MissingAuthenticationError()
|
|
|
|
try:
|
|
message = Message.objects.select_related().get(id=message_id)
|
|
except Message.DoesNotExist:
|
|
raise MissingAuthenticationError()
|
|
|
|
if not message.is_stream_message():
|
|
raise MissingAuthenticationError()
|
|
|
|
queryset = get_web_public_streams_queryset(realm)
|
|
try:
|
|
stream = queryset.get(id=message.recipient.type_id)
|
|
except Stream.DoesNotExist:
|
|
raise MissingAuthenticationError()
|
|
|
|
# These should all have been enforced by the code in
|
|
# get_web_public_streams_queryset
|
|
assert stream.is_web_public
|
|
assert not stream.deactivated
|
|
assert not stream.invite_only
|
|
assert stream.history_public_to_subscribers
|
|
|
|
# Now that we've confirmed this message was sent to the target
|
|
# web-public stream, we can return it as having been successfully
|
|
# accessed.
|
|
return message
|
|
|
|
|
|
def has_message_access(
|
|
user_profile: UserProfile,
|
|
message: Message,
|
|
*,
|
|
has_user_message: bool,
|
|
stream: Optional[Stream] = None,
|
|
is_subscribed: Optional[bool] = None,
|
|
) -> bool:
|
|
"""
|
|
Returns whether a user has access to a given message.
|
|
|
|
* The user_message parameter must be provided if the user has a UserMessage
|
|
row for the target message.
|
|
* The optional stream parameter is validated; is_subscribed is not.
|
|
"""
|
|
|
|
# If you have a user_message object, you have access.
|
|
if has_user_message:
|
|
return True
|
|
|
|
if message.recipient.type != Recipient.STREAM:
|
|
# You can't access private messages you didn't receive
|
|
return False
|
|
|
|
if stream is None:
|
|
stream = Stream.objects.get(id=message.recipient.type_id)
|
|
else:
|
|
assert stream.recipient_id == message.recipient_id
|
|
|
|
if stream.realm != user_profile.realm:
|
|
# You can't access public stream messages in other realms
|
|
return False
|
|
|
|
if not stream.is_history_public_to_subscribers():
|
|
# You can't access messages you didn't directly receive
|
|
# unless history is public to subscribers.
|
|
return False
|
|
|
|
if stream.is_public() and user_profile.can_access_public_streams():
|
|
return True
|
|
|
|
# is_history_public_to_subscribers, so check if you're subscribed
|
|
if is_subscribed is not None:
|
|
return is_subscribed
|
|
|
|
return Subscription.objects.filter(
|
|
user_profile=user_profile, active=True, recipient=message.recipient
|
|
).exists()
|
|
|
|
|
|
def bulk_access_messages(
|
|
user_profile: UserProfile, messages: Collection[Message], *, stream: Optional[Stream] = None
|
|
) -> List[Message]:
|
|
"""This function does the full has_message_access check for each
|
|
message. If stream is provided, it is used to avoid unnecessary
|
|
database queries, and will use exactly 2 bulk queries instead.
|
|
|
|
Throws AssertionError if stream is passed and any of the messages
|
|
were not sent to that stream.
|
|
|
|
"""
|
|
filtered_messages = []
|
|
|
|
user_message_set = set(
|
|
bulk_access_messages_expect_usermessage(
|
|
user_profile.id, [message.id for message in messages]
|
|
)
|
|
)
|
|
|
|
# TODO: Ideally, we'd do a similar bulk-stream-fetch if stream is
|
|
# None, so that this function is fast with
|
|
|
|
subscribed_recipient_ids = set(get_subscribed_stream_recipient_ids_for_user(user_profile))
|
|
|
|
for message in messages:
|
|
has_user_message = message.id in user_message_set
|
|
is_subscribed = message.recipient_id in subscribed_recipient_ids
|
|
if has_message_access(
|
|
user_profile,
|
|
message,
|
|
has_user_message=has_user_message,
|
|
stream=stream,
|
|
is_subscribed=is_subscribed,
|
|
):
|
|
filtered_messages.append(message)
|
|
return filtered_messages
|
|
|
|
|
|
def bulk_access_messages_expect_usermessage(
|
|
user_profile_id: int, message_ids: Sequence[int]
|
|
) -> "ValuesQuerySet[UserMessage, int]":
|
|
"""
|
|
Like bulk_access_messages, but faster and potentially stricter.
|
|
|
|
Returns a subset of `message_ids` containing only messages the
|
|
user can access. Makes O(1) database queries.
|
|
|
|
Use this function only when the user is expected to have a
|
|
UserMessage row for every message in `message_ids`. If a
|
|
UserMessage row is missing, the message will be omitted even if
|
|
the user has access (e.g. because it went to a public stream.)
|
|
|
|
See also: `access_message`, `bulk_access_messages`.
|
|
"""
|
|
return UserMessage.objects.filter(
|
|
user_profile_id=user_profile_id,
|
|
message_id__in=message_ids,
|
|
).values_list("message_id", flat=True)
|
|
|
|
|
|
def render_markdown(
|
|
message: Message,
|
|
content: str,
|
|
realm: Optional[Realm] = None,
|
|
realm_alert_words_automaton: Optional[ahocorasick.Automaton] = None,
|
|
url_embed_data: Optional[Dict[str, Optional[UrlEmbedData]]] = None,
|
|
mention_data: Optional[MentionData] = None,
|
|
email_gateway: bool = False,
|
|
) -> MessageRenderingResult:
|
|
"""
|
|
This is basically just a wrapper for do_render_markdown.
|
|
"""
|
|
|
|
if realm is None:
|
|
realm = message.get_realm()
|
|
|
|
sender = message.sender
|
|
sent_by_bot = sender.is_bot
|
|
translate_emoticons = sender.translate_emoticons
|
|
|
|
rendering_result = markdown_convert(
|
|
content,
|
|
realm_alert_words_automaton=realm_alert_words_automaton,
|
|
message=message,
|
|
message_realm=realm,
|
|
sent_by_bot=sent_by_bot,
|
|
translate_emoticons=translate_emoticons,
|
|
url_embed_data=url_embed_data,
|
|
mention_data=mention_data,
|
|
email_gateway=email_gateway,
|
|
)
|
|
|
|
return rendering_result
|
|
|
|
|
|
def huddle_users(recipient_id: int) -> str:
|
|
display_recipient: DisplayRecipientT = get_display_recipient_by_id(
|
|
recipient_id,
|
|
Recipient.HUDDLE,
|
|
None,
|
|
)
|
|
|
|
# str is for streams.
|
|
assert not isinstance(display_recipient, str)
|
|
|
|
user_ids: List[int] = [obj["id"] for obj in display_recipient]
|
|
user_ids = sorted(user_ids)
|
|
return ",".join(str(uid) for uid in user_ids)
|
|
|
|
|
|
def get_inactive_recipient_ids(user_profile: UserProfile) -> List[int]:
|
|
rows = (
|
|
get_stream_subscriptions_for_user(user_profile)
|
|
.filter(
|
|
active=False,
|
|
)
|
|
.values(
|
|
"recipient_id",
|
|
)
|
|
)
|
|
inactive_recipient_ids = [row["recipient_id"] for row in rows]
|
|
return inactive_recipient_ids
|
|
|
|
|
|
def get_muted_stream_ids(user_profile: UserProfile) -> List[int]:
|
|
rows = (
|
|
get_stream_subscriptions_for_user(user_profile)
|
|
.filter(
|
|
active=True,
|
|
is_muted=True,
|
|
)
|
|
.values(
|
|
"recipient__type_id",
|
|
)
|
|
)
|
|
muted_stream_ids = [row["recipient__type_id"] for row in rows]
|
|
return muted_stream_ids
|
|
|
|
|
|
def get_starred_message_ids(user_profile: UserProfile) -> List[int]:
|
|
return list(
|
|
UserMessage.objects.filter(
|
|
user_profile=user_profile,
|
|
)
|
|
.extra(
|
|
where=[UserMessage.where_starred()],
|
|
)
|
|
.order_by(
|
|
"message_id",
|
|
)
|
|
.values_list("message_id", flat=True)[0:10000]
|
|
)
|
|
|
|
|
|
def get_raw_unread_data(
|
|
user_profile: UserProfile, message_ids: Optional[List[int]] = None
|
|
) -> RawUnreadMessagesResult:
|
|
excluded_recipient_ids = get_inactive_recipient_ids(user_profile)
|
|
|
|
user_msgs = (
|
|
UserMessage.objects.filter(
|
|
user_profile=user_profile,
|
|
)
|
|
.exclude(
|
|
message__recipient_id__in=excluded_recipient_ids,
|
|
)
|
|
.values(
|
|
"message_id",
|
|
"message__sender_id",
|
|
MESSAGE__TOPIC,
|
|
"message__recipient_id",
|
|
"message__recipient__type",
|
|
"message__recipient__type_id",
|
|
"flags",
|
|
)
|
|
.order_by("-message_id")
|
|
)
|
|
|
|
if message_ids is not None:
|
|
# When users are marking just a few messages as unread, we just need
|
|
# those ids, and we know they're unread.
|
|
user_msgs = user_msgs.filter(message_id__in=message_ids)
|
|
else:
|
|
# At page load we need all unread messages.
|
|
user_msgs = user_msgs.extra(
|
|
where=[UserMessage.where_unread()],
|
|
)
|
|
|
|
# Limit unread messages for performance reasons.
|
|
user_msgs = list(user_msgs[:MAX_UNREAD_MESSAGES])
|
|
|
|
rows = list(reversed(user_msgs))
|
|
return extract_unread_data_from_um_rows(rows, user_profile)
|
|
|
|
|
|
def extract_unread_data_from_um_rows(
|
|
rows: List[Dict[str, Any]], user_profile: Optional[UserProfile]
|
|
) -> RawUnreadMessagesResult:
|
|
|
|
pm_dict: Dict[int, RawUnreadPrivateMessageDict] = {}
|
|
stream_dict: Dict[int, RawUnreadStreamDict] = {}
|
|
unmuted_stream_msgs: Set[int] = set()
|
|
huddle_dict: Dict[int, RawUnreadHuddleDict] = {}
|
|
mentions: Set[int] = set()
|
|
total_unreads = 0
|
|
|
|
raw_unread_messages: RawUnreadMessagesResult = dict(
|
|
pm_dict=pm_dict,
|
|
stream_dict=stream_dict,
|
|
muted_stream_ids=[],
|
|
unmuted_stream_msgs=unmuted_stream_msgs,
|
|
huddle_dict=huddle_dict,
|
|
mentions=mentions,
|
|
old_unreads_missing=False,
|
|
)
|
|
|
|
if user_profile is None:
|
|
return raw_unread_messages
|
|
|
|
muted_stream_ids = get_muted_stream_ids(user_profile)
|
|
raw_unread_messages["muted_stream_ids"] = muted_stream_ids
|
|
|
|
topic_mute_checker = build_topic_mute_checker(user_profile)
|
|
|
|
def is_row_muted(stream_id: int, recipient_id: int, topic: str) -> bool:
|
|
if stream_id in muted_stream_ids:
|
|
return True
|
|
|
|
if topic_mute_checker(recipient_id, topic):
|
|
return True
|
|
|
|
# Messages sent by muted users are never unread, so we don't
|
|
# need any logic related to muted users here.
|
|
|
|
return False
|
|
|
|
huddle_cache: Dict[int, str] = {}
|
|
|
|
def get_huddle_users(recipient_id: int) -> str:
|
|
if recipient_id in huddle_cache:
|
|
return huddle_cache[recipient_id]
|
|
|
|
user_ids_string = huddle_users(recipient_id)
|
|
huddle_cache[recipient_id] = user_ids_string
|
|
return user_ids_string
|
|
|
|
for row in rows:
|
|
total_unreads += 1
|
|
message_id = row["message_id"]
|
|
msg_type = row["message__recipient__type"]
|
|
recipient_id = row["message__recipient_id"]
|
|
sender_id = row["message__sender_id"]
|
|
|
|
if msg_type == Recipient.STREAM:
|
|
stream_id = row["message__recipient__type_id"]
|
|
topic = row[MESSAGE__TOPIC]
|
|
stream_dict[message_id] = dict(
|
|
stream_id=stream_id,
|
|
topic=topic,
|
|
)
|
|
if not is_row_muted(stream_id, recipient_id, topic):
|
|
unmuted_stream_msgs.add(message_id)
|
|
|
|
elif msg_type == Recipient.PERSONAL:
|
|
if sender_id == user_profile.id:
|
|
other_user_id = row["message__recipient__type_id"]
|
|
else:
|
|
other_user_id = sender_id
|
|
|
|
pm_dict[message_id] = dict(
|
|
other_user_id=other_user_id,
|
|
)
|
|
|
|
elif msg_type == Recipient.HUDDLE:
|
|
user_ids_string = get_huddle_users(recipient_id)
|
|
huddle_dict[message_id] = dict(
|
|
user_ids_string=user_ids_string,
|
|
)
|
|
|
|
# TODO: Add support for alert words here as well.
|
|
is_mentioned = (row["flags"] & UserMessage.flags.mentioned) != 0
|
|
is_wildcard_mentioned = (row["flags"] & UserMessage.flags.wildcard_mentioned) != 0
|
|
if is_mentioned:
|
|
mentions.add(message_id)
|
|
if is_wildcard_mentioned:
|
|
if msg_type == Recipient.STREAM:
|
|
stream_id = row["message__recipient__type_id"]
|
|
topic = row[MESSAGE__TOPIC]
|
|
if not is_row_muted(stream_id, recipient_id, topic):
|
|
mentions.add(message_id)
|
|
else: # nocoverage # TODO: Test wildcard mentions in PMs.
|
|
mentions.add(message_id)
|
|
|
|
# Record whether the user had more than MAX_UNREAD_MESSAGES total
|
|
# unreads -- that's a state where Zulip's behavior will start to
|
|
# be erroneous, and clients should display a warning.
|
|
raw_unread_messages["old_unreads_missing"] = total_unreads == MAX_UNREAD_MESSAGES
|
|
|
|
return raw_unread_messages
|
|
|
|
|
|
def aggregate_streams(*, input_dict: Dict[int, RawUnreadStreamDict]) -> List[UnreadStreamInfo]:
|
|
lookup_dict: Dict[Tuple[int, str], UnreadStreamInfo] = {}
|
|
for message_id, attribute_dict in input_dict.items():
|
|
stream_id = attribute_dict["stream_id"]
|
|
topic = attribute_dict["topic"]
|
|
lookup_key = (stream_id, topic)
|
|
if lookup_key not in lookup_dict:
|
|
obj = UnreadStreamInfo(
|
|
stream_id=stream_id,
|
|
topic=topic,
|
|
unread_message_ids=[],
|
|
)
|
|
lookup_dict[lookup_key] = obj
|
|
|
|
bucket = lookup_dict[lookup_key]
|
|
bucket["unread_message_ids"].append(message_id)
|
|
|
|
for dct in lookup_dict.values():
|
|
dct["unread_message_ids"].sort()
|
|
|
|
sorted_keys = sorted(lookup_dict.keys())
|
|
|
|
return [lookup_dict[k] for k in sorted_keys]
|
|
|
|
|
|
def aggregate_pms(
|
|
*, input_dict: Dict[int, RawUnreadPrivateMessageDict]
|
|
) -> List[UnreadPrivateMessageInfo]:
|
|
lookup_dict: Dict[int, UnreadPrivateMessageInfo] = {}
|
|
for message_id, attribute_dict in input_dict.items():
|
|
other_user_id = attribute_dict["other_user_id"]
|
|
if other_user_id not in lookup_dict:
|
|
# The `sender_id` field here is only supported for
|
|
# legacy mobile clients. Its actual semantics are the same
|
|
# as `other_user_id`.
|
|
obj = UnreadPrivateMessageInfo(
|
|
other_user_id=other_user_id,
|
|
sender_id=other_user_id,
|
|
unread_message_ids=[],
|
|
)
|
|
lookup_dict[other_user_id] = obj
|
|
|
|
bucket = lookup_dict[other_user_id]
|
|
bucket["unread_message_ids"].append(message_id)
|
|
|
|
for dct in lookup_dict.values():
|
|
dct["unread_message_ids"].sort()
|
|
|
|
sorted_keys = sorted(lookup_dict.keys())
|
|
|
|
return [lookup_dict[k] for k in sorted_keys]
|
|
|
|
|
|
def aggregate_huddles(*, input_dict: Dict[int, RawUnreadHuddleDict]) -> List[UnreadHuddleInfo]:
|
|
lookup_dict: Dict[str, UnreadHuddleInfo] = {}
|
|
for message_id, attribute_dict in input_dict.items():
|
|
user_ids_string = attribute_dict["user_ids_string"]
|
|
if user_ids_string not in lookup_dict:
|
|
obj = UnreadHuddleInfo(
|
|
user_ids_string=user_ids_string,
|
|
unread_message_ids=[],
|
|
)
|
|
lookup_dict[user_ids_string] = obj
|
|
|
|
bucket = lookup_dict[user_ids_string]
|
|
bucket["unread_message_ids"].append(message_id)
|
|
|
|
for dct in lookup_dict.values():
|
|
dct["unread_message_ids"].sort()
|
|
|
|
sorted_keys = sorted(lookup_dict.keys())
|
|
|
|
return [lookup_dict[k] for k in sorted_keys]
|
|
|
|
|
|
def aggregate_unread_data(raw_data: RawUnreadMessagesResult) -> UnreadMessagesResult:
|
|
|
|
pm_dict = raw_data["pm_dict"]
|
|
stream_dict = raw_data["stream_dict"]
|
|
unmuted_stream_msgs = raw_data["unmuted_stream_msgs"]
|
|
huddle_dict = raw_data["huddle_dict"]
|
|
mentions = list(raw_data["mentions"])
|
|
|
|
count = len(pm_dict) + len(unmuted_stream_msgs) + len(huddle_dict)
|
|
|
|
pm_objects = aggregate_pms(input_dict=pm_dict)
|
|
stream_objects = aggregate_streams(input_dict=stream_dict)
|
|
huddle_objects = aggregate_huddles(input_dict=huddle_dict)
|
|
|
|
result: UnreadMessagesResult = dict(
|
|
pms=pm_objects,
|
|
streams=stream_objects,
|
|
huddles=huddle_objects,
|
|
mentions=mentions,
|
|
count=count,
|
|
old_unreads_missing=raw_data["old_unreads_missing"],
|
|
)
|
|
|
|
return result
|
|
|
|
|
|
def apply_unread_message_event(
|
|
user_profile: UserProfile,
|
|
state: RawUnreadMessagesResult,
|
|
message: Dict[str, Any],
|
|
flags: List[str],
|
|
) -> None:
|
|
message_id = message["id"]
|
|
if message["type"] == "stream":
|
|
message_type = "stream"
|
|
elif message["type"] == "private":
|
|
others = [recip for recip in message["display_recipient"] if recip["id"] != user_profile.id]
|
|
if len(others) <= 1:
|
|
message_type = "private"
|
|
else:
|
|
message_type = "huddle"
|
|
else:
|
|
raise AssertionError("Invalid message type {}".format(message["type"]))
|
|
|
|
if message_type == "stream":
|
|
stream_id = message["stream_id"]
|
|
topic = message[TOPIC_NAME]
|
|
state["stream_dict"][message_id] = RawUnreadStreamDict(
|
|
stream_id=stream_id,
|
|
topic=topic,
|
|
)
|
|
|
|
if stream_id not in state["muted_stream_ids"]:
|
|
# This next check hits the database.
|
|
if not topic_is_muted(user_profile, stream_id, topic):
|
|
state["unmuted_stream_msgs"].add(message_id)
|
|
|
|
elif message_type == "private":
|
|
if len(others) == 1:
|
|
other_user_id = others[0]["id"]
|
|
else:
|
|
other_user_id = user_profile.id
|
|
|
|
state["pm_dict"][message_id] = RawUnreadPrivateMessageDict(
|
|
other_user_id=other_user_id,
|
|
)
|
|
|
|
else:
|
|
display_recipient = message["display_recipient"]
|
|
user_ids = [obj["id"] for obj in display_recipient]
|
|
user_ids = sorted(user_ids)
|
|
user_ids_string = ",".join(str(uid) for uid in user_ids)
|
|
|
|
state["huddle_dict"][message_id] = RawUnreadHuddleDict(
|
|
user_ids_string=user_ids_string,
|
|
)
|
|
|
|
if "mentioned" in flags:
|
|
state["mentions"].add(message_id)
|
|
if "wildcard_mentioned" in flags:
|
|
if message_id in state["unmuted_stream_msgs"]:
|
|
state["mentions"].add(message_id)
|
|
|
|
|
|
def remove_message_id_from_unread_mgs(state: RawUnreadMessagesResult, message_id: int) -> None:
|
|
# The opposite of apply_unread_message_event; removes a read or
|
|
# deleted message from a raw_unread_msgs data structure.
|
|
state["pm_dict"].pop(message_id, None)
|
|
state["stream_dict"].pop(message_id, None)
|
|
state["huddle_dict"].pop(message_id, None)
|
|
state["unmuted_stream_msgs"].discard(message_id)
|
|
state["mentions"].discard(message_id)
|
|
|
|
|
|
def format_unread_message_details(
|
|
my_user_id: int,
|
|
raw_unread_data: RawUnreadMessagesResult,
|
|
) -> Dict[str, MessageDetailsDict]:
|
|
unread_data = {}
|
|
|
|
for message_id, private_message_details in raw_unread_data["pm_dict"].items():
|
|
other_user_id = private_message_details["other_user_id"]
|
|
if other_user_id == my_user_id:
|
|
user_ids = []
|
|
else:
|
|
user_ids = [other_user_id]
|
|
|
|
# Note that user_ids excludes ourself, even for the case we send messages
|
|
# to ourself.
|
|
message_details = MessageDetailsDict(
|
|
type="private",
|
|
user_ids=user_ids,
|
|
)
|
|
if message_id in raw_unread_data["mentions"]:
|
|
message_details["mentioned"] = True
|
|
unread_data[str(message_id)] = message_details
|
|
|
|
for message_id, stream_message_details in raw_unread_data["stream_dict"].items():
|
|
if message_id in raw_unread_data["unmuted_stream_msgs"]:
|
|
unmuted_stream_msg = True
|
|
else:
|
|
unmuted_stream_msg = False
|
|
|
|
message_details = MessageDetailsDict(
|
|
type="stream",
|
|
stream_id=stream_message_details["stream_id"],
|
|
topic=stream_message_details["topic"],
|
|
# Clients don't need this detail, but we need it internally for apply_events.
|
|
unmuted_stream_msg=unmuted_stream_msg,
|
|
)
|
|
if message_id in raw_unread_data["mentions"]:
|
|
message_details["mentioned"] = True
|
|
unread_data[str(message_id)] = message_details
|
|
|
|
for message_id, huddle_message_details in raw_unread_data["huddle_dict"].items():
|
|
# The client wants a list of user_ids in the conversation, excluding ourself,
|
|
# that is sorted in numerical order.
|
|
user_ids = [int(s) for s in huddle_message_details["user_ids_string"].split(",")]
|
|
user_ids = [user_id for user_id in user_ids if user_id != my_user_id]
|
|
user_ids.sort()
|
|
message_details = MessageDetailsDict(
|
|
type="private",
|
|
user_ids=user_ids,
|
|
)
|
|
if message_id in raw_unread_data["mentions"]:
|
|
message_details["mentioned"] = True
|
|
unread_data[str(message_id)] = message_details
|
|
|
|
return unread_data
|
|
|
|
|
|
def add_message_to_unread_msgs(
|
|
my_user_id: int,
|
|
state: RawUnreadMessagesResult,
|
|
message_id: int,
|
|
message_details: MessageDetailsDict,
|
|
) -> None:
|
|
if message_details.get("mentioned"):
|
|
state["mentions"].add(message_id)
|
|
|
|
if message_details["type"] == "private":
|
|
user_ids: List[int] = message_details["user_ids"]
|
|
user_ids = [user_id for user_id in user_ids if user_id != my_user_id]
|
|
if user_ids == []:
|
|
state["pm_dict"][message_id] = RawUnreadPrivateMessageDict(
|
|
other_user_id=my_user_id,
|
|
)
|
|
elif len(user_ids) == 1:
|
|
state["pm_dict"][message_id] = RawUnreadPrivateMessageDict(
|
|
other_user_id=user_ids[0],
|
|
)
|
|
else:
|
|
user_ids.append(my_user_id)
|
|
user_ids_string = ",".join(str(user_id) for user_id in sorted(user_ids))
|
|
state["huddle_dict"][message_id] = RawUnreadHuddleDict(
|
|
user_ids_string=user_ids_string,
|
|
)
|
|
elif message_details["type"] == "stream":
|
|
state["stream_dict"][message_id] = RawUnreadStreamDict(
|
|
stream_id=message_details["stream_id"],
|
|
topic=message_details["topic"],
|
|
)
|
|
if message_details["unmuted_stream_msg"]:
|
|
state["unmuted_stream_msgs"].add(message_id)
|
|
|
|
|
|
def estimate_recent_messages(realm: Realm, hours: int) -> int:
|
|
stat = COUNT_STATS["messages_sent:is_bot:hour"]
|
|
d = timezone_now() - datetime.timedelta(hours=hours)
|
|
return (
|
|
RealmCount.objects.filter(property=stat.property, end_time__gt=d, realm=realm).aggregate(
|
|
Sum("value")
|
|
)["value__sum"]
|
|
or 0
|
|
)
|
|
|
|
|
|
def get_first_visible_message_id(realm: Realm) -> int:
|
|
return realm.first_visible_message_id
|
|
|
|
|
|
def maybe_update_first_visible_message_id(realm: Realm, lookback_hours: int) -> None:
|
|
recent_messages_count = estimate_recent_messages(realm, lookback_hours)
|
|
if realm.message_visibility_limit is not None and recent_messages_count > 0:
|
|
update_first_visible_message_id(realm)
|
|
|
|
|
|
def update_first_visible_message_id(realm: Realm) -> None:
|
|
if realm.message_visibility_limit is None:
|
|
realm.first_visible_message_id = 0
|
|
else:
|
|
try:
|
|
first_visible_message_id = (
|
|
Message.objects.filter(sender__realm=realm)
|
|
.values("id")
|
|
.order_by("-id")[realm.message_visibility_limit - 1]["id"]
|
|
)
|
|
except IndexError:
|
|
first_visible_message_id = 0
|
|
realm.first_visible_message_id = first_visible_message_id
|
|
realm.save(update_fields=["first_visible_message_id"])
|
|
|
|
|
|
def get_last_message_id() -> int:
|
|
# We generally use this function to populate RealmAuditLog, and
|
|
# the max id here is actually system-wide, not per-realm. I
|
|
# assume there's some advantage in not filtering by realm.
|
|
last_id = Message.objects.aggregate(Max("id"))["id__max"]
|
|
if last_id is None:
|
|
# During initial realm creation, there might be 0 messages in
|
|
# the database; in that case, the `aggregate` query returns
|
|
# None. Since we want an int for "beginning of time", use -1.
|
|
last_id = -1
|
|
return last_id
|
|
|
|
|
|
def get_recent_conversations_recipient_id(
|
|
user_profile: UserProfile, recipient_id: int, sender_id: int
|
|
) -> int:
|
|
"""Helper for doing lookups of the recipient_id that
|
|
get_recent_private_conversations would have used to record that
|
|
message in its data structure.
|
|
"""
|
|
my_recipient_id = user_profile.recipient_id
|
|
if recipient_id == my_recipient_id:
|
|
return UserProfile.objects.values_list("recipient_id", flat=True).get(id=sender_id)
|
|
return recipient_id
|
|
|
|
|
|
def get_recent_private_conversations(user_profile: UserProfile) -> Dict[int, Dict[str, Any]]:
|
|
"""This function uses some carefully optimized SQL queries, designed
|
|
to use the UserMessage index on private_messages. It is
|
|
significantly complicated by the fact that for 1:1 private
|
|
messages, we store the message against a recipient_id of whichever
|
|
user was the recipient, and thus for 1:1 private messages sent
|
|
directly to us, we need to look up the other user from the
|
|
sender_id on those messages. You'll see that pattern repeated
|
|
both here and also in zerver/lib/events.py.
|
|
|
|
Ideally, we would write these queries using Django, but even
|
|
without the UNION ALL, that seems to not be possible, because the
|
|
equivalent Django syntax (for the first part of this query):
|
|
|
|
message_data = UserMessage.objects.select_related("message__recipient_id").filter(
|
|
user_profile=user_profile,
|
|
).extra(
|
|
where=[UserMessage.where_private()]
|
|
).order_by("-message_id")[:1000].values(
|
|
"message__recipient_id").annotate(last_message_id=Max("message_id"))
|
|
|
|
does not properly nest the GROUP BY (from .annotate) with the slicing.
|
|
|
|
We return a dictionary structure for convenient modification
|
|
below; this structure is converted into its final form by
|
|
post_process.
|
|
|
|
"""
|
|
RECENT_CONVERSATIONS_LIMIT = 1000
|
|
|
|
recipient_map = {}
|
|
my_recipient_id = user_profile.recipient_id
|
|
|
|
query = SQL(
|
|
"""
|
|
SELECT
|
|
subquery.recipient_id, MAX(subquery.message_id)
|
|
FROM (
|
|
(SELECT
|
|
um.message_id AS message_id,
|
|
m.recipient_id AS recipient_id
|
|
FROM
|
|
zerver_usermessage um
|
|
JOIN
|
|
zerver_message m
|
|
ON
|
|
um.message_id = m.id
|
|
WHERE
|
|
um.user_profile_id=%(user_profile_id)s AND
|
|
um.flags & 2048 <> 0 AND
|
|
m.recipient_id <> %(my_recipient_id)s
|
|
ORDER BY message_id DESC
|
|
LIMIT %(conversation_limit)s)
|
|
UNION ALL
|
|
(SELECT
|
|
m.id AS message_id,
|
|
sender_profile.recipient_id AS recipient_id
|
|
FROM
|
|
zerver_message m
|
|
JOIN
|
|
zerver_userprofile sender_profile
|
|
ON
|
|
m.sender_id = sender_profile.id
|
|
WHERE
|
|
m.recipient_id=%(my_recipient_id)s
|
|
ORDER BY message_id DESC
|
|
LIMIT %(conversation_limit)s)
|
|
) AS subquery
|
|
GROUP BY subquery.recipient_id
|
|
"""
|
|
)
|
|
|
|
with connection.cursor() as cursor:
|
|
cursor.execute(
|
|
query,
|
|
{
|
|
"user_profile_id": user_profile.id,
|
|
"conversation_limit": RECENT_CONVERSATIONS_LIMIT,
|
|
"my_recipient_id": my_recipient_id,
|
|
},
|
|
)
|
|
rows = cursor.fetchall()
|
|
|
|
# The resulting rows will be (recipient_id, max_message_id)
|
|
# objects for all parties we've had recent (group?) private
|
|
# message conversations with, including PMs with yourself (those
|
|
# will generate an empty list of user_ids).
|
|
for recipient_id, max_message_id in rows:
|
|
recipient_map[recipient_id] = dict(
|
|
max_message_id=max_message_id,
|
|
user_ids=[],
|
|
)
|
|
|
|
# Now we need to map all the recipient_id objects to lists of user IDs
|
|
for (recipient_id, user_profile_id) in (
|
|
Subscription.objects.filter(recipient_id__in=recipient_map.keys())
|
|
.exclude(user_profile_id=user_profile.id)
|
|
.values_list("recipient_id", "user_profile_id")
|
|
):
|
|
recipient_map[recipient_id]["user_ids"].append(user_profile_id)
|
|
|
|
# Sort to prevent test flakes and client bugs.
|
|
for rec in recipient_map.values():
|
|
rec["user_ids"].sort()
|
|
|
|
return recipient_map
|
|
|
|
|
|
def wildcard_mention_allowed(sender: UserProfile, stream: Stream) -> bool:
|
|
realm = sender.realm
|
|
|
|
# If there are fewer than Realm.WILDCARD_MENTION_THRESHOLD, we
|
|
# allow sending. In the future, we may want to make this behavior
|
|
# a default, and also just allow explicitly setting whether this
|
|
# applies to a stream as an override.
|
|
if num_subscribers_for_stream_id(stream.id) <= Realm.WILDCARD_MENTION_THRESHOLD:
|
|
return True
|
|
|
|
if realm.wildcard_mention_policy == Realm.WILDCARD_MENTION_POLICY_NOBODY:
|
|
return False
|
|
|
|
if realm.wildcard_mention_policy == Realm.WILDCARD_MENTION_POLICY_EVERYONE:
|
|
return True
|
|
|
|
if realm.wildcard_mention_policy == Realm.WILDCARD_MENTION_POLICY_ADMINS:
|
|
return sender.is_realm_admin
|
|
|
|
if realm.wildcard_mention_policy == Realm.WILDCARD_MENTION_POLICY_MODERATORS:
|
|
return sender.is_realm_admin or sender.is_moderator
|
|
|
|
if realm.wildcard_mention_policy == Realm.WILDCARD_MENTION_POLICY_FULL_MEMBERS:
|
|
return sender.is_realm_admin or (not sender.is_provisional_member and not sender.is_guest)
|
|
|
|
if realm.wildcard_mention_policy == Realm.WILDCARD_MENTION_POLICY_MEMBERS:
|
|
return not sender.is_guest
|
|
|
|
raise AssertionError("Invalid wildcard mention policy")
|
|
|
|
|
|
def parse_message_content_delete_limit(
|
|
value: Union[int, str],
|
|
special_values_map: Mapping[str, Optional[int]],
|
|
) -> Optional[int]:
|
|
if isinstance(value, str) and value in special_values_map.keys():
|
|
return special_values_map[value]
|
|
if isinstance(value, str) or value <= 0:
|
|
raise RequestVariableConversionError("message_content_delete_limit_seconds", value)
|
|
assert isinstance(value, int)
|
|
return value
|
|
|
|
|
|
def update_to_dict_cache(
|
|
changed_messages: List[Message], realm_id: Optional[int] = None
|
|
) -> List[int]:
|
|
"""Updates the message as stored in the to_dict cache (for serving
|
|
messages)."""
|
|
items_for_remote_cache = {}
|
|
message_ids = []
|
|
changed_messages_to_dict = MessageDict.to_dict_uncached(changed_messages, realm_id)
|
|
for msg_id, msg in changed_messages_to_dict.items():
|
|
message_ids.append(msg_id)
|
|
key = to_dict_cache_key_id(msg_id)
|
|
items_for_remote_cache[key] = (msg,)
|
|
|
|
cache_set_many(items_for_remote_cache)
|
|
return message_ids
|