message_fetch: Take a REPEATABLE READ READ ONLY when fetching messages.

This commit is contained in:
Alex Vandiver 2024-04-10 17:04:52 +00:00 committed by Tim Abbott
parent 0f521fba41
commit 9a682fb20a
1 changed files with 91 additions and 62 deletions

View File

@ -1,6 +1,8 @@
from typing import Dict, Iterable, List, Optional, Tuple, Union from typing import Dict, Iterable, List, Optional, Tuple, Union
from django.conf import settings
from django.contrib.auth.models import AnonymousUser from django.contrib.auth.models import AnonymousUser
from django.db import connection, transaction
from django.http import HttpRequest, HttpResponse from django.http import HttpRequest, HttpResponse
from django.utils.html import escape as escape_html from django.utils.html import escape as escape_html
from django.utils.translation import gettext as _ from django.utils.translation import gettext as _
@ -169,74 +171,101 @@ def get_messages_backend(
assert log_data is not None assert log_data is not None
log_data["extra"] = "[{}]".format(",".join(verbose_operators)) log_data["extra"] = "[{}]".format(",".join(verbose_operators))
query_info = fetch_messages( with transaction.atomic(durable=True):
narrow=narrow, # We're about to perform a search, and then get results from
user_profile=user_profile, # it; this is done across multiple queries. To prevent race
realm=realm, # conditions, we want the messages returned to be consistent
is_web_public_query=is_web_public_query, # with the version of the messages that was searched, to
anchor=anchor, # prevent changes which happened between them from leaking to
include_anchor=include_anchor, # clients who should not be able to see the new values, and
num_before=num_before, # when messages are deleted in between. We set up
num_after=num_after, # repeatable-read isolation for this transaction, so that we
) # prevent both phantom reads and non-repeatable reads.
#
# In a read-only repeatable-read transaction, it is not
# possible to encounter deadlocks or need retries due to
# serialization errors.
#
# You can only set the isolation level before any queries in
# the transaction, meaning it must be the top-most
# transaction, which durable=True establishes. Except in
# tests, where durable=True is a lie, because there is an
# outer transaction for each test. We thus skip this command
# in tests, since it would fail.
if not settings.TEST_SUITE: # nocoverage
cursor = connection.cursor()
cursor.execute("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ READ ONLY")
anchor = query_info.anchor query_info = fetch_messages(
include_history = query_info.include_history narrow=narrow,
is_search = query_info.is_search user_profile=user_profile,
rows = query_info.rows realm=realm,
is_web_public_query=is_web_public_query,
anchor=anchor,
include_anchor=include_anchor,
num_before=num_before,
num_after=num_after,
)
# The following is a little messy, but ensures that the code paths anchor = query_info.anchor
# are similar regardless of the value of include_history. The include_history = query_info.include_history
# 'user_messages' dictionary maps each message to the user's is_search = query_info.is_search
# UserMessage object for that message, which we will attach to the rows = query_info.rows
# rendered message dict before returning it. We attempt to
# bulk-fetch rendered message dicts from remote cache using the
# 'messages' list.
message_ids: List[int] = []
user_message_flags: Dict[int, List[str]] = {}
if is_web_public_query:
# For spectators, we treat all historical messages as read.
for row in rows:
message_id = row[0]
message_ids.append(message_id)
user_message_flags[message_id] = ["read"]
elif include_history:
assert user_profile is not None
message_ids = [row[0] for row in rows]
# TODO: This could be done with an outer join instead of two queries # The following is a little messy, but ensures that the code paths
um_rows = UserMessage.objects.filter(user_profile=user_profile, message_id__in=message_ids) # are similar regardless of the value of include_history. The
user_message_flags = {um.message_id: um.flags_list() for um in um_rows} # 'user_messages' dictionary maps each message to the user's
# UserMessage object for that message, which we will attach to the
# rendered message dict before returning it. We attempt to
# bulk-fetch rendered message dicts from remote cache using the
# 'messages' list.
message_ids: List[int] = []
user_message_flags: Dict[int, List[str]] = {}
if is_web_public_query:
# For spectators, we treat all historical messages as read.
for row in rows:
message_id = row[0]
message_ids.append(message_id)
user_message_flags[message_id] = ["read"]
elif include_history:
assert user_profile is not None
message_ids = [row[0] for row in rows]
for message_id in message_ids: # TODO: This could be done with an outer join instead of two queries
if message_id not in user_message_flags: um_rows = UserMessage.objects.filter(
user_message_flags[message_id] = ["read", "historical"] user_profile=user_profile, message_id__in=message_ids
else:
for row in rows:
message_id = row[0]
flags = row[1]
user_message_flags[message_id] = UserMessage.flags_list_for_flags(flags)
message_ids.append(message_id)
search_fields: Dict[int, Dict[str, str]] = {}
if is_search:
for row in rows:
message_id = row[0]
(topic_name, rendered_content, content_matches, topic_matches) = row[-4:]
search_fields[message_id] = get_search_fields(
rendered_content, topic_name, content_matches, topic_matches
) )
user_message_flags = {um.message_id: um.flags_list() for um in um_rows}
message_list = messages_for_ids( for message_id in message_ids:
message_ids=message_ids, if message_id not in user_message_flags:
user_message_flags=user_message_flags, user_message_flags[message_id] = ["read", "historical"]
search_fields=search_fields, else:
apply_markdown=apply_markdown, for row in rows:
client_gravatar=client_gravatar, message_id = row[0]
allow_edit_history=realm.allow_edit_history, flags = row[1]
user_profile=user_profile, user_message_flags[message_id] = UserMessage.flags_list_for_flags(flags)
realm=realm, message_ids.append(message_id)
)
search_fields: Dict[int, Dict[str, str]] = {}
if is_search:
for row in rows:
message_id = row[0]
(topic_name, rendered_content, content_matches, topic_matches) = row[-4:]
search_fields[message_id] = get_search_fields(
rendered_content, topic_name, content_matches, topic_matches
)
message_list = messages_for_ids(
message_ids=message_ids,
user_message_flags=user_message_flags,
search_fields=search_fields,
apply_markdown=apply_markdown,
client_gravatar=client_gravatar,
allow_edit_history=realm.allow_edit_history,
user_profile=user_profile,
realm=realm,
)
ret = dict( ret = dict(
messages=message_list, messages=message_list,