socket: Record the request status in redis

(imported from commit 6c3f6dc37d1abdcacf105f865ac24483717438ca)
This commit is contained in:
Zev Benjamin 2013-11-05 17:05:03 -05:00
parent 87fc182645
commit 48a25211fa
2 changed files with 24 additions and 0 deletions

View File

@ -10,6 +10,7 @@ import tornado.ioloop
import ujson
import logging
import time
import redis
from zerver.models import UserProfile, get_user_profile_by_id, get_client
from zerver.lib.queue import queue_json_publish
@ -63,6 +64,11 @@ def fake_log_line(conn_info, time, ret_code, path, email):
(conn_info.ip, 'SOCKET', ret_code, format_timedelta(time),
path, email))
redis_client = redis.StrictRedis(host=settings.REDIS_HOST, port=settings.REDIS_PORT, db=0)
def req_redis_key(client_id, req_id):
return 'socket_req_status:%s:%s' % (client_id, req_id)
class SocketAuthError(Exception):
def __init__(self, msg):
self.msg = msg
@ -137,6 +143,13 @@ class SocketConnection(sockjs.tornado.SockJSConnection):
req = msg['request']
req['sender_id'] = self.session.user_profile.id
req['client_name'] = req['client']
redis_key = req_redis_key(self.client_id, msg['req_id'])
with redis_client.pipeline() as pipeline:
pipeline.hmset(redis_key, {'status': 'receieved'});
pipeline.expire(redis_key, 60 * 5)
pipeline.execute()
queue_json_publish("message_sender", dict(request=req,
req_id=msg['req_id'],
server_meta=dict(client_id=self.client_id,

View File

@ -13,6 +13,7 @@ from zerver.lib.actions import handle_missedmessage_emails, do_send_confirmation
check_send_message, extract_recipients
from zerver.lib.digest import handle_digest_email
from zerver.decorator import JsonableError
from zerver.lib.socket import req_redis_key
from confirmation.models import Confirmation
import os
@ -23,6 +24,7 @@ import time
import datetime
import logging
import simplejson
import redis
def assign_queue(queue_name, enabled=True):
def decorate(clazz):
@ -228,6 +230,10 @@ class SlowQueryWorker(QueueProcessingWorker):
@assign_queue("message_sender")
class MessageSenderWorker(QueueProcessingWorker):
def __init__(self):
super(MessageSenderWorker, self).__init__()
self.redis_client = redis.StrictRedis(host=settings.REDIS_HOST, port=settings.REDIS_PORT, db=0)
def consume(self, event):
req = event['request']
try:
@ -243,6 +249,11 @@ class MessageSenderWorker(QueueProcessingWorker):
result = {'response': resp, 'req_id': event['req_id'],
'server_meta': event['server_meta']}
redis_key = req_redis_key(event['server_meta']['client_id'], event['req_id'])
self.redis_client.hmset(redis_key, {'status': 'complete',
'response': ujson.dumps(resp)});
queue_json_publish(event['server_meta']['return_queue'], result, lambda e: None)
@assign_queue('digest_emails')