events: Add support for not storing redundant events.

This should dramatically improve the speed of the dump/load part of
our restart process, especially with large long-lived event queues.

(imported from commit cc493fa50b4c339257e060b3f0c0956c682e449d)
This commit is contained in:
Tim Abbott 2013-11-22 14:30:32 -05:00
parent afa2cc25d1
commit 1db8fba5aa
1 changed files with 57 additions and 4 deletions

View File

@ -18,6 +18,7 @@ import traceback
from zerver.lib.utils import statsd
from zerver.middleware import async_request_restart
from zerver.models import get_client
import copy
# The idle timeout used to be a week, but we found that in that
# situation, queues from dead browser sessions would grow quite large
@ -142,27 +143,57 @@ class ClientDescriptor(object):
do_gc_event_queues([self.event_queue.id], [self.user_profile_id],
[self.realm_id])
def compute_full_event_type(event):
if event["type"] == "update_message_flags":
if event["all"]:
# Put the "all" case in its own category
return "%s/%s/all" % (event["flag"], event["operation"])
return "%s/%s" % (event["flag"], event["operation"])
return event["type"]
# Virtual events are a mechanism for storing pointer changes and other
# easily collapsed event types efficiently.
VIRTUAL_EVENT_TYPES = ["pointer", "read/add"]
class EventQueue(object):
def __init__(self, id):
self.queue = deque()
self.next_event_id = 0
self.id = id
self.virtual_events = {}
def to_dict(self):
return dict(id=self.id, next_event_id=self.next_event_id,
queue=list(self.queue))
return dict(id=self.id,
next_event_id=self.next_event_id,
queue=list(self.queue),
virtual_events=self.virtual_events)
@classmethod
def from_dict(cls, d):
ret = cls(d['id'])
ret.next_event_id = d['next_event_id']
ret.queue = deque(d['queue'])
ret.virtual_events = d["virtual_events"]
return ret
def push(self, event):
event['id'] = self.next_event_id
self.next_event_id += 1
self.queue.append(event)
full_event_type = compute_full_event_type(event)
if full_event_type in VIRTUAL_EVENT_TYPES:
if full_event_type not in self.virtual_events:
self.virtual_events[full_event_type] = copy.deepcopy(event)
return
# Update the virtual event with the values from the event
virtual_event = self.virtual_events[full_event_type]
virtual_event["id"] = event["id"]
if "timestamp" in event:
virtual_event["timestamp"] = event["timestamp"]
if full_event_type == "pointer":
virtual_event["pointer"] = event["pointer"]
elif full_event_type == "read/add":
virtual_event["messages"] += event["messages"]
else:
self.queue.append(event)
def pop(self):
return self.queue.popleft()
@ -175,7 +206,29 @@ class EventQueue(object):
self.pop()
def contents(self):
return list(self.queue)
contents = []
virtual_id_map = {}
for event_type in self.virtual_events:
virtual_id_map[self.virtual_events[event_type]["id"]] = self.virtual_events[event_type]
virtual_ids = sorted(list(virtual_id_map.keys()))
# Merge the virtual events into their final place in the queue
index = 0
length = len(virtual_ids)
for event in self.queue:
if index >= length or event["id"] < virtual_ids[index]:
contents.append(event)
continue
while index < length and virtual_ids[index] < event["id"]:
contents.append(virtual_id_map[virtual_ids[index]])
index += 1
while index < length:
contents.append(virtual_id_map[virtual_ids[index]])
index += 1
self.virtual_events = {}
self.queue = deque(contents)
return contents
# maps queue ids to client descriptors
clients = {}