import os from typing import Any from unittest import mock import orjson from zerver.data_import.gitter import do_convert_data, get_usermentions from zerver.lib.import_realm import do_import_realm from zerver.lib.test_classes import ZulipTestCase from zerver.models import Message, UserProfile, get_realm class GitterImporter(ZulipTestCase): @mock.patch("zerver.data_import.gitter.process_avatars", return_value=[]) def test_gitter_import_data_conversion(self, mock_process_avatars: mock.Mock) -> None: output_dir = self.make_import_output_dir("gitter") gitter_file = os.path.join(os.path.dirname(__file__), "fixtures/gitter_data.json") with self.assertLogs(level="INFO"): do_convert_data(gitter_file, output_dir) def read_file(output_file: str) -> Any: full_path = os.path.join(output_dir, output_file) with open(full_path, "rb") as f: return orjson.loads(f.read()) self.assertEqual(os.path.exists(os.path.join(output_dir, "avatars")), True) self.assertEqual(os.path.exists(os.path.join(output_dir, "emoji")), True) self.assertEqual(os.path.exists(os.path.join(output_dir, "attachment.json")), True) realm = read_file("realm.json") # test realm self.assertEqual( "Organization imported from Gitter!", realm["zerver_realm"][0]["description"] ) # test users exported_user_ids = self.get_set(realm["zerver_userprofile"], "id") exported_user_full_name = self.get_set(realm["zerver_userprofile"], "full_name") self.assertIn("User Full Name", exported_user_full_name) exported_user_email = self.get_set(realm["zerver_userprofile"], "email") self.assertIn("username2@users.noreply.github.com", exported_user_email) # test stream self.assertEqual(len(realm["zerver_stream"]), 1) self.assertEqual(realm["zerver_stream"][0]["name"], "from gitter") self.assertEqual(realm["zerver_stream"][0]["deactivated"], False) self.assertEqual(realm["zerver_stream"][0]["realm"], realm["zerver_realm"][0]["id"]) self.assertEqual( realm["zerver_defaultstream"][0]["stream"], realm["zerver_stream"][0]["id"] ) # test recipient exported_recipient_id = self.get_set(realm["zerver_recipient"], "id") exported_recipient_type = self.get_set(realm["zerver_recipient"], "type") self.assertEqual({1, 2}, exported_recipient_type) # test subscription exported_subscription_userprofile = self.get_set( realm["zerver_subscription"], "user_profile" ) self.assertEqual({0, 1}, exported_subscription_userprofile) exported_subscription_recipient = self.get_set(realm["zerver_subscription"], "recipient") self.assertEqual(len(exported_subscription_recipient), 3) self.assertIn(realm["zerver_subscription"][1]["recipient"], exported_recipient_id) messages = read_file("messages-000001.json") # test messages exported_messages_id = self.get_set(messages["zerver_message"], "id") self.assertIn(messages["zerver_message"][0]["sender"], exported_user_ids) self.assertIn(messages["zerver_message"][1]["recipient"], exported_recipient_id) self.assertIn(messages["zerver_message"][0]["content"], "test message") # test usermessages exported_usermessage_userprofile = self.get_set( messages["zerver_usermessage"], "user_profile" ) self.assertEqual(exported_user_ids, exported_usermessage_userprofile) exported_usermessage_message = self.get_set(messages["zerver_usermessage"], "message") self.assertEqual(exported_usermessage_message, exported_messages_id) @mock.patch("zerver.data_import.gitter.process_avatars", return_value=[]) def test_gitter_import_to_existing_database(self, mock_process_avatars: mock.Mock) -> None: output_dir = self.make_import_output_dir("gitter") gitter_file = os.path.join(os.path.dirname(__file__), "fixtures/gitter_data.json") with self.assertLogs(level="INFO"): do_convert_data(gitter_file, output_dir) with self.assertLogs(level="INFO"): do_import_realm(output_dir, "test-gitter-import") realm = get_realm("test-gitter-import") # test rendered_messages realm_users = UserProfile.objects.filter(realm=realm) messages = Message.objects.filter(sender__in=realm_users) for message in messages: self.assertIsNotNone(message.rendered_content, None) def test_get_usermentions(self) -> None: user_map = {"57124a4": 3, "57124b4": 5, "57124c4": 8} user_short_name_to_full_name = { "user": "user name", "user2": "user2", "user3": "user name 3", "user4": "user 4", } messages = [ {"text": "hi @user", "mentions": [{"screenName": "user", "userId": "57124a4"}]}, { "text": "hi @user2 @user3", "mentions": [ {"screenName": "user2", "userId": "57124b4"}, {"screenName": "user3", "userId": "57124c4"}, ], }, {"text": "hi @user4", "mentions": [{"screenName": "user4"}]}, {"text": "hi @user5", "mentions": [{"screenName": "user", "userId": "5712ds4"}]}, ] self.assertEqual(get_usermentions(messages[0], user_map, user_short_name_to_full_name), [3]) self.assertEqual(messages[0]["text"], "hi @**user name**") self.assertEqual( get_usermentions(messages[1], user_map, user_short_name_to_full_name), [5, 8] ) self.assertEqual(messages[1]["text"], "hi @**user2** @**user name 3**") self.assertEqual(get_usermentions(messages[2], user_map, user_short_name_to_full_name), []) self.assertEqual(messages[2]["text"], "hi @user4") self.assertEqual(get_usermentions(messages[3], user_map, user_short_name_to_full_name), []) self.assertEqual(messages[3]["text"], "hi @user5")