socket: Build a real Request object to send through our full stack, including middleware

One quirk here is that the Request object is built in the
message_sender worker, not in Tornado.  This means that the request
time only counts the time taken for the actual send and does not
account for socket overhead.  For this reason, I've left the fake
logging in for now so we can compare the two times.

(imported from commit b0c60a3017527a328cadf11ba68166e59cf23ddf)
Zev Benjamin 2013-11-07 20:02:48 -05:00
parent ffbadf61df
commit 24ee4a5d57
3 changed files with 62 additions and 33 deletions
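The heart of the change is in MessageSenderWorker (third file below):
instead of calling check_send_message directly, the worker now
fabricates a WSGI environ, wraps it in a Django WSGIRequest, and hands
it to a BaseHandler, so the emulated request runs through the full
middleware stack.  A minimal standalone sketch of that technique,
assuming a configured Django ~1.5 project on Python 2 (the settings
module name here is illustrative):

    import os
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'zproject.settings')  # name assumed

    import sys
    import StringIO

    from django.core.handlers.base import BaseHandler
    from django.core.handlers.wsgi import WSGIRequest

    # One-time setup, mirroring MessageSenderWorker.__init__ below.
    handler = BaseHandler()
    handler.load_middleware()

    # A hand-built WSGI environ.  'SOCKET' is not a real HTTP verb, so
    # views guarded by require_post consult zulip.emulated_method instead.
    environ = {'REQUEST_METHOD': 'SOCKET',
               'SCRIPT_NAME': '',
               'PATH_INFO': '/json/send_message',
               'SERVER_NAME': 'localhost',
               'SERVER_PORT': 9993,
               'SERVER_PROTOCOL': 'ZULIP_SOCKET/1.0',
               'wsgi.version': (1, 0),
               'wsgi.input': StringIO.StringIO(),  # empty body; params ride on the event
               'wsgi.errors': sys.stderr,
               'wsgi.multithread': False,
               'wsgi.multiprocess': True,
               'wsgi.run_once': False,
               'zulip.emulated_method': 'POST'}

    request = WSGIRequest(environ)
    request.csrf_processing_done = True  # queue-fed requests carry no CSRF token

    # Runs every middleware plus the view mapped to PATH_INFO.
    response = handler.get_response(request)
    print response.status_code, response.content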

zerver/decorator.py

@@ -3,8 +3,7 @@ from __future__ import absolute_import
 from django.http import HttpResponseRedirect
 from django.contrib.auth.decorators import login_required
 from django.views.decorators.csrf import csrf_exempt
-from django.views.decorators.http import require_POST
-from django.http import QueryDict
+from django.http import QueryDict, HttpResponseNotAllowed
 from django.http.multipartparser import MultiPartParser
 from zerver.models import UserProfile, get_client, get_user_profile_by_email
 from zerver.lib.response import json_error, json_unauthorized
@@ -63,8 +62,22 @@ def update_user_activity(request, user_profile):
              'client': request.client.name}
     queue_json_publish("user_activity", event, lambda event: None)
 
-# I like the all-lowercase name better
-require_post = require_POST
+# Based on django.views.decorators.http.require_http_methods
+def require_post(func):
+    @wraps(func)
+    def wrapper(request, *args, **kwargs):
+        if (request.method != "POST"
+            and not (request.method == "SOCKET"
+                     and request.META['zulip.emulated_method'] == "POST")):
+            if request.method == "SOCKET":
+                err_method = "SOCKET/%s" % (request.META['zulip.emulated_method'],)
+            else:
+                err_method = request.method
+            logging.warning('Method Not Allowed (%s): %s', err_method, request.path,
+                            extra={'status_code': 405, 'request': request})
+            return HttpResponseNotAllowed(["POST"])
+        return func(request, *args, **kwargs)
+    return wrapper
 
 
 default_clients = {}
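
To see what the rewritten decorator buys us: a request whose method is
the fake "SOCKET" verb is let through only when zulip.emulated_method
says POST.  A quick sketch; FakeRequest is a hypothetical stand-in, and
the import path for require_post is assumed:

    from zerver.decorator import require_post  # module path assumed

    class FakeRequest(object):
        # Illustrative object with just enough attributes for require_post.
        def __init__(self, method, emulated_method=None):
            self.method = method
            self.path = '/json/send_message'
            self.META = {}
            if emulated_method is not None:
                self.META['zulip.emulated_method'] = emulated_method

    @require_post
    def my_view(request):
        return 'ok'

    my_view(FakeRequest('POST'))                            # 'ok'
    my_view(FakeRequest('SOCKET', emulated_method='POST'))  # 'ok'
    my_view(FakeRequest('GET'))                             # HttpResponseNotAllowed(['POST'])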

zerver/tornadoviews.py

@@ -153,21 +153,20 @@ class SocketConnection(sockjs.tornado.SockJSConnection):
                            'response': {'result': 'error', 'msg': error_msg}})
             return
 
-        req = msg['request']
-        req['sender_id'] = self.session.user_profile.id
-        req['client_name'] = req['client']
-
         redis_key = req_redis_key(self.client_id, msg['req_id'])
         with redis_client.pipeline() as pipeline:
             pipeline.hmset(redis_key, {'status': 'receieved'});
             pipeline.expire(redis_key, 60 * 5)
             pipeline.execute()
 
-        queue_json_publish("message_sender", dict(request=req,
-                                                  req_id=msg['req_id'],
-                                                  server_meta=dict(client_id=self.client_id,
-                                                                   return_queue="tornado_return",
-                                                                   start_time=start_time)),
+        queue_json_publish("message_sender",
+                           dict(request=msg['request'],
+                                req_id=msg['req_id'],
+                                server_meta=dict(user_id=self.session.user_profile.id,
+                                                 client_id=self.client_id,
+                                                 return_queue="tornado_return",
+                                                 start_time=start_time,
+                                                 request_environ=dict(REMOTE_ADDR=self.session.conn_info.ip))),
                            fake_message_sender)
 
     def on_close(self):
@@ -182,8 +181,8 @@ class SocketConnection(sockjs.tornado.SockJSConnection):
 def fake_message_sender(event):
     req = event['request']
     try:
-        sender = get_user_profile_by_id(req['sender_id'])
-        client = get_client(req['client_name'])
+        sender = get_user_profile_by_id(event['server_meta']['user_id'])
+        client = get_client(req['client'])
 
         msg_id = check_send_message(sender, client, req['type'],
                                     extract_recipients(req['to']),
@@ -205,9 +204,6 @@ def respond_send_message(data):
     fake_log_line(connection.session.conn_info,
                   time_elapsed,
                   200, 'send_message', connection.session.user_profile.email)
-    # Fake the old JSON send_message endpoint
-    statsd_prefix = "webreq.json.send_message.total"
-    statsd.timing(statsd_prefix, time_elapsed * 1000)
 
 sockjs_router = sockjs.tornado.SockJSRouter(SocketConnection, "/sockjs",
                                             {'sockjs_url': 'https://%s/static/third/sockjs/sockjs-0.3.4.js' % (settings.EXTERNAL_HOST,),
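
With the Tornado side rewritten as above, identity and transport
details move out of the request dict into server_meta, so a
message_sender queue event now looks roughly like this (all values
illustrative):

    event = {'request': {'client': 'website',        # client payload, untouched
                         'type': 'stream',
                         'to': 'social',
                         'subject': 'lunch',
                         'content': 'anyone?'},
             'req_id': 42,
             'server_meta': {'user_id': 17,          # was request['sender_id']
                             'client_id': 'abcd1234',
                             'return_queue': 'tornado_return',
                             'start_time': 1383872568.123,
                             'request_environ': {'REMOTE_ADDR': '10.2.3.4'}}}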

zerver/worker/queue_processors.py

@@ -1,6 +1,8 @@
 from __future__ import absolute_import
 from django.conf import settings
+from django.core.handlers.wsgi import WSGIRequest
+from django.core.handlers.base import BaseHandler
 from postmonkey import PostMonkey, MailChimpException
 from zerver.models import get_user_profile_by_email, \
     get_user_profile_by_id, get_prereg_user_by_email, get_client
@@ -25,6 +27,7 @@ import datetime
 import logging
 import simplejson
 import redis
+import StringIO
 
 def assign_queue(queue_name, enabled=True):
     def decorate(clazz):
@@ -233,28 +236,45 @@ class MessageSenderWorker(QueueProcessingWorker):
     def __init__(self):
         super(MessageSenderWorker, self).__init__()
         self.redis_client = redis.StrictRedis(host=settings.REDIS_HOST, port=settings.REDIS_PORT, db=0)
+        self.handler = BaseHandler()
+        self.handler.load_middleware()
 
     def consume(self, event):
-        req = event['request']
-        try:
-            sender = get_user_profile_by_id(req['sender_id'])
-            client = get_client(req['client_name'])
-
-            msg_id = check_send_message(sender, client, req['type'],
-                                        extract_recipients(req['to']),
-                                        req['subject'], req['content'])
-            resp = {"result": "success", "msg": "", "id": msg_id}
-        except JsonableError as e:
-            resp = {"result": "error", "msg": str(e)}
-
-        result = {'response': resp, 'req_id': event['req_id'],
-                  'server_meta': event['server_meta']}
-        redis_key = req_redis_key(event['server_meta']['client_id'], event['req_id'])
+        server_meta = event['server_meta']
+
+        environ = {'REQUEST_METHOD': 'SOCKET',
+                   'SCRIPT_NAME': '',
+                   'PATH_INFO': '/json/send_message',
+                   'SERVER_NAME': 'localhost',
+                   'SERVER_PORT': 9993,
+                   'SERVER_PROTOCOL': 'ZULIP_SOCKET/1.0',
+                   'wsgi.version': (1, 0),
+                   'wsgi.input': StringIO.StringIO(),
+                   'wsgi.errors': sys.stderr,
+                   'wsgi.multithread': False,
+                   'wsgi.multiprocess': True,
+                   'wsgi.run_once': False,
+                   'zulip.emulated_method': 'POST'}
+        # We're mostly using a WSGIRequest for convenience
+        environ.update(server_meta['request_environ'])
+        request = WSGIRequest(environ)
+        request._request = event['request']
+        request.csrf_processing_done = True
+
+        user_profile = get_user_profile_by_id(server_meta['user_id'])
+        request._cached_user = user_profile
+
+        resp = self.handler.get_response(request)
+        resp_content = resp.content
+        result = {'response': ujson.loads(resp_content), 'req_id': event['req_id'],
+                  'server_meta': server_meta}
+        redis_key = req_redis_key(server_meta['client_id'], event['req_id'])
         self.redis_client.hmset(redis_key, {'status': 'complete',
-                                            'response': ujson.dumps(resp)});
-        queue_json_publish(event['server_meta']['return_queue'], result, lambda e: None)
+                                            'response': resp_content});
+        queue_json_publish(server_meta['return_queue'], result, lambda e: None)
 
 @assign_queue('digest_emails')
 class DigestWorker(QueueProcessingWorker):
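
End to end, the redis hash acts as a per-request status cell: Tornado
marks it 'receieved' (typo as committed) with a five-minute TTL, and
the worker overwrites it with 'complete' plus the serialized response.
A sketch of that lifecycle, assuming a local redis and an illustrative
key format (the real key comes from req_redis_key):

    import redis

    r = redis.StrictRedis(host='localhost', port=6379, db=0)
    key = 'socket_req_status:abcd1234:42'  # placeholder; actual format is req_redis_key's

    # Tornado, when the message arrives on the socket:
    with r.pipeline() as pipeline:
        pipeline.hmset(key, {'status': 'receieved'})  # typo preserved from the code above
        pipeline.expire(key, 60 * 5)                  # garbage-collect abandoned requests
        pipeline.execute()

    # MessageSenderWorker, after the request clears the middleware stack:
    r.hmset(key, {'status': 'complete',
                  'response': '{"result": "success", "msg": "", "id": 1234}'})

    # Anyone holding the client_id/req_id pair can poll progress:
    print r.hgetall(key)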