zulip/zerver/lib/socket.py

307 lines
13 KiB
Python

from __future__ import absolute_import
from six import text_type
from typing import Any, Union, Mapping, Optional
from django.conf import settings
from django.utils import timezone
from django.contrib.sessions.models import Session as djSession
import sockjs.tornado
from sockjs.tornado.session import ConnectionInfo
import tornado.ioloop
import ujson
import logging
import time
from zerver.models import UserProfile, get_user_profile_by_id, get_client
from zerver.lib.queue import queue_json_publish
from zerver.lib.actions import check_send_message, extract_recipients
from zerver.decorator import JsonableError
from zerver.lib.utils import statsd
from zerver.lib.event_queue import get_client_descriptor
from zerver.middleware import record_request_start_data, record_request_stop_data, \
record_request_restart_data, write_log_line, format_timedelta
from zerver.lib.redis_utils import get_redis_client
from zerver.lib.session_user import get_session_user
logger = logging.getLogger('zulip.socket')
def get_user_profile(session_id):
# type: (Optional[text_type]) -> Optional[UserProfile]
if session_id is None:
return None
try:
djsession = djSession.objects.get(expire_date__gt=timezone.now(),
session_key=session_id)
except djSession.DoesNotExist:
return None
try:
return UserProfile.objects.get(pk=get_session_user(djsession))
except (UserProfile.DoesNotExist, KeyError):
return None
connections = dict() # type: Dict[Union[int, str], SocketConnection]
def get_connection(id):
# type: (Union[int, str]) -> SocketConnection
return connections.get(id)
def register_connection(id, conn):
# type: (Union[int, str], SocketConnection) -> None
# Kill any old connections if they exist
if id in connections:
connections[id].close()
conn.client_id = id
connections[conn.client_id] = conn
def deregister_connection(conn):
# type: (SocketConnection) -> None
del connections[conn.client_id]
redis_client = get_redis_client()
def req_redis_key(req_id):
# type: (text_type) -> text_type
return u'socket_req_status:%s' % (req_id,)
class SocketAuthError(Exception):
def __init__(self, msg):
# type: (str) -> None
self.msg = msg
class CloseErrorInfo(object):
def __init__(self, status_code, err_msg):
# type: (int, str) -> None
self.status_code = status_code
self.err_msg = err_msg
class SocketConnection(sockjs.tornado.SockJSConnection):
client_id = None # type: Optional[Union[int, str]]
def on_open(self, info):
# type: (ConnectionInfo) -> None
log_data = dict(extra='[transport=%s]' % (self.session.transport_name,))
record_request_start_data(log_data)
ioloop = tornado.ioloop.IOLoop.instance()
self.authenticated = False
self.session.user_profile = None
self.close_info = None # type: CloseErrorInfo
self.did_close = False
try:
self.browser_session_id = info.get_cookie(settings.SESSION_COOKIE_NAME).value
self.csrf_token = info.get_cookie(settings.CSRF_COOKIE_NAME).value
except AttributeError:
# The request didn't contain the necessary cookie values. We can't
# close immediately because sockjs-tornado doesn't expect a close
# inside on_open(), so do it on the next tick.
self.close_info = CloseErrorInfo(403, "Initial cookie lacked required values")
ioloop.add_callback(self.close)
return
def auth_timeout():
# type: () -> None
self.close_info = CloseErrorInfo(408, "Timeout while waiting for authentication")
self.close()
self.timeout_handle = ioloop.add_timeout(time.time() + 10, auth_timeout)
write_log_line(log_data, path='/socket/open', method='SOCKET',
remote_ip=info.ip, email='unknown', client_name='?')
def authenticate_client(self, msg):
# type: (Dict[str, Any]) -> None
if self.authenticated:
self.session.send_message({'req_id': msg['req_id'], 'type': 'response',
'response': {'result': 'error', 'msg': 'Already authenticated'}})
return
user_profile = get_user_profile(self.browser_session_id)
if user_profile is None:
raise SocketAuthError('Unknown or missing session')
self.session.user_profile = user_profile
if msg['request']['csrf_token'] != self.csrf_token:
raise SocketAuthError('CSRF token does not match that in cookie')
if 'queue_id' not in msg['request']:
raise SocketAuthError("Missing 'queue_id' argument")
queue_id = msg['request']['queue_id']
client = get_client_descriptor(queue_id)
if client is None:
raise SocketAuthError('Bad event queue id: %s' % (queue_id,))
if user_profile.id != client.user_profile_id:
raise SocketAuthError("You are not the owner of the queue with id '%s'" % (queue_id,))
self.authenticated = True
register_connection(queue_id, self)
response = {'req_id': msg['req_id'], 'type': 'response',
'response': {'result': 'success', 'msg': ''}}
status_inquiries = msg['request'].get('status_inquiries')
if status_inquiries is not None:
results = {}
for inquiry in status_inquiries:
status = redis_client.hgetall(req_redis_key(inquiry))
if len(status) == 0:
status['status'] = 'not_received'
if 'response' in status:
status['response'] = ujson.loads(status['response'])
results[str(inquiry)] = status
response['response']['status_inquiries'] = results
self.session.send_message(response)
ioloop = tornado.ioloop.IOLoop.instance()
ioloop.remove_timeout(self.timeout_handle)
def on_message(self, msg_raw):
# type: (str) -> None
log_data = dict(extra='[transport=%s' % (self.session.transport_name,))
record_request_start_data(log_data)
msg = ujson.loads(msg_raw)
if self.did_close:
logger.info("Received message on already closed socket! transport=%s user=%s client_id=%s"
% (self.session.transport_name,
self.session.user_profile.email if self.session.user_profile is not None else 'unknown',
self.client_id))
self.session.send_message({'req_id': msg['req_id'], 'type': 'ack'})
if msg['type'] == 'auth':
log_data['extra'] += ']'
try:
self.authenticate_client(msg)
# TODO: Fill in the correct client
write_log_line(log_data, path='/socket/auth', method='SOCKET',
remote_ip=self.session.conn_info.ip,
email=self.session.user_profile.email,
client_name='?')
except SocketAuthError as e:
response = {'result': 'error', 'msg': e.msg}
self.session.send_message({'req_id': msg['req_id'], 'type': 'response',
'response': response})
write_log_line(log_data, path='/socket/auth', method='SOCKET',
remote_ip=self.session.conn_info.ip,
email='unknown', client_name='?',
status_code=403, error_content=ujson.dumps(response))
return
else:
if not self.authenticated:
response = {'result': 'error', 'msg': "Not yet authenticated"}
self.session.send_message({'req_id': msg['req_id'], 'type': 'response',
'response': response})
write_log_line(log_data, path='/socket/service_request', method='SOCKET',
remote_ip=self.session.conn_info.ip,
email='unknown', client_name='?',
status_code=403, error_content=ujson.dumps(response))
return
redis_key = req_redis_key(msg['req_id'])
with redis_client.pipeline() as pipeline:
pipeline.hmset(redis_key, {'status': 'received'})
pipeline.expire(redis_key, 60 * 60 * 24)
pipeline.execute()
record_request_stop_data(log_data)
queue_json_publish("message_sender",
dict(request=msg['request'],
req_id=msg['req_id'],
server_meta=dict(user_id=self.session.user_profile.id,
client_id=self.client_id,
return_queue="tornado_return",
log_data=log_data,
request_environ=dict(REMOTE_ADDR=self.session.conn_info.ip))),
fake_message_sender)
def on_close(self):
# type: () -> None
log_data = dict(extra='[transport=%s]' % (self.session.transport_name,))
record_request_start_data(log_data)
if self.close_info is not None:
write_log_line(log_data, path='/socket/close', method='SOCKET',
remote_ip=self.session.conn_info.ip, email='unknown',
client_name='?', status_code=self.close_info.status_code,
error_content=self.close_info.err_msg)
else:
deregister_connection(self)
email = self.session.user_profile.email \
if self.session.user_profile is not None else 'unknown'
write_log_line(log_data, path='/socket/close', method='SOCKET',
remote_ip=self.session.conn_info.ip, email=email,
client_name='?')
self.did_close = True
def fake_message_sender(event):
# type: (Dict[str, Any]) -> None
log_data = dict() # type: Dict[str, Any]
record_request_start_data(log_data)
req = event['request']
try:
sender = get_user_profile_by_id(event['server_meta']['user_id'])
client = get_client(req['client'])
msg_id = check_send_message(sender, client, req['type'],
extract_recipients(req['to']),
req['subject'], req['content'],
local_id=req.get('local_id', None),
sender_queue_id=req.get('queue_id', None))
resp = {"result": "success", "msg": "", "id": msg_id}
except JsonableError as e:
resp = {"result": "error", "msg": str(e)}
server_meta = event['server_meta']
server_meta.update({'worker_log_data': log_data,
'time_request_finished': time.time()})
result = {'response': resp, 'req_id': event['req_id'],
'server_meta': server_meta}
respond_send_message(result)
def respond_send_message(data):
# type: (Mapping[str, Any]) -> None
log_data = data['server_meta']['log_data']
record_request_restart_data(log_data)
worker_log_data = data['server_meta']['worker_log_data']
forward_queue_delay = worker_log_data['time_started'] - log_data['time_stopped']
return_queue_delay = log_data['time_restarted'] - data['server_meta']['time_request_finished']
service_time = data['server_meta']['time_request_finished'] - worker_log_data['time_started']
log_data['extra'] += ', queue_delay: %s/%s, service_time: %s]' % (
format_timedelta(forward_queue_delay), format_timedelta(return_queue_delay),
format_timedelta(service_time))
client_id = data['server_meta']['client_id']
connection = get_connection(client_id)
if connection is None:
logger.info("Could not find connection to send response to! client_id=%s" % (client_id,))
else:
connection.session.send_message({'req_id': data['req_id'], 'type': 'response',
'response': data['response']})
# TODO: Fill in client name
# TODO: Maybe fill in the status code correctly
write_log_line(log_data, path='/socket/service_request', method='SOCKET',
remote_ip=connection.session.conn_info.ip,
email=connection.session.user_profile.email, client_name='?')
# We disable the eventsource and htmlfile transports because they cannot
# securely send us the zulip.com cookie, which we use as part of our
# authentication scheme.
sockjs_router = sockjs.tornado.SockJSRouter(SocketConnection, "/sockjs",
{'sockjs_url': 'https://%s/static/third/sockjs/sockjs-0.3.4.js' % (
settings.EXTERNAL_HOST,),
'disabled_transports': ['eventsource', 'htmlfile']})
def get_sockjs_router():
# type: () -> sockjs.tornado.SockJSRouter
return sockjs_router