mirror of https://github.com/zulip/zulip.git
message_fetch: Take a REPEATABLE READ READ ONLY when fetching messages.
This commit is contained in:
parent
0f521fba41
commit
9a682fb20a
|
@ -1,6 +1,8 @@
|
|||
from typing import Dict, Iterable, List, Optional, Tuple, Union
|
||||
|
||||
from django.conf import settings
|
||||
from django.contrib.auth.models import AnonymousUser
|
||||
from django.db import connection, transaction
|
||||
from django.http import HttpRequest, HttpResponse
|
||||
from django.utils.html import escape as escape_html
|
||||
from django.utils.translation import gettext as _
|
||||
|
@ -169,74 +171,101 @@ def get_messages_backend(
|
|||
assert log_data is not None
|
||||
log_data["extra"] = "[{}]".format(",".join(verbose_operators))
|
||||
|
||||
query_info = fetch_messages(
|
||||
narrow=narrow,
|
||||
user_profile=user_profile,
|
||||
realm=realm,
|
||||
is_web_public_query=is_web_public_query,
|
||||
anchor=anchor,
|
||||
include_anchor=include_anchor,
|
||||
num_before=num_before,
|
||||
num_after=num_after,
|
||||
)
|
||||
with transaction.atomic(durable=True):
|
||||
# We're about to perform a search, and then get results from
|
||||
# it; this is done across multiple queries. To prevent race
|
||||
# conditions, we want the messages returned to be consistent
|
||||
# with the version of the messages that was searched, to
|
||||
# prevent changes which happened between them from leaking to
|
||||
# clients who should not be able to see the new values, and
|
||||
# when messages are deleted in between. We set up
|
||||
# repeatable-read isolation for this transaction, so that we
|
||||
# prevent both phantom reads and non-repeatable reads.
|
||||
#
|
||||
# In a read-only repeatable-read transaction, it is not
|
||||
# possible to encounter deadlocks or need retries due to
|
||||
# serialization errors.
|
||||
#
|
||||
# You can only set the isolation level before any queries in
|
||||
# the transaction, meaning it must be the top-most
|
||||
# transaction, which durable=True establishes. Except in
|
||||
# tests, where durable=True is a lie, because there is an
|
||||
# outer transaction for each test. We thus skip this command
|
||||
# in tests, since it would fail.
|
||||
if not settings.TEST_SUITE: # nocoverage
|
||||
cursor = connection.cursor()
|
||||
cursor.execute("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ READ ONLY")
|
||||
|
||||
anchor = query_info.anchor
|
||||
include_history = query_info.include_history
|
||||
is_search = query_info.is_search
|
||||
rows = query_info.rows
|
||||
query_info = fetch_messages(
|
||||
narrow=narrow,
|
||||
user_profile=user_profile,
|
||||
realm=realm,
|
||||
is_web_public_query=is_web_public_query,
|
||||
anchor=anchor,
|
||||
include_anchor=include_anchor,
|
||||
num_before=num_before,
|
||||
num_after=num_after,
|
||||
)
|
||||
|
||||
# The following is a little messy, but ensures that the code paths
|
||||
# are similar regardless of the value of include_history. The
|
||||
# 'user_messages' dictionary maps each message to the user's
|
||||
# UserMessage object for that message, which we will attach to the
|
||||
# rendered message dict before returning it. We attempt to
|
||||
# bulk-fetch rendered message dicts from remote cache using the
|
||||
# 'messages' list.
|
||||
message_ids: List[int] = []
|
||||
user_message_flags: Dict[int, List[str]] = {}
|
||||
if is_web_public_query:
|
||||
# For spectators, we treat all historical messages as read.
|
||||
for row in rows:
|
||||
message_id = row[0]
|
||||
message_ids.append(message_id)
|
||||
user_message_flags[message_id] = ["read"]
|
||||
elif include_history:
|
||||
assert user_profile is not None
|
||||
message_ids = [row[0] for row in rows]
|
||||
anchor = query_info.anchor
|
||||
include_history = query_info.include_history
|
||||
is_search = query_info.is_search
|
||||
rows = query_info.rows
|
||||
|
||||
# TODO: This could be done with an outer join instead of two queries
|
||||
um_rows = UserMessage.objects.filter(user_profile=user_profile, message_id__in=message_ids)
|
||||
user_message_flags = {um.message_id: um.flags_list() for um in um_rows}
|
||||
# The following is a little messy, but ensures that the code paths
|
||||
# are similar regardless of the value of include_history. The
|
||||
# 'user_messages' dictionary maps each message to the user's
|
||||
# UserMessage object for that message, which we will attach to the
|
||||
# rendered message dict before returning it. We attempt to
|
||||
# bulk-fetch rendered message dicts from remote cache using the
|
||||
# 'messages' list.
|
||||
message_ids: List[int] = []
|
||||
user_message_flags: Dict[int, List[str]] = {}
|
||||
if is_web_public_query:
|
||||
# For spectators, we treat all historical messages as read.
|
||||
for row in rows:
|
||||
message_id = row[0]
|
||||
message_ids.append(message_id)
|
||||
user_message_flags[message_id] = ["read"]
|
||||
elif include_history:
|
||||
assert user_profile is not None
|
||||
message_ids = [row[0] for row in rows]
|
||||
|
||||
for message_id in message_ids:
|
||||
if message_id not in user_message_flags:
|
||||
user_message_flags[message_id] = ["read", "historical"]
|
||||
else:
|
||||
for row in rows:
|
||||
message_id = row[0]
|
||||
flags = row[1]
|
||||
user_message_flags[message_id] = UserMessage.flags_list_for_flags(flags)
|
||||
message_ids.append(message_id)
|
||||
|
||||
search_fields: Dict[int, Dict[str, str]] = {}
|
||||
if is_search:
|
||||
for row in rows:
|
||||
message_id = row[0]
|
||||
(topic_name, rendered_content, content_matches, topic_matches) = row[-4:]
|
||||
search_fields[message_id] = get_search_fields(
|
||||
rendered_content, topic_name, content_matches, topic_matches
|
||||
# TODO: This could be done with an outer join instead of two queries
|
||||
um_rows = UserMessage.objects.filter(
|
||||
user_profile=user_profile, message_id__in=message_ids
|
||||
)
|
||||
user_message_flags = {um.message_id: um.flags_list() for um in um_rows}
|
||||
|
||||
message_list = messages_for_ids(
|
||||
message_ids=message_ids,
|
||||
user_message_flags=user_message_flags,
|
||||
search_fields=search_fields,
|
||||
apply_markdown=apply_markdown,
|
||||
client_gravatar=client_gravatar,
|
||||
allow_edit_history=realm.allow_edit_history,
|
||||
user_profile=user_profile,
|
||||
realm=realm,
|
||||
)
|
||||
for message_id in message_ids:
|
||||
if message_id not in user_message_flags:
|
||||
user_message_flags[message_id] = ["read", "historical"]
|
||||
else:
|
||||
for row in rows:
|
||||
message_id = row[0]
|
||||
flags = row[1]
|
||||
user_message_flags[message_id] = UserMessage.flags_list_for_flags(flags)
|
||||
message_ids.append(message_id)
|
||||
|
||||
search_fields: Dict[int, Dict[str, str]] = {}
|
||||
if is_search:
|
||||
for row in rows:
|
||||
message_id = row[0]
|
||||
(topic_name, rendered_content, content_matches, topic_matches) = row[-4:]
|
||||
search_fields[message_id] = get_search_fields(
|
||||
rendered_content, topic_name, content_matches, topic_matches
|
||||
)
|
||||
|
||||
message_list = messages_for_ids(
|
||||
message_ids=message_ids,
|
||||
user_message_flags=user_message_flags,
|
||||
search_fields=search_fields,
|
||||
apply_markdown=apply_markdown,
|
||||
client_gravatar=client_gravatar,
|
||||
allow_edit_history=realm.allow_edit_history,
|
||||
user_profile=user_profile,
|
||||
realm=realm,
|
||||
)
|
||||
|
||||
ret = dict(
|
||||
messages=message_list,
|
||||
|
|
Loading…
Reference in New Issue