Annotate zerver.lib.upload.

This commit is contained in:
medullaskyline 2016-06-04 18:54:32 -07:00 committed by Tim Abbott
parent 1bfe566c8d
commit 47c3ec1283
1 changed files with 26 additions and 0 deletions

View File

@ -1,26 +1,33 @@
from __future__ import absolute_import from __future__ import absolute_import
from typing import Optional, Tuple
from django.utils.translation import ugettext as _ from django.utils.translation import ugettext as _
from django.conf import settings from django.conf import settings
from django.template.defaultfilters import slugify from django.template.defaultfilters import slugify
from django.utils.encoding import force_text from django.utils.encoding import force_text
from django.core.files import File
from django.http import HttpRequest
from jinja2 import Markup as mark_safe from jinja2 import Markup as mark_safe
import unicodedata import unicodedata
from zerver.lib.avatar import user_avatar_hash from zerver.lib.avatar import user_avatar_hash
from zerver.lib.request import JsonableError from zerver.lib.request import JsonableError
from boto.s3.bucket import Bucket
from boto.s3.key import Key from boto.s3.key import Key
from boto.s3.connection import S3Connection from boto.s3.connection import S3Connection
from mimetypes import guess_type, guess_extension from mimetypes import guess_type, guess_extension
from zerver.models import get_user_profile_by_id from zerver.models import get_user_profile_by_id
from zerver.models import Attachment from zerver.models import Attachment
from zerver.models import Realm, UserProfile, UserMessage
import base64 import base64
import os import os
import re import re
import six
from PIL import Image, ImageOps from PIL import Image, ImageOps
from six import text_type
from six.moves import cStringIO as StringIO from six.moves import cStringIO as StringIO
import random import random
import logging import logging
@ -41,6 +48,7 @@ import logging
# through a sanitization function. # through a sanitization function.
def sanitize_name(value): def sanitize_name(value):
# type: (six.text_type) -> str
""" """
Sanitizes a value to be safe to store in a Linux filesystem, in Sanitizes a value to be safe to store in a Linux filesystem, in
S3, and in a URL. So unicode is allowed, but not special S3, and in a URL. So unicode is allowed, but not special
@ -58,12 +66,14 @@ def sanitize_name(value):
return mark_safe(re.sub('[-\s]+', '-', value, flags=re.U)) return mark_safe(re.sub('[-\s]+', '-', value, flags=re.U))
def random_name(bytes=60): def random_name(bytes=60):
# type: (int) -> str
return base64.urlsafe_b64encode(os.urandom(bytes)) return base64.urlsafe_b64encode(os.urandom(bytes))
class BadImageError(JsonableError): class BadImageError(JsonableError):
pass pass
def resize_avatar(image_data): def resize_avatar(image_data):
# type: (str) -> str
AVATAR_SIZE = 100 AVATAR_SIZE = 100
try: try:
im = Image.open(StringIO(image_data)) im = Image.open(StringIO(image_data))
@ -78,6 +88,7 @@ def resize_avatar(image_data):
### S3 ### S3
def get_bucket(conn, bucket_name): def get_bucket(conn, bucket_name):
# type: (S3Connection, str) -> Bucket
# Calling get_bucket() with validate=True can apparently lead # Calling get_bucket() with validate=True can apparently lead
# to expensive S3 bills: # to expensive S3 bills:
# http://www.appneta.com/blog/s3-list-get-bucket-default/ # http://www.appneta.com/blog/s3-list-get-bucket-default/
@ -96,6 +107,7 @@ def upload_image_to_s3(
user_profile, user_profile,
contents, contents,
): ):
# type: (str, str, str, UserProfile, str) -> None
conn = S3Connection(settings.S3_KEY, settings.S3_SECRET_KEY) conn = S3Connection(settings.S3_KEY, settings.S3_SECRET_KEY)
bucket = get_bucket(conn, bucket_name) bucket = get_bucket(conn, bucket_name)
@ -112,6 +124,7 @@ def upload_image_to_s3(
key.set_contents_from_string(contents, headers=headers) key.set_contents_from_string(contents, headers=headers)
def get_file_info(request, user_file): def get_file_info(request, user_file):
# type: (HttpRequest, File) -> Tuple[str, str]
uploaded_file_name = user_file.name uploaded_file_name = user_file.name
content_type = request.GET.get('mimetype') content_type = request.GET.get('mimetype')
if content_type is None: if content_type is None:
@ -121,6 +134,7 @@ def get_file_info(request, user_file):
return uploaded_file_name, content_type return uploaded_file_name, content_type
def upload_message_image_s3(uploaded_file_name, content_type, file_data, user_profile, target_realm=None): def upload_message_image_s3(uploaded_file_name, content_type, file_data, user_profile, target_realm=None):
# type: (str, str, str, UserProfile, Optional[Realm]) -> str
bucket_name = settings.S3_AUTH_UPLOADS_BUCKET bucket_name = settings.S3_AUTH_UPLOADS_BUCKET
s3_file_name = "/".join([ s3_file_name = "/".join([
str(target_realm.id if target_realm is not None else user_profile.realm.id), str(target_realm.id if target_realm is not None else user_profile.realm.id),
@ -141,10 +155,12 @@ def upload_message_image_s3(uploaded_file_name, content_type, file_data, user_pr
return url return url
def get_signed_upload_url(path): def get_signed_upload_url(path):
# type: (str) -> str
conn = S3Connection(settings.S3_KEY, settings.S3_SECRET_KEY) conn = S3Connection(settings.S3_KEY, settings.S3_SECRET_KEY)
return conn.generate_url(15, 'GET', bucket=settings.S3_AUTH_UPLOADS_BUCKET, key=path) return conn.generate_url(15, 'GET', bucket=settings.S3_AUTH_UPLOADS_BUCKET, key=path)
def get_realm_for_filename(path): def get_realm_for_filename(path):
# type: (str) -> int
conn = S3Connection(settings.S3_KEY, settings.S3_SECRET_KEY) conn = S3Connection(settings.S3_KEY, settings.S3_SECRET_KEY)
key = get_bucket(conn, settings.S3_AUTH_UPLOADS_BUCKET).get_key(path) key = get_bucket(conn, settings.S3_AUTH_UPLOADS_BUCKET).get_key(path)
if key is None: if key is None:
@ -153,6 +169,7 @@ def get_realm_for_filename(path):
return get_user_profile_by_id(key.metadata["user_profile_id"]).realm.id return get_user_profile_by_id(key.metadata["user_profile_id"]).realm.id
def delete_message_image_s3(path_id): def delete_message_image_s3(path_id):
# type: (str) -> bool
conn = S3Connection(settings.S3_KEY, settings.S3_SECRET_KEY) conn = S3Connection(settings.S3_KEY, settings.S3_SECRET_KEY)
bucket = get_bucket(conn, settings.S3_AUTH_UPLOADS_BUCKET) bucket = get_bucket(conn, settings.S3_AUTH_UPLOADS_BUCKET)
@ -167,6 +184,7 @@ def delete_message_image_s3(path_id):
return False return False
def upload_avatar_image_s3(user_file, user_profile, email): def upload_avatar_image_s3(user_file, user_profile, email):
# type: (File, UserProfile, str) -> None
content_type = guess_type(user_file.name)[0] content_type = guess_type(user_file.name)[0]
bucket_name = settings.S3_AVATAR_BUCKET bucket_name = settings.S3_AVATAR_BUCKET
s3_file_name = user_avatar_hash(email) s3_file_name = user_avatar_hash(email)
@ -194,17 +212,20 @@ def upload_avatar_image_s3(user_file, user_profile, email):
### Local ### Local
def mkdirs(path): def mkdirs(path):
# type: (str) -> None
dirname = os.path.dirname(path) dirname = os.path.dirname(path)
if not os.path.isdir(dirname): if not os.path.isdir(dirname):
os.makedirs(dirname) os.makedirs(dirname)
def write_local_file(type, path, file_data): def write_local_file(type, path, file_data):
# type: (str, str, str) -> None
file_path = os.path.join(settings.LOCAL_UPLOADS_DIR, type, path) file_path = os.path.join(settings.LOCAL_UPLOADS_DIR, type, path)
mkdirs(file_path) mkdirs(file_path)
with open(file_path, 'wb') as f: with open(file_path, 'wb') as f:
f.write(file_data) f.write(file_data)
def upload_message_image_local(uploaded_file_name, content_type, file_data, user_profile, target_realm=None): def upload_message_image_local(uploaded_file_name, content_type, file_data, user_profile, target_realm=None):
# type: (str, str, str, UserProfile, Optional[Realm]) -> str
# Split into 256 subdirectories to prevent directories from getting too big # Split into 256 subdirectories to prevent directories from getting too big
path = "/".join([ path = "/".join([
str(user_profile.realm.id), str(user_profile.realm.id),
@ -218,6 +239,7 @@ def upload_message_image_local(uploaded_file_name, content_type, file_data, user
return '/user_uploads/' + path return '/user_uploads/' + path
def delete_message_image_local(path_id): def delete_message_image_local(path_id):
# type: (str) -> bool
file_path = os.path.join(settings.LOCAL_UPLOADS_DIR, 'files', path_id) file_path = os.path.join(settings.LOCAL_UPLOADS_DIR, 'files', path_id)
if os.path.isfile(file_path): if os.path.isfile(file_path):
# This removes the file but the empty folders still remain. # This removes the file but the empty folders still remain.
@ -229,6 +251,7 @@ def delete_message_image_local(path_id):
return False return False
def upload_avatar_image_local(user_file, user_profile, email): def upload_avatar_image_local(user_file, user_profile, email):
# type: (File, UserProfile, str) -> None
email_hash = user_avatar_hash(email) email_hash = user_avatar_hash(email)
image_data = user_file.read() image_data = user_file.read()
@ -249,6 +272,7 @@ else:
delete_message_image = delete_message_image_s3 delete_message_image = delete_message_image_s3
def claim_attachment(path_id, message): def claim_attachment(path_id, message):
# type: (text_type, UserMessage) -> bool
try: try:
attachment = Attachment.objects.get(path_id=path_id) attachment = Attachment.objects.get(path_id=path_id)
attachment.messages.add(message) attachment.messages.add(message)
@ -259,9 +283,11 @@ def claim_attachment(path_id, message):
return False return False
def create_attachment(file_name, path_id, user_profile): def create_attachment(file_name, path_id, user_profile):
# type: (str, str, UserProfile) -> bool
Attachment.objects.create(file_name=file_name, path_id=path_id, owner=user_profile) Attachment.objects.create(file_name=file_name, path_id=path_id, owner=user_profile)
return True return True
def upload_message_image_through_web_client(request, user_file, user_profile): def upload_message_image_through_web_client(request, user_file, user_profile):
# type: (HttpRequest, File, UserProfile) -> str
uploaded_file_name, content_type = get_file_info(request, user_file) uploaded_file_name, content_type = get_file_info(request, user_file)
return upload_message_image(uploaded_file_name, content_type, user_file.read(), user_profile) return upload_message_image(uploaded_file_name, content_type, user_file.read(), user_profile)