stream_topic_history: Update history to only store acked message ids.

Previously, the "stream_topic_history" used to store unacked float
message ids as well in its "max_message_id" of stream and
"message_id" of topic histories.

This commit updates it to rather store only the acked message ids
here, and rather use the "echo_state" module so as to look up
for unacked messages in case of looking for recent topics, or
max message id in functions.
This commit is contained in:
roanster007 2024-08-03 17:46:14 +05:30 committed by Tim Abbott
parent da75fed5be
commit 81efecf463
7 changed files with 139 additions and 17 deletions

View File

@ -451,6 +451,17 @@ export function reify_message_id(local_id: string, server_id: number): void {
update_message_lists(opts);
compose_notifications.reify_message_id(opts);
recent_view_data.reify_message_id_if_available(opts);
// We add the message to stream_topic_history only after we receive
// it from the server i.e., is acked, so as to maintain integer
// message id values there.
if (message.type === "stream") {
stream_topic_history.add_message({
stream_id: message.stream_id,
topic_name: message.topic,
message_id: message.id,
});
}
}
export function update_message_lists({old_id, new_id}: {old_id: number; new_id: number}): void {

View File

@ -1,3 +1,5 @@
import assert from "minimalistic-assert";
import type {Message} from "./message_store";
const waiting_for_id = new Map<string, Message>();
@ -31,3 +33,24 @@ export function _patch_waiting_for_ack(data: Map<string, Message>): void {
// Only for testing
waiting_for_ack = data;
}
export function get_waiting_for_ack_local_ids_by_topic(channel_id: number): Map<string, number> {
const max_message_id_by_topic = new Map<string, number>();
const channel_messages_waiting_for_ack = [...waiting_for_ack.values()].filter(
(message) => message.type === "stream" && message.stream_id === channel_id,
);
for (const message of channel_messages_waiting_for_ack) {
assert(message.type === "stream");
const topic = message.topic;
const existing_id = max_message_id_by_topic.get(topic);
// Here we're accessing message.id === float(message.local_id),
// since these are all local message IDs.
if (existing_id === undefined || message.id > existing_id) {
max_message_id_by_topic.set(topic, message.id);
}
}
return max_message_id_by_topic;
}

View File

@ -171,7 +171,9 @@ export function update_views_filtered_on_message_property(
}
export function insert_new_messages(messages, sent_by_this_client, deliver_locally) {
messages = messages.map((message) => message_helper.process_new_message(message));
messages = messages.map((message) =>
message_helper.process_new_message(message, deliver_locally),
);
const any_untracked_unread_messages = unread.process_loaded_messages(messages, false);
direct_message_group_data.process_loaded_messages(messages);

View File

@ -14,7 +14,7 @@ import * as stream_topic_history from "./stream_topic_history";
import * as user_status from "./user_status";
import * as util from "./util";
export function process_new_message(raw_message: RawMessage): Message {
export function process_new_message(raw_message: RawMessage, deliver_locally = false): Message {
// Call this function when processing a new message. After
// a message is processed and inserted into the message store
// cache, most modules use message_store.get to look at
@ -61,11 +61,18 @@ export function process_new_message(raw_message: RawMessage): Message {
topic = message_with_booleans.subject;
}
assert(topic !== undefined);
stream_topic_history.add_message({
stream_id: message_with_booleans.stream_id,
topic_name: topic,
message_id: message_with_booleans.id,
});
// We add fully delivered messages to stream_topic_history,
// being careful to not include locally echoed messages, which
// don't have permanent IDs and don't belong in that structure.
if (!deliver_locally) {
stream_topic_history.add_message({
stream_id: message_with_booleans.stream_id,
topic_name: topic,
message_id: message_with_booleans.id,
});
}
recent_senders.process_stream_message({
stream_id: message_with_booleans.stream_id,
topic,

View File

@ -1,6 +1,7 @@
import assert from "minimalistic-assert";
import {all_messages_data} from "./all_messages_data";
import * as echo_state from "./echo_state";
import {FoldDict} from "./fold_dict";
import * as message_util from "./message_util";
import * as sub_store from "./sub_store";
@ -229,26 +230,45 @@ export class PerStreamHistory {
}
get_recent_topic_names(): string[] {
// Combines several data sources to produce a complete picture
// of topics the client knows about.
//
// This data source is this module's own data structures.
const my_recents = [...this.topics.values()];
/* Add any older topics with unreads that may not be present
* in our local cache. */
// This data source is older topics that we know exist because
// we have unread messages in the topic, even if we don't have
// any messages from the topic in our local cache.
const missing_topics = unread.get_missing_topics({
stream_id: this.stream_id,
topic_dict: this.topics,
});
const recents = [...my_recents, ...missing_topics];
// This data source is locally echoed messages, which should
// are treated as newer than all delivered messages.
const local_echo_topics = [
...echo_state.get_waiting_for_ack_local_ids_by_topic(this.stream_id).entries(),
].map(([topic, local_id]) => ({pretty_name: topic, message_id: local_id}));
const local_echo_set = new Set<string>(
local_echo_topics.map((message_topic) => message_topic.pretty_name.toLowerCase()),
);
recents.sort((a, b) => b.message_id - a.message_id);
const names = recents.map((obj) => obj.pretty_name);
return names;
// We first sort the topics without locally echoed messages,
// and then prepend topics with locally echoed messages.
const server_topics = [...my_recents, ...missing_topics].filter(
(message_topic) => !local_echo_set.has(message_topic.pretty_name.toLowerCase()),
);
server_topics.sort((a, b) => b.message_id - a.message_id);
return [...local_echo_topics, ...server_topics].map((obj) => obj.pretty_name);
}
get_max_message_id(): number {
return this.max_message_id;
// TODO: We probably want to migrate towards this function
// ignoring locally echoed messages, and thus returning an integer.
const unacked_message_ids_in_stream = [
...echo_state.get_waiting_for_ack_local_ids_by_topic(this.stream_id).values(),
];
const max_message_id = Math.max(...unacked_message_ids_in_stream, this.max_message_id);
return max_message_id;
}
}

View File

@ -71,6 +71,7 @@ const echo = zrequire("echo");
const echo_state = zrequire("echo_state");
const people = zrequire("people");
const stream_data = zrequire("stream_data");
const stream_topic_history = zrequire("stream_topic_history");
const general_sub = {
stream_id: 101,
@ -363,10 +364,12 @@ run_test("test reify_message_id", ({override}) => {
const local_id_float = 103.01;
override(markdown, "render", noop);
override(markdown, "get_topic_links", noop);
const message_request = {
type: "stream",
stream_id: general_sub.stream_id,
topic: "test",
sender_email: "iago@zulip.com",
sender_full_name: "Iago",
sender_id: 123,
@ -392,6 +395,10 @@ run_test("test reify_message_id", ({override}) => {
assert.ok(message_store_reify_called);
assert.ok(notifications_reify_called);
const history = stream_topic_history.find_or_create(general_sub.stream_id);
assert.equal(history.max_message_id, 110);
assert.equal(history.topics.get("test").message_id, 110);
});
run_test("reset MockDate", () => {

View File

@ -9,6 +9,7 @@ const channel = mock_esm("../src/channel");
const message_util = mock_esm("../src/message_util");
const all_messages_data = zrequire("all_messages_data");
const echo_state = zrequire("echo_state");
const unread = zrequire("unread");
const message_store = zrequire("message_store");
const stream_data = zrequire("stream_data");
@ -469,3 +470,54 @@ test("ask_server_for_latest_topic_data", () => {
assert.deepEqual(history, ["Topic1"]);
assert.deepEqual(max_message_id, 102);
});
// Test when a local unacked message is sent, then get_max_message_id would also
// consider this unacked message. However, the unacked message is not added to
// max_message_id of stream, or message_id of topic histories.
test("test_max_message_ids_in_channel_and_topics", () => {
const general_sub = {
stream_id: 101,
name: "general",
subscribed: true,
};
const history = stream_topic_history.find_or_create(general_sub.stream_id);
stream_topic_history.add_message({
stream_id: general_sub.stream_id,
message_id: 45,
topic_name: "topic 1",
});
assert.equal(stream_topic_history.get_max_message_id(general_sub.stream_id), 45);
assert.equal(history.max_message_id, 45);
stream_topic_history.add_message({
stream_id: general_sub.stream_id,
message_id: 47,
topic_name: "topic 1",
});
assert.equal(stream_topic_history.get_max_message_id(general_sub.stream_id), 47);
assert.equal(history.max_message_id, 47);
const local_message = {
type: "stream",
stream_id: general_sub.stream_id,
topic: "topic 2",
sender_email: "iago@zulip.com",
sender_full_name: "Iago",
sender_id: 123,
id: 49.01,
};
echo_state.set_message_waiting_for_ack("49.01", local_message);
assert.equal(stream_topic_history.get_max_message_id(general_sub.stream_id), 49.01);
assert.equal(history.max_message_id, 47);
assert.equal(history.topics.get("topic 2"), undefined);
assert.deepEqual(stream_topic_history.get_recent_topic_names(general_sub.stream_id), [
"topic 2",
"topic 1",
]);
});