streams: Remove inappropriate use of zerver.lib.timeout.

zerver.lib.timeout abuses asynchronous exceptions, so it’s only safe
to use on CPU computations with no side effects.

Signed-off-by: Anders Kaseorg <anders@zulip.com>
This commit is contained in:
Anders Kaseorg 2024-04-18 10:31:01 -07:00 committed by Tim Abbott
parent 95a1481f99
commit da979bc65c
2 changed files with 32 additions and 42 deletions

View File

@ -4,8 +4,6 @@ from django.utils.timezone import now as timezone_now
from zerver.actions.streams import do_change_stream_permission
from zerver.lib.test_classes import ZulipTestCase
from zerver.lib.test_helpers import timeout_mock
from zerver.lib.timeout import TimeoutExpiredError
from zerver.models import Message, UserMessage
from zerver.models.clients import get_client
from zerver.models.realms import get_realm
@ -291,26 +289,24 @@ class TopicDeleteTest(ZulipTestCase):
acting_user=user_profile,
)
# Delete the topic should now remove all messages
with timeout_mock("zerver.views.streams"):
result = self.client_post(
endpoint,
{
"topic_name": topic_name,
},
)
result = self.client_post(
endpoint,
{
"topic_name": topic_name,
},
)
result_dict = self.assert_json_success(result)
self.assertTrue(result_dict["complete"])
self.assertFalse(Message.objects.filter(id=last_msg_id).exists())
self.assertTrue(Message.objects.filter(id=initial_last_msg_id).exists())
# Delete again, to test the edge case of deleting an empty topic.
with timeout_mock("zerver.views.streams"):
result = self.client_post(
endpoint,
{
"topic_name": topic_name,
},
)
result = self.client_post(
endpoint,
{
"topic_name": topic_name,
},
)
result_dict = self.assert_json_success(result)
self.assertTrue(result_dict["complete"])
self.assertFalse(Message.objects.filter(id=last_msg_id).exists())
@ -328,7 +324,7 @@ class TopicDeleteTest(ZulipTestCase):
self.login_user(user_profile)
endpoint = "/json/streams/" + str(stream.id) + "/delete_topic"
with mock.patch("zerver.views.streams.timeout", side_effect=TimeoutExpiredError):
with mock.patch("time.monotonic", side_effect=[10000, 10051]):
result = self.client_post(
endpoint,
{

View File

@ -1,5 +1,6 @@
import time
from collections import defaultdict
from typing import Any, Callable, Dict, List, Literal, Mapping, Optional, Sequence, Set, Union
from typing import Any, Callable, Dict, List, Mapping, Optional, Sequence, Set, Union
import orjson
from django.conf import settings
@ -66,7 +67,6 @@ from zerver.lib.streams import (
stream_to_dict,
)
from zerver.lib.subscription_info import gather_subscriptions
from zerver.lib.timeout import TimeoutExpiredError, timeout
from zerver.lib.topic import (
get_topic_history_for_public_stream,
get_topic_history_for_stream,
@ -922,29 +922,23 @@ def delete_in_topic(
# the user can see are returned in the query.
messages = bulk_access_stream_messages_query(user_profile, messages, stream)
def delete_in_batches() -> Literal[True]:
# Topics can be large enough that this request will inevitably time out.
# In such a case, it's good for some progress to be accomplished, so that
# full deletion can be achieved by repeating the request. For that purpose,
# we delete messages in atomic batches, committing after each batch.
# TODO: Ideally this should be moved to the deferred_work queue.
batch_size = RETENTION_STREAM_MESSAGE_BATCH_SIZE
while True:
with transaction.atomic(durable=True):
messages_to_delete = messages.order_by("-id")[0:batch_size].select_for_update(
of=("self",)
)
if not messages_to_delete:
break
do_delete_messages(user_profile.realm, messages_to_delete)
# timeout() in which we call this function requires non-None return value.
return True
try:
timeout(50, delete_in_batches)
except TimeoutExpiredError:
return json_success(request, data={"complete": False})
# Topics can be large enough that this request will inevitably time out.
# In such a case, it's good for some progress to be accomplished, so that
# full deletion can be achieved by repeating the request. For that purpose,
# we delete messages in atomic batches, committing after each batch.
# TODO: Ideally this should be moved to the deferred_work queue.
start_time = time.monotonic()
batch_size = RETENTION_STREAM_MESSAGE_BATCH_SIZE
while True:
if time.monotonic() >= start_time + 50:
return json_success(request, data={"complete": False})
with transaction.atomic(durable=True):
messages_to_delete = messages.order_by("-id")[0:batch_size].select_for_update(
of=("self",)
)
if not messages_to_delete:
break
do_delete_messages(user_profile.realm, messages_to_delete)
return json_success(request, data={"complete": True})