mirror of https://github.com/zulip/zulip.git
test: Replace occurences of `uri` with `url`.
In all the tests files, replaced all occurences of `uri` with `url` appeared in comments, local variablles, function names and their callers.
This commit is contained in:
parent
946b4e73ca
commit
b0ef8f0822
|
@ -1039,7 +1039,7 @@ class SocialAuthBase(DesktopFlowTestingLib, ZulipTestCase, ABC):
|
||||||
self.client.cookies = result.cookies
|
self.client.cookies = result.cookies
|
||||||
|
|
||||||
# Next, the browser requests result["Location"], and gets
|
# Next, the browser requests result["Location"], and gets
|
||||||
# redirected back to the registered redirect uri.
|
# redirected back to the registered redirect url.
|
||||||
|
|
||||||
# We register callbacks for the key URLs on Identity Provider that
|
# We register callbacks for the key URLs on Identity Provider that
|
||||||
# auth completion URL will call
|
# auth completion URL will call
|
||||||
|
@ -1114,8 +1114,8 @@ class SocialAuthBase(DesktopFlowTestingLib, ZulipTestCase, ABC):
|
||||||
self.assertEqual(data["redirect_to"], "/user_uploads/image")
|
self.assertEqual(data["redirect_to"], "/user_uploads/image")
|
||||||
self.assertEqual(result.status_code, 302)
|
self.assertEqual(result.status_code, 302)
|
||||||
parsed_url = urllib.parse.urlparse(result["Location"])
|
parsed_url = urllib.parse.urlparse(result["Location"])
|
||||||
uri = f"{parsed_url.scheme}://{parsed_url.netloc}{parsed_url.path}"
|
url = f"{parsed_url.scheme}://{parsed_url.netloc}{parsed_url.path}"
|
||||||
self.assertTrue(uri.startswith("http://zulip.testserver/accounts/login/subdomain/"))
|
self.assertTrue(url.startswith("http://zulip.testserver/accounts/login/subdomain/"))
|
||||||
|
|
||||||
self.assertIn(
|
self.assertIn(
|
||||||
f"INFO:{self.logger_string}:Authentication attempt from 127.0.0.1: subdomain=zulip;username=hamlet@zulip.com;outcome=success",
|
f"INFO:{self.logger_string}:Authentication attempt from 127.0.0.1: subdomain=zulip;username=hamlet@zulip.com;outcome=success",
|
||||||
|
@ -1138,8 +1138,8 @@ class SocialAuthBase(DesktopFlowTestingLib, ZulipTestCase, ABC):
|
||||||
self.assertEqual(data["redirect_to"], "/user_uploads/image")
|
self.assertEqual(data["redirect_to"], "/user_uploads/image")
|
||||||
self.assertEqual(result.status_code, 302)
|
self.assertEqual(result.status_code, 302)
|
||||||
parsed_url = urllib.parse.urlparse(result["Location"])
|
parsed_url = urllib.parse.urlparse(result["Location"])
|
||||||
uri = f"{parsed_url.scheme}://{parsed_url.netloc}{parsed_url.path}"
|
url = f"{parsed_url.scheme}://{parsed_url.netloc}{parsed_url.path}"
|
||||||
self.assertTrue(uri.startswith("http://zulip.testserver/accounts/login/subdomain/"))
|
self.assertTrue(url.startswith("http://zulip.testserver/accounts/login/subdomain/"))
|
||||||
|
|
||||||
def test_social_auth_deactivated_user(self) -> None:
|
def test_social_auth_deactivated_user(self) -> None:
|
||||||
user_profile = self.example_user("hamlet")
|
user_profile = self.example_user("hamlet")
|
||||||
|
@ -1360,8 +1360,8 @@ class SocialAuthBase(DesktopFlowTestingLib, ZulipTestCase, ABC):
|
||||||
self.assertEqual(data["subdomain"], "zulip")
|
self.assertEqual(data["subdomain"], "zulip")
|
||||||
self.assertEqual(result.status_code, 302)
|
self.assertEqual(result.status_code, 302)
|
||||||
parsed_url = urllib.parse.urlparse(result["Location"])
|
parsed_url = urllib.parse.urlparse(result["Location"])
|
||||||
uri = f"{parsed_url.scheme}://{parsed_url.netloc}{parsed_url.path}"
|
url = f"{parsed_url.scheme}://{parsed_url.netloc}{parsed_url.path}"
|
||||||
self.assertTrue(uri.startswith("http://zulip.testserver/accounts/login/subdomain/"))
|
self.assertTrue(url.startswith("http://zulip.testserver/accounts/login/subdomain/"))
|
||||||
hamlet = self.example_user("hamlet")
|
hamlet = self.example_user("hamlet")
|
||||||
# Name wasn't changed at all
|
# Name wasn't changed at all
|
||||||
self.assertEqual(hamlet.full_name, "King Hamlet")
|
self.assertEqual(hamlet.full_name, "King Hamlet")
|
||||||
|
@ -1386,8 +1386,8 @@ class SocialAuthBase(DesktopFlowTestingLib, ZulipTestCase, ABC):
|
||||||
self.assertEqual(data["subdomain"], "zulip")
|
self.assertEqual(data["subdomain"], "zulip")
|
||||||
self.assertEqual(result.status_code, 302)
|
self.assertEqual(result.status_code, 302)
|
||||||
parsed_url = urllib.parse.urlparse(result["Location"])
|
parsed_url = urllib.parse.urlparse(result["Location"])
|
||||||
uri = f"{parsed_url.scheme}://{parsed_url.netloc}{parsed_url.path}"
|
url = f"{parsed_url.scheme}://{parsed_url.netloc}{parsed_url.path}"
|
||||||
self.assertTrue(uri.startswith("http://zulip.testserver/accounts/login/subdomain/"))
|
self.assertTrue(url.startswith("http://zulip.testserver/accounts/login/subdomain/"))
|
||||||
|
|
||||||
result = self.client_get(result["Location"])
|
result = self.client_get(result["Location"])
|
||||||
|
|
||||||
|
@ -1673,8 +1673,8 @@ class SocialAuthBase(DesktopFlowTestingLib, ZulipTestCase, ABC):
|
||||||
self.assertEqual(data["subdomain"], "zulip")
|
self.assertEqual(data["subdomain"], "zulip")
|
||||||
self.assertEqual(result.status_code, 302)
|
self.assertEqual(result.status_code, 302)
|
||||||
parsed_url = urllib.parse.urlparse(result["Location"])
|
parsed_url = urllib.parse.urlparse(result["Location"])
|
||||||
uri = f"{parsed_url.scheme}://{parsed_url.netloc}{parsed_url.path}"
|
url = f"{parsed_url.scheme}://{parsed_url.netloc}{parsed_url.path}"
|
||||||
self.assertTrue(uri.startswith("http://zulip.testserver/accounts/login/subdomain/"))
|
self.assertTrue(url.startswith("http://zulip.testserver/accounts/login/subdomain/"))
|
||||||
|
|
||||||
result = self.client_get(result["Location"])
|
result = self.client_get(result["Location"])
|
||||||
self.assertEqual(result.status_code, 200)
|
self.assertEqual(result.status_code, 200)
|
||||||
|
@ -1697,8 +1697,8 @@ class SocialAuthBase(DesktopFlowTestingLib, ZulipTestCase, ABC):
|
||||||
self.assertEqual(data["subdomain"], "zulip")
|
self.assertEqual(data["subdomain"], "zulip")
|
||||||
self.assertEqual(result.status_code, 302)
|
self.assertEqual(result.status_code, 302)
|
||||||
parsed_url = urllib.parse.urlparse(result["Location"])
|
parsed_url = urllib.parse.urlparse(result["Location"])
|
||||||
uri = f"{parsed_url.scheme}://{parsed_url.netloc}{parsed_url.path}"
|
url = f"{parsed_url.scheme}://{parsed_url.netloc}{parsed_url.path}"
|
||||||
self.assertTrue(uri.startswith("http://zulip.testserver/accounts/login/subdomain/"))
|
self.assertTrue(url.startswith("http://zulip.testserver/accounts/login/subdomain/"))
|
||||||
|
|
||||||
result = self.client_get(result["Location"])
|
result = self.client_get(result["Location"])
|
||||||
self.assertEqual(result.status_code, 200)
|
self.assertEqual(result.status_code, 200)
|
||||||
|
@ -2674,10 +2674,10 @@ class SAMLAuthBackendTest(SocialAuthBase):
|
||||||
self.assertEqual(data["subdomain"], "zulip")
|
self.assertEqual(data["subdomain"], "zulip")
|
||||||
self.assertEqual(result.status_code, 302)
|
self.assertEqual(result.status_code, 302)
|
||||||
parsed_url = urllib.parse.urlparse(result["Location"])
|
parsed_url = urllib.parse.urlparse(result["Location"])
|
||||||
uri = f"{parsed_url.scheme}://{parsed_url.netloc}{parsed_url.path}"
|
url = f"{parsed_url.scheme}://{parsed_url.netloc}{parsed_url.path}"
|
||||||
self.assertTrue(uri.startswith("http://zulip.testserver/accounts/login/subdomain/"))
|
self.assertTrue(url.startswith("http://zulip.testserver/accounts/login/subdomain/"))
|
||||||
|
|
||||||
self.client_get(uri)
|
self.client_get(url)
|
||||||
self.assert_logged_in_user_id(self.example_user("hamlet").id)
|
self.assert_logged_in_user_id(self.example_user("hamlet").id)
|
||||||
|
|
||||||
def test_choose_subdomain_invalid_subdomain_specified(self) -> None:
|
def test_choose_subdomain_invalid_subdomain_specified(self) -> None:
|
||||||
|
@ -2709,10 +2709,10 @@ class SAMLAuthBackendTest(SocialAuthBase):
|
||||||
self.assertEqual(data["subdomain"], "zulip")
|
self.assertEqual(data["subdomain"], "zulip")
|
||||||
self.assertEqual(result.status_code, 302)
|
self.assertEqual(result.status_code, 302)
|
||||||
parsed_url = urllib.parse.urlparse(result["Location"])
|
parsed_url = urllib.parse.urlparse(result["Location"])
|
||||||
uri = f"{parsed_url.scheme}://{parsed_url.netloc}{parsed_url.path}"
|
url = f"{parsed_url.scheme}://{parsed_url.netloc}{parsed_url.path}"
|
||||||
self.assertTrue(uri.startswith("http://zulip.testserver/accounts/login/subdomain/"))
|
self.assertTrue(url.startswith("http://zulip.testserver/accounts/login/subdomain/"))
|
||||||
|
|
||||||
self.client_get(uri)
|
self.client_get(url)
|
||||||
self.assert_logged_in_user_id(self.example_user("hamlet").id)
|
self.assert_logged_in_user_id(self.example_user("hamlet").id)
|
||||||
|
|
||||||
def test_idp_initiated_signin_subdomain_implicit_no_relaystate_param(self) -> None:
|
def test_idp_initiated_signin_subdomain_implicit_no_relaystate_param(self) -> None:
|
||||||
|
@ -2730,10 +2730,10 @@ class SAMLAuthBackendTest(SocialAuthBase):
|
||||||
self.assertEqual(data["subdomain"], "zulip")
|
self.assertEqual(data["subdomain"], "zulip")
|
||||||
self.assertEqual(result.status_code, 302)
|
self.assertEqual(result.status_code, 302)
|
||||||
parsed_url = urllib.parse.urlparse(result["Location"])
|
parsed_url = urllib.parse.urlparse(result["Location"])
|
||||||
uri = f"{parsed_url.scheme}://{parsed_url.netloc}{parsed_url.path}"
|
url = f"{parsed_url.scheme}://{parsed_url.netloc}{parsed_url.path}"
|
||||||
self.assertTrue(uri.startswith("http://zulip.testserver/accounts/login/subdomain/"))
|
self.assertTrue(url.startswith("http://zulip.testserver/accounts/login/subdomain/"))
|
||||||
|
|
||||||
self.client_get(uri)
|
self.client_get(url)
|
||||||
self.assert_logged_in_user_id(self.example_user("hamlet").id)
|
self.assert_logged_in_user_id(self.example_user("hamlet").id)
|
||||||
|
|
||||||
def test_idp_initiated_signin_subdomain_implicit_invalid(self) -> None:
|
def test_idp_initiated_signin_subdomain_implicit_invalid(self) -> None:
|
||||||
|
@ -3745,8 +3745,8 @@ class GitHubAuthBackendTest(SocialAuthBase):
|
||||||
self.assertEqual(data["redirect_to"], "/user_uploads/image")
|
self.assertEqual(data["redirect_to"], "/user_uploads/image")
|
||||||
self.assertEqual(result.status_code, 302)
|
self.assertEqual(result.status_code, 302)
|
||||||
parsed_url = urllib.parse.urlparse(result["Location"])
|
parsed_url = urllib.parse.urlparse(result["Location"])
|
||||||
uri = f"{parsed_url.scheme}://{parsed_url.netloc}{parsed_url.path}"
|
url = f"{parsed_url.scheme}://{parsed_url.netloc}{parsed_url.path}"
|
||||||
self.assertTrue(uri.startswith("http://zulip.testserver/accounts/login/subdomain/"))
|
self.assertTrue(url.startswith("http://zulip.testserver/accounts/login/subdomain/"))
|
||||||
|
|
||||||
def test_github_oauth2_success_single_email(self) -> None:
|
def test_github_oauth2_success_single_email(self) -> None:
|
||||||
# If the user has a single email associated with its GitHub account,
|
# If the user has a single email associated with its GitHub account,
|
||||||
|
@ -3770,8 +3770,8 @@ class GitHubAuthBackendTest(SocialAuthBase):
|
||||||
self.assertEqual(data["redirect_to"], "/user_uploads/image")
|
self.assertEqual(data["redirect_to"], "/user_uploads/image")
|
||||||
self.assertEqual(result.status_code, 302)
|
self.assertEqual(result.status_code, 302)
|
||||||
parsed_url = urllib.parse.urlparse(result["Location"])
|
parsed_url = urllib.parse.urlparse(result["Location"])
|
||||||
uri = f"{parsed_url.scheme}://{parsed_url.netloc}{parsed_url.path}"
|
url = f"{parsed_url.scheme}://{parsed_url.netloc}{parsed_url.path}"
|
||||||
self.assertTrue(uri.startswith("http://zulip.testserver/accounts/login/subdomain/"))
|
self.assertTrue(url.startswith("http://zulip.testserver/accounts/login/subdomain/"))
|
||||||
|
|
||||||
def test_github_oauth2_login_only_one_account_exists(self) -> None:
|
def test_github_oauth2_login_only_one_account_exists(self) -> None:
|
||||||
# In a login flow, if only one of the user's verified emails
|
# In a login flow, if only one of the user's verified emails
|
||||||
|
@ -3799,8 +3799,8 @@ class GitHubAuthBackendTest(SocialAuthBase):
|
||||||
self.assertEqual(data["redirect_to"], "/user_uploads/image")
|
self.assertEqual(data["redirect_to"], "/user_uploads/image")
|
||||||
self.assertEqual(result.status_code, 302)
|
self.assertEqual(result.status_code, 302)
|
||||||
parsed_url = urllib.parse.urlparse(result["Location"])
|
parsed_url = urllib.parse.urlparse(result["Location"])
|
||||||
uri = f"{parsed_url.scheme}://{parsed_url.netloc}{parsed_url.path}"
|
url = f"{parsed_url.scheme}://{parsed_url.netloc}{parsed_url.path}"
|
||||||
self.assertTrue(uri.startswith("http://zulip.testserver/accounts/login/subdomain/"))
|
self.assertTrue(url.startswith("http://zulip.testserver/accounts/login/subdomain/"))
|
||||||
|
|
||||||
def test_github_oauth2_login_multiple_accounts_exist(self) -> None:
|
def test_github_oauth2_login_multiple_accounts_exist(self) -> None:
|
||||||
# In the login flow, if multiple of the user's verified emails
|
# In the login flow, if multiple of the user's verified emails
|
||||||
|
@ -3828,8 +3828,8 @@ class GitHubAuthBackendTest(SocialAuthBase):
|
||||||
self.assertEqual(data["redirect_to"], "/user_uploads/image")
|
self.assertEqual(data["redirect_to"], "/user_uploads/image")
|
||||||
self.assertEqual(result.status_code, 302)
|
self.assertEqual(result.status_code, 302)
|
||||||
parsed_url = urllib.parse.urlparse(result["Location"])
|
parsed_url = urllib.parse.urlparse(result["Location"])
|
||||||
uri = f"{parsed_url.scheme}://{parsed_url.netloc}{parsed_url.path}"
|
url = f"{parsed_url.scheme}://{parsed_url.netloc}{parsed_url.path}"
|
||||||
self.assertTrue(uri.startswith("http://zulip.testserver/accounts/login/subdomain/"))
|
self.assertTrue(url.startswith("http://zulip.testserver/accounts/login/subdomain/"))
|
||||||
|
|
||||||
def test_github_oauth2_login_no_account_exists(self) -> None:
|
def test_github_oauth2_login_no_account_exists(self) -> None:
|
||||||
# In the login flow, if the user has multiple verified emails,
|
# In the login flow, if the user has multiple verified emails,
|
||||||
|
@ -3889,8 +3889,8 @@ class GitHubAuthBackendTest(SocialAuthBase):
|
||||||
self.assertEqual(data["redirect_to"], "/user_uploads/image")
|
self.assertEqual(data["redirect_to"], "/user_uploads/image")
|
||||||
self.assertEqual(result.status_code, 302)
|
self.assertEqual(result.status_code, 302)
|
||||||
parsed_url = urllib.parse.urlparse(result["Location"])
|
parsed_url = urllib.parse.urlparse(result["Location"])
|
||||||
uri = f"{parsed_url.scheme}://{parsed_url.netloc}{parsed_url.path}"
|
url = f"{parsed_url.scheme}://{parsed_url.netloc}{parsed_url.path}"
|
||||||
self.assertTrue(uri.startswith("http://zulip.testserver/accounts/login/subdomain/"))
|
self.assertTrue(url.startswith("http://zulip.testserver/accounts/login/subdomain/"))
|
||||||
|
|
||||||
def test_github_oauth2_signup_choose_new_email_to_register(self) -> None:
|
def test_github_oauth2_signup_choose_new_email_to_register(self) -> None:
|
||||||
# In the sign up flow, if the user has multiple verified
|
# In the sign up flow, if the user has multiple verified
|
||||||
|
@ -3972,8 +3972,8 @@ class GitHubAuthBackendTest(SocialAuthBase):
|
||||||
self.assertEqual(data["subdomain"], "zulip")
|
self.assertEqual(data["subdomain"], "zulip")
|
||||||
self.assertEqual(result.status_code, 302)
|
self.assertEqual(result.status_code, 302)
|
||||||
parsed_url = urllib.parse.urlparse(result["Location"])
|
parsed_url = urllib.parse.urlparse(result["Location"])
|
||||||
uri = f"{parsed_url.scheme}://{parsed_url.netloc}{parsed_url.path}"
|
url = f"{parsed_url.scheme}://{parsed_url.netloc}{parsed_url.path}"
|
||||||
self.assertTrue(uri.startswith("http://zulip.testserver/accounts/login/subdomain/"))
|
self.assertTrue(url.startswith("http://zulip.testserver/accounts/login/subdomain/"))
|
||||||
|
|
||||||
def test_github_oauth2_email_not_associated(self) -> None:
|
def test_github_oauth2_email_not_associated(self) -> None:
|
||||||
account_data_dict = self.get_account_data_dict(
|
account_data_dict = self.get_account_data_dict(
|
||||||
|
@ -4091,7 +4091,7 @@ class GoogleAuthBackendTest(SocialAuthBase):
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|
||||||
def test_social_auth_mobile_realm_uri(self) -> None:
|
def test_social_auth_mobile_realm_url(self) -> None:
|
||||||
mobile_flow_otp = "1234abcd" * 8
|
mobile_flow_otp = "1234abcd" * 8
|
||||||
account_data_dict = self.get_account_data_dict(email=self.email, name="Full Name")
|
account_data_dict = self.get_account_data_dict(email=self.email, name="Full Name")
|
||||||
|
|
||||||
|
|
|
@ -2432,17 +2432,17 @@ class NormalActionsTest(BaseAction):
|
||||||
self.login("hamlet")
|
self.login("hamlet")
|
||||||
fp = StringIO("zulip!")
|
fp = StringIO("zulip!")
|
||||||
fp.name = "zulip.txt"
|
fp.name = "zulip.txt"
|
||||||
uri = None
|
url = None
|
||||||
|
|
||||||
def do_upload() -> None:
|
def do_upload() -> None:
|
||||||
nonlocal uri
|
nonlocal url
|
||||||
result = self.client_post("/json/user_uploads", {"file": fp})
|
result = self.client_post("/json/user_uploads", {"file": fp})
|
||||||
|
|
||||||
response_dict = self.assert_json_success(result)
|
response_dict = self.assert_json_success(result)
|
||||||
self.assertIn("uri", response_dict)
|
self.assertIn("uri", response_dict)
|
||||||
uri = response_dict["uri"]
|
url = response_dict["uri"]
|
||||||
base = "/user_uploads/"
|
base = "/user_uploads/"
|
||||||
self.assertEqual(base, uri[: len(base)])
|
self.assertEqual(base, url[: len(base)])
|
||||||
|
|
||||||
events = self.verify_action(lambda: do_upload(), num_events=1, state_change_expected=False)
|
events = self.verify_action(lambda: do_upload(), num_events=1, state_change_expected=False)
|
||||||
|
|
||||||
|
@ -2455,8 +2455,8 @@ class NormalActionsTest(BaseAction):
|
||||||
|
|
||||||
hamlet = self.example_user("hamlet")
|
hamlet = self.example_user("hamlet")
|
||||||
self.subscribe(hamlet, "Denmark")
|
self.subscribe(hamlet, "Denmark")
|
||||||
assert uri is not None
|
assert url is not None
|
||||||
body = f"First message ...[zulip.txt](http://{hamlet.realm.host}" + uri + ")"
|
body = f"First message ...[zulip.txt](http://{hamlet.realm.host}" + url + ")"
|
||||||
events = self.verify_action(
|
events = self.verify_action(
|
||||||
lambda: self.send_stream_message(self.example_user("hamlet"), "Denmark", body, "test"),
|
lambda: self.send_stream_message(self.example_user("hamlet"), "Denmark", body, "test"),
|
||||||
num_events=2,
|
num_events=2,
|
||||||
|
|
|
@ -1480,7 +1480,7 @@ class MarkdownTest(ZulipTestCase):
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|
||||||
# Test URI escaping
|
# Test URL escaping
|
||||||
RealmFilter(
|
RealmFilter(
|
||||||
realm=realm,
|
realm=realm,
|
||||||
pattern=r"url-(?P<id>[0-9]+)",
|
pattern=r"url-(?P<id>[0-9]+)",
|
||||||
|
|
|
@ -18,12 +18,12 @@ class RequestMockWithProxySupport(responses.RequestsMock):
|
||||||
**kwargs: Any,
|
**kwargs: Any,
|
||||||
) -> requests.Response:
|
) -> requests.Response:
|
||||||
if "proxies" in kwargs and request.url:
|
if "proxies" in kwargs and request.url:
|
||||||
proxy_uri = requests.utils.select_proxy(request.url, kwargs["proxies"])
|
proxy_url = requests.utils.select_proxy(request.url, kwargs["proxies"])
|
||||||
if proxy_uri is not None:
|
if proxy_url is not None:
|
||||||
request = requests.Request(
|
request = requests.Request(
|
||||||
method="GET",
|
method="GET",
|
||||||
url=f"{proxy_uri}/",
|
url=f"{proxy_url}/",
|
||||||
headers=adapter.proxy_headers(proxy_uri),
|
headers=adapter.proxy_headers(proxy_url),
|
||||||
).prepare()
|
).prepare()
|
||||||
return super()._on_request(adapter, request, **kwargs)
|
return super()._on_request(adapter, request, **kwargs)
|
||||||
|
|
||||||
|
|
|
@ -125,7 +125,7 @@ class RealmFilterTest(ZulipTestCase):
|
||||||
result = self.client_post("/json/realm/filters", info=data)
|
result = self.client_post("/json/realm/filters", info=data)
|
||||||
self.assert_json_success(result)
|
self.assert_json_success(result)
|
||||||
|
|
||||||
data["pattern"] = r"ZUL-URI-(?P<id>\d+)"
|
data["pattern"] = r"ZUL-URL-(?P<id>\d+)"
|
||||||
data["url_format_string"] = "https://example.com/%ba/%(id)s"
|
data["url_format_string"] = "https://example.com/%ba/%(id)s"
|
||||||
result = self.client_post("/json/realm/filters", info=data)
|
result = self.client_post("/json/realm/filters", info=data)
|
||||||
self.assert_json_success(result)
|
self.assert_json_success(result)
|
||||||
|
|
|
@ -1035,13 +1035,13 @@ class StreamAdminTest(ZulipTestCase):
|
||||||
fp.name = "zulip.txt"
|
fp.name = "zulip.txt"
|
||||||
|
|
||||||
result = self.client_post("/json/user_uploads", {"file": fp})
|
result = self.client_post("/json/user_uploads", {"file": fp})
|
||||||
uri = self.assert_json_success(result)["uri"]
|
url = self.assert_json_success(result)["uri"]
|
||||||
|
|
||||||
owner = self.example_user("desdemona")
|
owner = self.example_user("desdemona")
|
||||||
realm = owner.realm
|
realm = owner.realm
|
||||||
stream = self.make_stream("test_stream", realm=realm)
|
stream = self.make_stream("test_stream", realm=realm)
|
||||||
self.subscribe(owner, "test_stream")
|
self.subscribe(owner, "test_stream")
|
||||||
body = f"First message ...[zulip.txt](http://{realm.host}" + uri + ")"
|
body = f"First message ...[zulip.txt](http://{realm.host}" + url + ")"
|
||||||
msg_id = self.send_stream_message(owner, "test_stream", body, "test")
|
msg_id = self.send_stream_message(owner, "test_stream", body, "test")
|
||||||
attachment = Attachment.objects.get(messages__id=msg_id)
|
attachment = Attachment.objects.get(messages__id=msg_id)
|
||||||
|
|
||||||
|
|
|
@ -17,33 +17,33 @@ class ThumbnailTest(ZulipTestCase):
|
||||||
self.assert_json_success(result)
|
self.assert_json_success(result)
|
||||||
json = orjson.loads(result.content)
|
json = orjson.loads(result.content)
|
||||||
self.assertIn("uri", json)
|
self.assertIn("uri", json)
|
||||||
uri = json["uri"]
|
url = json["uri"]
|
||||||
base = "/user_uploads/"
|
base = "/user_uploads/"
|
||||||
self.assertEqual(base, uri[: len(base)])
|
self.assertEqual(base, url[: len(base)])
|
||||||
|
|
||||||
result = self.client_get("/thumbnail", {"url": uri[1:], "size": "full"})
|
result = self.client_get("/thumbnail", {"url": url[1:], "size": "full"})
|
||||||
self.assertEqual(result.status_code, 302, result)
|
self.assertEqual(result.status_code, 302, result)
|
||||||
self.assertEqual(uri, result["Location"])
|
self.assertEqual(url, result["Location"])
|
||||||
|
|
||||||
self.login("iago")
|
self.login("iago")
|
||||||
result = self.client_get("/thumbnail", {"url": uri[1:], "size": "full"})
|
result = self.client_get("/thumbnail", {"url": url[1:], "size": "full"})
|
||||||
self.assertEqual(result.status_code, 403, result)
|
self.assertEqual(result.status_code, 403, result)
|
||||||
self.assert_in_response("You are not authorized to view this file.", result)
|
self.assert_in_response("You are not authorized to view this file.", result)
|
||||||
|
|
||||||
uri = "https://www.google.com/images/srpr/logo4w.png"
|
url = "https://www.google.com/images/srpr/logo4w.png"
|
||||||
result = self.client_get("/thumbnail", {"url": uri, "size": "full"})
|
result = self.client_get("/thumbnail", {"url": url, "size": "full"})
|
||||||
self.assertEqual(result.status_code, 302, result)
|
self.assertEqual(result.status_code, 302, result)
|
||||||
base = "https://external-content.zulipcdn.net/external_content/56c362a24201593891955ff526b3b412c0f9fcd2/68747470733a2f2f7777772e676f6f676c652e636f6d2f696d616765732f737270722f6c6f676f34772e706e67"
|
base = "https://external-content.zulipcdn.net/external_content/56c362a24201593891955ff526b3b412c0f9fcd2/68747470733a2f2f7777772e676f6f676c652e636f6d2f696d616765732f737270722f6c6f676f34772e706e67"
|
||||||
self.assertEqual(base, result["Location"])
|
self.assertEqual(base, result["Location"])
|
||||||
|
|
||||||
uri = "http://www.google.com/images/srpr/logo4w.png"
|
url = "http://www.google.com/images/srpr/logo4w.png"
|
||||||
result = self.client_get("/thumbnail", {"url": uri, "size": "full"})
|
result = self.client_get("/thumbnail", {"url": url, "size": "full"})
|
||||||
self.assertEqual(result.status_code, 302, result)
|
self.assertEqual(result.status_code, 302, result)
|
||||||
base = "https://external-content.zulipcdn.net/external_content/7b6552b60c635e41e8f6daeb36d88afc4eabde79/687474703a2f2f7777772e676f6f676c652e636f6d2f696d616765732f737270722f6c6f676f34772e706e67"
|
base = "https://external-content.zulipcdn.net/external_content/7b6552b60c635e41e8f6daeb36d88afc4eabde79/687474703a2f2f7777772e676f6f676c652e636f6d2f696d616765732f737270722f6c6f676f34772e706e67"
|
||||||
self.assertEqual(base, result["Location"])
|
self.assertEqual(base, result["Location"])
|
||||||
|
|
||||||
uri = "//www.google.com/images/srpr/logo4w.png"
|
url = "//www.google.com/images/srpr/logo4w.png"
|
||||||
result = self.client_get("/thumbnail", {"url": uri, "size": "full"})
|
result = self.client_get("/thumbnail", {"url": url, "size": "full"})
|
||||||
self.assertEqual(result.status_code, 302, result)
|
self.assertEqual(result.status_code, 302, result)
|
||||||
base = "https://external-content.zulipcdn.net/external_content/676530cf4b101d56f56cc4a37c6ef4d4fd9b0c03/2f2f7777772e676f6f676c652e636f6d2f696d616765732f737270722f6c6f676f34772e706e67"
|
base = "https://external-content.zulipcdn.net/external_content/676530cf4b101d56f56cc4a37c6ef4d4fd9b0c03/2f2f7777772e676f6f676c652e636f6d2f696d616765732f737270722f6c6f676f34772e706e67"
|
||||||
self.assertEqual(base, result["Location"])
|
self.assertEqual(base, result["Location"])
|
||||||
|
@ -57,34 +57,34 @@ class ThumbnailTest(ZulipTestCase):
|
||||||
result = self.client_post("/json/user_uploads", {"file": fp})
|
result = self.client_post("/json/user_uploads", {"file": fp})
|
||||||
self.assert_json_success(result)
|
self.assert_json_success(result)
|
||||||
json = orjson.loads(result.content)
|
json = orjson.loads(result.content)
|
||||||
uri = json["uri"]
|
url = json["uri"]
|
||||||
|
|
||||||
add_ratelimit_rule(86400, 1000, domain="spectator_attachment_access_by_file")
|
add_ratelimit_rule(86400, 1000, domain="spectator_attachment_access_by_file")
|
||||||
# Deny file access for non-web-public stream
|
# Deny file access for non-web-public stream
|
||||||
self.subscribe(self.example_user("hamlet"), "Denmark")
|
self.subscribe(self.example_user("hamlet"), "Denmark")
|
||||||
host = self.example_user("hamlet").realm.host
|
host = self.example_user("hamlet").realm.host
|
||||||
body = f"First message ...[zulip.txt](http://{host}" + uri + ")"
|
body = f"First message ...[zulip.txt](http://{host}" + url + ")"
|
||||||
self.send_stream_message(self.example_user("hamlet"), "Denmark", body, "test")
|
self.send_stream_message(self.example_user("hamlet"), "Denmark", body, "test")
|
||||||
|
|
||||||
self.logout()
|
self.logout()
|
||||||
response = self.client_get("/thumbnail", {"url": uri[1:], "size": "full"})
|
response = self.client_get("/thumbnail", {"url": url[1:], "size": "full"})
|
||||||
self.assertEqual(response.status_code, 403)
|
self.assertEqual(response.status_code, 403)
|
||||||
|
|
||||||
# Allow file access for web-public stream
|
# Allow file access for web-public stream
|
||||||
self.login("hamlet")
|
self.login("hamlet")
|
||||||
self.make_stream("web-public-stream", is_web_public=True)
|
self.make_stream("web-public-stream", is_web_public=True)
|
||||||
self.subscribe(self.example_user("hamlet"), "web-public-stream")
|
self.subscribe(self.example_user("hamlet"), "web-public-stream")
|
||||||
body = f"First message ...[zulip.txt](http://{host}" + uri + ")"
|
body = f"First message ...[zulip.txt](http://{host}" + url + ")"
|
||||||
self.send_stream_message(self.example_user("hamlet"), "web-public-stream", body, "test")
|
self.send_stream_message(self.example_user("hamlet"), "web-public-stream", body, "test")
|
||||||
|
|
||||||
self.logout()
|
self.logout()
|
||||||
response = self.client_get("/thumbnail", {"url": uri[1:], "size": "full"})
|
response = self.client_get("/thumbnail", {"url": url[1:], "size": "full"})
|
||||||
self.assertEqual(response.status_code, 302)
|
self.assertEqual(response.status_code, 302)
|
||||||
remove_ratelimit_rule(86400, 1000, domain="spectator_attachment_access_by_file")
|
remove_ratelimit_rule(86400, 1000, domain="spectator_attachment_access_by_file")
|
||||||
|
|
||||||
# Deny file access since rate limited
|
# Deny file access since rate limited
|
||||||
add_ratelimit_rule(86400, 0, domain="spectator_attachment_access_by_file")
|
add_ratelimit_rule(86400, 0, domain="spectator_attachment_access_by_file")
|
||||||
response = self.client_get("/thumbnail", {"url": uri[1:], "size": "full"})
|
response = self.client_get("/thumbnail", {"url": url[1:], "size": "full"})
|
||||||
self.assertEqual(response.status_code, 403)
|
self.assertEqual(response.status_code, 403)
|
||||||
remove_ratelimit_rule(86400, 0, domain="spectator_attachment_access_by_file")
|
remove_ratelimit_rule(86400, 0, domain="spectator_attachment_access_by_file")
|
||||||
|
|
||||||
|
|
|
@ -68,19 +68,19 @@ class FileUploadTest(UploadSerializeMixin, ZulipTestCase):
|
||||||
result = self.api_post(self.example_user("hamlet"), "/api/v1/user_uploads", {"file": fp})
|
result = self.api_post(self.example_user("hamlet"), "/api/v1/user_uploads", {"file": fp})
|
||||||
response_dict = self.assert_json_success(result)
|
response_dict = self.assert_json_success(result)
|
||||||
self.assertIn("uri", response_dict)
|
self.assertIn("uri", response_dict)
|
||||||
uri = response_dict["uri"]
|
url = response_dict["uri"]
|
||||||
base = "/user_uploads/"
|
base = "/user_uploads/"
|
||||||
self.assertEqual(base, uri[: len(base)])
|
self.assertEqual(base, url[: len(base)])
|
||||||
|
|
||||||
# Download file via API
|
# Download file via API
|
||||||
self.logout()
|
self.logout()
|
||||||
response = self.api_get(self.example_user("hamlet"), uri)
|
response = self.api_get(self.example_user("hamlet"), url)
|
||||||
self.assertEqual(response.status_code, 200)
|
self.assertEqual(response.status_code, 200)
|
||||||
self.assert_streaming_content(response, b"zulip!")
|
self.assert_streaming_content(response, b"zulip!")
|
||||||
|
|
||||||
# Files uploaded through the API should be accessible via the web client
|
# Files uploaded through the API should be accessible via the web client
|
||||||
self.login("hamlet")
|
self.login("hamlet")
|
||||||
self.assert_streaming_content(self.client_get(uri), b"zulip!")
|
self.assert_streaming_content(self.client_get(url), b"zulip!")
|
||||||
|
|
||||||
def test_mobile_api_endpoint(self) -> None:
|
def test_mobile_api_endpoint(self) -> None:
|
||||||
"""
|
"""
|
||||||
|
@ -95,19 +95,19 @@ class FileUploadTest(UploadSerializeMixin, ZulipTestCase):
|
||||||
result = self.api_post(self.example_user("hamlet"), "/api/v1/user_uploads", {"file": fp})
|
result = self.api_post(self.example_user("hamlet"), "/api/v1/user_uploads", {"file": fp})
|
||||||
response_dict = self.assert_json_success(result)
|
response_dict = self.assert_json_success(result)
|
||||||
self.assertIn("uri", response_dict)
|
self.assertIn("uri", response_dict)
|
||||||
uri = response_dict["uri"]
|
url = response_dict["uri"]
|
||||||
base = "/user_uploads/"
|
base = "/user_uploads/"
|
||||||
self.assertEqual(base, uri[: len(base)])
|
self.assertEqual(base, url[: len(base)])
|
||||||
|
|
||||||
self.logout()
|
self.logout()
|
||||||
|
|
||||||
# Try to download file via API, passing URL and invalid API key
|
# Try to download file via API, passing URL and invalid API key
|
||||||
user_profile = self.example_user("hamlet")
|
user_profile = self.example_user("hamlet")
|
||||||
|
|
||||||
response = self.client_get(uri, {"api_key": "invalid"})
|
response = self.client_get(url, {"api_key": "invalid"})
|
||||||
self.assertEqual(response.status_code, 401)
|
self.assertEqual(response.status_code, 401)
|
||||||
|
|
||||||
response = self.client_get(uri, {"api_key": get_api_key(user_profile)})
|
response = self.client_get(url, {"api_key": get_api_key(user_profile)})
|
||||||
self.assertEqual(response.status_code, 200)
|
self.assertEqual(response.status_code, 200)
|
||||||
self.assert_streaming_content(response, b"zulip!")
|
self.assert_streaming_content(response, b"zulip!")
|
||||||
|
|
||||||
|
@ -172,7 +172,7 @@ class FileUploadTest(UploadSerializeMixin, ZulipTestCase):
|
||||||
# when Zulip is in DEVELOPMENT mode.
|
# when Zulip is in DEVELOPMENT mode.
|
||||||
def test_file_upload_authed(self) -> None:
|
def test_file_upload_authed(self) -> None:
|
||||||
"""
|
"""
|
||||||
A call to /json/user_uploads should return a uri and actually create an
|
A call to /json/user_uploads should return a url and actually create an
|
||||||
entry in the database. This entry will be marked unclaimed till a message
|
entry in the database. This entry will be marked unclaimed till a message
|
||||||
refers it.
|
refers it.
|
||||||
"""
|
"""
|
||||||
|
@ -183,17 +183,17 @@ class FileUploadTest(UploadSerializeMixin, ZulipTestCase):
|
||||||
result = self.client_post("/json/user_uploads", {"file": fp})
|
result = self.client_post("/json/user_uploads", {"file": fp})
|
||||||
response_dict = self.assert_json_success(result)
|
response_dict = self.assert_json_success(result)
|
||||||
self.assertIn("uri", response_dict)
|
self.assertIn("uri", response_dict)
|
||||||
uri = response_dict["uri"]
|
url = response_dict["uri"]
|
||||||
base = "/user_uploads/"
|
base = "/user_uploads/"
|
||||||
self.assertEqual(base, uri[: len(base)])
|
self.assertEqual(base, url[: len(base)])
|
||||||
|
|
||||||
# In the future, local file requests will follow the same style as S3
|
# In the future, local file requests will follow the same style as S3
|
||||||
# requests; they will be first authenticated and redirected
|
# requests; they will be first authenticated and redirected
|
||||||
self.assert_streaming_content(self.client_get(uri), b"zulip!")
|
self.assert_streaming_content(self.client_get(url), b"zulip!")
|
||||||
|
|
||||||
# Check the download endpoint
|
# Check the download endpoint
|
||||||
download_uri = uri.replace("/user_uploads/", "/user_uploads/download/")
|
download_url = url.replace("/user_uploads/", "/user_uploads/download/")
|
||||||
result = self.client_get(download_uri)
|
result = self.client_get(download_url)
|
||||||
self.assert_streaming_content(result, b"zulip!")
|
self.assert_streaming_content(result, b"zulip!")
|
||||||
self.assertIn("attachment;", result.headers["Content-Disposition"])
|
self.assertIn("attachment;", result.headers["Content-Disposition"])
|
||||||
|
|
||||||
|
@ -203,24 +203,24 @@ class FileUploadTest(UploadSerializeMixin, ZulipTestCase):
|
||||||
|
|
||||||
hamlet = self.example_user("hamlet")
|
hamlet = self.example_user("hamlet")
|
||||||
self.subscribe(hamlet, "Denmark")
|
self.subscribe(hamlet, "Denmark")
|
||||||
body = f"First message ...[zulip.txt]({hamlet.realm.host}{uri})"
|
body = f"First message ...[zulip.txt]({hamlet.realm.host}{url})"
|
||||||
self.send_stream_message(hamlet, "Denmark", body, "test")
|
self.send_stream_message(hamlet, "Denmark", body, "test")
|
||||||
|
|
||||||
# Now try the endpoint that's supposed to return a temporary URL for access
|
# Now try the endpoint that's supposed to return a temporary URL for access
|
||||||
# to the file.
|
# to the file.
|
||||||
result = self.client_get("/json" + uri)
|
result = self.client_get("/json" + url)
|
||||||
data = self.assert_json_success(result)
|
data = self.assert_json_success(result)
|
||||||
url_only_url = data["url"]
|
url_only_url = data["url"]
|
||||||
# Ensure this is different from the original uri:
|
# Ensure this is different from the original url:
|
||||||
self.assertNotEqual(url_only_url, uri)
|
self.assertNotEqual(url_only_url, url)
|
||||||
self.assertIn("user_uploads/temporary/", url_only_url)
|
self.assertIn("user_uploads/temporary/", url_only_url)
|
||||||
self.assertTrue(url_only_url.endswith("zulip.txt"))
|
self.assertTrue(url_only_url.endswith("zulip.txt"))
|
||||||
# The generated URL has a token authorizing the requestor to access the file
|
# The generated URL has a token authorizing the requestor to access the file
|
||||||
# without being logged in.
|
# without being logged in.
|
||||||
self.logout()
|
self.logout()
|
||||||
self.assert_streaming_content(self.client_get(url_only_url), b"zulip!")
|
self.assert_streaming_content(self.client_get(url_only_url), b"zulip!")
|
||||||
# The original uri shouldn't work when logged out:
|
# The original url shouldn't work when logged out:
|
||||||
result = self.client_get(uri)
|
result = self.client_get(url)
|
||||||
self.assertEqual(result.status_code, 403)
|
self.assertEqual(result.status_code, 403)
|
||||||
|
|
||||||
@override_settings(RATE_LIMITING=True)
|
@override_settings(RATE_LIMITING=True)
|
||||||
|
@ -230,34 +230,34 @@ class FileUploadTest(UploadSerializeMixin, ZulipTestCase):
|
||||||
fp.name = "zulip_web_public.txt"
|
fp.name = "zulip_web_public.txt"
|
||||||
|
|
||||||
result = self.client_post("/json/user_uploads", {"file": fp})
|
result = self.client_post("/json/user_uploads", {"file": fp})
|
||||||
uri = self.assert_json_success(result)["uri"]
|
url = self.assert_json_success(result)["uri"]
|
||||||
|
|
||||||
add_ratelimit_rule(86400, 1000, domain="spectator_attachment_access_by_file")
|
add_ratelimit_rule(86400, 1000, domain="spectator_attachment_access_by_file")
|
||||||
# Deny file access for non-web-public stream
|
# Deny file access for non-web-public stream
|
||||||
self.subscribe(self.example_user("hamlet"), "Denmark")
|
self.subscribe(self.example_user("hamlet"), "Denmark")
|
||||||
host = self.example_user("hamlet").realm.host
|
host = self.example_user("hamlet").realm.host
|
||||||
body = f"First message ...[zulip.txt](http://{host}" + uri + ")"
|
body = f"First message ...[zulip.txt](http://{host}" + url + ")"
|
||||||
self.send_stream_message(self.example_user("hamlet"), "Denmark", body, "test")
|
self.send_stream_message(self.example_user("hamlet"), "Denmark", body, "test")
|
||||||
|
|
||||||
self.logout()
|
self.logout()
|
||||||
response = self.client_get(uri)
|
response = self.client_get(url)
|
||||||
self.assertEqual(response.status_code, 403)
|
self.assertEqual(response.status_code, 403)
|
||||||
|
|
||||||
# Allow file access for web-public stream
|
# Allow file access for web-public stream
|
||||||
self.login("hamlet")
|
self.login("hamlet")
|
||||||
self.make_stream("web-public-stream", is_web_public=True)
|
self.make_stream("web-public-stream", is_web_public=True)
|
||||||
self.subscribe(self.example_user("hamlet"), "web-public-stream")
|
self.subscribe(self.example_user("hamlet"), "web-public-stream")
|
||||||
body = f"First message ...[zulip.txt](http://{host}" + uri + ")"
|
body = f"First message ...[zulip.txt](http://{host}" + url + ")"
|
||||||
self.send_stream_message(self.example_user("hamlet"), "web-public-stream", body, "test")
|
self.send_stream_message(self.example_user("hamlet"), "web-public-stream", body, "test")
|
||||||
|
|
||||||
self.logout()
|
self.logout()
|
||||||
response = self.client_get(uri)
|
response = self.client_get(url)
|
||||||
self.assertEqual(response.status_code, 200)
|
self.assertEqual(response.status_code, 200)
|
||||||
remove_ratelimit_rule(86400, 1000, domain="spectator_attachment_access_by_file")
|
remove_ratelimit_rule(86400, 1000, domain="spectator_attachment_access_by_file")
|
||||||
|
|
||||||
# Deny file access since rate limited
|
# Deny file access since rate limited
|
||||||
add_ratelimit_rule(86400, 0, domain="spectator_attachment_access_by_file")
|
add_ratelimit_rule(86400, 0, domain="spectator_attachment_access_by_file")
|
||||||
response = self.client_get(uri)
|
response = self.client_get(url)
|
||||||
self.assertEqual(response.status_code, 403)
|
self.assertEqual(response.status_code, 403)
|
||||||
remove_ratelimit_rule(86400, 0, domain="spectator_attachment_access_by_file")
|
remove_ratelimit_rule(86400, 0, domain="spectator_attachment_access_by_file")
|
||||||
|
|
||||||
|
@ -316,10 +316,10 @@ class FileUploadTest(UploadSerializeMixin, ZulipTestCase):
|
||||||
fp.name = "zulip.txt"
|
fp.name = "zulip.txt"
|
||||||
result = self.client_post("/json/user_uploads", {"file": fp})
|
result = self.client_post("/json/user_uploads", {"file": fp})
|
||||||
response_dict = self.assert_json_success(result)
|
response_dict = self.assert_json_success(result)
|
||||||
uri = response_dict["uri"]
|
url = response_dict["uri"]
|
||||||
|
|
||||||
self.logout()
|
self.logout()
|
||||||
response = self.client_get(uri)
|
response = self.client_get(url)
|
||||||
self.assertEqual(response.status_code, 403)
|
self.assertEqual(response.status_code, 403)
|
||||||
self.assert_in_response("<p>You are not authorized to view this file.</p>", response)
|
self.assert_in_response("<p>You are not authorized to view this file.</p>", response)
|
||||||
|
|
||||||
|
@ -677,8 +677,8 @@ class FileUploadTest(UploadSerializeMixin, ZulipTestCase):
|
||||||
fp = StringIO("zulip!")
|
fp = StringIO("zulip!")
|
||||||
fp.name = "zulip.txt"
|
fp.name = "zulip.txt"
|
||||||
result = self.client_post("/json/user_uploads", {"file": fp})
|
result = self.client_post("/json/user_uploads", {"file": fp})
|
||||||
uri = self.assert_json_success(result)["uri"]
|
url = self.assert_json_success(result)["uri"]
|
||||||
fp_path_id = re.sub("/user_uploads/", "", uri)
|
fp_path_id = re.sub("/user_uploads/", "", url)
|
||||||
body = f"First message ...[zulip.txt](http://{host}/user_uploads/" + fp_path_id + ")"
|
body = f"First message ...[zulip.txt](http://{host}/user_uploads/" + fp_path_id + ")"
|
||||||
with self.settings(CROSS_REALM_BOT_EMAILS={user_2.email, user_3.email}):
|
with self.settings(CROSS_REALM_BOT_EMAILS={user_2.email, user_3.email}):
|
||||||
internal_send_private_message(
|
internal_send_private_message(
|
||||||
|
@ -688,20 +688,20 @@ class FileUploadTest(UploadSerializeMixin, ZulipTestCase):
|
||||||
)
|
)
|
||||||
|
|
||||||
self.login_user(user_1)
|
self.login_user(user_1)
|
||||||
response = self.client_get(uri, subdomain=test_subdomain)
|
response = self.client_get(url, subdomain=test_subdomain)
|
||||||
self.assertEqual(response.status_code, 200)
|
self.assertEqual(response.status_code, 200)
|
||||||
self.assert_streaming_content(response, b"zulip!")
|
self.assert_streaming_content(response, b"zulip!")
|
||||||
self.logout()
|
self.logout()
|
||||||
|
|
||||||
# Confirm other cross-realm users can't read it.
|
# Confirm other cross-realm users can't read it.
|
||||||
self.login_user(user_3)
|
self.login_user(user_3)
|
||||||
response = self.client_get(uri, subdomain=test_subdomain)
|
response = self.client_get(url, subdomain=test_subdomain)
|
||||||
self.assertEqual(response.status_code, 403)
|
self.assertEqual(response.status_code, 403)
|
||||||
self.assert_in_response("You are not authorized to view this file.", response)
|
self.assert_in_response("You are not authorized to view this file.", response)
|
||||||
|
|
||||||
# Verify that cross-realm access to files for spectators is denied.
|
# Verify that cross-realm access to files for spectators is denied.
|
||||||
self.logout()
|
self.logout()
|
||||||
response = self.client_get(uri, subdomain=test_subdomain)
|
response = self.client_get(url, subdomain=test_subdomain)
|
||||||
self.assertEqual(response.status_code, 403)
|
self.assertEqual(response.status_code, 403)
|
||||||
|
|
||||||
def test_file_download_authorization_invite_only(self) -> None:
|
def test_file_download_authorization_invite_only(self) -> None:
|
||||||
|
@ -722,8 +722,8 @@ class FileUploadTest(UploadSerializeMixin, ZulipTestCase):
|
||||||
fp = StringIO("zulip!")
|
fp = StringIO("zulip!")
|
||||||
fp.name = "zulip.txt"
|
fp.name = "zulip.txt"
|
||||||
result = self.client_post("/json/user_uploads", {"file": fp})
|
result = self.client_post("/json/user_uploads", {"file": fp})
|
||||||
uri = self.assert_json_success(result)["uri"]
|
url = self.assert_json_success(result)["uri"]
|
||||||
fp_path_id = re.sub("/user_uploads/", "", uri)
|
fp_path_id = re.sub("/user_uploads/", "", url)
|
||||||
body = f"First message ...[zulip.txt](http://{realm.host}/user_uploads/" + fp_path_id + ")"
|
body = f"First message ...[zulip.txt](http://{realm.host}/user_uploads/" + fp_path_id + ")"
|
||||||
self.send_stream_message(hamlet, stream_name, body, "test")
|
self.send_stream_message(hamlet, stream_name, body, "test")
|
||||||
self.logout()
|
self.logout()
|
||||||
|
@ -731,7 +731,7 @@ class FileUploadTest(UploadSerializeMixin, ZulipTestCase):
|
||||||
# Owner user should be able to view file
|
# Owner user should be able to view file
|
||||||
self.login_user(hamlet)
|
self.login_user(hamlet)
|
||||||
with self.assert_database_query_count(5):
|
with self.assert_database_query_count(5):
|
||||||
response = self.client_get(uri)
|
response = self.client_get(url)
|
||||||
self.assertEqual(response.status_code, 200)
|
self.assertEqual(response.status_code, 200)
|
||||||
self.assert_streaming_content(response, b"zulip!")
|
self.assert_streaming_content(response, b"zulip!")
|
||||||
self.logout()
|
self.logout()
|
||||||
|
@ -739,13 +739,13 @@ class FileUploadTest(UploadSerializeMixin, ZulipTestCase):
|
||||||
# Subscribed user who received the message should be able to view file
|
# Subscribed user who received the message should be able to view file
|
||||||
self.login_user(cordelia)
|
self.login_user(cordelia)
|
||||||
with self.assert_database_query_count(6):
|
with self.assert_database_query_count(6):
|
||||||
response = self.client_get(uri)
|
response = self.client_get(url)
|
||||||
self.assertEqual(response.status_code, 200)
|
self.assertEqual(response.status_code, 200)
|
||||||
self.assert_streaming_content(response, b"zulip!")
|
self.assert_streaming_content(response, b"zulip!")
|
||||||
self.logout()
|
self.logout()
|
||||||
|
|
||||||
def assert_cannot_access_file(user: UserProfile) -> None:
|
def assert_cannot_access_file(user: UserProfile) -> None:
|
||||||
response = self.api_get(user, uri)
|
response = self.api_get(user, url)
|
||||||
self.assertEqual(response.status_code, 403)
|
self.assertEqual(response.status_code, 403)
|
||||||
self.assert_in_response("You are not authorized to view this file.", response)
|
self.assert_in_response("You are not authorized to view this file.", response)
|
||||||
|
|
||||||
|
@ -774,8 +774,8 @@ class FileUploadTest(UploadSerializeMixin, ZulipTestCase):
|
||||||
fp = StringIO("zulip!")
|
fp = StringIO("zulip!")
|
||||||
fp.name = "zulip.txt"
|
fp.name = "zulip.txt"
|
||||||
result = self.client_post("/json/user_uploads", {"file": fp})
|
result = self.client_post("/json/user_uploads", {"file": fp})
|
||||||
uri = self.assert_json_success(result)["uri"]
|
url = self.assert_json_success(result)["uri"]
|
||||||
fp_path_id = re.sub("/user_uploads/", "", uri)
|
fp_path_id = re.sub("/user_uploads/", "", url)
|
||||||
body = (
|
body = (
|
||||||
f"First message ...[zulip.txt](http://{user.realm.host}/user_uploads/"
|
f"First message ...[zulip.txt](http://{user.realm.host}/user_uploads/"
|
||||||
+ fp_path_id
|
+ fp_path_id
|
||||||
|
@ -792,7 +792,7 @@ class FileUploadTest(UploadSerializeMixin, ZulipTestCase):
|
||||||
# Owner user should be able to view file
|
# Owner user should be able to view file
|
||||||
self.login_user(user)
|
self.login_user(user)
|
||||||
with self.assert_database_query_count(5):
|
with self.assert_database_query_count(5):
|
||||||
response = self.client_get(uri)
|
response = self.client_get(url)
|
||||||
self.assertEqual(response.status_code, 200)
|
self.assertEqual(response.status_code, 200)
|
||||||
self.assert_streaming_content(response, b"zulip!")
|
self.assert_streaming_content(response, b"zulip!")
|
||||||
self.logout()
|
self.logout()
|
||||||
|
@ -800,7 +800,7 @@ class FileUploadTest(UploadSerializeMixin, ZulipTestCase):
|
||||||
# Originally subscribed user should be able to view file
|
# Originally subscribed user should be able to view file
|
||||||
self.login_user(polonius)
|
self.login_user(polonius)
|
||||||
with self.assert_database_query_count(6):
|
with self.assert_database_query_count(6):
|
||||||
response = self.client_get(uri)
|
response = self.client_get(url)
|
||||||
self.assertEqual(response.status_code, 200)
|
self.assertEqual(response.status_code, 200)
|
||||||
self.assert_streaming_content(response, b"zulip!")
|
self.assert_streaming_content(response, b"zulip!")
|
||||||
self.logout()
|
self.logout()
|
||||||
|
@ -808,7 +808,7 @@ class FileUploadTest(UploadSerializeMixin, ZulipTestCase):
|
||||||
# Subscribed user who did not receive the message should also be able to view file
|
# Subscribed user who did not receive the message should also be able to view file
|
||||||
self.login_user(late_subscribed_user)
|
self.login_user(late_subscribed_user)
|
||||||
with self.assert_database_query_count(9):
|
with self.assert_database_query_count(9):
|
||||||
response = self.client_get(uri)
|
response = self.client_get(url)
|
||||||
self.assertEqual(response.status_code, 200)
|
self.assertEqual(response.status_code, 200)
|
||||||
self.assert_streaming_content(response, b"zulip!")
|
self.assert_streaming_content(response, b"zulip!")
|
||||||
self.logout()
|
self.logout()
|
||||||
|
@ -818,7 +818,7 @@ class FileUploadTest(UploadSerializeMixin, ZulipTestCase):
|
||||||
self.login_user(user)
|
self.login_user(user)
|
||||||
# It takes a few extra queries to verify lack of access with shared history.
|
# It takes a few extra queries to verify lack of access with shared history.
|
||||||
with self.assert_database_query_count(8):
|
with self.assert_database_query_count(8):
|
||||||
response = self.client_get(uri)
|
response = self.client_get(url)
|
||||||
self.assertEqual(response.status_code, 403)
|
self.assertEqual(response.status_code, 403)
|
||||||
self.assert_in_response("You are not authorized to view this file.", response)
|
self.assert_in_response("You are not authorized to view this file.", response)
|
||||||
self.logout()
|
self.logout()
|
||||||
|
@ -843,8 +843,8 @@ class FileUploadTest(UploadSerializeMixin, ZulipTestCase):
|
||||||
fp = StringIO("zulip!")
|
fp = StringIO("zulip!")
|
||||||
fp.name = "zulip.txt"
|
fp.name = "zulip.txt"
|
||||||
result = self.client_post("/json/user_uploads", {"file": fp})
|
result = self.client_post("/json/user_uploads", {"file": fp})
|
||||||
uri = self.assert_json_success(result)["uri"]
|
url = self.assert_json_success(result)["uri"]
|
||||||
fp_path_id = re.sub("/user_uploads/", "", uri)
|
fp_path_id = re.sub("/user_uploads/", "", url)
|
||||||
for i in range(20):
|
for i in range(20):
|
||||||
body = (
|
body = (
|
||||||
f"First message ...[zulip.txt](http://{hamlet.realm.host}/user_uploads/"
|
f"First message ...[zulip.txt](http://{hamlet.realm.host}/user_uploads/"
|
||||||
|
@ -859,7 +859,7 @@ class FileUploadTest(UploadSerializeMixin, ZulipTestCase):
|
||||||
user = self.example_user("aaron")
|
user = self.example_user("aaron")
|
||||||
self.login_user(user)
|
self.login_user(user)
|
||||||
with self.assert_database_query_count(8):
|
with self.assert_database_query_count(8):
|
||||||
response = self.client_get(uri)
|
response = self.client_get(url)
|
||||||
self.assertEqual(response.status_code, 403)
|
self.assertEqual(response.status_code, 403)
|
||||||
self.assert_in_response("You are not authorized to view this file.", response)
|
self.assert_in_response("You are not authorized to view this file.", response)
|
||||||
|
|
||||||
|
@ -868,7 +868,7 @@ class FileUploadTest(UploadSerializeMixin, ZulipTestCase):
|
||||||
|
|
||||||
# If we were accidentally one query per message, this would be 20+
|
# If we were accidentally one query per message, this would be 20+
|
||||||
with self.assert_database_query_count(9):
|
with self.assert_database_query_count(9):
|
||||||
response = self.client_get(uri)
|
response = self.client_get(url)
|
||||||
self.assertEqual(response.status_code, 200)
|
self.assertEqual(response.status_code, 200)
|
||||||
self.assert_streaming_content(response, b"zulip!")
|
self.assert_streaming_content(response, b"zulip!")
|
||||||
|
|
||||||
|
@ -888,8 +888,8 @@ class FileUploadTest(UploadSerializeMixin, ZulipTestCase):
|
||||||
fp = StringIO("zulip!")
|
fp = StringIO("zulip!")
|
||||||
fp.name = "zulip.txt"
|
fp.name = "zulip.txt"
|
||||||
result = self.client_post("/json/user_uploads", {"file": fp})
|
result = self.client_post("/json/user_uploads", {"file": fp})
|
||||||
uri = self.assert_json_success(result)["uri"]
|
url = self.assert_json_success(result)["uri"]
|
||||||
fp_path_id = re.sub("/user_uploads/", "", uri)
|
fp_path_id = re.sub("/user_uploads/", "", url)
|
||||||
body = f"First message ...[zulip.txt](http://{realm.host}/user_uploads/" + fp_path_id + ")"
|
body = f"First message ...[zulip.txt](http://{realm.host}/user_uploads/" + fp_path_id + ")"
|
||||||
self.send_stream_message(self.example_user("hamlet"), "test-subscribe", body, "test")
|
self.send_stream_message(self.example_user("hamlet"), "test-subscribe", body, "test")
|
||||||
self.logout()
|
self.logout()
|
||||||
|
@ -897,7 +897,7 @@ class FileUploadTest(UploadSerializeMixin, ZulipTestCase):
|
||||||
# Now all users should be able to access the files
|
# Now all users should be able to access the files
|
||||||
for user in subscribed_users + unsubscribed_users:
|
for user in subscribed_users + unsubscribed_users:
|
||||||
self.login_user(user)
|
self.login_user(user)
|
||||||
response = self.client_get(uri)
|
response = self.client_get(url)
|
||||||
self.assert_streaming_content(response, b"zulip!")
|
self.assert_streaming_content(response, b"zulip!")
|
||||||
self.logout()
|
self.logout()
|
||||||
|
|
||||||
|
@ -912,13 +912,13 @@ class FileUploadTest(UploadSerializeMixin, ZulipTestCase):
|
||||||
fp = StringIO("zulip!")
|
fp = StringIO("zulip!")
|
||||||
fp.name = name
|
fp.name = name
|
||||||
result = self.client_post("/json/user_uploads", {"file": fp})
|
result = self.client_post("/json/user_uploads", {"file": fp})
|
||||||
uri = self.assert_json_success(result)["uri"]
|
url = self.assert_json_success(result)["uri"]
|
||||||
fp_path_id = re.sub("/user_uploads/", "", uri)
|
fp_path_id = re.sub("/user_uploads/", "", url)
|
||||||
fp_path = os.path.split(fp_path_id)[0]
|
fp_path = os.path.split(fp_path_id)[0]
|
||||||
if download:
|
if download:
|
||||||
uri = uri.replace("/user_uploads/", "/user_uploads/download/")
|
url = url.replace("/user_uploads/", "/user_uploads/download/")
|
||||||
with self.settings(DEVELOPMENT=False):
|
with self.settings(DEVELOPMENT=False):
|
||||||
response = self.client_get(uri)
|
response = self.client_get(url)
|
||||||
assert settings.LOCAL_UPLOADS_DIR is not None
|
assert settings.LOCAL_UPLOADS_DIR is not None
|
||||||
test_run, worker = os.path.split(os.path.dirname(settings.LOCAL_UPLOADS_DIR))
|
test_run, worker = os.path.split(os.path.dirname(settings.LOCAL_UPLOADS_DIR))
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
|
|
|
@ -42,13 +42,13 @@ from zerver.models import (
|
||||||
class LocalStorageTest(UploadSerializeMixin, ZulipTestCase):
|
class LocalStorageTest(UploadSerializeMixin, ZulipTestCase):
|
||||||
def test_upload_message_attachment(self) -> None:
|
def test_upload_message_attachment(self) -> None:
|
||||||
user_profile = self.example_user("hamlet")
|
user_profile = self.example_user("hamlet")
|
||||||
uri = upload_message_attachment(
|
url = upload_message_attachment(
|
||||||
"dummy.txt", len(b"zulip!"), "text/plain", b"zulip!", user_profile
|
"dummy.txt", len(b"zulip!"), "text/plain", b"zulip!", user_profile
|
||||||
)
|
)
|
||||||
|
|
||||||
base = "/user_uploads/"
|
base = "/user_uploads/"
|
||||||
self.assertEqual(base, uri[: len(base)])
|
self.assertEqual(base, url[: len(base)])
|
||||||
path_id = re.sub("/user_uploads/", "", uri)
|
path_id = re.sub("/user_uploads/", "", url)
|
||||||
assert settings.LOCAL_UPLOADS_DIR is not None
|
assert settings.LOCAL_UPLOADS_DIR is not None
|
||||||
assert settings.LOCAL_FILES_DIR is not None
|
assert settings.LOCAL_FILES_DIR is not None
|
||||||
file_path = os.path.join(settings.LOCAL_FILES_DIR, path_id)
|
file_path = os.path.join(settings.LOCAL_FILES_DIR, path_id)
|
||||||
|
@ -59,11 +59,11 @@ class LocalStorageTest(UploadSerializeMixin, ZulipTestCase):
|
||||||
|
|
||||||
def test_save_attachment_contents(self) -> None:
|
def test_save_attachment_contents(self) -> None:
|
||||||
user_profile = self.example_user("hamlet")
|
user_profile = self.example_user("hamlet")
|
||||||
uri = upload_message_attachment(
|
url = upload_message_attachment(
|
||||||
"dummy.txt", len(b"zulip!"), "text/plain", b"zulip!", user_profile
|
"dummy.txt", len(b"zulip!"), "text/plain", b"zulip!", user_profile
|
||||||
)
|
)
|
||||||
|
|
||||||
path_id = re.sub("/user_uploads/", "", uri)
|
path_id = re.sub("/user_uploads/", "", url)
|
||||||
output = BytesIO()
|
output = BytesIO()
|
||||||
save_attachment_contents(path_id, output)
|
save_attachment_contents(path_id, output)
|
||||||
self.assertEqual(output.getvalue(), b"zulip!")
|
self.assertEqual(output.getvalue(), b"zulip!")
|
||||||
|
@ -79,11 +79,11 @@ class LocalStorageTest(UploadSerializeMixin, ZulipTestCase):
|
||||||
user_profile = get_system_bot(settings.EMAIL_GATEWAY_BOT, internal_realm.id)
|
user_profile = get_system_bot(settings.EMAIL_GATEWAY_BOT, internal_realm.id)
|
||||||
self.assertEqual(user_profile.realm, internal_realm)
|
self.assertEqual(user_profile.realm, internal_realm)
|
||||||
|
|
||||||
uri = upload_message_attachment(
|
url = upload_message_attachment(
|
||||||
"dummy.txt", len(b"zulip!"), "text/plain", b"zulip!", user_profile, zulip_realm
|
"dummy.txt", len(b"zulip!"), "text/plain", b"zulip!", user_profile, zulip_realm
|
||||||
)
|
)
|
||||||
# Ensure the correct realm id of the target realm is used instead of the bot's realm.
|
# Ensure the correct realm id of the target realm is used instead of the bot's realm.
|
||||||
self.assertTrue(uri.startswith(f"/user_uploads/{zulip_realm.id}/"))
|
self.assertTrue(url.startswith(f"/user_uploads/{zulip_realm.id}/"))
|
||||||
|
|
||||||
def test_delete_message_attachment(self) -> None:
|
def test_delete_message_attachment(self) -> None:
|
||||||
self.login("hamlet")
|
self.login("hamlet")
|
||||||
|
@ -108,12 +108,12 @@ class LocalStorageTest(UploadSerializeMixin, ZulipTestCase):
|
||||||
user_profile = self.example_user("hamlet")
|
user_profile = self.example_user("hamlet")
|
||||||
path_ids = []
|
path_ids = []
|
||||||
for n in range(1, 1005):
|
for n in range(1, 1005):
|
||||||
uri = upload_message_attachment(
|
url = upload_message_attachment(
|
||||||
"dummy.txt", len(b"zulip!"), "text/plain", b"zulip!", user_profile
|
"dummy.txt", len(b"zulip!"), "text/plain", b"zulip!", user_profile
|
||||||
)
|
)
|
||||||
base = "/user_uploads/"
|
base = "/user_uploads/"
|
||||||
self.assertEqual(base, uri[: len(base)])
|
self.assertEqual(base, url[: len(base)])
|
||||||
path_id = re.sub("/user_uploads/", "", uri)
|
path_id = re.sub("/user_uploads/", "", url)
|
||||||
path_ids.append(path_id)
|
path_ids.append(path_id)
|
||||||
file_path = os.path.join(settings.LOCAL_FILES_DIR, path_id)
|
file_path = os.path.join(settings.LOCAL_FILES_DIR, path_id)
|
||||||
self.assertTrue(os.path.isfile(file_path))
|
self.assertTrue(os.path.isfile(file_path))
|
||||||
|
@ -256,14 +256,14 @@ class LocalStorageTest(UploadSerializeMixin, ZulipTestCase):
|
||||||
f.write("dummy")
|
f.write("dummy")
|
||||||
|
|
||||||
assert settings.LOCAL_AVATARS_DIR is not None
|
assert settings.LOCAL_AVATARS_DIR is not None
|
||||||
uri = upload_export_tarball(user_profile.realm, tarball_path)
|
url = upload_export_tarball(user_profile.realm, tarball_path)
|
||||||
self.assertTrue(os.path.isfile(os.path.join(settings.LOCAL_AVATARS_DIR, tarball_path)))
|
self.assertTrue(os.path.isfile(os.path.join(settings.LOCAL_AVATARS_DIR, tarball_path)))
|
||||||
|
|
||||||
result = re.search(re.compile(r"([A-Za-z0-9\-_]{24})"), uri)
|
result = re.search(re.compile(r"([A-Za-z0-9\-_]{24})"), url)
|
||||||
if result is not None:
|
if result is not None:
|
||||||
random_name = result.group(1)
|
random_name = result.group(1)
|
||||||
expected_url = f"http://zulip.testserver/user_avatars/exports/{user_profile.realm_id}/{random_name}/tarball.tar.gz"
|
expected_url = f"http://zulip.testserver/user_avatars/exports/{user_profile.realm_id}/{random_name}/tarball.tar.gz"
|
||||||
self.assertEqual(expected_url, uri)
|
self.assertEqual(expected_url, url)
|
||||||
|
|
||||||
# Delete the tarball.
|
# Delete the tarball.
|
||||||
with self.assertLogs(level="WARNING") as warn_log:
|
with self.assertLogs(level="WARNING") as warn_log:
|
||||||
|
@ -272,5 +272,5 @@ class LocalStorageTest(UploadSerializeMixin, ZulipTestCase):
|
||||||
warn_log.output,
|
warn_log.output,
|
||||||
["WARNING:root:not_a_file does not exist. Its entry in the database will be removed."],
|
["WARNING:root:not_a_file does not exist. Its entry in the database will be removed."],
|
||||||
)
|
)
|
||||||
path_id = urllib.parse.urlparse(uri).path
|
path_id = urllib.parse.urlparse(url).path
|
||||||
self.assertEqual(delete_export_tarball(path_id), path_id)
|
self.assertEqual(delete_export_tarball(path_id), path_id)
|
||||||
|
|
|
@ -51,13 +51,13 @@ class S3Test(ZulipTestCase):
|
||||||
bucket = create_s3_buckets(settings.S3_AUTH_UPLOADS_BUCKET)[0]
|
bucket = create_s3_buckets(settings.S3_AUTH_UPLOADS_BUCKET)[0]
|
||||||
|
|
||||||
user_profile = self.example_user("hamlet")
|
user_profile = self.example_user("hamlet")
|
||||||
uri = upload_message_attachment(
|
url = upload_message_attachment(
|
||||||
"dummy.txt", len(b"zulip!"), "text/plain", b"zulip!", user_profile
|
"dummy.txt", len(b"zulip!"), "text/plain", b"zulip!", user_profile
|
||||||
)
|
)
|
||||||
|
|
||||||
base = "/user_uploads/"
|
base = "/user_uploads/"
|
||||||
self.assertEqual(base, uri[: len(base)])
|
self.assertEqual(base, url[: len(base)])
|
||||||
path_id = re.sub("/user_uploads/", "", uri)
|
path_id = re.sub("/user_uploads/", "", url)
|
||||||
content = bucket.Object(path_id).get()["Body"].read()
|
content = bucket.Object(path_id).get()["Body"].read()
|
||||||
self.assertEqual(b"zulip!", content)
|
self.assertEqual(b"zulip!", content)
|
||||||
|
|
||||||
|
@ -65,18 +65,18 @@ class S3Test(ZulipTestCase):
|
||||||
self.assert_length(b"zulip!", uploaded_file.size)
|
self.assert_length(b"zulip!", uploaded_file.size)
|
||||||
|
|
||||||
self.subscribe(self.example_user("hamlet"), "Denmark")
|
self.subscribe(self.example_user("hamlet"), "Denmark")
|
||||||
body = f"First message ...[zulip.txt](http://{user_profile.realm.host}{uri})"
|
body = f"First message ...[zulip.txt](http://{user_profile.realm.host}{url})"
|
||||||
self.send_stream_message(self.example_user("hamlet"), "Denmark", body, "test")
|
self.send_stream_message(self.example_user("hamlet"), "Denmark", body, "test")
|
||||||
|
|
||||||
@use_s3_backend
|
@use_s3_backend
|
||||||
def test_save_attachment_contents(self) -> None:
|
def test_save_attachment_contents(self) -> None:
|
||||||
create_s3_buckets(settings.S3_AUTH_UPLOADS_BUCKET)
|
create_s3_buckets(settings.S3_AUTH_UPLOADS_BUCKET)
|
||||||
user_profile = self.example_user("hamlet")
|
user_profile = self.example_user("hamlet")
|
||||||
uri = upload_message_attachment(
|
url = upload_message_attachment(
|
||||||
"dummy.txt", len(b"zulip!"), "text/plain", b"zulip!", user_profile
|
"dummy.txt", len(b"zulip!"), "text/plain", b"zulip!", user_profile
|
||||||
)
|
)
|
||||||
|
|
||||||
path_id = re.sub("/user_uploads/", "", uri)
|
path_id = re.sub("/user_uploads/", "", url)
|
||||||
output = BytesIO()
|
output = BytesIO()
|
||||||
save_attachment_contents(path_id, output)
|
save_attachment_contents(path_id, output)
|
||||||
self.assertEqual(output.getvalue(), b"zulip!")
|
self.assertEqual(output.getvalue(), b"zulip!")
|
||||||
|
@ -94,20 +94,20 @@ class S3Test(ZulipTestCase):
|
||||||
user_profile = get_system_bot(settings.EMAIL_GATEWAY_BOT, internal_realm.id)
|
user_profile = get_system_bot(settings.EMAIL_GATEWAY_BOT, internal_realm.id)
|
||||||
self.assertEqual(user_profile.realm, internal_realm)
|
self.assertEqual(user_profile.realm, internal_realm)
|
||||||
|
|
||||||
uri = upload_message_attachment(
|
url = upload_message_attachment(
|
||||||
"dummy.txt", len(b"zulip!"), "text/plain", b"zulip!", user_profile, zulip_realm
|
"dummy.txt", len(b"zulip!"), "text/plain", b"zulip!", user_profile, zulip_realm
|
||||||
)
|
)
|
||||||
# Ensure the correct realm id of the target realm is used instead of the bot's realm.
|
# Ensure the correct realm id of the target realm is used instead of the bot's realm.
|
||||||
self.assertTrue(uri.startswith(f"/user_uploads/{zulip_realm.id}/"))
|
self.assertTrue(url.startswith(f"/user_uploads/{zulip_realm.id}/"))
|
||||||
|
|
||||||
@use_s3_backend
|
@use_s3_backend
|
||||||
def test_upload_message_attachment_s3_with_undefined_content_type(self) -> None:
|
def test_upload_message_attachment_s3_with_undefined_content_type(self) -> None:
|
||||||
bucket = create_s3_buckets(settings.S3_AUTH_UPLOADS_BUCKET)[0]
|
bucket = create_s3_buckets(settings.S3_AUTH_UPLOADS_BUCKET)[0]
|
||||||
|
|
||||||
user_profile = self.example_user("hamlet")
|
user_profile = self.example_user("hamlet")
|
||||||
uri = upload_message_attachment("dummy.txt", len(b"zulip!"), None, b"zulip!", user_profile)
|
url = upload_message_attachment("dummy.txt", len(b"zulip!"), None, b"zulip!", user_profile)
|
||||||
|
|
||||||
path_id = re.sub("/user_uploads/", "", uri)
|
path_id = re.sub("/user_uploads/", "", url)
|
||||||
self.assertEqual(b"zulip!", bucket.Object(path_id).get()["Body"].read())
|
self.assertEqual(b"zulip!", bucket.Object(path_id).get()["Body"].read())
|
||||||
uploaded_file = Attachment.objects.get(owner=user_profile, path_id=path_id)
|
uploaded_file = Attachment.objects.get(owner=user_profile, path_id=path_id)
|
||||||
self.assert_length(b"zulip!", uploaded_file.size)
|
self.assert_length(b"zulip!", uploaded_file.size)
|
||||||
|
@ -117,11 +117,11 @@ class S3Test(ZulipTestCase):
|
||||||
bucket = create_s3_buckets(settings.S3_AUTH_UPLOADS_BUCKET)[0]
|
bucket = create_s3_buckets(settings.S3_AUTH_UPLOADS_BUCKET)[0]
|
||||||
|
|
||||||
user_profile = self.example_user("hamlet")
|
user_profile = self.example_user("hamlet")
|
||||||
uri = upload_message_attachment(
|
url = upload_message_attachment(
|
||||||
"dummy.txt", len(b"zulip!"), "text/plain", b"zulip!", user_profile
|
"dummy.txt", len(b"zulip!"), "text/plain", b"zulip!", user_profile
|
||||||
)
|
)
|
||||||
|
|
||||||
path_id = re.sub("/user_uploads/", "", uri)
|
path_id = re.sub("/user_uploads/", "", url)
|
||||||
self.assertIsNotNone(bucket.Object(path_id).get())
|
self.assertIsNotNone(bucket.Object(path_id).get())
|
||||||
self.assertTrue(delete_message_attachment(path_id))
|
self.assertTrue(delete_message_attachment(path_id))
|
||||||
with self.assertRaises(botocore.exceptions.ClientError):
|
with self.assertRaises(botocore.exceptions.ClientError):
|
||||||
|
@ -134,10 +134,10 @@ class S3Test(ZulipTestCase):
|
||||||
user_profile = self.example_user("hamlet")
|
user_profile = self.example_user("hamlet")
|
||||||
path_ids = []
|
path_ids = []
|
||||||
for n in range(1, 5):
|
for n in range(1, 5):
|
||||||
uri = upload_message_attachment(
|
url = upload_message_attachment(
|
||||||
"dummy.txt", len(b"zulip!"), "text/plain", b"zulip!", user_profile
|
"dummy.txt", len(b"zulip!"), "text/plain", b"zulip!", user_profile
|
||||||
)
|
)
|
||||||
path_id = re.sub("/user_uploads/", "", uri)
|
path_id = re.sub("/user_uploads/", "", url)
|
||||||
self.assertIsNotNone(bucket.Object(path_id).get())
|
self.assertIsNotNone(bucket.Object(path_id).get())
|
||||||
path_ids.append(path_id)
|
path_ids.append(path_id)
|
||||||
|
|
||||||
|
@ -169,17 +169,17 @@ class S3Test(ZulipTestCase):
|
||||||
user_profile = self.example_user("hamlet")
|
user_profile = self.example_user("hamlet")
|
||||||
path_ids = []
|
path_ids = []
|
||||||
for n in range(1, 5):
|
for n in range(1, 5):
|
||||||
uri = upload_message_attachment(
|
url = upload_message_attachment(
|
||||||
"dummy.txt", len(b"zulip!"), "text/plain", b"zulip!", user_profile
|
"dummy.txt", len(b"zulip!"), "text/plain", b"zulip!", user_profile
|
||||||
)
|
)
|
||||||
path_ids.append(re.sub("/user_uploads/", "", uri))
|
path_ids.append(re.sub("/user_uploads/", "", url))
|
||||||
found_paths = [r[0] for r in all_message_attachments()]
|
found_paths = [r[0] for r in all_message_attachments()]
|
||||||
self.assertEqual(sorted(found_paths), sorted(path_ids))
|
self.assertEqual(sorted(found_paths), sorted(path_ids))
|
||||||
|
|
||||||
@use_s3_backend
|
@use_s3_backend
|
||||||
def test_user_uploads_authed(self) -> None:
|
def test_user_uploads_authed(self) -> None:
|
||||||
"""
|
"""
|
||||||
A call to /json/user_uploads should return a uri and actually create an object.
|
A call to /json/user_uploads should return a url and actually create an object.
|
||||||
"""
|
"""
|
||||||
bucket = create_s3_buckets(settings.S3_AUTH_UPLOADS_BUCKET)[0]
|
bucket = create_s3_buckets(settings.S3_AUTH_UPLOADS_BUCKET)[0]
|
||||||
|
|
||||||
|
@ -191,11 +191,11 @@ class S3Test(ZulipTestCase):
|
||||||
response_dict = self.assert_json_success(result)
|
response_dict = self.assert_json_success(result)
|
||||||
self.assertIn("uri", response_dict)
|
self.assertIn("uri", response_dict)
|
||||||
base = "/user_uploads/"
|
base = "/user_uploads/"
|
||||||
uri = response_dict["uri"]
|
url = response_dict["uri"]
|
||||||
self.assertEqual(base, uri[: len(base)])
|
self.assertEqual(base, url[: len(base)])
|
||||||
|
|
||||||
# In development, this is just a redirect
|
# In development, this is just a redirect
|
||||||
response = self.client_get(uri)
|
response = self.client_get(url)
|
||||||
redirect_url = response["Location"]
|
redirect_url = response["Location"]
|
||||||
path = urllib.parse.urlparse(redirect_url).path
|
path = urllib.parse.urlparse(redirect_url).path
|
||||||
assert path.startswith("/")
|
assert path.startswith("/")
|
||||||
|
@ -204,7 +204,7 @@ class S3Test(ZulipTestCase):
|
||||||
|
|
||||||
prefix = f"/internal/s3/{settings.S3_AUTH_UPLOADS_BUCKET}.s3.amazonaws.com/"
|
prefix = f"/internal/s3/{settings.S3_AUTH_UPLOADS_BUCKET}.s3.amazonaws.com/"
|
||||||
with self.settings(DEVELOPMENT=False):
|
with self.settings(DEVELOPMENT=False):
|
||||||
response = self.client_get(uri)
|
response = self.client_get(url)
|
||||||
redirect_url = response["X-Accel-Redirect"]
|
redirect_url = response["X-Accel-Redirect"]
|
||||||
path = urllib.parse.urlparse(redirect_url).path
|
path = urllib.parse.urlparse(redirect_url).path
|
||||||
assert path.startswith(prefix)
|
assert path.startswith(prefix)
|
||||||
|
@ -212,9 +212,9 @@ class S3Test(ZulipTestCase):
|
||||||
self.assertEqual(b"zulip!", bucket.Object(key).get()["Body"].read())
|
self.assertEqual(b"zulip!", bucket.Object(key).get()["Body"].read())
|
||||||
|
|
||||||
# Check the download endpoint
|
# Check the download endpoint
|
||||||
download_uri = uri.replace("/user_uploads/", "/user_uploads/download/")
|
download_url = url.replace("/user_uploads/", "/user_uploads/download/")
|
||||||
with self.settings(DEVELOPMENT=False):
|
with self.settings(DEVELOPMENT=False):
|
||||||
response = self.client_get(download_uri)
|
response = self.client_get(download_url)
|
||||||
redirect_url = response["X-Accel-Redirect"]
|
redirect_url = response["X-Accel-Redirect"]
|
||||||
path = urllib.parse.urlparse(redirect_url).path
|
path = urllib.parse.urlparse(redirect_url).path
|
||||||
assert path.startswith(prefix)
|
assert path.startswith(prefix)
|
||||||
|
@ -223,11 +223,11 @@ class S3Test(ZulipTestCase):
|
||||||
|
|
||||||
# Now try the endpoint that's supposed to return a temporary URL for access
|
# Now try the endpoint that's supposed to return a temporary URL for access
|
||||||
# to the file.
|
# to the file.
|
||||||
result = self.client_get("/json" + uri)
|
result = self.client_get("/json" + url)
|
||||||
data = self.assert_json_success(result)
|
data = self.assert_json_success(result)
|
||||||
url_only_url = data["url"]
|
url_only_url = data["url"]
|
||||||
|
|
||||||
self.assertNotEqual(url_only_url, uri)
|
self.assertNotEqual(url_only_url, url)
|
||||||
self.assertIn("user_uploads/temporary/", url_only_url)
|
self.assertIn("user_uploads/temporary/", url_only_url)
|
||||||
self.assertTrue(url_only_url.endswith("zulip.txt"))
|
self.assertTrue(url_only_url.endswith("zulip.txt"))
|
||||||
# The generated URL has a token authorizing the requestor to access the file
|
# The generated URL has a token authorizing the requestor to access the file
|
||||||
|
@ -241,14 +241,14 @@ class S3Test(ZulipTestCase):
|
||||||
key = path[len(prefix) :]
|
key = path[len(prefix) :]
|
||||||
self.assertEqual(b"zulip!", bucket.Object(key).get()["Body"].read())
|
self.assertEqual(b"zulip!", bucket.Object(key).get()["Body"].read())
|
||||||
|
|
||||||
# The original uri shouldn't work when logged out:
|
# The original url shouldn't work when logged out:
|
||||||
with self.settings(DEVELOPMENT=False):
|
with self.settings(DEVELOPMENT=False):
|
||||||
result = self.client_get(uri)
|
result = self.client_get(url)
|
||||||
self.assertEqual(result.status_code, 403)
|
self.assertEqual(result.status_code, 403)
|
||||||
|
|
||||||
hamlet = self.example_user("hamlet")
|
hamlet = self.example_user("hamlet")
|
||||||
self.subscribe(hamlet, "Denmark")
|
self.subscribe(hamlet, "Denmark")
|
||||||
body = f"First message ...[zulip.txt](http://{hamlet.realm.host}" + uri + ")"
|
body = f"First message ...[zulip.txt](http://{hamlet.realm.host}" + url + ")"
|
||||||
self.send_stream_message(hamlet, "Denmark", body, "test")
|
self.send_stream_message(hamlet, "Denmark", body, "test")
|
||||||
|
|
||||||
@use_s3_backend
|
@use_s3_backend
|
||||||
|
@ -518,17 +518,17 @@ class S3Test(ZulipTestCase):
|
||||||
nonlocal total_bytes_transferred
|
nonlocal total_bytes_transferred
|
||||||
total_bytes_transferred += bytes_transferred
|
total_bytes_transferred += bytes_transferred
|
||||||
|
|
||||||
uri = upload_export_tarball(
|
url = upload_export_tarball(
|
||||||
user_profile.realm, tarball_path, percent_callback=percent_callback
|
user_profile.realm, tarball_path, percent_callback=percent_callback
|
||||||
)
|
)
|
||||||
# Verify the percent_callback API works
|
# Verify the percent_callback API works
|
||||||
self.assertEqual(total_bytes_transferred, 5)
|
self.assertEqual(total_bytes_transferred, 5)
|
||||||
|
|
||||||
result = re.search(re.compile(r"([0-9a-fA-F]{32})"), uri)
|
result = re.search(re.compile(r"([0-9a-fA-F]{32})"), url)
|
||||||
if result is not None:
|
if result is not None:
|
||||||
hex_value = result.group(1)
|
hex_value = result.group(1)
|
||||||
expected_url = f"https://{bucket.name}.s3.amazonaws.com/exports/{hex_value}/{os.path.basename(tarball_path)}"
|
expected_url = f"https://{bucket.name}.s3.amazonaws.com/exports/{hex_value}/{os.path.basename(tarball_path)}"
|
||||||
self.assertEqual(uri, expected_url)
|
self.assertEqual(url, expected_url)
|
||||||
|
|
||||||
# Delete the tarball.
|
# Delete the tarball.
|
||||||
with self.assertLogs(level="WARNING") as warn_log:
|
with self.assertLogs(level="WARNING") as warn_log:
|
||||||
|
@ -537,5 +537,5 @@ class S3Test(ZulipTestCase):
|
||||||
warn_log.output,
|
warn_log.output,
|
||||||
["WARNING:root:not_a_file does not exist. Its entry in the database will be removed."],
|
["WARNING:root:not_a_file does not exist. Its entry in the database will be removed."],
|
||||||
)
|
)
|
||||||
path_id = urllib.parse.urlparse(uri).path
|
path_id = urllib.parse.urlparse(url).path
|
||||||
self.assertEqual(delete_export_tarball(path_id), path_id)
|
self.assertEqual(delete_export_tarball(path_id), path_id)
|
||||||
|
|
Loading…
Reference in New Issue