python: Use urlsplit instead of urlparse.

Signed-off-by: Anders Kaseorg <anders@zulip.com>
This commit is contained in:
Anders Kaseorg 2023-12-05 12:25:00 -08:00 committed by Tim Abbott
parent 3853fa875a
commit 223b626256
21 changed files with 101 additions and 92 deletions

View File

@@ -2,7 +2,7 @@ from contextlib import suppress
from datetime import timedelta
from decimal import Decimal
from typing import Any, Dict, Iterable, List, Optional, Union
from urllib.parse import urlencode, urlparse
from urllib.parse import urlencode, urlsplit
from django.conf import settings
from django.core.exceptions import ValidationError
@@ -272,7 +272,7 @@ def support(
for key_word in key_words:
try:
URLValidator()(key_word)
parse_result = urlparse(key_word)
parse_result = urlsplit(key_word)
hostname = parse_result.hostname
assert hostname is not None
if parse_result.port:

View File

@@ -50,7 +50,7 @@ import sys
from configparser import RawConfigParser
from typing import NoReturn
from urllib.error import HTTPError
from urllib.parse import urlencode, urljoin, urlparse
from urllib.parse import urlencode, urljoin, urlsplit
from urllib.request import Request, urlopen
sys.path.append(os.path.join(os.path.dirname(__file__), "..", ".."))
@@ -131,7 +131,7 @@ def send_email_mirror(
if test:
return
if not urlparse(host).scheme:
if not urlsplit(host).scheme:
config_file = get_config_file()
http_only_config = get_config(config_file, "application_server", "http_only", "")
http_only = http_only_config == "true"

View File

@@ -1,7 +1,7 @@
import os
from posixpath import basename
from typing import Any, List, Set
from urllib.parse import urlparse
from urllib.parse import urlsplit
from typing_extensions import override
@@ -27,7 +27,7 @@ class UnusedImagesLinterSpider(BaseDocumentationSpider):
def _is_external_url(self, url: str) -> bool:
is_external = url.startswith("http") and self.start_urls[0] not in url
if self._has_extension(url) and f"localhost:9981/{self.images_path}" in url:
self.static_images.add(basename(urlparse(url).path))
self.static_images.add(basename(urlsplit(url).path))
return is_external or self._has_extension(url)
def closed(self, *args: Any, **kwargs: Any) -> None:

View File

@@ -2,7 +2,7 @@ import json
import os
import re
from typing import Callable, Iterator, List, Optional, Union
from urllib.parse import urlparse, urlsplit
from urllib.parse import urlsplit
import scrapy
from scrapy.http import Request, Response
@@ -233,7 +233,7 @@ class BaseDocumentationSpider(scrapy.Spider):
response = failure.value.response
# Hack: The filtering above does not catch this URL,
# likely due to a redirect.
if urlparse(response.url).netloc == "idmsa.apple.com":
if urlsplit(response.url).netloc == "idmsa.apple.com":
return None
if response.status == 405 and response.request.method == "HEAD":
# Method 'HEAD' not allowed, repeat request with 'GET'

View File

@@ -8,7 +8,7 @@ import signal
import subprocess
import sys
from typing import List, Sequence
from urllib.parse import urlunparse
from urllib.parse import urlunsplit
TOOLS_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, os.path.dirname(TOOLS_DIR))
@@ -204,7 +204,7 @@ def transform_url(protocol: str, path: str, query: str, target_port: int, target
host = ":".join((target_host, str(target_port)))
# Here we are going to rewrite the path a bit so that it is in parity with
# what we will have for production
newpath = urlunparse((protocol, host, path, "", query, ""))
newpath = urlunsplit((protocol, host, path, query, ""))
return newpath

View File

@@ -295,3 +295,12 @@ rules:
fix: time_machine.travel($TIME, tick=False)
severity: ERROR
message: "Use the time_machine package, rather than mocking timezone_now"
- id: urlparse
languages: [python]
pattern-either:
- pattern: urllib.parse.urlparse
- pattern: urllib.parse.urlunparse
- pattern: urllib.parse.ParseResult
severity: ERROR
message: "Use urlsplit rather than urlparse"

View File

@@ -15,7 +15,7 @@ from typing import (
cast,
overload,
)
from urllib.parse import urlparse
from urllib.parse import urlsplit
import django_otp
from django.conf import settings
@@ -393,8 +393,8 @@ def zulip_redirect_to_login(
resolved_login_url = resolve_url(login_url or settings.LOGIN_URL)
# If the login URL is the same scheme and net location then just
# use the path as the "next" url.
login_scheme, login_netloc = urlparse(resolved_login_url)[:2]
current_scheme, current_netloc = urlparse(path)[:2]
login_scheme, login_netloc = urlsplit(resolved_login_url)[:2]
current_scheme, current_netloc = urlsplit(path)[:2]
if (not login_scheme or login_scheme == current_scheme) and (
not login_netloc or login_netloc == current_netloc
):

View File

@@ -26,7 +26,7 @@ from typing import (
Union,
cast,
)
from urllib.parse import parse_qs, quote, urlencode, urljoin, urlparse, urlsplit, urlunparse
from urllib.parse import parse_qs, quote, urlencode, urljoin, urlsplit, urlunsplit
from xml.etree.ElementTree import Element, SubElement
import ahocorasick
@@ -486,7 +486,7 @@ def fetch_open_graph_image(url: str) -> Optional[Dict[str, Any]]:
def get_tweet_id(url: str) -> Optional[str]:
parsed_url = urlparse(url)
parsed_url = urlsplit(url)
if not (parsed_url.netloc == "twitter.com" or parsed_url.netloc.endswith(".twitter.com")):
return None
to_match = parsed_url.path
@@ -715,7 +715,7 @@ class InlineInterestingLinkProcessor(markdown.treeprocessors.Treeprocessor):
def get_actual_image_url(self, url: str) -> str:
# Add specific per-site cases to convert image-preview URLs to image URLs.
# See https://github.com/zulip/zulip/issues/4658 for more information
parsed_url = urlparse(url)
parsed_url = urlsplit(url)
if parsed_url.netloc == "github.com" or parsed_url.netloc.endswith(".github.com"):
# https://github.com/zulip/zulip/blob/main/static/images/logo/zulip-icon-128x128.png ->
# https://raw.githubusercontent.com/zulip/zulip/main/static/images/logo/zulip-icon-128x128.png
@@ -730,7 +730,7 @@ class InlineInterestingLinkProcessor(markdown.treeprocessors.Treeprocessor):
def is_image(self, url: str) -> bool:
if not self.zmd.image_preview_enabled:
return False
parsed_url = urlparse(url)
parsed_url = urlsplit(url)
# remove HTML URLs which end with image extensions that cannot be shorted
if parsed_url.netloc == "pasteboard.co":
return False
@@ -742,7 +742,7 @@ class InlineInterestingLinkProcessor(markdown.treeprocessors.Treeprocessor):
# wikipedia.org to point to the actual image URL. It's
# structurally very similar to dropbox_image, and possibly
# should be rewritten to use open graph, but has some value.
parsed_url = urlparse(url)
parsed_url = urlsplit(url)
if parsed_url.netloc.lower().endswith(".wikipedia.org") and parsed_url.path.startswith(
"/wiki/File:"
):
@@ -757,7 +757,7 @@ class InlineInterestingLinkProcessor(markdown.treeprocessors.Treeprocessor):
def dropbox_image(self, url: str) -> Optional[Dict[str, Any]]:
# TODO: The returned Dict could possibly be a TypedDict in future.
parsed_url = urlparse(url)
parsed_url = urlsplit(url)
if parsed_url.netloc == "dropbox.com" or parsed_url.netloc.endswith(".dropbox.com"):
is_album = parsed_url.path.startswith("/sc/") or parsed_url.path.startswith("/photos/")
# Only allow preview Dropbox shared links
@@ -1566,8 +1566,8 @@ def sanitize_url(url: str) -> Optional[str]:
See the docstring on markdown.inlinepatterns.LinkPattern.sanitize_url.
"""
try:
parts = urlparse(url.replace(" ", "%20"))
scheme, netloc, path, params, query, fragment = parts
parts = urlsplit(url.replace(" ", "%20"))
scheme, netloc, path, query, fragment = parts
except ValueError:
# Bad URL - so bad it couldn't be parsed.
return ""
@@ -1578,10 +1578,10 @@ def sanitize_url(url: str) -> Optional[str]:
scheme = "mailto"
elif scheme == "" and netloc == "" and len(path) > 0 and path[0] == "/":
# Allow domain-relative links
return urlunparse(("", "", path, params, query, fragment))
elif (scheme, netloc, path, params, query) == ("", "", "", "", "") and len(fragment) > 0:
return urlunsplit(("", "", path, query, fragment))
elif (scheme, netloc, path, query) == ("", "", "", "") and len(fragment) > 0:
# Allow fragment links
return urlunparse(("", "", "", "", "", fragment))
return urlunsplit(("", "", "", "", fragment))
# Zulip modification: If scheme is not specified, assume http://
# We re-enter sanitize_url because netloc etc. need to be re-parsed.
@@ -1606,7 +1606,7 @@ def sanitize_url(url: str) -> Optional[str]:
# the colon check, which would also forbid a lot of legitimate URLs.
# URL passes all tests. Return URL as-is.
return urlunparse((scheme, netloc, path, params, query, fragment))
return urlunsplit((scheme, netloc, path, query, fragment))
def url_to_a(

View File

@@ -1,5 +1,5 @@
from typing import Optional
from urllib.parse import urlparse
from urllib.parse import urlsplit
from bs4.element import Tag
from typing_extensions import override
@@ -53,9 +53,9 @@ class GenericParser(BaseParser):
if isinstance(first_image, Tag) and first_image["src"] != "":
assert isinstance(first_image["src"], str)
try:
# We use urlparse and not URLValidator because we
# We use urlsplit and not URLValidator because we
# need to support relative URLs.
urlparse(first_image["src"])
urlsplit(first_image["src"])
except ValueError:
return None
return first_image["src"]

View File

@@ -1,4 +1,4 @@
from urllib.parse import urlparse
from urllib.parse import urlsplit
from typing_extensions import override
@@ -26,9 +26,9 @@ class OpenGraphParser(BaseParser):
data.description = tag["content"]
elif tag["property"] == "og:image":
try:
# We use urlparse and not URLValidator because we
# We use urlsplit and not URLValidator because we
# need to support relative URLs.
urlparse(tag["content"])
urlsplit(tag["content"])
except ValueError:
continue
data.image = tag["content"]

View File

@@ -24,7 +24,7 @@ from typing import (
Type,
)
from unittest import mock
from urllib.parse import parse_qs, urlencode, urlparse
from urllib.parse import parse_qs, urlencode, urlsplit
import jwt
import ldap
@@ -977,7 +977,7 @@ class SocialAuthBase(DesktopFlowTestingLib, ZulipTestCase, ABC):
headers: Any,
**extra_data: Any,
) -> "TestHttpResponse":
parsed_url = urlparse(result["Location"])
parsed_url = urlsplit(result["Location"])
csrf_state = parse_qs(parsed_url.query)["state"]
result = self.client_get(self.AUTH_FINISH_URL, dict(state=csrf_state), **headers)
return result
@@ -1157,7 +1157,7 @@ class SocialAuthBase(DesktopFlowTestingLib, ZulipTestCase, ABC):
self.assertEqual(data["subdomain"], "zulip")
self.assertEqual(data["redirect_to"], "/user_uploads/image")
self.assertEqual(result.status_code, 302)
parsed_url = urlparse(result["Location"])
parsed_url = urlsplit(result["Location"])
url = f"{parsed_url.scheme}://{parsed_url.netloc}{parsed_url.path}"
self.assertTrue(url.startswith("http://zulip.testserver/accounts/login/subdomain/"))
@@ -1181,7 +1181,7 @@ class SocialAuthBase(DesktopFlowTestingLib, ZulipTestCase, ABC):
self.assertEqual(data["subdomain"], "zulip")
self.assertEqual(data["redirect_to"], "/user_uploads/image")
self.assertEqual(result.status_code, 302)
parsed_url = urlparse(result["Location"])
parsed_url = urlsplit(result["Location"])
url = f"{parsed_url.scheme}://{parsed_url.netloc}{parsed_url.path}"
self.assertTrue(url.startswith("http://zulip.testserver/accounts/login/subdomain/"))
@@ -1307,7 +1307,7 @@ class SocialAuthBase(DesktopFlowTestingLib, ZulipTestCase, ABC):
)
self.assertEqual(result.status_code, 302)
redirect_url = result["Location"]
parsed_url = urlparse(redirect_url)
parsed_url = urlsplit(redirect_url)
query_params = parse_qs(parsed_url.query)
self.assertEqual(parsed_url.scheme, "zulip")
self.assertEqual(query_params["realm"], ["http://zulip.testserver"])
@@ -1403,7 +1403,7 @@ class SocialAuthBase(DesktopFlowTestingLib, ZulipTestCase, ABC):
self.assertEqual(data["full_name"], self.example_user("hamlet").full_name)
self.assertEqual(data["subdomain"], "zulip")
self.assertEqual(result.status_code, 302)
parsed_url = urlparse(result["Location"])
parsed_url = urlsplit(result["Location"])
url = f"{parsed_url.scheme}://{parsed_url.netloc}{parsed_url.path}"
self.assertTrue(url.startswith("http://zulip.testserver/accounts/login/subdomain/"))
hamlet = self.example_user("hamlet")
@@ -1429,7 +1429,7 @@ class SocialAuthBase(DesktopFlowTestingLib, ZulipTestCase, ABC):
self.assertEqual(data["full_name"], name)
self.assertEqual(data["subdomain"], "zulip")
self.assertEqual(result.status_code, 302)
parsed_url = urlparse(result["Location"])
parsed_url = urlsplit(result["Location"])
url = f"{parsed_url.scheme}://{parsed_url.netloc}{parsed_url.path}"
self.assertTrue(url.startswith("http://zulip.testserver/accounts/login/subdomain/"))
@@ -1472,7 +1472,7 @@ class SocialAuthBase(DesktopFlowTestingLib, ZulipTestCase, ABC):
if mobile_flow_otp:
self.assertEqual(result.status_code, 302)
redirect_url = result["Location"]
parsed_url = urlparse(redirect_url)
parsed_url = urlsplit(redirect_url)
query_params = parse_qs(parsed_url.query)
self.assertEqual(parsed_url.scheme, "zulip")
self.assertEqual(query_params["realm"], ["http://zulip.testserver"])
@@ -1715,7 +1715,7 @@ class SocialAuthBase(DesktopFlowTestingLib, ZulipTestCase, ABC):
self.assertEqual(data["full_name"], name)
self.assertEqual(data["subdomain"], "zulip")
self.assertEqual(result.status_code, 302)
parsed_url = urlparse(result["Location"])
parsed_url = urlsplit(result["Location"])
url = f"{parsed_url.scheme}://{parsed_url.netloc}{parsed_url.path}"
self.assertTrue(url.startswith("http://zulip.testserver/accounts/login/subdomain/"))
@@ -1739,7 +1739,7 @@ class SocialAuthBase(DesktopFlowTestingLib, ZulipTestCase, ABC):
self.assertEqual(data["full_name"], name)
self.assertEqual(data["subdomain"], "zulip")
self.assertEqual(result.status_code, 302)
parsed_url = urlparse(result["Location"])
parsed_url = urlsplit(result["Location"])
url = f"{parsed_url.scheme}://{parsed_url.netloc}{parsed_url.path}"
self.assertTrue(url.startswith("http://zulip.testserver/accounts/login/subdomain/"))
@@ -1995,7 +1995,7 @@ class SAMLAuthBackendTest(SocialAuthBase):
assert "samlrequest" in result["Location"].lower()
self.client.cookies = result.cookies
parsed_url = urlparse(result["Location"])
parsed_url = urlsplit(result["Location"])
relay_state = parse_qs(parsed_url.query)["RelayState"][0]
# Make sure params are getting encoded into RelayState:
data = SAMLAuthBackend.get_data_from_redis(orjson.loads(relay_state)["state_token"])
@@ -2148,7 +2148,7 @@
# Verify the redirect has the correct form - a LogoutRequest for hamlet
# is delivered to the IdP in the SAMLRequest param.
query_dict = parse_qs(urlparse(result["Location"]).query)
query_dict = parse_qs(urlsplit(result["Location"]).query)
saml_request_encoded = query_dict["SAMLRequest"][0]
saml_request = OneLogin_Saml2_Utils.decode_base64_and_inflate(saml_request_encoded).decode()
self.assertIn("<samlp:LogoutRequest", saml_request)
@@ -2330,7 +2330,7 @@ class SAMLAuthBackendTest(SocialAuthBase):
redirect_to = result["Location"]
self.assertIn(settings.SOCIAL_AUTH_SAML_ENABLED_IDPS["test_idp"]["slo_url"], redirect_to)
parsed = urlparse(redirect_to)
parsed = urlsplit(redirect_to)
query_dict = parse_qs(parsed.query)
self.assertIn("SAMLResponse", query_dict)
@@ -3014,7 +3014,7 @@ class SAMLAuthBackendTest(SocialAuthBase):
self.assertEqual(data["full_name"], self.name)
self.assertEqual(data["subdomain"], "zulip")
self.assertEqual(result.status_code, 302)
parsed_url = urlparse(result["Location"])
parsed_url = urlsplit(result["Location"])
url = f"{parsed_url.scheme}://{parsed_url.netloc}{parsed_url.path}"
self.assertTrue(url.startswith("http://zulip.testserver/accounts/login/subdomain/"))
@@ -3049,7 +3049,7 @@ class SAMLAuthBackendTest(SocialAuthBase):
self.assertEqual(data["full_name"], self.name)
self.assertEqual(data["subdomain"], "zulip")
self.assertEqual(result.status_code, 302)
parsed_url = urlparse(result["Location"])
parsed_url = urlsplit(result["Location"])
url = f"{parsed_url.scheme}://{parsed_url.netloc}{parsed_url.path}"
self.assertTrue(url.startswith("http://zulip.testserver/accounts/login/subdomain/"))
@@ -3070,7 +3070,7 @@ class SAMLAuthBackendTest(SocialAuthBase):
self.assertEqual(data["full_name"], self.name)
self.assertEqual(data["subdomain"], "zulip")
self.assertEqual(result.status_code, 302)
parsed_url = urlparse(result["Location"])
parsed_url = urlsplit(result["Location"])
url = f"{parsed_url.scheme}://{parsed_url.netloc}{parsed_url.path}"
self.assertTrue(url.startswith("http://zulip.testserver/accounts/login/subdomain/"))
@@ -3344,7 +3344,7 @@ class AppleIdAuthBackendTest(AppleAuthMixin, SocialAuthBase):
headers: Any,
**extra_data: Any,
) -> "TestHttpResponse":
parsed_url = urlparse(result["Location"])
parsed_url = urlsplit(result["Location"])
state = parse_qs(parsed_url.query)["state"]
user_param = json.dumps(account_data_dict)
self.client.session.flush()
@@ -3923,7 +3923,7 @@ class GitHubAuthBackendTest(SocialAuthBase):
expect_noreply_email_allowed: bool = False,
**extra_data: Any,
) -> "TestHttpResponse":
parsed_url = urlparse(result["Location"])
parsed_url = urlsplit(result["Location"])
csrf_state = parse_qs(parsed_url.query)["state"]
result = self.client_get(self.AUTH_FINISH_URL, dict(state=csrf_state), **headers)
@@ -4120,7 +4120,7 @@ class GitHubAuthBackendTest(SocialAuthBase):
self.assertEqual(data["subdomain"], "zulip")
self.assertEqual(data["redirect_to"], "/user_uploads/image")
self.assertEqual(result.status_code, 302)
parsed_url = urlparse(result["Location"])
parsed_url = urlsplit(result["Location"])
url = f"{parsed_url.scheme}://{parsed_url.netloc}{parsed_url.path}"
self.assertTrue(url.startswith("http://zulip.testserver/accounts/login/subdomain/"))
@@ -4145,7 +4145,7 @@ class GitHubAuthBackendTest(SocialAuthBase):
self.assertEqual(data["subdomain"], "zulip")
self.assertEqual(data["redirect_to"], "/user_uploads/image")
self.assertEqual(result.status_code, 302)
parsed_url = urlparse(result["Location"])
parsed_url = urlsplit(result["Location"])
url = f"{parsed_url.scheme}://{parsed_url.netloc}{parsed_url.path}"
self.assertTrue(url.startswith("http://zulip.testserver/accounts/login/subdomain/"))
@@ -4174,7 +4174,7 @@ class GitHubAuthBackendTest(SocialAuthBase):
self.assertEqual(data["subdomain"], "zulip")
self.assertEqual(data["redirect_to"], "/user_uploads/image")
self.assertEqual(result.status_code, 302)
parsed_url = urlparse(result["Location"])
parsed_url = urlsplit(result["Location"])
url = f"{parsed_url.scheme}://{parsed_url.netloc}{parsed_url.path}"
self.assertTrue(url.startswith("http://zulip.testserver/accounts/login/subdomain/"))
@@ -4203,7 +4203,7 @@ class GitHubAuthBackendTest(SocialAuthBase):
self.assertEqual(data["subdomain"], "zulip")
self.assertEqual(data["redirect_to"], "/user_uploads/image")
self.assertEqual(result.status_code, 302)
parsed_url = urlparse(result["Location"])
parsed_url = urlsplit(result["Location"])
url = f"{parsed_url.scheme}://{parsed_url.netloc}{parsed_url.path}"
self.assertTrue(url.startswith("http://zulip.testserver/accounts/login/subdomain/"))
@@ -4264,7 +4264,7 @@ class GitHubAuthBackendTest(SocialAuthBase):
self.assertEqual(data["subdomain"], "zulip")
self.assertEqual(data["redirect_to"], "/user_uploads/image")
self.assertEqual(result.status_code, 302)
parsed_url = urlparse(result["Location"])
parsed_url = urlsplit(result["Location"])
url = f"{parsed_url.scheme}://{parsed_url.netloc}{parsed_url.path}"
self.assertTrue(url.startswith("http://zulip.testserver/accounts/login/subdomain/"))
@@ -4347,7 +4347,7 @@ class GitHubAuthBackendTest(SocialAuthBase):
self.assertEqual(data["full_name"], account_data_dict["name"])
self.assertEqual(data["subdomain"], "zulip")
self.assertEqual(result.status_code, 302)
parsed_url = urlparse(result["Location"])
parsed_url = urlsplit(result["Location"])
url = f"{parsed_url.scheme}://{parsed_url.netloc}{parsed_url.path}"
self.assertTrue(url.startswith("http://zulip.testserver/accounts/login/subdomain/"))
@@ -4486,7 +4486,7 @@ class GoogleAuthBackendTest(SocialAuthBase):
self.assertEqual(result.status_code, 302)
redirect_url = result["Location"]
parsed_url = urlparse(redirect_url)
parsed_url = urlsplit(redirect_url)
query_params = parse_qs(parsed_url.query)
self.assertEqual(parsed_url.scheme, "zulip")
self.assertEqual(query_params["realm"], ["http://zulip-mobile.testserver"])
@@ -4531,7 +4531,7 @@
)
self.assertEqual(result.status_code, 302)
redirect_url = result["Location"]
parsed_url = urlparse(redirect_url)
parsed_url = urlsplit(redirect_url)
query_params = parse_qs(parsed_url.query)
self.assertEqual(parsed_url.scheme, "zulip")
self.assertEqual(query_params["realm"], ["http://zulip.testserver"])
@@ -5529,7 +5529,7 @@ class TestZulipRemoteUserBackend(DesktopFlowTestingLib, ZulipTestCase):
self.assertEqual(result.status_code, 302)
url = result["Location"]
parsed_url = urlparse(url)
parsed_url = urlsplit(url)
self.assertEqual(parsed_url.path, "/accounts/login/sso/")
self.assertEqual(parsed_url.query, "param1=value1&params=value2")
@@ -5673,7 +5673,7 @@ class TestZulipRemoteUserBackend(DesktopFlowTestingLib, ZulipTestCase):
)
self.assertEqual(result.status_code, 302)
redirect_url = result["Location"]
parsed_url = urlparse(redirect_url)
parsed_url = urlsplit(redirect_url)
query_params = parse_qs(parsed_url.query)
self.assertEqual(parsed_url.scheme, "zulip")
self.assertEqual(query_params["realm"], ["http://zulip.testserver"])
@@ -5722,7 +5722,7 @@ class TestZulipRemoteUserBackend(DesktopFlowTestingLib, ZulipTestCase):
)
self.assertEqual(result.status_code, 302)
redirect_url = result["Location"]
parsed_url = urlparse(redirect_url)
parsed_url = urlsplit(redirect_url)
query_params = parse_qs(parsed_url.query)
self.assertEqual(parsed_url.scheme, "zulip")
self.assertEqual(query_params["realm"], ["http://zulip.testserver"])

View File

@@ -2,7 +2,7 @@ import calendar
from datetime import timedelta, timezone
from typing import TYPE_CHECKING, Any, Dict
from unittest.mock import patch
from urllib.parse import urlparse
from urllib.parse import urlsplit
import orjson
import time_machine
@@ -1011,7 +1011,7 @@ class HomeTest(ZulipTestCase):
self.assertTrue(result["Location"].endswith("/desktop_home/"))
result = self.client_get("/desktop_home/")
self.assertEqual(result.status_code, 302)
path = urlparse(result["Location"]).path
path = urlsplit(result["Location"]).path
self.assertEqual(path, "/")
@override_settings(SERVER_UPGRADE_NAG_DEADLINE_DAYS=365)

View File

@@ -3,7 +3,7 @@ import time
from datetime import timedelta
from typing import TYPE_CHECKING, Any, Dict, Optional, Sequence, Union
from unittest.mock import MagicMock, patch
from urllib.parse import quote, quote_plus, urlencode, urlparse
from urllib.parse import quote, quote_plus, urlencode, urlsplit
import orjson
from django.conf import settings
@@ -1095,7 +1095,7 @@ class EmailUnsubscribeTests(ZulipTestCase):
# An unknown message type "fake" produces an error.
user_profile = self.example_user("hamlet")
unsubscribe_link = one_click_unsubscribe_link(user_profile, "fake")
result = self.client_get(urlparse(unsubscribe_link).path)
result = self.client_get(urlsplit(unsubscribe_link).path)
self.assert_in_response("Unknown email unsubscribe request", result)
def test_message_notification_emails_unsubscribe(self) -> None:
@@ -1109,7 +1109,7 @@
user_profile.save()
unsubscribe_link = one_click_unsubscribe_link(user_profile, "missed_messages")
result = self.client_get(urlparse(unsubscribe_link).path)
result = self.client_get(urlsplit(unsubscribe_link).path)
self.assertEqual(result.status_code, 200)
@@ -1128,7 +1128,7 @@
# Simulate unsubscribing from the welcome e-mails.
unsubscribe_link = one_click_unsubscribe_link(user_profile, "welcome")
result = self.client_get(urlparse(unsubscribe_link).path)
result = self.client_get(urlsplit(unsubscribe_link).path)
# The welcome email jobs are no longer scheduled.
self.assertEqual(result.status_code, 200)
@@ -1166,7 +1166,7 @@
# Simulate unsubscribing from digest e-mails.
unsubscribe_link = one_click_unsubscribe_link(user_profile, "digest")
result = self.client_get(urlparse(unsubscribe_link).path)
result = self.client_get(urlsplit(unsubscribe_link).path)
# The setting is toggled off, and scheduled jobs have been removed.
self.assertEqual(result.status_code, 200)
@@ -1187,7 +1187,7 @@
user_profile.save()
unsubscribe_link = one_click_unsubscribe_link(user_profile, "login")
result = self.client_get(urlparse(unsubscribe_link).path)
result = self.client_get(urlsplit(unsubscribe_link).path)
self.assertEqual(result.status_code, 200)
@@ -1204,7 +1204,7 @@
# Simulate unsubscribing from marketing e-mails.
unsubscribe_link = one_click_unsubscribe_link(user_profile, "marketing")
result = self.client_get(urlparse(unsubscribe_link).path)
result = self.client_get(urlsplit(unsubscribe_link).path)
self.assertEqual(result.status_code, 200)
# Circumvent user_profile caching.
@@ -1223,7 +1223,7 @@
# Simulate unsubscribing from marketing e-mails.
unsubscribe_link = one_click_unsubscribe_link(user_profile, "marketing")
client = Client(enforce_csrf_checks=True)
result = client.post(urlparse(unsubscribe_link).path, {"List-Unsubscribe": "One-Click"})
result = client.post(urlsplit(unsubscribe_link).path, {"List-Unsubscribe": "One-Click"})
self.assertEqual(result.status_code, 200)
# Circumvent user_profile caching.

View File

@@ -4,7 +4,7 @@ from io import BytesIO
from typing import Any, Dict, Iterator, List, Set, Tuple
from unittest import mock
from unittest.mock import ANY
from urllib.parse import parse_qs, urlparse
from urllib.parse import parse_qs, urlsplit
import orjson
import responses
@@ -76,7 +76,7 @@ def request_callback(request: PreparedRequest) -> Tuple[int, Dict[str, str], byt
if request.url == "https://slack.com/api/users.list":
return (200, {}, orjson.dumps({"ok": True, "members": "user_data"}))
query_from_url = str(urlparse(request.url).query)
query_from_url = str(urlsplit(request.url).query)
qs = parse_qs(query_from_url)
if request.url and "https://slack.com/api/users.info" in request.url:
user2team_dict = {

View File

@@ -1,7 +1,7 @@
import os
import re
from io import BytesIO, StringIO
from urllib.parse import urlparse
from urllib.parse import urlsplit
from django.conf import settings
from PIL import Image
@@ -143,7 +143,7 @@ class LocalStorageTest(UploadSerializeMixin, ZulipTestCase):
# on-disk content, which nginx serves
result = self.client_get(url)
self.assertEqual(result.status_code, 200)
internal_redirect_path = urlparse(url).path.replace(
internal_redirect_path = urlsplit(url).path.replace(
"/user_avatars/", "/internal/local/user_avatars/"
)
self.assertEqual(result["X-Accel-Redirect"], internal_redirect_path)
@@ -257,5 +257,5 @@ class LocalStorageTest(UploadSerializeMixin, ZulipTestCase):
warn_log.output,
["WARNING:root:not_a_file does not exist. Its entry in the database will be removed."],
)
path_id = urlparse(url).path
path_id = urlsplit(url).path
self.assertEqual(delete_export_tarball(path_id), path_id)

View File

@@ -3,7 +3,7 @@ import os
import re
from io import BytesIO, StringIO
from unittest.mock import patch
from urllib.parse import urlparse
from urllib.parse import urlsplit
import botocore.exceptions
from django.conf import settings
@@ -191,7 +191,7 @@ class S3Test(ZulipTestCase):
# In development, this is just a redirect
response = self.client_get(url)
redirect_url = response["Location"]
path = urlparse(redirect_url).path
path = urlsplit(redirect_url).path
assert path.startswith("/")
key = path[len("/") :]
self.assertEqual(b"zulip!", bucket.Object(key).get()["Body"].read())
@@ -200,7 +200,7 @@
with self.settings(DEVELOPMENT=False):
response = self.client_get(url)
redirect_url = response["X-Accel-Redirect"]
path = urlparse(redirect_url).path
path = urlsplit(redirect_url).path
assert path.startswith(prefix)
key = path[len(prefix) :]
self.assertEqual(b"zulip!", bucket.Object(key).get()["Body"].read())
@@ -210,7 +210,7 @@
with self.settings(DEVELOPMENT=False):
response = self.client_get(download_url)
redirect_url = response["X-Accel-Redirect"]
path = urlparse(redirect_url).path
path = urlsplit(redirect_url).path
assert path.startswith(prefix)
key = path[len(prefix) :]
self.assertEqual(b"zulip!", bucket.Object(key).get()["Body"].read())
@@ -230,7 +230,7 @@
with self.settings(DEVELOPMENT=False):
self.client_get(url_only_url)
redirect_url = response["X-Accel-Redirect"]
path = urlparse(redirect_url).path
path = urlsplit(redirect_url).path
assert path.startswith(prefix)
key = path[len(prefix) :]
self.assertEqual(b"zulip!", bucket.Object(key).get()["Body"].read())
@@ -532,5 +532,5 @@
warn_log.output,
["WARNING:root:not_a_file does not exist. Its entry in the database will be removed."],
)
path_id = urlparse(url).path
path_id = urlsplit(url).path
self.assertEqual(delete_export_tarball(path_id), path_id)

View File

@@ -1,7 +1,7 @@
from collections import defaultdict
from functools import lru_cache
from typing import Any, Dict, Iterable, List, Mapping, Optional, Sequence, Tuple, Union
from urllib.parse import urlparse
from urllib.parse import urlsplit
import orjson
import requests
@@ -50,7 +50,7 @@ class TornadoAdapter(HTTPAdapter):
request, stream=stream, timeout=timeout, verify=verify, cert=cert, proxies=proxies
)
except ConnectionError:
parsed_url = urlparse(request.url)
parsed_url = urlsplit(request.url)
logfile = (
f"tornado-{parsed_url.port}.log"
if settings.TORNADO_PROCESSES > 1

View File

@@ -1,7 +1,7 @@
import logging
from contextlib import suppress
from typing import Type
from urllib.parse import urlparse
from urllib.parse import urlsplit
import orjson
from circuitbreaker import CircuitBreakerError, circuit
@@ -34,7 +34,7 @@ def sentry_tunnel(
try:
envelope_header_line, envelope_items = request.body.split(b"\n", 1)
envelope_header = to_wild_value("envelope_header", envelope_header_line.decode("utf-8"))
dsn = urlparse(envelope_header["dsn"].tame(check_url))
dsn = urlsplit(envelope_header["dsn"].tame(check_url))
except Exception:
raise JsonableError(_("Invalid request format"))

View File

@@ -4,7 +4,7 @@ import os
from datetime import timedelta
from mimetypes import guess_type
from typing import List, Optional, Union
from urllib.parse import quote, urlparse
from urllib.parse import quote, urlsplit
from django.conf import settings
from django.contrib.auth.models import AnonymousUser
@@ -41,7 +41,7 @@ from zerver.models import UserProfile, validate_attachment_request
def patch_disposition_header(response: HttpResponse, url: str, is_attachment: bool) -> None:
filename = os.path.basename(urlparse(url).path)
filename = os.path.basename(urlsplit(url).path)
content_disposition = content_disposition_header(is_attachment, filename)
if content_disposition is not None:
@@ -84,7 +84,7 @@ def serve_s3(request: HttpRequest, path_id: str, force_download: bool = False) -
# We over-escape the path, to work around it being impossible to
# get the _unescaped_ new internal request URI in nginx.
parsed_url = urlparse(url)
parsed_url = urlsplit(url)
assert parsed_url.hostname is not None
assert parsed_url.path is not None
assert parsed_url.query is not None

View File

@@ -1,6 +1,6 @@
# Webhooks for external integrations.
from typing import Optional, Tuple
from urllib.parse import urlparse
from urllib.parse import urlsplit
from django.http import HttpRequest, HttpResponse
@@ -224,7 +224,7 @@ def semaphore_2(payload: WildValue) -> Tuple[str, str, Optional[str], str]:
def is_github_repo(repo_url: str) -> bool:
return urlparse(repo_url).hostname == "github.com"
return urlsplit(repo_url).hostname == "github.com"
def summary_line(message: str) -> str:

View File

@@ -30,7 +30,7 @@ from typing import (
Type,
TypeVar,
)
from urllib.parse import urlparse
from urllib.parse import urlsplit
import orjson
import sentry_sdk
@@ -1132,7 +1132,7 @@ class DeferredWorker(QueueProcessingWorker):
assert public_url is not None
# Update the extra_data field now that the export is complete.
extra_data["export_path"] = urlparse(public_url).path
extra_data["export_path"] = urlsplit(public_url).path
export_event.extra_data = extra_data
export_event.save(update_fields=["extra_data"])