mirror of https://github.com/zulip/zulip.git
export: Make sure messages are sorted **across** files.
We now ensure that all message ids are sorted BEFORE we split them into batches. This requires a few extra "slim" queries up front to collect the message ids, but when we then divide them into batches, we no longer run two or three complicated queries inside the loop. We simply hydrate the pre-collected message ids, so `write_message_partials` should be easy to reason about.

This change also means that tiny realms with fewer than 1000 messages will always get exactly one json file, since the ids from all the queries are aggregated before batching.
parent cef0e11816
commit 8f991f8eb1
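In outline, the new flow collects every relevant message id first, sorts the combined set once, and only then slices it into fixed-size batches, writing one shard file per batch. Below is a minimal standalone sketch of that idea; the helper names, the batch size of 1000, and the JSON layout are illustrative, not the exact code in the exporter.

import json
import os
from typing import List, Set

MESSAGE_BATCH_CHUNK_SIZE = 1000  # matches the "< 1000 messages" note above


def chunkify(lst: List[int], chunk_size: int) -> List[List[int]]:
    # Split an already-sorted list of ids into consecutive fixed-size batches.
    return [lst[i : i + chunk_size] for i in range(0, len(lst), chunk_size)]


def export_message_ids(id_sets: List[Set[int]], output_dir: str) -> Set[int]:
    # Aggregate the ids from every "slim" query, then sort once, so that
    # message ids are ordered across shard files, not just within one file.
    all_message_ids: Set[int] = set()
    for ids in id_sets:
        assert not ids & all_message_ids  # the queries are expected to be disjoint
        all_message_ids |= ids

    message_id_chunks = chunkify(sorted(all_message_ids), MESSAGE_BATCH_CHUNK_SIZE)

    for dump_file_id, chunk in enumerate(message_id_chunks, start=1):
        # The real exporter hydrates full message rows for each chunk here;
        # this sketch just writes the ids themselves.
        filename = os.path.join(output_dir, f"messages-{dump_file_id:06}.json")
        with open(filename, "w") as f:
            json.dump({"message_ids": chunk}, f)

    return all_message_ids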
@@ -1248,7 +1248,7 @@ def export_partial_message_files(
     messages_we_received = Message.objects.filter(
         sender__in=ids_of_our_possible_senders,
         recipient__in=recipient_ids_for_us,
-    ).order_by("id")
+    )

     # For the public stream export, we only need the messages those streams received.
     message_queries = [
@@ -1261,7 +1261,7 @@ def export_partial_message_files(
     messages_we_received = Message.objects.filter(
         sender__in=ids_of_our_possible_senders,
         recipient__in=recipient_ids_for_us,
-    ).order_by("id")
+    )

     # The above query is missing some messages that consenting
     # users have access to, namely, PMs sent by one of the users
@@ -1280,7 +1280,7 @@ def export_partial_message_files(
     messages_we_sent_to_them = Message.objects.filter(
         sender__in=consented_user_ids,
         recipient__in=recipient_ids_for_them,
-    ).order_by("id")
+    )

     message_queries = [
         messages_we_received,
@@ -1288,43 +1288,42 @@ def export_partial_message_files(
     ]

     all_message_ids: Set[int] = set()
-    dump_file_id = 1

     for message_query in message_queries:
-        dump_file_id = write_message_partial_for_query(
-            realm=realm,
-            message_query=message_query,
-            dump_file_id=dump_file_id,
-            all_message_ids=all_message_ids,
-            output_dir=output_dir,
-            user_profile_ids=user_ids_for_us,
-            chunk_size=chunk_size,
-        )
+        message_ids = set(get_id_list_gently_from_database(base_query=message_query, id_field="id"))
+
+        # We expect our queries to be disjoint, although this assertion
+        # isn't strictly necessary if you don't mind a little bit of
+        # overhead.
+        assert len(message_ids.intersection(all_message_ids)) == 0
+
+        all_message_ids |= message_ids
+
+    message_id_chunks = chunkify(sorted(list(all_message_ids)), chunk_size=MESSAGE_BATCH_CHUNK_SIZE)
+
+    write_message_partials(
+        realm=realm,
+        message_id_chunks=message_id_chunks,
+        output_dir=output_dir,
+        user_profile_ids=user_ids_for_us,
+    )

     return all_message_ids


-def write_message_partial_for_query(
+def write_message_partials(
     *,
     realm: Realm,
-    message_query: Any,
-    dump_file_id: int,
-    all_message_ids: Set[int],
+    message_id_chunks: List[List[int]],
     output_dir: Path,
     user_profile_ids: Set[int],
-    chunk_size: int = MESSAGE_BATCH_CHUNK_SIZE,
-) -> int:
-    min_id = -1
+) -> None:

-    while True:
-        actual_query = message_query.filter(id__gt=min_id)[0:chunk_size]
+    dump_file_id = 1
+
+    for message_id_chunk in message_id_chunks:
+        actual_query = Message.objects.filter(id__in=message_id_chunk).order_by("id")
         message_chunk = make_raw(actual_query)
-        message_ids = {m["id"] for m in message_chunk}
-        assert len(message_ids.intersection(all_message_ids)) == 0
-
-        all_message_ids.update(message_ids)
-
-        if len(message_chunk) == 0:
-            break

         # Figure out the name of our shard file.
         message_filename = os.path.join(output_dir, f"messages-{dump_file_id:06}.json")
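The point of collecting ids before batching, rather than batching each query's results separately as the old loop did, is that per-query batches can interleave: the second query can contain ids smaller than ids already written out for the first. A small self-contained illustration with made-up ids:

# Two disjoint, individually sorted id lists, as the old per-query loop saw them.
ids_we_received = [1, 4, 7, 10]
ids_we_sent_to_them = [2, 3, 8, 9]

# Old approach: batch each query on its own; file boundaries do not respect
# global id order (the second file starts below where the first one ended).
old_files = [ids_we_received, ids_we_sent_to_them]

# New approach: aggregate, sort once, then batch; every id in file N is
# smaller than every id in file N + 1.
all_ids = sorted(set(ids_we_received) | set(ids_we_sent_to_them))
new_files = [all_ids[i : i + 4] for i in range(0, len(all_ids), 4)]
assert new_files == [[1, 2, 3, 4], [7, 8, 9, 10]]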
@@ -1347,11 +1346,8 @@ def write_message_partial_for_query(

     # And write the data.
     write_data_to_file(message_filename, output)
-    min_id = max(message_ids)
     dump_file_id += 1

-    return dump_file_id
-

 def export_uploads_and_avatars(
     realm: Realm, *, user: Optional[UserProfile], output_dir: Path
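For reference, the shard files are numbered with a one-based, zero-padded six-digit counter, which is why the test hunk further below reads messages-000001.json:

# dump_file_id formatting: f"messages-{dump_file_id:06}.json"
assert f"messages-{1:06}.json" == "messages-000001.json"
assert f"messages-{12:06}.json" == "messages-000012.json"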
@@ -2058,7 +2054,7 @@ def get_id_list_gently_from_database(*, base_query: Any, id_field: str) -> List[
     all_ids = []
     batch_size = 10000  # we are just getting ints

-    assert id_field.endswith("_id")
+    assert id_field == "id" or id_field.endswith("_id")

     while True:
         filter_args = {f"{id_field}__gt": min_id}
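get_id_list_gently_from_database pages through ids with an id-greater-than-cursor filter in fixed-size batches rather than loading everything in one query. A rough standalone sketch of that pattern, using a plain list where the real code filters a Django queryset (the function name and batch size here are illustrative):

from typing import List


def get_ids_gently(sorted_ids: List[int], batch_size: int = 10000) -> List[int]:
    # Keyset pagination: repeatedly take "the next batch_size ids greater than
    # the largest id seen so far" instead of issuing one huge query.
    all_ids: List[int] = []
    min_id = -1
    while True:
        batch = [i for i in sorted_ids if i > min_id][:batch_size]
        if not batch:
            break
        all_ids.extend(batch)
        min_id = batch[-1]
    return all_ids


assert get_ids_gently([3, 5, 8, 13], batch_size=2) == [3, 5, 8, 13]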
@@ -142,12 +142,7 @@ class ImportExportTest(ZulipTestCase):
         result["realm"] = read_file("realm.json")
         result["attachment"] = read_file("attachment.json")
         result["message"] = read_file("messages-000001.json")
-        try:
-            message = read_file("messages-000002.json")
-            result["message"]["zerver_usermessage"].extend(message["zerver_usermessage"])
-            result["message"]["zerver_message"].extend(message["zerver_message"])
-        except FileNotFoundError:
-            pass
+        # TODO: generate 1001+ test messages to exercise messages-000002.json
         result["uploads_dir"] = os.path.join(output_dir, "uploads")
         result["uploads_dir_records"] = read_file(os.path.join("uploads", "records.json"))
         result["emoji_dir"] = os.path.join(output_dir, "emoji")