python: Pass query parameters as a dict when making GET requests.

This provides automatic URL-encoding.

Signed-off-by: Anders Kaseorg <anders@zulip.com>
This commit is contained in:
Anders Kaseorg 2020-09-12 15:11:30 -07:00 committed by Tim Abbott
parent f0b11d62f6
commit 4e9d587535
14 changed files with 99 additions and 132 deletions

View File

@ -654,7 +654,7 @@ class StripeTest(StripeTestCase):
self.assert_not_in_success_response(["Go to your Zulip organization"], response)
with patch('corporate.views.timezone_now', return_value=self.now):
response = self.client_get("/billing/?onboarding=true")
response = self.client_get("/billing/", {"onboarding": "true"})
self.assert_in_success_response(["Go to your Zulip organization"], response)
with patch('corporate.lib.stripe.get_latest_seat_count', return_value=12):
@ -1160,7 +1160,7 @@ class StripeTest(StripeTestCase):
self.assertEqual(response.status_code, 302)
self.assertEqual(response.url, "/billing/")
response = self.client_get("/upgrade/?onboarding=true")
response = self.client_get("/upgrade/", {"onboarding": "true"})
self.assertEqual(response.status_code, 302)
self.assertEqual(response.url, "/billing/?onboarding=true")

View File

@ -58,7 +58,8 @@ def fetch_contributors(repo_name: str, max_retries: int) -> List[Contributor]:
retry_attempts = 0
page_index = 1
api_link = f"https://api.github.com/repos/zulip/{repo_name}/contributors?anon=1"
api_link = f"https://api.github.com/repos/zulip/{repo_name}/contributors"
api_data = {"anon": "1"}
certificates = os.environ.get('CUSTOM_CA_CERTIFICATES')
headers: Dict[str, str] = {}
@ -67,7 +68,7 @@ def fetch_contributors(repo_name: str, max_retries: int) -> List[Contributor]:
headers = {"Authorization": f"token {personal_access_token}"}
while True:
response: requests.Response = requests.get(f"{api_link}&page={page_index}", verify=certificates, headers=headers)
response: requests.Response = requests.get(api_link, {**api_data, "page": f"{page_index}"}, verify=certificates, headers=headers)
if response.status_code == 200:
data = response.json()
if len(data) == 0:

View File

@ -6,7 +6,6 @@ import shutil
import subprocess
from collections import defaultdict
from typing import Any, Dict, Iterator, List, Optional, Set, Tuple
from urllib.parse import urlencode
import orjson
import requests
@ -1135,7 +1134,7 @@ def log_token_warning(token: str) -> None:
def get_slack_api_data(slack_api_url: str, get_param: str, **kwargs: Any) -> Any:
if not kwargs.get("token"):
raise AssertionError("Slack token missing in kwargs")
data = requests.get(f"{slack_api_url}?{urlencode(kwargs)}")
data = requests.get(slack_api_url, kwargs)
if data.status_code == requests.codes.ok:
result = data.json()

View File

@ -317,7 +317,7 @@ class AuthBackendTest(ZulipTestCase):
# instead of redirecting to app
self.login('iago')
realm = get_realm("zulip")
result = self.client_get('/login/?preview=true')
result = self.client_get("/login/", {"preview": "true"})
self.assertEqual(result.status_code, 200)
self.assert_in_response(realm.description, result)
assert realm.name is not None
@ -329,7 +329,7 @@ class AuthBackendTest(ZulipTestCase):
result = self.client_patch('/json/realm', data)
self.assert_json_success(result)
result = self.client_get('/login/?preview=true')
result = self.client_get("/login/", {"preview": "true"})
self.assertEqual(result.status_code, 200)
self.assert_in_response("New realm description", result)
self.assert_in_response("New Zulip", result)
@ -1763,7 +1763,7 @@ class SAMLAuthBackendTest(SocialAuthBase):
self.assertEqual(m.output, [self.logger_output("/login/saml/ : Bad idp param: KeyError: {}.".format("'idp'"), 'info')])
with self.assertLogs(self.logger_string, level='INFO') as m:
result = self.client_get('/login/saml/?idp=bad_idp')
result = self.client_get("/login/saml/", {"idp": "bad_idp"})
self.assertEqual(result.status_code, 302)
self.assertEqual('/login/', result.url)
self.assertEqual(m.output, [self.logger_output("/login/saml/ : Bad idp param: KeyError: {}.".format("'bad_idp'"), 'info')])
@ -3627,7 +3627,7 @@ class TestDevAuthBackend(ZulipTestCase):
class TestZulipRemoteUserBackend(DesktopFlowTestingLib, ZulipTestCase):
def test_start_remote_user_sso(self) -> None:
result = self.client_get('/accounts/login/start/sso/?param1=value1&params=value2')
result = self.client_get("/accounts/login/start/sso/", {"param1": "value1", "params": "value2"})
self.assertEqual(result.status_code, 302)
url = result.url
@ -3871,7 +3871,7 @@ class TestZulipRemoteUserBackend(DesktopFlowTestingLib, ZulipTestCase):
user_profile = self.example_user('hamlet')
email = user_profile.delivery_email
with self.settings(AUTHENTICATION_BACKENDS=('zproject.backends.ZulipRemoteUserBackend',)):
result = self.client_get('/accounts/login/sso/?next=' + next, REMOTE_USER=email)
result = self.client_get("/accounts/login/sso/", {"next": next}, REMOTE_USER=email)
return result
res = test_with_redirect_to_param_set_as_next()
@ -3883,13 +3883,6 @@ class TestZulipRemoteUserBackend(DesktopFlowTestingLib, ZulipTestCase):
res = test_with_redirect_to_param_set_as_next('https://rogue.zulip-like.server/login')
self.assertEqual('http://zulip.testserver', res.url)
# In SSO based auth we never make browser send the hash to the backend.
# Rather we depend upon the browser's behaviour of persisting hash anchors
# in between redirect requests. See below stackoverflow conversation
# https://stackoverflow.com/questions/5283395/url-hash-is-persisting-between-redirects
res = test_with_redirect_to_param_set_as_next('#narrow/stream/7-test-here')
self.assertEqual('http://zulip.testserver', res.url)
class TestJWTLogin(ZulipTestCase):
"""
JWT uses ZulipDummyBackend.

View File

@ -174,7 +174,7 @@ class TestVideoCall(ZulipTestCase):
def test_join_bigbluebutton_redirect(self) -> None:
responses.add(responses.GET, "https://bbb.example.com/bigbluebutton/api/create?meetingID=zulip-1&moderatorPW=a&attendeePW=aa&checksum=check",
"<response><returncode>SUCCESS</returncode><messageKey/></response>")
response = self.client_get("/calls/bigbluebutton/join?meeting_id=%22zulip-1%22&password=%22a%22&checksum=%22check%22")
response = self.client_get("/calls/bigbluebutton/join", {"meeting_id": '"zulip-1"', "password": '"a"', "checksum": '"check"'})
self.assertEqual(response.status_code, 302)
self.assertEqual(isinstance(response, HttpResponseRedirect), True)
self.assertEqual(response.url, "https://bbb.example.com/bigbluebutton/api/join?meetingID=zulip-1&password=a"
@ -186,7 +186,7 @@ class TestVideoCall(ZulipTestCase):
"https://bbb.example.com/bigbluebutton/api/create?meetingID=zulip-1&moderatorPW=a&attendeePW=aa&checksum=check",
"<response><returncode>FAILED</returncode><messageKey>checksumError</messageKey>"
"<message>You did not pass the checksum security check</message></response>")
response = self.client_get("/calls/bigbluebutton/join?meeting_id=%22zulip-1%22&password=%22a%22&checksum=%22check%22")
response = self.client_get("/calls/bigbluebutton/join", {"meeting_id": '"zulip-1"', "password": '"a"', "checksum": '"check"'})
self.assert_json_error(response, "Error authenticating to the Big Blue Button server.")
@responses.activate
@ -195,7 +195,7 @@ class TestVideoCall(ZulipTestCase):
responses.add(responses.GET,
"https://bbb.example.com/bigbluebutton/api/create?meetingID=zulip-1&moderatorPW=a&attendeePW=aa&checksum=check", "", status=500)
response = self.client_get(
"/calls/bigbluebutton/join?meeting_id=%22zulip-1%22&password=%22a%22&checksum=%22check%22")
"/calls/bigbluebutton/join", {"meeting_id": '"zulip-1"', "password": '"a"', "checksum": '"check"'})
self.assert_json_error(response, "Error connecting to the Big Blue Button server.")
@responses.activate
@ -205,12 +205,12 @@ class TestVideoCall(ZulipTestCase):
"https://bbb.example.com/bigbluebutton/api/create?meetingID=zulip-1&moderatorPW=a&attendeePW=aa&checksum=check",
"<response><returncode>FAILURE</returncode><messageKey>otherFailure</messageKey></response>")
response = self.client_get(
"/calls/bigbluebutton/join?meeting_id=%22zulip-1%22&password=%22a%22&checksum=%22check%22")
"/calls/bigbluebutton/join", {"meeting_id": '"zulip-1"', "password": '"a"', "checksum": '"check"'})
self.assert_json_error(response, "Big Blue Button server returned an unexpected error.")
def test_join_bigbluebutton_redirect_not_configured(self) -> None:
with self.settings(BIG_BLUE_BUTTON_SECRET=None,
BIG_BLUE_BUTTON_URL=None):
response = self.client_get(
"/calls/bigbluebutton/join?meeting_id=%22zulip-1%22&password=%22a%22&checksum=%22check%22")
"/calls/bigbluebutton/join", {"meeting_id": '"zulip-1"', "password": '"a"', "checksum": '"check"'})
self.assert_json_error(response, "Big Blue Button is not configured.")

View File

@ -674,9 +674,8 @@ class ListCustomProfileFieldTest(CustomProfileFieldTestCase):
self.login_user(iago)
assert(test_bot)
url = "/json/users?client_gravatar=false&include_custom_profile_fields=true"
with queries_captured() as queries:
response = self.client_get(url)
response = self.client_get("/json/users", {"client_gravatar": "false", "include_custom_profile_fields": "true"})
self.assertEqual(len(queries), 4)
@ -712,8 +711,7 @@ class ListCustomProfileFieldTest(CustomProfileFieldTestCase):
self.assertEqual(test_bot_raw_data["bot_type"], 1)
self.assertEqual(test_bot_raw_data["bot_owner_id"], iago_raw_data["user_id"])
url = "/json/users?client_gravatar=false"
response = self.client_get(url)
response = self.client_get("/json/users", {"client_gravatar": "false"})
self.assertEqual(response.status_code, 200)
raw_users_data = response.json()["members"]
for user_dict in raw_users_data:

View File

@ -518,8 +518,8 @@ class PrivacyTermsTest(ZulipTestCase):
response = self.client_get("/terms/")
self.assert_in_success_response(["Plans"], response)
response = self.client_get("/terms/?nav=no")
response = self.client_get("/terms/", {"nav": "no"})
self.assert_not_in_success_response(["Plans"], response)
response = self.client_get("/privacy/?nav=no")
response = self.client_get("/privacy/", {"nav": "no"})
self.assert_not_in_success_response(["Plans"], response)

View File

@ -312,7 +312,7 @@ class MessageDictTest(ZulipTestCase):
def test_missing_anchor(self) -> None:
self.login('hamlet')
result = self.client_get(
'/json/messages?use_first_unread_anchor=false&num_before=1&num_after=1')
"/json/messages", {"use_first_unread_anchor": "false", "num_before": "1", "num_after": "1"})
self.assert_json_error(
result, "Missing 'anchor' argument.")
@ -320,7 +320,7 @@ class MessageDictTest(ZulipTestCase):
def test_invalid_anchor(self) -> None:
self.login('hamlet')
result = self.client_get(
'/json/messages?use_first_unread_anchor=false&num_before=1&num_after=1&anchor=chocolate')
"/json/messages", {"use_first_unread_anchor": "false", "num_before": "1", "num_after": "1", "anchor": "chocolate"})
self.assert_json_error(
result, "Invalid anchor")

View File

@ -4067,7 +4067,7 @@ class TestLoginPage(ZulipTestCase):
self.assertEqual(result.status_code, 302)
self.assertEqual(result.url, '/accounts/go/')
result = self.client_get("/en/login/?next=/upgrade/")
result = self.client_get("/en/login/", {"next": "/upgrade/"})
self.assertEqual(result.status_code, 302)
self.assertEqual(result.url, '/accounts/go/?next=%2Fupgrade%2F')
@ -4079,7 +4079,7 @@ class TestLoginPage(ZulipTestCase):
self.assertEqual(result.status_code, 302)
self.assertEqual(result.url, '/accounts/go/')
result = self.client_get("/en/login/?next=/upgrade/")
result = self.client_get("/en/login/", {"next": "/upgrade/"})
self.assertEqual(result.status_code, 302)
self.assertEqual(result.url, '/accounts/go/?next=%2Fupgrade%2F')
@ -4091,7 +4091,7 @@ class TestLoginPage(ZulipTestCase):
self.assertEqual(result.status_code, 302)
self.assertEqual(result.url, '/accounts/go/')
result = self.client_get("/en/login/?next=/upgrade/")
result = self.client_get("/en/login/", {"next": "/upgrade/"})
self.assertEqual(result.status_code, 302)
self.assertEqual(result.url, '/accounts/go/?next=%2Fupgrade%2F')
@ -4160,7 +4160,7 @@ class TestFindMyTeam(ZulipTestCase):
self.assertEqual(len(outbox), 0)
# Just for coverage on perhaps-unnecessary validation code.
result = self.client_get('/accounts/find/?emails=invalid')
result = self.client_get("/accounts/find/", {"emails": "invalid"})
self.assertEqual(result.status_code, 200)
def test_find_team_zero_emails(self) -> None:
@ -4336,7 +4336,7 @@ class RealmRedirectTest(ZulipTestCase):
self.assert_in_success_response(["We couldn&#39;t find that Zulip organization."], result)
def test_realm_redirect_with_next_param(self) -> None:
result = self.client_get("/accounts/go/?next=billing")
result = self.client_get("/accounts/go/", {"next": "billing"})
self.assert_in_success_response(["Enter your organization's Zulip URL", 'action="/accounts/go/?next=billing"'], result)
result = self.client_post("/accounts/go/?next=billing", {"subdomain": "lear"})

View File

@ -59,9 +59,9 @@ class MockResponse:
# This method will be used by the mock to replace requests.get
def mocked_requests_get(*args: str) -> MockResponse:
if args[0] == 'https://slack.com/api/users.list?token=xoxp-valid-token':
if args == ("https://slack.com/api/users.list", {"token": "xoxp-valid-token"}):
return MockResponse({"ok": True, "members": "user_data"}, 200)
elif args[0] == 'https://slack.com/api/users.list?token=xoxp-invalid-token':
elif args == ("https://slack.com/api/users.list", {"token": "xoxp-invalid-token"}):
return MockResponse({"ok": False, "error": "invalid_auth"}, 200)
else:
return MockResponse(None, 404)

View File

@ -2,7 +2,6 @@ import random
from datetime import timedelta
from typing import Any, Dict, List, Mapping, Optional, Sequence, Set, Union
from unittest import mock
from urllib.parse import urlencode
import orjson
from django.conf import settings
@ -1240,7 +1239,7 @@ class StreamAdminTest(ZulipTestCase):
self.assertEqual(subscribers, [])
# It doesn't show up in the list of public streams anymore.
result = self.client_get("/json/streams?include_subscribed=false")
result = self.client_get("/json/streams", {"include_subscribed": "false"})
public_streams = [s["name"] for s in result.json()["streams"]]
self.assertNotIn(active_name, public_streams)
self.assertNotIn(deactivated_stream_name, public_streams)
@ -1795,7 +1794,7 @@ class DefaultStreamTest(ZulipTestCase):
self.assertEqual(user_profile.role, UserProfile.ROLE_GUEST)
# Get all the streams that Polonius has access to (subscribed + web public streams)
result = self.client_get('/json/streams?include_web_public=true')
result = self.client_get("/json/streams", {"include_web_public": "true"})
streams = result.json()['streams']
subscribed, unsubscribed, never_subscribed = gather_subscriptions_helper(user_profile)
self.assertEqual(len(streams),
@ -3856,10 +3855,7 @@ class GetStreamsTest(ZulipTestCase):
include_public = "false",
include_subscribed = "false",
)
request_variables = urlencode(filters)
result = self.api_get(
test_bot,
f"/api/v1/streams?{request_variables}")
result = self.api_get(test_bot, "/api/v1/streams", filters)
owner_subs = self.api_get(hamlet, "/api/v1/users/me/subscriptions")
self.assert_json_success(result)
@ -3881,11 +3877,7 @@ class GetStreamsTest(ZulipTestCase):
include_public = "false",
include_subscribed = "true",
)
request_variables = urlencode(filters)
result = self.api_get(
test_bot,
f"/api/v1/streams?{request_variables}",
)
result = self.api_get(test_bot, "/api/v1/streams", filters)
self.assert_json_success(result)
json = result.json()
@ -3904,7 +3896,8 @@ class GetStreamsTest(ZulipTestCase):
self.subscribe(test_bot, 'private_stream')
result = self.api_get(
test_bot,
"/api/v1/streams?include_owner_subscribed=true&include_public=true&include_subscribed=false",
"/api/v1/streams",
{"include_owner_subscribed": "true", "include_public": "true", "include_subscribed": "false"},
)
self.assert_json_success(result)
@ -3923,7 +3916,8 @@ class GetStreamsTest(ZulipTestCase):
# the bot's subs
result = self.api_get(
test_bot,
"/api/v1/streams?include_owner_subscribed=true&include_public=true&include_subscribed=true",
"/api/v1/streams",
{"include_owner_subscribed": "true", "include_public": "true", "include_subscribed": "true"},
)
self.assert_json_success(result)
@ -3939,18 +3933,19 @@ class GetStreamsTest(ZulipTestCase):
self.assertEqual(actual, expected)
def test_all_active_streams_api(self) -> None:
url = '/api/v1/streams?include_all_active=true'
url = "/api/v1/streams"
data = {"include_all_active": "true"}
# Check non-superuser can't use include_all_active
normal_user = self.example_user('cordelia')
result = self.api_get(normal_user, url)
result = self.api_get(normal_user, url, data)
self.assertEqual(result.status_code, 400)
# Even realm admin users can't see all
# active streams (without additional privileges).
admin_user = self.example_user('iago')
self.assertTrue(admin_user.is_realm_admin)
result = self.api_get(admin_user, url)
result = self.api_get(admin_user, url, data)
self.assertEqual(result.status_code, 400)
'''
@ -3966,7 +3961,7 @@ class GetStreamsTest(ZulipTestCase):
super_user.is_api_super_user = True
super_user.save()
result = self.api_get(super_user, url)
result = self.api_get(super_user, url, data)
self.assert_json_success(result)
json = result.json()
@ -3990,7 +3985,7 @@ class GetStreamsTest(ZulipTestCase):
self.login_user(user)
# Check it correctly lists the user's subs with include_public=false
result = self.api_get(user, "/api/v1/streams?include_public=false")
result = self.api_get(user, "/api/v1/streams", {"include_public": "false"})
result2 = self.api_get(user, "/api/v1/users/me/subscriptions")
self.assert_json_success(result)
@ -4011,8 +4006,7 @@ class GetStreamsTest(ZulipTestCase):
include_public = "true",
include_subscribed = "false"
)
request_variables = urlencode(filters)
result = self.api_get(user, f"/api/v1/streams?{request_variables}")
result = self.api_get(user, "/api/v1/streams", filters)
self.assert_json_success(result)
json = result.json()
@ -4026,14 +4020,14 @@ class StreamIdTest(ZulipTestCase):
user = self.example_user('hamlet')
self.login_user(user)
stream = gather_subscriptions(user)[0][0]
result = self.client_get("/json/get_stream_id?stream={}".format(stream['name']))
result = self.client_get("/json/get_stream_id", {"stream": stream['name']})
self.assert_json_success(result)
self.assertEqual(result.json()['stream_id'], stream['stream_id'])
def test_get_stream_id_wrong_name(self) -> None:
user = self.example_user('hamlet')
self.login_user(user)
result = self.client_get("/json/get_stream_id?stream=wrongname")
result = self.client_get("/json/get_stream_id", {"stream": "wrongname"})
self.assert_json_error(result, "Invalid stream name 'wrongname'")
class InviteOnlyStreamTest(ZulipTestCase):

View File

@ -1,5 +1,4 @@
import base64
import urllib
from io import StringIO
import orjson
@ -46,16 +45,14 @@ class ThumbnailTest(ZulipTestCase):
base = '/user_uploads/'
self.assertEqual(base, uri[:len(base)])
quoted_uri = urllib.parse.quote(uri[1:], safe='')
# Test full size image.
result = self.client_get(f"/thumbnail?url={quoted_uri}&size=full")
result = self.client_get("/thumbnail", {"url": uri[1:], "size": "full"})
self.assertEqual(result.status_code, 302, result)
expected_part_url = get_file_path_urlpart(uri)
self.assertIn(expected_part_url, result.url)
# Test thumbnail size.
result = self.client_get(f"/thumbnail?url={quoted_uri}&size=thumbnail")
result = self.client_get("/thumbnail", {"url": uri[1:], "size": "thumbnail"})
self.assertEqual(result.status_code, 302, result)
expected_part_url = get_file_path_urlpart(uri, '0x300')
self.assertIn(expected_part_url, result.url)
@ -70,10 +67,8 @@ class ThumbnailTest(ZulipTestCase):
emoji_url_base = '/user_avatars/'
self.assertEqual(emoji_url_base, custom_emoji_url[:len(emoji_url_base)])
quoted_emoji_url = urllib.parse.quote(custom_emoji_url[1:], safe='')
# Test full size custom emoji image (for emoji link in messages case).
result = self.client_get(f"/thumbnail?url={quoted_emoji_url}&size=full")
result = self.client_get("/thumbnail", {"url": custom_emoji_url[1:], "size": "full"})
self.assertEqual(result.status_code, 302, result)
self.assertIn(custom_emoji_url, result.url)
@ -81,14 +76,14 @@ class ThumbnailTest(ZulipTestCase):
self.logout()
result = self.api_get(
hamlet,
f'/thumbnail?url={quoted_uri}&size=full')
"/thumbnail", {"url": uri[1:], "size": "full"})
self.assertEqual(result.status_code, 302, result)
expected_part_url = get_file_path_urlpart(uri)
self.assertIn(expected_part_url, result.url)
# Test with another user trying to access image using thumbor.
self.login('iago')
result = self.client_get(f"/thumbnail?url={quoted_uri}&size=full")
result = self.client_get("/thumbnail", {"url": uri[1:], "size": "full"})
self.assertEqual(result.status_code, 403, result)
self.assert_in_response("You are not authorized to view this file.", result)
@ -96,15 +91,14 @@ class ThumbnailTest(ZulipTestCase):
def run_test_with_image_url(image_url: str) -> None:
# Test full size image.
self.login('hamlet')
quoted_url = urllib.parse.quote(image_url, safe='')
encoded_url = base64.urlsafe_b64encode(image_url.encode()).decode('utf-8')
result = self.client_get(f"/thumbnail?url={quoted_url}&size=full")
result = self.client_get("/thumbnail", {"url": image_url, "size": "full"})
self.assertEqual(result.status_code, 302, result)
expected_part_url = '/smart/filters:no_upscale()/' + encoded_url + '/source_type/external'
self.assertIn(expected_part_url, result.url)
# Test thumbnail size.
result = self.client_get(f"/thumbnail?url={quoted_url}&size=thumbnail")
result = self.client_get("/thumbnail", {"url": image_url, "size": "thumbnail"})
self.assertEqual(result.status_code, 302, result)
expected_part_url = '/0x300/smart/filters:no_upscale():sharpen(0.5,0.2,true)/' + encoded_url + '/source_type/external'
self.assertIn(expected_part_url, result.url)
@ -113,21 +107,21 @@ class ThumbnailTest(ZulipTestCase):
self.logout()
user_profile = self.example_user("hamlet")
result = self.api_get(user_profile,
f"/thumbnail?url={quoted_url}&size=thumbnail")
"/thumbnail", {"url": image_url, "size": "thumbnail"})
self.assertEqual(result.status_code, 302, result)
expected_part_url = '/0x300/smart/filters:no_upscale():sharpen(0.5,0.2,true)/' + encoded_url + '/source_type/external'
self.assertIn(expected_part_url, result.url)
# Test API endpoint with legacy API authentication.
user_profile = self.example_user("hamlet")
result = self.client_get(f"/thumbnail?url={quoted_url}&size=thumbnail&api_key={get_api_key(user_profile)}")
result = self.client_get("/thumbnail", {"url": image_url, "size": "thumbnail", "api_key": get_api_key(user_profile)})
self.assertEqual(result.status_code, 302, result)
expected_part_url = '/0x300/smart/filters:no_upscale():sharpen(0.5,0.2,true)/' + encoded_url + '/source_type/external'
self.assertIn(expected_part_url, result.url)
# Test a second logged-in user; they should also be able to access it
user_profile = self.example_user("iago")
result = self.client_get(f"/thumbnail?url={quoted_url}&size=thumbnail&api_key={get_api_key(user_profile)}")
result = self.client_get("/thumbnail", {"url": image_url, "size": "thumbnail", "api_key": get_api_key(user_profile)})
self.assertEqual(result.status_code, 302, result)
expected_part_url = '/0x300/smart/filters:no_upscale():sharpen(0.5,0.2,true)/' + encoded_url + '/source_type/external'
self.assertIn(expected_part_url, result.url)
@ -135,7 +129,7 @@ class ThumbnailTest(ZulipTestCase):
# Test with another user trying to access image using thumbor.
# File should be always accessible to user in case of external source
self.login('iago')
result = self.client_get(f"/thumbnail?url={quoted_url}&size=full")
result = self.client_get("/thumbnail", {"url": image_url, "size": "full"})
self.assertEqual(result.status_code, 302, result)
expected_part_url = '/smart/filters:no_upscale()/' + encoded_url + '/source_type/external'
self.assertIn(expected_part_url, result.url)
@ -174,14 +168,13 @@ class ThumbnailTest(ZulipTestCase):
# Test full size image.
# We remove the forward slash infront of the `/user_uploads/` to match
# Markdown behaviour.
quoted_uri = urllib.parse.quote(uri[1:], safe='')
result = self.client_get(f"/thumbnail?url={quoted_uri}&size=full")
result = self.client_get("/thumbnail", {"url": uri[1:], "size": "full"})
self.assertEqual(result.status_code, 302, result)
expected_part_url = get_file_path_urlpart(uri)
self.assertIn(expected_part_url, result.url)
# Test thumbnail size.
result = self.client_get(f"/thumbnail?url={quoted_uri}&size=thumbnail")
result = self.client_get("/thumbnail", {"url": uri[1:], "size": "thumbnail"})
self.assertEqual(result.status_code, 302, result)
expected_part_url = get_file_path_urlpart(uri, '0x300')
self.assertIn(expected_part_url, result.url)
@ -198,8 +191,7 @@ class ThumbnailTest(ZulipTestCase):
# We remove the forward slash infront of the `/user_uploads/` to match
# Markdown behaviour.
quoted_uri = urllib.parse.quote(uri[1:], safe='')
result = self.client_get(f"/thumbnail?url={quoted_uri}&size=full")
result = self.client_get("/thumbnail", {"url": uri[1:], "size": "full"})
self.assertEqual(result.status_code, 302, result)
expected_part_url = get_file_path_urlpart(uri)
self.assertIn(expected_part_url, result.url)
@ -214,10 +206,8 @@ class ThumbnailTest(ZulipTestCase):
emoji_url_base = '/user_avatars/'
self.assertEqual(emoji_url_base, custom_emoji_url[:len(emoji_url_base)])
quoted_emoji_url = urllib.parse.quote(custom_emoji_url[1:], safe='')
# Test full size custom emoji image (for emoji link in messages case).
result = self.client_get(f"/thumbnail?url={quoted_emoji_url}&size=full")
result = self.client_get("/thumbnail", {"url": custom_emoji_url[1:], "size": "full"})
self.assertEqual(result.status_code, 302, result)
self.assertIn(custom_emoji_url, result.url)
@ -226,7 +216,7 @@ class ThumbnailTest(ZulipTestCase):
user_profile = self.example_user("hamlet")
result = self.api_get(
user_profile,
f'/thumbnail?url={quoted_uri}&size=full')
"/thumbnail", {"url": uri[1:], "size": "full"})
self.assertEqual(result.status_code, 302, result)
expected_part_url = get_file_path_urlpart(uri)
self.assertIn(expected_part_url, result.url)
@ -235,14 +225,14 @@ class ThumbnailTest(ZulipTestCase):
# auth.
user_profile = self.example_user("hamlet")
result = self.client_get(
f'/thumbnail?url={quoted_uri}&size=full&api_key={get_api_key(user_profile)}')
"/thumbnail", {"url": uri[1:], "size": "full", "api_key": get_api_key(user_profile)})
self.assertEqual(result.status_code, 302, result)
expected_part_url = get_file_path_urlpart(uri)
self.assertIn(expected_part_url, result.url)
# Test with another user trying to access image using thumbor.
self.login('iago')
result = self.client_get(f"/thumbnail?url={quoted_uri}&size=full")
result = self.client_get("/thumbnail", {"url": uri[1:], "size": "full"})
self.assertEqual(result.status_code, 403, result)
self.assert_in_response("You are not authorized to view this file.", result)
@ -250,8 +240,7 @@ class ThumbnailTest(ZulipTestCase):
def test_with_static_files(self) -> None:
self.login('hamlet')
uri = '/static/images/cute/turtle.png'
quoted_uri = urllib.parse.quote(uri[1:], safe='')
result = self.client_get(f"/thumbnail?url={quoted_uri}&size=full")
result = self.client_get("/thumbnail", {"url": uri[1:], "size": "full"})
self.assertEqual(result.status_code, 302, result)
self.assertEqual(uri, result.url)
@ -268,33 +257,28 @@ class ThumbnailTest(ZulipTestCase):
base = '/user_uploads/'
self.assertEqual(base, uri[:len(base)])
quoted_uri = urllib.parse.quote(uri[1:], safe='')
with self.settings(THUMBOR_URL=''):
result = self.client_get(f"/thumbnail?url={quoted_uri}&size=full")
result = self.client_get("/thumbnail", {"url": uri[1:], "size": "full"})
self.assertEqual(result.status_code, 302, result)
self.assertEqual(uri, result.url)
uri = 'https://www.google.com/images/srpr/logo4w.png'
quoted_uri = urllib.parse.quote(uri, safe='')
with self.settings(THUMBOR_URL=''):
result = self.client_get(f"/thumbnail?url={quoted_uri}&size=full")
result = self.client_get("/thumbnail", {"url": uri, "size": "full"})
self.assertEqual(result.status_code, 302, result)
base = 'https://external-content.zulipcdn.net/external_content/56c362a24201593891955ff526b3b412c0f9fcd2/68747470733a2f2f7777772e676f6f676c652e636f6d2f696d616765732f737270722f6c6f676f34772e706e67'
self.assertEqual(base, result.url)
uri = 'http://www.google.com/images/srpr/logo4w.png'
quoted_uri = urllib.parse.quote(uri, safe='')
with self.settings(THUMBOR_URL=''):
result = self.client_get(f"/thumbnail?url={quoted_uri}&size=full")
result = self.client_get("/thumbnail", {"url": uri, "size": "full"})
self.assertEqual(result.status_code, 302, result)
base = 'https://external-content.zulipcdn.net/external_content/7b6552b60c635e41e8f6daeb36d88afc4eabde79/687474703a2f2f7777772e676f6f676c652e636f6d2f696d616765732f737270722f6c6f676f34772e706e67'
self.assertEqual(base, result.url)
uri = '//www.google.com/images/srpr/logo4w.png'
quoted_uri = urllib.parse.quote(uri, safe='')
with self.settings(THUMBOR_URL=''):
result = self.client_get(f"/thumbnail?url={quoted_uri}&size=full")
result = self.client_get("/thumbnail", {"url": uri, "size": "full"})
self.assertEqual(result.status_code, 302, result)
base = 'https://external-content.zulipcdn.net/external_content/676530cf4b101d56f56cc4a37c6ef4d4fd9b0c03/2f2f7777772e676f6f676c652e636f6d2f696d616765732f737270722f6c6f676f34772e706e67'
self.assertEqual(base, result.url)
@ -312,10 +296,9 @@ class ThumbnailTest(ZulipTestCase):
base = '/user_uploads/'
self.assertEqual(base, uri[:len(base)])
quoted_uri = urllib.parse.quote(uri[1:], safe='')
hex_uri = base64.urlsafe_b64encode(uri.encode()).decode('utf-8')
with self.settings(THUMBOR_URL='http://test-thumborhost.com'):
result = self.client_get(f"/thumbnail?url={quoted_uri}&size=full")
result = self.client_get("/thumbnail", {"url": uri[1:], "size": "full"})
self.assertEqual(result.status_code, 302, result)
base = 'http://test-thumborhost.com/'
self.assertEqual(base, result.url[:len(base)])
@ -347,24 +330,23 @@ class ThumbnailTest(ZulipTestCase):
# Test with size supplied as a query parameter.
# size=thumbnail should return a 0x300 sized image.
# size=full should return the original resolution image.
quoted_uri = urllib.parse.quote(uri[1:], safe='')
result = self.client_get(f"/thumbnail?url={quoted_uri}&size=thumbnail")
result = self.client_get("/thumbnail", {"url": uri[1:], "size": "thumbnail"})
self.assertEqual(result.status_code, 302, result)
expected_part_url = get_file_path_urlpart(uri, '0x300')
self.assertIn(expected_part_url, result.url)
result = self.client_get(f"/thumbnail?url={quoted_uri}&size=full")
result = self.client_get("/thumbnail", {"url": uri[1:], "size": "full"})
self.assertEqual(result.status_code, 302, result)
expected_part_url = get_file_path_urlpart(uri)
self.assertIn(expected_part_url, result.url)
# Test with size supplied as a query parameter where size is anything
# else than 'full' or 'thumbnail'. Result should be an error message.
result = self.client_get(f"/thumbnail?url={quoted_uri}&size=480x360")
result = self.client_get("/thumbnail", {"url": uri[1:], "size": "480x360"})
self.assertEqual(result.status_code, 403, result)
self.assert_in_response("Invalid size.", result)
# Test with no size param supplied. In this case as well we show an
# error message.
result = self.client_get(f"/thumbnail?url={quoted_uri}")
result = self.client_get("/thumbnail", {"url": uri[1:]})
self.assertEqual(result.status_code, 400, "Missing 'size' argument")

View File

@ -126,10 +126,10 @@ class FileUploadTest(UploadSerializeMixin, ZulipTestCase):
# Try to download file via API, passing URL and invalid API key
user_profile = self.example_user("hamlet")
response = self.client_get(uri + "?api_key=" + "invalid")
response = self.client_get(uri, {"api_key": "invalid"})
self.assertEqual(response.status_code, 401)
response = self.client_get(uri + "?api_key=" + get_api_key(user_profile))
response = self.client_get(uri, {"api_key": get_api_key(user_profile)})
self.assertEqual(response.status_code, 200)
data = b"".join(response.streaming_content)
self.assertEqual(b"zulip!", data)
@ -940,12 +940,12 @@ class AvatarTest(UploadSerializeMixin, ZulipTestCase):
cordelia.avatar_source = UserProfile.AVATAR_FROM_GRAVATAR
cordelia.save()
with self.settings(ENABLE_GRAVATAR=True):
response = self.client_get("/avatar/cordelia@zulip.com?foo=bar")
response = self.client_get("/avatar/cordelia@zulip.com", {"foo": "bar"})
redirect_url = response['Location']
self.assertEqual(redirect_url, str(avatar_url(cordelia)) + '&foo=bar')
with self.settings(ENABLE_GRAVATAR=False):
response = self.client_get("/avatar/cordelia@zulip.com?foo=bar")
response = self.client_get("/avatar/cordelia@zulip.com", {"foo": "bar"})
redirect_url = response['Location']
self.assertTrue(redirect_url.endswith(str(avatar_url(cordelia)) + '&foo=bar'))
@ -960,11 +960,11 @@ class AvatarTest(UploadSerializeMixin, ZulipTestCase):
cordelia.avatar_source = UserProfile.AVATAR_FROM_USER
cordelia.save()
response = self.client_get("/avatar/cordelia@zulip.com?foo=bar")
response = self.client_get("/avatar/cordelia@zulip.com", {"foo": "bar"})
redirect_url = response['Location']
self.assertTrue(redirect_url.endswith(str(avatar_url(cordelia)) + '&foo=bar'))
response = self.client_get(f"/avatar/{cordelia.id}?foo=bar")
response = self.client_get(f"/avatar/{cordelia.id}", {"foo": "bar"})
redirect_url = response['Location']
self.assertTrue(redirect_url.endswith(str(avatar_url(cordelia)) + '&foo=bar'))
@ -974,25 +974,25 @@ class AvatarTest(UploadSerializeMixin, ZulipTestCase):
self.logout()
# Test /avatar/<email_or_id> endpoint with HTTP basic auth.
response = self.api_get(hamlet, "/avatar/cordelia@zulip.com?foo=bar")
response = self.api_get(hamlet, "/avatar/cordelia@zulip.com", {"foo": "bar"})
redirect_url = response['Location']
self.assertTrue(redirect_url.endswith(str(avatar_url(cordelia)) + '&foo=bar'))
response = self.api_get(hamlet, f"/avatar/{cordelia.id}?foo=bar")
response = self.api_get(hamlet, f"/avatar/{cordelia.id}", {"foo": "bar"})
redirect_url = response['Location']
self.assertTrue(redirect_url.endswith(str(avatar_url(cordelia)) + '&foo=bar'))
# Test cross_realm_bot avatar access using email.
response = self.api_get(hamlet, "/avatar/welcome-bot@zulip.com?foo=bar")
response = self.api_get(hamlet, "/avatar/welcome-bot@zulip.com", {"foo": "bar"})
redirect_url = response['Location']
self.assertTrue(redirect_url.endswith(str(avatar_url(cross_realm_bot)) + '&foo=bar'))
# Test cross_realm_bot avatar access using id.
response = self.api_get(hamlet, f"/avatar/{cross_realm_bot.id}?foo=bar")
response = self.api_get(hamlet, f"/avatar/{cross_realm_bot.id}", {"foo": "bar"})
redirect_url = response['Location']
self.assertTrue(redirect_url.endswith(str(avatar_url(cross_realm_bot)) + '&foo=bar'))
response = self.client_get("/avatar/cordelia@zulip.com?foo=bar")
response = self.client_get("/avatar/cordelia@zulip.com", {"foo": "bar"})
self.assert_json_error(response, "Not logged in: API authentication or user session required",
status_code=401)
@ -1005,26 +1005,26 @@ class AvatarTest(UploadSerializeMixin, ZulipTestCase):
cordelia.avatar_source = UserProfile.AVATAR_FROM_USER
cordelia.save()
response = self.client_get("/avatar/cordelia@zulip.com/medium?foo=bar")
response = self.client_get("/avatar/cordelia@zulip.com/medium", {"foo": "bar"})
redirect_url = response['Location']
self.assertTrue(redirect_url.endswith(str(avatar_url(cordelia, True)) + '&foo=bar'))
response = self.client_get(f"/avatar/{cordelia.id}/medium?foo=bar")
response = self.client_get(f"/avatar/{cordelia.id}/medium", {"foo": "bar"})
redirect_url = response['Location']
self.assertTrue(redirect_url.endswith(str(avatar_url(cordelia, True)) + '&foo=bar'))
self.logout()
# Test /avatar/<email_or_id>/medium endpoint with HTTP basic auth.
response = self.api_get(hamlet, "/avatar/cordelia@zulip.com/medium?foo=bar")
response = self.api_get(hamlet, "/avatar/cordelia@zulip.com/medium", {"foo": "bar"})
redirect_url = response['Location']
self.assertTrue(redirect_url.endswith(str(avatar_url(cordelia, True)) + '&foo=bar'))
response = self.api_get(hamlet, f"/avatar/{cordelia.id}/medium?foo=bar")
response = self.api_get(hamlet, f"/avatar/{cordelia.id}/medium", {"foo": "bar"})
redirect_url = response['Location']
self.assertTrue(redirect_url.endswith(str(avatar_url(cordelia, True)) + '&foo=bar'))
response = self.client_get("/avatar/cordelia@zulip.com/medium?foo=bar")
response = self.client_get("/avatar/cordelia@zulip.com/medium", {"foo": "bar"})
self.assert_json_error(response, "Not logged in: API authentication or user session required",
status_code=401)
@ -1034,7 +1034,7 @@ class AvatarTest(UploadSerializeMixin, ZulipTestCase):
# but this test just validates the current code's behavior.
self.login('hamlet')
response = self.client_get("/avatar/nonexistent_user@zulip.com?foo=bar")
response = self.client_get("/avatar/nonexistent_user@zulip.com", {"foo": "bar"})
redirect_url = response['Location']
actual_url = 'https://secure.gravatar.com/avatar/444258b521f152129eb0c162996e572d?d=identicon&version=1&foo=bar'
self.assertEqual(redirect_url, actual_url)
@ -1269,12 +1269,12 @@ class RealmIconTest(UploadSerializeMixin, ZulipTestCase):
realm = get_realm('zulip')
do_change_icon_source(realm, Realm.ICON_FROM_GRAVATAR)
with self.settings(ENABLE_GRAVATAR=True):
response = self.client_get("/json/realm/icon?foo=bar")
response = self.client_get("/json/realm/icon", {"foo": "bar"})
redirect_url = response['Location']
self.assertEqual(redirect_url, realm_icon_url(realm) + '&foo=bar')
with self.settings(ENABLE_GRAVATAR=False):
response = self.client_get("/json/realm/icon?foo=bar")
response = self.client_get("/json/realm/icon", {"foo": "bar"})
redirect_url = response['Location']
self.assertTrue(redirect_url.endswith(realm_icon_url(realm) + '&foo=bar'))
@ -1283,7 +1283,7 @@ class RealmIconTest(UploadSerializeMixin, ZulipTestCase):
realm = get_realm('zulip')
do_change_icon_source(realm, Realm.ICON_UPLOADED)
response = self.client_get("/json/realm/icon?foo=bar")
response = self.client_get("/json/realm/icon", {"foo": "bar"})
redirect_url = response['Location']
self.assertTrue(redirect_url.endswith(realm_icon_url(realm) + '&foo=bar'))

View File

@ -249,7 +249,7 @@ class PermissionTest(ZulipTestCase):
self.login_user(user)
# First, verify client_gravatar works normally
result = self.client_get('/json/users?client_gravatar=true')
result = self.client_get("/json/users", {"client_gravatar": "true"})
self.assert_json_success(result)
members = result.json()['members']
hamlet = find_dict(members, 'user_id', user.id)
@ -272,7 +272,7 @@ class PermissionTest(ZulipTestCase):
# is automatically disabled for the user.
do_set_realm_property(user.realm, "email_address_visibility",
Realm.EMAIL_ADDRESS_VISIBILITY_ADMINS)
result = self.client_get('/json/users?client_gravatar=true')
result = self.client_get("/json/users", {"client_gravatar": "true"})
self.assert_json_success(result)
members = result.json()['members']
hamlet = find_dict(members, 'user_id', user.id)
@ -300,7 +300,7 @@ class PermissionTest(ZulipTestCase):
# delivery_email is sent for admins.
admin.refresh_from_db()
self.login_user(admin)
result = self.client_get('/json/users?client_gravatar=true')
result = self.client_get("/json/users", {"client_gravatar": "true"})
self.assert_json_success(result)
members = result.json()['members']
hamlet = find_dict(members, 'user_id', user.id)
@ -1677,10 +1677,10 @@ class GetProfileTest(ZulipTestCase):
self.assertFalse(result['user']['is_admin'])
self.assertFalse(result['user']['is_owner'])
result = orjson.loads(self.client_get(f'/json/users/{user.id}?include_custom_profile_fields=true').content)
result = orjson.loads(self.client_get(f"/json/users/{user.id}", {"include_custom_profile_fields": "true"}).content)
self.assertIn('profile_data', result['user'])
result = self.client_get(f'/json/users/{30}?')
result = self.client_get("/json/users/30")
self.assert_json_error(result, "No such user")
bot = self.example_user("default_bot")