From 65b247edecd3f4d9053325f9a5d59ca6579106fa Mon Sep 17 00:00:00 2001 From: Leo Franchi Date: Wed, 22 May 2013 17:49:02 -0400 Subject: [PATCH] Add a event_queue gc hook for missed messages (imported from commit a799abb35a9622e6aa59e9499cab3281ccb6859f) --- zephyr/lib/event_queue.py | 18 +++++++-- zephyr/management/commands/runtornado.py | 3 +- zephyr/tornado_callbacks.py | 48 ++++++++++++++++++++---- 3 files changed, 57 insertions(+), 12 deletions(-) diff --git a/zephyr/lib/event_queue.py b/zephyr/lib/event_queue.py index 5a11841854..7f48ff7ab4 100644 --- a/zephyr/lib/event_queue.py +++ b/zephyr/lib/event_queue.py @@ -118,8 +118,18 @@ clients = {} # maps user id to list of client descriptors user_clients = {} +# list of registered gc hooks. +# each one will be called with a user profile id, queue, and bool +# last_for_client that is true if this is the last queue pertaining +# to this user_profile_id +# that is about to be deleted +gc_hooks = [] + next_queue_id = 0 +def add_client_gc_hook(hook): + gc_hooks.append(hook) + def get_client_descriptor(queue_id): return clients.get(queue_id) @@ -146,9 +156,6 @@ def gc_event_queues(): to_remove.add(id) affected_users.add(client.user_profile_id) - for id in to_remove: - del clients[id] - for user_id in affected_users: new_client_list = filter(lambda c: c.event_queue.id not in to_remove, user_clients[user_id]) @@ -157,6 +164,11 @@ def gc_event_queues(): else: user_clients[user_id] = new_client_list + for id in to_remove: + for cb in gc_hooks: + cb(clients[id].user_profile_id, clients[id], clients[id].user_profile_id not in user_clients) + del clients[id] + logging.info(('Tornado removed %d idle event queues owned by %d users in %.3fs.' + ' Now %d active queues') % (len(to_remove), len(affected_users), time.time() - start, diff --git a/zephyr/management/commands/runtornado.py b/zephyr/management/commands/runtornado.py index 1999649322..af639509c1 100644 --- a/zephyr/management/commands/runtornado.py +++ b/zephyr/management/commands/runtornado.py @@ -21,7 +21,7 @@ from tornado import ioloop from zephyr.lib.debug import interactive_debug_listen from zephyr.lib.response import json_response from zephyr import tornado_callbacks -from zephyr.lib.event_queue import setup_event_queue +from zephyr.lib.event_queue import setup_event_queue, add_client_gc_hook from zephyr.lib.queue import setup_tornado_rabbitmq from zephyr.middleware import async_request_stop @@ -110,6 +110,7 @@ class Command(BaseCommand): ioloop.IOLoop.instance().set_blocking_log_threshold(5) setup_event_queue() + add_client_gc_hook(tornado_callbacks.missedmessage_hook) setup_tornado_rabbitmq() ioloop.IOLoop.instance().start() except KeyboardInterrupt: diff --git a/zephyr/tornado_callbacks.py b/zephyr/tornado_callbacks.py index cc3ac56817..ca37586599 100644 --- a/zephyr/tornado_callbacks.py +++ b/zephyr/tornado_callbacks.py @@ -7,7 +7,9 @@ from zephyr.models import Message, UserProfile, UserMessage, \ from zephyr.decorator import JsonableError from zephyr.lib.cache_helpers import cache_get_message from zephyr.lib.queue import queue_json_publish -from zephyr.lib.event_queue import get_client_descriptors_for_user +from zephyr.lib.timestamp import timestamp_to_datetime +from zephyr.lib.event_queue import get_client_descriptors_for_user, \ + add_client_gc_hook, IDLE_EVENT_QUEUE_TIMEOUT_SECS import os import sys @@ -248,6 +250,41 @@ def update_pointer(user_profile_id, new_pointer): if client.accepts_event_type(event['type']): client.add_event(event.copy()) + +def receives_offline_notifications(user_profile_id): + user_profile = get_user_profile_by_id(user_profile_id) + return (user_profile.enable_offline_email_notifications and + not user_profile.is_bot) + +def build_offline_notification_event(user_profile_id, message_id): + return {"user_profile_id": user_profile_id, + "message_id": message_id, + "timestamp": time.time()} + +def missedmessage_hook(user_profile_id, queue, last_for_client): + # Only process missedmessage hook when the last queue for a + # client has been garbage collected + if not last_for_client: + return + + # If a user has gone offline but has unread messages + # received in the idle time, send them a missed + # message email + if not receives_offline_notifications(user_profile_id): + return + + message_ids = [] + for event in queue.event_queue.contents(): + if not event['type'] == 'message' or not event['flags']: + continue + + if 'mentioned' in event['flags'] and not 'read' in event['flags']: + message_ids.append(event['message']['id']) + + for msg_id in message_ids: + event = build_offline_notification_event(user_profile_id, msg_id) + queue_json_publish("missedmessage_emails", event, lambda event: None) + def process_new_message(data): message = cache_get_message(data['message']) @@ -293,13 +330,8 @@ def process_new_message(data): mentioned = 'mentioned' in flags idle = len(get_client_descriptors_for_user(user_profile_id)) == 0 if (received_pm or mentioned) and idle: - user_profile = get_user_profile_by_id(user_profile_id) - - if user_profile.enable_offline_email_notifications and \ - not user_profile.is_bot: - event = {"user_profile_id": user_profile_id, - "message_id": message.id, - "timestamp": time.time()} + if receives_offline_notifications(user_profile_id): + event = build_offline_notification_event(user_profile_id, message.id) # We require RabbitMQ to do this, as we can't call the email handler # from the Tornado process. So if there's no rabbitmq support do nothing