From 5979af3a452a644c5bf6526bd8aa8579120e70a5 Mon Sep 17 00:00:00 2001 From: Zev Benjamin Date: Fri, 6 Sep 2013 18:27:10 -0400 Subject: [PATCH] [manual] Add asynchronous message sender via sockjs-tornado New dependency: sockjs-tornado One known limitation is that we don't clean up sessions for non-websockets transports. This is a bug in Tornado so I'm going to look at upgrading us to the latest version: https://github.com/mrjoes/sockjs-tornado/issues/47 (imported from commit 31cdb7596dd5ee094ab006c31757db17dca8899b) --- .../modules/zulip/manifests/app_frontend.pp | 3 +- tools/run-dev.py | 3 +- zerver/lib/socket.py | 152 ++++++++++++++++++ zerver/management/commands/runtornado.py | 6 +- zerver/worker/queue_processors.py | 25 ++- zproject/local_settings.py | 9 ++ zproject/settings.py | 6 + 7 files changed, 199 insertions(+), 5 deletions(-) create mode 100644 zerver/lib/socket.py diff --git a/servers/puppet/modules/zulip/manifests/app_frontend.pp b/servers/puppet/modules/zulip/manifests/app_frontend.pp index fd97eda953..0588904ac2 100644 --- a/servers/puppet/modules/zulip/manifests/app_frontend.pp +++ b/servers/puppet/modules/zulip/manifests/app_frontend.pp @@ -15,7 +15,8 @@ class zulip::app_frontend { "python-django-bitfield", "python-embedly", "python-postmonkey", "python-django-jstemplate", "redis-server", "python-redis", "python-django-guardian", - "python-diff-match-patch", "python-sourcemap", "python-mandrill"] + "python-diff-match-patch", "python-sourcemap", "python-mandrill", + "python-sockjs-tornado"] package { $web_packages: ensure => "installed" } file { "/etc/nginx/zulip-include/": diff --git a/tools/run-dev.py b/tools/run-dev.py index 42caebb9d3..837eeae136 100755 --- a/tools/run-dev.py +++ b/tools/run-dev.py @@ -73,7 +73,8 @@ class Resource(resource.Resource): if (request.uri in ['/json/get_updates', '/api/v1/get_messages', '/json/get_events'] or request.uri.startswith('/api/v1/messages/latest') or - request.uri.startswith('/api/v1/events')): + request.uri.startswith('/api/v1/events') or + request.uri.startswith('/sockjs')): return proxy.ReverseProxyResource('localhost', tornado_port, '/'+name) return proxy.ReverseProxyResource('localhost', django_port, '/'+name) diff --git a/zerver/lib/socket.py b/zerver/lib/socket.py new file mode 100644 index 0000000000..ec685bc953 --- /dev/null +++ b/zerver/lib/socket.py @@ -0,0 +1,152 @@ +from __future__ import absolute_import + +from django.conf import settings +from django.utils.importlib import import_module +from django.utils import timezone +from django.contrib.sessions.models import Session as djSession + +import sockjs.tornado +import tornado.ioloop +import ujson +import logging +import time + +from zerver.models import UserProfile +from zerver.lib.queue import queue_json_publish + +djsession_engine = import_module(settings.SESSION_ENGINE) +def get_user_profile(session_id): + if session_id is None: + return None + + try: + djsession = djSession.objects.get(expire_date__gt=timezone.now(), + session_key=session_id) + except djSession.DoesNotExist: + return None + + session_store = djsession_engine.SessionStore(djsession.session_key) + + try: + return UserProfile.objects.get(pk=session_store['_auth_user_id']) + except UserProfile.DoesNotExist: + return None + +connections = dict() +next_connection_seq = 0 + +def get_connection(id): + return connections.get(id) + +def register_connection(conn): + global next_connection_seq + conn.connection_id = "%s:%s" % (settings.SERVER_GENERATION, next_connection_seq) + next_connection_seq = next_connection_seq + 1 + connections[conn.connection_id] = conn + +def deregister_connection(conn): + del connections[conn.connection_id] + +def fake_log_line(conn_info, time, ret_code, path, email): + # These two functions are copied from our middleware. At some + # point we will just run the middleware directly. + def timedelta_ms(timedelta): + return timedelta * 1000 + + def format_timedelta(timedelta): + if (timedelta >= 1): + return "%.1fs" % (timedelta) + return "%.0fms" % (timedelta_ms(timedelta),) + + logging.info('%-15s %-7s %3d %5s %s (%s)' % + (conn_info.ip, 'SOCKET', ret_code, format_timedelta(time), + path, email)) + +class SocketConnection(sockjs.tornado.SockJSConnection): + def on_open(self, info): + self.authenticated = False + self.session.user_profile = None + self.browser_session_id = info.get_cookie(settings.SESSION_COOKIE_NAME).value + self.csrf_token = info.get_cookie(settings.CSRF_COOKIE_NAME).value + + ioloop = tornado.ioloop.IOLoop.instance() + self.timeout_handle = ioloop.add_timeout(time.time() + 10, self.close) + + register_connection(self) + fake_log_line(info, 0, 200, 'Connection opened using %s' % (self.session.transport_name,), 'unknown') + + def authenticate_client(self, msg): + if self.authenticated: + self.session.send_message({'client_meta': msg['client_meta'], + 'response': {'result': 'error', 'msg': 'Already authenticated'}}) + return + + user_profile = get_user_profile(self.browser_session_id) + if user_profile is None: + error_msg = 'Unknown or missing session' + fake_log_line(self.session.conn_info, 0, 403, error_msg, 'unknown') + self.session.send_message({'client_meta': msg['client_meta'], + 'response': {'result': 'error', 'msg': error_msg}}) + return + self.session.user_profile = user_profile + + if msg['request']['csrf_token'] != self.csrf_token: + error_msg = 'CSRF token does not match that in cookie' + fake_log_line(self.session.conn_info, 0, 403, error_msg, 'unknown') + self.session.send_message({'client_meta': msg['client_meta'], + 'response': {'result': 'error', 'msg': error_msg}}) + return + + self.session.send_message({'client_meta': msg['client_meta'], + 'response': {'result': 'success', 'msg': ''}}) + self.authenticated = True + fake_log_line(self.session.conn_info, 0, 200, "Authenticated", user_profile.email) + ioloop = tornado.ioloop.IOLoop.instance() + ioloop.remove_timeout(self.timeout_handle) + + def on_message(self, msg): + start_time = time.time() + msg = ujson.loads(msg) + + if msg['type'] == 'auth': + self.authenticate_client(msg) + return + else: + if not self.authenticated: + error_msg = 'Not yet authenticated' + fake_log_line(self.session.conn_info, 0, 403, error_msg, 'unknown') + self.session.send_message({'client_meta': msg['client_meta'], + 'response': {'result': 'error', 'msg': error_msg}}) + return + + req = msg['request'] + req['sender_id'] = self.session.user_profile.id + req['client_name'] = req['client'] + queue_json_publish("message_sender", dict(request=req, + client_meta=msg['client_meta'], + server_meta=dict(connection_id=self.connection_id, + return_queue="tornado_return", + start_time=start_time)), + lambda e: None) + + def on_close(self): + deregister_connection(self) + if self.session.user_profile is None: + fake_log_line(self.session.conn_info, 0, 408, + 'Timeout while waiting for authentication', 'unknown') + else: + fake_log_line(self.session.conn_info, 0, 200, + 'Connection closed', 'unknown') + +def respond_send_message(chan, method, props, data): + connection = get_connection(data['server_meta']['connection_id']) + if connection is not None: + connection.session.send_message({'client_meta': data['client_meta'], 'response': data['response']}) + fake_log_line(connection.session.conn_info, + time.time() - data['server_meta']['start_time'], + 200, 'send_message', connection.session.user_profile.email) + +sockjs_router = sockjs.tornado.SockJSRouter(SocketConnection, "/sockjs", + {'sockjs_url': 'https://%s/static/third/sockjs/sockjs-0.3.4.js' % (settings.EXTERNAL_HOST,)}) +def get_sockjs_router(): + return sockjs_router diff --git a/zerver/management/commands/runtornado.py b/zerver/management/commands/runtornado.py index 4e2cc60dde..bd39f63170 100644 --- a/zerver/management/commands/runtornado.py +++ b/zerver/management/commands/runtornado.py @@ -22,6 +22,7 @@ from zerver.lib.response import json_response from zerver import tornado_callbacks from zerver.lib.event_queue import setup_event_queue, add_client_gc_hook from zerver.lib.queue import setup_tornado_rabbitmq +from zerver.lib.socket import get_sockjs_router, respond_send_message from zerver.middleware import async_request_stop if settings.USING_RABBITMQ: @@ -83,6 +84,7 @@ class Command(BaseCommand): def process_notification(chan, method, props, data): tornado_callbacks.process_notification(data) queue_client.register_json_consumer('notify_tornado', process_notification) + queue_client.register_json_consumer('tornado_return', respond_send_message) try: urls = (r"/json/get_updates", @@ -92,8 +94,10 @@ class Command(BaseCommand): r"/json/get_events", r"/api/v1/events", ) + # Application is an instance of Django's standard wsgi handler. - application = web.Application([(url, AsyncDjangoHandler) for url in urls], + application = web.Application([(url, AsyncDjangoHandler) for url in urls] + + get_sockjs_router().urls, debug=django.conf.settings.DEBUG, # Disable Tornado's own request logging, since we have our own log_function=lambda x: None) diff --git a/zerver/worker/queue_processors.py b/zerver/worker/queue_processors.py index 96f0d7b625..be1cae91cb 100644 --- a/zerver/worker/queue_processors.py +++ b/zerver/worker/queue_processors.py @@ -5,11 +5,12 @@ from django.conf import settings from postmonkey import PostMonkey, MailChimpException from zerver.models import UserActivityInterval, get_user_profile_by_email, \ get_user_profile_by_id, get_prereg_user_by_email, get_client -from zerver.lib.queue import SimpleQueueClient +from zerver.lib.queue import SimpleQueueClient, queue_json_publish from zerver.lib.timestamp import timestamp_to_datetime from zerver.lib.actions import handle_missedmessage_emails, do_send_confirmation_email, \ do_update_user_activity, do_update_user_activity_interval, do_update_user_presence, \ - internal_send_message, send_local_email_template_with_delay, clear_followup_emails_queue + internal_send_message, send_local_email_template_with_delay, clear_followup_emails_queue, \ + check_send_message, extract_recipients from zerver.decorator import JsonableError from confirmation.models import Confirmation @@ -176,3 +177,23 @@ class SlowQueryWorker(QueueProcessingWorker): # Aggregate all slow query messages in 1-minute chunks to avoid message spam time.sleep(1 * 60) + +@assign_queue("message_sender") +class MessageSenderWorker(QueueProcessingWorker): + def consume(self, ch, method, properties, event): + req = event['request'] + try: + sender = get_user_profile_by_id(req['sender_id']) + client = get_client(req['client_name']) + + msg_id = check_send_message(sender, client, req['type'], + extract_recipients(req['to']), + req['subject'], req['content']) + resp = {"result": "success", "msg": "", "id": msg_id} + except JsonableError as e: + resp = {"result": "error", "msg": str(e)} + + result = {'response': resp, 'client_meta': event['client_meta'], + 'server_meta': event['server_meta']} + queue_json_publish(event['server_meta']['return_queue'], result, lambda e: None) + diff --git a/zproject/local_settings.py b/zproject/local_settings.py index 94b2592ef7..7096f5c70e 100644 --- a/zproject/local_settings.py +++ b/zproject/local_settings.py @@ -53,6 +53,15 @@ TESTING_DEPLOYED = not not re.match(r'^test', platform.node()) LOCALSERVER = os.path.exists('/etc/zulip-local') +if TESTING_DEPLOYED: + EXTERNAL_HOST = platform.node() +elif STAGING_DEPLOYED: + EXTERNAL_HOST = 'staging.zulip.com' +elif DEPLOYED: + EXTERNAL_HOST = 'zulip.com' +else: + EXTERNAL_HOST = 'localhost:9991' + EMBEDLY_KEY="xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" # For now, LOCALSERVER is only testing, so write to our test buckets diff --git a/zproject/settings.py b/zproject/settings.py index c1af60aa42..5d9b1e33b2 100644 --- a/zproject/settings.py +++ b/zproject/settings.py @@ -345,6 +345,7 @@ JS_SPECS = { 'third/lazyload/lazyload.js', 'third/spectrum/spectrum.js', 'third/winchan/winchan.js', + 'third/sockjs/sockjs-0.3.4.js', ('third/handlebars/handlebars.runtime.js' if PIPELINE else 'third/handlebars/handlebars.js'), @@ -409,6 +410,11 @@ JS_SPECS = { ), 'output_filename': 'min/activity.js' }, + # We also want to minify sockjs separately for the sockjs iframe transport + 'sockjs': { + 'source_filenames': ('third/sockjs/sockjs-0.3.4.js',), + 'output_filename': 'min/sockjs-0.3.4.min.js' + }, } if not DEBUG: