2016-06-28 20:24:37 +02:00
|
|
|
|
2017-03-03 19:01:52 +01:00
|
|
|
from typing import Any, Dict, Mapping, Optional, Text, Union
|
2013-09-07 00:27:10 +02:00
|
|
|
|
|
|
|
from django.conf import settings
|
2017-04-15 04:03:56 +02:00
|
|
|
from django.utils.timezone import now as timezone_now
|
2017-07-21 02:29:10 +02:00
|
|
|
from django.utils.translation import ugettext as _
|
2013-09-07 00:27:10 +02:00
|
|
|
from django.contrib.sessions.models import Session as djSession
|
2016-11-15 10:21:11 +01:00
|
|
|
try:
    from django.middleware.csrf import _compare_salted_tokens
except ImportError:
    # This function was added in Django 1.10.
    def _compare_salted_tokens(token1: str, token2: str) -> bool:
        """Fallback CSRF token comparison used when Django does not provide one.

        Returns True only when both tokens are strings with identical
        contents.  The comparison is done with hmac.compare_digest so that
        the time taken does not depend on where the tokens first differ;
        one of the tokens comes straight from a client message.
        """
        import hmac

        # A non-string value (e.g. a number or null decoded from JSON) can
        # never match a cookie value, and compare_digest would raise on it.
        if not isinstance(token1, str) or not isinstance(token2, str):
            return False
        # Compare as bytes: compare_digest rejects non-ASCII str arguments.
        return hmac.compare_digest(token1.encode('utf-8'), token2.encode('utf-8'))
|
2013-09-07 00:27:10 +02:00
|
|
|
|
|
|
|
import sockjs.tornado
|
2016-06-04 01:21:17 +02:00
|
|
|
from sockjs.tornado.session import ConnectionInfo
|
2013-09-07 00:27:10 +02:00
|
|
|
import tornado.ioloop
|
|
|
|
import ujson
|
|
|
|
import logging
|
|
|
|
import time
|
|
|
|
|
2013-10-11 21:31:20 +02:00
|
|
|
from zerver.models import UserProfile, get_user_profile_by_id, get_client
|
2013-09-07 00:27:10 +02:00
|
|
|
from zerver.lib.queue import queue_json_publish
|
2013-10-11 21:31:20 +02:00
|
|
|
from zerver.lib.actions import check_send_message, extract_recipients
|
|
|
|
from zerver.decorator import JsonableError
|
2013-11-08 23:11:37 +01:00
|
|
|
from zerver.middleware import record_request_start_data, record_request_stop_data, \
|
|
|
|
record_request_restart_data, write_log_line, format_timedelta
|
2014-02-05 00:35:32 +01:00
|
|
|
from zerver.lib.redis_utils import get_redis_client
|
2017-03-08 11:43:35 +01:00
|
|
|
from zerver.lib.sessions import get_session_user
|
2016-11-27 06:56:06 +01:00
|
|
|
from zerver.tornado.event_queue import get_client_descriptor
|
2017-07-21 02:20:31 +02:00
|
|
|
from zerver.tornado.exceptions import BadEventQueueIdError
|
2013-09-07 00:27:10 +02:00
|
|
|
|
2013-12-06 22:19:32 +01:00
|
|
|
# Module-level logger for the websocket code in this file.
logger = logging.getLogger('zulip.socket')
|
|
|
|
|
2017-10-26 11:38:28 +02:00
|
|
|
def get_user_profile(session_id: Optional[Text]) -> Optional[UserProfile]:
    """Resolve a Django session key to the UserProfile stored in that session.

    Returns None when `session_id` is None, when no unexpired session has
    that key, or when the user recorded in the session cannot be looked up
    (KeyError or UserProfile.DoesNotExist).
    """
    if session_id is None:
        return None

    try:
        session_row = djSession.objects.get(expire_date__gt=timezone_now(),
                                            session_key=session_id)
    except djSession.DoesNotExist:
        return None

    try:
        user_id = get_session_user(session_row)
        return get_user_profile_by_id(user_id)
    except (UserProfile.DoesNotExist, KeyError):
        return None
|
|
|
|
|
2017-10-26 11:38:28 +02:00
|
|
|
# Registry of socket connections, keyed by the id they were registered under.
connections = {}  # type: Dict[Union[int, str], 'SocketConnection']
|
2013-09-07 00:27:10 +02:00
|
|
|
|
2017-10-26 11:38:28 +02:00
|
|
|
def get_connection(id: Union[int, str]) -> Optional['SocketConnection']:
    """Return the connection registered under `id`, or None if there is none."""
    try:
        return connections[id]
    except KeyError:
        return None
|
|
|
|
|
2017-10-26 11:38:28 +02:00
|
|
|
def register_connection(id: Union[int, str], conn: 'SocketConnection') -> None:
    """Record `conn` as the connection for `id`, replacing any previous one.

    A connection already registered under `id` is closed first.  `conn` is
    tagged with the id through its `client_id` attribute.
    """
    # Kill any old connections if they exist
    previous = connections.get(id)
    if previous is not None:
        previous.close()

    conn.client_id = id
    connections[id] = conn
|
2013-09-07 00:27:10 +02:00
|
|
|
|
2017-10-26 11:38:28 +02:00
|
|
|
def deregister_connection(conn: 'SocketConnection') -> None:
    """Drop the registry entry stored under `conn.client_id`.

    `conn` must already carry a client id; a KeyError is raised if nothing
    is registered under it.
    """
    registered_id = conn.client_id
    assert registered_id is not None
    connections.pop(registered_id)
|
2013-09-07 00:27:10 +02:00
|
|
|
|
2014-02-05 00:35:32 +01:00
|
|
|
# Shared redis client; this module uses it to store and read back the
# per-request status hashes whose keys come from req_redis_key().
redis_client = get_redis_client()
|
2013-11-05 23:05:03 +01:00
|
|
|
|
2017-10-26 11:38:28 +02:00
|
|
|
def req_redis_key(req_id: Text) -> Text:
    """Build the redis key under which the status of request `req_id` is kept."""
    return 'socket_req_status:{}'.format(req_id)
|
2013-11-05 23:05:03 +01:00
|
|
|
|
2017-11-05 11:52:10 +01:00
|
|
|
class CloseErrorInfo:
    """Pairs a status code with an error message explaining a socket close."""

    def __init__(self, status_code: int, err_msg: str) -> None:
        self.status_code, self.err_msg = status_code, err_msg
|
|
|
|
|
2013-09-07 00:27:10 +02:00
|
|
|
class SocketConnection(sockjs.tornado.SockJSConnection):
    """SockJS connection that authenticates a browser session and relays requests.

    Lifecycle, as implemented by the methods below:

    * on_open() reads the session and CSRF cookies and starts a 10 second
      authentication timer.
    * A message of type 'auth' is checked by authenticate_client(), which
      registers the connection under the client's event queue id.
    * Any other message from an authenticated client is marked 'received'
      in redis and published to the "message_sender" queue.
    * on_close() writes the close log line and removes the registration.
    """

    # Id this connection is registered under; stays None until
    # register_connection() assigns it.
    client_id = None  # type: Optional[Union[int, str]]

    def on_open(self, info: ConnectionInfo) -> None:
        """Initialise per-connection state and start the authentication timer.

        If either the session cookie or the CSRF cookie is missing, a 403
        CloseErrorInfo is recorded and the connection is closed on the next
        IOLoop tick.  Otherwise the client has 10 seconds to authenticate
        before the connection is closed with a 408 CloseErrorInfo.
        """
        log_data = dict(extra='[transport=%s]' % (self.session.transport_name,))
        record_request_start_data(log_data)

        ioloop = tornado.ioloop.IOLoop.instance()

        self.authenticated = False
        self.session.user_profile = None
        self.close_info = None  # type: Optional[CloseErrorInfo]
        self.did_close = False

        try:
            self.browser_session_id = info.get_cookie(settings.SESSION_COOKIE_NAME).value
            self.csrf_token = info.get_cookie(settings.CSRF_COOKIE_NAME).value
        except AttributeError:
            # The request didn't contain the necessary cookie values.  We can't
            # close immediately because sockjs-tornado doesn't expect a close
            # inside on_open(), so do it on the next tick.
            self.close_info = CloseErrorInfo(403, "Initial cookie lacked required values")
            ioloop.add_callback(self.close)
            return

        def auth_timeout() -> None:
            self.close_info = CloseErrorInfo(408, "Timeout while waiting for authentication")
            self.close()

        # Cancelled at the end of a successful authenticate_client().
        self.timeout_handle = ioloop.add_timeout(time.time() + 10, auth_timeout)
        write_log_line(log_data, path='/socket/open', method='SOCKET',
                       remote_ip=info.ip, email='unknown', client_name='?')

    def authenticate_client(self, msg: Dict[str, Any]) -> None:
        """Validate an 'auth' message and register this connection on success.

        `msg` must carry 'req_id' and a 'request' dict holding 'csrf_token'
        and 'queue_id'; an optional 'status_inquiries' list of request ids
        is answered from redis in the success response.

        Raises JsonableError when the session is unknown, the CSRF token is
        missing or does not match the cookie, 'queue_id' is missing, or the
        queue belongs to another user; raises BadEventQueueIdError when no
        client descriptor exists for the queue id.

        A repeated 'auth' on an already authenticated connection only gets
        an error response.
        """
        if self.authenticated:
            self.session.send_message({'req_id': msg['req_id'], 'type': 'response',
                                       'response': {'result': 'error',
                                                    'msg': 'Already authenticated'}})
            return

        user_profile = get_user_profile(self.browser_session_id)
        if user_profile is None:
            raise JsonableError(_('Unknown or missing session'))
        self.session.user_profile = user_profile

        if 'csrf_token' not in msg['request']:
            # Debugging code to help with understanding #6961
            logging.error("Invalid websockets auth request: %s" % (msg['request'],))
            raise JsonableError(_('CSRF token entry missing from request'))
        if not _compare_salted_tokens(msg['request']['csrf_token'], self.csrf_token):
            raise JsonableError(_('CSRF token does not match that in cookie'))

        if 'queue_id' not in msg['request']:
            raise JsonableError(_("Missing 'queue_id' argument"))

        queue_id = msg['request']['queue_id']
        client = get_client_descriptor(queue_id)
        if client is None:
            raise BadEventQueueIdError(queue_id)

        if user_profile.id != client.user_profile_id:
            raise JsonableError(_("You are not the owner of the queue with id '%s'") % (queue_id,))

        self.authenticated = True
        register_connection(queue_id, self)

        response = {'req_id': msg['req_id'], 'type': 'response',
                    'response': {'result': 'success', 'msg': ''}}

        status_inquiries = msg['request'].get('status_inquiries')
        if status_inquiries is not None:
            results = {}  # type: Dict[str, Dict[str, str]]
            for inquiry in status_inquiries:
                status = redis_client.hgetall(req_redis_key(inquiry))  # type: Dict[bytes, bytes]
                if not status:
                    # Nothing stored for this request id.
                    result = {'status': 'not_received'}
                elif b'response' not in status:
                    result = {'status': status[b'status'].decode('utf-8')}
                else:
                    result = {'status': status[b'status'].decode('utf-8'),
                              'response': ujson.loads(status[b'response'])}
                results[str(inquiry)] = result
            response['response']['status_inquiries'] = results

        self.session.send_message(response)
        # Authentication succeeded, so the timer started in on_open() must
        # no longer close the connection.
        ioloop = tornado.ioloop.IOLoop.instance()
        ioloop.remove_timeout(self.timeout_handle)

    def on_message(self, msg_raw: str) -> None:
        """Handle one JSON message from the client.

        Every message is acknowledged with an 'ack'.  'auth' messages are
        passed to authenticate_client(), and a JsonableError from it is sent
        back as the response.  Any other message is rejected until the
        connection is authenticated; after that it is marked 'received' in
        redis (kept for 24 hours) and published to the "message_sender" queue.
        """
        # The closing ']' of 'extra' is appended later: here for auth
        # requests, in respond_send_message() for service requests.
        log_data = dict(extra='[transport=%s' % (self.session.transport_name,))
        record_request_start_data(log_data)
        msg = ujson.loads(msg_raw)

        if self.did_close:
            logger.info("Received message on already closed socket! transport=%s user=%s client_id=%s"
                        % (self.session.transport_name,
                           self.session.user_profile.email if self.session.user_profile is not None else 'unknown',
                           self.client_id))

        self.session.send_message({'req_id': msg['req_id'], 'type': 'ack'})

        if msg['type'] == 'auth':
            log_data['extra'] += ']'
            try:
                self.authenticate_client(msg)
                # TODO: Fill in the correct client
                write_log_line(log_data, path='/socket/auth', method='SOCKET',
                               remote_ip=self.session.conn_info.ip,
                               email=self.session.user_profile.email,
                               client_name='?')
            except JsonableError as e:
                response = e.to_json()
                self.session.send_message({'req_id': msg['req_id'], 'type': 'response',
                                           'response': response})
                write_log_line(log_data, path='/socket/auth', method='SOCKET',
                               remote_ip=self.session.conn_info.ip,
                               email='unknown', client_name='?',
                               status_code=403, error_content=ujson.dumps(response))
            return

        if not self.authenticated:
            response = {'result': 'error', 'msg': "Not yet authenticated"}
            self.session.send_message({'req_id': msg['req_id'], 'type': 'response',
                                       'response': response})
            write_log_line(log_data, path='/socket/service_request', method='SOCKET',
                           remote_ip=self.session.conn_info.ip,
                           email='unknown', client_name='?',
                           status_code=403, error_content=ujson.dumps(response))
            return

        redis_key = req_redis_key(msg['req_id'])
        with redis_client.pipeline() as pipeline:
            pipeline.hmset(redis_key, {'status': 'received'})
            pipeline.expire(redis_key, 60 * 60 * 24)
            pipeline.execute()

        record_request_stop_data(log_data)
        queue_json_publish("message_sender",
                           dict(request=msg['request'],
                                req_id=msg['req_id'],
                                server_meta=dict(user_id=self.session.user_profile.id,
                                                 client_id=self.client_id,
                                                 return_queue="tornado_return",
                                                 log_data=log_data,
                                                 request_environ=dict(REMOTE_ADDR=self.session.conn_info.ip))),
                           lambda x: None, call_consume_in_tests=True)

    def on_close(self) -> None:
        """Log the close and remove this connection from the registry.

        When close_info is set, the close is logged with its status code and
        message.  Otherwise the connection is deregistered if it was ever
        registered, and a normal close line is logged.
        """
        log_data = dict(extra='[transport=%s]' % (self.session.transport_name,))
        record_request_start_data(log_data)
        if self.close_info is not None:
            write_log_line(log_data, path='/socket/close', method='SOCKET',
                           remote_ip=self.session.conn_info.ip, email='unknown',
                           client_name='?', status_code=self.close_info.status_code,
                           error_content=self.close_info.err_msg)
        else:
            # A client may disconnect before it authenticates.  Such a
            # connection was never registered and has no client_id, so
            # deregistering it would fail before the close is logged.
            if self.client_id is not None:
                deregister_connection(self)
            email = self.session.user_profile.email \
                if self.session.user_profile is not None else 'unknown'
            write_log_line(log_data, path='/socket/close', method='SOCKET',
                           remote_ip=self.session.conn_info.ip, email=email,
                           client_name='?')

        self.did_close = True
|
|
|
|
|
2017-10-26 11:38:28 +02:00
|
|
|
def respond_send_message(data: Mapping[str, Any]) -> None:
    """Send a finished request's response back over the originating socket.

    `data` carries 'req_id', 'response' and a 'server_meta' dict with the
    request's 'log_data', the worker's 'worker_log_data',
    'time_request_finished' and the 'client_id' of the connection.  Queue
    delays and service time are appended to the log data's 'extra' field.
    If the connection is no longer registered, that is logged and nothing
    is sent.
    """
    server_meta = data['server_meta']
    log_data = server_meta['log_data']
    record_request_restart_data(log_data)

    worker_log_data = server_meta['worker_log_data']
    delay_to_worker = worker_log_data['time_started'] - log_data['time_stopped']
    delay_from_worker = log_data['time_restarted'] - server_meta['time_request_finished']
    time_in_worker = server_meta['time_request_finished'] - worker_log_data['time_started']
    durations = (delay_to_worker, delay_from_worker, time_in_worker)
    log_data['extra'] += ', queue_delay: %s/%s, service_time: %s]' % tuple(
        format_timedelta(duration) for duration in durations)

    client_id = server_meta['client_id']
    connection = get_connection(client_id)
    if connection is None:
        logger.info("Could not find connection to send response to! client_id=%s" % (client_id,))
        return

    reply = {'req_id': data['req_id'], 'type': 'response',
             'response': data['response']}
    connection.session.send_message(reply)

    # TODO: Fill in client name
    # TODO: Maybe fill in the status code correctly
    write_log_line(log_data, path='/socket/service_request', method='SOCKET',
                   remote_ip=connection.session.conn_info.ip,
                   email=connection.session.user_profile.email, client_name='?')
|
2013-09-07 00:27:10 +02:00
|
|
|
|
2013-11-25 17:50:11 +01:00
|
|
|
# The eventsource and htmlfile transports are disabled because they cannot
# securely send us the zulip.com cookie, which our authentication scheme
# depends on.
sockjs_router = sockjs.tornado.SockJSRouter(
    SocketConnection,
    "/sockjs",
    dict(
        sockjs_url='https://%s/static/third/sockjs/sockjs-0.3.4.js' % (settings.EXTERNAL_HOST,),
        disabled_transports=['eventsource', 'htmlfile'],
    ),
)
|
2017-10-26 11:38:28 +02:00
|
|
|
def get_sockjs_router() -> sockjs.tornado.SockJSRouter:
    """Return the module-level SockJS router created when this module is imported."""
    router = sockjs_router
    return router
|