zulip/zerver/actions/realm_emoji.py

90 lines
3.3 KiB
Python
Raw Normal View History

from typing import IO, Dict, Optional
import orjson
from django.core.exceptions import ValidationError
from django.db import transaction
from django.db.utils import IntegrityError
from django.utils.timezone import now as timezone_now
from django.utils.translation import gettext as _
from zerver.lib.emoji import get_emoji_file_name
from zerver.lib.exceptions import JsonableError
from zerver.lib.pysa import mark_sanitized
from zerver.lib.upload import upload_emoji_image
from zerver.models import EmojiInfo, Realm, RealmAuditLog, RealmEmoji, UserProfile, active_user_ids
from zerver.tornado.django_api import send_event
def notify_realm_emoji(realm: Realm, realm_emoji: Dict[str, EmojiInfo]) -> None:
event = dict(type="realm_emoji", op="update", realm_emoji=realm_emoji)
transaction.on_commit(lambda: send_event(realm, event, active_user_ids(realm.id)))
def check_add_realm_emoji(
realm: Realm, name: str, author: UserProfile, image_file: IO[bytes]
) -> RealmEmoji:
try:
realm_emoji = RealmEmoji(realm=realm, name=name, author=author)
realm_emoji.full_clean()
realm_emoji.save()
except (IntegrityError, ValidationError):
# Match the string in upload_emoji.
raise JsonableError(_("A custom emoji with this name already exists."))
emoji_file_name = get_emoji_file_name(image_file.name, realm_emoji.id)
# The only user-controlled portion of 'emoji_file_name' is an extension,
# which can not contain '..' or '/' or '\', making it difficult to exploit
emoji_file_name = mark_sanitized(emoji_file_name)
emoji_uploaded_successfully = False
is_animated = False
try:
is_animated = upload_emoji_image(image_file, emoji_file_name, author)
emoji_uploaded_successfully = True
finally:
if not emoji_uploaded_successfully:
realm_emoji.delete()
realm_emoji.file_name = emoji_file_name
realm_emoji.is_animated = is_animated
realm_emoji.save(update_fields=["file_name", "is_animated"])
realm_emoji_dict = realm.get_emoji()
RealmAuditLog.objects.create(
realm=realm,
acting_user=author,
event_type=RealmAuditLog.REALM_EMOJI_ADDED,
event_time=timezone_now(),
extra_data=orjson.dumps(
{
"realm_emoji": dict(sorted(realm_emoji_dict.items())),
"added_emoji": realm_emoji_dict[str(realm_emoji.id)],
}
).decode(),
)
notify_realm_emoji(realm_emoji.realm, realm_emoji_dict)
return realm_emoji
@transaction.atomic(durable=True)
def do_remove_realm_emoji(realm: Realm, name: str, *, acting_user: Optional[UserProfile]) -> None:
emoji = RealmEmoji.objects.get(realm=realm, name=name, deactivated=False)
emoji.deactivated = True
emoji.save(update_fields=["deactivated"])
realm_emoji_dict = realm.get_emoji()
RealmAuditLog.objects.create(
realm=realm,
acting_user=acting_user,
event_type=RealmAuditLog.REALM_EMOJI_REMOVED,
event_time=timezone_now(),
extra_data=orjson.dumps(
{
"realm_emoji": dict(sorted(realm_emoji_dict.items())),
"deactivated_emoji": realm_emoji_dict[str(emoji.id)],
}
).decode(),
)
notify_realm_emoji(realm, realm_emoji_dict)