mirror of https://github.com/zulip/zulip.git
Add a event_queue gc hook for missed messages
(imported from commit a799abb35a9622e6aa59e9499cab3281ccb6859f)
This commit is contained in:
parent
81058f5f9b
commit
65b247edec
|
@ -118,8 +118,18 @@ clients = {}
|
||||||
# maps user id to list of client descriptors
|
# maps user id to list of client descriptors
|
||||||
user_clients = {}
|
user_clients = {}
|
||||||
|
|
||||||
|
# list of registered gc hooks.
|
||||||
|
# each one will be called with a user profile id, queue, and bool
|
||||||
|
# last_for_client that is true if this is the last queue pertaining
|
||||||
|
# to this user_profile_id
|
||||||
|
# that is about to be deleted
|
||||||
|
gc_hooks = []
|
||||||
|
|
||||||
next_queue_id = 0
|
next_queue_id = 0
|
||||||
|
|
||||||
|
def add_client_gc_hook(hook):
|
||||||
|
gc_hooks.append(hook)
|
||||||
|
|
||||||
def get_client_descriptor(queue_id):
|
def get_client_descriptor(queue_id):
|
||||||
return clients.get(queue_id)
|
return clients.get(queue_id)
|
||||||
|
|
||||||
|
@ -146,9 +156,6 @@ def gc_event_queues():
|
||||||
to_remove.add(id)
|
to_remove.add(id)
|
||||||
affected_users.add(client.user_profile_id)
|
affected_users.add(client.user_profile_id)
|
||||||
|
|
||||||
for id in to_remove:
|
|
||||||
del clients[id]
|
|
||||||
|
|
||||||
for user_id in affected_users:
|
for user_id in affected_users:
|
||||||
new_client_list = filter(lambda c: c.event_queue.id not in to_remove,
|
new_client_list = filter(lambda c: c.event_queue.id not in to_remove,
|
||||||
user_clients[user_id])
|
user_clients[user_id])
|
||||||
|
@ -157,6 +164,11 @@ def gc_event_queues():
|
||||||
else:
|
else:
|
||||||
user_clients[user_id] = new_client_list
|
user_clients[user_id] = new_client_list
|
||||||
|
|
||||||
|
for id in to_remove:
|
||||||
|
for cb in gc_hooks:
|
||||||
|
cb(clients[id].user_profile_id, clients[id], clients[id].user_profile_id not in user_clients)
|
||||||
|
del clients[id]
|
||||||
|
|
||||||
logging.info(('Tornado removed %d idle event queues owned by %d users in %.3fs.'
|
logging.info(('Tornado removed %d idle event queues owned by %d users in %.3fs.'
|
||||||
+ ' Now %d active queues')
|
+ ' Now %d active queues')
|
||||||
% (len(to_remove), len(affected_users), time.time() - start,
|
% (len(to_remove), len(affected_users), time.time() - start,
|
||||||
|
|
|
@ -21,7 +21,7 @@ from tornado import ioloop
|
||||||
from zephyr.lib.debug import interactive_debug_listen
|
from zephyr.lib.debug import interactive_debug_listen
|
||||||
from zephyr.lib.response import json_response
|
from zephyr.lib.response import json_response
|
||||||
from zephyr import tornado_callbacks
|
from zephyr import tornado_callbacks
|
||||||
from zephyr.lib.event_queue import setup_event_queue
|
from zephyr.lib.event_queue import setup_event_queue, add_client_gc_hook
|
||||||
from zephyr.lib.queue import setup_tornado_rabbitmq
|
from zephyr.lib.queue import setup_tornado_rabbitmq
|
||||||
from zephyr.middleware import async_request_stop
|
from zephyr.middleware import async_request_stop
|
||||||
|
|
||||||
|
@ -110,6 +110,7 @@ class Command(BaseCommand):
|
||||||
ioloop.IOLoop.instance().set_blocking_log_threshold(5)
|
ioloop.IOLoop.instance().set_blocking_log_threshold(5)
|
||||||
|
|
||||||
setup_event_queue()
|
setup_event_queue()
|
||||||
|
add_client_gc_hook(tornado_callbacks.missedmessage_hook)
|
||||||
setup_tornado_rabbitmq()
|
setup_tornado_rabbitmq()
|
||||||
ioloop.IOLoop.instance().start()
|
ioloop.IOLoop.instance().start()
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
|
|
|
@ -7,7 +7,9 @@ from zephyr.models import Message, UserProfile, UserMessage, \
|
||||||
from zephyr.decorator import JsonableError
|
from zephyr.decorator import JsonableError
|
||||||
from zephyr.lib.cache_helpers import cache_get_message
|
from zephyr.lib.cache_helpers import cache_get_message
|
||||||
from zephyr.lib.queue import queue_json_publish
|
from zephyr.lib.queue import queue_json_publish
|
||||||
from zephyr.lib.event_queue import get_client_descriptors_for_user
|
from zephyr.lib.timestamp import timestamp_to_datetime
|
||||||
|
from zephyr.lib.event_queue import get_client_descriptors_for_user, \
|
||||||
|
add_client_gc_hook, IDLE_EVENT_QUEUE_TIMEOUT_SECS
|
||||||
|
|
||||||
import os
|
import os
|
||||||
import sys
|
import sys
|
||||||
|
@ -248,6 +250,41 @@ def update_pointer(user_profile_id, new_pointer):
|
||||||
if client.accepts_event_type(event['type']):
|
if client.accepts_event_type(event['type']):
|
||||||
client.add_event(event.copy())
|
client.add_event(event.copy())
|
||||||
|
|
||||||
|
|
||||||
|
def receives_offline_notifications(user_profile_id):
|
||||||
|
user_profile = get_user_profile_by_id(user_profile_id)
|
||||||
|
return (user_profile.enable_offline_email_notifications and
|
||||||
|
not user_profile.is_bot)
|
||||||
|
|
||||||
|
def build_offline_notification_event(user_profile_id, message_id):
|
||||||
|
return {"user_profile_id": user_profile_id,
|
||||||
|
"message_id": message_id,
|
||||||
|
"timestamp": time.time()}
|
||||||
|
|
||||||
|
def missedmessage_hook(user_profile_id, queue, last_for_client):
|
||||||
|
# Only process missedmessage hook when the last queue for a
|
||||||
|
# client has been garbage collected
|
||||||
|
if not last_for_client:
|
||||||
|
return
|
||||||
|
|
||||||
|
# If a user has gone offline but has unread messages
|
||||||
|
# received in the idle time, send them a missed
|
||||||
|
# message email
|
||||||
|
if not receives_offline_notifications(user_profile_id):
|
||||||
|
return
|
||||||
|
|
||||||
|
message_ids = []
|
||||||
|
for event in queue.event_queue.contents():
|
||||||
|
if not event['type'] == 'message' or not event['flags']:
|
||||||
|
continue
|
||||||
|
|
||||||
|
if 'mentioned' in event['flags'] and not 'read' in event['flags']:
|
||||||
|
message_ids.append(event['message']['id'])
|
||||||
|
|
||||||
|
for msg_id in message_ids:
|
||||||
|
event = build_offline_notification_event(user_profile_id, msg_id)
|
||||||
|
queue_json_publish("missedmessage_emails", event, lambda event: None)
|
||||||
|
|
||||||
def process_new_message(data):
|
def process_new_message(data):
|
||||||
message = cache_get_message(data['message'])
|
message = cache_get_message(data['message'])
|
||||||
|
|
||||||
|
@ -293,13 +330,8 @@ def process_new_message(data):
|
||||||
mentioned = 'mentioned' in flags
|
mentioned = 'mentioned' in flags
|
||||||
idle = len(get_client_descriptors_for_user(user_profile_id)) == 0
|
idle = len(get_client_descriptors_for_user(user_profile_id)) == 0
|
||||||
if (received_pm or mentioned) and idle:
|
if (received_pm or mentioned) and idle:
|
||||||
user_profile = get_user_profile_by_id(user_profile_id)
|
if receives_offline_notifications(user_profile_id):
|
||||||
|
event = build_offline_notification_event(user_profile_id, message.id)
|
||||||
if user_profile.enable_offline_email_notifications and \
|
|
||||||
not user_profile.is_bot:
|
|
||||||
event = {"user_profile_id": user_profile_id,
|
|
||||||
"message_id": message.id,
|
|
||||||
"timestamp": time.time()}
|
|
||||||
|
|
||||||
# We require RabbitMQ to do this, as we can't call the email handler
|
# We require RabbitMQ to do this, as we can't call the email handler
|
||||||
# from the Tornado process. So if there's no rabbitmq support do nothing
|
# from the Tornado process. So if there's no rabbitmq support do nothing
|
||||||
|
|
Loading…
Reference in New Issue