From e06722657a74aa850f667dc0e889a5364ff691c6 Mon Sep 17 00:00:00 2001 From: Tim Abbott Date: Tue, 5 Nov 2013 11:02:34 -0500 Subject: [PATCH] [manual] Remove /messages/latest API and related legacy code. This requires doing a puppet apply on our servers to take effect properly. (imported from commit 19dc56f071f07a5d2571eef49dd835121b2e82b6) --- puppet/zulip/files/nginx/zulip-include/app | 2 +- tools/run-dev.py | 3 +- zerver/lib/actions.py | 2 +- zerver/management/commands/runtornado.py | 5 +- zerver/tests.py | 61 ------ zerver/tornado_callbacks.py | 215 +-------------------- zerver/tornadoviews.py | 170 +--------------- zproject/urls.py | 4 - 8 files changed, 9 insertions(+), 453 deletions(-) diff --git a/puppet/zulip/files/nginx/zulip-include/app b/puppet/zulip/files/nginx/zulip-include/app index bf3adb7261..da8a1bc50b 100644 --- a/puppet/zulip/files/nginx/zulip-include/app +++ b/puppet/zulip/files/nginx/zulip-include/app @@ -14,7 +14,7 @@ location /static/ { } # Send longpoll requests to Tornado -location ~ /json/get_updates|/api/v1/get_messages|/api/v1/messages/latest|/json/get_events|/api/v1/events { +location ~ /json/get_events|/api/v1/events { proxy_pass http://localhost:9993; proxy_redirect off; diff --git a/tools/run-dev.py b/tools/run-dev.py index c7e97217c0..eeb1ac773b 100755 --- a/tools/run-dev.py +++ b/tools/run-dev.py @@ -83,8 +83,7 @@ class Resource(resource.Resource): def getChild(self, name, request): request.requestHeaders.setRawHeaders('X-Forwarded-Host', [proxy_host]) - if (request.uri in ['/json/get_updates', '/api/v1/get_messages', '/json/get_events'] or - request.uri.startswith('/api/v1/messages/latest') or + if (request.uri in ['/json/get_events'] or request.uri.startswith('/api/v1/events') or request.uri.startswith('/sockjs')): return proxy.ReverseProxyResource('localhost', tornado_port, '/'+name) diff --git a/zerver/lib/actions.py b/zerver/lib/actions.py index 30e4e301f7..d60f8c3ca8 100644 --- a/zerver/lib/actions.py +++ 
b/zerver/lib/actions.py @@ -1495,7 +1495,7 @@ def do_update_message(user_profile, message_id, subject, propagate_mode, content "rendered_content_version", "last_edit_time", "edit_history"]) - # Update the message as stored in both the (deprecated) message + # Update the message as stored in the (deprecated) message # cache (for shunting the message over to Tornado in the old # get_messages API) and also the to_dict caches. items_for_memcached = {} diff --git a/zerver/management/commands/runtornado.py b/zerver/management/commands/runtornado.py index f1adcca717..d935db3979 100644 --- a/zerver/management/commands/runtornado.py +++ b/zerver/management/commands/runtornado.py @@ -86,10 +86,7 @@ class Command(BaseCommand): queue_client.register_json_consumer('tornado_return', respond_send_message) try: - urls = (r"/json/get_updates", - r"/api/v1/get_messages", - r"/notify_tornado", - r"/api/v1/messages/latest", + urls = (r"/notify_tornado", r"/json/get_events", r"/api/v1/events", ) diff --git a/zerver/tests.py b/zerver/tests.py index 90bd2737c6..f528ef2738 100644 --- a/zerver/tests.py +++ b/zerver/tests.py @@ -13,7 +13,6 @@ from zerver.models import Message, UserProfile, Stream, Recipient, Subscription, get_display_recipient, Realm, Client, UserActivity, \ PreregistrationUser, UserMessage, \ get_user_profile_by_email, email_to_domain, get_realm, get_stream, get_client -from zerver.tornadoviews import json_get_updates, api_get_messages from zerver.decorator import RespondAsynchronously, \ RequestVariableConversionError, profiled, JsonableError from zerver.lib.initial_password import initial_password @@ -2421,66 +2420,6 @@ class POSTRequestMock(object): self.session = DummySession() self.META = {'PATH_INFO': 'test'} -class GetUpdatesTest(AuthedTestCase): - - def common_test_get_updates(self, view_func, extra_post_data = {}): - user_profile = get_user_profile_by_email("hamlet@zulip.com") - message_content = 'tornado test message' - self.got_callback = False - - def 
callback(response): - self.got_callback = True - msg = response['messages'][0] - if str(msg['content_type']) == 'text/html': - self.assertEqual('<p>%s</p>' % message_content, msg['content']) - else: - self.assertEqual(message_content, msg['content']) - - post_data = {} - post_data.update(extra_post_data) - request = POSTRequestMock(post_data, user_profile, callback) - self.assertEqual(view_func(request), RespondAsynchronously) - self.send_message("hamlet@zulip.com", "hamlet@zulip.com", - Recipient.PERSONAL, message_content) - self.assertTrue(self.got_callback) - - - def test_json_get_updates(self): - """ - json_get_updates returns messages with IDs greater than the - last_received ID. - """ - self.login("hamlet@zulip.com") - self.common_test_get_updates(json_get_updates) - - def test_api_get_messages(self): - """ - Same as above, but for the API view - """ - email = "hamlet@zulip.com" - api_key = self.get_api_key(email) - self.common_test_get_updates(api_get_messages, {'email': email, 'api-key': api_key}) - - def test_missing_last_received(self): - """ - Calling json_get_updates without any arguments should work - """ - self.login("hamlet@zulip.com") - user_profile = get_user_profile_by_email("hamlet@zulip.com") - - request = POSTRequestMock({}, user_profile) - self.assertEqual(json_get_updates(request), RespondAsynchronously) - - def test_bad_input(self): - """ - Specifying a bad value for 'pointer' should return an error - """ - self.login("hamlet@zulip.com") - user_profile = get_user_profile_by_email("hamlet@zulip.com") - - request = POSTRequestMock({'pointer': 'foo'}, user_profile) - self.assertRaises(RequestVariableConversionError, json_get_updates, request) - class GetProfileTest(AuthedTestCase): def common_update_pointer(self, email, pointer): diff --git a/zerver/tornado_callbacks.py b/zerver/tornado_callbacks.py index 5c882e7094..c2617c03f1 100644 --- a/zerver/tornado_callbacks.py +++ b/zerver/tornado_callbacks.py @@ -3,8 +3,8 @@ from __future__ import absolute_import from django.conf import settings from django.utils.timezone import now -from zerver.models import Message, UserProfile, UserMessage, \ - 
Recipient, Stream, get_stream, get_user_profile_by_id +from zerver.models import Message, UserProfile, \ + Recipient, get_user_profile_by_id from zerver.decorator import JsonableError from zerver.lib.cache import cache_get_many, message_cache_key, \ @@ -15,223 +15,17 @@ from zerver.lib.event_queue import get_client_descriptors_for_user,\ get_client_descriptors_for_realm_all_streams from zerver.lib.timestamp import timestamp_to_datetime -import os -import sys import time import logging import requests import ujson -import subprocess -import collections import datetime -from django.db import connection # Send email notifications to idle users # after they are idle for 1 hour NOTIFY_AFTER_IDLE_HOURS = 1 -class Callbacks(object): - # A user received a message. The key is user_profile.id. - TYPE_USER_RECEIVE = 0 - - # A stream received a message. The key is a tuple - # (realm_id, lowercased stream name). - # See comment attached to the global stream_messages for why. - # Callers of this callback need to be careful to provide - # a lowercased stream name. - TYPE_STREAM_RECEIVE = 1 - - # A user's pointer was updated. The key is user_profile.id. 
- TYPE_POINTER_UPDATE = 2 - - TYPE_MAX = 3 - - def __init__(self): - self.table = {} - - def add(self, key, cb_type, callback): - if not self.table.has_key(key): - self.create_key(key) - self.table[key][cb_type].append(callback) - - def call(self, key, cb_type, **kwargs): - if not self.table.has_key(key): - self.create_key(key) - - for cb in self.table[key][cb_type]: - cb(**kwargs) - - self.table[key][cb_type] = [] - - def create_key(self, key): - self.table[key] = [[] for i in range(0, Callbacks.TYPE_MAX)] - -callbacks_table = Callbacks() - -def add_user_receive_callback(user_profile, cb): - callbacks_table.add(user_profile.id, Callbacks.TYPE_USER_RECEIVE, cb) - -def add_stream_receive_callback(realm_id, stream_name, cb): - callbacks_table.add((realm_id, stream_name.lower()), Callbacks.TYPE_STREAM_RECEIVE, cb) - -def add_pointer_update_callback(user_profile, cb): - callbacks_table.add(user_profile.id, Callbacks.TYPE_POINTER_UPDATE, cb) - -# in-process caching mechanism for tracking usermessages -# -# user table: Map user_profile_id => [deque of message ids he received] -# -# We don't use all the features of a deque -- the important ones are: -# * O(1) insert of new highest message id -# * O(k) read of highest k message ids -# * Automatic maximum size support. -# -# stream table: Map (realm_id, lowercased stream name) => [deque of message ids it received] -# -# Why don't we index by the stream_id? Because the client will make a -# request that specifies a particular realm and stream name, and since -# we're running within tornado, we don't want to have to do a database -# lookup to find the matching entry in this table. 
- -mtables = { - 'user': {}, - 'stream': {}, -} - -USERMESSAGE_CACHE_COUNT = 25000 -STREAMMESSAGE_CACHE_COUNT = 5000 -cache_minimum_id = sys.maxint -def initialize_user_messages(): - global cache_minimum_id - try: - cache_minimum_id = Message.objects.all().order_by("-id")[0].id - USERMESSAGE_CACHE_COUNT - except Message.DoesNotExist: - cache_minimum_id = 1 - - # These next few lines implement the following Django ORM - # algorithm using raw SQL: - ## for um in UserMessage.objects.filter(message_id__gte=cache_minimum_id).order_by("message"): - ## add_user_message(um.user_profile_id, um.message_id) - # We do this because marshalling the Django objects is very - # inefficient; total time consumed with the raw SQL is about - # 600ms, vs. 3000ms-5000ms if we go through the ORM. - cursor = connection.cursor() - cursor.execute("SELECT user_profile_id, message_id from zerver_usermessage " + - "where message_id >= %s order by message_id", [cache_minimum_id]) - for row in cursor.fetchall(): - (user_profile_id, message_id) = row - add_user_message(user_profile_id, message_id) - - streams = {} - for stream in Stream.objects.select_related().all(): - streams[stream.id] = stream - for m in (Message.objects.only("id", "recipient").select_related("recipient") - .filter(id__gte=cache_minimum_id + (USERMESSAGE_CACHE_COUNT - STREAMMESSAGE_CACHE_COUNT), - recipient__type=Recipient.STREAM).order_by("id")): - stream = streams[m.recipient.type_id] - add_stream_message(stream.realm.id, stream.name, m.id) - - if not settings.DEPLOYED and not settings.TEST_SUITE: - # Filling the memcached cache is a little slow, so do it in a child process. - # For DEPLOYED cases, we run this from restart_server. 
- subprocess.Popen(["python", os.path.join(os.path.dirname(__file__), "..", "manage.py"), - "fill_memcached_caches"]) - -def add_user_message(user_profile_id, message_id): - add_table_message("user", user_profile_id, message_id) - -def add_stream_message(realm_id, stream_name, message_id): - add_table_message("stream", (realm_id, stream_name.lower()), message_id) - -def add_table_message(table, key, message_id): - if cache_minimum_id == sys.maxint: - initialize_user_messages() - mtables[table].setdefault(key, collections.deque(maxlen=400)) - mtables[table][key].appendleft(message_id) - -def fetch_user_messages(user_profile_id, last): - return fetch_table_messages("user", user_profile_id, last) - -def fetch_stream_messages(realm_id, stream_name, last): - return fetch_table_messages("stream", (realm_id, stream_name.lower()), last) - -def fetch_table_messages(table, key, last): - if cache_minimum_id == sys.maxint: - initialize_user_messages() - - # We need to initialize the deque here for any new users or - # streams that were created since Tornado was started - mtables[table].setdefault(key, collections.deque(maxlen=400)) - - # We need to do this check after initialize_user_messages has been called. - if len(mtables[table][key]) == 0: - # Since the request contains a value of "last", we can assume - # that the relevant user or stream has actually received a - # message, which means that mtabes[table][key] will not remain - # empty after the below completes. - # - # Thus, we will run this code at most once per key (user or - # stream that is being lurked on). Further, we only do this - # query for those keys that have not received a message since - # cache_minimum_id. So we can afford to do a database query - # from Tornado in this case. 
- if table == "user": - logging.info("tornado: Doing database query for user %d" % (key,),) - for um in reversed(UserMessage.objects.filter(user_profile_id=key).order_by('-message')[:400]): - add_user_message(um.user_profile_id, um.message_id) - elif table == "stream": - logging.info("tornado: Doing database query for stream %s" % (key,)) - (realm_id, stream_name) = key - stream = get_stream(stream_name, realm_id) - # If a buggy client submits a "last" value with a nonexistent stream, - # do nothing (and proceed to longpoll) rather than crashing. - if stream is not None: - recipient = Recipient.objects.get(type=Recipient.STREAM, type_id=stream.id) - for m in Message.objects.only("id", "recipient").filter(recipient=recipient).order_by("id")[:400]: - add_stream_message(realm_id, stream_name, m.id) - - if len(mtables[table][key]) == 0: - # Check the our assumption above that there are messages here. - # If false, this may just mean a misbehaving client submitted - # "last" even though it has no messages (in which case we - # should proceed with longpolling by falling through). But it - # could also be a server bug, so we log a warning. - logging.warning("Unexpected empty message queue for key %s!" % (key,)) - elif last < mtables[table][key][-1]: - # The user's client has a way-too-old value for 'last' - # (presumably 400 messages old), we should return an error - - # The error handler for get_updates in zulip.js parses this - # message. If you change this message, you must update that - # error handler. - raise JsonableError("last value of %d too old! Minimum valid is %d!" 
% - (last, mtables[table][key][-1])) - - message_list = [] - for message_id in mtables[table][key]: - if message_id <= last: - return reversed(message_list) - message_list.append(message_id) - return [] - -# The user receives this message -def user_receive_message(user_profile_id, message): - add_user_message(user_profile_id, message.id) - callbacks_table.call(user_profile_id, Callbacks.TYPE_USER_RECEIVE, - messages=[message], update_types=["new_messages"]) - -# The stream receives this message -def stream_receive_message(realm_id, stream_name, message): - add_stream_message(realm_id, stream_name, message.id) - callbacks_table.call((realm_id, stream_name.lower()), - Callbacks.TYPE_STREAM_RECEIVE, - messages=[message], update_types=["new_messages"]) - def update_pointer(user_profile_id, new_pointer): - callbacks_table.call(user_profile_id, Callbacks.TYPE_POINTER_UPDATE, - new_pointer=new_pointer, - update_types=["pointer_update"]) - event = dict(type='pointer', pointer=new_pointer) for client in get_client_descriptors_for_user(user_profile_id): if client.accepts_event_type(event['type']): @@ -367,8 +161,6 @@ def process_new_message(data): user_profile = user_profiles[user_data['id']] flags = user_data.get('flags', []) - user_receive_message(user_profile_id, message) - for client in get_client_descriptors_for_user(user_profile_id): send_to_clients[client.event_queue.id] = (client, flags) @@ -408,9 +200,6 @@ def process_new_message(data): event = dict(type='message', message=message_dict, flags=flags) client.add_event(event) - if 'stream_name' in data: - stream_receive_message(data['realm_id'], data['stream_name'], message) - def process_event(data): event = data['event'] for user_profile_id in data['users']: diff --git a/zerver/tornadoviews.py b/zerver/tornadoviews.py index 35c92af5ac..12cc836aaf 100644 --- a/zerver/tornadoviews.py +++ b/zerver/tornadoviews.py @@ -1,26 +1,18 @@ from __future__ import absolute_import -from django.conf import settings from 
django.views.decorators.csrf import csrf_exempt from zerver.models import get_client -from zerver.decorator import asynchronous, authenticated_api_view, \ +from zerver.decorator import asynchronous, \ authenticated_json_post_view, internal_notify_view, RespondAsynchronously, \ - has_request_variables, to_non_negative_int, json_to_bool, json_to_list, \ - REQ + has_request_variables, json_to_bool, json_to_list, REQ from zerver.lib.response import json_success, json_error -from zerver.middleware import async_request_restart -from zerver.tornado_callbacks import \ - fetch_stream_messages, fetch_user_messages, \ - add_stream_receive_callback, add_user_receive_callback, \ - add_pointer_update_callback, process_notification +from zerver.tornado_callbacks import process_notification -from zerver.lib.cache_helpers import cache_get_message from zerver.lib.event_queue import allocate_client_descriptor, get_client_descriptor import ujson -import socket from zerver.lib.rest import rest_dispatch as _rest_dispatch rest_dispatch = csrf_exempt((lambda request, *args, **kwargs: _rest_dispatch(request, globals(), *args, **kwargs))) @@ -30,162 +22,6 @@ def notify(request): process_notification(ujson.loads(request.POST['data'])) return json_success() -@authenticated_json_post_view -def json_get_updates(request, user_profile): - return get_updates_backend(request, user_profile, - client=request.client, apply_markdown=True) - -@authenticated_api_view -def api_get_messages(request, user_profile): - return get_messages_backend(request, user_profile) - -def get_messages_backend(request, user_profile): - return get_updates_backend(request, user_profile, client=request.client) - -def format_updates_response(messages=[], apply_markdown=True, - user_profile=None, new_pointer=None, - client=None, update_types=[], - client_server_generation=None): - if client is not None and client.name.endswith("_mirror"): - messages = [m for m in messages if m.sending_client.name != client.name] - ret = 
{'messages': [message.to_dict(apply_markdown) for message in messages], - "result": "success", - "msg": "", - 'update_types': update_types} - if client_server_generation is not None: - ret['server_generation'] = settings.SERVER_GENERATION - if new_pointer is not None: - ret['new_pointer'] = new_pointer - - return ret - -def return_messages_immediately(user_profile, last, - client_server_generation, - client_pointer, dont_block, - stream_name, **kwargs): - update_types = [] - new_pointer = None - if dont_block: - update_types.append("nonblocking_request") - - if (client_server_generation is not None and - client_server_generation != settings.SERVER_GENERATION): - update_types.append("client_reload") - - ptr = user_profile.pointer - if (client_pointer is not None and ptr > client_pointer): - new_pointer = ptr - update_types.append("pointer_update") - - if last is not None: - if stream_name is not None: - message_ids = fetch_stream_messages(user_profile.realm.id, stream_name, last) - else: - message_ids = fetch_user_messages(user_profile.id, last) - messages = map(cache_get_message, message_ids) - - # Filter for mirroring before checking whether there are any - # messages to pass on. If we don't do this, when the only message - # to forward is one that was sent via the mirroring, the API - # client will end up in an endless loop requesting more data from - # us. - if "client" in kwargs and kwargs["client"].name.endswith("_mirror"): - messages = [m for m in messages if - m.sending_client.name != kwargs["client"].name] - else: # last is None, so we're not interested in any old messages - messages = [] - - if messages: - update_types.append("new_messages") - - if update_types: - return format_updates_response(messages=messages, - user_profile=user_profile, - new_pointer=new_pointer, - client_server_generation=client_server_generation, - update_types=update_types, - **kwargs) - - return None - -# Note: We allow any stream name at all here! 
Validation and -# authorization (is the stream "public") are handled by the caller of -# notify new_message. If a user makes a get_updates request for a -# nonexistent or non-public stream, they won't get an error -- they'll -# just never receive any messages. -@asynchronous -@has_request_variables -def get_updates_backend(request, user_profile, handler=None, - last = REQ(converter=to_non_negative_int, default=None), - client_server_generation = REQ(whence='server_generation', default=None, - converter=int), - client_pointer = REQ(whence='pointer', converter=int, default=None), - dont_block = REQ(converter=json_to_bool, default=False), - stream_name = REQ(default=None), - apply_markdown = REQ(default=False, converter=json_to_bool), - **kwargs): - resp = return_messages_immediately(user_profile, last, - client_server_generation, - client_pointer, - dont_block, stream_name, - apply_markdown=apply_markdown, **kwargs) - if resp is not None: - handler.zulip_finish(resp, request, apply_markdown) - - # We have already invoked handler.zulip_finish(), so we bypass the usual view - # response path. We are "responding asynchronously" except that it - # already happened. This is slightly weird. - return RespondAsynchronously - - # Enter long-polling mode. - # - # Instead of responding to the client right away, leave our connection open - # and return to the Tornado main loop. One of the notify_* views will - # eventually invoke one of these callbacks, which will send the delayed - # response. - - def cb(**cb_kwargs): - async_request_restart(request) - if handler.request.connection.stream.closed(): - return - try: - # It would be nice to be able to do these checks in - # UserProfile.receive, but it doesn't know what the value - # of "last" was for each callback. - if last is not None and "messages" in cb_kwargs: - messages = cb_kwargs["messages"] - - # Make sure the client doesn't get a message twice - # when messages are processed out of order. 
- if messages[0].id <= last: - # We must return a response because we don't have - # a way to re-queue a callback and so the client - # must do it by making a new request - handler.zulip_finish({"result": "success", - "msg": "", - 'update_types': []}, - request, apply_markdown) - return - - kwargs.update(cb_kwargs) - res = format_updates_response(user_profile=user_profile, - client_server_generation=client_server_generation, - apply_markdown=apply_markdown, - **kwargs) - handler.zulip_finish(res, request, apply_markdown) - except socket.error: - pass - - if stream_name is not None: - add_stream_receive_callback(user_profile.realm.id, stream_name, handler.async_callback(cb)) - else: - add_user_receive_callback(user_profile, handler.async_callback(cb)) - if client_pointer is not None: - add_pointer_update_callback(user_profile, handler.async_callback(cb)) - - # runtornado recognizes this special return value. - return RespondAsynchronously - @authenticated_json_post_view def json_get_events(request, user_profile): return get_events_backend(request, user_profile, apply_markdown=True) diff --git a/zproject/urls.py b/zproject/urls.py index 5afb1a376c..e452367f14 100644 --- a/zproject/urls.py +++ b/zproject/urls.py @@ -223,8 +223,6 @@ v1_api_and_json_patterns = patterns('zerver.views', {'POST': 'api_events_register'}), ) + patterns('zerver.tornadoviews', - url(r'^messages/latest$', 'rest_dispatch', - {'GET': 'get_updates_backend'}), url(r'^events$', 'rest_dispatch', {'GET': 'get_events_backend'}), ) @@ -237,8 +235,6 @@ if not settings.LOCAL_SERVER: urlpatterns += patterns('zerver.tornadoviews', # Tornado views - url(r'^api/v1/get_messages$', 'api_get_messages'), - url(r'^json/get_updates$', 'json_get_updates'), url(r'^json/get_events$', 'json_get_events'), # Used internally for communication between Django and Tornado processes url(r'^notify_tornado$', 'notify'),