Add is_webhook option to authentication decorats.

Modified:
authenticated_rest_api_view
authenticated_api_view and validate_api_key.
This commit is contained in:
Tomasz Kolek 2016-05-18 20:35:35 +02:00 committed by Tim Abbott
parent 80d92c1651
commit 9ae68ade8b
12 changed files with 179 additions and 64 deletions

View File

@ -138,7 +138,7 @@ def process_client(request, user_profile, is_json_view=False, client_name=None):
request.client = get_client(client_name)
update_user_activity(request, user_profile)
def validate_api_key(role, api_key):
def validate_api_key(role, api_key, is_webhook=False):
# Remove whitespace to protect users from trivial errors.
role, api_key = role.strip(), api_key.strip()
@ -158,6 +158,8 @@ def validate_api_key(role, api_key):
raise JsonableError(reason % (role,))
if not profile.is_active:
raise JsonableError(_("Account not active"))
if profile.is_incoming_webhook and not is_webhook:
raise JsonableError(_("Account is not valid to post webhook messages"))
try:
if profile.realm.deactivated:
raise JsonableError(_("Realm for account has been deactivated"))
@ -277,62 +279,66 @@ def zulip_internal(view_func):
# user_profile to the view function's arguments list, since we have to
# look it up anyway. It is deprecated in favor on the REST API
# versions.
def authenticated_api_view(view_func):
@csrf_exempt
@require_post
@has_request_variables
@wraps(view_func)
def _wrapped_view_func(request, email=REQ(), api_key=REQ(default=None),
api_key_legacy=REQ('api-key', default=None),
*args, **kwargs):
if not api_key and not api_key_legacy:
raise RequestVariableMissingError("api_key")
elif not api_key:
api_key = api_key_legacy
user_profile = validate_api_key(email, api_key)
request.user = user_profile
request._email = user_profile.email
process_client(request, user_profile)
# Apply rate limiting
limited_func = rate_limit()(view_func)
return limited_func(request, user_profile, *args, **kwargs)
def authenticated_api_view(is_webhook=False):
def _wrapped_view_func(view_func):
@csrf_exempt
@require_post
@has_request_variables
@wraps(view_func)
def _wrapped_func_arguments(request, email=REQ(), api_key=REQ(default=None),
api_key_legacy=REQ('api-key', default=None),
*args, **kwargs):
if not api_key and not api_key_legacy:
raise RequestVariableMissingError("api_key")
elif not api_key:
api_key = api_key_legacy
user_profile = validate_api_key(email, api_key, is_webhook)
request.user = user_profile
request._email = user_profile.email
process_client(request, user_profile)
# Apply rate limiting
limited_func = rate_limit()(view_func)
return limited_func(request, user_profile, *args, **kwargs)
return _wrapped_func_arguments
return _wrapped_view_func
# A more REST-y authentication decorator, using, in particular, HTTP Basic
# authentication.
def authenticated_rest_api_view(view_func):
@csrf_exempt
@wraps(view_func)
def _wrapped_view_func(request, *args, **kwargs):
# First try block attempts to get the credentials we need to do authentication
try:
# Grab the base64-encoded authentication string, decode it, and split it into
# the email and API key
auth_type, encoded_value = request.META['HTTP_AUTHORIZATION'].split()
# case insensitive per RFC 1945
if auth_type.lower() != "basic":
return json_error(_("Only Basic authentication is supported."))
role, api_key = base64.b64decode(encoded_value).split(":")
except ValueError:
return json_error(_("Invalid authorization header for basic auth"))
except KeyError:
return json_unauthorized(_("Missing authorization header for basic auth"))
def authenticated_rest_api_view(is_webhook=False):
def _wrapped_view_func(view_func):
@csrf_exempt
@wraps(view_func)
def _wrapped_func_arguments(request, *args, **kwargs):
# First try block attempts to get the credentials we need to do authentication
try:
# Grab the base64-encoded authentication string, decode it, and split it into
# the email and API key
auth_type, encoded_value = request.META['HTTP_AUTHORIZATION'].split()
# case insensitive per RFC 1945
if auth_type.lower() != "basic":
return json_error(_("Only Basic authentication is supported."))
role, api_key = base64.b64decode(encoded_value).split(":")
except ValueError:
json_error(_("Invalid authorization header for basic auth"))
except KeyError:
return json_unauthorized("Missing authorization header for basic auth")
# Now we try to do authentication or die
try:
# Could be a UserProfile or a Deployment
profile = validate_api_key(role, api_key)
except JsonableError as e:
return json_unauthorized(e.error)
request.user = profile
process_client(request, profile)
if isinstance(profile, UserProfile):
request._email = profile.email
else:
request._email = "deployment:" + role
profile.rate_limits = ""
# Apply rate limiting
return rate_limit()(view_func)(request, profile, *args, **kwargs)
# Now we try to do authentication or die
try:
# Could be a UserProfile or a Deployment
profile = validate_api_key(role, api_key, is_webhook)
except JsonableError as e:
return json_unauthorized(e.error)
request.user = profile
process_client(request, profile)
if isinstance(profile, UserProfile):
request._email = profile.email
else:
request._email = "deployment:" + role
profile.rate_limits = ""
# Apply rate limiting
return rate_limit()(view_func)(request, profile, *args, **kwargs)
return _wrapped_func_arguments
return _wrapped_view_func
def process_as_post(view_func):
@ -546,4 +552,3 @@ def uses_mandrill(func):
kwargs['mail_client'] = get_mandrill_client()
return func(*args, **kwargs)
return wrapped_func

View File

@ -77,7 +77,7 @@ def rest_dispatch(request, globals_list, **kwargs):
elif request.META.get('HTTP_AUTHORIZATION', None):
# Wrap function with decorator to authenticate the user before
# proceeding
target_function = authenticated_rest_api_view(target_function)
target_function = authenticated_rest_api_view()(target_function)
else:
if 'text/html' in request.META.get('HTTP_ACCEPT', ''):
# If this looks like a request from a top-level page in a

View File

@ -1,9 +1,11 @@
from __future__ import absolute_import
from contextlib import contextmanager
from typing import cast, Any, Callable, Generator, Iterable, Tuple, Sized, Union, Optional
from django.test import TestCase
from django.template import loader
from django.http import HttpResponse
from django.utils.translation import ugettext as _
from zerver.lib.initial_password import initial_password
from zerver.lib.db import TimeTrackingCursor
@ -33,6 +35,9 @@ from zerver.models import (
UserProfile,
)
from zerver.lib.request import JsonableError
import base64
import os
import re

View File

@ -1,15 +1,22 @@
# -*- coding: utf-8 -*-
from django.test import TestCase
from django.utils.translation import ugettext as _
from django.http import HttpResponse
from zerver.tests.test_hooks import WebhookTestCase
from zerver.lib.actions import do_deactivate_realm, do_deactivate_user, \
do_reactivate_user
do_reactivate_user, do_reactivate_realm
from zerver.lib.test_helpers import (
AuthedTestCase,
)
from zerver.lib.request import \
REQ, has_request_variables, RequestVariableMissingError, \
RequestVariableConversionError, JsonableError
from zerver.decorator import \
api_key_only_webhook_view,\
authenticated_json_post_view, authenticated_json_view,\
validate_api_key
from zerver.lib.validator import (
check_string, check_dict, check_bool, check_int, check_list
)
@ -109,6 +116,26 @@ class DecoratorTestCase(TestCase):
pass
test(request)
def test_api_key_only_webhook_view(self):
@api_key_only_webhook_view('ClientName')
def get_user_profile_api_key(request, user_profile, client):
return user_profile.api_key
class Request(object):
REQUEST = {} # type: Dict[str, str]
COOKIES = {}
META = {'PATH_INFO': ''}
webhook_bot_email = 'webhook-bot@zulip.com'
request = Request()
request.REQUEST['api_key'] = 'not_existing_api_key'
with self.assertRaises(JsonableError):
get_user_profile_api_key(request)
request.REQUEST['api_key'] = get_user_profile_by_email(webhook_bot_email).api_key
self.assertEqual(get_user_profile_api_key(request), get_user_profile_by_email(webhook_bot_email).api_key)
class ValidatorTestCase(TestCase):
def test_check_string(self):
x = "hello"
@ -389,3 +416,82 @@ class InactiveUserTest(AuthedTestCase):
result = self.client.post(url, data,
content_type="application/json")
self.assert_json_error_contains(result, "Account not active", status_code=400)
class TestValidateApiKey(AuthedTestCase):
def setUp(self):
self.webhook_bot = get_user_profile_by_email('webhook-bot@zulip.com')
self.default_bot = get_user_profile_by_email('hamlet@zulip.com')
def test_validate_api_key_if_profile_does_not_exist(self):
with self.assertRaises(JsonableError):
validate_api_key('email@doesnotexist.com', 'api_key')
def test_validate_api_key_if_api_key_does_not_match_profile_api_key(self):
with self.assertRaises(JsonableError):
validate_api_key(self.webhook_bot.email, 'not_32_length')
with self.assertRaises(JsonableError):
validate_api_key(self.webhook_bot.email, self.default_bot.api_key)
def test_validate_api_key_if_profile_is_not_active(self):
self._change_is_active_field(self.default_bot, False)
with self.assertRaises(JsonableError):
validate_api_key(self.default_bot.email, self.default_bot.api_key)
self._change_is_active_field(self.default_bot, True)
def test_validate_api_key_if_profile_is_incoming_webhook_and_is_webhook_is_unset(self):
with self.assertRaises(JsonableError):
validate_api_key(self.webhook_bot.email, self.webhook_bot.api_key)
def test_validate_api_key_if_profile_is_incoming_webhook_and_is_webhook_is_set(self):
profile = validate_api_key(self.webhook_bot.email, self.webhook_bot.api_key, is_webhook=True)
self.assertEqual(profile.pk, self.webhook_bot.pk)
def _change_is_active_field(self, profile, value):
profile.is_active = value
profile.save()
class TestAuthenticatedJsonPostViewDecorator(AuthedTestCase):
def test_authenticated_json_post_view_if_everything_is_correct(self):
user_email = 'hamlet@zulip.com'
self._login(user_email)
response = self._do_test(user_email)
self.assertEqual(response.status_code, 200)
def test_authenticated_json_post_view_if_user_is_incoming_webhook(self):
user_email = 'webhook-bot@zulip.com'
self._login(user_email, password="test") # we set a password because user is a bot
self.assert_json_error_contains(self._do_test(user_email), "Webhook bots can only access webhooks")
def test_authenticated_json_post_view_if_user_is_not_active(self):
user_email = 'hamlet@zulip.com'
user_profile = get_user_profile_by_email(user_email)
self._login(user_email, password="test")
# we deactivate user manually because do_deactivate_user removes user session
user_profile.is_active = False
user_profile.save()
self.assert_json_error_contains(self._do_test(user_email), "Account not active")
do_reactivate_user(user_profile)
def test_authenticated_json_post_view_if_user_realm_is_deactivated(self):
user_email = 'hamlet@zulip.com'
user_profile = get_user_profile_by_email(user_email)
self._login(user_email)
# we deactivate user's realm manually because do_deactivate_user removes user session
user_profile.realm.deactivated = True
user_profile.realm.save()
self.assert_json_error_contains(self._do_test(user_email), "Realm for account has been deactivated")
do_reactivate_realm(user_profile.realm)
def _do_test(self, user_email):
data = {"status": '"started"'}
return self.client.post(r'/json/tutorial_status', data)
def _login(self, user_email, password=None):
if password:
user_profile = get_user_profile_by_email(user_email)
user_profile.set_password(password)
user_profile.save()
self.login(user_email, password)

View File

@ -713,7 +713,7 @@ def same_realm_jabber_user(user_profile, email):
return user_profile.realm.domain == domain
@authenticated_api_view
@authenticated_api_view(is_webhook=False)
def api_send_message(request, user_profile):
return send_message_backend(request, user_profile)

View File

@ -38,7 +38,7 @@ def beanstalk_decoder(view_func):
return _wrapped_view_func
@beanstalk_decoder
@authenticated_rest_api_view
@authenticated_rest_api_view(is_webhook=True)
@has_request_variables
def api_beanstalk_webhook(request, user_profile,
payload=REQ(validator=check_dict([]))):

View File

@ -13,7 +13,7 @@ from zerver.decorator import REQ, has_request_variables, authenticated_rest_api_
from .github import build_commit_list_content
@authenticated_rest_api_view
@authenticated_rest_api_view(is_webhook=True)
@has_request_variables
def api_bitbucket_webhook(request, user_profile, payload=REQ(validator=check_dict([])),
stream=REQ(default='commits')):

View File

@ -13,7 +13,7 @@ from six import text_type
# There's no raw JSON for us to work from. Thus, it makes sense to just write
# a template Zulip message within Desk.com and have the webhook extract that
# from the "data" param and post it, which this does.
@authenticated_rest_api_view
@authenticated_rest_api_view(is_webhook=True)
@has_request_variables
def api_deskdotcom_webhook(request, user_profile, data=REQ(),
topic=REQ(default="Desk.com notification"),

View File

@ -112,8 +112,7 @@ def format_freshdesk_ticket_creation_message(ticket):
return content
@authenticated_rest_api_view
@authenticated_rest_api_view(is_webhook=True)
@has_request_variables
def api_freshdesk_webhook(request, user_profile, payload=REQ(argument_type='body'),
stream=REQ(default='freshdesk')):

View File

@ -122,7 +122,7 @@ def api_github_v2(user_profile, event, payload, branches, default_stream, commit
return target_stream, subject, content
@authenticated_api_view
@authenticated_api_view(is_webhook=True)
@has_request_variables
def api_github_landing(request, user_profile, event=REQ(),
payload=REQ(validator=check_dict([])),

View File

@ -15,7 +15,7 @@ from six import text_type
from typing import Any, Dict
@authenticated_rest_api_view
@authenticated_rest_api_view(is_webhook=True)
@has_request_variables
def api_stash_webhook(request, user_profile, payload=REQ(argument_type='body'),
stream=REQ(default='commits')):

View File

@ -11,7 +11,7 @@ def truncate(string, length):
string = string[:length-3] + '...'
return string
@authenticated_rest_api_view
@authenticated_rest_api_view(is_webhook=True)
@has_request_variables
def api_zendesk_webhook(request, user_profile, ticket_title=REQ(), ticket_id=REQ(),
message=REQ(), stream=REQ(default="zendesk")):