mirror of https://github.com/zulip/zulip.git
refactor: Simplify recent_senders code.
This reduces our dependency on message_list code (via message_util), and it makes moving streams/topics and deleting messages more performant. For every single message that was being updated or deleted, the previous code was basically re-computing lots of things, including having to iterate through every message in memory to find the messages matching your topic. Now everything basically happens in O(1) time. The only O(N) computation is that we now lazily re-compute the max message id every time you need it for typeahead logic, and then we cache it for subsequent use. The N here is the number of messages that the particular sender has sent to the particular stream/topic combination, so it should always be quite small, except for certain spammy bots. Once the max has been calculated, the common operation of adding a message doesn't invalidate our cached value. We only invalidate the cache on deletes. The main change that we make here from a data standpoint is that we just keep track of all message_ids for all senders. The storage overhead here should be negligible. By keeping track of our own messages, we don't have to punt to other code for update/delete situations. There is similar code in recent_topics that I think can be improved in similar ways, and it would allow us to eliminate functions like this one: export function get_messages_in_topic(stream_id, topic) { return message_list.all .all_messages() .filter( (x) => x.type === "stream" && x.stream_id === stream_id && x.topic.toLowerCase() === topic.toLowerCase(), ); }
This commit is contained in:
parent
007a5bb95e
commit
2126478867
|
@ -23,13 +23,6 @@ function make_stream_message({stream_id, topic, sender_id}) {
|
|||
return message;
|
||||
}
|
||||
|
||||
const {all_messages_data} = mock_esm("../../static/js/all_messages_data", {
|
||||
all_messages_data: {
|
||||
all_messages() {
|
||||
return Array.from(messages.values());
|
||||
},
|
||||
},
|
||||
});
|
||||
// Stub message_store so that lookups by id are answered from the
// `messages` collection that this test file maintains.
mock_esm("../../static/js/message_store", {
    get(message_id) {
        return messages.get(message_id);
    },
});
|
||||
|
@ -46,7 +39,64 @@ function test(label, f) {
|
|||
});
|
||||
}
|
||||
|
||||
test("process_message_for_senders", (override) => {
|
||||
test("IdTracker", () => {
    const id_tracker = new rs.IdTracker();

    // Each step is [method, id, max id expected right after the call].
    const steps = [
        ["add", 5, 5],
        ["add", 7, 7],
        ["add", 3, 7],
        ["add", 10, 10],
        ["add", 12, 12],
        ["add", 11, 12],

        ["remove", 10, 12],
        ["remove", 999999, 12], // bogus id has no effect
        ["remove", 3, 12],
        ["remove", 12, 11],

        ["add", 3, 11],
        ["add", 7, 11],
        ["add", 13, 13],

        ["remove", 3, 13],
        ["remove", 13, 11],
    ];

    for (const [method, id, expected_max_id] of steps) {
        id_tracker[method](id);
        assert.equal(id_tracker.max_id(), expected_max_id);
    }
});
|
||||
|
||||
test("noop process_topic_edit", () => {
    // Coverage for the defensive path only: these ids are meant to be
    // bogus, and the call just has to complete without throwing.
    rs.process_topic_edit({message_ids: [333, 444]});
});
|
||||
|
||||
test("update_topics_of_deleted_message_ids", () => {
    const details = {stream_id: 555, topic: "whatever", sender_id: 999};
    const message = make_stream_message(details);
    const recent_senders = () => rs.get_topic_recent_senders(details.stream_id, details.topic);

    // Deleting a message that recent_senders has not processed yet
    // hits the defensive early returns and leaves the topic empty.
    rs.update_topics_of_deleted_message_ids([message.id]);
    assert.deepEqual(recent_senders(), []);

    // After processing the message, its sender shows up for the topic.
    rs.process_message_for_senders(message);
    assert.deepEqual(recent_senders(), [details.sender_id]);
});
|
||||
|
||||
test("process_message_for_senders", () => {
|
||||
const stream1 = 1;
|
||||
const stream2 = 2;
|
||||
const stream3 = 3;
|
||||
|
@ -182,42 +232,81 @@ test("process_message_for_senders", (override) => {
|
|||
// message7's topic was changed by user
|
||||
message7.topic = topic3;
|
||||
|
||||
rs.process_topic_edit(stream3, topic2, topic3);
|
||||
rs.process_topic_edit({
|
||||
message_ids: [message7.id],
|
||||
old_stream_id: stream3,
|
||||
new_stream_id: stream3,
|
||||
old_topic: topic2,
|
||||
new_topic: topic3,
|
||||
});
|
||||
|
||||
assert.equal(rs.get_topic_recent_senders(stream3, topic3).toString(), "2,3");
|
||||
assert.equal(rs.get_topic_recent_senders(stream3, topic2).toString(), "3");
|
||||
|
||||
// Test stream change
|
||||
assert.equal(rs.get_topic_recent_senders(stream3, topic3).toString(), "2,3");
|
||||
assert.equal(rs.get_topic_recent_senders(stream4, topic3).toString(), "");
|
||||
// stream of topic3 was changed to stream4.
|
||||
message7.stream_id = stream4; // message7's topic is topic3
|
||||
|
||||
message7.stream_id = stream4;
|
||||
message8.stream_id = stream4;
|
||||
rs.process_topic_edit(stream3, topic3, topic3, stream4);
|
||||
rs.process_topic_edit({
|
||||
message_ids: [message7.id, message8.id],
|
||||
old_stream_id: stream3,
|
||||
new_stream_id: stream4,
|
||||
old_topic: topic3,
|
||||
new_topic: topic3,
|
||||
});
|
||||
|
||||
assert.equal(rs.get_topic_recent_senders(stream3, topic3).toString(), "");
|
||||
assert.equal(rs.get_topic_recent_senders(stream4, topic3).toString(), "2,3");
|
||||
|
||||
// Test stream & topic change
|
||||
assert.equal(rs.get_topic_recent_senders(stream4, topic3).toString(), "2,3");
|
||||
assert.equal(rs.get_topic_recent_senders(stream5, topic4).toString(), "");
|
||||
// stream of topic3 was changed to stream5 and topic was changed to topic4.
|
||||
|
||||
message7.stream_id = stream5;
|
||||
message8.stream_id = stream5;
|
||||
message7.topic = topic4;
|
||||
|
||||
message8.stream_id = stream5;
|
||||
message8.topic = topic4;
|
||||
rs.process_topic_edit(stream4, topic3, topic4, stream5);
|
||||
|
||||
rs.process_topic_edit({
|
||||
message_ids: [message7.id, message8.id],
|
||||
old_stream_id: stream4,
|
||||
new_stream_id: stream5,
|
||||
old_topic: topic3,
|
||||
new_topic: topic4,
|
||||
});
|
||||
|
||||
assert.equal(rs.get_topic_recent_senders(stream4, topic3).toString(), "");
|
||||
assert.equal(rs.get_topic_recent_senders(stream5, topic4).toString(), "2,3");
|
||||
|
||||
const reduced_msgs = [message3, message4, message7, message8];
|
||||
|
||||
override(all_messages_data, "all_messages", () => reduced_msgs);
|
||||
assert.equal(rs.get_topic_recent_senders(stream1, topic1).toString(), "2,1");
|
||||
|
||||
// delete message1 and message5 sent by sender1
|
||||
rs.update_topics_of_deleted_message_ids([message1.id, message5.id]);
|
||||
assert.equal(rs.get_topic_recent_senders(stream1, topic1).toString(), "2");
|
||||
|
||||
// test that we can remove again, harmlessly
|
||||
rs.update_topics_of_deleted_message_ids([message1.id, message5.id]);
|
||||
assert.equal(rs.get_topic_recent_senders(stream1, topic1).toString(), "2");
|
||||
|
||||
// remove some more senders
|
||||
rs.update_topics_of_deleted_message_ids([message2.id, message3.id, message4.id, message5.id]);
|
||||
assert.equal(rs.get_topic_recent_senders(stream1, topic1).toString(), "");
|
||||
|
||||
rs.update_topics_of_deleted_message_ids([message6.id, message7.id, message8.id, message9.id]);
|
||||
assert.equal(rs.get_topic_recent_senders(stream1, topic1).toString(), "");
|
||||
assert.equal(rs.get_topic_recent_senders(stream2, topic2).toString(), "");
|
||||
assert.equal(rs.get_topic_recent_senders(stream3, topic3).toString(), "");
|
||||
|
||||
// deleting an old message which isn't locally stored.
|
||||
// We are just testing that it doesn't raise an error;
|
||||
// no changes should take place in this case.
|
||||
rs.update_topics_of_deleted_message_ids([-1]);
|
||||
|
||||
// Comparing on a non-existent topic doesn't crash.
|
||||
assert.equal(
|
||||
rs.compare_by_recency({user_id: sender2}, {user_id: sender1}, stream3, "bogus") < 0,
|
||||
true,
|
||||
);
|
||||
});
|
||||
|
|
|
@ -384,7 +384,13 @@ export function update_messages(events) {
|
|||
post_edit_topic = pre_edit_topic;
|
||||
}
|
||||
const args = [event.stream_id, pre_edit_topic, post_edit_topic, new_stream_id];
|
||||
recent_senders.process_topic_edit(...args);
|
||||
recent_senders.process_topic_edit({
|
||||
message_ids: event.message_ids,
|
||||
old_stream_id: event.stream_id,
|
||||
old_topic: pre_edit_topic,
|
||||
new_stream_id,
|
||||
new_topic: post_edit_topic,
|
||||
});
|
||||
recent_topics.process_topic_edit(...args);
|
||||
}
|
||||
|
||||
|
|
|
@ -1,92 +1,172 @@
|
|||
import {FoldDict} from "./fold_dict";
|
||||
import * as message_util from "./message_util";
|
||||
import _ from "lodash";
|
||||
|
||||
// topic_senders[stream_id][topic_id][sender_id] = latest_message_id
|
||||
const topic_senders = new Map();
|
||||
// topic_senders[stream_id][sender_id] = latest_message_id
|
||||
import {FoldDict} from "./fold_dict";
|
||||
import * as message_store from "./message_store";
|
||||
|
||||
// This class is only exported for unit testing purposes.
|
||||
// If we find reuse opportunities, we should just put it into
|
||||
// its own module.
|
||||
export class IdTracker {
    // Every message id that has been added and not yet removed.
    ids = new Set();

    // We cache the max message id to make sure that
    // typeahead code is efficient.  We don't eagerly
    // compute it, since it's plausible a spammy bot
    // could cause us to process many messages at a time
    // during fetching.
    //
    // Invariant: when this is not undefined, it equals the
    // largest id currently in `ids`.
    _cached_max_id = undefined;

    // Record an id.  A valid cache is bumped in place; an unset
    // cache stays unset until max_id() is next called.
    add(id) {
        this.ids.add(id);
        if (this._cached_max_id !== undefined && id > this._cached_max_id) {
            this._cached_max_id = id;
        }
    }

    // Forget an id.  The cache only becomes stale when the id that
    // was actually removed is the current max; removing an unknown
    // or smaller id cannot change the max, so we keep the cache.
    remove(id) {
        const was_present = this.ids.delete(id);
        if (was_present && id === this._cached_max_id) {
            this._cached_max_id = undefined;
        }
    }

    // Return the largest tracked id, or -1 if nothing is tracked.
    // This is O(N) only when the cache is unset.
    max_id() {
        if (this._cached_max_id === undefined) {
            let max;
            for (const id of this.ids) {
                if (max === undefined || id > max) {
                    max = id;
                }
            }
            // Stays undefined for an empty tracker.
            this._cached_max_id = max;
        }

        // Compare against undefined explicitly so that a max id
        // of 0 is not mistaken for "no ids".
        return this._cached_max_id === undefined ? -1 : this._cached_max_id;
    }

    // True when no ids are being tracked.
    empty() {
        return this.ids.size === 0;
    }
}
|
||||
|
||||
// stream_senders[stream_id][sender_id] = IdTracker
|
||||
const stream_senders = new Map();
|
||||
|
||||
// topic_senders[stream_id][topic_id][sender_id] = IdTracker
|
||||
const topic_senders = new Map();
|
||||
|
||||
// Drop all recorded senders from both module-level maps so that each
// unit test starts from a clean slate.
export function clear_for_testing() {
    for (const registry of [stream_senders, topic_senders]) {
        registry.clear();
    }
}
|
||||
|
||||
// Largest message id recorded for sender_id in the given stream/topic,
// or -1 when the stream, the topic, or the sender has no entry.
function max_id_for_stream_topic_sender({stream_id, topic, sender_id}) {
    const topics = topic_senders.get(stream_id);
    const senders = topics && topics.get(topic);
    const tracker = senders && senders.get(sender_id);

    if (!tracker) {
        return -1;
    }
    return tracker.max_id();
}
|
||||
|
||||
// Largest message id recorded for sender_id anywhere in the stream,
// or -1 when the stream or the sender has no entry.
function max_id_for_stream_sender({stream_id, sender_id}) {
    const senders = stream_senders.get(stream_id);
    const tracker = senders && senders.get(sender_id);

    if (!tracker) {
        return -1;
    }
    return tracker.max_id();
}
|
||||
|
||||
// Record message_id for sender_id at the stream level, creating the
// per-stream map and the per-sender IdTracker on first use.
function add_stream_message({stream_id, sender_id, message_id}) {
    let senders = stream_senders.get(stream_id);
    if (!senders) {
        senders = new Map();
        stream_senders.set(stream_id, senders);
    }

    let tracker = senders.get(sender_id);
    if (!tracker) {
        tracker = new IdTracker();
        senders.set(sender_id, tracker);
    }

    tracker.add(message_id);
}
|
||||
|
||||
// Record message_id for sender_id under the stream/topic pair, creating
// any missing level (FoldDict of topics, Map of senders, IdTracker).
function add_topic_message({stream_id, topic, sender_id, message_id}) {
    const topics = topic_senders.get(stream_id) || new FoldDict();
    topic_senders.set(stream_id, topics);

    // The entry is always (re)written, even when the topic is already known.
    const senders = topics.get(topic) || new Map();
    topics.set(topic, senders);

    const tracker = senders.get(sender_id) || new IdTracker();
    senders.set(sender_id, tracker);

    tracker.add(message_id);
}
|
||||
|
||||
// Record a stream message's sender at both granularities that the
// recency comparison uses: the whole stream and the specific topic.
//
// Reads message.stream_id, message.topic, message.sender_id and
// message.id.  All bookkeeping is delegated to the add_* helpers so
// that every stored value is an IdTracker, never a raw message id.
export function process_message_for_senders(message) {
    const {stream_id, topic, sender_id, id: message_id} = message;

    add_stream_message({stream_id, sender_id, message_id});
    add_topic_message({stream_id, topic, sender_id, message_id});
}
|
||||
|
||||
// Forget that sender_id sent message_id to the given stream/topic.
//
// Every lookup is defensive: if the stream, the topic, or the sender
// is unknown, this is a silent no-op.  Containers that become empty
// are pruned, so a sender with no remaining messages disappears from
// the topic, and a topic with no remaining senders disappears from
// the stream's topic dict.
function remove_topic_message({stream_id, topic, sender_id, message_id}) {
    const topic_dict = topic_senders.get(stream_id);
    if (!topic_dict) {
        return;
    }

    const sender_dict = topic_dict.get(topic);
    if (!sender_dict) {
        return;
    }

    const id_tracker = sender_dict.get(sender_id);
    if (!id_tracker) {
        return;
    }

    id_tracker.remove(message_id);
    if (id_tracker.empty()) {
        sender_dict.delete(sender_id);
    }

    if (sender_dict.size === 0) {
        topic_dict.delete(topic);
    }
}
|
||||
|
||||
// Move the given messages from (old_stream_id, old_topic) to
// (new_stream_id, new_topic) in our sender bookkeeping.
//
// message_ids: ids of the moved messages; ids that message_store
//     does not know about are skipped.
// old_stream_id, old_topic: where the messages used to live.
// new_stream_id, new_topic: where they live now.  Each defaults to
//     its "old" counterpart, because an edit that only renames the
//     topic (or only moves the stream) may leave the other value
//     undefined; without the default such messages would be filed
//     under an `undefined` key.
export function process_topic_edit({
    message_ids,
    old_stream_id,
    old_topic,
    new_stream_id = old_stream_id,
    new_topic = old_topic,
}) {
    // Note that we don't delete anything from stream_senders here.
    // Our view is that it's probably better to not do so; users who
    // recently posted to a stream are relevant for typeahead even if
    // the messages were moved to another stream or deleted.

    for (const message_id of message_ids) {
        const message = message_store.get(message_id);
        if (!message) {
            continue;
        }
        const sender_id = message.sender_id;

        remove_topic_message({stream_id: old_stream_id, topic: old_topic, sender_id, message_id});
        add_topic_message({stream_id: new_stream_id, topic: new_topic, sender_id, message_id});

        add_stream_message({stream_id: new_stream_id, sender_id, message_id});
    }
}
|
||||
|
||||
// Remove deleted messages from the per-topic sender bookkeeping.
//
// Each id is looked up in message_store to find its stream, topic
// and sender; ids that message_store does not know about are
// skipped.  Only topic_senders is touched (via remove_topic_message);
// stream-level data is left as it is.
export function update_topics_of_deleted_message_ids(message_ids) {
    for (const message_id of message_ids) {
        const message = message_store.get(message_id);
        if (!message) {
            continue;
        }

        const {stream_id, topic, sender_id} = message;
        remove_topic_message({stream_id, topic, sender_id, message_id});
    }
}
|
||||
|
||||
|
@ -94,31 +174,17 @@ export function compare_by_recency(user_a, user_b, stream_id, topic) {
|
|||
let a_message_id;
|
||||
let b_message_id;
|
||||
|
||||
const topic_dict = topic_senders.get(stream_id);
|
||||
if (topic !== undefined && topic_dict !== undefined) {
|
||||
const sender_message_ids = topic_dict.get(topic);
|
||||
if (sender_message_ids !== undefined) {
|
||||
b_message_id = sender_message_ids.get(user_b.user_id) || Number.NEGATIVE_INFINITY;
|
||||
a_message_id = sender_message_ids.get(user_a.user_id) || Number.NEGATIVE_INFINITY;
|
||||
a_message_id = max_id_for_stream_topic_sender({stream_id, topic, sender_id: user_a.user_id});
|
||||
b_message_id = max_id_for_stream_topic_sender({stream_id, topic, sender_id: user_b.user_id});
|
||||
|
||||
if (a_message_id !== b_message_id) {
|
||||
return b_message_id - a_message_id;
|
||||
}
|
||||
}
|
||||
if (a_message_id !== b_message_id) {
|
||||
return b_message_id - a_message_id;
|
||||
}
|
||||
|
||||
// Check recency for whole stream as tiebreaker
|
||||
const stream_dict = stream_senders.get(stream_id);
|
||||
if (stream_dict !== undefined) {
|
||||
b_message_id = stream_dict.get(user_b.user_id) || Number.NEGATIVE_INFINITY;
|
||||
a_message_id = stream_dict.get(user_a.user_id) || Number.NEGATIVE_INFINITY;
|
||||
a_message_id = max_id_for_stream_sender({stream_id, sender_id: user_a.user_id});
|
||||
b_message_id = max_id_for_stream_sender({stream_id, sender_id: user_b.user_id});
|
||||
|
||||
if (a_message_id !== b_message_id) {
|
||||
return b_message_id - a_message_id;
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
return b_message_id - a_message_id;
|
||||
}
|
||||
|
||||
export function get_topic_recent_senders(stream_id, topic) {
|
||||
|
@ -127,12 +193,18 @@ export function get_topic_recent_senders(stream_id, topic) {
|
|||
return [];
|
||||
}
|
||||
|
||||
const sender_message_ids = topic_dict.get(topic);
|
||||
if (sender_message_ids === undefined) {
|
||||
const sender_dict = topic_dict.get(topic);
|
||||
if (sender_dict === undefined) {
|
||||
return [];
|
||||
}
|
||||
|
||||
const sorted_senders = Array.from(sender_message_ids.entries()).sort((s1, s2) => s1[1] - s2[1]);
|
||||
function by_max_message_id(item1, item2) {
|
||||
const list1 = item1[1];
|
||||
const list2 = item2[1];
|
||||
return list1.max_id() - list2.max_id();
|
||||
}
|
||||
|
||||
const sorted_senders = Array.from(sender_dict.entries()).sort(by_max_message_id);
|
||||
const recent_senders = [];
|
||||
for (const item of sorted_senders) {
|
||||
recent_senders.push(item[0]);
|
||||
|
|
Loading…
Reference in New Issue