ldap: Fix avatar sync not working with the S3 backend.

This fixes an issue that caused LDAP synchronization to fail for
avatars.  The problem occurred due to the lack of a 'name' attribute
on the BytesIO object that we pass to the upload backend (which is
only used in the S3 backend for computing Content-Type).

Fixes #12411.
This commit is contained in:
vinitS101 2019-06-08 03:06:19 +05:30 committed by Tim Abbott
parent 05e0e99b6e
commit a6eda858d0
3 changed files with 74 additions and 9 deletions

View File

@ -197,7 +197,8 @@ class ZulipUploadBackend:
def upload_avatar_image(self, user_file: File,
acting_user_profile: UserProfile,
target_user_profile: UserProfile) -> None:
target_user_profile: UserProfile,
content_type: Optional[str]=None) -> None:
raise NotImplementedError()
def delete_avatar_image(self, user: UserProfile) -> None:
@ -389,8 +390,10 @@ class S3UploadBackend(ZulipUploadBackend):
def upload_avatar_image(self, user_file: File,
acting_user_profile: UserProfile,
target_user_profile: UserProfile) -> None:
content_type = guess_type(user_file.name)[0]
target_user_profile: UserProfile,
content_type: Optional[str] = None) -> None:
if content_type is None:
content_type = guess_type(user_file.name)[0]
s3_file_name = user_avatar_path(target_user_profile)
image_data = user_file.read()
@ -631,7 +634,8 @@ class LocalUploadBackend(ZulipUploadBackend):
def upload_avatar_image(self, user_file: File,
acting_user_profile: UserProfile,
target_user_profile: UserProfile) -> None:
target_user_profile: UserProfile,
content_type: Optional[str] = None) -> None:
file_path = user_avatar_path(target_user_profile)
image_data = user_file.read()
@ -756,8 +760,10 @@ def delete_message_image(path_id: str) -> bool:
return upload_backend.delete_message_image(path_id)
def upload_avatar_image(user_file: File, acting_user_profile: UserProfile,
target_user_profile: UserProfile) -> None:
upload_backend.upload_avatar_image(user_file, acting_user_profile, target_user_profile)
target_user_profile: UserProfile,
content_type: Optional[str]=None) -> None:
upload_backend.upload_avatar_image(user_file, acting_user_profile,
target_user_profile, content_type=content_type)
def delete_avatar_image(user_profile: UserProfile) -> None:
upload_backend.delete_avatar_image(user_profile)

View File

@ -30,12 +30,14 @@ from zerver.lib.actions import (
validate_email,
)
from zerver.lib.avatar import avatar_url
from zerver.lib.avatar_hash import user_avatar_path
from zerver.lib.dev_ldap_directory import generate_dev_ldap_dir
from zerver.lib.mobile_auth_otp import otp_decrypt_api_key
from zerver.lib.validator import validate_login_email, \
check_bool, check_dict_only, check_string, Validator
from zerver.lib.request import JsonableError
from zerver.lib.users import get_all_api_keys
from zerver.lib.upload import resize_avatar, MEDIUM_AVATAR_SIZE
from zerver.lib.initial_password import initial_password
from zerver.lib.test_classes import (
ZulipTestCase,
@ -67,7 +69,8 @@ from social_django.storage import BaseDjangoStorage
import json
import urllib
import ujson
from zerver.lib.test_helpers import MockLDAP, load_subdomain_token
from zerver.lib.test_helpers import MockLDAP, load_subdomain_token, \
use_s3_backend, create_s3_buckets, get_test_image_file
class AuthBackendTest(ZulipTestCase):
def get_username(self, email_to_username: Optional[Callable[[str], str]]=None) -> str:
@ -2859,6 +2862,50 @@ class TestZulipLDAPUserPopulator(ZulipLDAPTestCase):
hamlet = self.example_user('hamlet')
self.assertEqual(hamlet.avatar_source, UserProfile.AVATAR_FROM_USER)
@use_s3_backend
def test_update_user_avatar_for_s3(self) -> None:
bucket = create_s3_buckets(settings.S3_AVATAR_BUCKET)[0]
test_image_data = open(get_test_image_file('img.png').name, 'rb').read()
self.mock_ldap.directory = {
'uid=hamlet,ou=users,dc=zulip,dc=com': {
'cn': ['King Hamlet', ],
'thumbnailPhoto': [test_image_data, ],
}
}
with self.settings(AUTH_LDAP_USER_ATTR_MAP={'full_name': 'cn',
'avatar': 'thumbnailPhoto'}):
self.perform_ldap_sync(self.example_user('hamlet'))
hamlet = self.example_user('hamlet')
path_id = user_avatar_path(hamlet)
original_image_path_id = path_id + ".original"
medium_path_id = path_id + "-medium.png"
original_image_key = bucket.get_key(original_image_path_id)
medium_image_key = bucket.get_key(medium_path_id)
image_data = original_image_key.get_contents_as_string()
self.assertEqual(image_data, test_image_data)
test_medium_image_data = resize_avatar(test_image_data, MEDIUM_AVATAR_SIZE)
medium_image_data = medium_image_key.get_contents_as_string()
self.assertEqual(medium_image_data, test_medium_image_data)
self.mock_ldap.directory = {
'uid=hamlet,ou=users,dc=zulip,dc=com': {
'cn': ['Cordelia Lear', ],
# Submit invalid data for the thumbnailPhoto field
'thumbnailPhoto': [b'00' + test_image_data, ],
}
}
with self.settings(AUTH_LDAP_USER_ATTR_MAP={'full_name': 'cn',
'avatar': 'thumbnailPhoto'}):
with mock.patch('logging.warning') as mock_warning:
self.perform_ldap_sync(self.example_user('hamlet'))
mock_warning.assert_called_once_with(
'Could not parse thumbnailPhoto field for user %s' % (hamlet.id,))
def test_deactivate_non_matching_users(self) -> None:
self.mock_ldap.directory = {}

View File

@ -12,7 +12,9 @@
# call the authenticate methods of all backends registered in
# settings.AUTHENTICATION_BACKENDS that have a function signature
# matching the args/kwargs passed in the authenticate() call.
import copy
import logging
import magic
from typing import Any, Dict, List, Optional, Set, Tuple, Union
from django_auth_ldap.backend import LDAPBackend, _LDAPUser
@ -343,8 +345,18 @@ class ZulipLDAPAuthBackendBase(ZulipAuthMixin, LDAPBackend):
# If this specific user doesn't have e.g. a
# thumbnailPhoto set in LDAP, just skip that user.
return
upload_avatar_image(BytesIO(ldap_user.attrs[avatar_attr_name][0]), user, user)
do_change_avatar_fields(user, UserProfile.AVATAR_FROM_USER)
io = BytesIO(ldap_user.attrs[avatar_attr_name][0])
# Structurally, to make the S3 backend happy, we need to
# provide a Content-Type; since that isn't specified in
# any metadata, we auto-detect it.
content_type = magic.from_buffer(copy.deepcopy(io).read()[0:1024], mime=True)
if content_type.startswith("image/"):
upload_avatar_image(io, user, user, content_type=content_type)
do_change_avatar_fields(user, UserProfile.AVATAR_FROM_USER)
else:
logging.warning("Could not parse %s field for user %s" %
(avatar_attr_name, user.id))
def is_account_control_disabled_user(self, ldap_user: _LDAPUser) -> bool:
"""Implements the userAccountControl check for whether a user has been