zulip/zerver/actions/realm_emoji.py

87 lines
3.5 KiB
Python
Raw Normal View History

from typing import IO, Dict, Optional
from django.core.exceptions import ValidationError
from django.db import transaction
from django.db.utils import IntegrityError
from django.utils.timezone import now as timezone_now
from django.utils.translation import gettext as _
from zerver.lib.emoji import get_emoji_file_name
from zerver.lib.exceptions import JsonableError
from zerver.lib.thumbnail import THUMBNAIL_ACCEPT_IMAGE_TYPES, BadImageError
from zerver.lib.upload import upload_emoji_image
from zerver.lib.upload.base import INLINE_MIME_TYPES
from zerver.models import Realm, RealmAuditLog, RealmEmoji, UserProfile
from zerver.models.realm_emoji import EmojiInfo, get_all_custom_emoji_for_realm
from zerver.models.users import active_user_ids
from zerver.tornado.django_api import send_event_on_commit
def notify_realm_emoji(realm: Realm, realm_emoji: Dict[str, EmojiInfo]) -> None:
event = dict(type="realm_emoji", op="update", realm_emoji=realm_emoji)
send_event_on_commit(realm, event, active_user_ids(realm.id))
@transaction.atomic(durable=True)
def check_add_realm_emoji(
realm: Realm, name: str, author: UserProfile, image_file: IO[bytes], content_type: str
) -> RealmEmoji:
try:
realm_emoji = RealmEmoji(realm=realm, name=name, author=author)
realm_emoji.full_clean()
realm_emoji.save()
except (IntegrityError, ValidationError):
# This exists to handle races; upload_emoji checked prior to
# calling, and also hand-validated the name, but we're now in
# a transaction. The error string should match the one in
# upload_emoji.
raise JsonableError(_("A custom emoji with this name already exists."))
# This mirrors the check in upload_emoji_image because we want to
# have some reasonable guarantees on the content-type before
# deriving the file extension from it.
if content_type not in THUMBNAIL_ACCEPT_IMAGE_TYPES or content_type not in INLINE_MIME_TYPES:
raise BadImageError(_("Invalid image format"))
emoji_file_name = get_emoji_file_name(content_type, realm_emoji.id)
is_animated = upload_emoji_image(image_file, emoji_file_name, author, content_type)
realm_emoji.file_name = emoji_file_name
realm_emoji.is_animated = is_animated
realm_emoji.save(update_fields=["file_name", "is_animated"])
realm_emoji_dict = get_all_custom_emoji_for_realm(realm.id)
RealmAuditLog.objects.create(
realm=realm,
acting_user=author,
event_type=RealmAuditLog.REALM_EMOJI_ADDED,
event_time=timezone_now(),
extra_data={
"realm_emoji": dict(sorted(realm_emoji_dict.items())),
"added_emoji": realm_emoji_dict[str(realm_emoji.id)],
},
)
notify_realm_emoji(realm_emoji.realm, realm_emoji_dict)
return realm_emoji
@transaction.atomic(durable=True)
def do_remove_realm_emoji(realm: Realm, name: str, *, acting_user: Optional[UserProfile]) -> None:
emoji = RealmEmoji.objects.get(realm=realm, name=name, deactivated=False)
emoji.deactivated = True
emoji.save(update_fields=["deactivated"])
realm_emoji_dict = get_all_custom_emoji_for_realm(realm.id)
RealmAuditLog.objects.create(
realm=realm,
acting_user=acting_user,
event_type=RealmAuditLog.REALM_EMOJI_REMOVED,
event_time=timezone_now(),
extra_data={
"realm_emoji": dict(sorted(realm_emoji_dict.items())),
"deactivated_emoji": realm_emoji_dict[str(emoji.id)],
},
)
notify_realm_emoji(realm, realm_emoji_dict)