diff --git a/web/src/echo.ts b/web/src/echo.ts index 2841862575..9d91a7e2ff 100644 --- a/web/src/echo.ts +++ b/web/src/echo.ts @@ -451,6 +451,17 @@ export function reify_message_id(local_id: string, server_id: number): void { update_message_lists(opts); compose_notifications.reify_message_id(opts); recent_view_data.reify_message_id_if_available(opts); + + // We add the message to stream_topic_history only after we receive + // it from the server i.e., is acked, so as to maintain integer + // message id values there. + if (message.type === "stream") { + stream_topic_history.add_message({ + stream_id: message.stream_id, + topic_name: message.topic, + message_id: message.id, + }); + } } export function update_message_lists({old_id, new_id}: {old_id: number; new_id: number}): void { diff --git a/web/src/echo_state.ts b/web/src/echo_state.ts index 0406926004..83c49b8971 100644 --- a/web/src/echo_state.ts +++ b/web/src/echo_state.ts @@ -1,3 +1,5 @@ +import assert from "minimalistic-assert"; + import type {Message} from "./message_store"; const waiting_for_id = new Map(); @@ -31,3 +33,24 @@ export function _patch_waiting_for_ack(data: Map): void { // Only for testing waiting_for_ack = data; } + +export function get_waiting_for_ack_local_ids_by_topic(channel_id: number): Map { + const max_message_id_by_topic = new Map(); + + const channel_messages_waiting_for_ack = [...waiting_for_ack.values()].filter( + (message) => message.type === "stream" && message.stream_id === channel_id, + ); + + for (const message of channel_messages_waiting_for_ack) { + assert(message.type === "stream"); + const topic = message.topic; + const existing_id = max_message_id_by_topic.get(topic); + + // Here we're accessing message.id === float(message.local_id), + // since these are all local message IDs. + if (existing_id === undefined || message.id > existing_id) { + max_message_id_by_topic.set(topic, message.id); + } + } + return max_message_id_by_topic; +} diff --git a/web/src/message_events.js b/web/src/message_events.js index 02a1240fdc..b225e14121 100644 --- a/web/src/message_events.js +++ b/web/src/message_events.js @@ -171,7 +171,9 @@ export function update_views_filtered_on_message_property( } export function insert_new_messages(messages, sent_by_this_client, deliver_locally) { - messages = messages.map((message) => message_helper.process_new_message(message)); + messages = messages.map((message) => + message_helper.process_new_message(message, deliver_locally), + ); const any_untracked_unread_messages = unread.process_loaded_messages(messages, false); direct_message_group_data.process_loaded_messages(messages); diff --git a/web/src/message_helper.ts b/web/src/message_helper.ts index 472bcfdd57..ca11347306 100644 --- a/web/src/message_helper.ts +++ b/web/src/message_helper.ts @@ -14,7 +14,7 @@ import * as stream_topic_history from "./stream_topic_history"; import * as user_status from "./user_status"; import * as util from "./util"; -export function process_new_message(raw_message: RawMessage): Message { +export function process_new_message(raw_message: RawMessage, deliver_locally = false): Message { // Call this function when processing a new message. After // a message is processed and inserted into the message store // cache, most modules use message_store.get to look at @@ -61,11 +61,18 @@ export function process_new_message(raw_message: RawMessage): Message { topic = message_with_booleans.subject; } assert(topic !== undefined); - stream_topic_history.add_message({ - stream_id: message_with_booleans.stream_id, - topic_name: topic, - message_id: message_with_booleans.id, - }); + + // We add fully delivered messages to stream_topic_history, + // being careful to not include locally echoed messages, which + // don't have permanent IDs and don't belong in that structure. + if (!deliver_locally) { + stream_topic_history.add_message({ + stream_id: message_with_booleans.stream_id, + topic_name: topic, + message_id: message_with_booleans.id, + }); + } + recent_senders.process_stream_message({ stream_id: message_with_booleans.stream_id, topic, diff --git a/web/src/stream_topic_history.ts b/web/src/stream_topic_history.ts index dc3157b81b..436160cfc1 100644 --- a/web/src/stream_topic_history.ts +++ b/web/src/stream_topic_history.ts @@ -1,6 +1,7 @@ import assert from "minimalistic-assert"; import {all_messages_data} from "./all_messages_data"; +import * as echo_state from "./echo_state"; import {FoldDict} from "./fold_dict"; import * as message_util from "./message_util"; import * as sub_store from "./sub_store"; @@ -229,26 +230,45 @@ export class PerStreamHistory { } get_recent_topic_names(): string[] { + // Combines several data sources to produce a complete picture + // of topics the client knows about. + // + // This data source is this module's own data structures. const my_recents = [...this.topics.values()]; - - /* Add any older topics with unreads that may not be present - * in our local cache. */ + // This data source is older topics that we know exist because + // we have unread messages in the topic, even if we don't have + // any messages from the topic in our local cache. const missing_topics = unread.get_missing_topics({ stream_id: this.stream_id, topic_dict: this.topics, }); - const recents = [...my_recents, ...missing_topics]; + // This data source is locally echoed messages, which should + // are treated as newer than all delivered messages. + const local_echo_topics = [ + ...echo_state.get_waiting_for_ack_local_ids_by_topic(this.stream_id).entries(), + ].map(([topic, local_id]) => ({pretty_name: topic, message_id: local_id})); + const local_echo_set = new Set( + local_echo_topics.map((message_topic) => message_topic.pretty_name.toLowerCase()), + ); - recents.sort((a, b) => b.message_id - a.message_id); - - const names = recents.map((obj) => obj.pretty_name); - - return names; + // We first sort the topics without locally echoed messages, + // and then prepend topics with locally echoed messages. + const server_topics = [...my_recents, ...missing_topics].filter( + (message_topic) => !local_echo_set.has(message_topic.pretty_name.toLowerCase()), + ); + server_topics.sort((a, b) => b.message_id - a.message_id); + return [...local_echo_topics, ...server_topics].map((obj) => obj.pretty_name); } get_max_message_id(): number { - return this.max_message_id; + // TODO: We probably want to migrate towards this function + // ignoring locally echoed messages, and thus returning an integer. + const unacked_message_ids_in_stream = [ + ...echo_state.get_waiting_for_ack_local_ids_by_topic(this.stream_id).values(), + ]; + const max_message_id = Math.max(...unacked_message_ids_in_stream, this.max_message_id); + return max_message_id; } } diff --git a/web/tests/echo.test.js b/web/tests/echo.test.js index 39555fd1d3..e200738424 100644 --- a/web/tests/echo.test.js +++ b/web/tests/echo.test.js @@ -71,6 +71,7 @@ const echo = zrequire("echo"); const echo_state = zrequire("echo_state"); const people = zrequire("people"); const stream_data = zrequire("stream_data"); +const stream_topic_history = zrequire("stream_topic_history"); const general_sub = { stream_id: 101, @@ -363,10 +364,12 @@ run_test("test reify_message_id", ({override}) => { const local_id_float = 103.01; override(markdown, "render", noop); + override(markdown, "get_topic_links", noop); const message_request = { type: "stream", stream_id: general_sub.stream_id, + topic: "test", sender_email: "iago@zulip.com", sender_full_name: "Iago", sender_id: 123, @@ -392,6 +395,10 @@ run_test("test reify_message_id", ({override}) => { assert.ok(message_store_reify_called); assert.ok(notifications_reify_called); + + const history = stream_topic_history.find_or_create(general_sub.stream_id); + assert.equal(history.max_message_id, 110); + assert.equal(history.topics.get("test").message_id, 110); }); run_test("reset MockDate", () => { diff --git a/web/tests/stream_topic_history.test.js b/web/tests/stream_topic_history.test.js index 42b33b95a2..c6ef56fa00 100644 --- a/web/tests/stream_topic_history.test.js +++ b/web/tests/stream_topic_history.test.js @@ -9,6 +9,7 @@ const channel = mock_esm("../src/channel"); const message_util = mock_esm("../src/message_util"); const all_messages_data = zrequire("all_messages_data"); +const echo_state = zrequire("echo_state"); const unread = zrequire("unread"); const message_store = zrequire("message_store"); const stream_data = zrequire("stream_data"); @@ -469,3 +470,54 @@ test("ask_server_for_latest_topic_data", () => { assert.deepEqual(history, ["Topic1"]); assert.deepEqual(max_message_id, 102); }); + +// Test when a local unacked message is sent, then get_max_message_id would also +// consider this unacked message. However, the unacked message is not added to +// max_message_id of stream, or message_id of topic histories. +test("test_max_message_ids_in_channel_and_topics", () => { + const general_sub = { + stream_id: 101, + name: "general", + subscribed: true, + }; + + const history = stream_topic_history.find_or_create(general_sub.stream_id); + + stream_topic_history.add_message({ + stream_id: general_sub.stream_id, + message_id: 45, + topic_name: "topic 1", + }); + + assert.equal(stream_topic_history.get_max_message_id(general_sub.stream_id), 45); + assert.equal(history.max_message_id, 45); + + stream_topic_history.add_message({ + stream_id: general_sub.stream_id, + message_id: 47, + topic_name: "topic 1", + }); + + assert.equal(stream_topic_history.get_max_message_id(general_sub.stream_id), 47); + assert.equal(history.max_message_id, 47); + + const local_message = { + type: "stream", + stream_id: general_sub.stream_id, + topic: "topic 2", + sender_email: "iago@zulip.com", + sender_full_name: "Iago", + sender_id: 123, + id: 49.01, + }; + echo_state.set_message_waiting_for_ack("49.01", local_message); + + assert.equal(stream_topic_history.get_max_message_id(general_sub.stream_id), 49.01); + assert.equal(history.max_message_id, 47); + assert.equal(history.topics.get("topic 2"), undefined); + + assert.deepEqual(stream_topic_history.get_recent_topic_names(general_sub.stream_id), [ + "topic 2", + "topic 1", + ]); +});