upload: Reorder functions into logical groupings.

This commit is contained in:
Alex Vandiver 2023-02-28 03:33:25 +00:00 committed by Tim Abbott
parent 880a3f95a7
commit b31a6dc56c
6 changed files with 312 additions and 293 deletions

View File

@ -57,52 +57,13 @@ if settings.LOCAL_UPLOADS_DIR is not None:
else:
upload_backend = S3UploadBackend() # nocoverage
# Message attachment uploads
def get_public_upload_root_url() -> str:
return upload_backend.get_public_upload_root_url()
def delete_message_attachment(path_id: str) -> bool:
return upload_backend.delete_message_attachment(path_id)
def get_avatar_url(hash_key: str, medium: bool = False) -> str:
return upload_backend.get_avatar_url(hash_key, medium)
def upload_avatar_image(
user_file: IO[bytes],
acting_user_profile: UserProfile,
target_user_profile: UserProfile,
content_type: Optional[str] = None,
) -> None:
upload_backend.upload_avatar_image(
user_file, acting_user_profile, target_user_profile, content_type=content_type
)
def delete_avatar_image(user_profile: UserProfile) -> None:
upload_backend.delete_avatar_image(user_profile)
def copy_avatar(source_profile: UserProfile, target_profile: UserProfile) -> None:
upload_backend.copy_avatar(source_profile, target_profile)
def upload_icon_image(user_file: IO[bytes], user_profile: UserProfile) -> None:
upload_backend.upload_realm_icon_image(user_file, user_profile)
def upload_logo_image(user_file: IO[bytes], user_profile: UserProfile, night: bool) -> None:
upload_backend.upload_realm_logo_image(user_file, user_profile, night)
def upload_emoji_image(
emoji_file: IO[bytes], emoji_file_name: str, user_profile: UserProfile
) -> bool:
return upload_backend.upload_emoji_image(emoji_file, emoji_file_name, user_profile)
def upload_message_attachment(
uploaded_file_name: str,
uploaded_file_size: int,
@ -145,16 +106,54 @@ def upload_message_attachment_from_request(
)
def upload_export_tarball(
realm: Realm, tarball_path: str, percent_callback: Optional[Callable[[Any], None]] = None
) -> str:
return upload_backend.upload_export_tarball(
realm, tarball_path, percent_callback=percent_callback
def delete_message_attachment(path_id: str) -> bool:
return upload_backend.delete_message_attachment(path_id)
# Avatar image uploads
def get_avatar_url(hash_key: str, medium: bool = False) -> str:
return upload_backend.get_avatar_url(hash_key, medium)
def upload_avatar_image(
user_file: IO[bytes],
acting_user_profile: UserProfile,
target_user_profile: UserProfile,
content_type: Optional[str] = None,
) -> None:
upload_backend.upload_avatar_image(
user_file, acting_user_profile, target_user_profile, content_type=content_type
)
def delete_export_tarball(export_path: str) -> Optional[str]:
return upload_backend.delete_export_tarball(export_path)
def copy_avatar(source_profile: UserProfile, target_profile: UserProfile) -> None:
upload_backend.copy_avatar(source_profile, target_profile)
def delete_avatar_image(user_profile: UserProfile) -> None:
upload_backend.delete_avatar_image(user_profile)
# Realm icon and logo uploads
def upload_icon_image(user_file: IO[bytes], user_profile: UserProfile) -> None:
upload_backend.upload_realm_icon_image(user_file, user_profile)
def upload_logo_image(user_file: IO[bytes], user_profile: UserProfile, night: bool) -> None:
upload_backend.upload_realm_logo_image(user_file, user_profile, night)
# Realm emoji uploads
def upload_emoji_image(
emoji_file: IO[bytes], emoji_file_name: str, user_profile: UserProfile
) -> bool:
return upload_backend.upload_emoji_image(emoji_file, emoji_file_name, user_profile)
def get_emoji_file_content(
@ -205,3 +204,18 @@ def handle_reupload_emojis_event(realm: Realm, logger: logging.Logger) -> None:
logger.info("Reuploading emoji %s", realm_emoji.id)
realm_emoji.is_animated = upload_emoji_image(emoji_bytes_io, emoji_filename, user_profile)
realm_emoji.save(update_fields=["is_animated"])
# Export tarballs
def upload_export_tarball(
realm: Realm, tarball_path: str, percent_callback: Optional[Callable[[Any], None]] = None
) -> str:
return upload_backend.upload_export_tarball(
realm, tarball_path, percent_callback=percent_callback
)
def delete_export_tarball(export_path: str) -> Optional[str]:
return upload_backend.delete_export_tarball(export_path)

View File

@ -185,6 +185,7 @@ def resize_emoji(
class ZulipUploadBackend:
# Message attachment uploads
def get_public_upload_root_url(self) -> str:
raise NotImplementedError
@ -202,6 +203,13 @@ class ZulipUploadBackend:
) -> str:
raise NotImplementedError
def delete_message_attachment(self, path_id: str) -> bool:
raise NotImplementedError
# Avatar image uploads
def get_avatar_url(self, hash_key: str, medium: bool = False) -> str:
raise NotImplementedError
def upload_avatar_image(
self,
user_file: IO[bytes],
@ -211,25 +219,26 @@ class ZulipUploadBackend:
) -> None:
raise NotImplementedError
def delete_avatar_image(self, user: UserProfile) -> None:
raise NotImplementedError
def delete_message_attachment(self, path_id: str) -> bool:
raise NotImplementedError
def get_avatar_url(self, hash_key: str, medium: bool = False) -> str:
raise NotImplementedError
def copy_avatar(self, source_profile: UserProfile, target_profile: UserProfile) -> None:
raise NotImplementedError
def ensure_avatar_image(self, user_profile: UserProfile, is_medium: bool = False) -> None:
raise NotImplementedError
def delete_avatar_image(self, user: UserProfile) -> None:
raise NotImplementedError
# Realm icon and logo uploads
def realm_avatar_and_logo_path(self, realm: Realm) -> str:
return os.path.join(str(realm.id), "realm")
def get_realm_icon_url(self, realm_id: int, version: int) -> str:
raise NotImplementedError
def upload_realm_icon_image(self, icon_file: IO[bytes], user_profile: UserProfile) -> None:
raise NotImplementedError
def get_realm_icon_url(self, realm_id: int, version: int) -> str:
def get_realm_logo_url(self, realm_id: int, version: int, night: bool) -> str:
raise NotImplementedError
def upload_realm_logo_image(
@ -237,7 +246,8 @@ class ZulipUploadBackend:
) -> None:
raise NotImplementedError
def get_realm_logo_url(self, realm_id: int, version: int, night: bool) -> str:
# Realm emoji uploads
def get_emoji_url(self, emoji_file_name: str, realm_id: int, still: bool = False) -> str:
raise NotImplementedError
def upload_emoji_image(
@ -245,7 +255,8 @@ class ZulipUploadBackend:
) -> bool:
raise NotImplementedError
def get_emoji_url(self, emoji_file_name: str, realm_id: int, still: bool = False) -> str:
# Export tarballs
def get_export_tarball_url(self, realm: Realm, export_path: str) -> str:
raise NotImplementedError
def upload_export_tarball(
@ -259,12 +270,6 @@ class ZulipUploadBackend:
def delete_export_tarball(self, export_path: str) -> Optional[str]:
raise NotImplementedError
def get_export_tarball_url(self, realm: Realm, export_path: str) -> str:
raise NotImplementedError
def realm_avatar_and_logo_path(self, realm: Realm) -> str:
return os.path.join(str(realm.id), "realm")
def create_attachment(
file_name: str, path_id: str, user_profile: UserProfile, realm: Realm, file_size: int

View File

@ -99,6 +99,10 @@ class LocalUploadBackend(ZulipUploadBackend):
def delete_message_attachment(self, path_id: str) -> bool:
return delete_local_file("files", path_id)
def get_avatar_url(self, hash_key: str, medium: bool = False) -> str:
medium_suffix = "-medium" if medium else ""
return f"/user_avatars/{hash_key}{medium_suffix}.png"
def write_avatar_images(self, file_path: str, image_data: bytes) -> None:
write_local_file("avatars", file_path + ".original", image_data)
@ -120,17 +124,6 @@ class LocalUploadBackend(ZulipUploadBackend):
image_data = user_file.read()
self.write_avatar_images(file_path, image_data)
def delete_avatar_image(self, user: UserProfile) -> None:
path_id = user_avatar_path(user)
delete_local_file("avatars", path_id + ".original")
delete_local_file("avatars", path_id + ".png")
delete_local_file("avatars", path_id + "-medium.png")
def get_avatar_url(self, hash_key: str, medium: bool = False) -> str:
medium_suffix = "-medium" if medium else ""
return f"/user_avatars/{hash_key}{medium_suffix}.png"
def copy_avatar(self, source_profile: UserProfile, target_profile: UserProfile) -> None:
source_file_path = user_avatar_path(source_profile)
target_file_path = user_avatar_path(target_profile)
@ -138,40 +131,6 @@ class LocalUploadBackend(ZulipUploadBackend):
image_data = read_local_file("avatars", source_file_path + ".original")
self.write_avatar_images(target_file_path, image_data)
def upload_realm_icon_image(self, icon_file: IO[bytes], user_profile: UserProfile) -> None:
upload_path = self.realm_avatar_and_logo_path(user_profile.realm)
image_data = icon_file.read()
write_local_file("avatars", os.path.join(upload_path, "icon.original"), image_data)
resized_data = resize_avatar(image_data)
write_local_file("avatars", os.path.join(upload_path, "icon.png"), resized_data)
def get_realm_icon_url(self, realm_id: int, version: int) -> str:
return f"/user_avatars/{realm_id}/realm/icon.png?version={version}"
def upload_realm_logo_image(
self, logo_file: IO[bytes], user_profile: UserProfile, night: bool
) -> None:
upload_path = self.realm_avatar_and_logo_path(user_profile.realm)
if night:
original_file = "night_logo.original"
resized_file = "night_logo.png"
else:
original_file = "logo.original"
resized_file = "logo.png"
image_data = logo_file.read()
write_local_file("avatars", os.path.join(upload_path, original_file), image_data)
resized_data = resize_logo(image_data)
write_local_file("avatars", os.path.join(upload_path, resized_file), resized_data)
def get_realm_logo_url(self, realm_id: int, version: int, night: bool) -> str:
if night:
file_name = "night_logo.png"
else:
file_name = "logo.png"
return f"/user_avatars/{realm_id}/realm/{file_name}?version={version}"
def ensure_avatar_image(self, user_profile: UserProfile, is_medium: bool = False) -> None:
file_extension = "-medium.png" if is_medium else ".png"
file_path = user_avatar_path(user_profile)
@ -195,6 +154,64 @@ class LocalUploadBackend(ZulipUploadBackend):
resized_avatar = resize_avatar(image_data)
write_local_file("avatars", file_path + file_extension, resized_avatar)
def delete_avatar_image(self, user: UserProfile) -> None:
path_id = user_avatar_path(user)
delete_local_file("avatars", path_id + ".original")
delete_local_file("avatars", path_id + ".png")
delete_local_file("avatars", path_id + "-medium.png")
def get_realm_icon_url(self, realm_id: int, version: int) -> str:
return f"/user_avatars/{realm_id}/realm/icon.png?version={version}"
def upload_realm_icon_image(self, icon_file: IO[bytes], user_profile: UserProfile) -> None:
upload_path = self.realm_avatar_and_logo_path(user_profile.realm)
image_data = icon_file.read()
write_local_file("avatars", os.path.join(upload_path, "icon.original"), image_data)
resized_data = resize_avatar(image_data)
write_local_file("avatars", os.path.join(upload_path, "icon.png"), resized_data)
def get_realm_logo_url(self, realm_id: int, version: int, night: bool) -> str:
if night:
file_name = "night_logo.png"
else:
file_name = "logo.png"
return f"/user_avatars/{realm_id}/realm/{file_name}?version={version}"
def upload_realm_logo_image(
self, logo_file: IO[bytes], user_profile: UserProfile, night: bool
) -> None:
upload_path = self.realm_avatar_and_logo_path(user_profile.realm)
if night:
original_file = "night_logo.original"
resized_file = "night_logo.png"
else:
original_file = "logo.original"
resized_file = "logo.png"
image_data = logo_file.read()
write_local_file("avatars", os.path.join(upload_path, original_file), image_data)
resized_data = resize_logo(image_data)
write_local_file("avatars", os.path.join(upload_path, resized_file), resized_data)
def get_emoji_url(self, emoji_file_name: str, realm_id: int, still: bool = False) -> str:
if still:
return os.path.join(
"/user_avatars",
RealmEmoji.STILL_PATH_ID_TEMPLATE.format(
realm_id=realm_id,
emoji_filename_without_extension=os.path.splitext(emoji_file_name)[0],
),
)
else:
return os.path.join(
"/user_avatars",
RealmEmoji.PATH_ID_TEMPLATE.format(
realm_id=realm_id, emoji_file_name=emoji_file_name
),
)
def upload_emoji_image(
self, emoji_file: IO[bytes], emoji_file_name: str, user_profile: UserProfile
) -> bool:
@ -216,22 +233,9 @@ class LocalUploadBackend(ZulipUploadBackend):
write_local_file("avatars", still_path, still_image_data)
return is_animated
def get_emoji_url(self, emoji_file_name: str, realm_id: int, still: bool = False) -> str:
if still:
return os.path.join(
"/user_avatars",
RealmEmoji.STILL_PATH_ID_TEMPLATE.format(
realm_id=realm_id,
emoji_filename_without_extension=os.path.splitext(emoji_file_name)[0],
),
)
else:
return os.path.join(
"/user_avatars",
RealmEmoji.PATH_ID_TEMPLATE.format(
realm_id=realm_id, emoji_file_name=emoji_file_name
),
)
def get_export_tarball_url(self, realm: Realm, export_path: str) -> str:
# export_path has a leading `/`
return realm.uri + export_path
def upload_export_tarball(
self,
@ -258,7 +262,3 @@ class LocalUploadBackend(ZulipUploadBackend):
if delete_local_file("avatars", file_path):
return export_path
return None
def get_export_tarball_url(self, realm: Realm, export_path: str) -> str:
# export_path has a leading `/`
return realm.uri + export_path

View File

@ -123,6 +123,34 @@ class S3UploadBackend(ZulipUploadBackend):
self._boto_client: Optional[S3Client] = None
self.public_upload_url_base = self.construct_public_upload_url_base()
def get_boto_client(self) -> S3Client:
"""
Creating the client takes a long time so we need to cache it.
"""
if self._boto_client is None:
config = Config(signature_version=botocore.UNSIGNED)
self._boto_client = self.session.client(
"s3",
region_name=settings.S3_REGION,
endpoint_url=settings.S3_ENDPOINT_URL,
config=config,
)
return self._boto_client
def delete_file_from_s3(self, path_id: str, bucket: Bucket) -> bool:
key = bucket.Object(path_id)
try:
key.load()
except botocore.exceptions.ClientError:
file_name = path_id.split("/")[-1]
logging.warning(
"%s does not exist. Its entry in the database will be removed.", file_name
)
return False
key.delete()
return True
def construct_public_upload_url_base(self) -> str:
# Return the pattern for public URL for a key in the S3 Avatar bucket.
# For Amazon S3 itself, this will return the following:
@ -156,6 +184,9 @@ class S3UploadBackend(ZulipUploadBackend):
(split_url.scheme, split_url.netloc, split_url.path[: -len(DUMMY_KEY)], "", "")
)
def get_public_upload_root_url(self) -> str:
return self.public_upload_url_base
def get_public_upload_url(
self,
key: str,
@ -163,37 +194,6 @@ class S3UploadBackend(ZulipUploadBackend):
assert not key.startswith("/")
return urllib.parse.urljoin(self.public_upload_url_base, key)
def get_boto_client(self) -> S3Client:
"""
Creating the client takes a long time so we need to cache it.
"""
if self._boto_client is None:
config = Config(signature_version=botocore.UNSIGNED)
self._boto_client = self.session.client(
"s3",
region_name=settings.S3_REGION,
endpoint_url=settings.S3_ENDPOINT_URL,
config=config,
)
return self._boto_client
def delete_file_from_s3(self, path_id: str, bucket: Bucket) -> bool:
key = bucket.Object(path_id)
try:
key.load()
except botocore.exceptions.ClientError:
file_name = path_id.split("/")[-1]
logging.warning(
"%s does not exist. Its entry in the database will be removed.", file_name
)
return False
key.delete()
return True
def get_public_upload_root_url(self) -> str:
return self.public_upload_url_base
def generate_message_upload_path(self, realm_id: str, uploaded_file_name: str) -> str:
return "/".join(
[
@ -269,6 +269,14 @@ class S3UploadBackend(ZulipUploadBackend):
# See avatar_url in avatar.py for URL. (That code also handles the case
# that users use gravatar.)
def get_avatar_key(self, file_name: str) -> Object:
key = self.avatar_bucket.Object(file_name)
return key
def get_avatar_url(self, hash_key: str, medium: bool = False) -> str:
medium_suffix = "-medium.png" if medium else ""
return self.get_public_upload_url(f"{hash_key}{medium_suffix}")
def upload_avatar_image(
self,
user_file: IO[bytes],
@ -283,17 +291,6 @@ class S3UploadBackend(ZulipUploadBackend):
image_data = user_file.read()
self.write_avatar_images(s3_file_name, target_user_profile, image_data, content_type)
def delete_avatar_image(self, user: UserProfile) -> None:
path_id = user_avatar_path(user)
self.delete_file_from_s3(path_id + ".original", self.avatar_bucket)
self.delete_file_from_s3(path_id + "-medium.png", self.avatar_bucket)
self.delete_file_from_s3(path_id, self.avatar_bucket)
def get_avatar_key(self, file_name: str) -> Object:
key = self.avatar_bucket.Object(file_name)
return key
def copy_avatar(self, source_profile: UserProfile, target_profile: UserProfile) -> None:
s3_source_file_name = user_avatar_path(source_profile)
s3_target_file_name = user_avatar_path(target_profile)
@ -304,13 +301,38 @@ class S3UploadBackend(ZulipUploadBackend):
self.write_avatar_images(s3_target_file_name, target_profile, image_data, content_type)
def get_avatar_url(self, hash_key: str, medium: bool = False) -> str:
medium_suffix = "-medium.png" if medium else ""
return self.get_public_upload_url(f"{hash_key}{medium_suffix}")
def ensure_avatar_image(self, user_profile: UserProfile, is_medium: bool = False) -> None:
# BUG: The else case should be user_avatar_path(user_profile) + ".png".
# See #12852 for details on this bug and how to migrate it.
file_extension = "-medium.png" if is_medium else ""
file_path = user_avatar_path(user_profile)
s3_file_name = file_path
def get_export_tarball_url(self, realm: Realm, export_path: str) -> str:
# export_path has a leading /
return self.get_public_upload_url(export_path[1:])
key = self.avatar_bucket.Object(file_path + ".original")
image_data = key.get()["Body"].read()
if is_medium:
resized_avatar = resize_avatar(image_data, MEDIUM_AVATAR_SIZE)
else:
resized_avatar = resize_avatar(image_data)
upload_image_to_s3(
self.avatar_bucket,
s3_file_name + file_extension,
"image/png",
user_profile,
resized_avatar,
)
def delete_avatar_image(self, user: UserProfile) -> None:
path_id = user_avatar_path(user)
self.delete_file_from_s3(path_id + ".original", self.avatar_bucket)
self.delete_file_from_s3(path_id + "-medium.png", self.avatar_bucket)
self.delete_file_from_s3(path_id, self.avatar_bucket)
def get_realm_icon_url(self, realm_id: int, version: int) -> str:
public_url = self.get_public_upload_url(f"{realm_id}/realm/icon.png")
return public_url + f"?version={version}"
def upload_realm_icon_image(self, icon_file: IO[bytes], user_profile: UserProfile) -> None:
content_type = guess_type(icon_file.name)[0]
@ -336,8 +358,12 @@ class S3UploadBackend(ZulipUploadBackend):
# See avatar_url in avatar.py for URL. (That code also handles the case
# that users use gravatar.)
def get_realm_icon_url(self, realm_id: int, version: int) -> str:
public_url = self.get_public_upload_url(f"{realm_id}/realm/icon.png")
def get_realm_logo_url(self, realm_id: int, version: int, night: bool) -> str:
if not night:
file_name = "logo.png"
else:
file_name = "night_logo.png"
public_url = self.get_public_upload_url(f"{realm_id}/realm/{file_name}")
return public_url + f"?version={version}"
def upload_realm_logo_image(
@ -370,35 +396,18 @@ class S3UploadBackend(ZulipUploadBackend):
# See avatar_url in avatar.py for URL. (That code also handles the case
# that users use gravatar.)
def get_realm_logo_url(self, realm_id: int, version: int, night: bool) -> str:
if not night:
file_name = "logo.png"
def get_emoji_url(self, emoji_file_name: str, realm_id: int, still: bool = False) -> str:
if still:
emoji_path = RealmEmoji.STILL_PATH_ID_TEMPLATE.format(
realm_id=realm_id,
emoji_filename_without_extension=os.path.splitext(emoji_file_name)[0],
)
return self.get_public_upload_url(emoji_path)
else:
file_name = "night_logo.png"
public_url = self.get_public_upload_url(f"{realm_id}/realm/{file_name}")
return public_url + f"?version={version}"
def ensure_avatar_image(self, user_profile: UserProfile, is_medium: bool = False) -> None:
# BUG: The else case should be user_avatar_path(user_profile) + ".png".
# See #12852 for details on this bug and how to migrate it.
file_extension = "-medium.png" if is_medium else ""
file_path = user_avatar_path(user_profile)
s3_file_name = file_path
key = self.avatar_bucket.Object(file_path + ".original")
image_data = key.get()["Body"].read()
if is_medium:
resized_avatar = resize_avatar(image_data, MEDIUM_AVATAR_SIZE)
else:
resized_avatar = resize_avatar(image_data)
upload_image_to_s3(
self.avatar_bucket,
s3_file_name + file_extension,
"image/png",
user_profile,
resized_avatar,
)
emoji_path = RealmEmoji.PATH_ID_TEMPLATE.format(
realm_id=realm_id, emoji_file_name=emoji_file_name
)
return self.get_public_upload_url(emoji_path)
def upload_emoji_image(
self, emoji_file: IO[bytes], emoji_file_name: str, user_profile: UserProfile
@ -442,18 +451,9 @@ class S3UploadBackend(ZulipUploadBackend):
return is_animated
def get_emoji_url(self, emoji_file_name: str, realm_id: int, still: bool = False) -> str:
if still:
emoji_path = RealmEmoji.STILL_PATH_ID_TEMPLATE.format(
realm_id=realm_id,
emoji_filename_without_extension=os.path.splitext(emoji_file_name)[0],
)
return self.get_public_upload_url(emoji_path)
else:
emoji_path = RealmEmoji.PATH_ID_TEMPLATE.format(
realm_id=realm_id, emoji_file_name=emoji_file_name
)
return self.get_public_upload_url(emoji_path)
def get_export_tarball_url(self, realm: Realm, export_path: str) -> str:
# export_path has a leading /
return self.get_public_upload_url(export_path[1:])
def upload_export_tarball(
self,

View File

@ -145,27 +145,6 @@ class LocalStorageTest(UploadSerializeMixin, ZulipTestCase):
with open(output_path, "rb") as original_file:
self.assertEqual(resized_avatar, original_file.read())
def test_emoji_upload(self) -> None:
user_profile = self.example_user("hamlet")
file_name = "emoji.png"
with get_test_image_file("img.png") as image_file:
upload_emoji_image(image_file, file_name, user_profile)
emoji_path = RealmEmoji.PATH_ID_TEMPLATE.format(
realm_id=user_profile.realm_id,
emoji_file_name=file_name,
)
assert settings.LOCAL_AVATARS_DIR is not None
file_path = os.path.join(settings.LOCAL_AVATARS_DIR, emoji_path)
with open(file_path + ".original", "rb") as original_file:
self.assertEqual(read_test_image_file("img.png"), original_file.read())
expected_size = (DEFAULT_EMOJI_SIZE, DEFAULT_EMOJI_SIZE)
with Image.open(file_path) as resized_image:
self.assertEqual(expected_size, resized_image.size)
def test_get_emoji_url(self) -> None:
user_profile = self.example_user("hamlet")
file_name = "emoji.png"
@ -203,6 +182,27 @@ class LocalStorageTest(UploadSerializeMixin, ZulipTestCase):
expected_still_url = f"/user_avatars/{still_emoji_path}"
self.assertEqual(expected_still_url, still_url)
def test_emoji_upload(self) -> None:
user_profile = self.example_user("hamlet")
file_name = "emoji.png"
with get_test_image_file("img.png") as image_file:
upload_emoji_image(image_file, file_name, user_profile)
emoji_path = RealmEmoji.PATH_ID_TEMPLATE.format(
realm_id=user_profile.realm_id,
emoji_file_name=file_name,
)
assert settings.LOCAL_AVATARS_DIR is not None
file_path = os.path.join(settings.LOCAL_AVATARS_DIR, emoji_path)
with open(file_path + ".original", "rb") as original_file:
self.assertEqual(read_test_image_file("img.png"), original_file.read())
expected_size = (DEFAULT_EMOJI_SIZE, DEFAULT_EMOJI_SIZE)
with Image.open(file_path) as resized_image:
self.assertEqual(expected_size, resized_image.size)
def test_tarball_upload_and_deletion(self) -> None:
user_profile = self.example_user("iago")
self.assertTrue(user_profile.is_realm_admin)

View File

@ -288,6 +288,35 @@ class S3Test(ZulipTestCase):
target_medium_image_data = target_medium_image_key.get()["Body"].read()
self.assertEqual(source_medium_image_data, target_medium_image_data)
@use_s3_backend
def test_ensure_avatar_image(self) -> None:
bucket = create_s3_buckets(settings.S3_AVATAR_BUCKET)[0]
user_profile = self.example_user("hamlet")
base_file_path = user_avatar_path(user_profile)
# Bug: This should have + ".png", but the implementation is wrong.
file_path = base_file_path
original_file_path = base_file_path + ".original"
medium_file_path = base_file_path + "-medium.png"
with get_test_image_file("img.png") as image_file:
zerver.lib.upload.upload_backend.upload_avatar_image(
image_file, user_profile, user_profile
)
key = bucket.Object(original_file_path)
image_data = key.get()["Body"].read()
zerver.lib.upload.upload_backend.ensure_avatar_image(user_profile)
resized_avatar = resize_avatar(image_data)
key = bucket.Object(file_path)
self.assertEqual(resized_avatar, key.get()["Body"].read())
zerver.lib.upload.upload_backend.ensure_avatar_image(user_profile, is_medium=True)
resized_avatar = resize_avatar(image_data, MEDIUM_AVATAR_SIZE)
key = bucket.Object(medium_file_path)
self.assertEqual(resized_avatar, key.get()["Body"].read())
@use_s3_backend
def test_delete_avatar_image(self) -> None:
bucket = create_s3_buckets(settings.S3_AVATAR_BUCKET)[0]
@ -362,57 +391,6 @@ class S3Test(ZulipTestCase):
self._test_upload_logo_image(night=False, file_name="logo")
self._test_upload_logo_image(night=True, file_name="night_logo")
@use_s3_backend
def test_upload_emoji_image(self) -> None:
bucket = create_s3_buckets(settings.S3_AVATAR_BUCKET)[0]
user_profile = self.example_user("hamlet")
emoji_name = "emoji.png"
with get_test_image_file("img.png") as image_file:
zerver.lib.upload.upload_backend.upload_emoji_image(
image_file, emoji_name, user_profile
)
emoji_path = RealmEmoji.PATH_ID_TEMPLATE.format(
realm_id=user_profile.realm_id,
emoji_file_name=emoji_name,
)
original_key = bucket.Object(emoji_path + ".original")
self.assertEqual(read_test_image_file("img.png"), original_key.get()["Body"].read())
resized_data = bucket.Object(emoji_path).get()["Body"].read()
resized_image = Image.open(io.BytesIO(resized_data))
self.assertEqual(resized_image.size, (DEFAULT_EMOJI_SIZE, DEFAULT_EMOJI_SIZE))
@use_s3_backend
def test_ensure_avatar_image(self) -> None:
bucket = create_s3_buckets(settings.S3_AVATAR_BUCKET)[0]
user_profile = self.example_user("hamlet")
base_file_path = user_avatar_path(user_profile)
# Bug: This should have + ".png", but the implementation is wrong.
file_path = base_file_path
original_file_path = base_file_path + ".original"
medium_file_path = base_file_path + "-medium.png"
with get_test_image_file("img.png") as image_file:
zerver.lib.upload.upload_backend.upload_avatar_image(
image_file, user_profile, user_profile
)
key = bucket.Object(original_file_path)
image_data = key.get()["Body"].read()
zerver.lib.upload.upload_backend.ensure_avatar_image(user_profile)
resized_avatar = resize_avatar(image_data)
key = bucket.Object(file_path)
self.assertEqual(resized_avatar, key.get()["Body"].read())
zerver.lib.upload.upload_backend.ensure_avatar_image(user_profile, is_medium=True)
resized_avatar = resize_avatar(image_data, MEDIUM_AVATAR_SIZE)
key = bucket.Object(medium_file_path)
self.assertEqual(resized_avatar, key.get()["Body"].read())
@use_s3_backend
def test_get_emoji_url(self) -> None:
emoji_name = "emoji.png"
@ -442,6 +420,28 @@ class S3Test(ZulipTestCase):
expected_still_url = f"https://{bucket}.s3.amazonaws.com/{still_path}"
self.assertEqual(expected_still_url, still_url)
@use_s3_backend
def test_upload_emoji_image(self) -> None:
bucket = create_s3_buckets(settings.S3_AVATAR_BUCKET)[0]
user_profile = self.example_user("hamlet")
emoji_name = "emoji.png"
with get_test_image_file("img.png") as image_file:
zerver.lib.upload.upload_backend.upload_emoji_image(
image_file, emoji_name, user_profile
)
emoji_path = RealmEmoji.PATH_ID_TEMPLATE.format(
realm_id=user_profile.realm_id,
emoji_file_name=emoji_name,
)
original_key = bucket.Object(emoji_path + ".original")
self.assertEqual(read_test_image_file("img.png"), original_key.get()["Body"].read())
resized_data = bucket.Object(emoji_path).get()["Body"].read()
resized_image = Image.open(io.BytesIO(resized_data))
self.assertEqual(resized_image.size, (DEFAULT_EMOJI_SIZE, DEFAULT_EMOJI_SIZE))
@use_s3_backend
def test_tarball_upload_and_deletion(self) -> None:
bucket = create_s3_buckets(settings.S3_AVATAR_BUCKET)[0]