zulip/zerver/tests/test_import_export.py

1672 lines
66 KiB
Python
Raw Normal View History

import datetime
import os
import shutil
from typing import Any, Callable, Dict, FrozenSet, List, Optional, Set, Tuple
from unittest.mock import patch
import orjson
from django.conf import settings
from django.db.models import Q
from django.utils.timezone import now as timezone_now
from analytics.models import UserCount
from zerver.lib import upload
from zerver.lib.actions import (
check_add_reaction,
check_add_realm_emoji,
do_add_alert_words,
do_add_reaction,
do_change_icon_source,
do_change_logo_source,
do_change_realm_plan_type,
do_create_user,
do_deactivate_user,
do_mute_topic,
do_mute_user,
do_update_user_activity,
do_update_user_activity_interval,
do_update_user_custom_profile_data_if_changed,
do_update_user_presence,
do_update_user_status,
try_add_realm_custom_profile_field,
)
from zerver.lib.avatar_hash import user_avatar_path
from zerver.lib.bot_config import set_bot_config
from zerver.lib.bot_lib import StateHandler
from zerver.lib.export import Record, do_export_realm, do_export_user, export_usermessages_batch
from zerver.lib.import_realm import do_import_realm, get_incoming_message_ids
from zerver.lib.streams import create_stream_if_needed
from zerver.lib.test_classes import ZulipTestCase
from zerver.lib.test_helpers import (
create_s3_buckets,
get_test_image_file,
most_recent_message,
most_recent_usermessage,
read_test_image_file,
use_s3_backend,
)
from zerver.lib.topic_mutes import add_topic_mute
from zerver.lib.upload import claim_attachment, upload_avatar_image, upload_message_file
from zerver.models import (
2020-07-16 16:11:34 +02:00
AlertWord,
Attachment,
BotConfigData,
BotStorageData,
CustomProfileField,
CustomProfileFieldValue,
Huddle,
Message,
MutedUser,
Reaction,
Realm,
RealmAuditLog,
RealmEmoji,
RealmUserDefault,
Recipient,
Stream,
Subscription,
2018-07-12 13:27:12 +02:00
UserGroup,
UserGroupMembership,
UserHotspot,
UserMessage,
UserPresence,
UserProfile,
UserStatus,
UserTopic,
get_active_streams,
get_client,
get_huddle_hash,
2018-07-14 16:10:45 +02:00
get_stream,
)
def make_datetime(val: float) -> datetime.datetime:
return datetime.datetime.fromtimestamp(val, tz=datetime.timezone.utc)
def get_output_dir() -> str:
return os.path.join(settings.TEST_WORKER_DIR, "test-export")
def make_export_output_dir() -> str:
output_dir = get_output_dir()
if os.path.exists(output_dir):
shutil.rmtree(output_dir)
os.makedirs(output_dir)
return output_dir
def read_json(fn: str) -> Any:
output_dir = get_output_dir()
full_fn = os.path.join(output_dir, fn)
with open(full_fn, "rb") as f:
return orjson.loads(f.read())
def export_fn(fn: str) -> str:
output_dir = get_output_dir()
return os.path.join(output_dir, fn)
def get_user_id(r: Realm, full_name: str) -> int:
return UserProfile.objects.get(realm=r, full_name=full_name).id
def get_huddle_hashes(r: Realm) -> str:
cordelia_full_name = "Cordelia, Lear's daughter"
hamlet_full_name = "King Hamlet"
othello_full_name = "Othello, the Moor of Venice"
user_id_list = [
get_user_id(r, cordelia_full_name),
get_user_id(r, hamlet_full_name),
get_user_id(r, othello_full_name),
]
huddle_hash = get_huddle_hash(user_id_list)
return huddle_hash
class ExportFile(ZulipTestCase):
"""This class is a container for shared helper functions
used for both the realm-level and user-level export tests."""
def setUp(self) -> None:
super().setUp()
self.rm_tree(settings.LOCAL_UPLOADS_DIR)
# Deleting LOCAL_UPLOADS_DIR results in the test database
# having RealmEmoji records without associated files.
#
# Even if we didn't delete them, the way that the test runner
# varies settings.LOCAL_UPLOADS_DIR for each test worker
# process would likely result in this being necessary anyway.
RealmEmoji.objects.all().delete()
def upload_files_for_user(
self, user_profile: UserProfile, *, emoji_name: str = "whatever"
) -> None:
message = most_recent_message(user_profile)
url = upload_message_file(
"dummy.txt", len(b"zulip!"), "text/plain", b"zulip!", user_profile
)
attachment_path_id = url.replace("/user_uploads/", "")
claim_attachment(
user_profile=user_profile,
path_id=attachment_path_id,
message=message,
is_message_realm_public=True,
)
with get_test_image_file("img.png") as img_file:
upload_avatar_image(img_file, user_profile, user_profile)
user_profile.avatar_source = "U"
user_profile.save()
realm = user_profile.realm
with get_test_image_file("img.png") as img_file:
check_add_realm_emoji(realm, emoji_name, user_profile, img_file)
def upload_files_for_realm(self, user_profile: UserProfile) -> None:
realm = user_profile.realm
with get_test_image_file("img.png") as img_file:
upload.upload_backend.upload_realm_icon_image(img_file, user_profile)
do_change_icon_source(realm, Realm.ICON_UPLOADED, acting_user=None)
with get_test_image_file("img.png") as img_file:
upload.upload_backend.upload_realm_logo_image(img_file, user_profile, night=False)
do_change_logo_source(realm, Realm.LOGO_UPLOADED, False, acting_user=user_profile)
with get_test_image_file("img.png") as img_file:
upload.upload_backend.upload_realm_logo_image(img_file, user_profile, night=True)
do_change_logo_source(realm, Realm.LOGO_UPLOADED, True, acting_user=user_profile)
def verify_attachment_json(self, user: UserProfile) -> None:
attachment = Attachment.objects.get(owner=user)
(record,) = read_json("attachment.json")["zerver_attachment"]
self.assertEqual(record["path_id"], attachment.path_id)
self.assertEqual(record["owner"], attachment.owner_id)
self.assertEqual(record["realm"], attachment.realm_id)
def verify_uploads(self, user: UserProfile, is_s3: bool) -> None:
2021-12-12 21:14:35 +01:00
realm = user.realm
attachment = Attachment.objects.get(owner=user)
path_id = attachment.path_id
# Test uploads
fn = export_fn(f"uploads/{path_id}")
with open(fn) as f:
self.assertEqual(f.read(), "zulip!")
2021-12-12 21:14:35 +01:00
(record,) = read_json("uploads/records.json")
self.assertEqual(record["path"], path_id)
self.assertEqual(record["s3_path"], path_id)
if is_s3:
realm_str, random_hash, file_name = path_id.split("/")
self.assertEqual(realm_str, str(realm.id))
self.assert_length(random_hash, 24)
self.assertEqual(file_name, "dummy.txt")
self.assertEqual(record["realm_id"], realm.id)
self.assertEqual(record["user_profile_id"], user.id)
else:
realm_str, slot, random_hash, file_name = path_id.split("/")
self.assertEqual(realm_str, str(realm.id))
# We randomly pick a number between 0 and 255 and turn it into
# hex in order to avoid large directories.
assert len(slot) <= 2
self.assert_length(random_hash, 24)
self.assertEqual(file_name, "dummy.txt")
2021-12-12 21:09:20 +01:00
def verify_emojis(self, user: UserProfile, is_s3: bool) -> None:
realm = user.realm
realm_emoji = RealmEmoji.objects.get(author=user)
file_name = realm_emoji.file_name
assert file_name.endswith(".png")
emoji_path = f"{realm.id}/emoji/images/{file_name}"
2021-12-12 18:14:01 +01:00
emoji_dir = export_fn(f"emoji/{realm.id}/emoji/images")
self.assertEqual(os.listdir(emoji_dir), [file_name])
2021-12-12 18:14:01 +01:00
(record,) = read_json("emoji/records.json")
self.assertEqual(record["file_name"], file_name)
2021-12-12 18:14:01 +01:00
self.assertEqual(record["path"], emoji_path)
self.assertEqual(record["s3_path"], emoji_path)
2021-12-12 21:09:20 +01:00
if is_s3:
self.assertEqual(record["realm_id"], realm.id)
self.assertEqual(record["user_profile_id"], user.id)
def verify_realm_logo_and_icon(self) -> None:
records = read_json("realm_icons/records.json")
image_files = set()
for record in records:
self.assertEqual(record["path"], record["s3_path"])
image_path = export_fn(f"realm_icons/{record['path']}")
if image_path.endswith(".original"):
with open(image_path, "rb") as image_file:
image_data = image_file.read()
self.assertEqual(image_data, read_test_image_file("img.png"))
else:
self.assertTrue(os.path.exists(image_path))
image_files.add(os.path.basename(image_path))
self.assertEqual(
set(image_files),
{
"night_logo.png",
"logo.original",
"logo.png",
"icon.png",
"night_logo.original",
"icon.original",
},
)
def verify_avatars(self, user: UserProfile) -> None:
records = read_json("avatars/records.json")
exported_paths = set()
# Make sure all files in records.json got written.
for record in records:
self.assertEqual(record["path"], record["s3_path"])
path = record["path"]
fn = export_fn(f"avatars/{path}")
assert os.path.exists(fn)
if path.endswith(".original"):
exported_paths.add(path)
# For now we know that all our tests use
# emojis based on img.png. This may change some
# day.
with open(fn, "rb") as fb:
fn_data = fb.read()
self.assertEqual(fn_data, read_test_image_file("img.png"))
assert exported_paths
# Right now we expect only our user to have an uploaded avatar.
db_paths = {user_avatar_path(user) + ".original"}
self.assertEqual(exported_paths, db_paths)
class RealmImportExportTest(ExportFile):
def export_realm(
self,
realm: Realm,
exportable_user_ids: Optional[Set[int]] = None,
consent_message_id: Optional[int] = None,
) -> None:
output_dir = make_export_output_dir()
with patch("zerver.lib.export.create_soft_link"), self.assertLogs(level="INFO"):
do_export_realm(
realm=realm,
output_dir=output_dir,
threads=0,
exportable_user_ids=exportable_user_ids,
consent_message_id=consent_message_id,
)
export_usermessages_batch(
input_path=os.path.join(output_dir, "messages-000001.json.partial"),
output_path=os.path.join(output_dir, "messages-000001.json"),
consent_message_id=consent_message_id,
)
def test_export_files_from_local(self) -> None:
user = self.example_user("hamlet")
realm = user.realm
self.upload_files_for_user(user)
self.upload_files_for_realm(user)
self.export_realm(realm)
self.verify_attachment_json(user)
self.verify_uploads(user, is_s3=False)
self.verify_avatars(user)
self.verify_emojis(user, is_s3=False)
self.verify_realm_logo_and_icon()
@use_s3_backend
def test_export_files_from_s3(self) -> None:
create_s3_buckets(settings.S3_AUTH_UPLOADS_BUCKET, settings.S3_AVATAR_BUCKET)
user = self.example_user("hamlet")
realm = user.realm
self.upload_files_for_user(user)
self.upload_files_for_realm(user)
self.export_realm(realm)
self.verify_attachment_json(user)
self.verify_uploads(user, is_s3=True)
self.verify_avatars(user)
2021-12-12 21:09:20 +01:00
self.verify_emojis(user, is_s3=True)
self.verify_realm_logo_and_icon()
def test_zulip_realm(self) -> None:
realm = Realm.objects.get(string_id="zulip")
default_bot = self.example_user("default_bot")
pm_a_msg_id = self.send_personal_message(self.example_user("AARON"), default_bot)
pm_b_msg_id = self.send_personal_message(default_bot, self.example_user("iago"))
pm_c_msg_id = self.send_personal_message(
self.example_user("othello"), self.example_user("hamlet")
)
realm_user_default = RealmUserDefault.objects.get(realm=realm)
realm_user_default.default_language = "de"
realm_user_default.save()
self.export_realm(realm)
data = read_json("realm.json")
self.assert_length(data["zerver_userprofile_crossrealm"], 3)
self.assert_length(data["zerver_userprofile_mirrordummy"], 0)
exported_user_emails = self.get_set(data["zerver_userprofile"], "delivery_email")
self.assertIn(self.example_email("cordelia"), exported_user_emails)
self.assertIn("default-bot@zulip.com", exported_user_emails)
exported_streams = self.get_set(data["zerver_stream"], "name")
self.assertEqual(
exported_streams,
{"Denmark", "Rome", "Scotland", "Venice", "Verona", "core team"},
)
exported_alert_words = data["zerver_alertword"]
2020-07-16 15:38:56 +02:00
# We set up 4 alert words for Hamlet, Cordelia, etc.
# when we populate the test database.
num_zulip_users = 10
self.assert_length(exported_alert_words, num_zulip_users * 4)
2020-07-16 15:38:56 +02:00
self.assertIn("robotics", {r["word"] for r in exported_alert_words})
2020-07-16 15:38:56 +02:00
exported_realm_user_default = data["zerver_realmuserdefault"]
self.assert_length(exported_realm_user_default, 1)
self.assertEqual(exported_realm_user_default[0]["default_language"], "de")
data = read_json("messages-000001.json")
um = UserMessage.objects.all()[0]
exported_um = self.find_by_id(data["zerver_usermessage"], um.id)
self.assertEqual(exported_um["message"], um.message_id)
self.assertEqual(exported_um["user_profile"], um.user_profile_id)
exported_message = self.find_by_id(data["zerver_message"], um.message_id)
self.assertEqual(exported_message["content"], um.message.content)
exported_message_ids = self.get_set(data["zerver_message"], "id")
self.assertIn(pm_a_msg_id, exported_message_ids)
self.assertIn(pm_b_msg_id, exported_message_ids)
self.assertIn(pm_c_msg_id, exported_message_ids)
def test_export_realm_with_exportable_user_ids(self) -> None:
realm = Realm.objects.get(string_id="zulip")
cordelia = self.example_user("iago")
hamlet = self.example_user("hamlet")
user_ids = {cordelia.id, hamlet.id}
pm_a_msg_id = self.send_personal_message(
self.example_user("AARON"), self.example_user("othello")
)
pm_b_msg_id = self.send_personal_message(
self.example_user("cordelia"), self.example_user("iago")
)
pm_c_msg_id = self.send_personal_message(
self.example_user("hamlet"), self.example_user("othello")
)
pm_d_msg_id = self.send_personal_message(
self.example_user("iago"), self.example_user("hamlet")
)
self.export_realm(realm, exportable_user_ids=user_ids)
data = read_json("realm.json")
exported_user_emails = self.get_set(data["zerver_userprofile"], "delivery_email")
self.assertIn(self.example_email("iago"), exported_user_emails)
self.assertIn(self.example_email("hamlet"), exported_user_emails)
self.assertNotIn("default-bot@zulip.com", exported_user_emails)
self.assertNotIn(self.example_email("cordelia"), exported_user_emails)
dummy_user_emails = self.get_set(data["zerver_userprofile_mirrordummy"], "delivery_email")
self.assertIn(self.example_email("cordelia"), dummy_user_emails)
self.assertIn(self.example_email("othello"), dummy_user_emails)
self.assertIn("default-bot@zulip.com", dummy_user_emails)
self.assertNotIn(self.example_email("iago"), dummy_user_emails)
self.assertNotIn(self.example_email("hamlet"), dummy_user_emails)
data = read_json("messages-000001.json")
exported_message_ids = self.get_set(data["zerver_message"], "id")
self.assertNotIn(pm_a_msg_id, exported_message_ids)
self.assertIn(pm_b_msg_id, exported_message_ids)
self.assertIn(pm_c_msg_id, exported_message_ids)
self.assertIn(pm_d_msg_id, exported_message_ids)
def test_export_realm_with_member_consent(self) -> None:
realm = Realm.objects.get(string_id="zulip")
# Create private streams and subscribe users for testing export
create_stream_if_needed(realm, "Private A", invite_only=True)
self.subscribe(self.example_user("iago"), "Private A")
self.subscribe(self.example_user("othello"), "Private A")
self.send_stream_message(self.example_user("iago"), "Private A", "Hello stream A")
create_stream_if_needed(realm, "Private B", invite_only=True)
self.subscribe(self.example_user("prospero"), "Private B")
stream_b_message_id = self.send_stream_message(
self.example_user("prospero"), "Private B", "Hello stream B"
)
self.subscribe(self.example_user("hamlet"), "Private B")
create_stream_if_needed(realm, "Private C", invite_only=True)
self.subscribe(self.example_user("othello"), "Private C")
self.subscribe(self.example_user("prospero"), "Private C")
stream_c_message_id = self.send_stream_message(
self.example_user("othello"), "Private C", "Hello stream C"
)
# Create huddles
self.send_huddle_message(
self.example_user("iago"), [self.example_user("cordelia"), self.example_user("AARON")]
)
huddle_a = Huddle.objects.last()
self.send_huddle_message(
self.example_user("ZOE"),
[self.example_user("hamlet"), self.example_user("AARON"), self.example_user("othello")],
)
huddle_b = Huddle.objects.last()
huddle_c_message_id = self.send_huddle_message(
self.example_user("AARON"),
[self.example_user("cordelia"), self.example_user("ZOE"), self.example_user("othello")],
)
# Create PMs
pm_a_msg_id = self.send_personal_message(
self.example_user("AARON"), self.example_user("othello")
)
pm_b_msg_id = self.send_personal_message(
self.example_user("cordelia"), self.example_user("iago")
)
pm_c_msg_id = self.send_personal_message(
self.example_user("hamlet"), self.example_user("othello")
)
pm_d_msg_id = self.send_personal_message(
self.example_user("iago"), self.example_user("hamlet")
)
# Send message advertising export and make users react
self.send_stream_message(
self.example_user("othello"),
"Verona",
topic_name="Export",
content="Thumbs up for export",
)
message = Message.objects.last()
consented_user_ids = [self.example_user(user).id for user in ["iago", "hamlet"]]
do_add_reaction(
self.example_user("iago"), message, "outbox", "1f4e4", Reaction.UNICODE_EMOJI
)
do_add_reaction(
self.example_user("hamlet"), message, "outbox", "1f4e4", Reaction.UNICODE_EMOJI
)
assert message is not None
self.export_realm(realm, consent_message_id=message.id)
data = read_json("realm.json")
self.assert_length(data["zerver_userprofile_crossrealm"], 3)
self.assert_length(data["zerver_userprofile_mirrordummy"], 0)
exported_user_emails = self.get_set(data["zerver_userprofile"], "delivery_email")
self.assertIn(self.example_email("cordelia"), exported_user_emails)
self.assertIn(self.example_email("hamlet"), exported_user_emails)
self.assertIn(self.example_email("iago"), exported_user_emails)
self.assertIn(self.example_email("othello"), exported_user_emails)
self.assertIn("default-bot@zulip.com", exported_user_emails)
exported_streams = self.get_set(data["zerver_stream"], "name")
self.assertEqual(
exported_streams,
{
"core team",
"Denmark",
"Rome",
"Scotland",
"Venice",
"Verona",
"Private A",
"Private B",
"Private C",
},
)
data = read_json("messages-000001.json")
exported_usermessages = UserMessage.objects.filter(
user_profile__in=[self.example_user("iago"), self.example_user("hamlet")]
)
um = exported_usermessages[0]
self.assert_length(data["zerver_usermessage"], len(exported_usermessages))
exported_um = self.find_by_id(data["zerver_usermessage"], um.id)
self.assertEqual(exported_um["message"], um.message_id)
self.assertEqual(exported_um["user_profile"], um.user_profile_id)
exported_message = self.find_by_id(data["zerver_message"], um.message_id)
self.assertEqual(exported_message["content"], um.message.content)
public_stream_names = ["Denmark", "Rome", "Scotland", "Venice", "Verona"]
public_stream_ids = Stream.objects.filter(name__in=public_stream_names).values_list(
"id", flat=True
)
public_stream_recipients = Recipient.objects.filter(
type_id__in=public_stream_ids, type=Recipient.STREAM
)
public_stream_message_ids = Message.objects.filter(
recipient__in=public_stream_recipients
).values_list("id", flat=True)
# Messages from Private stream C are not exported since no member gave consent
private_stream_ids = Stream.objects.filter(
name__in=["Private A", "Private B", "core team"]
).values_list("id", flat=True)
private_stream_recipients = Recipient.objects.filter(
type_id__in=private_stream_ids, type=Recipient.STREAM
)
private_stream_message_ids = Message.objects.filter(
recipient__in=private_stream_recipients
).values_list("id", flat=True)
pm_recipients = Recipient.objects.filter(
type_id__in=consented_user_ids, type=Recipient.PERSONAL
)
pm_query = Q(recipient__in=pm_recipients) | Q(sender__in=consented_user_ids)
exported_pm_ids = (
Message.objects.filter(pm_query)
.values_list("id", flat=True)
.values_list("id", flat=True)
)
# Third huddle is not exported since none of the members gave consent
assert huddle_a is not None and huddle_b is not None
huddle_recipients = Recipient.objects.filter(
type_id__in=[huddle_a.id, huddle_b.id], type=Recipient.HUDDLE
)
pm_query = Q(recipient__in=huddle_recipients) | Q(sender__in=consented_user_ids)
exported_huddle_ids = (
Message.objects.filter(pm_query)
.values_list("id", flat=True)
.values_list("id", flat=True)
)
exported_msg_ids = (
set(public_stream_message_ids)
| set(private_stream_message_ids)
| set(exported_pm_ids)
| set(exported_huddle_ids)
)
2019-05-21 12:21:32 +02:00
self.assertEqual(self.get_set(data["zerver_message"], "id"), exported_msg_ids)
# TODO: This behavior is wrong and should be fixed. The message should not be exported
# since it was sent before the only consented user iago joined the stream.
self.assertIn(stream_b_message_id, exported_msg_ids)
self.assertNotIn(stream_c_message_id, exported_msg_ids)
self.assertNotIn(huddle_c_message_id, exported_msg_ids)
self.assertNotIn(pm_a_msg_id, exported_msg_ids)
self.assertIn(pm_b_msg_id, exported_msg_ids)
self.assertIn(pm_c_msg_id, exported_msg_ids)
self.assertIn(pm_d_msg_id, exported_msg_ids)
"""
Tests for import_realm
"""
def test_import_realm(self) -> None:
original_realm = Realm.objects.get(string_id="zulip")
hamlet = self.example_user("hamlet")
cordelia = self.example_user("cordelia")
othello = self.example_user("othello")
with get_test_image_file("img.png") as img_file:
realm_emoji = check_add_realm_emoji(
realm=hamlet.realm, name="hawaii", author=hamlet, image_file=img_file
)
assert realm_emoji
self.assertEqual(realm_emoji.name, "hawaii")
# Deactivate a user to ensure such a case is covered.
do_deactivate_user(self.example_user("aaron"), acting_user=None)
# data to test import of huddles
huddle = [
self.example_user("hamlet"),
self.example_user("othello"),
]
self.send_huddle_message(
self.example_user("cordelia"),
huddle,
"test huddle message",
)
user_mention_message = "@**King Hamlet** Hello"
self.send_stream_message(self.example_user("iago"), "Verona", user_mention_message)
stream_mention_message = "Subscribe to #**Denmark**"
self.send_stream_message(self.example_user("hamlet"), "Verona", stream_mention_message)
user_group_mention_message = "Hello @*hamletcharacters*"
self.send_stream_message(self.example_user("othello"), "Verona", user_group_mention_message)
special_characters_message = "```\n'\n```\n@**Polonius**"
self.send_stream_message(self.example_user("iago"), "Denmark", special_characters_message)
sample_user = self.example_user("hamlet")
2018-07-14 16:10:45 +02:00
check_add_reaction(
user_profile=cordelia,
message_id=most_recent_message(hamlet).id,
emoji_name="hawaii",
emoji_code=None,
reaction_type=None,
)
reaction = Reaction.objects.order_by("id").last()
assert reaction
# Verify strange invariant for Reaction/RealmEmoji.
self.assertEqual(reaction.emoji_code, str(realm_emoji.id))
# data to test import of hotspots
2018-07-12 16:34:26 +02:00
UserHotspot.objects.create(
user=sample_user,
hotspot="intro_streams",
2018-07-12 16:34:26 +02:00
)
2018-07-14 16:10:45 +02:00
# data to test import of muted topic
stream = get_stream("Verona", original_realm)
recipient = stream.recipient
assert recipient is not None
2018-07-14 16:10:45 +02:00
add_topic_mute(
user_profile=sample_user,
stream_id=stream.id,
recipient_id=recipient.id,
topic_name="Verona2",
)
2018-07-14 16:10:45 +02:00
# data to test import of muted users
do_mute_user(hamlet, cordelia)
do_mute_user(cordelia, hamlet)
do_mute_user(cordelia, othello)
client = get_client("website")
do_update_user_presence(sample_user, client, timezone_now(), UserPresence.ACTIVE)
# send Cordelia to the islands
do_update_user_status(
user_profile=cordelia,
away=True,
status_text="in Hawaii",
client_id=client.id,
emoji_name="hawaii",
emoji_code=str(realm_emoji.id),
reaction_type=Reaction.REALM_EMOJI,
)
user_status = UserStatus.objects.order_by("id").last()
assert user_status
# Verify strange invariant for UserStatus/RealmEmoji.
self.assertEqual(user_status.emoji_code, str(realm_emoji.id))
# data to test import of botstoragedata and botconfigdata
bot_profile = do_create_user(
email="bot-1@zulip.com",
password="test",
realm=original_realm,
full_name="bot",
bot_type=UserProfile.EMBEDDED_BOT,
bot_owner=sample_user,
acting_user=None,
)
storage = StateHandler(bot_profile)
storage.put("some key", "some value")
set_bot_config(bot_profile, "entry 1", "value 1")
realm_user_default = RealmUserDefault.objects.get(realm=original_realm)
realm_user_default.default_language = "de"
realm_user_default.twenty_four_hour_time = True
realm_user_default.save()
getters = self.get_realm_getters()
snapshots: Dict[str, object] = {}
for f in getters:
snapshots[f.__name__] = f(original_realm)
self.export_realm(original_realm)
with self.settings(BILLING_ENABLED=False), self.assertLogs(level="INFO"):
do_import_realm(os.path.join(settings.TEST_WORKER_DIR, "test-export"), "test-zulip")
# Make sure our export/import didn't somehow leak info into the
# original realm.
for f in getters:
# One way this will fail is if you make a getter that doesn't
# properly restrict its results to a single realm.
if f(original_realm) != snapshots[f.__name__]:
raise AssertionError(
f"""
The export/import process is corrupting your
original realm according to {f.__name__}!
If you wrote that getter, are you sure you
are only grabbing objects from one realm?
"""
)
2021-12-07 18:46:35 +01:00
imported_realm = Realm.objects.get(string_id="test-zulip")
# test realm
self.assertTrue(Realm.objects.filter(string_id="test-zulip").exists())
self.assertNotEqual(imported_realm.id, original_realm.id)
def assert_realm_values(f: Callable[[Realm], object]) -> None:
2021-12-07 18:46:35 +01:00
orig_realm_result = f(original_realm)
imported_realm_result = f(imported_realm)
# orig_realm_result should be truthy and have some values, otherwise
# the test is kind of meaningless
assert orig_realm_result
# It may be helpful to do print(f.__name__) if you are having
# trouble debugging this.
# print(f.__name__, orig_realm_result, imported_realm_result)
self.assertEqual(orig_realm_result, imported_realm_result)
for f in getters:
assert_realm_values(f)
self.verify_emoji_code_foreign_keys()
# Our huddle hashes change, because hashes use ids that change.
self.assertNotEqual(get_huddle_hashes(original_realm), get_huddle_hashes(imported_realm))
2021-12-07 18:46:35 +01:00
# test to highlight that bs4 which we use to do data-**id
# replacements modifies the HTML sometimes. eg replacing <br>
# with </br>, &#39; with \' etc. The modifications doesn't
# affect how the browser displays the rendered_content so we
# are okay with using bs4 for this. lxml package also has
# similar behavior.
orig_polonius_user = self.example_user("polonius")
original_msg = Message.objects.get(
content=special_characters_message, sender__realm=original_realm
)
self.assertEqual(
original_msg.rendered_content,
'<div class="codehilite"><pre><span></span><code>&#39;\n</code></pre></div>\n'
f'<p><span class="user-mention" data-user-id="{orig_polonius_user.id}">@Polonius</span></p>',
)
imported_polonius_user = UserProfile.objects.get(
delivery_email=self.example_email("polonius"), realm=imported_realm
)
imported_msg = Message.objects.get(
content=special_characters_message, sender__realm=imported_realm
)
self.assertEqual(
imported_msg.rendered_content,
'<div class="codehilite"><pre><span></span><code>\'\n</code></pre></div>\n'
f'<p><span class="user-mention" data-user-id="{imported_polonius_user.id}">@Polonius</span></p>',
)
# Check recipient_id was generated correctly for the imported users and streams.
for user_profile in UserProfile.objects.filter(realm=imported_realm):
self.assertEqual(
user_profile.recipient_id,
Recipient.objects.get(type=Recipient.PERSONAL, type_id=user_profile.id).id,
)
for stream in Stream.objects.filter(realm=imported_realm):
self.assertEqual(
stream.recipient_id,
Recipient.objects.get(type=Recipient.STREAM, type_id=stream.id).id,
)
for huddle_object in Huddle.objects.all():
# Huddles don't have a realm column, so we just test all Huddles for simplicity.
self.assertEqual(
huddle_object.recipient_id,
Recipient.objects.get(type=Recipient.HUDDLE, type_id=huddle_object.id).id,
)
for user_profile in UserProfile.objects.filter(realm=imported_realm):
# Check that all Subscriptions have the correct is_user_active set.
self.assertEqual(
Subscription.objects.filter(
user_profile=user_profile, is_user_active=user_profile.is_active
).count(),
Subscription.objects.filter(user_profile=user_profile).count(),
)
# Verify that we've actually tested something meaningful instead of a blind import
# with is_user_active=True used for everything.
self.assertTrue(Subscription.objects.filter(is_user_active=False).exists())
def get_realm_getters(self) -> List[Callable[[Realm], object]]:
2021-12-07 18:46:35 +01:00
names = set()
getters: List[Callable[[Realm], object]] = []
def getter(f: Callable[[Realm], object]) -> Callable[[Realm], object]:
getters.append(f)
assert f.__name__.startswith("get_")
2021-12-07 18:46:35 +01:00
# Avoid dups
assert f.__name__ not in names
names.add(f.__name__)
return f
@getter
2021-12-07 17:25:26 +01:00
def get_admin_bot_emails(r: Realm) -> Set[str]:
return {user.email for user in r.get_admin_users_and_bots()}
@getter
2021-12-07 17:25:26 +01:00
def get_active_emails(r: Realm) -> Set[str]:
return {user.email for user in r.get_active_users()}
@getter
2021-12-07 17:25:26 +01:00
def get_active_stream_names(r: Realm) -> Set[str]:
return {stream.name for stream in get_active_streams(r)}
# test recipients
def get_recipient_stream(r: Realm) -> Recipient:
return Stream.objects.get(name="Verona", realm=r).recipient
def get_recipient_user(r: Realm) -> Recipient:
return UserProfile.objects.get(full_name="Iago", realm=r).recipient
@getter
2021-12-07 17:25:26 +01:00
def get_stream_recipient_type(r: Realm) -> int:
return get_recipient_stream(r).type
@getter
2021-12-07 17:25:26 +01:00
def get_user_recipient_type(r: Realm) -> int:
return get_recipient_user(r).type
# test subscription
def get_subscribers(recipient: Recipient) -> Set[str]:
subscriptions = Subscription.objects.filter(recipient=recipient)
users = {sub.user_profile.email for sub in subscriptions}
return users
@getter
2021-12-07 17:25:26 +01:00
def get_stream_subscribers(r: Realm) -> Set[str]:
return get_subscribers(get_recipient_stream(r))
@getter
2021-12-07 17:25:26 +01:00
def get_user_subscribers(r: Realm) -> Set[str]:
return get_subscribers(get_recipient_user(r))
# test custom profile fields
@getter
def get_custom_profile_field_names(r: Realm) -> Set[str]:
custom_profile_fields = CustomProfileField.objects.filter(realm=r)
custom_profile_field_names = {field.name for field in custom_profile_fields}
return custom_profile_field_names
@getter
def get_custom_profile_with_field_type_user(
r: Realm,
) -> Tuple[Set[object], Set[object], Set[FrozenSet[str]]]:
fields = CustomProfileField.objects.filter(field_type=CustomProfileField.USER, realm=r)
def get_email(user_id: int) -> str:
return UserProfile.objects.get(id=user_id).email
def get_email_from_value(field_value: CustomProfileFieldValue) -> Set[str]:
user_id_list = orjson.loads(field_value.value)
return {get_email(user_id) for user_id in user_id_list}
def custom_profile_field_values_for(
fields: List[CustomProfileField],
) -> Set[FrozenSet[str]]:
python: Convert assignment type annotations to Python 3.6 style. This commit was split by tabbott; this piece covers the vast majority of files in Zulip, but excludes scripts/, tools/, and puppet/ to help ensure we at least show the right error messages for Xenial systems. We can likely further refine the remaining pieces with some testing. Generated by com2ann, with whitespace fixes and various manual fixes for runtime issues: - invoiced_through: Optional[LicenseLedger] = models.ForeignKey( + invoiced_through: Optional["LicenseLedger"] = models.ForeignKey( -_apns_client: Optional[APNsClient] = None +_apns_client: Optional["APNsClient"] = None - notifications_stream: Optional[Stream] = models.ForeignKey('Stream', related_name='+', null=True, blank=True, on_delete=CASCADE) - signup_notifications_stream: Optional[Stream] = models.ForeignKey('Stream', related_name='+', null=True, blank=True, on_delete=CASCADE) + notifications_stream: Optional["Stream"] = models.ForeignKey('Stream', related_name='+', null=True, blank=True, on_delete=CASCADE) + signup_notifications_stream: Optional["Stream"] = models.ForeignKey('Stream', related_name='+', null=True, blank=True, on_delete=CASCADE) - author: Optional[UserProfile] = models.ForeignKey('UserProfile', blank=True, null=True, on_delete=CASCADE) + author: Optional["UserProfile"] = models.ForeignKey('UserProfile', blank=True, null=True, on_delete=CASCADE) - bot_owner: Optional[UserProfile] = models.ForeignKey('self', null=True, on_delete=models.SET_NULL) + bot_owner: Optional["UserProfile"] = models.ForeignKey('self', null=True, on_delete=models.SET_NULL) - default_sending_stream: Optional[Stream] = models.ForeignKey('zerver.Stream', null=True, related_name='+', on_delete=CASCADE) - default_events_register_stream: Optional[Stream] = models.ForeignKey('zerver.Stream', null=True, related_name='+', on_delete=CASCADE) + default_sending_stream: Optional["Stream"] = models.ForeignKey('zerver.Stream', null=True, related_name='+', on_delete=CASCADE) + default_events_register_stream: Optional["Stream"] = models.ForeignKey('zerver.Stream', null=True, related_name='+', on_delete=CASCADE) -descriptors_by_handler_id: Dict[int, ClientDescriptor] = {} +descriptors_by_handler_id: Dict[int, "ClientDescriptor"] = {} -worker_classes: Dict[str, Type[QueueProcessingWorker]] = {} -queues: Dict[str, Dict[str, Type[QueueProcessingWorker]]] = {} +worker_classes: Dict[str, Type["QueueProcessingWorker"]] = {} +queues: Dict[str, Dict[str, Type["QueueProcessingWorker"]]] = {} -AUTH_LDAP_REVERSE_EMAIL_SEARCH: Optional[LDAPSearch] = None +AUTH_LDAP_REVERSE_EMAIL_SEARCH: Optional["LDAPSearch"] = None Signed-off-by: Anders Kaseorg <anders@zulipchat.com>
2020-04-22 01:09:50 +02:00
user_emails: Set[FrozenSet[str]] = set()
for field in fields:
values = CustomProfileFieldValue.objects.filter(field=field)
for value in values:
user_emails.add(frozenset(get_email_from_value(value)))
return user_emails
field_names, field_hints = (set() for i in range(2))
for field in fields:
field_names.add(field.name)
field_hints.add(field.hint)
return (field_hints, field_names, custom_profile_field_values_for(fields))
2018-07-05 20:08:40 +02:00
# test realmauditlog
@getter
def get_realm_audit_log_event_type(r: Realm) -> Set[str]:
realmauditlogs = RealmAuditLog.objects.filter(realm=r).exclude(
event_type__in=[RealmAuditLog.REALM_PLAN_TYPE_CHANGED, RealmAuditLog.STREAM_CREATED]
)
realmauditlog_event_type = {log.event_type for log in realmauditlogs}
return realmauditlog_event_type
@getter
def get_huddle_message(r: Realm) -> str:
huddle_hash = get_huddle_hashes(r)
huddle_id = Huddle.objects.get(huddle_hash=huddle_hash).id
huddle_recipient = Recipient.objects.get(type_id=huddle_id, type=3)
huddle_message = Message.objects.get(recipient=huddle_recipient)
self.assertEqual(huddle_message.content, "test huddle message")
return huddle_message.content
@getter
2020-07-16 16:11:34 +02:00
def get_alertwords(r: Realm) -> Set[str]:
return {rec.word for rec in AlertWord.objects.filter(realm_id=r.id)}
2020-07-16 16:11:34 +02:00
@getter
def get_realm_emoji_names(r: Realm) -> Set[str]:
names = {rec.name for rec in RealmEmoji.objects.filter(realm_id=r.id)}
assert "hawaii" in names
return names
@getter
def get_realm_user_statuses(r: Realm) -> Set[Tuple[str, str, int, str]]:
2021-12-07 18:46:35 +01:00
cordelia = self.example_user("cordelia")
tups = {
(rec.user_profile.full_name, rec.emoji_name, rec.status, rec.status_text)
for rec in UserStatus.objects.filter(user_profile__realm_id=r.id)
}
assert (cordelia.full_name, "hawaii", UserStatus.AWAY, "in Hawaii") in tups
return tups
@getter
def get_realm_emoji_reactions(r: Realm) -> Set[Tuple[str, str]]:
2021-12-07 18:46:35 +01:00
cordelia = self.example_user("cordelia")
tups = {
(rec.emoji_name, rec.user_profile.full_name)
for rec in Reaction.objects.filter(
user_profile__realm_id=r.id, reaction_type=Reaction.REALM_EMOJI
)
}
self.assertEqual(tups, {("hawaii", cordelia.full_name)})
return tups
2018-07-12 16:34:26 +02:00
# test userhotspot
@getter
def get_user_hotspots(r: Realm) -> Set[str]:
user_id = get_user_id(r, "King Hamlet")
hotspots = UserHotspot.objects.filter(user_id=user_id)
2018-07-12 16:34:26 +02:00
user_hotspots = {hotspot.hotspot for hotspot in hotspots}
return user_hotspots
2018-07-14 16:10:45 +02:00
# test muted topics
@getter
2018-07-14 16:10:45 +02:00
def get_muted_topics(r: Realm) -> Set[str]:
user_profile_id = get_user_id(r, "King Hamlet")
muted_topics = UserTopic.objects.filter(
user_profile_id=user_profile_id, visibility_policy=UserTopic.MUTED
)
2018-07-14 16:10:45 +02:00
topic_names = {muted_topic.topic_name for muted_topic in muted_topics}
return topic_names
@getter
def get_muted_users(r: Realm) -> Set[Tuple[str, str, str]]:
mute_objects = MutedUser.objects.filter(user_profile__realm=r)
muter_tuples = {
(
mute_object.user_profile.full_name,
mute_object.muted_user.full_name,
str(mute_object.date_muted),
)
for mute_object in mute_objects
}
return muter_tuples
@getter
2021-12-07 17:25:26 +01:00
def get_user_group_names(r: Realm) -> Set[str]:
return {group.name for group in UserGroup.objects.filter(realm=r)}
@getter
def get_user_membership(r: Realm) -> Set[str]:
usergroup = UserGroup.objects.get(realm=r, name="hamletcharacters")
2018-07-12 13:27:12 +02:00
usergroup_membership = UserGroupMembership.objects.filter(user_group=usergroup)
users = {membership.user_profile.email for membership in usergroup_membership}
return users
# test botstoragedata and botconfigdata
@getter
def get_botstoragedata(r: Realm) -> Dict[str, object]:
bot_profile = UserProfile.objects.get(full_name="bot", realm=r)
bot_storage_data = BotStorageData.objects.get(bot_profile=bot_profile)
return {"key": bot_storage_data.key, "data": bot_storage_data.value}
@getter
def get_botconfigdata(r: Realm) -> Dict[str, object]:
bot_profile = UserProfile.objects.get(full_name="bot", realm=r)
bot_config_data = BotConfigData.objects.get(bot_profile=bot_profile)
return {"key": bot_config_data.key, "data": bot_config_data.value}
# test messages
def get_stream_messages(r: Realm) -> Message:
recipient = get_recipient_stream(r)
messages = Message.objects.filter(recipient=recipient)
return messages
@getter
def get_stream_topics(r: Realm) -> Set[str]:
messages = get_stream_messages(r)
2018-11-10 16:11:12 +01:00
topics = {m.topic_name() for m in messages}
return topics
# test usermessages
@getter
def get_usermessages_user(r: Realm) -> Set[object]:
messages = get_stream_messages(r).order_by("content")
usermessage = UserMessage.objects.filter(message=messages[0])
usermessage_user = {um.user_profile.email for um in usermessage}
return usermessage_user
# tests to make sure that various data-*-ids in rendered_content
# are replaced correctly with the values of newer realm.
@getter
def get_user_mention(r: Realm) -> str:
mentioned_user = UserProfile.objects.get(
delivery_email=self.example_email("hamlet"), realm=r
)
data_user_id = f'data-user-id="{mentioned_user.id}"'
mention_message = get_stream_messages(r).get(rendered_content__contains=data_user_id)
return mention_message.content
@getter
def get_stream_mention(r: Realm) -> str:
mentioned_stream = get_stream("Denmark", r)
data_stream_id = f'data-stream-id="{mentioned_stream.id}"'
mention_message = get_stream_messages(r).get(rendered_content__contains=data_stream_id)
return mention_message.content
@getter
def get_user_group_mention(r: Realm) -> str:
user_group = UserGroup.objects.get(realm=r, name="hamletcharacters")
data_usergroup_id = f'data-user-group-id="{user_group.id}"'
mention_message = get_stream_messages(r).get(
rendered_content__contains=data_usergroup_id
)
return mention_message.content
@getter
def get_userpresence_timestamp(r: Realm) -> Set[object]:
# It should be sufficient to compare UserPresence timestamps to verify
# they got exported/imported correctly.
return set(UserPresence.objects.filter(realm=r).values_list("timestamp", flat=True))
@getter
def get_realm_user_default_values(r: Realm) -> Dict[str, object]:
realm_user_default = RealmUserDefault.objects.get(realm=r)
return {
"default_language": realm_user_default.default_language,
"twenty_four_hour_time": realm_user_default.twenty_four_hour_time,
}
2021-12-07 18:46:35 +01:00
return getters
def test_import_realm_with_no_realm_user_default_table(self) -> None:
original_realm = Realm.objects.get(string_id="zulip")
RealmUserDefault.objects.get(realm=original_realm).delete()
self.export_realm(original_realm)
with self.settings(BILLING_ENABLED=False), self.assertLogs(level="INFO"):
do_import_realm(os.path.join(settings.TEST_WORKER_DIR, "test-export"), "test-zulip")
self.assertTrue(Realm.objects.filter(string_id="test-zulip").exists())
imported_realm = Realm.objects.get(string_id="test-zulip")
# RealmUserDefault table with default values is created, if it is not present in
# the import data.
self.assertTrue(RealmUserDefault.objects.filter(realm=imported_realm).exists())
realm_user_default = RealmUserDefault.objects.get(realm=imported_realm)
self.assertEqual(realm_user_default.default_language, "en")
self.assertEqual(realm_user_default.twenty_four_hour_time, False)
def test_import_files_from_local(self) -> None:
user = self.example_user("hamlet")
realm = user.realm
self.upload_files_for_user(user)
self.upload_files_for_realm(user)
self.export_realm(realm)
with self.settings(BILLING_ENABLED=False), self.assertLogs(level="INFO"):
do_import_realm(os.path.join(settings.TEST_WORKER_DIR, "test-export"), "test-zulip")
imported_realm = Realm.objects.get(string_id="test-zulip")
# Test attachments
uploaded_file = Attachment.objects.get(realm=imported_realm)
self.assert_length(b"zulip!", uploaded_file.size)
attachment_file_path = os.path.join(
settings.LOCAL_UPLOADS_DIR, "files", uploaded_file.path_id
)
self.assertTrue(os.path.isfile(attachment_file_path))
# Test emojis
realm_emoji = RealmEmoji.objects.get(realm=imported_realm)
emoji_path = RealmEmoji.PATH_ID_TEMPLATE.format(
realm_id=imported_realm.id,
emoji_file_name=realm_emoji.file_name,
)
emoji_file_path = os.path.join(settings.LOCAL_UPLOADS_DIR, "avatars", emoji_path)
self.assertTrue(os.path.isfile(emoji_file_path))
# Test avatars
user_profile = UserProfile.objects.get(full_name=user.full_name, realm=imported_realm)
avatar_path_id = user_avatar_path(user_profile) + ".original"
avatar_file_path = os.path.join(settings.LOCAL_UPLOADS_DIR, "avatars", avatar_path_id)
self.assertTrue(os.path.isfile(avatar_file_path))
# Test realm icon and logo
upload_path = upload.upload_backend.realm_avatar_and_logo_path(imported_realm)
full_upload_path = os.path.join(settings.LOCAL_UPLOADS_DIR, upload_path)
test_image_data = read_test_image_file("img.png")
self.assertIsNotNone(test_image_data)
with open(os.path.join(full_upload_path, "icon.original"), "rb") as f:
self.assertEqual(f.read(), test_image_data)
self.assertTrue(os.path.isfile(os.path.join(full_upload_path, "icon.png")))
self.assertEqual(imported_realm.icon_source, Realm.ICON_UPLOADED)
with open(os.path.join(full_upload_path, "logo.original"), "rb") as f:
self.assertEqual(f.read(), test_image_data)
self.assertTrue(os.path.isfile(os.path.join(full_upload_path, "logo.png")))
self.assertEqual(imported_realm.logo_source, Realm.LOGO_UPLOADED)
with open(os.path.join(full_upload_path, "night_logo.original"), "rb") as f:
self.assertEqual(f.read(), test_image_data)
self.assertTrue(os.path.isfile(os.path.join(full_upload_path, "night_logo.png")))
self.assertEqual(imported_realm.night_logo_source, Realm.LOGO_UPLOADED)
@use_s3_backend
def test_import_files_from_s3(self) -> None:
uploads_bucket, avatar_bucket = create_s3_buckets(
settings.S3_AUTH_UPLOADS_BUCKET, settings.S3_AVATAR_BUCKET
)
user = self.example_user("hamlet")
realm = user.realm
self.upload_files_for_realm(user)
self.upload_files_for_user(user)
self.export_realm(realm)
with self.settings(BILLING_ENABLED=False), self.assertLogs(level="INFO"):
do_import_realm(os.path.join(settings.TEST_WORKER_DIR, "test-export"), "test-zulip")
imported_realm = Realm.objects.get(string_id="test-zulip")
test_image_data = read_test_image_file("img.png")
# Test attachments
uploaded_file = Attachment.objects.get(realm=imported_realm)
self.assert_length(b"zulip!", uploaded_file.size)
attachment_content = uploads_bucket.Object(uploaded_file.path_id).get()["Body"].read()
self.assertEqual(b"zulip!", attachment_content)
# Test emojis
realm_emoji = RealmEmoji.objects.get(realm=imported_realm)
emoji_path = RealmEmoji.PATH_ID_TEMPLATE.format(
realm_id=imported_realm.id,
emoji_file_name=realm_emoji.file_name,
)
emoji_key = avatar_bucket.Object(emoji_path)
self.assertIsNotNone(emoji_key.get()["Body"].read())
self.assertEqual(emoji_key.key, emoji_path)
# Test avatars
user_profile = UserProfile.objects.get(full_name=user.full_name, realm=imported_realm)
avatar_path_id = user_avatar_path(user_profile) + ".original"
original_image_key = avatar_bucket.Object(avatar_path_id)
self.assertEqual(original_image_key.key, avatar_path_id)
image_data = avatar_bucket.Object(avatar_path_id).get()["Body"].read()
self.assertEqual(image_data, test_image_data)
# Test realm icon and logo
upload_path = upload.upload_backend.realm_avatar_and_logo_path(imported_realm)
original_icon_path_id = os.path.join(upload_path, "icon.original")
original_icon_key = avatar_bucket.Object(original_icon_path_id)
self.assertEqual(original_icon_key.get()["Body"].read(), test_image_data)
resized_icon_path_id = os.path.join(upload_path, "icon.png")
resized_icon_key = avatar_bucket.Object(resized_icon_path_id)
self.assertEqual(resized_icon_key.key, resized_icon_path_id)
self.assertEqual(imported_realm.icon_source, Realm.ICON_UPLOADED)
original_logo_path_id = os.path.join(upload_path, "logo.original")
original_logo_key = avatar_bucket.Object(original_logo_path_id)
self.assertEqual(original_logo_key.get()["Body"].read(), test_image_data)
resized_logo_path_id = os.path.join(upload_path, "logo.png")
resized_logo_key = avatar_bucket.Object(resized_logo_path_id)
self.assertEqual(resized_logo_key.key, resized_logo_path_id)
self.assertEqual(imported_realm.logo_source, Realm.LOGO_UPLOADED)
night_logo_original_path_id = os.path.join(upload_path, "night_logo.original")
night_logo_original_key = avatar_bucket.Object(night_logo_original_path_id)
self.assertEqual(night_logo_original_key.get()["Body"].read(), test_image_data)
resized_night_logo_path_id = os.path.join(upload_path, "night_logo.png")
resized_night_logo_key = avatar_bucket.Object(resized_night_logo_path_id)
self.assertEqual(resized_night_logo_key.key, resized_night_logo_path_id)
self.assertEqual(imported_realm.night_logo_source, Realm.LOGO_UPLOADED)
def test_get_incoming_message_ids(self) -> None:
import_dir = os.path.join(
settings.DEPLOY_ROOT, "zerver", "tests", "fixtures", "import_fixtures"
)
message_ids = get_incoming_message_ids(
import_dir=import_dir,
sort_by_date=True,
)
self.assertEqual(message_ids, [888, 999, 555])
message_ids = get_incoming_message_ids(
import_dir=import_dir,
sort_by_date=False,
)
self.assertEqual(message_ids, [555, 888, 999])
def test_plan_type(self) -> None:
user = self.example_user("hamlet")
realm = user.realm
do_change_realm_plan_type(realm, Realm.PLAN_TYPE_LIMITED, acting_user=None)
self.upload_files_for_user(user)
self.export_realm(realm)
with self.settings(BILLING_ENABLED=True), self.assertLogs(level="INFO"):
realm = do_import_realm(
os.path.join(settings.TEST_WORKER_DIR, "test-export"), "test-zulip-1"
)
self.assertEqual(realm.plan_type, Realm.PLAN_TYPE_LIMITED)
self.assertEqual(realm.max_invites, 100)
self.assertEqual(realm.upload_quota_gb, 5)
self.assertEqual(realm.message_visibility_limit, 10000)
self.assertTrue(
RealmAuditLog.objects.filter(
realm=realm, event_type=RealmAuditLog.REALM_PLAN_TYPE_CHANGED
).exists()
)
with self.settings(BILLING_ENABLED=False), self.assertLogs(level="INFO"):
realm = do_import_realm(
os.path.join(settings.TEST_WORKER_DIR, "test-export"), "test-zulip-2"
)
self.assertEqual(realm.plan_type, Realm.PLAN_TYPE_SELF_HOSTED)
self.assertEqual(realm.max_invites, 100)
self.assertEqual(realm.upload_quota_gb, None)
self.assertEqual(realm.message_visibility_limit, None)
self.assertTrue(
RealmAuditLog.objects.filter(
realm=realm, event_type=RealmAuditLog.REALM_PLAN_TYPE_CHANGED
).exists()
)
class SingleUserExportTest(ExportFile):
def do_files_test(self, is_s3: bool) -> None:
output_dir = make_export_output_dir()
cordelia = self.example_user("cordelia")
othello = self.example_user("othello")
self.upload_files_for_user(cordelia)
self.upload_files_for_user(othello, emoji_name="bogus") # try to pollute export
with self.assertLogs(level="INFO"):
do_export_user(cordelia, output_dir)
self.verify_uploads(cordelia, is_s3=is_s3)
self.verify_avatars(cordelia)
self.verify_emojis(cordelia, is_s3=is_s3)
def test_local_files(self) -> None:
self.do_files_test(is_s3=False)
@use_s3_backend
def test_s3_files(self) -> None:
create_s3_buckets(settings.S3_AUTH_UPLOADS_BUCKET, settings.S3_AVATAR_BUCKET)
self.do_files_test(is_s3=True)
def test_message_data(self) -> None:
hamlet = self.example_user("hamlet")
cordelia = self.example_user("cordelia")
othello = self.example_user("othello")
polonius = self.example_user("polonius")
self.subscribe(cordelia, "Denmark")
smile_message_id = self.send_stream_message(hamlet, "Denmark", "SMILE!")
check_add_reaction(
user_profile=cordelia,
message_id=smile_message_id,
emoji_name="smile",
emoji_code=None,
reaction_type=None,
)
reaction = Reaction.objects.order_by("id").last()
assert reaction
# Send a message that Cordelia should not have in the export.
self.send_stream_message(othello, "Denmark", "bogus")
hi_stream_message_id = self.send_stream_message(cordelia, "Denmark", "hi stream")
assert most_recent_usermessage(cordelia).message_id == hi_stream_message_id
# Try to fool the export again
self.send_personal_message(othello, hamlet)
self.send_huddle_message(othello, [hamlet, polonius])
hi_hamlet_message_id = self.send_personal_message(cordelia, hamlet, "hi hamlet")
hi_peeps_message_id = self.send_huddle_message(cordelia, [hamlet, othello], "hi peeps")
bye_peeps_message_id = self.send_huddle_message(othello, [cordelia, hamlet], "bye peeps")
bye_hamlet_message_id = self.send_personal_message(cordelia, hamlet, "bye hamlet")
hi_myself_message_id = self.send_personal_message(cordelia, cordelia, "hi myself")
bye_stream_message_id = self.send_stream_message(cordelia, "Denmark", "bye stream")
output_dir = make_export_output_dir()
cordelia = self.example_user("cordelia")
with self.assertLogs(level="INFO"):
do_export_user(cordelia, output_dir)
messages = read_json("messages-000001.json")
huddle_name = "Cordelia, Lear's daughter, King Hamlet, Othello, the Moor of Venice"
excerpt = [
(rec["id"], rec["content"], rec["recipient_name"])
for rec in messages["zerver_message"][-8:]
]
self.assertEqual(
excerpt,
[
(smile_message_id, "SMILE!", "Denmark"),
(hi_stream_message_id, "hi stream", "Denmark"),
(hi_hamlet_message_id, "hi hamlet", hamlet.full_name),
(hi_peeps_message_id, "hi peeps", huddle_name),
(bye_peeps_message_id, "bye peeps", huddle_name),
(bye_hamlet_message_id, "bye hamlet", hamlet.full_name),
(hi_myself_message_id, "hi myself", cordelia.full_name),
(bye_stream_message_id, "bye stream", "Denmark"),
],
)
def test_user_data(self) -> None:
# We register checkers during test setup, and then we call them at the end.
checkers = {}
def checker(f: Callable[[List[Record]], None]) -> Callable[[List[Record]], None]:
# Every checker function that gets decorated here should be named
# after one of the tables that we export in the single-user
# export. The table name then is used by code toward the end of the
# test to determine which portion of the data from users.json
# to pass into the checker.
table_name = f.__name__
assert table_name not in checkers
checkers[table_name] = f
return f
cordelia = self.example_user("cordelia")
hamlet = self.example_user("hamlet")
othello = self.example_user("othello")
realm = cordelia.realm
scotland = get_stream("Scotland", realm)
client = get_client("some_app")
now = timezone_now()
@checker
def zerver_userprofile(records: List[Record]) -> None:
(rec,) = records
self.assertEqual(rec["id"], cordelia.id)
self.assertEqual(rec["email"], cordelia.email)
self.assertEqual(rec["full_name"], cordelia.full_name)
"""
Try to set up the test data roughly in order of table name, where
possible, just to make it a bit easier to read the test.
"""
do_add_alert_words(cordelia, ["pizza"])
do_add_alert_words(hamlet, ["bogus"])
@checker
def zerver_alertword(records: List[Record]) -> None:
self.assertEqual(records[-1]["word"], "pizza")
favorite_city = try_add_realm_custom_profile_field(
realm,
"Favorite city",
CustomProfileField.SHORT_TEXT,
)
def set_favorite_city(user: UserProfile, city: str) -> None:
do_update_user_custom_profile_data_if_changed(
user, [dict(id=favorite_city.id, value=city)]
)
set_favorite_city(cordelia, "Seattle")
set_favorite_city(othello, "Moscow")
@checker
def zerver_customprofilefieldvalue(records: List[Record]) -> None:
(rec,) = records
self.assertEqual(rec["field"], favorite_city.id)
self.assertEqual(rec["rendered_value"], "<p>Seattle</p>")
do_mute_user(cordelia, othello)
do_mute_user(hamlet, cordelia) # should be ignored
@checker
def zerver_muteduser(records: List[Record]) -> None:
self.assertEqual(records[-1]["muted_user"], othello.id)
smile_message_id = self.send_stream_message(hamlet, "Denmark")
check_add_reaction(
user_profile=cordelia,
message_id=smile_message_id,
emoji_name="smile",
emoji_code=None,
reaction_type=None,
)
reaction = Reaction.objects.order_by("id").last()
assert reaction
@checker
def zerver_reaction(records: List[Record]) -> None:
(exported_reaction,) = records
self.assertEqual(
exported_reaction,
dict(
id=reaction.id,
user_profile=cordelia.id,
emoji_name="smile",
reaction_type="unicode_emoji",
emoji_code=reaction.emoji_code,
message=smile_message_id,
),
)
self.subscribe(cordelia, "Scotland")
create_stream_if_needed(realm, "bogus")
self.subscribe(othello, "bogus")
@checker
def zerver_recipient(records: List[Record]) -> None:
last_recipient = Recipient.objects.get(id=records[-1]["id"])
self.assertEqual(last_recipient.type, Recipient.STREAM)
stream_id = last_recipient.type_id
self.assertEqual(stream_id, get_stream("Scotland", realm).id)
@checker
def zerver_stream(records: List[Record]) -> None:
streams = {rec["name"] for rec in records}
self.assertEqual(streams, {"Scotland", "Verona"})
@checker
def zerver_subscription(records: List[Record]) -> None:
last_recipient = Recipient.objects.get(id=records[-1]["recipient"])
self.assertEqual(last_recipient.type, Recipient.STREAM)
stream_id = last_recipient.type_id
self.assertEqual(stream_id, get_stream("Scotland", realm).id)
do_update_user_activity(cordelia.id, client.id, "/some/endpoint", 2, now)
do_update_user_activity(cordelia.id, client.id, "/some/endpoint", 3, now)
do_update_user_activity(othello.id, client.id, "/bogus", 20, now)
@checker
def zerver_useractivity(records: List[Record]) -> None:
(rec,) = records
self.assertEqual(
rec,
dict(
client=client.id,
count=5,
id=rec["id"],
last_visit=rec["last_visit"],
query="/some/endpoint",
user_profile=cordelia.id,
),
)
self.assertEqual(make_datetime(rec["last_visit"]), now)
do_update_user_activity_interval(cordelia, now)
do_update_user_activity_interval(othello, now)
@checker
def zerver_useractivityinterval(records: List[Record]) -> None:
(rec,) = records
self.assertEqual(rec["user_profile"], cordelia.id)
self.assertEqual(make_datetime(rec["start"]), now)
do_update_user_presence(cordelia, client, now, UserPresence.ACTIVE)
do_update_user_presence(othello, client, now, UserPresence.IDLE)
@checker
def zerver_userpresence(records: List[Record]) -> None:
self.assertEqual(records[-1]["status"], UserPresence.ACTIVE)
self.assertEqual(records[-1]["client"], client.id)
self.assertEqual(make_datetime(records[-1]["timestamp"]), now)
do_update_user_status(
user_profile=cordelia,
away=True,
status_text="on vacation",
client_id=client.id,
emoji_name=None,
emoji_code=None,
reaction_type=None,
)
do_update_user_status(
user_profile=othello,
away=False,
status_text="at my desk",
client_id=client.id,
emoji_name=None,
emoji_code=None,
reaction_type=None,
)
@checker
def zerver_userstatus(records: List[Record]) -> None:
rec = records[-1]
self.assertEqual(rec["status_text"], "on vacation")
self.assertEqual(rec["status"], UserStatus.AWAY)
do_mute_topic(cordelia, scotland, "bagpipe music")
do_mute_topic(othello, scotland, "nessie")
@checker
def zerver_usertopic(records: List[Record]) -> None:
rec = records[-1]
self.assertEqual(rec["topic_name"], "bagpipe music")
self.assertEqual(rec["visibility_policy"], UserTopic.MUTED)
"""
For some tables we don't bother with super realistic test data
setup.
"""
UserCount.objects.create(
user=cordelia, realm=realm, property="whatever", value=42, end_time=now
)
UserCount.objects.create(
user=othello, realm=realm, property="bogus", value=999999, end_time=now
)
@checker
def analytics_usercount(records: List[Record]) -> None:
(rec,) = records
self.assertEqual(rec["value"], 42)
UserHotspot.objects.create(user=cordelia, hotspot="topics")
UserHotspot.objects.create(user=othello, hotspot="bogus")
@checker
def zerver_userhotspot(records: List[Record]) -> None:
self.assertEqual(records[-1]["hotspot"], "topics")
"""
The zerver_realmauditlog checker basically assumes that
we subscribed Cordelia to Scotland.
"""
@checker
def zerver_realmauditlog(records: List[Record]) -> None:
self.assertEqual(records[-1]["modified_stream"], scotland.id)
output_dir = make_export_output_dir()
with self.assertLogs(level="INFO"):
do_export_user(cordelia, output_dir)
user = read_json("user.json")
for table_name, f in checkers.items():
f(user[table_name])
for table_name in user:
if table_name not in checkers:
raise AssertionError(
f"""
Please create a checker called "{table_name}"
to check the user["{table_name}"] data in users.json.
Please be thoughtful about where you introduce
the new code--if you read the test, the patterns
for how to test table data should be clear.
Try to mostly keep checkers in alphabetical order.
"""
)