diff --git a/api/zulip/__init__.py b/api/zulip/__init__.py index c79e84f062..56de4748a6 100644 --- a/api/zulip/__init__.py +++ b/api/zulip/__init__.py @@ -250,13 +250,13 @@ class Client(object): call.func_name = name setattr(cls, name, call) - def call_on_each_event(self, callback, event_types=None): + def call_on_each_event(self, callback, event_types=None, narrow=[]): def do_register(): while True: if event_types is None: res = self.register() else: - res = self.register(event_types=event_types) + res = self.register(event_types=event_types, narrow=narrow) if 'error' in res.get('result'): if self.verbose: @@ -318,10 +318,10 @@ def _mk_rm_subs(streams): def _mk_deregister(queue_id): return {'queue_id': queue_id} -def _mk_events(event_types=None): +def _mk_events(event_types=None, narrow=[]): if event_types is None: return dict() - return dict(event_types=event_types) + return dict(event_types=event_types, narrow=narrow) def _kwargs_to_dict(**kwargs): return kwargs diff --git a/zerver/lib/actions.py b/zerver/lib/actions.py index bb5ee3499d..aa4b77a640 100644 --- a/zerver/lib/actions.py +++ b/zerver/lib/actions.py @@ -47,6 +47,7 @@ from zerver.lib.alert_words import user_alert_words, add_user_alert_words, \ remove_user_alert_words, set_user_alert_words from zerver.lib.push_notifications import num_push_devices_for_user, \ send_apple_push_notification, send_android_push_notification +from zerver.lib.narrow import check_supported_events_narrow_filter from zerver import tornado_callbacks @@ -1679,9 +1680,15 @@ def get_status_dict(requesting_user_profile): def do_events_register(user_profile, user_client, apply_markdown=True, - event_types=None, queue_lifespan_secs=0, all_public_streams=False): + event_types=None, queue_lifespan_secs=0, all_public_streams=False, + narrow=[]): + # Technically we don't need to check this here because + # build_narrow_filter will check it, but it's nicer from an error + # handling perspective to do it before contacting Tornado + check_supported_events_narrow_filter(narrow) queue_id = request_event_queue(user_profile, user_client, apply_markdown, - queue_lifespan_secs, event_types, all_public_streams) + queue_lifespan_secs, event_types, all_public_streams, + narrow=narrow) if queue_id is None: raise JsonableError("Could not allocate event queue") diff --git a/zerver/lib/event_queue.py b/zerver/lib/event_queue.py index a2dbe42e9b..d40b21e4eb 100644 --- a/zerver/lib/event_queue.py +++ b/zerver/lib/event_queue.py @@ -18,6 +18,7 @@ import traceback from zerver.lib.utils import statsd from zerver.middleware import async_request_restart from zerver.models import get_client +from zerver.lib.narrow import build_narrow_filter import copy # The idle timeout used to be a week, but we found that in that @@ -39,7 +40,8 @@ HEARTBEAT_MIN_FREQ_SECS = 45 class ClientDescriptor(object): def __init__(self, user_profile_id, realm_id, event_queue, event_types, client_type, - apply_markdown=True, all_public_streams=False, lifespan_secs=0): + apply_markdown=True, all_public_streams=False, lifespan_secs=0, + narrow=[]): # These objects are serialized on shutdown and restored on restart. # If fields are added or semantics are changed, temporary code must be # added to load_event_queues() to update the restored objects. @@ -55,6 +57,7 @@ class ClientDescriptor(object): self.all_public_streams = all_public_streams self.client_type = client_type self._timeout_handle = None + self.narrow_filter = build_narrow_filter(narrow) # Clamp queue_timeout to between minimum and maximum timeouts self.queue_timeout = max(IDLE_EVENT_QUEUE_TIMEOUT_SECS, min(self.queue_timeout, MAX_QUEUE_TIMEOUT_SECS)) @@ -116,7 +119,11 @@ class ClientDescriptor(object): def accepts_event(self, event): if self.event_types is None: return True - return event["type"] in self.event_types + if event["type"] not in self.event_types: + return False + if event["type"] == "message": + return self.narrow_filter(event) + return True # TODO: Refactor so we don't need this function def accepts_messages(self): @@ -280,12 +287,13 @@ def get_client_descriptors_for_realm_all_streams(realm_id): return realm_clients_all_streams.get(realm_id, []) def allocate_client_descriptor(user_profile_id, realm_id, event_types, client_type, - apply_markdown, all_public_streams, lifespan_secs): + apply_markdown, all_public_streams, lifespan_secs, + narrow=[]): global next_queue_id id = str(settings.SERVER_GENERATION) + ':' + str(next_queue_id) next_queue_id += 1 client = ClientDescriptor(user_profile_id, realm_id, EventQueue(id), event_types, client_type, - apply_markdown, all_public_streams, lifespan_secs) + apply_markdown, all_public_streams, lifespan_secs, narrow) clients[id] = client user_clients.setdefault(user_profile_id, []).append(client) if all_public_streams: @@ -422,13 +430,15 @@ def extract_json_response(resp): return resp.json def request_event_queue(user_profile, user_client, apply_markdown, - queue_lifespan_secs, event_types=None, all_public_streams=False): + queue_lifespan_secs, event_types=None, all_public_streams=False, + narrow=[]): if settings.TORNADO_SERVER: req = {'dont_block' : 'true', 'apply_markdown': ujson.dumps(apply_markdown), 'all_public_streams': ujson.dumps(all_public_streams), 'client' : 'internal', 'user_client' : user_client.name, + 'narrow' : ujson.dumps(narrow), 'lifespan_secs' : queue_lifespan_secs} if event_types is not None: req['event_types'] = ujson.dumps(event_types) diff --git a/zerver/lib/narrow.py b/zerver/lib/narrow.py new file mode 100644 index 0000000000..c2c7104109 --- /dev/null +++ b/zerver/lib/narrow.py @@ -0,0 +1,41 @@ +from zerver.decorator import JsonableError + +def check_supported_events_narrow_filter(narrow): + for element in narrow: + operator = element[0] + if operator not in ["stream", "topic", "sender", "is"]: + raise JsonableError("Operator %s not supported." % (operator,)) + +def build_narrow_filter(narrow): + check_supported_events_narrow_filter(narrow) + def narrow_filter(event): + message = event["message"] + flags = event["flags"] + for element in narrow: + operator = element[0] + operand = element[1] + if operator == "stream": + if message["type"] != "stream": + return False + if operand.lower() != message["display_recipient"].lower(): + return False + elif operator == "topic": + if message["type"] != "stream": + return False + if operand.lower() != message["subject"].lower(): + return False + elif operator == "sender": + if operand.lower() != message["sender_email"].lower(): + return False + elif operator == "is" and operand == "private": + if message["type"] != "private": + return False + elif operator == "is" and operand in ["starred"]: + if operand not in flags: + return False + elif operator == "is" and operand in ["alerted", "mentioned"]: + if "mentioned" not in flags: + return False + + return True + return narrow_filter diff --git a/zerver/tests.py b/zerver/tests.py index 4ba0251a5b..f6af17b466 100644 --- a/zerver/tests.py +++ b/zerver/tests.py @@ -2530,6 +2530,46 @@ class GetEventsTest(AuthedTestCase): self.assertEqual(events[0]["type"], "message") self.assertEqual(events[0]["message"]["sender_email"], email) + def test_get_events_narrow(self): + email = "hamlet@zulip.com" + user_profile = get_user_profile_by_email(email) + self.login(email) + + result = self.tornado_call(get_events_backend, user_profile, + {"apply_markdown": ujson.dumps(True), + "event_types": ujson.dumps(["message"]), + "narrow": ujson.dumps([["stream", "denmark"]]), + "user_client": "website", + "dont_block": ujson.dumps(True), + }) + self.assert_json_success(result) + queue_id = ujson.loads(result.content)["queue_id"] + + result = self.tornado_call(get_events_backend, user_profile, + {"queue_id": queue_id, + "user_client": "website", + "last_event_id": -1, + "dont_block": ujson.dumps(True), + }) + events = ujson.loads(result.content)["events"] + self.assert_json_success(result) + self.assertEqual(len(events), 0) + + self.send_message(email, "othello@zulip.com", Recipient.PERSONAL, "hello") + self.send_message(email, "Denmark", Recipient.STREAM, "hello") + + result = self.tornado_call(get_events_backend, user_profile, + {"queue_id": queue_id, + "user_client": "website", + "last_event_id": -1, + "dont_block": ujson.dumps(True), + }) + events = ujson.loads(result.content)["events"] + self.assert_json_success(result) + self.assertEqual(len(events), 1) + self.assertEqual(events[0]["type"], "message") + self.assertEqual(events[0]["message"]["display_recipient"], "Denmark") + from zerver.lib.event_queue import EventQueue class EventQueueTest(TestCase): def test_one_event(self): diff --git a/zerver/tornadoviews.py b/zerver/tornadoviews.py index 24ac73cb79..f3b2ef9d09 100644 --- a/zerver/tornadoviews.py +++ b/zerver/tornadoviews.py @@ -5,12 +5,13 @@ from zerver.models import get_client from zerver.decorator import asynchronous, \ authenticated_json_post_view, internal_notify_view, RespondAsynchronously, \ - has_request_variables, json_to_bool, json_to_list, REQ + has_request_variables, json_to_bool, json_to_list, json_to_dict, REQ from zerver.lib.response import json_success, json_error from zerver.tornado_callbacks import process_notification from zerver.lib.event_queue import allocate_client_descriptor, get_client_descriptor +from zerver.lib.narrow import check_supported_events_narrow_filter import ujson import logging @@ -48,6 +49,7 @@ def get_events_backend(request, user_profile, handler = None, all_public_streams = REQ(default=False, converter=json_to_bool), event_types = REQ(default=None, converter=json_to_list), dont_block = REQ(default=False, converter=json_to_bool), + narrow = REQ(default=[], converter=json_to_list), lifespan_secs = REQ(default=0, converter=int)): if user_client is None: user_client = request.client @@ -58,7 +60,8 @@ def get_events_backend(request, user_profile, handler = None, if dont_block: client = allocate_client_descriptor(user_profile.id, user_profile.realm.id, event_types, user_client, apply_markdown, - all_public_streams, lifespan_secs) + all_public_streams, lifespan_secs, + narrow=narrow) queue_id = client.event_queue.id else: return json_error("Missing 'queue_id' argument") diff --git a/zerver/views/__init__.py b/zerver/views/__init__.py index d1af57137f..f01b8cc0f6 100644 --- a/zerver/views/__init__.py +++ b/zerver/views/__init__.py @@ -910,6 +910,7 @@ class BadNarrowOperator(Exception): def to_json_error_msg(self): return 'Invalid narrow operator: ' + self.desc +# When you add a new operator to this, also update zerver/lib/narrow.py class NarrowBuilder(object): def __init__(self, user_profile, prefix): self.user_profile = user_profile @@ -2085,9 +2086,11 @@ def api_events_register(request, user_profile, def events_register_backend(request, user_profile, apply_markdown=True, all_public_streams=False, event_types=REQ(converter=json_to_list, default=None), + narrow=REQ(converter=json_to_list, default=[]), queue_lifespan_secs=REQ(converter=int, default=0)): ret = do_events_register(user_profile, request.client, apply_markdown, - event_types, queue_lifespan_secs, all_public_streams) + event_types, queue_lifespan_secs, all_public_streams, + narrow=narrow) return json_success(ret) @authenticated_json_post_view