Use dictionaries for passing data into event server subsystem.

This marshalls data in a portable format in preparation for splitting
the event queue server from the Tornado server.
This commit is contained in:
Tim Abbott 2014-01-28 12:11:08 -05:00 committed by Tim Abbott
parent 6cd14af18f
commit ad1494f8e0
3 changed files with 57 additions and 23 deletions

View File

@ -324,16 +324,13 @@ def add_to_client_dicts(client):
if client.all_public_streams or client.narrow != []:
realm_clients_all_streams.setdefault(client.realm_id, []).append(client)
def allocate_client_descriptor(user_profile_id, user_profile_email, realm_id,
event_types, client_type_name, apply_markdown,
all_public_streams, lifespan_secs, narrow=[]):
def allocate_client_descriptor(new_queue_data):
global next_queue_id
id = str(settings.SERVER_GENERATION) + ':' + str(next_queue_id)
queue_id = str(settings.SERVER_GENERATION) + ':' + str(next_queue_id)
next_queue_id += 1
client = ClientDescriptor(user_profile_id, user_profile_email, realm_id,
EventQueue(id), event_types, client_type_name,
apply_markdown, all_public_streams, lifespan_secs, narrow)
clients[id] = client
new_queue_data["event_queue"] = EventQueue(queue_id).to_dict()
client = ClientDescriptor.from_dict(new_queue_data)
clients[queue_id] = client
add_to_client_dicts(client)
return client
@ -440,19 +437,22 @@ def setup_event_queue():
send_restart_events()
def fetch_events(user_profile_id, user_profile_realm_id, user_profile_email,
queue_id, last_event_id, event_types, client_type_name, apply_markdown,
all_public_streams, lifespan_secs, narrow, dont_block, handler_id):
def fetch_events(query):
queue_id = query["queue_id"]
dont_block = query["dont_block"]
last_event_id = query["last_event_id"]
user_profile_id = query["user_profile_id"]
new_queue_data = query.get("new_queue_data")
user_profile_email = query["user_profile_email"]
client_type_name = query["client_type_name"]
handler_id = query["handler_id"]
was_connected = False
orig_queue_id = queue_id
extra_log_data = ""
if queue_id is None:
if dont_block:
client = allocate_client_descriptor(user_profile_id, user_profile_email,
user_profile_realm_id,
event_types, client_type_name, apply_markdown,
all_public_streams, lifespan_secs,
narrow=narrow)
client = allocate_client_descriptor(new_queue_data)
queue_id = client.event_queue.id
else:
raise JsonableError("Missing 'queue_id' argument")

View File

@ -52,6 +52,7 @@ from zerver.views import _default_all_public_streams, _default_narrow
from zerver.tornadoviews import get_events_backend
from collections import OrderedDict
import time
import ujson
from six.moves import range
@ -211,9 +212,18 @@ class EventsRegisterTest(AuthedTestCase):
])
def do_test(self, action, event_types=None):
client = allocate_client_descriptor(self.user_profile.id, self.user_profile.email,
self.user_profile.realm.id, event_types,
"website", True, False, 600, [])
client = allocate_client_descriptor(
dict(user_profile_id = self.user_profile.id,
user_profile_email = self.user_profile.email,
realm_id = self.user_profile.realm.id,
event_types = event_types,
client_type_name = "website",
apply_markdown = True,
all_public_streams = False,
queue_timeout = 600,
last_connection_time = time.time(),
narrow = [])
)
# hybrid_state = initial fetch state + re-applying events triggered by our action
# normal_state = do action then fetch at the end (the "normal" code path)
hybrid_state = fetch_initial_state_data(self.user_profile, event_types, "")

View File

@ -14,6 +14,7 @@ from zerver.lib.event_queue import allocate_client_descriptor, get_client_descri
from zerver.lib.handlers import allocate_handler_id
from zerver.lib.narrow import check_supported_events_narrow_filter
import time
import ujson
import logging
@ -55,10 +56,33 @@ def get_events_backend(request, user_profile, handler,
if user_client is None:
user_client = request.client
(result, log_data) = fetch_events(
user_profile.id, user_profile.realm_id, user_profile.email, queue_id,
last_event_id, event_types, user_client.name, apply_markdown, all_public_streams,
lifespan_secs, narrow, dont_block, handler.handler_id)
events_query = dict(
user_profile_id = user_profile.id,
user_profile_email = user_profile.email,
queue_id = queue_id,
last_event_id = last_event_id,
event_types = event_types,
client_type_name = user_client.name,
all_public_streams = all_public_streams,
lifespan_secs = lifespan_secs,
narrow = narrow,
dont_block = dont_block,
handler_id = handler.handler_id)
if queue_id is None:
events_query['new_queue_data'] = dict(
user_profile_id = user_profile.id,
realm_id = user_profile.realm.id,
user_profile_email = user_profile.email,
event_types = event_types,
client_type_name = user_client.name,
apply_markdown = apply_markdown,
all_public_streams = all_public_streams,
queue_timeout = lifespan_secs,
last_connection_time = time.time(),
narrow = narrow)
(result, log_data) = fetch_events(events_query)
request._log_data['extra'] = log_data
if result == RespondAsynchronously:
handler._request = request