2016-11-10 19:30:09 +01:00
|
|
|
from contextlib import contextmanager
|
2016-11-24 19:12:55 +01:00
|
|
|
from typing import (cast, Any, Callable, Dict, Iterable, Iterator, List, Mapping, Optional,
|
2016-12-21 13:17:53 +01:00
|
|
|
Sized, Tuple, Union, Text)
|
2016-11-10 19:30:09 +01:00
|
|
|
|
2018-01-30 06:05:25 +01:00
|
|
|
from django.urls import resolve
|
2016-11-10 19:30:09 +01:00
|
|
|
from django.conf import settings
|
|
|
|
from django.test import TestCase
|
|
|
|
from django.test.client import (
|
|
|
|
BOUNDARY, MULTIPART_CONTENT, encode_multipart,
|
|
|
|
)
|
2017-02-16 10:10:37 +01:00
|
|
|
from django.test.testcases import SerializeMixin
|
2016-11-10 19:30:09 +01:00
|
|
|
from django.http import HttpResponse
|
|
|
|
from django.db.utils import IntegrityError
|
|
|
|
|
|
|
|
from zerver.lib.initial_password import initial_password
|
2017-04-28 06:56:44 +02:00
|
|
|
from zerver.lib.utils import is_remote_server
|
2017-10-25 17:17:17 +02:00
|
|
|
from zerver.views.users import add_service
|
2016-11-10 19:30:09 +01:00
|
|
|
|
|
|
|
from zerver.lib.actions import (
|
|
|
|
check_send_message, create_stream_if_needed, bulk_add_subscriptions,
|
2017-10-27 17:57:23 +02:00
|
|
|
get_display_recipient, bulk_remove_subscriptions, do_create_user,
|
2017-11-16 22:12:31 +01:00
|
|
|
check_send_stream_message, gather_subscriptions
|
2016-11-10 19:30:09 +01:00
|
|
|
)
|
|
|
|
|
2017-10-29 17:11:11 +01:00
|
|
|
from zerver.lib.stream_subscription import (
|
|
|
|
get_stream_subscriptions_for_user,
|
|
|
|
)
|
|
|
|
|
2016-11-10 19:30:09 +01:00
|
|
|
from zerver.lib.test_helpers import (
|
|
|
|
instrument_url, find_key_by_email,
|
|
|
|
)
|
|
|
|
|
|
|
|
from zerver.models import (
|
|
|
|
get_stream,
|
2017-05-24 02:42:31 +02:00
|
|
|
get_user,
|
2017-11-26 01:45:15 +01:00
|
|
|
get_user,
|
2017-07-12 11:56:10 +02:00
|
|
|
get_realm,
|
2016-11-10 19:30:09 +01:00
|
|
|
Client,
|
|
|
|
Message,
|
|
|
|
Realm,
|
|
|
|
Recipient,
|
2017-10-25 17:17:17 +02:00
|
|
|
Service,
|
2016-11-10 19:30:09 +01:00
|
|
|
Stream,
|
|
|
|
Subscription,
|
|
|
|
UserProfile,
|
|
|
|
)
|
|
|
|
|
2016-10-27 23:55:31 +02:00
|
|
|
from zilencer.models import get_remote_server_by_uuid
|
2016-11-10 19:30:09 +01:00
|
|
|
|
|
|
|
|
|
|
|
import base64
|
|
|
|
import mock
|
|
|
|
import os
|
|
|
|
import re
|
|
|
|
import ujson
|
2017-11-05 05:30:31 +01:00
|
|
|
import urllib
|
2016-11-10 19:30:09 +01:00
|
|
|
|
2017-05-17 21:13:40 +02:00
|
|
|
# Cache of API keys, keyed by the identifier passed to
# ZulipTestCase.encode_credentials; reset by flush_caches_for_testing().
API_KEYS = {} # type: Dict[Text, Text]
|
2016-11-10 19:30:09 +01:00
|
|
|
|
2017-11-27 05:27:04 +01:00
|
|
|
def flush_caches_for_testing() -> None:
    """Rebind the module-level API_KEYS cache to a fresh, empty dict."""
    global API_KEYS
    API_KEYS = dict()
|
|
|
|
|
2017-02-16 10:10:37 +01:00
|
|
|
class UploadSerializeMixin(SerializeMixin):
    """
    We cannot use override_settings to change the upload directory,
    because settings.LOCAL_UPLOADS_DIR is used in a URL pattern and URLs
    are compiled only once.  Otherwise, using a different upload directory
    for conflicting test cases would have provided better performance
    while providing the required isolation.
    """
    lockfile = 'var/upload_lock'

    @classmethod
    def setUpClass(cls: Any, *args: Any, **kwargs: Any) -> None:
        # Make sure the lock file exists before handing off to the parent
        # class's setUpClass.
        lock_missing = not os.path.exists(cls.lockfile)
        if lock_missing:
            with open(cls.lockfile, 'w'):  # nocoverage - rare locking case
                pass

        super(UploadSerializeMixin, cls).setUpClass(*args, **kwargs)
|
|
|
|
|
2016-11-10 19:30:09 +01:00
|
|
|
class ZulipTestCase(TestCase):
|
2017-03-21 15:34:16 +01:00
|
|
|
# Ensure that the test system just shows us diffs
|
2017-05-24 04:21:29 +02:00
|
|
|
maxDiff = None # type: Optional[int]
|
2017-03-21 15:34:16 +01:00
|
|
|
|
2016-11-10 19:30:09 +01:00
|
|
|
'''
|
|
|
|
WRAPPER_COMMENT:
|
|
|
|
|
|
|
|
We wrap calls to self.client.{patch,put,get,post,delete} for various
|
|
|
|
reasons. Some of this has to do with fixing encodings before calling
|
|
|
|
into the Django code. Some of this has to do with providing a future
|
|
|
|
path for instrumentation. Some of it's just consistency.
|
|
|
|
|
|
|
|
The linter will prevent direct calls to self.client.foo, so the wrapper
|
|
|
|
functions have to fake out the linter by using a local variable called
|
|
|
|
django_client to fool the regex.
|
|
|
|
'''
|
2017-08-26 01:33:53 +02:00
|
|
|
DEFAULT_SUBDOMAIN = "zulip"
|
2016-11-18 01:51:13 +01:00
|
|
|
DEFAULT_REALM = Realm.objects.get(string_id='zulip')
|
2016-11-10 19:30:09 +01:00
|
|
|
|
2017-11-27 05:27:04 +01:00
|
|
|
def set_http_host(self, kwargs: Dict[str, Any]) -> None:
    """
    Ensure kwargs (modified in place) carries an HTTP_HOST entry.

    A 'subdomain' entry, if present, is converted to HTTP_HOST via
    Realm.host_for_subdomain and removed.  Otherwise, an existing
    HTTP_HOST is left alone, and a missing one is derived from
    DEFAULT_SUBDOMAIN.
    """
    if 'subdomain' in kwargs:
        host = Realm.host_for_subdomain(kwargs['subdomain'])
        kwargs['HTTP_HOST'] = host
        kwargs.pop('subdomain')
        return

    if 'HTTP_HOST' not in kwargs:
        kwargs['HTTP_HOST'] = Realm.host_for_subdomain(self.DEFAULT_SUBDOMAIN)
|
2017-08-26 00:02:02 +02:00
|
|
|
|
2016-11-10 19:30:09 +01:00
|
|
|
@instrument_url
def client_patch(self, url: Text, info: Dict[str, Any]={}, **kwargs: Any) -> HttpResponse:
    """
    Send a PATCH request through the Django test client.

    We need to urlencode `info` ourselves, since Django's function
    won't do it for us.  kwargs are passed to set_http_host and then
    on to the client.
    """
    self.set_http_host(kwargs)
    django_client = self.client  # see WRAPPER_COMMENT
    body = urllib.parse.urlencode(info)
    return django_client.patch(url, body, **kwargs)
|
|
|
|
|
|
|
|
@instrument_url
def client_patch_multipart(self, url: Text, info: Dict[str, Any]={}, **kwargs: Any) -> HttpResponse:
    """
    Use this for patch requests that have file uploads or
    that need some sort of multi-part content.  In the future
    Django's test client may become a bit more flexible,
    so we can hopefully eliminate this.  (When you post
    with the Django test client, it deals with MULTIPART_CONTENT
    automatically, but not patch.)
    """
    self.set_http_host(kwargs)
    django_client = self.client  # see WRAPPER_COMMENT
    multipart_body = encode_multipart(BOUNDARY, info)
    return django_client.patch(url, multipart_body,
                               content_type=MULTIPART_CONTENT, **kwargs)
|
|
|
|
|
|
|
|
@instrument_url
def client_put(self, url: Text, info: Dict[str, Any]={}, **kwargs: Any) -> HttpResponse:
    """Send a PUT request via the Django test client, with `info` urlencoded as the body."""
    self.set_http_host(kwargs)
    django_client = self.client  # see WRAPPER_COMMENT
    body = urllib.parse.urlencode(info)
    return django_client.put(url, body, **kwargs)
|
|
|
|
|
2016-12-21 21:29:29 +01:00
|
|
|
@instrument_url
def client_delete(self, url: Text, info: Dict[str, Any]={}, **kwargs: Any) -> HttpResponse:
    """Send a DELETE request via the Django test client, with `info` urlencoded as the body."""
    self.set_http_host(kwargs)
    django_client = self.client  # see WRAPPER_COMMENT
    body = urllib.parse.urlencode(info)
    return django_client.delete(url, body, **kwargs)
|
|
|
|
|
2017-03-05 09:31:17 +01:00
|
|
|
@instrument_url
def client_options(self, url: Text, info: Dict[str, Any]={}, **kwargs: Any) -> HttpResponse:
    """Send an OPTIONS request via the Django test client, with `info` urlencoded."""
    self.set_http_host(kwargs)
    django_client = self.client  # see WRAPPER_COMMENT
    body = urllib.parse.urlencode(info)
    return django_client.options(url, body, **kwargs)
|
|
|
|
|
2017-08-26 01:24:50 +02:00
|
|
|
@instrument_url
def client_head(self, url: Text, info: Dict[str, Any]={}, **kwargs: Any) -> HttpResponse:
    """Send a HEAD request via the Django test client, with `info` urlencoded."""
    self.set_http_host(kwargs)
    django_client = self.client  # see WRAPPER_COMMENT
    body = urllib.parse.urlencode(info)
    return django_client.head(url, body, **kwargs)
|
|
|
|
|
2016-11-10 19:30:09 +01:00
|
|
|
@instrument_url
def client_post(self, url: Text, info: Dict[str, Any]={}, **kwargs: Any) -> HttpResponse:
    """Send a POST request via the Django test client; `info` is passed through unencoded."""
    self.set_http_host(kwargs)
    django_client = self.client  # see WRAPPER_COMMENT
    response = django_client.post(url, info, **kwargs)
    return response
|
|
|
|
|
2016-11-17 16:52:28 +01:00
|
|
|
@instrument_url
def client_post_request(self, url: Text, req: Any) -> HttpResponse:
    """
    We simulate hitting an endpoint here, although we
    actually resolve the URL manually and hit the view
    directly.  We have this helper method to allow our
    instrumentation to work for /notify_tornado and
    future similar methods that require doing funny
    things to a request object.
    """
    view_func = resolve(url).func
    return view_func(req)
|
|
|
|
|
2016-11-10 19:30:09 +01:00
|
|
|
@instrument_url
def client_get(self, url: Text, info: Dict[str, Any]={}, **kwargs: Any) -> HttpResponse:
    """Send a GET request via the Django test client; `info` is passed through as the request data."""
    self.set_http_host(kwargs)
    django_client = self.client  # see WRAPPER_COMMENT
    response = django_client.get(url, info, **kwargs)
    return response
|
|
|
|
|
2017-05-07 17:21:26 +02:00
|
|
|
# Short name -> email; used by example_user() and example_email().
example_user_map = {
    'hamlet': 'hamlet@zulip.com',
    'cordelia': 'cordelia@zulip.com',
    'iago': 'iago@zulip.com',
    'prospero': 'prospero@zulip.com',
    'othello': 'othello@zulip.com',
    'AARON': 'AARON@zulip.com',
    'aaron': 'aaron@zulip.com',
    'ZOE': 'ZOE@zulip.com',
    'webhook_bot': 'webhook-bot@zulip.com',
    'welcome_bot': 'welcome-bot@zulip.com',
    'outgoing_webhook_bot': 'outgoing-webhook@zulip.com',
}
|
|
|
|
|
2017-05-23 01:26:38 +02:00
|
|
|
# Short name -> email; used by mit_user() and mit_email().
mit_user_map = {
    'sipbtest': "sipbtest@mit.edu",
    'starnine': "starnine@mit.edu",
    'espuser': "espuser@mit.edu",
}
|
|
|
|
|
2017-05-24 02:42:31 +02:00
|
|
|
# Non-registered test users
# (short name -> email; used by nonreg_user() and nonreg_email())
nonreg_user_map = {
    'test': 'test@zulip.com',
    'test1': 'test1@zulip.com',
    'alice': 'alice@zulip.com',
    'newuser': 'newuser@zulip.com',
    'bob': 'bob@zulip.com',
    'cordelia': 'cordelia@zulip.com',
    'newguy': 'newguy@zulip.com',
    'me': 'me@zulip.com',
}
|
|
|
|
|
2017-11-27 05:27:04 +01:00
|
|
|
def nonreg_user(self, name: str) -> UserProfile:
    """Fetch, from the "zulip" realm, the user whose email is nonreg_user_map[name]."""
    address = self.nonreg_user_map[name]
    realm = get_realm("zulip")
    return get_user(address, realm)
|
2017-05-24 02:42:31 +02:00
|
|
|
|
2017-11-27 05:27:04 +01:00
|
|
|
def example_user(self, name: str) -> UserProfile:
    """Fetch, from the 'zulip' realm, the user whose email is example_user_map[name]."""
    address = self.example_user_map[name]
    realm = get_realm('zulip')
    return get_user(address, realm)
|
2017-05-23 01:26:38 +02:00
|
|
|
|
2017-11-27 05:27:04 +01:00
|
|
|
def mit_user(self, name: str) -> UserProfile:
    """Fetch, from the 'zephyr' realm, the user whose email is mit_user_map[name]."""
    address = self.mit_user_map[name]
    realm = get_realm('zephyr')
    return get_user(address, realm)
|
2017-05-07 17:21:26 +02:00
|
|
|
|
2017-11-27 05:27:04 +01:00
|
|
|
def nonreg_email(self, name: str) -> Text:
    """Return the email stored under `name` in nonreg_user_map (KeyError if absent)."""
    address = self.nonreg_user_map[name]
    return address
|
|
|
|
|
2017-11-27 05:27:04 +01:00
|
|
|
def example_email(self, name: str) -> Text:
    """Return the email stored under `name` in example_user_map (KeyError if absent)."""
    address = self.example_user_map[name]
    return address
|
|
|
|
|
2017-11-27 05:27:04 +01:00
|
|
|
def mit_email(self, name: str) -> Text:
    """Return the email stored under `name` in mit_user_map (KeyError if absent)."""
    address = self.mit_user_map[name]
    return address
|
|
|
|
|
2017-11-27 05:27:04 +01:00
|
|
|
def notification_bot(self) -> UserProfile:
    """Fetch the 'notification-bot@zulip.com' user from the 'zulip' realm."""
    realm = get_realm('zulip')
    return get_user('notification-bot@zulip.com', realm)
|
2017-05-08 17:42:50 +02:00
|
|
|
|
2018-01-30 17:05:14 +01:00
|
|
|
def create_test_bot(self, short_name: Text, user_profile: UserProfile,
                    assert_json_error_msg: Optional[Text]=None, **extras: Any) -> Optional[UserProfile]:
    """
    Log in as `user_profile` and POST to /json/bots to create a bot.

    short_name: the bot's short name; also used to build the email
        the bot is looked up by afterwards.
    user_profile: the user who logs in and creates the bot.
    assert_json_error_msg: if given, the request is expected to fail
        with exactly this error message, and None is returned.
    extras: additional fields merged into the POST data (they can
        override the defaults).

    Returns the new bot's UserProfile on success, or None when an
    error was expected.
    """
    self.login(user_profile.email)
    bot_info = {
        'short_name': short_name,
        'full_name': 'Foo Bot',
    }
    bot_info.update(extras)
    result = self.client_post("/json/bots", bot_info)
    if assert_json_error_msg is not None:
        self.assert_json_error(result, assert_json_error_msg)
        return None

    self.assert_json_success(result)
    bot_email = '{}-bot@zulip.testserver'.format(short_name)
    # Look the bot up in the creating user's realm.  This must use the
    # `user_profile` parameter: nothing in this file defines a
    # `self.user_profile` attribute on the test case.
    return get_user(bot_email, user_profile.realm)
|
2017-10-25 17:17:17 +02:00
|
|
|
|
2017-11-27 05:27:04 +01:00
|
|
|
def login_with_return(self, email: Text, password: Optional[Text]=None,
                      **kwargs: Any) -> HttpResponse:
    """
    POST the credentials to /accounts/login/ and return the response.

    When no password is given, initial_password(email) is used.
    """
    if password is None:
        password = initial_password(email)
    credentials = {'username': email, 'password': password}
    return self.client_post('/accounts/login/', credentials, **kwargs)
|
2016-11-10 19:30:09 +01:00
|
|
|
|
2017-11-27 05:27:04 +01:00
|
|
|
def login(self, email: Text, password: Optional[Text]=None, fails: bool=False,
          realm: Optional[Realm]=None) -> None:
    """
    Log the test client in directly (no HTTP request) and assert the outcome.

    email: passed to the client's login() as `username`.
    password: defaults to initial_password(email).
    fails: when True, assert that the login is rejected instead of
        asserting that it succeeds.
    realm: defaults to the "zulip" realm.

    Returns nothing; use login_with_return if you need the response
    of a real login request.
    """
    if realm is None:
        realm = get_realm("zulip")
    if password is None:
        password = initial_password(email)

    logged_in = self.client.login(username=email, password=password,
                                  realm=realm)
    if fails:
        self.assertFalse(logged_in)
    else:
        self.assertTrue(logged_in)
|
2016-11-10 19:30:09 +01:00
|
|
|
|
2017-11-27 05:27:04 +01:00
|
|
|
def logout(self) -> None:
    """Log the test client out."""
    test_client = self.client
    test_client.logout()
|
|
|
|
|
2017-11-27 05:27:04 +01:00
|
|
|
def register(self, email: Text, password: Text, **kwargs: Any) -> HttpResponse:
    """
    Run both registration steps: POST the email to /accounts/home/,
    then submit the registration form.  kwargs are forwarded to both
    calls.  Returns the response of the second step.
    """
    signup_data = {'email': email}
    self.client_post('/accounts/home/', signup_data, **kwargs)
    return self.submit_reg_form_for_user(email, password, **kwargs)
|
2016-11-10 19:30:09 +01:00
|
|
|
|
2017-11-27 05:27:04 +01:00
|
|
|
def submit_reg_form_for_user(
        self, email: Text, password: Text,
        realm_name: Optional[Text]="Zulip Test",
        realm_subdomain: Optional[Text]="zuliptest",
        from_confirmation: Optional[Text]='', full_name: Optional[Text]=None,
        timezone: Optional[Text]='', realm_in_root_domain: Optional[Text]=None,
        default_stream_groups: Optional[List[Text]]=[], **kwargs: Any) -> HttpResponse:
    """
    Stage two of the two-step registration process.

    If things are working correctly the account should be fully
    registered after this call.

    When full_name is omitted, it is derived from the email by
    replacing "@" with "_".  realm_in_root_domain is only included
    in the form when it is not None.

    You can pass the HTTP_HOST variable for subdomains via kwargs.
    """
    form_full_name = email.replace("@", "_") if full_name is None else full_name
    payload = dict(
        full_name=form_full_name,
        password=password,
        realm_name=realm_name,
        realm_subdomain=realm_subdomain,
        key=find_key_by_email(email),
        timezone=timezone,
        terms=True,
        from_confirmation=from_confirmation,
        default_stream_group=default_stream_groups,
    )
    if realm_in_root_domain is not None:
        payload['realm_in_root_domain'] = realm_in_root_domain
    return self.client_post('/accounts/register/', payload, **kwargs)
|
2016-11-10 19:30:09 +01:00
|
|
|
|
2017-11-27 05:27:04 +01:00
|
|
|
def get_confirmation_url_from_outbox(self, email_address: Text, *,
                                     url_pattern: Optional[Text]=None) -> Text:
    """
    Return the confirmation URL from the most recent outbox email
    addressed to `email_address`.

    url_pattern: regular expression whose first group captures the
        URL; by default, whatever follows settings.EXTERNAL_HOST up
        to a closing ">".

    Raises AssertionError if no email to that address exists, or if
    the newest such email does not match the pattern.
    """
    from django.core.mail import outbox
    if url_pattern is None:
        # This is a bit of a crude heuristic, but good enough for most tests.
        # (Raw string: "\S" is not a valid escape in a normal literal.)
        url_pattern = settings.EXTERNAL_HOST + r"(\S+)>"
    # Newest message first.
    for message in reversed(outbox):
        if email_address in message.to:
            match = re.search(url_pattern, message.body)
            if match is None:
                raise AssertionError(
                    "Confirmation email to %s has no URL matching %r."
                    % (email_address, url_pattern))
            return match.groups()[0]
    raise AssertionError("Couldn't find a confirmation email.")
|
2016-11-10 19:30:09 +01:00
|
|
|
|
2017-12-15 00:15:32 +01:00
|
|
|
def encode_credentials(self, identifier: Text, realm: Text="zulip") -> Text:
    """
    Build an HTTP Basic Authorization header value for `identifier`.

    identifier: Can be an email or a remote server uuid.
    realm: string id of the realm used to look up a user; only
        consulted when the API key is not already cached in API_KEYS.
    """
    if identifier not in API_KEYS:
        if is_remote_server(identifier):
            owner = get_remote_server_by_uuid(identifier)
        else:
            owner = get_user(identifier, get_realm(realm))
        API_KEYS[identifier] = owner.api_key

    credentials = "%s:%s" % (identifier, API_KEYS[identifier])
    encoded = base64.b64encode(credentials.encode('utf-8'))
    return 'Basic ' + encoded.decode('utf-8')
|
2016-11-10 19:30:09 +01:00
|
|
|
|
2017-12-14 19:02:02 +01:00
|
|
|
def api_get(self, email: Text, *args: Any, **kwargs: Any) -> HttpResponse:
    """client_get with an HTTP_AUTHORIZATION header built by encode_credentials(email)."""
    auth_header = self.encode_credentials(email)
    kwargs['HTTP_AUTHORIZATION'] = auth_header
    return self.client_get(*args, **kwargs)
|
|
|
|
|
|
|
|
def api_post(self, identifier: Text, *args: Any, **kwargs: Any) -> HttpResponse:
    """
    client_post with an HTTP_AUTHORIZATION header for `identifier`.

    The realm used for the credential lookup is kwargs['realm'],
    defaulting to 'zulip'; the entry is left in kwargs.
    """
    realm = kwargs.get('realm', 'zulip')
    auth_header = self.encode_credentials(identifier, realm)
    kwargs['HTTP_AUTHORIZATION'] = auth_header
    return self.client_post(*args, **kwargs)
|
|
|
|
|
|
|
|
def api_patch(self, email: Text, *args: Any, **kwargs: Any) -> HttpResponse:
    """client_patch with an HTTP_AUTHORIZATION header built by encode_credentials(email)."""
    auth_header = self.encode_credentials(email)
    kwargs['HTTP_AUTHORIZATION'] = auth_header
    return self.client_patch(*args, **kwargs)
|
|
|
|
|
|
|
|
def api_put(self, email: Text, *args: Any, **kwargs: Any) -> HttpResponse:
    """client_put with an HTTP_AUTHORIZATION header built by encode_credentials(email)."""
    auth_header = self.encode_credentials(email)
    kwargs['HTTP_AUTHORIZATION'] = auth_header
    return self.client_put(*args, **kwargs)
|
|
|
|
|
|
|
|
def api_delete(self, email: Text, *args: Any, **kwargs: Any) -> HttpResponse:
    """client_delete with an HTTP_AUTHORIZATION header built by encode_credentials(email)."""
    auth_header = self.encode_credentials(email)
    kwargs['HTTP_AUTHORIZATION'] = auth_header
    return self.client_delete(*args, **kwargs)
|
|
|
|
|
2017-11-27 05:27:04 +01:00
|
|
|
def get_streams(self, email: Text, realm: Realm) -> List[Text]:
    """
    Helper function to get the stream names for a user
    (only active subscriptions are included).
    """
    user_profile = get_user(email, realm)
    active_subs = get_stream_subscriptions_for_user(user_profile).filter(active=True)
    names = []  # type: List[Text]
    for sub in active_subs:
        names.append(cast(Text, get_display_recipient(sub.recipient)))
    return names
|
2016-11-10 19:30:09 +01:00
|
|
|
|
2017-12-05 18:41:26 +01:00
|
|
|
def send_personal_message(self, from_email: Text, to_email: Text, content: Text="test content",
                          sender_realm: Text="zulip") -> int:
    """
    Send a 'private' message from one user to a single recipient
    using the "test suite" client, and return check_send_message's
    result.
    """
    sender = get_user(from_email, get_realm(sender_realm))
    sending_client = Client.objects.get_or_create(name="test suite")[0]
    return check_send_message(sender, sending_client, 'private',
                              [to_email], None, content)
|
|
|
|
|
2017-12-05 18:41:26 +01:00
|
|
|
def send_huddle_message(self, from_email: Text, to_emails: List[Text], content: Text="test content",
                        sender_realm: Text="zulip") -> int:
    """
    Send a 'private' message to two or more recipients using the
    "test suite" client, and return check_send_message's result.
    """
    sender = get_user(from_email, get_realm(sender_realm))

    recipient_count = len(to_emails)
    assert(recipient_count >= 2)

    sending_client = Client.objects.get_or_create(name="test suite")[0]
    return check_send_message(sender, sending_client, 'private',
                              to_emails, None, content)
|
|
|
|
|
2017-12-05 18:41:26 +01:00
|
|
|
def send_stream_message(self, sender_email: Text, stream_name: Text, content: Text="test content",
                        topic_name: Text="test", sender_realm: Text="zulip") -> int:
    """
    Send a message to a stream using the "test suite" client, and
    return check_send_stream_message's result.
    """
    sender = get_user(sender_email, get_realm(sender_realm))
    sending_client = Client.objects.get_or_create(name="test suite")[0]
    message_args = dict(
        sender=sender,
        client=sending_client,
        stream_name=stream_name,
        topic=topic_name,
        body=content,
    )
    return check_send_stream_message(**message_args)
|
|
|
|
|
2017-12-05 18:41:26 +01:00
|
|
|
def get_messages(self, anchor: int=1, num_before: int=100, num_after: int=100,
                 use_first_unread_anchor: bool=False) -> List[Dict[str, Any]]:
    """GET /json/messages with the given parameters and return the 'messages' entry of the JSON response."""
    query = dict(
        anchor=anchor,
        num_before=num_before,
        num_after=num_after,
        use_first_unread_anchor=ujson.dumps(use_first_unread_anchor),
    )
    response = self.client_get("/json/messages", query)
    return response.json()['messages']
|
|
|
|
|
2017-11-27 05:27:04 +01:00
|
|
|
def users_subscribed_to_stream(self, stream_name: Text, realm: Realm) -> List[UserProfile]:
    """Return the user profiles of all active subscriptions to the named stream in `realm`."""
    stream = Stream.objects.get(name=stream_name, realm=realm)
    recipient = Recipient.objects.get(type_id=stream.id, type=Recipient.STREAM)
    active_subs = Subscription.objects.filter(recipient=recipient, active=True)

    subscribers = []  # type: List[UserProfile]
    for sub in active_subs:
        subscribers.append(sub.user_profile)
    return subscribers
|
|
|
|
|
2017-11-27 05:27:04 +01:00
|
|
|
def assert_url_serves_contents_of_file(self, url: str, result: bytes) -> None:
    """GET `url` and assert that the joined streaming content equals `result`."""
    response = self.client_get(url)
    served = b"".join(chunk for chunk in response.streaming_content)
    self.assertEqual(result, served)
|
2016-12-19 16:17:19 +01:00
|
|
|
|
2017-11-27 05:27:04 +01:00
|
|
|
def assert_json_success(self, result: HttpResponse) -> Dict[str, Any]:
    """
    Successful POSTs return a 200 and JSON of the form {"result": "success",
    "msg": ""}.

    Asserts all of that and returns the decoded JSON body.
    """
    try:
        json = ujson.loads(result.content)
    except Exception:  # nocoverage
        json = {'msg': "Error parsing JSON in response!"}
    # Use .get() (as get_json_error does) so that a body without "msg"
    # fails on one of the assertions below, not with a KeyError here.
    self.assertEqual(result.status_code, 200, json.get('msg'))
    self.assertEqual(json.get("result"), "success")
    # We have a msg key for consistency with errors, but it typically has an
    # empty value.
    self.assertIn("msg", json)
    self.assertNotEqual(json["msg"], "Error parsing JSON in response!")
    return json
|
|
|
|
|
2017-11-27 05:27:04 +01:00
|
|
|
def get_json_error(self, result: HttpResponse, status_code: int=400) -> Text:
    """
    Assert that `result` is an error response and return its message.

    The response must have the given status code and a JSON body
    whose "result" is "error".  Returns the body's "msg" value (the
    error message itself, not the whole JSON dict).
    """
    try:
        json = ujson.loads(result.content)
    except Exception:  # nocoverage
        json = {'msg': "Error parsing JSON in response!"}
    self.assertEqual(result.status_code, status_code, msg=json.get('msg'))
    self.assertEqual(json.get("result"), "error")
    return json['msg']
|
|
|
|
|
2017-11-27 05:27:04 +01:00
|
|
|
def assert_json_error(self, result: HttpResponse, msg: Text, status_code: int=400) -> None:
    """
    Invalid POSTs return an error status code and JSON of the form
    {"result": "error", "msg": "reason"}.

    Asserts that the reason is exactly `msg`.
    """
    actual_msg = self.get_json_error(result, status_code=status_code)
    self.assertEqual(actual_msg, msg)
|
|
|
|
|
2017-11-27 05:27:04 +01:00
|
|
|
def assert_length(self, items: List[Any], count: int) -> None:
    """
    Assert that `items` has exactly `count` elements.

    On a mismatch, the items and both lengths are printed before an
    AssertionError is raised.
    """
    found = len(items)
    if found != count:  # nocoverage
        print('ITEMS:\n')
        for entry in items:
            print(entry)
        summary = "\nexpected length: %s\nactual length: %s" % (count, found)
        print(summary)
        raise AssertionError('List is unexpected size!')
|
2016-11-10 19:30:09 +01:00
|
|
|
|
2017-11-27 05:27:04 +01:00
|
|
|
def assert_json_error_contains(self, result: HttpResponse, msg_substring: Text,
                               status_code: int=400) -> None:
    """Assert that `result` is a JSON error whose message contains `msg_substring`."""
    actual_msg = self.get_json_error(result, status_code=status_code)
    self.assertIn(msg_substring, actual_msg)
|
|
|
|
|
2017-11-27 05:27:04 +01:00
|
|
|
def assert_in_response(self, substring: Text, response: HttpResponse) -> None:
    """Assert that `substring` occurs in the UTF-8-decoded response content."""
    decoded = response.content.decode('utf-8')
    self.assertIn(substring, decoded)
|
|
|
|
|
2017-11-27 05:27:04 +01:00
|
|
|
def assert_in_success_response(self, substrings: List[Text],
                               response: HttpResponse) -> None:
    """Assert a 200 status and that every substring occurs in the UTF-8-decoded content."""
    self.assertEqual(response.status_code, 200)
    body = response.content.decode('utf-8')
    for expected in substrings:
        self.assertIn(expected, body)
|
|
|
|
|
2017-11-27 05:27:04 +01:00
|
|
|
def assert_not_in_success_response(self, substrings: List[Text],
                                   response: HttpResponse) -> None:
    """Assert a 200 status and that none of the substrings occurs in the UTF-8-decoded content."""
    self.assertEqual(response.status_code, 200)
    body = response.content.decode('utf-8')
    for unwanted in substrings:
        self.assertNotIn(unwanted, body)
|
|
|
|
|
2017-11-27 05:27:04 +01:00
|
|
|
def fixture_data(self, type: Text, action: Text, file_type: Text='json') -> Text:
    """
    Return the contents of a webhook fixture file.

    The file read is ../webhooks/<type>/fixtures/<action>.<file_type>,
    relative to the directory containing this module.
    """
    fn = os.path.join(
        os.path.dirname(__file__),
        "../webhooks/%s/fixtures/%s.%s" % (type, action, file_type)
    )
    # Close the file deterministically instead of leaking the handle.
    with open(fn) as fixture_file:
        return fixture_file.read()
|
2016-11-10 19:30:09 +01:00
|
|
|
|
2017-11-27 05:27:04 +01:00
|
|
|
def make_stream(self, stream_name: Text, realm: Optional[Realm]=None,
                invite_only: Optional[bool]=False) -> Stream:
    """
    Create a Stream row and its Recipient row directly in the database.

    realm defaults to DEFAULT_REALM.  An IntegrityError on creation
    is re-raised as an Exception saying the stream already exists.
    """
    target_realm = self.DEFAULT_REALM if realm is None else realm

    stream_fields = dict(
        realm=target_realm,
        name=stream_name,
        invite_only=invite_only,
    )
    try:
        stream = Stream.objects.create(**stream_fields)
    except IntegrityError:  # nocoverage -- this is for bugs in the tests
        raise Exception('''
%s already exists

Please call make_stream with a stream name
that is not already in use.''' % (stream_name,))

    Recipient.objects.create(type_id=stream.id, type=Recipient.STREAM)
    return stream
|
|
|
|
|
2017-08-25 06:01:29 +02:00
|
|
|
# Subscribe to a stream directly
def subscribe(self, user_profile: UserProfile, stream_name: Text) -> Stream:
    """
    Subscribe `user_profile` to the named stream in their realm,
    creating the stream first if it does not exist.  Returns the stream.
    """
    realm = user_profile.realm
    try:
        stream = get_stream(stream_name, realm)
    except Stream.DoesNotExist:
        stream, from_stream_creation = create_stream_if_needed(realm, stream_name)
    else:
        from_stream_creation = False
    bulk_add_subscriptions([stream], [user_profile], from_stream_creation=from_stream_creation)
    return stream
|
|
|
|
|
2017-11-27 05:27:04 +01:00
|
|
|
def unsubscribe(self, user_profile: UserProfile, stream_name: Text) -> None:
    """Remove `user_profile`'s subscription to the named stream in their realm."""
    realm = user_profile.realm
    stream = get_stream(stream_name, realm)
    bulk_remove_subscriptions([user_profile], [stream])
|
|
|
|
|
|
|
|
# Subscribe to a stream by making an API request
def common_subscribe_to_streams(self, email: Text, streams: Iterable[Text],
                                extra_post_data: Dict[str, Any]={}, invite_only: bool=False,
                                **kwargs: Any) -> HttpResponse:
    """
    POST to /api/v1/users/me/subscriptions as `email`.

    extra_post_data is merged over the default POST fields.  The
    realm used for the API credentials is taken from
    kwargs['subdomain'], defaulting to 'zulip'.
    """
    subscriptions = [{"name": stream} for stream in streams]
    post_data = dict(
        subscriptions=ujson.dumps(subscriptions),
        invite_only=ujson.dumps(invite_only),
    )
    post_data.update(extra_post_data)
    kwargs['realm'] = kwargs.get('subdomain', 'zulip')
    return self.api_post(email, "/api/v1/users/me/subscriptions", post_data, **kwargs)
|
|
|
|
|
2017-11-27 05:27:04 +01:00
|
|
|
def check_user_subscribed_only_to_streams(self, user_name: Text,
                                          streams: List[Stream]) -> None:
    """
    Assert that the nonreg user `user_name` is subscribed to exactly
    `streams`: the first element of gather_subscriptions' result is
    compared, position by position, with the streams sorted by name.
    """
    streams = sorted(streams, key=lambda stream: stream.name)
    user_profile = self.nonreg_user(user_name)
    subscribed_streams = gather_subscriptions(user_profile)[0]

    self.assertEqual(len(subscribed_streams), len(streams))

    for subscribed, expected in zip(subscribed_streams, streams):
        self.assertEqual(subscribed["name"], expected.name)
|
|
|
|
|
2017-11-27 05:27:04 +01:00
|
|
|
def send_json_payload(self, user_profile: UserProfile, url: Text,
                      payload: Union[Text, Dict[str, Any]],
                      stream_name: Optional[Text]=None, **post_params: Any) -> Message:
    """
    POST `payload` to `url`, assert success, and return the last message.

    If stream_name is given, `user_profile` is subscribed to it first
    and the message's recipient is checked against it.  The message's
    sender is always checked against `user_profile`.
    """
    if stream_name is not None:
        self.subscribe(user_profile, stream_name)

    response = self.client_post(url, payload, **post_params)
    self.assert_json_success(response)

    # Check the correct message was sent
    msg = self.get_last_message()
    self.assertEqual(msg.sender.email, user_profile.email)
    if stream_name is not None:
        displayed_recipient = get_display_recipient(msg.recipient)
        self.assertEqual(displayed_recipient, stream_name)
    # TODO: should also validate recipient for private messages

    return msg
|
|
|
|
|
2017-11-27 05:27:04 +01:00
|
|
|
def get_last_message(self) -> Message:
    """Return the Message with the highest id."""
    newest = Message.objects.latest('id')
    return newest
|
|
|
|
|
2017-11-27 05:27:04 +01:00
|
|
|
def get_second_to_last_message(self) -> Message:
    """Return the Message with the second-highest id."""
    newest_first = Message.objects.all().order_by('-id')
    return newest_first[1]
|
|
|
|
|
|
|
|
@contextmanager
def simulated_markdown_failure(self) -> Iterator[None]:
    '''
    This raises a failure inside of the try/except block of
    bugdown.__init__.do_convert.

    While the context is active, ERROR_BOT is set to None,
    zerver.lib.bugdown.timeout raises KeyError('foo'), and
    zerver.lib.bugdown.log_bugdown_error is mocked out.
    '''
    failing_timeout = mock.patch('zerver.lib.bugdown.timeout', side_effect=KeyError('foo'))
    muted_error_log = mock.patch('zerver.lib.bugdown.log_bugdown_error')
    with self.settings(ERROR_BOT=None), failing_timeout, muted_error_log:
        yield
|
|
|
|
|
|
|
|
class WebhookTestCase(ZulipTestCase):
    """
    Common base class for all webhook tests.

    Override the class attributes below and call
    send_and_test_stream_message (or send_and_test_private_message).
    If your webhook URL is built in an unusual way, override the
    build_webhook_url method.
    If you need to modify the request body, or create it without using a
    fixture, override the get_body method.
    """
    # Stream substituted for `{stream}` in URL_TEMPLATE and passed to
    # send_json_payload by send_and_test_stream_message.
    STREAM_NAME = None  # type: Optional[Text]
    # Email of the user looked up (in the "zulip" realm) by `test_user`.
    TEST_USER_EMAIL = 'webhook-bot@zulip.com'
    # str.format template for the webhook URL; may reference `{stream}`
    # and `{api_key}` (see build_webhook_url).
    URL_TEMPLATE = None  # type: Optional[Text]
    # Directory name handed to self.fixture_data by get_body.
    FIXTURE_DIR_NAME = None  # type: Optional[Text]
|
2016-11-10 19:30:09 +01:00
|
|
|
|
2017-08-25 06:37:47 +02:00
|
|
|
@property
def test_user(self) -> UserProfile:
    """The user with email TEST_USER_EMAIL in the "zulip" realm."""
    email = self.TEST_USER_EMAIL
    realm = get_realm("zulip")
    return get_user(email, realm)
|
2017-08-25 06:37:47 +02:00
|
|
|
|
2017-11-27 05:27:04 +01:00
|
|
|
def setUp(self) -> None:
    """Compute the webhook URL once per test and store it on `self.url`."""
    webhook_url = self.build_webhook_url()
    self.url = webhook_url
|
|
|
|
|
2017-12-14 19:02:02 +01:00
|
|
|
def api_stream_message(self, email: Text, *args: Any, **kwargs: Any) -> Message:
    """
    Run send_and_test_stream_message with HTTP credentials for `email`.

    The value of `self.encode_credentials(email)` is injected as the
    HTTP_AUTHORIZATION keyword argument (replacing any value the caller
    passed); all other positional and keyword arguments are forwarded
    unchanged.

    Returns whatever send_and_test_stream_message returns, i.e. the
    Message that was sent (not an HttpResponse).
    """
    kwargs['HTTP_AUTHORIZATION'] = self.encode_credentials(email)
    return self.send_and_test_stream_message(*args, **kwargs)
|
|
|
|
|
2017-12-05 18:41:26 +01:00
|
|
|
def send_and_test_stream_message(self, fixture_name: Text, expected_subject: Optional[Text]=None,
                                 expected_message: Optional[Text]=None,
                                 content_type: Optional[Text]="application/json", **kwargs: Any) -> Message:
    """
    Send the fixture's body to the webhook URL as a stream message.

    The body comes from `self.get_body(fixture_name)` and is posted to
    `self.url` as `self.test_user`, targeting `self.STREAM_NAME`.  A
    non-None `content_type` is added to the forwarded keyword arguments.
    The resulting message's topic and content are then checked against
    `expected_subject` / `expected_message` (each check is skipped when
    the expectation is None).

    Returns the message that was sent.
    """
    request_body = self.get_body(fixture_name)
    if content_type is not None:
        kwargs.update(content_type=content_type)

    sent = self.send_json_payload(
        self.test_user, self.url, request_body, self.STREAM_NAME, **kwargs)

    self.do_test_subject(sent, expected_subject)
    self.do_test_message(sent, expected_message)
    return sent
|
|
|
|
|
2017-12-05 18:41:26 +01:00
|
|
|
def send_and_test_private_message(self, fixture_name: Text, expected_subject: Optional[Text]=None,
                                  expected_message: Optional[Text]=None,
                                  content_type: Optional[Text]="application/json",
                                  **kwargs: Any) -> Message:
    """
    Send the fixture's body to the webhook URL as a private message.

    The body comes from `self.get_body(fixture_name)` and is posted to
    `self.url` as `self.test_user` with no stream.  A non-None
    `content_type` is added to the forwarded keyword arguments.  The
    resulting message's content is checked against `expected_message`
    (skipped when None).

    `expected_subject` is accepted only for signature parity with
    send_and_test_stream_message; it is not checked here.

    Returns the message that was sent.
    """
    payload = self.get_body(fixture_name)
    if content_type is not None:
        kwargs['content_type'] = content_type

    msg = self.send_json_payload(self.test_user, self.url, payload,
                                 stream_name=None, **kwargs)
    self.do_test_message(msg, expected_message)

    return msg
|
|
|
|
|
2017-11-27 05:27:04 +01:00
|
|
|
def build_webhook_url(self, *args: Any, **kwargs: Any) -> Text:
    """
    Build the webhook URL from URL_TEMPLATE plus optional extra parameters.

    URL_TEMPLATE is formatted with `stream=self.STREAM_NAME` and, when the
    template mentions "api_key", with `api_key=self.test_user.api_key`.

    Each keyword argument is appended as `key=value` and each positional
    argument is appended verbatim, joined with `&`.  The first extra
    parameter is introduced with `?` if the formatted URL has no query
    string yet, and with `&` otherwise.  Values are inserted as-is (no
    URL-encoding), so callers must pass already-encoded values.

    When no extra parameters are given the formatted template is returned
    unchanged, without a dangling `&`.
    """
    template = self.URL_TEMPLATE
    format_params = {'stream': self.STREAM_NAME}  # type: Dict[str, Any]
    if "api_key" in template:
        # Only look the test user up when the template needs its key.
        format_params['api_key'] = self.test_user.api_key
    url = template.format(**format_params)

    extra_params = ["{}={}".format(key, value) for key, value in kwargs.items()]
    extra_params.extend("{}".format(arg) for arg in args)
    if not extra_params:
        return url

    separator = '&' if '?' in url else '?'
    return "{}{}{}".format(url, separator, '&'.join(extra_params))
|
2016-11-10 19:30:09 +01:00
|
|
|
|
2017-11-27 05:27:04 +01:00
|
|
|
def get_body(self, fixture_name: Text) -> Union[Text, Dict[str, Text]]:
    """Produce the request body for `fixture_name`.

    Overrides may return either a dictionary of post parameters or a
    string holding the body of the request.  This default implementation
    reads the fixture from FIXTURE_DIR_NAME via `self.fixture_data`,
    parses it with ujson and returns it re-serialized as a string."""
    raw_fixture = self.fixture_data(self.FIXTURE_DIR_NAME, fixture_name)
    parsed = ujson.loads(raw_fixture)
    return ujson.dumps(parsed)
|
|
|
|
|
2017-11-27 05:27:04 +01:00
|
|
|
def do_test_subject(self, msg: Message, expected_subject: Optional[Text]) -> None:
    """Assert that `msg`'s topic equals `expected_subject`; no-op when it is None."""
    if expected_subject is None:
        return
    self.assertEqual(msg.topic_name(), expected_subject)
|
|
|
|
|
2017-11-27 05:27:04 +01:00
|
|
|
def do_test_message(self, msg: Message, expected_message: Optional[Text]) -> None:
    """Assert that `msg.content` equals `expected_message`; no-op when it is None."""
    if expected_message is None:
        return
    self.assertEqual(msg.content, expected_message)
|