rocketchat: Complete metadata verification before importing uploads.

This is not the best-factored version of this change, but it avoids
having to rework the tests and, importantly, should make failures
involving metadata surface within a couple of seconds, rather than
only after a giant BSON read has completed.
This commit is contained in:
Tim Abbott 2024-10-17 11:28:35 -07:00
parent 79b6f43d0e
commit 6e4da50577
1 changed file with 99 additions and 66 deletions

View File

@ -977,25 +977,31 @@ def map_user_id_to_user(user_data_list: list[dict[str, Any]]) -> dict[str, dict[
return user_id_to_user_map return user_id_to_user_map
def rocketchat_data_to_dict(rocketchat_data_dir: str) -> dict[str, Any]: def rocketchat_data_to_dict(
rocketchat_data: dict[str, Any] = {} rocketchat_data_dir: str, sections: list[str] | None = None
rocketchat_data["instance"] = [] ) -> dict[str, Any]:
rocketchat_data["user"] = [] """Reads Rocket.Chat data from its BSON files for the requested sections of the
rocketchat_data["avatar"] = {"avatar": [], "file": [], "chunk": []} export. Defaults to fetching everything, which is convenient for tests, but
rocketchat_data["room"] = [] we prefer to fetch only those sections that are needed for a given stage to
rocketchat_data["message"] = [] provide a faster debug cycle for metadata data corruption issues.
rocketchat_data["custom_emoji"] = {"emoji": [], "file": [], "chunk": []}
rocketchat_data["upload"] = {"upload": [], "file": [], "chunk": []}
# Get instance TODO: Ideally, we'd read the big data sets, like messages and
uploads, with a streaming BSON parser, or pre-paginate the data.
"""
rocketchat_data: dict[str, Any] = {}
if sections is None or "instance" in sections:
rocketchat_data["instance"] = []
with open(os.path.join(rocketchat_data_dir, "instances.bson"), "rb") as fcache: with open(os.path.join(rocketchat_data_dir, "instances.bson"), "rb") as fcache:
rocketchat_data["instance"] = bson.decode_all(fcache.read(), bson_codec_options) rocketchat_data["instance"] = bson.decode_all(fcache.read(), bson_codec_options)
# Get user if sections is None or "user" in sections:
rocketchat_data["user"] = []
with open(os.path.join(rocketchat_data_dir, "users.bson"), "rb") as fcache: with open(os.path.join(rocketchat_data_dir, "users.bson"), "rb") as fcache:
rocketchat_data["user"] = bson.decode_all(fcache.read(), bson_codec_options) rocketchat_data["user"] = bson.decode_all(fcache.read(), bson_codec_options)
# Get avatar if sections is None or "avatar" in sections:
rocketchat_data["avatar"] = {"avatar": [], "file": [], "chunk": []}
with open(os.path.join(rocketchat_data_dir, "rocketchat_avatars.bson"), "rb") as fcache: with open(os.path.join(rocketchat_data_dir, "rocketchat_avatars.bson"), "rb") as fcache:
rocketchat_data["avatar"]["avatar"] = bson.decode_all(fcache.read(), bson_codec_options) rocketchat_data["avatar"]["avatar"] = bson.decode_all(fcache.read(), bson_codec_options)
@ -1003,23 +1009,32 @@ def rocketchat_data_to_dict(rocketchat_data_dir: str) -> dict[str, Any]:
with open( with open(
os.path.join(rocketchat_data_dir, "rocketchat_avatars.files.bson"), "rb" os.path.join(rocketchat_data_dir, "rocketchat_avatars.files.bson"), "rb"
) as fcache: ) as fcache:
rocketchat_data["avatar"]["file"] = bson.decode_all(fcache.read(), bson_codec_options) rocketchat_data["avatar"]["file"] = bson.decode_all(
fcache.read(), bson_codec_options
)
with open( with open(
os.path.join(rocketchat_data_dir, "rocketchat_avatars.chunks.bson"), "rb" os.path.join(rocketchat_data_dir, "rocketchat_avatars.chunks.bson"), "rb"
) as fcache: ) as fcache:
rocketchat_data["avatar"]["chunk"] = bson.decode_all(fcache.read(), bson_codec_options) rocketchat_data["avatar"]["chunk"] = bson.decode_all(
fcache.read(), bson_codec_options
)
# Get room if sections is None or "room" in sections:
rocketchat_data["room"] = []
with open(os.path.join(rocketchat_data_dir, "rocketchat_room.bson"), "rb") as fcache: with open(os.path.join(rocketchat_data_dir, "rocketchat_room.bson"), "rb") as fcache:
rocketchat_data["room"] = bson.decode_all(fcache.read(), bson_codec_options) rocketchat_data["room"] = bson.decode_all(fcache.read(), bson_codec_options)
# Get messages if sections is None or "message" in sections:
rocketchat_data["message"] = []
with open(os.path.join(rocketchat_data_dir, "rocketchat_message.bson"), "rb") as fcache: with open(os.path.join(rocketchat_data_dir, "rocketchat_message.bson"), "rb") as fcache:
rocketchat_data["message"] = bson.decode_all(fcache.read(), bson_codec_options) rocketchat_data["message"] = bson.decode_all(fcache.read(), bson_codec_options)
# Get custom emoji if sections is None or "custom_emoji" in sections:
with open(os.path.join(rocketchat_data_dir, "rocketchat_custom_emoji.bson"), "rb") as fcache: rocketchat_data["custom_emoji"] = {"emoji": [], "file": [], "chunk": []}
with open(
os.path.join(rocketchat_data_dir, "rocketchat_custom_emoji.bson"), "rb"
) as fcache:
rocketchat_data["custom_emoji"]["emoji"] = bson.decode_all( rocketchat_data["custom_emoji"]["emoji"] = bson.decode_all(
fcache.read(), bson_codec_options fcache.read(), bson_codec_options
) )
@ -1030,12 +1045,15 @@ def rocketchat_data_to_dict(rocketchat_data_dir: str) -> dict[str, Any]:
fcache.read(), bson_codec_options fcache.read(), bson_codec_options
) )
with open(os.path.join(rocketchat_data_dir, "custom_emoji.chunks.bson"), "rb") as fcache: with open(
os.path.join(rocketchat_data_dir, "custom_emoji.chunks.bson"), "rb"
) as fcache:
rocketchat_data["custom_emoji"]["chunk"] = bson.decode_all( rocketchat_data["custom_emoji"]["chunk"] = bson.decode_all(
fcache.read(), bson_codec_options fcache.read(), bson_codec_options
) )
# Get uploads if sections is None or "upload" in sections:
rocketchat_data["upload"] = {"upload": [], "file": [], "chunk": []}
with open(os.path.join(rocketchat_data_dir, "rocketchat_uploads.bson"), "rb") as fcache: with open(os.path.join(rocketchat_data_dir, "rocketchat_uploads.bson"), "rb") as fcache:
rocketchat_data["upload"]["upload"] = bson.decode_all(fcache.read(), bson_codec_options) rocketchat_data["upload"]["upload"] = bson.decode_all(fcache.read(), bson_codec_options)
@ -1043,28 +1061,35 @@ def rocketchat_data_to_dict(rocketchat_data_dir: str) -> dict[str, Any]:
with open( with open(
os.path.join(rocketchat_data_dir, "rocketchat_uploads.files.bson"), "rb" os.path.join(rocketchat_data_dir, "rocketchat_uploads.files.bson"), "rb"
) as fcache: ) as fcache:
rocketchat_data["upload"]["file"] = bson.decode_all(fcache.read(), bson_codec_options) rocketchat_data["upload"]["file"] = bson.decode_all(
fcache.read(), bson_codec_options
)
with open( with open(
os.path.join(rocketchat_data_dir, "rocketchat_uploads.chunks.bson"), "rb" os.path.join(rocketchat_data_dir, "rocketchat_uploads.chunks.bson"), "rb"
) as fcache: ) as fcache:
rocketchat_data["upload"]["chunk"] = bson.decode_all(fcache.read(), bson_codec_options) rocketchat_data["upload"]["chunk"] = bson.decode_all(
fcache.read(), bson_codec_options
)
return rocketchat_data return rocketchat_data
def do_convert_data(rocketchat_data_dir: str, output_dir: str) -> None: def do_convert_data(rocketchat_data_dir: str, output_dir: str) -> None:
# Get all required exported data in a dictionary # Get all required exported data in a dictionary
rocketchat_data = rocketchat_data_to_dict(rocketchat_data_dir)
# Subdomain is set by the user while running the import command # Subdomain is set by the user while running the import command
realm_subdomain = "" realm_subdomain = ""
realm_id = 0 realm_id = 0
domain_name = settings.EXTERNAL_HOST domain_name = settings.EXTERNAL_HOST
realm = make_realm(realm_id, realm_subdomain, domain_name, rocketchat_data["instance"][0]) rocketchat_instance_data = rocketchat_data_to_dict(rocketchat_data_dir, ["instance"])[
"instance"
][0]
realm = make_realm(realm_id, realm_subdomain, domain_name, rocketchat_instance_data)
user_id_to_user_map: dict[str, dict[str, Any]] = map_user_id_to_user(rocketchat_data["user"]) rocketchat_user_data = rocketchat_data_to_dict(rocketchat_data_dir, ["user"])["user"]
user_id_to_user_map: dict[str, dict[str, Any]] = map_user_id_to_user(rocketchat_user_data)
username_to_user_id_map: dict[str, str] = map_username_to_user_id(user_id_to_user_map) username_to_user_id_map: dict[str, str] = map_username_to_user_id(user_id_to_user_map)
user_handler = UserHandler() user_handler = UserHandler()
@ -1089,8 +1114,9 @@ def do_convert_data(rocketchat_data_dir: str, output_dir: str) -> None:
direct_message_group_id_to_direct_message_group_map: dict[str, dict[str, Any]] = {} direct_message_group_id_to_direct_message_group_map: dict[str, dict[str, Any]] = {}
livechat_id_to_livechat_map: dict[str, dict[str, Any]] = {} livechat_id_to_livechat_map: dict[str, dict[str, Any]] = {}
rocketchat_room_data = rocketchat_data_to_dict(rocketchat_data_dir, ["room"])["room"]
categorize_channels_and_map_with_id( categorize_channels_and_map_with_id(
channel_data=rocketchat_data["room"], channel_data=rocketchat_room_data,
room_id_to_room_map=room_id_to_room_map, room_id_to_room_map=room_id_to_room_map,
team_id_to_team_map=team_id_to_team_map, team_id_to_team_map=team_id_to_team_map,
dsc_id_to_dsc_map=dsc_id_to_dsc_map, dsc_id_to_dsc_map=dsc_id_to_dsc_map,
@ -1155,9 +1181,12 @@ def do_convert_data(rocketchat_data_dir: str, output_dir: str) -> None:
) )
realm["zerver_subscription"] = zerver_subscription realm["zerver_subscription"] = zerver_subscription
rocketchat_emoji_data = rocketchat_data_to_dict(rocketchat_data_dir, ["custom_emoji"])[
"custom_emoji"
]
zerver_realmemoji = build_custom_emoji( zerver_realmemoji = build_custom_emoji(
realm_id=realm_id, realm_id=realm_id,
custom_emoji_data=rocketchat_data["custom_emoji"], custom_emoji_data=rocketchat_emoji_data,
output_dir=output_dir, output_dir=output_dir,
) )
realm["zerver_realmemoji"] = zerver_realmemoji realm["zerver_realmemoji"] = zerver_realmemoji
@ -1181,8 +1210,9 @@ def do_convert_data(rocketchat_data_dir: str, output_dir: str) -> None:
private_messages: list[dict[str, Any]] = [] private_messages: list[dict[str, Any]] = []
livechat_messages: list[dict[str, Any]] = [] livechat_messages: list[dict[str, Any]] = []
rocketchat_message_data = rocketchat_data_to_dict(rocketchat_data_dir, ["message"])["message"]
separate_channel_private_and_livechat_messages( separate_channel_private_and_livechat_messages(
messages=rocketchat_data["message"], messages=rocketchat_message_data,
dsc_id_to_dsc_map=dsc_id_to_dsc_map, dsc_id_to_dsc_map=dsc_id_to_dsc_map,
direct_id_to_direct_map=direct_id_to_direct_map, direct_id_to_direct_map=direct_id_to_direct_map,
direct_message_group_id_to_direct_message_group_map=direct_message_group_id_to_direct_message_group_map, direct_message_group_id_to_direct_message_group_map=direct_message_group_id_to_direct_message_group_map,
@ -1191,12 +1221,15 @@ def do_convert_data(rocketchat_data_dir: str, output_dir: str) -> None:
private_messages=private_messages, private_messages=private_messages,
livechat_messages=livechat_messages, livechat_messages=livechat_messages,
) )
# Hint we can free the memory, now that we're done processing this.
rocketchat_message_data = []
total_reactions: list[ZerverFieldsT] = [] total_reactions: list[ZerverFieldsT] = []
uploads_list: list[ZerverFieldsT] = [] uploads_list: list[ZerverFieldsT] = []
zerver_attachment: list[ZerverFieldsT] = [] zerver_attachment: list[ZerverFieldsT] = []
upload_id_to_upload_data_map = map_upload_id_to_upload_data(rocketchat_data["upload"]) rocketchat_upload_data = rocketchat_data_to_dict(rocketchat_data_dir, ["upload"])["upload"]
upload_id_to_upload_data_map = map_upload_id_to_upload_data(rocketchat_upload_data)
# Process channel messages # Process channel messages
process_messages( process_messages(