api: Handle restart events in apply_events.

Event of type restart could not be handled properly, because of
its special behavior. For handling this event in most natural way
we recursively call `do_events_register` when restart event is
recieved, based on custom error created for this event.

Testing: Second call to get_user_events due to recursive calling
of do_event_register, is expected to not contain the restart event.
So new test added in test_event_system.py are based on above behavior
of get_user_events.

Fixes: #15541.
This commit is contained in:
m-e-l-u-h-a-n 2021-04-29 16:24:30 +05:30 committed by Tim Abbott
parent 2a4452e722
commit d2c18e28a4
3 changed files with 134 additions and 12 deletions

View File

@ -69,6 +69,12 @@ from zerver.tornado.django_api import get_user_events, request_event_queue
from zproject.backends import email_auth_enabled, password_auth_enabled from zproject.backends import email_auth_enabled, password_auth_enabled
class RestartEventException(Exception):
"""
Special error for handling restart events in apply_events.
"""
def add_realm_logo_fields(state: Dict[str, Any], realm: Realm) -> None: def add_realm_logo_fields(state: Dict[str, Any], realm: Realm) -> None:
state["realm_logo_url"] = get_realm_logo_url(realm, night=False) state["realm_logo_url"] = get_realm_logo_url(realm, night=False)
state["realm_logo_source"] = get_realm_logo_source(realm, night=False) state["realm_logo_source"] = get_realm_logo_source(realm, night=False)
@ -492,6 +498,8 @@ def apply_events(
include_subscribers: bool, include_subscribers: bool,
) -> None: ) -> None:
for event in events: for event in events:
if event["type"] == "restart":
raise RestartEventException()
if fetch_event_types is not None and event["type"] not in fetch_event_types: if fetch_event_types is not None and event["type"] not in fetch_event_types:
# TODO: continuing here is not, most precisely, correct. # TODO: continuing here is not, most precisely, correct.
# In theory, an event of one type, e.g. `realm_user`, # In theory, an event of one type, e.g. `realm_user`,
@ -1141,15 +1149,39 @@ def do_events_register(
# Apply events that came in while we were fetching initial data # Apply events that came in while we were fetching initial data
events = get_user_events(user_profile, queue_id, -1) events = get_user_events(user_profile, queue_id, -1)
apply_events( try:
user_profile, apply_events(
state=ret, user_profile,
events=events, state=ret,
fetch_event_types=fetch_event_types, events=events,
client_gravatar=client_gravatar, fetch_event_types=fetch_event_types,
slim_presence=slim_presence, client_gravatar=client_gravatar,
include_subscribers=include_subscribers, slim_presence=slim_presence,
) include_subscribers=include_subscribers,
)
except RestartEventException:
# This represents a rare race condition, where Tornado
# restarted (and sent `restart` events) while we were waiting
# for fetch_initial_state_data to return. To avoid the client
# needing to reload shortly after loading, we recursively call
# do_events_register here.
ret = do_events_register(
user_profile,
user_client,
apply_markdown,
client_gravatar,
slim_presence,
event_types,
queue_lifespan_secs,
all_public_streams,
include_subscribers,
include_streams,
client_capabilities,
narrow,
fetch_event_types,
)
return ret
post_process_state(user_profile, ret, notification_settings_null) post_process_state(user_profile, ret, notification_settings_null)

View File

@ -51,6 +51,7 @@ from zerver.models import (
Subscription, Subscription,
UserMessage, UserMessage,
UserProfile, UserProfile,
get_client,
get_realm, get_realm,
get_stream, get_stream,
) )
@ -327,10 +328,13 @@ class HostRequestMock:
post_data: Dict[str, Any] = {}, post_data: Dict[str, Any] = {},
user_profile: Optional[UserProfile] = None, user_profile: Optional[UserProfile] = None,
host: str = settings.EXTERNAL_HOST, host: str = settings.EXTERNAL_HOST,
client_name: Optional[str] = None,
) -> None: ) -> None:
self.host = host self.host = host
self.GET: Dict[str, Any] = {} self.GET: Dict[str, Any] = {}
self.method = "" self.method = ""
if client_name is not None:
self.client = get_client(client_name)
# Convert any integer parameters passed into strings, even # Convert any integer parameters passed into strings, even
# though of course the HTTP API would do so. Ideally, we'd # though of course the HTTP API would do so. Ideally, we'd

View File

@ -1,5 +1,5 @@
import time import time
from typing import Any, Callable, Dict, List from typing import Any, Callable, Dict, List, Optional
from unittest import mock from unittest import mock
import orjson import orjson
@ -29,8 +29,12 @@ from zerver.tornado.event_queue import (
process_message_event, process_message_event,
send_restart_events, send_restart_events,
) )
from zerver.tornado.views import get_events from zerver.tornado.views import get_events, get_events_backend
from zerver.views.events_register import _default_all_public_streams, _default_narrow from zerver.views.events_register import (
_default_all_public_streams,
_default_narrow,
events_register_backend,
)
class EventsEndpointTest(ZulipTestCase): class EventsEndpointTest(ZulipTestCase):
@ -829,6 +833,16 @@ class ClientDescriptorsTest(ZulipTestCase):
class RestartEventsTest(ZulipTestCase): class RestartEventsTest(ZulipTestCase):
def tornado_call(
self,
view_func: Callable[[HttpRequest, UserProfile], HttpResponse],
user_profile: UserProfile,
post_data: Dict[str, Any],
client_name: Optional[str] = None,
) -> HttpResponse:
request = HostRequestMock(post_data, user_profile, client_name=client_name)
return view_func(request, user_profile)
def test_restart(self) -> None: def test_restart(self) -> None:
hamlet = self.example_user("hamlet") hamlet = self.example_user("hamlet")
realm = hamlet.realm realm = hamlet.realm
@ -869,6 +883,78 @@ class RestartEventsTest(ZulipTestCase):
), ),
) )
def test_restart_event_recursive_call_logic(self) -> None:
# This is a test for a subtle corner case; see the comments
# around RestartEventError for details.
hamlet = self.example_user("hamlet")
realm = hamlet.realm
# Setup an empty event queue
clear_client_event_queues_for_testing()
queue_data = dict(
all_public_streams=False,
apply_markdown=True,
client_gravatar=True,
client_type_name="website",
event_types=None,
last_connection_time=time.time(),
queue_timeout=0,
realm_id=realm.id,
user_profile_id=hamlet.id,
)
client = allocate_client_descriptor(queue_data)
# Add a restart event to it.
send_restart_events(immediate=True)
# Make a second queue after the restart events were sent.
second_client = allocate_client_descriptor(queue_data)
# Fetch the restart event just sent above, without removing it
# from the queue. We will use this as a mock return value in
# get_user_events.
restart_event = orjson.loads(
self.tornado_call(
get_events_backend,
hamlet,
post_data={
"queue_id": client.event_queue.id,
"last_event_id": -1,
"dont_block": "true",
"user_profile_id": hamlet.id,
"secret": settings.SHARED_SECRET,
"client": "internal",
},
client_name="internal",
).content
)["events"]
# Now the tricky part: We call events_register_backend,
# arranging it so that the first `get_user_events` call
# returns our restart event (triggering the recursive
# behavior), but the second (with a new queue) returns no
# events.
#
# Because get_user_events always returns [] in tests, we need
# to mock its return value as well; in an ideal world, we
# would only need to mock client / second_client.
with mock.patch(
"zerver.lib.events.request_event_queue",
side_effect=[client.event_queue.id, second_client.event_queue.id],
), mock.patch("zerver.lib.events.get_user_events", side_effect=[restart_event, []]):
self.tornado_call(
events_register_backend,
hamlet,
{
"queue_id": client.event_queue.id,
"user_client": "website",
"last_event_id": -1,
"dont_block": orjson.dumps(True).decode(),
},
client_name="website",
)
class FetchQueriesTest(ZulipTestCase): class FetchQueriesTest(ZulipTestCase):
def test_queries(self) -> None: def test_queries(self) -> None: