Add support for event queues with an included narrow.

This will allow us to substantially decrease the server-side work that
we do to support our Mirroring systems (since the personal mirrors can
request only messages that user sent) and also is what we need to
support a single-stream Zulip widget that we embed in webpages.

(imported from commit 055f2e9a523920719815181f8fdb44d3384e4a34)
This commit is contained in:
Tim Abbott 2013-12-10 10:28:16 -05:00
parent 48963c1cdf
commit 71ad82b6aa
7 changed files with 118 additions and 14 deletions

View File

@ -250,13 +250,13 @@ class Client(object):
call.func_name = name
setattr(cls, name, call)
def call_on_each_event(self, callback, event_types=None):
def call_on_each_event(self, callback, event_types=None, narrow=[]):
def do_register():
while True:
if event_types is None:
res = self.register()
else:
res = self.register(event_types=event_types)
res = self.register(event_types=event_types, narrow=narrow)
if 'error' in res.get('result'):
if self.verbose:
@ -318,10 +318,10 @@ def _mk_rm_subs(streams):
def _mk_deregister(queue_id):
return {'queue_id': queue_id}
def _mk_events(event_types=None):
def _mk_events(event_types=None, narrow=[]):
if event_types is None:
return dict()
return dict(event_types=event_types)
return dict(event_types=event_types, narrow=narrow)
def _kwargs_to_dict(**kwargs):
return kwargs

View File

@ -47,6 +47,7 @@ from zerver.lib.alert_words import user_alert_words, add_user_alert_words, \
remove_user_alert_words, set_user_alert_words
from zerver.lib.push_notifications import num_push_devices_for_user, \
send_apple_push_notification, send_android_push_notification
from zerver.lib.narrow import check_supported_events_narrow_filter
from zerver import tornado_callbacks
@ -1679,9 +1680,15 @@ def get_status_dict(requesting_user_profile):
def do_events_register(user_profile, user_client, apply_markdown=True,
event_types=None, queue_lifespan_secs=0, all_public_streams=False):
event_types=None, queue_lifespan_secs=0, all_public_streams=False,
narrow=[]):
# Technically we don't need to check this here because
# build_narrow_filter will check it, but it's nicer from an error
# handling perspective to do it before contacting Tornado
check_supported_events_narrow_filter(narrow)
queue_id = request_event_queue(user_profile, user_client, apply_markdown,
queue_lifespan_secs, event_types, all_public_streams)
queue_lifespan_secs, event_types, all_public_streams,
narrow=narrow)
if queue_id is None:
raise JsonableError("Could not allocate event queue")

View File

@ -18,6 +18,7 @@ import traceback
from zerver.lib.utils import statsd
from zerver.middleware import async_request_restart
from zerver.models import get_client
from zerver.lib.narrow import build_narrow_filter
import copy
# The idle timeout used to be a week, but we found that in that
@ -39,7 +40,8 @@ HEARTBEAT_MIN_FREQ_SECS = 45
class ClientDescriptor(object):
def __init__(self, user_profile_id, realm_id, event_queue, event_types, client_type,
apply_markdown=True, all_public_streams=False, lifespan_secs=0):
apply_markdown=True, all_public_streams=False, lifespan_secs=0,
narrow=[]):
# These objects are serialized on shutdown and restored on restart.
# If fields are added or semantics are changed, temporary code must be
# added to load_event_queues() to update the restored objects.
@ -55,6 +57,7 @@ class ClientDescriptor(object):
self.all_public_streams = all_public_streams
self.client_type = client_type
self._timeout_handle = None
self.narrow_filter = build_narrow_filter(narrow)
# Clamp queue_timeout to between minimum and maximum timeouts
self.queue_timeout = max(IDLE_EVENT_QUEUE_TIMEOUT_SECS, min(self.queue_timeout, MAX_QUEUE_TIMEOUT_SECS))
@ -116,7 +119,11 @@ class ClientDescriptor(object):
def accepts_event(self, event):
if self.event_types is None:
return True
return event["type"] in self.event_types
if event["type"] not in self.event_types:
return False
if event["type"] == "message":
return self.narrow_filter(event)
return True
# TODO: Refactor so we don't need this function
def accepts_messages(self):
@ -280,12 +287,13 @@ def get_client_descriptors_for_realm_all_streams(realm_id):
return realm_clients_all_streams.get(realm_id, [])
def allocate_client_descriptor(user_profile_id, realm_id, event_types, client_type,
apply_markdown, all_public_streams, lifespan_secs):
apply_markdown, all_public_streams, lifespan_secs,
narrow=[]):
global next_queue_id
id = str(settings.SERVER_GENERATION) + ':' + str(next_queue_id)
next_queue_id += 1
client = ClientDescriptor(user_profile_id, realm_id, EventQueue(id), event_types, client_type,
apply_markdown, all_public_streams, lifespan_secs)
apply_markdown, all_public_streams, lifespan_secs, narrow)
clients[id] = client
user_clients.setdefault(user_profile_id, []).append(client)
if all_public_streams:
@ -422,13 +430,15 @@ def extract_json_response(resp):
return resp.json
def request_event_queue(user_profile, user_client, apply_markdown,
queue_lifespan_secs, event_types=None, all_public_streams=False):
queue_lifespan_secs, event_types=None, all_public_streams=False,
narrow=[]):
if settings.TORNADO_SERVER:
req = {'dont_block' : 'true',
'apply_markdown': ujson.dumps(apply_markdown),
'all_public_streams': ujson.dumps(all_public_streams),
'client' : 'internal',
'user_client' : user_client.name,
'narrow' : ujson.dumps(narrow),
'lifespan_secs' : queue_lifespan_secs}
if event_types is not None:
req['event_types'] = ujson.dumps(event_types)

41
zerver/lib/narrow.py Normal file
View File

@ -0,0 +1,41 @@
from zerver.decorator import JsonableError
def check_supported_events_narrow_filter(narrow):
for element in narrow:
operator = element[0]
if operator not in ["stream", "topic", "sender", "is"]:
raise JsonableError("Operator %s not supported." % (operator,))
def build_narrow_filter(narrow):
check_supported_events_narrow_filter(narrow)
def narrow_filter(event):
message = event["message"]
flags = event["flags"]
for element in narrow:
operator = element[0]
operand = element[1]
if operator == "stream":
if message["type"] != "stream":
return False
if operand.lower() != message["display_recipient"].lower():
return False
elif operator == "topic":
if message["type"] != "stream":
return False
if operand.lower() != message["subject"].lower():
return False
elif operator == "sender":
if operand.lower() != message["sender_email"].lower():
return False
elif operator == "is" and operand == "private":
if message["type"] != "private":
return False
elif operator == "is" and operand in ["starred"]:
if operand not in flags:
return False
elif operator == "is" and operand in ["alerted", "mentioned"]:
if "mentioned" not in flags:
return False
return True
return narrow_filter

View File

@ -2530,6 +2530,46 @@ class GetEventsTest(AuthedTestCase):
self.assertEqual(events[0]["type"], "message")
self.assertEqual(events[0]["message"]["sender_email"], email)
def test_get_events_narrow(self):
email = "hamlet@zulip.com"
user_profile = get_user_profile_by_email(email)
self.login(email)
result = self.tornado_call(get_events_backend, user_profile,
{"apply_markdown": ujson.dumps(True),
"event_types": ujson.dumps(["message"]),
"narrow": ujson.dumps([["stream", "denmark"]]),
"user_client": "website",
"dont_block": ujson.dumps(True),
})
self.assert_json_success(result)
queue_id = ujson.loads(result.content)["queue_id"]
result = self.tornado_call(get_events_backend, user_profile,
{"queue_id": queue_id,
"user_client": "website",
"last_event_id": -1,
"dont_block": ujson.dumps(True),
})
events = ujson.loads(result.content)["events"]
self.assert_json_success(result)
self.assertEqual(len(events), 0)
self.send_message(email, "othello@zulip.com", Recipient.PERSONAL, "hello")
self.send_message(email, "Denmark", Recipient.STREAM, "hello")
result = self.tornado_call(get_events_backend, user_profile,
{"queue_id": queue_id,
"user_client": "website",
"last_event_id": -1,
"dont_block": ujson.dumps(True),
})
events = ujson.loads(result.content)["events"]
self.assert_json_success(result)
self.assertEqual(len(events), 1)
self.assertEqual(events[0]["type"], "message")
self.assertEqual(events[0]["message"]["display_recipient"], "Denmark")
from zerver.lib.event_queue import EventQueue
class EventQueueTest(TestCase):
def test_one_event(self):

View File

@ -5,12 +5,13 @@ from zerver.models import get_client
from zerver.decorator import asynchronous, \
authenticated_json_post_view, internal_notify_view, RespondAsynchronously, \
has_request_variables, json_to_bool, json_to_list, REQ
has_request_variables, json_to_bool, json_to_list, json_to_dict, REQ
from zerver.lib.response import json_success, json_error
from zerver.tornado_callbacks import process_notification
from zerver.lib.event_queue import allocate_client_descriptor, get_client_descriptor
from zerver.lib.narrow import check_supported_events_narrow_filter
import ujson
import logging
@ -48,6 +49,7 @@ def get_events_backend(request, user_profile, handler = None,
all_public_streams = REQ(default=False, converter=json_to_bool),
event_types = REQ(default=None, converter=json_to_list),
dont_block = REQ(default=False, converter=json_to_bool),
narrow = REQ(default=[], converter=json_to_list),
lifespan_secs = REQ(default=0, converter=int)):
if user_client is None:
user_client = request.client
@ -58,7 +60,8 @@ def get_events_backend(request, user_profile, handler = None,
if dont_block:
client = allocate_client_descriptor(user_profile.id, user_profile.realm.id,
event_types, user_client, apply_markdown,
all_public_streams, lifespan_secs)
all_public_streams, lifespan_secs,
narrow=narrow)
queue_id = client.event_queue.id
else:
return json_error("Missing 'queue_id' argument")

View File

@ -910,6 +910,7 @@ class BadNarrowOperator(Exception):
def to_json_error_msg(self):
return 'Invalid narrow operator: ' + self.desc
# When you add a new operator to this, also update zerver/lib/narrow.py
class NarrowBuilder(object):
def __init__(self, user_profile, prefix):
self.user_profile = user_profile
@ -2085,9 +2086,11 @@ def api_events_register(request, user_profile,
def events_register_backend(request, user_profile, apply_markdown=True,
all_public_streams=False,
event_types=REQ(converter=json_to_list, default=None),
narrow=REQ(converter=json_to_list, default=[]),
queue_lifespan_secs=REQ(converter=int, default=0)):
ret = do_events_register(user_profile, request.client, apply_markdown,
event_types, queue_lifespan_secs, all_public_streams)
event_types, queue_lifespan_secs, all_public_streams,
narrow=narrow)
return json_success(ret)
@authenticated_json_post_view