From 47c3ec1283b997aafa7cca75cbffc1b1d2c6ce4e Mon Sep 17 00:00:00 2001 From: medullaskyline Date: Sat, 4 Jun 2016 18:54:32 -0700 Subject: [PATCH] Annotate zerver.lib.upload. --- zerver/lib/upload.py | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/zerver/lib/upload.py b/zerver/lib/upload.py index a68063fa1b..b1edb3f0c3 100644 --- a/zerver/lib/upload.py +++ b/zerver/lib/upload.py @@ -1,26 +1,33 @@ from __future__ import absolute_import +from typing import Optional, Tuple from django.utils.translation import ugettext as _ from django.conf import settings from django.template.defaultfilters import slugify from django.utils.encoding import force_text +from django.core.files import File +from django.http import HttpRequest from jinja2 import Markup as mark_safe import unicodedata from zerver.lib.avatar import user_avatar_hash from zerver.lib.request import JsonableError +from boto.s3.bucket import Bucket from boto.s3.key import Key from boto.s3.connection import S3Connection from mimetypes import guess_type, guess_extension from zerver.models import get_user_profile_by_id from zerver.models import Attachment +from zerver.models import Realm, UserProfile, UserMessage import base64 import os import re +import six from PIL import Image, ImageOps +from six import text_type from six.moves import cStringIO as StringIO import random import logging @@ -41,6 +48,7 @@ import logging # through a sanitization function. def sanitize_name(value): + # type: (six.text_type) -> str """ Sanitizes a value to be safe to store in a Linux filesystem, in S3, and in a URL. So unicode is allowed, but not special @@ -58,12 +66,14 @@ def sanitize_name(value): return mark_safe(re.sub('[-\s]+', '-', value, flags=re.U)) def random_name(bytes=60): + # type: (int) -> str return base64.urlsafe_b64encode(os.urandom(bytes)) class BadImageError(JsonableError): pass def resize_avatar(image_data): + # type: (str) -> str AVATAR_SIZE = 100 try: im = Image.open(StringIO(image_data)) @@ -78,6 +88,7 @@ def resize_avatar(image_data): ### S3 def get_bucket(conn, bucket_name): + # type: (S3Connection, str) -> Bucket # Calling get_bucket() with validate=True can apparently lead # to expensive S3 bills: # http://www.appneta.com/blog/s3-list-get-bucket-default/ @@ -96,6 +107,7 @@ def upload_image_to_s3( user_profile, contents, ): + # type: (str, str, str, UserProfile, str) -> None conn = S3Connection(settings.S3_KEY, settings.S3_SECRET_KEY) bucket = get_bucket(conn, bucket_name) @@ -112,6 +124,7 @@ def upload_image_to_s3( key.set_contents_from_string(contents, headers=headers) def get_file_info(request, user_file): + # type: (HttpRequest, File) -> Tuple[str, str] uploaded_file_name = user_file.name content_type = request.GET.get('mimetype') if content_type is None: @@ -121,6 +134,7 @@ def get_file_info(request, user_file): return uploaded_file_name, content_type def upload_message_image_s3(uploaded_file_name, content_type, file_data, user_profile, target_realm=None): + # type: (str, str, str, UserProfile, Optional[Realm]) -> str bucket_name = settings.S3_AUTH_UPLOADS_BUCKET s3_file_name = "/".join([ str(target_realm.id if target_realm is not None else user_profile.realm.id), @@ -141,10 +155,12 @@ def upload_message_image_s3(uploaded_file_name, content_type, file_data, user_pr return url def get_signed_upload_url(path): + # type: (str) -> str conn = S3Connection(settings.S3_KEY, settings.S3_SECRET_KEY) return conn.generate_url(15, 'GET', bucket=settings.S3_AUTH_UPLOADS_BUCKET, key=path) def get_realm_for_filename(path): + # type: (str) -> int conn = S3Connection(settings.S3_KEY, settings.S3_SECRET_KEY) key = get_bucket(conn, settings.S3_AUTH_UPLOADS_BUCKET).get_key(path) if key is None: @@ -153,6 +169,7 @@ def get_realm_for_filename(path): return get_user_profile_by_id(key.metadata["user_profile_id"]).realm.id def delete_message_image_s3(path_id): + # type: (str) -> bool conn = S3Connection(settings.S3_KEY, settings.S3_SECRET_KEY) bucket = get_bucket(conn, settings.S3_AUTH_UPLOADS_BUCKET) @@ -167,6 +184,7 @@ def delete_message_image_s3(path_id): return False def upload_avatar_image_s3(user_file, user_profile, email): + # type: (File, UserProfile, str) -> None content_type = guess_type(user_file.name)[0] bucket_name = settings.S3_AVATAR_BUCKET s3_file_name = user_avatar_hash(email) @@ -194,17 +212,20 @@ def upload_avatar_image_s3(user_file, user_profile, email): ### Local def mkdirs(path): + # type: (str) -> None dirname = os.path.dirname(path) if not os.path.isdir(dirname): os.makedirs(dirname) def write_local_file(type, path, file_data): + # type: (str, str, str) -> None file_path = os.path.join(settings.LOCAL_UPLOADS_DIR, type, path) mkdirs(file_path) with open(file_path, 'wb') as f: f.write(file_data) def upload_message_image_local(uploaded_file_name, content_type, file_data, user_profile, target_realm=None): + # type: (str, str, str, UserProfile, Optional[Realm]) -> str # Split into 256 subdirectories to prevent directories from getting too big path = "/".join([ str(user_profile.realm.id), @@ -218,6 +239,7 @@ def upload_message_image_local(uploaded_file_name, content_type, file_data, user return '/user_uploads/' + path def delete_message_image_local(path_id): + # type: (str) -> bool file_path = os.path.join(settings.LOCAL_UPLOADS_DIR, 'files', path_id) if os.path.isfile(file_path): # This removes the file but the empty folders still remain. @@ -229,6 +251,7 @@ def delete_message_image_local(path_id): return False def upload_avatar_image_local(user_file, user_profile, email): + # type: (File, UserProfile, str) -> None email_hash = user_avatar_hash(email) image_data = user_file.read() @@ -249,6 +272,7 @@ else: delete_message_image = delete_message_image_s3 def claim_attachment(path_id, message): + # type: (text_type, UserMessage) -> bool try: attachment = Attachment.objects.get(path_id=path_id) attachment.messages.add(message) @@ -259,9 +283,11 @@ def claim_attachment(path_id, message): return False def create_attachment(file_name, path_id, user_profile): + # type: (str, str, UserProfile) -> bool Attachment.objects.create(file_name=file_name, path_id=path_id, owner=user_profile) return True def upload_message_image_through_web_client(request, user_file, user_profile): + # type: (HttpRequest, File, UserProfile) -> str uploaded_file_name, content_type = get_file_info(request, user_file) return upload_message_image(uploaded_file_name, content_type, user_file.read(), user_profile)