tornado: Track which queues were inherited from old Tornado instances.

This commit is contained in:
Alex Vandiver 2024-02-07 21:03:15 +00:00 committed by Tim Abbott
parent fc41d6085b
commit 1d3813ec4f
3 changed files with 47 additions and 3 deletions

View File

@ -36,6 +36,7 @@ from zerver.tornado.event_queue import (
allocate_client_descriptor,
clear_client_event_queues_for_testing,
get_client_info_for_message_event,
mark_clients_to_reload,
process_message_event,
send_web_reload_client_events,
)
@ -1141,7 +1142,10 @@ class WebReloadClientsTest(ZulipTestCase):
client = allocate_client_descriptor(queue_data)
send_web_reload_client_events()
self.assert_length(client.event_queue.queue, 0)
mark_clients_to_reload([client.event_queue.id])
send_web_reload_client_events()
self.assert_length(client.event_queue.queue, 1)
reload_event = client.event_queue.queue[0]

View File

@ -251,6 +251,7 @@ from zerver.tornado.event_queue import (
allocate_client_descriptor,
clear_client_event_queues_for_testing,
create_heartbeat_event,
mark_clients_to_reload,
send_web_reload_client_events,
)
from zerver.views.realm_playgrounds import access_playground_by_id
@ -288,6 +289,7 @@ class BaseAction(ZulipTestCase):
pronouns_field_type_supported: bool = True,
linkifier_url_template: bool = True,
user_list_incomplete: bool = False,
client_is_old: bool = False,
) -> List[Dict[str, Any]]:
"""
Make sure we have a clean slate of client descriptors for these tests.
@ -336,6 +338,9 @@ class BaseAction(ZulipTestCase):
user_list_incomplete=user_list_incomplete,
)
if client_is_old:
mark_clients_to_reload([client.event_queue.id])
# We want even those `send_event` calls which have been hooked to
# `transaction.on_commit` to execute in tests.
# See the comment in `ZulipTestCase.capture_send_event_calls`.
@ -3457,8 +3462,14 @@ class NormalActionsTest(BaseAction):
check_has_zoom_token("events[0]", events[0], value=False)
def test_web_reload_client_event(self) -> None:
self.verify_action(
lambda: send_web_reload_client_events(),
client_is_old=False,
num_events=0,
state_change_expected=False,
)
with self.assertRaises(WebReloadClientError):
self.verify_action(lambda: send_web_reload_client_events())
self.verify_action(lambda: send_web_reload_client_events(), client_is_old=True)
def test_display_setting_event_not_sent(self) -> None:
events = self.verify_action(

View File

@ -19,6 +19,7 @@ from typing import (
Dict,
Iterable,
List,
Literal,
Mapping,
MutableMapping,
Optional,
@ -443,6 +444,11 @@ def prune_internal_data(events: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
return events
# Queue-ids which still need to be sent a web_reload_client event.
# This is treated as an ordered set, which is sorted by realm-id when
# loaded from disk.
web_reload_clients: Dict[str, Literal[True]] = {}
# maps queue ids to client descriptors
clients: Dict[str, ClientDescriptor] = {}
# maps user id to list of client descriptors
@ -461,6 +467,7 @@ gc_hooks: List[Callable[[int, ClientDescriptor, bool], None]] = []
def clear_client_event_queues_for_testing() -> None:
assert settings.TEST_SUITE
clients.clear()
web_reload_clients.clear()
user_clients.clear()
realm_clients_all_streams.clear()
gc_hooks.clear()
@ -530,6 +537,8 @@ def do_gc_event_queues(
filter_client_dict(realm_clients_all_streams, realm_id)
for id in to_remove:
if id in web_reload_clients:
del web_reload_clients[id]
for cb in gc_hooks:
cb(
clients[id].user_profile_id,
@ -615,6 +624,8 @@ def load_event_queues(port: int) -> None:
"Tornado %d could not deserialize event queues", port, stack_info=True
)
mark_clients_to_reload(clients.keys())
for client in clients.values():
# Put code for migrations due to event queue data format changes here
@ -642,12 +653,30 @@ def send_restart_events() -> None:
client.add_event(event)
def send_web_reload_client_events(immediate: bool = False) -> None:
def mark_clients_to_reload(queue_ids: Iterable[str]) -> None:
# Build web_reload_clients, which is a sorted-by-realm-id list of
# website client queue-ids which were were loaded from old Tornado
# instances. We use an (ordered) dict to make removing one be
# O(1), as well as pulling an ordered N of them to be O(N). We
# sort by realm_id so that restarts are rolling by realm.
for qid in sorted(
(qid for qid in queue_ids if clients[qid].accepts_event({"type": "web_reload_client"})),
key=lambda qid: clients[qid].realm_id,
):
web_reload_clients[qid] = True
def send_web_reload_client_events(immediate: bool = False, count: Optional[int] = None) -> None:
event: Dict[str, Any] = dict(
type="web_reload_client",
immediate=immediate,
)
for client in clients.values():
if count is None:
count = len(web_reload_clients)
queue_ids = list(web_reload_clients.keys())[:count]
for qid in queue_ids:
del web_reload_clients[qid]
client = clients[qid]
if client.accepts_event(event):
client.add_event(event)