[manual] Remove /messages/latest API and related legacy code.

This requires doing a puppet apply on our servers to take effect
properly.

(imported from commit 19dc56f071f07a5d2571eef49dd835121b2e82b6)
Author: Tim Abbott, 2013-11-05 11:02:34 -05:00
Parent: ae8648b07f
Commit: e06722657a
8 changed files with 9 additions and 453 deletions

View File

@@ -14,7 +14,7 @@ location /static/ {
 }

 # Send longpoll requests to Tornado
-location ~ /json/get_updates|/api/v1/get_messages|/api/v1/messages/latest|/json/get_events|/api/v1/events {
+location ~ /json/get_events|/api/v1/events {
     proxy_pass http://localhost:9993;
     proxy_redirect off;

View File

@@ -83,8 +83,7 @@ class Resource(resource.Resource):
     def getChild(self, name, request):
         request.requestHeaders.setRawHeaders('X-Forwarded-Host', [proxy_host])
-        if (request.uri in ['/json/get_updates', '/api/v1/get_messages', '/json/get_events'] or
-            request.uri.startswith('/api/v1/messages/latest') or
+        if (request.uri in ['/json/get_events'] or
             request.uri.startswith('/api/v1/events') or
             request.uri.startswith('/sockjs')):
             return proxy.ReverseProxyResource('localhost', tornado_port, '/'+name)
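For context, the development proxy above follows the standard twisted.web pattern: getChild inspects request.uri and hands longpoll traffic to a ReverseProxyResource pointed at Tornado. A minimal self-contained sketch of that pattern, in the same Python 2-era style as the file; the ports and the Django fallback target here are assumptions, not this file's actual values:

    from twisted.web import proxy, resource

    TORNADO_PORT = 9993   # matches the nginx config above
    DJANGO_PORT = 8000    # assumed dev-server port

    class DevProxy(resource.Resource):
        def getChild(self, name, request):
            # Longpoll endpoints go to Tornado; everything else to Django.
            if (request.uri in ['/json/get_events'] or
                    request.uri.startswith('/api/v1/events') or
                    request.uri.startswith('/sockjs')):
                return proxy.ReverseProxyResource('localhost', TORNADO_PORT, '/' + name)
            return proxy.ReverseProxyResource('localhost', DJANGO_PORT, '/' + name)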

View File

@@ -1495,7 +1495,7 @@ def do_update_message(user_profile, message_id, subject, propagate_mode, content
                                  "rendered_content_version", "last_edit_time",
                                  "edit_history"])

-    # Update the message as stored in both the (deprecated) message
+    # Update the message as stored in the (deprecated) message
     # cache (for shunting the message over to Tornado in the old
     # get_messages API) and also the to_dict caches.
     items_for_memcached = {}
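The comment being edited describes a dual write: an edited message must be refreshed both in the legacy whole-message cache (which Tornado still reads) and in the serialized to_dict caches, batched into one cache write. A self-contained toy of that pattern; the key format and classes here are illustrative stand-ins, not Zulip's real helpers:

    fake_cache = {}

    def cache_set_many(items):  # stand-in for zerver.lib.cache.cache_set_many
        fake_cache.update(items)

    class Msg(object):
        def __init__(self, id, content):
            self.id, self.content = id, content
        def to_dict(self, apply_markdown):
            content = '<p>%s</p>' % self.content if apply_markdown else self.content
            return {'id': self.id, 'content': content}

    msg = Msg(42, 'edited text')
    cache_set_many({
        'message:%d' % msg.id: (msg,),                     # legacy message cache
        'message_dict:%d' % msg.id: (msg.to_dict(True),),  # to_dict cache
    })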

View File

@@ -86,10 +86,7 @@ class Command(BaseCommand):
         queue_client.register_json_consumer('tornado_return', respond_send_message)

     try:
-        urls = (r"/json/get_updates",
-                r"/api/v1/get_messages",
-                r"/notify_tornado",
-                r"/api/v1/messages/latest",
+        urls = (r"/notify_tornado",
                 r"/json/get_events",
                 r"/api/v1/events",
                 )
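With the legacy URLs dropped, the management command registers only these three routes. For readers unfamiliar with runtornado, the registration boils down to the standard tornado.web.Application pattern; the handler below is a placeholder for illustration, not Zulip's actual request handler:

    import tornado.ioloop
    import tornado.web

    class PlaceholderHandler(tornado.web.RequestHandler):
        def get(self):
            self.write({'result': 'success', 'events': []})

    urls = (r"/notify_tornado",
            r"/json/get_events",
            r"/api/v1/events",
            )
    application = tornado.web.Application([(u, PlaceholderHandler) for u in urls])

    if __name__ == '__main__':
        application.listen(9993)  # the Tornado port used in the proxy configs above
        tornado.ioloop.IOLoop.current().start()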

View File

@@ -13,7 +13,6 @@ from zerver.models import Message, UserProfile, Stream, Recipient, Subscription,
     get_display_recipient, Realm, Client, UserActivity, \
     PreregistrationUser, UserMessage, \
     get_user_profile_by_email, email_to_domain, get_realm, get_stream, get_client
-from zerver.tornadoviews import json_get_updates, api_get_messages
 from zerver.decorator import RespondAsynchronously, \
     RequestVariableConversionError, profiled, JsonableError
 from zerver.lib.initial_password import initial_password
@@ -2421,66 +2420,6 @@ class POSTRequestMock(object):
         self.session = DummySession()
         self.META = {'PATH_INFO': 'test'}

-class GetUpdatesTest(AuthedTestCase):
-    def common_test_get_updates(self, view_func, extra_post_data = {}):
-        user_profile = get_user_profile_by_email("hamlet@zulip.com")
-        message_content = 'tornado test message'
-        self.got_callback = False
-
-        def callback(response):
-            self.got_callback = True
-            msg = response['messages'][0]
-            if str(msg['content_type']) == 'text/html':
-                self.assertEqual('<p>%s</p>' % message_content, msg['content'])
-            else:
-                self.assertEqual(message_content, msg['content'])
-
-        post_data = {}
-        post_data.update(extra_post_data)
-        request = POSTRequestMock(post_data, user_profile, callback)
-        self.assertEqual(view_func(request), RespondAsynchronously)
-        self.send_message("hamlet@zulip.com", "hamlet@zulip.com",
-                          Recipient.PERSONAL, message_content)
-        self.assertTrue(self.got_callback)
-
-    def test_json_get_updates(self):
-        """
-        json_get_updates returns messages with IDs greater than the
-        last_received ID.
-        """
-        self.login("hamlet@zulip.com")
-        self.common_test_get_updates(json_get_updates)
-
-    def test_api_get_messages(self):
-        """
-        Same as above, but for the API view
-        """
-        email = "hamlet@zulip.com"
-        api_key = self.get_api_key(email)
-        self.common_test_get_updates(api_get_messages, {'email': email, 'api-key': api_key})
-
-    def test_missing_last_received(self):
-        """
-        Calling json_get_updates without any arguments should work
-        """
-        self.login("hamlet@zulip.com")
-        user_profile = get_user_profile_by_email("hamlet@zulip.com")
-        request = POSTRequestMock({}, user_profile)
-        self.assertEqual(json_get_updates(request), RespondAsynchronously)
-
-    def test_bad_input(self):
-        """
-        Specifying a bad value for 'pointer' should return an error
-        """
-        self.login("hamlet@zulip.com")
-        user_profile = get_user_profile_by_email("hamlet@zulip.com")
-        request = POSTRequestMock({'pointer': 'foo'}, user_profile)
-        self.assertRaises(RequestVariableConversionError, json_get_updates, request)
-
 class GetProfileTest(AuthedTestCase):
     def common_update_pointer(self, email, pointer):
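The deleted GetUpdatesTest exercised the old asynchronous contract: the view returns the RespondAsynchronously sentinel immediately, and the HTTP response is produced later by a callback fired when a message arrives. A stripped-down, runnable stand-in of that contract (all names here are illustrative, not Zulip's real API):

    RespondAsynchronously = object()   # sentinel, as in zerver.decorator
    pending_callbacks = []

    def async_view(callback):
        # Park the caller's callback instead of answering now.
        pending_callbacks.append(callback)
        return RespondAsynchronously

    def on_new_message(message):
        # Later: the event arrives and every parked callback is answered.
        for cb in pending_callbacks:
            cb({'result': 'success', 'messages': [message]})
        del pending_callbacks[:]

    responses = []
    assert async_view(responses.append) is RespondAsynchronously
    on_new_message({'content': 'tornado test message'})
    assert responses[0]['messages'][0]['content'] == 'tornado test message'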

View File

@@ -3,8 +3,8 @@ from __future__ import absolute_import
 from django.conf import settings
 from django.utils.timezone import now
-from zerver.models import Message, UserProfile, UserMessage, \
-    Recipient, Stream, get_stream, get_user_profile_by_id
+from zerver.models import Message, UserProfile, \
+    Recipient, get_user_profile_by_id
 from zerver.decorator import JsonableError
 from zerver.lib.cache import cache_get_many, message_cache_key, \
@@ -15,223 +15,17 @@ from zerver.lib.event_queue import get_client_descriptors_for_user,\
     get_client_descriptors_for_realm_all_streams
 from zerver.lib.timestamp import timestamp_to_datetime

-import os
-import sys
 import time
 import logging
 import requests
 import ujson
-import subprocess
-import collections
 import datetime

-from django.db import connection
-
 # Send email notifications to idle users
 # after they are idle for 1 hour
 NOTIFY_AFTER_IDLE_HOURS = 1
-class Callbacks(object):
-    # A user received a message. The key is user_profile.id.
-    TYPE_USER_RECEIVE = 0
-
-    # A stream received a message. The key is a tuple
-    # (realm_id, lowercased stream name).
-    # See comment attached to the global stream_messages for why.
-    # Callers of this callback need to be careful to provide
-    # a lowercased stream name.
-    TYPE_STREAM_RECEIVE = 1
-
-    # A user's pointer was updated. The key is user_profile.id.
-    TYPE_POINTER_UPDATE = 2
-
-    TYPE_MAX = 3
-
-    def __init__(self):
-        self.table = {}
-
-    def add(self, key, cb_type, callback):
-        if not self.table.has_key(key):
-            self.create_key(key)
-        self.table[key][cb_type].append(callback)
-
-    def call(self, key, cb_type, **kwargs):
-        if not self.table.has_key(key):
-            self.create_key(key)
-        for cb in self.table[key][cb_type]:
-            cb(**kwargs)
-        self.table[key][cb_type] = []
-
-    def create_key(self, key):
-        self.table[key] = [[] for i in range(0, Callbacks.TYPE_MAX)]
-
-callbacks_table = Callbacks()
-
-def add_user_receive_callback(user_profile, cb):
-    callbacks_table.add(user_profile.id, Callbacks.TYPE_USER_RECEIVE, cb)
-
-def add_stream_receive_callback(realm_id, stream_name, cb):
-    callbacks_table.add((realm_id, stream_name.lower()), Callbacks.TYPE_STREAM_RECEIVE, cb)
-
-def add_pointer_update_callback(user_profile, cb):
-    callbacks_table.add(user_profile.id, Callbacks.TYPE_POINTER_UPDATE, cb)
-
-# in-process caching mechanism for tracking usermessages
-#
-# user table: Map user_profile_id => [deque of message ids he received]
-#
-# We don't use all the features of a deque -- the important ones are:
-# * O(1) insert of new highest message id
-# * O(k) read of highest k message ids
-# * Automatic maximum size support.
-#
-# stream table: Map (realm_id, lowercased stream name) => [deque of message ids it received]
-#
-# Why don't we index by the stream_id? Because the client will make a
-# request that specifies a particular realm and stream name, and since
-# we're running within tornado, we don't want to have to do a database
-# lookup to find the matching entry in this table.
-mtables = {
-    'user': {},
-    'stream': {},
-}
-
-USERMESSAGE_CACHE_COUNT = 25000
-STREAMMESSAGE_CACHE_COUNT = 5000
-cache_minimum_id = sys.maxint
-
-def initialize_user_messages():
-    global cache_minimum_id
-    try:
-        cache_minimum_id = Message.objects.all().order_by("-id")[0].id - USERMESSAGE_CACHE_COUNT
-    except Message.DoesNotExist:
-        cache_minimum_id = 1
-
-    # These next few lines implement the following Django ORM
-    # algorithm using raw SQL:
-    ## for um in UserMessage.objects.filter(message_id__gte=cache_minimum_id).order_by("message"):
-    ##     add_user_message(um.user_profile_id, um.message_id)
-    # We do this because marshalling the Django objects is very
-    # inefficient; total time consumed with the raw SQL is about
-    # 600ms, vs. 3000ms-5000ms if we go through the ORM.
-    cursor = connection.cursor()
-    cursor.execute("SELECT user_profile_id, message_id from zerver_usermessage " +
-                   "where message_id >= %s order by message_id", [cache_minimum_id])
-    for row in cursor.fetchall():
-        (user_profile_id, message_id) = row
-        add_user_message(user_profile_id, message_id)
-
-    streams = {}
-    for stream in Stream.objects.select_related().all():
-        streams[stream.id] = stream
-    for m in (Message.objects.only("id", "recipient").select_related("recipient")
-              .filter(id__gte=cache_minimum_id + (USERMESSAGE_CACHE_COUNT - STREAMMESSAGE_CACHE_COUNT),
-                      recipient__type=Recipient.STREAM).order_by("id")):
-        stream = streams[m.recipient.type_id]
-        add_stream_message(stream.realm.id, stream.name, m.id)
-
-    if not settings.DEPLOYED and not settings.TEST_SUITE:
-        # Filling the memcached cache is a little slow, so do it in a child process.
-        # For DEPLOYED cases, we run this from restart_server.
-        subprocess.Popen(["python", os.path.join(os.path.dirname(__file__), "..", "manage.py"),
-                          "fill_memcached_caches"])
-
-def add_user_message(user_profile_id, message_id):
-    add_table_message("user", user_profile_id, message_id)
-
-def add_stream_message(realm_id, stream_name, message_id):
-    add_table_message("stream", (realm_id, stream_name.lower()), message_id)
-
-def add_table_message(table, key, message_id):
-    if cache_minimum_id == sys.maxint:
-        initialize_user_messages()
-    mtables[table].setdefault(key, collections.deque(maxlen=400))
-    mtables[table][key].appendleft(message_id)
-
-def fetch_user_messages(user_profile_id, last):
-    return fetch_table_messages("user", user_profile_id, last)
-
-def fetch_stream_messages(realm_id, stream_name, last):
-    return fetch_table_messages("stream", (realm_id, stream_name.lower()), last)
-
-def fetch_table_messages(table, key, last):
-    if cache_minimum_id == sys.maxint:
-        initialize_user_messages()
-
-    # We need to initialize the deque here for any new users or
-    # streams that were created since Tornado was started
-    mtables[table].setdefault(key, collections.deque(maxlen=400))
-
-    # We need to do this check after initialize_user_messages has been called.
-    if len(mtables[table][key]) == 0:
-        # Since the request contains a value of "last", we can assume
-        # that the relevant user or stream has actually received a
-        # message, which means that mtabes[table][key] will not remain
-        # empty after the below completes.
-        #
-        # Thus, we will run this code at most once per key (user or
-        # stream that is being lurked on). Further, we only do this
-        # query for those keys that have not received a message since
-        # cache_minimum_id. So we can afford to do a database query
-        # from Tornado in this case.
-        if table == "user":
-            logging.info("tornado: Doing database query for user %d" % (key,),)
-            for um in reversed(UserMessage.objects.filter(user_profile_id=key).order_by('-message')[:400]):
-                add_user_message(um.user_profile_id, um.message_id)
-        elif table == "stream":
-            logging.info("tornado: Doing database query for stream %s" % (key,))
-            (realm_id, stream_name) = key
-            stream = get_stream(stream_name, realm_id)
-            # If a buggy client submits a "last" value with a nonexistent stream,
-            # do nothing (and proceed to longpoll) rather than crashing.
-            if stream is not None:
-                recipient = Recipient.objects.get(type=Recipient.STREAM, type_id=stream.id)
-                for m in Message.objects.only("id", "recipient").filter(recipient=recipient).order_by("id")[:400]:
-                    add_stream_message(realm_id, stream_name, m.id)
-
-    if len(mtables[table][key]) == 0:
-        # Check the our assumption above that there are messages here.
-        # If false, this may just mean a misbehaving client submitted
-        # "last" even though it has no messages (in which case we
-        # should proceed with longpolling by falling through). But it
-        # could also be a server bug, so we log a warning.
-        logging.warning("Unexpected empty message queue for key %s!" % (key,))
-    elif last < mtables[table][key][-1]:
-        # The user's client has a way-too-old value for 'last'
-        # (presumably 400 messages old), we should return an error
-        # The error handler for get_updates in zulip.js parses this
-        # message. If you change this message, you must update that
-        # error handler.
-        raise JsonableError("last value of %d too old! Minimum valid is %d!" %
-                            (last, mtables[table][key][-1]))
-
-    message_list = []
-    for message_id in mtables[table][key]:
-        if message_id <= last:
-            return reversed(message_list)
-        message_list.append(message_id)
-    return []
-
-# The user receives this message
-def user_receive_message(user_profile_id, message):
-    add_user_message(user_profile_id, message.id)
-    callbacks_table.call(user_profile_id, Callbacks.TYPE_USER_RECEIVE,
-                         messages=[message], update_types=["new_messages"])
-
-# The stream receives this message
-def stream_receive_message(realm_id, stream_name, message):
-    add_stream_message(realm_id, stream_name, message.id)
-    callbacks_table.call((realm_id, stream_name.lower()),
-                         Callbacks.TYPE_STREAM_RECEIVE,
-                         messages=[message], update_types=["new_messages"])
-
 def update_pointer(user_profile_id, new_pointer):
-    callbacks_table.call(user_profile_id, Callbacks.TYPE_POINTER_UPDATE,
-                         new_pointer=new_pointer,
-                         update_types=["pointer_update"])
-
     event = dict(type='pointer', pointer=new_pointer)
     for client in get_client_descriptors_for_user(user_profile_id):
         if client.accepts_event_type(event['type']):
@@ -367,8 +161,6 @@ def process_new_message(data):
         user_profile = user_profiles[user_data['id']]
         flags = user_data.get('flags', [])

-        user_receive_message(user_profile_id, message)
-
         for client in get_client_descriptors_for_user(user_profile_id):
             send_to_clients[client.event_queue.id] = (client, flags)
@@ -408,9 +200,6 @@ def process_new_message(data):
             event = dict(type='message', message=message_dict, flags=flags)
             client.add_event(event)

-    if 'stream_name' in data:
-        stream_receive_message(data['realm_id'], data['stream_name'], message)
-
 def process_event(data):
     event = data['event']
     for user_profile_id in data['users']:
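Most of what this file loses is the in-process deque cache whose comments survive in the removed lines above. The properties those comments claim (O(1) insert of the newest id, automatic size cap, newest-first reads) are exactly the semantics of collections.deque(maxlen=...), as this small runnable demo shows:

    import collections

    recent_ids = collections.deque(maxlen=400)
    for message_id in range(1, 1001):
        recent_ids.appendleft(message_id)   # newest id goes to the front

    print(len(recent_ids))    # 400: older ids were evicted automatically
    print(recent_ids[0])      # 1000: newest
    print(recent_ids[-1])     # 601: oldest id still retained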

View File

@@ -1,26 +1,18 @@
 from __future__ import absolute_import
-from django.conf import settings
 from django.views.decorators.csrf import csrf_exempt

 from zerver.models import get_client
-from zerver.decorator import asynchronous, authenticated_api_view, \
+from zerver.decorator import asynchronous, \
     authenticated_json_post_view, internal_notify_view, RespondAsynchronously, \
-    has_request_variables, to_non_negative_int, json_to_bool, json_to_list, \
-    REQ
+    has_request_variables, json_to_bool, json_to_list, REQ
 from zerver.lib.response import json_success, json_error
-from zerver.middleware import async_request_restart
-from zerver.tornado_callbacks import \
-    fetch_stream_messages, fetch_user_messages, \
-    add_stream_receive_callback, add_user_receive_callback, \
-    add_pointer_update_callback, process_notification
-from zerver.lib.cache_helpers import cache_get_message
+from zerver.tornado_callbacks import process_notification
 from zerver.lib.event_queue import allocate_client_descriptor, get_client_descriptor

 import ujson
-import socket

 from zerver.lib.rest import rest_dispatch as _rest_dispatch
 rest_dispatch = csrf_exempt((lambda request, *args, **kwargs: _rest_dispatch(request, globals(), *args, **kwargs)))
@@ -30,162 +22,6 @@ def notify(request):
     process_notification(ujson.loads(request.POST['data']))
     return json_success()

-@authenticated_json_post_view
-def json_get_updates(request, user_profile):
-    return get_updates_backend(request, user_profile,
-                               client=request.client, apply_markdown=True)
-
-@authenticated_api_view
-def api_get_messages(request, user_profile):
-    return get_messages_backend(request, user_profile)
-
-def get_messages_backend(request, user_profile):
-    return get_updates_backend(request, user_profile, client=request.client)
-
-def format_updates_response(messages=[], apply_markdown=True,
-                            user_profile=None, new_pointer=None,
-                            client=None, update_types=[],
-                            client_server_generation=None):
-    if client is not None and client.name.endswith("_mirror"):
-        messages = [m for m in messages if m.sending_client.name != client.name]
-    ret = {'messages': [message.to_dict(apply_markdown) for message in messages],
-           "result": "success",
-           "msg": "",
-           'update_types': update_types}
-    if client_server_generation is not None:
-        ret['server_generation'] = settings.SERVER_GENERATION
-    if new_pointer is not None:
-        ret['new_pointer'] = new_pointer
-    return ret
-
-def return_messages_immediately(user_profile, last,
-                                client_server_generation,
-                                client_pointer, dont_block,
-                                stream_name, **kwargs):
-    update_types = []
-    new_pointer = None
-    if dont_block:
-        update_types.append("nonblocking_request")
-
-    if (client_server_generation is not None and
-        client_server_generation != settings.SERVER_GENERATION):
-        update_types.append("client_reload")
-
-    ptr = user_profile.pointer
-    if (client_pointer is not None and ptr > client_pointer):
-        new_pointer = ptr
-        update_types.append("pointer_update")
-
-    if last is not None:
-        if stream_name is not None:
-            message_ids = fetch_stream_messages(user_profile.realm.id, stream_name, last)
-        else:
-            message_ids = fetch_user_messages(user_profile.id, last)
-        messages = map(cache_get_message, message_ids)
-
-        # Filter for mirroring before checking whether there are any
-        # messages to pass on. If we don't do this, when the only message
-        # to forward is one that was sent via the mirroring, the API
-        # client will end up in an endless loop requesting more data from
-        # us.
-        if "client" in kwargs and kwargs["client"].name.endswith("_mirror"):
-            messages = [m for m in messages if
-                        m.sending_client.name != kwargs["client"].name]
-    else: # last is None, so we're not interested in any old messages
-        messages = []
-
-    if messages:
-        update_types.append("new_messages")
-
-    if update_types:
-        return format_updates_response(messages=messages,
-                                       user_profile=user_profile,
-                                       new_pointer=new_pointer,
-                                       client_server_generation=client_server_generation,
-                                       update_types=update_types,
-                                       **kwargs)
-
-    return None
-
-# Note: We allow any stream name at all here! Validation and
-# authorization (is the stream "public") are handled by the caller of
-# notify new_message. If a user makes a get_updates request for a
-# nonexistent or non-public stream, they won't get an error -- they'll
-# just never receive any messages.
-@asynchronous
-@has_request_variables
-def get_updates_backend(request, user_profile, handler=None,
-                        last = REQ(converter=to_non_negative_int, default=None),
-                        client_server_generation = REQ(whence='server_generation', default=None,
-                                                       converter=int),
-                        client_pointer = REQ(whence='pointer', converter=int, default=None),
-                        dont_block = REQ(converter=json_to_bool, default=False),
-                        stream_name = REQ(default=None),
-                        apply_markdown = REQ(default=False, converter=json_to_bool),
-                        **kwargs):
-    resp = return_messages_immediately(user_profile, last,
-                                       client_server_generation,
-                                       client_pointer,
-                                       dont_block, stream_name,
-                                       apply_markdown=apply_markdown, **kwargs)
-    if resp is not None:
-        handler.zulip_finish(resp, request, apply_markdown)
-
-        # We have already invoked handler.zulip_finish(), so we bypass the usual view
-        # response path. We are "responding asynchronously" except that it
-        # already happened. This is slightly weird.
-        return RespondAsynchronously
-
-    # Enter long-polling mode.
-    #
-    # Instead of responding to the client right away, leave our connection open
-    # and return to the Tornado main loop. One of the notify_* views will
-    # eventually invoke one of these callbacks, which will send the delayed
-    # response.
-
-    def cb(**cb_kwargs):
-        async_request_restart(request)
-        if handler.request.connection.stream.closed():
-            return
-        try:
-            # It would be nice to be able to do these checks in
-            # UserProfile.receive, but it doesn't know what the value
-            # of "last" was for each callback.
-            if last is not None and "messages" in cb_kwargs:
-                messages = cb_kwargs["messages"]
-
-                # Make sure the client doesn't get a message twice
-                # when messages are processed out of order.
-                if messages[0].id <= last:
-                    # We must return a response because we don't have
-                    # a way to re-queue a callback and so the client
-                    # must do it by making a new request
-                    handler.zulip_finish({"result": "success",
-                                          "msg": "",
-                                          'update_types': []},
-                                         request, apply_markdown)
-                    return
-
-            kwargs.update(cb_kwargs)
-            res = format_updates_response(user_profile=user_profile,
-                                          client_server_generation=client_server_generation,
-                                          apply_markdown=apply_markdown,
-                                          **kwargs)
-            handler.zulip_finish(res, request, apply_markdown)
-        except socket.error:
-            pass
-
-    if stream_name is not None:
-        add_stream_receive_callback(user_profile.realm.id, stream_name, handler.async_callback(cb))
-    else:
-        add_user_receive_callback(user_profile, handler.async_callback(cb))
-    if client_pointer is not None:
-        add_pointer_update_callback(user_profile, handler.async_callback(cb))
-
-    # runtornado recognizes this special return value.
-    return RespondAsynchronously
-
 @authenticated_json_post_view
 def json_get_events(request, user_profile):
     return get_events_backend(request, user_profile, apply_markdown=True)
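After this commit, clients that previously polled get_updates must long-poll the events API instead. A hedged sketch of that client loop against the endpoints of this era; the host, credentials, and exact payload fields are assumptions, not taken from this diff:

    import requests

    BASE = "https://zulip.example.com/api/v1"
    AUTH = ("bot@example.com", "API_KEY")   # placeholder credentials

    queue = requests.post(BASE + "/register", auth=AUTH,
                          data={"event_types": '["message"]'}).json()
    last_id = queue["last_event_id"]

    while True:
        resp = requests.get(BASE + "/events", auth=AUTH,
                            params={"queue_id": queue["queue_id"],
                                    "last_event_id": last_id}).json()
        for event in resp.get("events", []):
            last_id = max(last_id, event["id"])
            print(event["type"])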

View File

@@ -223,8 +223,6 @@ v1_api_and_json_patterns = patterns('zerver.views',
         {'POST': 'api_events_register'}),
 ) + patterns('zerver.tornadoviews',
-    url(r'^messages/latest$', 'rest_dispatch',
-        {'GET': 'get_updates_backend'}),
     url(r'^events$', 'rest_dispatch',
         {'GET': 'get_events_backend'}),
 )
@@ -237,8 +235,6 @@ if not settings.LOCAL_SERVER:
     urlpatterns += patterns('zerver.tornadoviews',
         # Tornado views
-        url(r'^api/v1/get_messages$', 'api_get_messages'),
-        url(r'^json/get_updates$', 'json_get_updates'),
        url(r'^json/get_events$', 'json_get_events'),
         # Used internally for communication between Django and Tornado processes
         url(r'^notify_tornado$', 'notify'),
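The surviving ^events$ entry relies on rest_dispatch, which picks a view by HTTP method from the mapping given in the URL pattern. A simplified, self-contained rendition of that idea (not the real zerver.lib.rest implementation):

    def json_error(msg):   # stand-in for zerver.lib.response.json_error
        return {'result': 'error', 'msg': msg}

    def rest_dispatch(request, view_globals, **method_map):
        # e.g. method_map == {'GET': 'get_events_backend'}
        view_name = method_map.get(request.method)
        if view_name is None:
            return json_error('Method not allowed')
        return view_globals[view_name](request)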