import_realm: Speed up first_message_id calculation.

On my data (about 10 million messages in 1600 streams) this used to take
about 40 hours, while the improved statement completes in roughly 30
seconds.

The old solution had postgres go through the entire table until the
first match for each stream. Thus, the time spent scanning the table
got longer and longer for each stream because postgres always started at
the beginning (and somehow it did not use any indices) and had to skip
over all rows until it found the first message from the stream that is
was looking for each time.

This new statement just performans a bulk operation, scanning the table
only once and then inserts the results directly into the destination
table.

Slightly more verbose inforation about this change can be found in:
https://chat.zulip.org/#narrow/stream/31-production-help/topic/Import.20Rocketchat.20data/near/1408867

Signed-off-by: Florian Pritz <bluewind@xinu.at>
This commit is contained in:
Florian Pritz 2022-09-28 17:58:23 +02:00 committed by Tim Abbott
parent ebd4181ae0
commit f37ac80384
1 changed files with 15 additions and 8 deletions

View File

@ -1328,14 +1328,21 @@ def do_import_realm(import_dir: Path, subdomain: str, processes: int = 1) -> Rea
bulk_import_model(data, Reaction)
# Similarly, we need to recalculate the first_message_id for stream objects.
for stream in Stream.objects.filter(realm=realm):
recipient = Recipient.objects.get(type=Recipient.STREAM, type_id=stream.id)
first_message = Message.objects.filter(recipient=recipient).first()
if first_message is None:
stream.first_message_id = None
else:
stream.first_message_id = first_message.id
stream.save(update_fields=["first_message_id"])
update_first_message_id_query = """
UPDATE zerver_stream
SET first_message_id = subquery.first_message_id
FROM (
SELECT r.type_id id, min(m.id) first_message_id
FROM zerver_message m
JOIN zerver_recipient r ON
r.id = m.recipient_id
WHERE r.type = 2
GROUP BY r.type_id
) AS subquery
WHERE zerver_stream.id = subquery.id
"""
with connection.cursor() as cursor:
cursor.execute(update_first_message_id_query)
if "zerver_userstatus" in data:
fix_datetime_fields(data, "zerver_userstatus")