Initial event system implementation

This version has several limitations that are addressed in later
commits in this series.

(imported from commit 5d452b312d4204935059c4d602af0b9a8be1a009)
This commit is contained in:
Zev Benjamin 2013-03-26 13:06:00 -04:00
parent 5a58fb3c29
commit 8f4eaa63ad
4 changed files with 175 additions and 1 deletions

120
zephyr/lib/event_queue.py Normal file
View File

@ -0,0 +1,120 @@
from django.conf import settings
from collections import deque
from tornado.ioloop import PeriodicCallback
import time
import socket
import logging
import simplejson
# A queue with no connected handler for this long (10 minutes) is
# considered abandoned and is garbage-collected by gc_event_queues().
IDLE_EVENT_QUEUE_TIMEOUT_SECS = 60 * 10
class ClientDescriptor(object):
    """One client's event queue together with the Tornado handler (if any)
    currently long-polling on it."""

    def __init__(self, user_profile_id, id, apply_markdown=True):
        self.user_profile_id = user_profile_id
        # Tornado handler currently blocked waiting on this queue, or None.
        self.current_handler = None
        self.event_queue = EventQueue(id)
        # Time of the last handler connect; drives idle-queue GC.
        self.last_connection_time = time.time()
        # Whether message events should carry rendered markdown.
        self.apply_markdown = apply_markdown

    def add_event(self, event):
        """Queue an event and, if a handler is waiting, deliver it immediately.

        The event is pushed onto the queue before any delivery attempt, and is
        only removed later via prune() once the client acknowledges it, so a
        failed delivery cannot lose the event.
        """
        if self.current_handler is not None:
            # NOTE(review): presumably resets the request's timing clock for
            # latency accounting -- confirm against the handler implementation.
            self.current_handler._request._time_restarted = time.time()
        self.event_queue.push(event)
        self.check_connection()
        if self.current_handler is not None:
            try:
                # Deliver only the new event; anything older was already sent
                # (or will be re-fetched by the client via last_event_id).
                self.current_handler.humbug_finish(dict(result='success', msg='',
                                                        events=[event],
                                                        queue_id=self.event_queue.id),
                                                   self.current_handler._request,
                                                   apply_markdown=self.apply_markdown)
                return
            except socket.error:
                # Best effort: the client will pick the event up from the
                # queue on its next poll.
                pass

    def idle(self, now):
        """Return True if no handler is attached and the client has been gone
        for at least IDLE_EVENT_QUEUE_TIMEOUT_SECS."""
        self.check_connection()
        return (self.current_handler is None
                and now - self.last_connection_time >= IDLE_EVENT_QUEUE_TIMEOUT_SECS)

    def connect_handler(self, handler):
        self.current_handler = handler
        self.last_connection_time = time.time()

    def disconnect_handler(self):
        self.current_handler = None

    def check_connection(self):
        # Drop the handler reference if the underlying socket has closed
        # (e.g. the browser went away without a clean disconnect).
        if (self.current_handler is not None
            and self.current_handler.request.connection.stream.closed()):
            self.current_handler = None
class EventQueue(object):
    """FIFO of event dicts, each stamped with a monotonically increasing
    integer id.

    Clients acknowledge receipt by id; prune() then discards everything
    they have already seen.
    """

    def __init__(self, id):
        # deque gives O(1) append (push) and popleft (pop/prune).
        self.queue = deque()
        # id that will be assigned to the next pushed event.
        self.next_event_id = 0
        # Globally unique queue id string (generation:counter).
        self.id = id
        # NOTE: the original also set self.connected_handler = None here, but
        # nothing ever read it -- handler tracking lives on
        # ClientDescriptor.current_handler -- so the dead attribute is removed.

    def push(self, event):
        """Stamp `event` with the next sequential id and append it."""
        event['id'] = self.next_event_id
        self.next_event_id += 1
        self.queue.append(event)

    def pop(self):
        """Remove and return the oldest event (raises IndexError if empty)."""
        return self.queue.popleft()

    def empty(self):
        """Return True if no events are queued."""
        return len(self.queue) == 0

    def prune(self, through_id):
        """Discard all events with id <= through_id (acknowledged by client)."""
        while not self.empty() and self.queue[0]['id'] <= through_id:
            self.pop()

    def contents(self):
        """Return a snapshot list of queued events, oldest first."""
        return list(self.queue)
# maps queue ids to client descriptors
clients = {}
# maps user id to list of client descriptors
user_clients = {}
# Monotonic counter used to mint unique queue ids for this process; combined
# with settings.SERVER_GENERATION in allocate_client_descriptor().
next_queue_id = 0
def allocate_client_descriptor(user_profile_id, apply_markdown):
    """Create, register, and return a new ClientDescriptor for the user.

    The queue id embeds settings.SERVER_GENERATION so ids minted by a
    previous server process can never collide with ids from this one.
    """
    global next_queue_id
    # Renamed from 'id' to avoid shadowing the builtin id().
    queue_id = str(settings.SERVER_GENERATION) + ':' + str(next_queue_id)
    next_queue_id += 1
    client = ClientDescriptor(user_profile_id, queue_id, apply_markdown)
    clients[queue_id] = client
    # Index by user as well, so per-user event fan-out is O(that user's clients).
    user_clients.setdefault(user_profile_id, []).append(client)
    return client
# How often (in milliseconds) to garbage-collect idle event queues: 5 minutes.
EVENT_QUEUE_GC_FREQ_MSECS = 1000 * 60 * 5
def gc_event_queues():
    """Remove event queues idle past IDLE_EVENT_QUEUE_TIMEOUT_SECS.

    A queue is idle when no handler is connected and the owner has not
    reconnected within the timeout.  Both the global `clients` registry and
    the per-user `user_clients` index are updated consistently.
    """
    start = time.time()
    to_remove = set()
    affected_users = set()
    # list(...items()) instead of Py2-only iteritems(): works on Python 3 too,
    # and snapshotting makes it explicit that we never mutate `clients` while
    # iterating (deletion happens in a second phase below).
    for id, client in list(clients.items()):
        if client.idle(start):
            to_remove.add(id)
            affected_users.add(client.user_profile_id)

    for id in to_remove:
        del clients[id]

    for user_id in affected_users:
        # A list comprehension (rather than filter()) guarantees the per-user
        # index stays a real list on both Python 2 and 3.
        user_clients[user_id] = [client for client in user_clients[user_id]
                                 if client.event_queue.id not in to_remove]

    # Lazy %-args: the message is only formatted if INFO logging is enabled.
    logging.info('Tornado removed %d idle event queues owned by %d users in %.3fs.'
                 ' Now %d active queues',
                 len(to_remove), len(affected_users), time.time() - start,
                 len(clients))
def setup_event_queue_gc(io_loop):
    """Schedule gc_event_queues() to run on `io_loop` every
    EVENT_QUEUE_GC_FREQ_MSECS milliseconds."""
    PeriodicCallback(gc_event_queues, EVENT_QUEUE_GC_FREQ_MSECS, io_loop).start()

View File

@ -19,6 +19,7 @@ from tornado import ioloop
from zephyr.lib.debug import interactive_debug_listen
from zephyr.lib.response import json_response
from zephyr import tornado_callbacks
from zephyr.lib.event_queue import setup_event_queue_gc
if settings.USING_RABBITMQ:
from zephyr.lib.queue import queue_client
@ -102,6 +103,7 @@ class Command(BaseCommand):
if django.conf.settings.DEBUG:
ioloop.IOLoop.instance().set_blocking_log_threshold(5)
setup_event_queue_gc(ioloop.IOLoop.instance())
ioloop.IOLoop.instance().start()
except KeyboardInterrupt:
sys.exit(0)

View File

@ -5,6 +5,7 @@ from zephyr.models import Message, UserProfile, UserMessage, \
from zephyr.decorator import JsonableError
from zephyr.lib.cache_helpers import cache_get_message
from zephyr.lib.queue import queue_json_publish
from zephyr.lib.event_queue import user_clients
import os
import sys
@ -239,12 +240,21 @@ def update_pointer(user_profile_id, new_pointer):
new_pointer=new_pointer,
update_types=["pointer_update"])
for client in user_clients.get(user_profile_id, []):
event = dict(type='pointer', pointer=new_pointer)
client.add_event(event)
def process_new_message(data):
    """Deliver a newly-sent message: record per-recipient receipt state and
    fan a 'message' event out to every live event queue of each recipient."""
    message = cache_get_message(data['message'])
    for recipient_id in data['users']:
        user_receive_message(recipient_id, message)
        # Each descriptor may want a different rendering (markdown or not).
        for descriptor in user_clients.get(recipient_id, []):
            descriptor.add_event(dict(type='message',
                                      message=message.to_dict(descriptor.apply_markdown)))
    if 'stream_name' in data:
        stream_receive_message(data['realm_id'], data['stream_name'], message)

View File

@ -6,7 +6,7 @@ from zephyr.decorator import asynchronous, authenticated_api_view, \
has_request_variables, POST, to_non_negative_int, json_to_bool, \
JsonableError, authenticated_rest_api_view, REQ
from zephyr.lib.response import json_success, json_error
from zephyr.lib.response import json_response, json_success, json_error
from zephyr.tornado_callbacks import \
get_user_pointer, fetch_stream_messages, fetch_user_messages, \
@ -14,6 +14,7 @@ from zephyr.tornado_callbacks import \
add_pointer_update_callback, process_notification
from zephyr.lib.cache_helpers import cache_get_message
from zephyr.lib.event_queue import allocate_client_descriptor, clients
import datetime
import simplejson
@ -192,3 +193,44 @@ def get_updates_backend(request, user_profile, handler, client_id,
# runtornado recognizes this special return value.
return RespondAsynchronously
@asynchronous
@authenticated_json_post_view
def json_get_events(request, user_profile, handler):
    # Browser-facing entry point; apply_markdown keeps its backend default
    # (True), so web clients always get rendered content.
    return get_events_backend(request, user_profile, handler)
@asynchronous
@authenticated_rest_api_view
@has_request_variables
def rest_get_events(request, user_profile, handler,
                    apply_markdown=REQ(default=False, converter=json_to_bool)):
    # API entry point; callers must opt in to markdown rendering explicitly
    # (defaults to raw content, unlike the browser endpoint).
    return get_events_backend(request, user_profile, handler,
                              apply_markdown=apply_markdown)
@has_request_variables
def get_events_backend(request, user_profile, handler,
                       last_event_id = REQ(converter=to_non_negative_int, default=None),
                       queue_id = REQ(default=None), apply_markdown=True):
    # Long-polling endpoint: returns pending events immediately if any exist,
    # otherwise parks the Tornado handler on the queue until add_event() fires.
    if queue_id is None:
        # First request from this client: mint a fresh event queue.
        client = allocate_client_descriptor(user_profile.id, apply_markdown)
        queue_id = client.event_queue.id
    else:
        # Follow-up request: validate ownership, then acknowledge (prune)
        # everything the client reports having already received.
        if last_event_id is None:
            return json_error("Missing 'last_event_id' argument")
        client = clients.get(queue_id)
        if client is None:
            return json_error("Bad event queue id: %s" % (queue_id,))
        if user_profile.id != client.user_profile_id:
            return json_error("You are not authorized to get events from this queue")
        client.event_queue.prune(last_event_id)

    # Detach any stale handler before deciding how to respond.
    client.disconnect_handler()
    if not client.event_queue.empty():
        # Unacknowledged events are pending: return them right away.
        return json_success({'events': client.event_queue.contents(),
                             'queue_id': queue_id})

    # Nothing pending: block this request until an event arrives.
    handler._request = request
    client.connect_handler(handler)

    # runtornado recognizes this special return value.
    return RespondAsynchronously