delete_topic: Use the same timeout pattern as /mark_all_as_read.

We don't want to use the nginx 502 timeout as an API pattern. In
/mark_all_as_read we've already figured out how we want to handle this
instead.
This commit is contained in:
Mateusz Mandera 2022-11-01 10:00:38 +01:00 committed by Alex Vandiver
parent 66da42bbae
commit 6e336ef6f6
8 changed files with 128 additions and 57 deletions

View File

@ -976,13 +976,14 @@ export function delete_topic(stream_id, topic_name, failures = 0) {
data: { data: {
topic_name, topic_name,
}, },
success() {}, success(data) {
error(xhr) { if (data.result === "partially_completed") {
if (failures >= 9) { if (failures >= 9) {
// Don't keep retrying indefinitely to avoid DoSing the server. // Don't keep retrying indefinitely to avoid DoSing the server.
return; return;
} }
if (xhr.status === 502) {
failures += 1;
/* When trying to delete a very large topic, it's /* When trying to delete a very large topic, it's
possible for the request to the server to possible for the request to the server to
time out after making some progress. Retry the time out after making some progress. Retry the
@ -991,7 +992,6 @@ export function delete_topic(stream_id, topic_name, failures = 0) {
TODO: Show a nice loading indicator experience. TODO: Show a nice loading indicator experience.
*/ */
failures += 1;
delete_topic(stream_id, topic_name, failures); delete_topic(stream_id, topic_name, failures);
} }
}, },

View File

@ -20,6 +20,13 @@ format used by the Zulip server that they are interacting with.
## Changes in Zulip 6.0 ## Changes in Zulip 6.0
**Feature level 154**
* [`POST /streams/{stream_id}/delete_topic`](/api/delete-topic):
When the process of deleting messages times out, a success response
with "partially_completed" result will now be returned by the server,
analogically to the `/mark_all_as_read` endpoint.
**Feature level 153** **Feature level 153**
* [`POST /mark_all_as_read`](/api/mark-all-as-read): Messages are now * [`POST /mark_all_as_read`](/api/mark-all-as-read): Messages are now

View File

@ -33,7 +33,7 @@ DESKTOP_WARNING_VERSION = "5.4.3"
# Changes should be accompanied by documentation explaining what the # Changes should be accompanied by documentation explaining what the
# new level means in templates/zerver/api/changelog.md, as well as # new level means in templates/zerver/api/changelog.md, as well as
# "**Changes**" entries in the endpoint's documentation in `zulip.yaml`. # "**Changes**" entries in the endpoint's documentation in `zulip.yaml`.
API_FEATURE_LEVEL = 153 API_FEATURE_LEVEL = 154
# Bump the minor PROVISION_VERSION to indicate that folks should provision # Bump the minor PROVISION_VERSION to indicate that folks should provision
# only when going from an old version of the code to a newer version. Bump # only when going from an old version of the code to a newer version. Bump

View File

@ -717,3 +717,15 @@ def mock_queue_publish(
with mock.patch(method_to_patch, side_effect=verify_serialize): with mock.patch(method_to_patch, side_effect=verify_serialize):
yield inner yield inner
@contextmanager
def timeout_mock(mock_path: str) -> Iterator[None]:
# timeout() doesn't work in test environment with database operations
# and they don't get committed - so we need to replace it with a mock
# that just calls the function.
def mock_timeout(seconds: int, func: Callable[[], object]) -> object:
return func()
with mock.patch(f"{mock_path}.timeout", new=mock_timeout):
yield

View File

@ -14388,6 +14388,9 @@ paths:
for very large topics. It now deletes messages in batches, for very large topics. It now deletes messages in batches,
starting with the newest messages, so that progress will be starting with the newest messages, so that progress will be
made even if the request times out. made even if the request times out.
As of feature level 154, in case of timeout, a success response
with "partially_completed" result will now be returned.
parameters: parameters:
- $ref: "#/components/parameters/StreamIdInPath" - $ref: "#/components/parameters/StreamIdInPath"
- name: topic_name - name: topic_name
@ -14400,7 +14403,26 @@ paths:
required: true required: true
responses: responses:
"200": "200":
$ref: "#/components/responses/SimpleSuccess" description: Success or partial success.
content:
application/json:
schema:
oneOf:
- allOf:
- $ref: "#/components/schemas/JsonSuccess"
- $ref: "#/components/schemas/SuccessDescription"
- allOf:
- $ref: "#/components/schemas/PartiallyCompleted"
- example:
{
"code": "REQUEST_TIMEOUT",
"msg": "",
"result": "partially_completed",
}
description: |
If the request exceeds its processing time limit after having
successfully marked some messages as read, response code 200
with result "partially_completed" and code "REQUEST_TIMEOUT" will be returned like this:
"400": "400":
description: Bad request. description: Bad request.
content: content:

View File

@ -1,5 +1,4 @@
from contextlib import contextmanager from typing import TYPE_CHECKING, Any, List, Mapping, Set
from typing import TYPE_CHECKING, Any, Callable, Iterator, List, Mapping, Set
from unittest import mock from unittest import mock
import orjson import orjson
@ -22,7 +21,7 @@ from zerver.lib.message import (
get_raw_unread_data, get_raw_unread_data,
) )
from zerver.lib.test_classes import ZulipTestCase from zerver.lib.test_classes import ZulipTestCase
from zerver.lib.test_helpers import get_subscription from zerver.lib.test_helpers import get_subscription, timeout_mock
from zerver.lib.timeout import TimeoutExpired from zerver.lib.timeout import TimeoutExpired
from zerver.lib.user_topics import add_topic_mute from zerver.lib.user_topics import add_topic_mute
from zerver.models import ( from zerver.models import (
@ -51,18 +50,6 @@ def check_flags(flags: List[str], expected: Set[str]) -> None:
raise AssertionError(f"expected flags (ignoring has_alert_word) to be {expected}") raise AssertionError(f"expected flags (ignoring has_alert_word) to be {expected}")
@contextmanager
def timeout_mock() -> Iterator[None]:
# timeout() doesn't work in test environment with database operations
# and they don't get committed - so we need to replace it with a mock
# that just calls the function.
def mock_timeout(seconds: int, func: Callable[[], object]) -> object:
return func()
with mock.patch("zerver.views.message_flags.timeout", new=mock_timeout):
yield
class FirstUnreadAnchorTests(ZulipTestCase): class FirstUnreadAnchorTests(ZulipTestCase):
""" """
HISTORICAL NOTE: HISTORICAL NOTE:
@ -76,7 +63,7 @@ class FirstUnreadAnchorTests(ZulipTestCase):
self.login("hamlet") self.login("hamlet")
# Mark all existing messages as read # Mark all existing messages as read
with timeout_mock(): with timeout_mock("zerver.views.message_flags"):
result = self.client_post("/json/mark_all_as_read") result = self.client_post("/json/mark_all_as_read")
self.assert_json_success(result) self.assert_json_success(result)
@ -136,7 +123,7 @@ class FirstUnreadAnchorTests(ZulipTestCase):
def test_visible_messages_use_first_unread_anchor(self) -> None: def test_visible_messages_use_first_unread_anchor(self) -> None:
self.login("hamlet") self.login("hamlet")
with timeout_mock(): with timeout_mock("zerver.views.message_flags"):
result = self.client_post("/json/mark_all_as_read") result = self.client_post("/json/mark_all_as_read")
self.assert_json_success(result) self.assert_json_success(result)
@ -579,7 +566,7 @@ class PushNotificationMarkReadFlowsTest(ZulipTestCase):
[third_message_id, fourth_message_id], [third_message_id, fourth_message_id],
) )
with timeout_mock(): with timeout_mock("zerver.views.message_flags"):
result = self.client_post("/json/mark_all_as_read", {}) result = self.client_post("/json/mark_all_as_read", {})
self.assertEqual(self.get_mobile_push_notification_ids(user_profile), []) self.assertEqual(self.get_mobile_push_notification_ids(user_profile), [])
mock_push_notifications.assert_called() mock_push_notifications.assert_called()
@ -602,7 +589,7 @@ class MarkAllAsReadEndpointTest(ZulipTestCase):
.count() .count()
) )
self.assertNotEqual(unread_count, 0) self.assertNotEqual(unread_count, 0)
with timeout_mock(): with timeout_mock("zerver.views.message_flags"):
result = self.client_post("/json/mark_all_as_read", {}) result = self.client_post("/json/mark_all_as_read", {})
self.assert_json_success(result) self.assert_json_success(result)

View File

@ -1,7 +1,12 @@
from unittest import mock
import orjson
from django.utils.timezone import now as timezone_now from django.utils.timezone import now as timezone_now
from zerver.actions.streams import do_change_stream_permission from zerver.actions.streams import do_change_stream_permission
from zerver.lib.test_classes import ZulipTestCase from zerver.lib.test_classes import ZulipTestCase
from zerver.lib.test_helpers import timeout_mock
from zerver.lib.timeout import TimeoutExpired
from zerver.models import Message, UserMessage, get_client, get_realm, get_stream from zerver.models import Message, UserMessage, get_client, get_realm, get_stream
@ -282,23 +287,51 @@ class TopicDeleteTest(ZulipTestCase):
acting_user=user_profile, acting_user=user_profile,
) )
# Delete the topic should now remove all messages # Delete the topic should now remove all messages
result = self.client_post( with timeout_mock("zerver.views.streams"):
endpoint, result = self.client_post(
{ endpoint,
"topic_name": topic_name, {
}, "topic_name": topic_name,
) },
)
self.assert_json_success(result) self.assert_json_success(result)
self.assertFalse(Message.objects.filter(id=last_msg_id).exists()) self.assertFalse(Message.objects.filter(id=last_msg_id).exists())
self.assertTrue(Message.objects.filter(id=initial_last_msg_id).exists()) self.assertTrue(Message.objects.filter(id=initial_last_msg_id).exists())
# Delete again, to test the edge case of deleting an empty topic. # Delete again, to test the edge case of deleting an empty topic.
result = self.client_post( with timeout_mock("zerver.views.streams"):
endpoint, result = self.client_post(
{ endpoint,
"topic_name": topic_name, {
}, "topic_name": topic_name,
) },
)
self.assert_json_success(result) self.assert_json_success(result)
self.assertFalse(Message.objects.filter(id=last_msg_id).exists()) self.assertFalse(Message.objects.filter(id=last_msg_id).exists())
self.assertTrue(Message.objects.filter(id=initial_last_msg_id).exists()) self.assertTrue(Message.objects.filter(id=initial_last_msg_id).exists())
def test_topic_delete_timeout(self) -> None:
stream_name = "new_stream"
topic_name = "new topic 2"
user_profile = self.example_user("iago")
self.subscribe(user_profile, stream_name)
stream = get_stream(stream_name, user_profile.realm)
self.send_stream_message(user_profile, stream_name, topic_name=topic_name)
self.login_user(user_profile)
endpoint = "/json/streams/" + str(stream.id) + "/delete_topic"
with mock.patch("zerver.views.streams.timeout", side_effect=TimeoutExpired):
result = self.client_post(
endpoint,
{
"topic_name": topic_name,
},
)
self.assertEqual(result.status_code, 200)
result_dict = orjson.loads(result.content)
self.assertEqual(
result_dict, {"result": "partially_completed", "msg": "", "code": "REQUEST_TIMEOUT"}
)

View File

@ -1,5 +1,5 @@
from collections import defaultdict from collections import defaultdict
from typing import Any, Callable, Dict, List, Mapping, Optional, Sequence, Set, Union from typing import Any, Callable, Dict, List, Literal, Mapping, Optional, Sequence, Set, Union
import orjson import orjson
from django.conf import settings from django.conf import settings
@ -54,7 +54,7 @@ from zerver.lib.exceptions import (
) )
from zerver.lib.mention import MentionBackend, silent_mention_syntax_for_user from zerver.lib.mention import MentionBackend, silent_mention_syntax_for_user
from zerver.lib.request import REQ, has_request_variables from zerver.lib.request import REQ, has_request_variables
from zerver.lib.response import json_success from zerver.lib.response import json_partial_success, json_success
from zerver.lib.retention import STREAM_MESSAGE_BATCH_SIZE as RETENTION_STREAM_MESSAGE_BATCH_SIZE from zerver.lib.retention import STREAM_MESSAGE_BATCH_SIZE as RETENTION_STREAM_MESSAGE_BATCH_SIZE
from zerver.lib.retention import parse_message_retention_days from zerver.lib.retention import parse_message_retention_days
from zerver.lib.streams import ( from zerver.lib.streams import (
@ -72,6 +72,7 @@ from zerver.lib.streams import (
) )
from zerver.lib.string_validation import check_stream_name from zerver.lib.string_validation import check_stream_name
from zerver.lib.subscription_info import gather_subscriptions from zerver.lib.subscription_info import gather_subscriptions
from zerver.lib.timeout import TimeoutExpired, timeout
from zerver.lib.topic import ( from zerver.lib.topic import (
get_topic_history_for_public_stream, get_topic_history_for_public_stream,
get_topic_history_for_stream, get_topic_history_for_stream,
@ -867,20 +868,29 @@ def delete_in_topic(
).values_list("message_id", flat=True) ).values_list("message_id", flat=True)
messages = messages.filter(id__in=deletable_message_ids) messages = messages.filter(id__in=deletable_message_ids)
# Topics can be large enough that this request will inevitably time out. def delete_in_batches() -> Literal[True]:
# In such a case, it's good for some progress to be accomplished, so that # Topics can be large enough that this request will inevitably time out.
# full deletion can be achieved by repeating the request. For that purpose, # In such a case, it's good for some progress to be accomplished, so that
# we delete messages in atomic batches, committing after each batch. # full deletion can be achieved by repeating the request. For that purpose,
# TODO: Ideally this should be moved to the deferred_work queue. # we delete messages in atomic batches, committing after each batch.
batch_size = RETENTION_STREAM_MESSAGE_BATCH_SIZE # TODO: Ideally this should be moved to the deferred_work queue.
while True: batch_size = RETENTION_STREAM_MESSAGE_BATCH_SIZE
with transaction.atomic(durable=True): while True:
messages_to_delete = messages.order_by("-id")[0:batch_size].select_for_update( with transaction.atomic(durable=True):
of=("self",) messages_to_delete = messages.order_by("-id")[0:batch_size].select_for_update(
) of=("self",)
if not messages_to_delete: )
break if not messages_to_delete:
do_delete_messages(user_profile.realm, messages_to_delete) break
do_delete_messages(user_profile.realm, messages_to_delete)
# timeout() in which we call this function requires non-None return value.
return True
try:
timeout(50, delete_in_batches)
except TimeoutExpired:
return json_partial_success(request, data={"code": ErrorCode.REQUEST_TIMEOUT.name})
return json_success(request) return json_success(request)