streams: Add zerver/lib/streams.py library for security checks.

The goal of this library is to make it a lot easier to prevent bugs
like CVE-2017-0881 by having all of our views logic for fetching a
stream go through a couple carefully tested code paths.
This commit is contained in:
Tim Abbott 2017-01-29 15:48:45 -08:00
parent de3e96162e
commit 0af34ee710
3 changed files with 142 additions and 0 deletions

View File

@ -383,6 +383,7 @@ def build_custom_checkers(by_lang):
('zerver/decorator.py', 'raise JsonableError(reason % (role,))'),
('zerver/lib/actions.py', 'raise JsonableError(e.messages[0])'),
('zerver/views/messages.py', 'raise JsonableError(error)'),
('zerver/lib/streams.py', 'raise JsonableError(error)'),
('zerver/lib/request.py', 'raise JsonableError(error)'),
('zerver/views/streams.py', 'raise JsonableError(response.content)'),
]),

64
zerver/lib/streams.py Normal file
View File

@ -0,0 +1,64 @@
from __future__ import absolute_import
from typing import Text, Tuple
from django.http import HttpRequest, HttpResponse
from django.utils.translation import ugettext as _
from zerver.lib.request import JsonableError
from zerver.models import UserProfile, Stream, Subscription, \
Recipient, get_recipient, get_stream
def access_stream_common(user_profile, stream, error):
# type: (UserProfile, Stream, Text) -> Tuple[Recipient, Subscription]
"""Common function for backend code where the target use attempts to
access the target stream, returning all the data fetched along the
way. If that user does not have permission to access that stream,
we throw an exception. A design goal is that the error message is
the same for streams you can't access and streams that don't exist."""
# First, we don't allow any access to streams in other realms.
if stream.realm_id != user_profile.realm_id:
raise JsonableError(error)
recipient = get_recipient(Recipient.STREAM, stream.id)
try:
sub = Subscription.objects.get(user_profile=user_profile,
recipient=recipient,
active=True)
except Subscription.DoesNotExist:
sub = None
# If the stream is in your realm and public, you can access it.
if stream.is_public():
return (recipient, sub)
# Or if you are subscribed to the stream, you can access it.
if sub is not None:
return (recipient, sub)
# Otherwise it is a private stream and you're not on it, so throw
# an error.
raise JsonableError(error)
def access_stream_by_id(user_profile, stream_id):
# type: (UserProfile, int) -> Tuple[Stream, Recipient, Subscription]
error = _("Invalid stream id")
try:
stream = Stream.objects.get(id=stream_id)
except Stream.DoesNotExist:
raise JsonableError(error)
(recipient, sub) = access_stream_common(user_profile, stream, error)
return (stream, recipient, sub)
def access_stream_by_name(user_profile, stream_name):
# type: (UserProfile, Text) -> Tuple[Stream, Recipient, Subscription]
error = _("Invalid stream name '%s'" % (stream_name,))
stream = get_stream(stream_name, user_profile.realm)
if stream is None:
raise JsonableError(error)
(recipient, sub) = access_stream_common(user_profile, stream, error)
return (stream, recipient, sub)

View File

@ -2383,3 +2383,80 @@ class GetSubscribersTest(ZulipTestCase):
result = self.make_subscriber_request(stream_id, email=other_email)
self.assert_json_error(result,
"Unable to retrieve subscribers for invite-only stream")
from zerver.lib.streams import access_stream_by_id, access_stream_by_name
class AccessStreamTest(ZulipTestCase):
def test_access_stream(self):
# type: () -> None
"""
A comprehensive security test for the access_stream_by_* API functions.
"""
# Create a private stream for which Hamlet is the only subscriber.
hamlet_email = "hamlet@zulip.com"
hamlet = get_user_profile_by_email(hamlet_email)
stream_name = "new_private_stream"
self.login(hamlet_email)
self.common_subscribe_to_streams(hamlet_email, [stream_name],
invite_only=True)
stream = get_stream(stream_name, hamlet.realm)
othello_email = "othello@zulip.com"
othello = get_user_profile_by_email(othello_email)
# Nobody can access a stream that doesn't exist
with self.assertRaisesRegex(JsonableError, "Invalid stream id"):
access_stream_by_id(hamlet, 501232)
with self.assertRaisesRegex(JsonableError, "Invalid stream name 'invalid stream'"):
access_stream_by_name(hamlet, "invalid stream")
# Hamlet can access the private stream
(stream_ret, rec_ret, sub_ret) = access_stream_by_id(hamlet, stream.id)
self.assertEqual(stream, stream_ret)
self.assertEqual(sub_ret.recipient, rec_ret)
self.assertEqual(sub_ret.recipient.type_id, stream.id)
(stream_ret2, rec_ret2, sub_ret2) = access_stream_by_name(hamlet, stream.name)
self.assertEqual(stream_ret, stream_ret2)
self.assertEqual(sub_ret, sub_ret2)
self.assertEqual(rec_ret, rec_ret2)
# Othello cannot access the private stream
with self.assertRaisesRegex(JsonableError, "Invalid stream id"):
access_stream_by_id(othello, stream.id)
with self.assertRaisesRegex(JsonableError, "Invalid stream name 'new_private_stream'"):
access_stream_by_name(othello, stream.name)
# Both Othello and Hamlet can access a public stream that only
# Hamlet is subscribed to in this realm
public_stream_name = "public_stream"
self.common_subscribe_to_streams(hamlet_email, [public_stream_name],
invite_only=False)
public_stream = get_stream(public_stream_name, hamlet.realm)
access_stream_by_id(othello, public_stream.id)
access_stream_by_name(othello, public_stream.name)
access_stream_by_id(hamlet, public_stream.id)
access_stream_by_name(hamlet, public_stream.name)
# Nobody can access a public stream in another realm
mit_realm = get_realm("mit")
mit_stream, _ = create_stream_if_needed(mit_realm, "mit_stream", invite_only=False)
sipbtest = get_user_profile_by_email("sipbtest@mit.edu")
with self.assertRaisesRegex(JsonableError, "Invalid stream id"):
access_stream_by_id(hamlet, mit_stream.id)
with self.assertRaisesRegex(JsonableError, "Invalid stream name 'mit_stream'"):
access_stream_by_name(hamlet, mit_stream.name)
with self.assertRaisesRegex(JsonableError, "Invalid stream id"):
access_stream_by_id(sipbtest, stream.id)
with self.assertRaisesRegex(JsonableError, "Invalid stream name 'new_private_stream'"):
access_stream_by_name(sipbtest, stream.name)
# MIT realm users cannot access even public streams in their realm
with self.assertRaisesRegex(JsonableError, "Invalid stream id"):
access_stream_by_id(sipbtest, mit_stream.id)
with self.assertRaisesRegex(JsonableError, "Invalid stream name 'mit_stream'"):
access_stream_by_name(sipbtest, mit_stream.name)
# But they can access streams they are subscribed to
self.common_subscribe_to_streams(sipbtest.email, [mit_stream.name])
access_stream_by_id(sipbtest, mit_stream.id)
access_stream_by_name(sipbtest, mit_stream.name)