mirror of https://github.com/zulip/zulip.git
154 lines
5.8 KiB
Python
154 lines
5.8 KiB
Python
from mimetypes import guess_type
|
|
from typing import Union
|
|
|
|
from django.conf import settings
|
|
from django.contrib.auth.models import AnonymousUser
|
|
from django.core.files.uploadedfile import UploadedFile
|
|
from django.http import HttpRequest, HttpResponse, HttpResponseForbidden, HttpResponseNotFound
|
|
from django.shortcuts import redirect
|
|
from django.utils.cache import patch_cache_control
|
|
from django.utils.translation import gettext as _
|
|
from django_sendfile import sendfile
|
|
|
|
from zerver.context_processors import get_valid_realm_from_request
|
|
from zerver.lib.exceptions import JsonableError
|
|
from zerver.lib.response import json_success
|
|
from zerver.lib.upload import (
|
|
INLINE_MIME_TYPES,
|
|
check_upload_within_quota,
|
|
generate_unauthed_file_access_url,
|
|
get_local_file_path,
|
|
get_local_file_path_id_from_token,
|
|
get_signed_upload_url,
|
|
upload_message_image_from_request,
|
|
)
|
|
from zerver.models import UserProfile, validate_attachment_request
|
|
|
|
|
|
def serve_s3(
|
|
request: HttpRequest, url_path: str, url_only: bool, download: bool = False
|
|
) -> HttpResponse:
|
|
url = get_signed_upload_url(url_path, download=download)
|
|
if url_only:
|
|
return json_success(request, data=dict(url=url))
|
|
|
|
return redirect(url)
|
|
|
|
|
|
def serve_local(
|
|
request: HttpRequest, path_id: str, url_only: bool, download: bool = False
|
|
) -> HttpResponse:
|
|
local_path = get_local_file_path(path_id)
|
|
if local_path is None:
|
|
return HttpResponseNotFound("<p>File not found</p>")
|
|
|
|
if url_only:
|
|
url = generate_unauthed_file_access_url(path_id)
|
|
return json_success(request, data=dict(url=url))
|
|
|
|
# Here we determine whether a browser should treat the file like
|
|
# an attachment (and thus clicking a link to it should download)
|
|
# or like a link (and thus clicking a link to it should display it
|
|
# in a browser tab). This is controlled by the
|
|
# Content-Disposition header; `django-sendfile2` sends the
|
|
# attachment-style version of that header if and only if the
|
|
# attachment argument is passed to it. For attachments,
|
|
# django-sendfile2 sets the response['Content-disposition'] like
|
|
# this: `attachment; filename="zulip.txt"; filename*=UTF-8''zulip.txt`.
|
|
# The filename* parameter is omitted for ASCII filenames like this one.
|
|
#
|
|
# The "filename" field (used to name the file when downloaded) is
|
|
# unreliable because it doesn't have a well-defined encoding; the
|
|
# newer filename* field takes precedence, since it uses a
|
|
# consistent format (urlquoted). For more details on filename*
|
|
# and filename, see the below docs:
|
|
# https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Disposition
|
|
mimetype, encoding = guess_type(local_path)
|
|
attachment = download or mimetype not in INLINE_MIME_TYPES
|
|
|
|
response = sendfile(
|
|
request, local_path, attachment=attachment, mimetype=mimetype, encoding=encoding
|
|
)
|
|
patch_cache_control(response, private=True, immutable=True)
|
|
return response
|
|
|
|
|
|
def serve_file_download_backend(
|
|
request: HttpRequest, user_profile: UserProfile, realm_id_str: str, filename: str
|
|
) -> HttpResponse:
|
|
return serve_file(request, user_profile, realm_id_str, filename, url_only=False, download=True)
|
|
|
|
|
|
def serve_file_backend(
|
|
request: HttpRequest,
|
|
maybe_user_profile: Union[UserProfile, AnonymousUser],
|
|
realm_id_str: str,
|
|
filename: str,
|
|
) -> HttpResponse:
|
|
return serve_file(request, maybe_user_profile, realm_id_str, filename, url_only=False)
|
|
|
|
|
|
def serve_file_url_backend(
|
|
request: HttpRequest, user_profile: UserProfile, realm_id_str: str, filename: str
|
|
) -> HttpResponse:
|
|
"""
|
|
We should return a signed, short-lived URL
|
|
that the client can use for native mobile download, rather than serving a redirect.
|
|
"""
|
|
|
|
return serve_file(request, user_profile, realm_id_str, filename, url_only=True)
|
|
|
|
|
|
def serve_file(
|
|
request: HttpRequest,
|
|
maybe_user_profile: Union[UserProfile, AnonymousUser],
|
|
realm_id_str: str,
|
|
filename: str,
|
|
url_only: bool = False,
|
|
download: bool = False,
|
|
) -> HttpResponse:
|
|
path_id = f"{realm_id_str}/{filename}"
|
|
realm = get_valid_realm_from_request(request)
|
|
is_authorized = validate_attachment_request(maybe_user_profile, path_id, realm)
|
|
|
|
if is_authorized is None:
|
|
return HttpResponseNotFound(_("<p>File not found.</p>"))
|
|
if not is_authorized:
|
|
return HttpResponseForbidden(_("<p>You are not authorized to view this file.</p>"))
|
|
if settings.LOCAL_UPLOADS_DIR is not None:
|
|
return serve_local(request, path_id, url_only, download=download)
|
|
|
|
return serve_s3(request, path_id, url_only, download=download)
|
|
|
|
|
|
def serve_local_file_unauthed(request: HttpRequest, token: str, filename: str) -> HttpResponse:
|
|
path_id = get_local_file_path_id_from_token(token)
|
|
if path_id is None:
|
|
raise JsonableError(_("Invalid token"))
|
|
if path_id.split("/")[-1] != filename:
|
|
raise JsonableError(_("Invalid filename"))
|
|
|
|
return serve_local(request, path_id, url_only=False)
|
|
|
|
|
|
def upload_file_backend(request: HttpRequest, user_profile: UserProfile) -> HttpResponse:
|
|
if len(request.FILES) == 0:
|
|
raise JsonableError(_("You must specify a file to upload"))
|
|
if len(request.FILES) != 1:
|
|
raise JsonableError(_("You may only upload one file at a time"))
|
|
|
|
user_file = list(request.FILES.values())[0]
|
|
assert isinstance(user_file, UploadedFile)
|
|
file_size = user_file.size
|
|
assert file_size is not None
|
|
if settings.MAX_FILE_UPLOAD_SIZE * 1024 * 1024 < file_size:
|
|
raise JsonableError(
|
|
_("Uploaded file is larger than the allowed limit of {} MiB").format(
|
|
settings.MAX_FILE_UPLOAD_SIZE,
|
|
)
|
|
)
|
|
check_upload_within_quota(user_profile.realm, file_size)
|
|
|
|
uri = upload_message_image_from_request(user_file, user_profile, file_size)
|
|
return json_success(request, data={"uri": uri})
|