ruff: Bump target-version from py38 to py310.

Signed-off-by: Anders Kaseorg <anders@zulip.com>
This commit is contained in:
Anders Kaseorg 2024-07-11 17:30:32 -07:00 committed by Tim Abbott
parent 3f29bc42b1
commit 48202389b8
49 changed files with 1056 additions and 712 deletions

View File

@ -1432,12 +1432,16 @@ class TestLoggingCountStats(AnalyticsTestCase):
"gcm_options": gcm_options,
}
now = timezone_now()
with time_machine.travel(now, tick=False), mock.patch(
"zilencer.views.send_android_push_notification", return_value=1
), mock.patch("zilencer.views.send_apple_push_notification", return_value=1), mock.patch(
with (
time_machine.travel(now, tick=False),
mock.patch("zilencer.views.send_android_push_notification", return_value=1),
mock.patch("zilencer.views.send_apple_push_notification", return_value=1),
mock.patch(
"corporate.lib.stripe.RemoteServerBillingSession.current_count_for_billed_licenses",
return_value=10,
), self.assertLogs("zilencer.views", level="INFO"):
),
self.assertLogs("zilencer.views", level="INFO"),
):
result = self.uuid_post(
self.server_uuid,
"/api/v1/remotes/push/notify",
@ -1491,12 +1495,16 @@ class TestLoggingCountStats(AnalyticsTestCase):
"apns_payload": apns_payload,
"gcm_options": gcm_options,
}
with time_machine.travel(now, tick=False), mock.patch(
"zilencer.views.send_android_push_notification", return_value=1
), mock.patch("zilencer.views.send_apple_push_notification", return_value=1), mock.patch(
with (
time_machine.travel(now, tick=False),
mock.patch("zilencer.views.send_android_push_notification", return_value=1),
mock.patch("zilencer.views.send_apple_push_notification", return_value=1),
mock.patch(
"corporate.lib.stripe.RemoteServerBillingSession.current_count_for_billed_licenses",
return_value=10,
), self.assertLogs("zilencer.views", level="INFO"):
),
self.assertLogs("zilencer.views", level="INFO"),
):
result = self.uuid_post(
self.server_uuid,
"/api/v1/remotes/push/notify",
@ -1549,12 +1557,16 @@ class TestLoggingCountStats(AnalyticsTestCase):
realm_date_created=realm.date_created,
)
with time_machine.travel(now, tick=False), mock.patch(
"zilencer.views.send_android_push_notification", return_value=1
), mock.patch("zilencer.views.send_apple_push_notification", return_value=1), mock.patch(
with (
time_machine.travel(now, tick=False),
mock.patch("zilencer.views.send_android_push_notification", return_value=1),
mock.patch("zilencer.views.send_apple_push_notification", return_value=1),
mock.patch(
"corporate.lib.stripe.RemoteRealmBillingSession.current_count_for_billed_licenses",
return_value=10,
), self.assertLogs("zilencer.views", level="INFO"):
),
self.assertLogs("zilencer.views", level="INFO"),
):
result = self.uuid_post(
self.server_uuid,
"/api/v1/remotes/push/notify",

View File

@ -112,7 +112,7 @@ def handle_checkout_session_completed_event(
session.customer, stripe_session.metadata.get("user_id")
)
payment_method = stripe_setup_intent.payment_method
assert isinstance(payment_method, (str, type(None)))
assert isinstance(payment_method, (str, type(None))) # noqa: UP038 # https://github.com/python/mypy/issues/17413
if session.type in [
Session.CARD_UPDATE_FROM_BILLING_PAGE,

View File

@ -283,9 +283,10 @@ class RemoteBillingAuthenticationTest(RemoteRealmBillingTestCase):
def test_self_hosted_config_error_page(self) -> None:
self.login("desdemona")
with self.settings(
CORPORATE_ENABLED=False, PUSH_NOTIFICATION_BOUNCER_URL=None
), self.assertLogs("django.request"):
with (
self.settings(CORPORATE_ENABLED=False, PUSH_NOTIFICATION_BOUNCER_URL=None),
self.assertLogs("django.request"),
):
result = self.client_get("/self-hosted-billing/not-configured/")
self.assertEqual(result.status_code, 500)
self.assert_in_response(
@ -703,9 +704,10 @@ class RemoteBillingAuthenticationTest(RemoteRealmBillingTestCase):
# Now click the second confirmation link. The RemoteRealmBillingUser entry
# stays the same, since it's already been created, and the user is redirected
# normally further through the flow, while we log this event.
with time_machine.travel(now + timedelta(seconds=1), tick=False), self.assertLogs(
"corporate.stripe", "INFO"
) as mock_logger:
with (
time_machine.travel(now + timedelta(seconds=1), tick=False),
self.assertLogs("corporate.stripe", "INFO") as mock_logger,
):
result = self.client_get(second_confirmation_url, subdomain="selfhosting")
self.assertEqual(result.status_code, 302)
self.assertTrue(result["Location"].startswith("/remote-billing-login/"))

View File

@ -2253,10 +2253,13 @@ class StripeTest(StripeTestCase):
hamlet = self.example_user("hamlet")
self.login_user(hamlet)
self.add_card_to_customer_for_upgrade()
with patch(
with (
patch(
"corporate.lib.stripe.BillingSession.create_stripe_invoice_and_charge",
side_effect=Exception,
), self.assertLogs("corporate.stripe", "WARNING") as m:
),
self.assertLogs("corporate.stripe", "WARNING") as m,
):
response = self.upgrade(talk_to_stripe=False)
self.assertIn("ERROR:corporate.stripe:Uncaught exception in billing", m.output[0])
self.assertIn(m.records[0].stack_info, m.output[0])
@ -2273,9 +2276,12 @@ class StripeTest(StripeTestCase):
self.login_user(hamlet)
self.add_card_to_customer_for_upgrade()
with patch(
with (
patch(
"corporate.lib.stripe.BillingSession.process_initial_upgrade", side_effect=Exception
), self.assertLogs("corporate.stripe", "WARNING"):
),
self.assertLogs("corporate.stripe", "WARNING"),
):
response = self.upgrade()
response_dict = self.assert_json_success(response)
@ -2609,9 +2615,10 @@ class StripeTest(StripeTestCase):
self.assert_in_response(substring, response)
# schedule downgrade
with time_machine.travel(self.now + timedelta(days=3), tick=False), self.assertLogs(
"corporate.stripe", "INFO"
) as m:
with (
time_machine.travel(self.now + timedelta(days=3), tick=False),
self.assertLogs("corporate.stripe", "INFO") as m,
):
response = self.client_billing_patch(
"/billing/plan",
{"status": CustomerPlan.DOWNGRADE_AT_END_OF_FREE_TRIAL},
@ -3927,9 +3934,10 @@ class StripeTest(StripeTestCase):
expected_log = f"INFO:corporate.stripe:Change plan status: Customer.id: {stripe_customer_id}, CustomerPlan.id: {new_plan.id}, status: {CustomerPlan.DOWNGRADE_AT_END_OF_CYCLE}"
self.assertEqual(m.output[0], expected_log)
with self.assertRaises(BillingError) as context, self.assertLogs(
"corporate.stripe", "WARNING"
) as m:
with (
self.assertRaises(BillingError) as context,
self.assertLogs("corporate.stripe", "WARNING") as m,
):
with time_machine.travel(self.now, tick=False):
self.local_upgrade(
self.seat_count, True, CustomerPlan.BILLING_SCHEDULE_ANNUAL, True, False
@ -7010,9 +7018,10 @@ class TestRemoteRealmBillingFlow(StripeTestCase, RemoteRealmBillingTestCase):
self.assert_in_response(substring, response)
# schedule downgrade
with time_machine.travel(self.now + timedelta(days=3), tick=False), self.assertLogs(
"corporate.stripe", "INFO"
) as m:
with (
time_machine.travel(self.now + timedelta(days=3), tick=False),
self.assertLogs("corporate.stripe", "INFO") as m,
):
response = self.client_billing_patch(
"/billing/plan",
{"status": CustomerPlan.DOWNGRADE_AT_END_OF_FREE_TRIAL},
@ -7400,8 +7409,9 @@ class TestRemoteRealmBillingFlow(StripeTestCase, RemoteRealmBillingTestCase):
self.execute_remote_billing_authentication_flow(hamlet)
mock_invoice = MagicMock()
mock_invoice.hosted_invoice_url = "payments_page_url"
with time_machine.travel(self.now, tick=False), mock.patch(
"stripe.Invoice.retrieve", return_value=mock_invoice
with (
time_machine.travel(self.now, tick=False),
mock.patch("stripe.Invoice.retrieve", return_value=mock_invoice),
):
result = self.client_get(
f"{self.billing_session.billing_base_url}/upgrade/?tier={CustomerPlan.TIER_SELF_HOSTED_BASIC}",
@ -8894,9 +8904,10 @@ class TestRemoteServerBillingFlow(StripeTestCase, RemoteServerTestCase):
self.assert_in_response(substring, response)
# schedule downgrade
with time_machine.travel(self.now + timedelta(days=3), tick=False), self.assertLogs(
"corporate.stripe", "INFO"
) as m:
with (
time_machine.travel(self.now + timedelta(days=3), tick=False),
self.assertLogs("corporate.stripe", "INFO") as m,
):
response = self.client_billing_patch(
"/billing/plan",
{"status": CustomerPlan.DOWNGRADE_AT_END_OF_FREE_TRIAL},
@ -9210,8 +9221,9 @@ class TestRemoteServerBillingFlow(StripeTestCase, RemoteServerTestCase):
self.execute_remote_billing_authentication_flow(hamlet.delivery_email, hamlet.full_name)
mock_invoice = MagicMock()
mock_invoice.hosted_invoice_url = "payments_page_url"
with time_machine.travel(self.now, tick=False), mock.patch(
"stripe.Invoice.retrieve", return_value=mock_invoice
with (
time_machine.travel(self.now, tick=False),
mock.patch("stripe.Invoice.retrieve", return_value=mock_invoice),
):
result = self.client_get(
f"{self.billing_session.billing_base_url}/upgrade/?tier={CustomerPlan.TIER_SELF_HOSTED_BASIC}",
@ -9663,9 +9675,11 @@ class TestRemoteServerBillingFlow(StripeTestCase, RemoteServerTestCase):
self.remote_server.plan_type, RemoteZulipServer.PLAN_TYPE_SELF_MANAGED_LEGACY
)
with mock.patch("stripe.Invoice.create") as invoice_create, mock.patch(
"corporate.lib.stripe.send_email"
) as send_email, time_machine.travel(plan_end_date, tick=False):
with (
mock.patch("stripe.Invoice.create") as invoice_create,
mock.patch("corporate.lib.stripe.send_email") as send_email,
time_machine.travel(plan_end_date, tick=False),
):
invoice_plans_as_needed()
# Verify that for legacy plan with no next plan scheduled,
# invoice overdue email is not sent even if the last audit log
@ -9730,9 +9744,11 @@ class TestRemoteServerBillingFlow(StripeTestCase, RemoteServerTestCase):
)
licenses = max(min_licenses, server_user_count)
with mock.patch("stripe.Invoice.finalize_invoice") as invoice_create, mock.patch(
"corporate.lib.stripe.send_email"
) as send_email, time_machine.travel(end_date, tick=False):
with (
mock.patch("stripe.Invoice.finalize_invoice") as invoice_create,
mock.patch("corporate.lib.stripe.send_email") as send_email,
time_machine.travel(end_date, tick=False),
):
invoice_plans_as_needed()
# Verify that for legacy plan with next plan scheduled, invoice
# overdue email is sent if the last audit log is stale.

View File

@ -1,6 +1,6 @@
[tool.black]
line-length = 100
target-version = ["py38"]
target-version = ["py310"]
[tool.isort]
src_paths = [".", "tools"]
@ -99,7 +99,7 @@ warn_required_dynamic_aliases = true
[tool.ruff]
line-length = 100
src = [".", "tools"]
target-version = "py38"
target-version = "py310"
[tool.ruff.lint]
# See https://github.com/astral-sh/ruff#rules for error code definitions.

View File

@ -33,9 +33,10 @@ def write_updated_configs() -> None:
expected_ports = list(range(9800, ports[-1] + 1))
assert ports == expected_ports, f"ports ({ports}) must be contiguous, starting with 9800"
with open("/etc/zulip/nginx_sharding_map.conf.tmp", "w") as nginx_sharding_conf_f, open(
"/etc/zulip/sharding.json.tmp", "w"
) as sharding_json_f:
with (
open("/etc/zulip/nginx_sharding_map.conf.tmp", "w") as nginx_sharding_conf_f,
open("/etc/zulip/sharding.json.tmp", "w") as sharding_json_f,
):
if len(ports) == 1:
nginx_sharding_conf_f.write('map "" $tornado_server {\n')
nginx_sharding_conf_f.write(" default http://tornado;\n")

View File

@ -15,13 +15,12 @@ import subprocess
import sys
import time
import uuid
import zoneinfo
from collections.abc import Sequence
from datetime import datetime, timedelta
from typing import IO, Any, Literal, overload
from urllib.parse import SplitResult
import zoneinfo
DEPLOYMENTS_DIR = "/home/zulip/deployments"
LOCK_DIR = os.path.join(DEPLOYMENTS_DIR, "lock")
TIMESTAMP_FORMAT = "%Y-%m-%d-%H-%M-%S"

View File

@ -2,7 +2,6 @@
import json
import os
import sys
import zoneinfo
ZULIP_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), "../../")

View File

@ -58,8 +58,9 @@ def vnu_servlet() -> Iterator[None]:
proc.terminate()
with vnu_servlet(), test_server_running(
options.skip_provision_check, external_host, log_file=LOG_FILE, dots=True
with (
vnu_servlet(),
test_server_running(options.skip_provision_check, external_host, log_file=LOG_FILE, dots=True),
):
ret_help_doc = subprocess.call(
["scrapy", "crawl_with_status", *extra_args, "help_documentation_crawler"],

View File

@ -58,9 +58,10 @@ class TestRuleList(TestCase):
for line in rule.get("bad_lines", []):
for filename in rule.get("include_only", {"foo.bar"}):
with patch(
"builtins.open", return_value=StringIO(line + "\n\n"), autospec=True
), patch("builtins.print"):
with (
patch("builtins.open", return_value=StringIO(line + "\n\n"), autospec=True),
patch("builtins.print"),
):
self.assertTrue(
RuleList([], [rule]).custom_check_file(filename, "baz", ""),
f"The pattern '{pattern}' didn't match the line '{line}' while it should.",

View File

@ -1,8 +1,8 @@
import logging
import zoneinfo
from email.headerregistry import Address
from typing import Any, Literal
import zoneinfo
from django.conf import settings
from django.db import transaction
from django.utils.timezone import get_current_timezone_name as timezone_get_current_timezone_name

View File

@ -5,6 +5,7 @@ import os
import re
import subprocess
import sys
import zoneinfo
from collections import defaultdict
from dataclasses import dataclass
from datetime import timedelta
@ -12,7 +13,6 @@ from email.headerregistry import Address
from typing import Any
import lxml.html
import zoneinfo
from bs4 import BeautifulSoup
from django.conf import settings
from django.contrib.auth import get_backends

View File

@ -226,9 +226,10 @@ class ZulipTestCaseMixin(SimpleTestCase):
if not settings.BAN_CONSOLE_OUTPUT and self.expected_console_output is None:
return super().run(result)
extra_output_finder = ExtraConsoleOutputFinder()
with tee_stderr_and_find_extra_console_output(
extra_output_finder
), tee_stdout_and_find_extra_console_output(extra_output_finder):
with (
tee_stderr_and_find_extra_console_output(extra_output_finder),
tee_stdout_and_find_extra_console_output(extra_output_finder),
):
test_result = super().run(result)
if extra_output_finder.full_extra_output and (
test_result is None or test_result.wasSuccessful()
@ -1567,9 +1568,13 @@ Output:
This raises a failure inside of the try/except block of
markdown.__init__.do_convert.
"""
with mock.patch(
"zerver.lib.markdown.unsafe_timeout", side_effect=subprocess.CalledProcessError(1, [])
), self.assertLogs(level="ERROR"): # For markdown_logger.exception
with (
mock.patch(
"zerver.lib.markdown.unsafe_timeout",
side_effect=subprocess.CalledProcessError(1, []),
),
self.assertLogs(level="ERROR"),
): # For markdown_logger.exception
yield
def create_default_device(

View File

@ -29,13 +29,13 @@ for any particular type of object.
"""
import re
import zoneinfo
from collections.abc import Callable, Collection, Container, Iterator
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, NoReturn, TypeVar, cast, overload
import orjson
import zoneinfo
from django.core.exceptions import ValidationError
from django.core.validators import URLValidator, validate_email
from django.utils.translation import gettext as _

View File

@ -123,8 +123,9 @@ class ThreadedWorker(threading.Thread):
@override
def run(self) -> None:
with configure_scope() as scope, log_and_exit_if_exception(
self.logger, self.queue_name, threaded=True
with (
configure_scope() as scope,
log_and_exit_if_exception(self.logger, self.queue_name, threaded=True),
):
scope.set_tag("queue_worker", self.queue_name)
worker = get_worker(self.queue_name, threaded=True)

View File

@ -1,6 +1,6 @@
import zoneinfo
from typing import Any
import zoneinfo
from django.conf import settings
from django.contrib.auth.signals import user_logged_in, user_logged_out
from django.dispatch import receiver

View File

@ -279,8 +279,9 @@ class AuthBackendTest(ZulipTestCase):
user_profile.set_password(password)
user_profile.save()
with mock.patch("zproject.backends.email_auth_enabled", return_value=False), mock.patch(
"zproject.backends.password_auth_enabled", return_value=True
with (
mock.patch("zproject.backends.email_auth_enabled", return_value=False),
mock.patch("zproject.backends.password_auth_enabled", return_value=True),
):
return_data: dict[str, bool] = {}
user = EmailAuthBackend().authenticate(
@ -370,13 +371,17 @@ class AuthBackendTest(ZulipTestCase):
user_profile.set_password(password)
user_profile.save()
with self.settings(
with (
self.settings(
PASSWORD_HASHERS=(
"django.contrib.auth.hashers.MD5PasswordHasher",
"django.contrib.auth.hashers.SHA1PasswordHasher",
),
PASSWORD_MIN_LENGTH=30,
), self.assertLogs("zulip.auth.email", level="INFO"), self.assertRaises(JsonableError) as m:
),
self.assertLogs("zulip.auth.email", level="INFO"),
self.assertRaises(JsonableError) as m,
):
EmailAuthBackend().authenticate(
request=mock.MagicMock(),
username=self.example_email("hamlet"),
@ -528,9 +533,10 @@ class RateLimitAuthenticationTests(ZulipTestCase):
request.session = mock.MagicMock()
return attempt_authentication_func(request, username, password)
with mock.patch.object(
RateLimitedAuthenticationByUsername, "key", new=_mock_key
), ratelimit_rule(10, 2, domain="authenticate_by_username"):
with (
mock.patch.object(RateLimitedAuthenticationByUsername, "key", new=_mock_key),
ratelimit_rule(10, 2, domain="authenticate_by_username"),
):
try:
start_time = time.time()
with mock.patch("time.time", return_value=start_time):
@ -944,9 +950,10 @@ class SocialAuthBase(DesktopFlowTestingLib, ZulipTestCase, ABC):
def test_social_auth_no_key(self) -> None:
account_data_dict = self.get_account_data_dict(email=self.email, name=self.name)
with self.settings(**{self.CLIENT_KEY_SETTING: None}), self.assertLogs(
"django.request", level="ERROR"
) as m:
with (
self.settings(**{self.CLIENT_KEY_SETTING: None}),
self.assertLogs("django.request", level="ERROR") as m,
):
result = self.social_auth_test(
account_data_dict, subdomain="zulip", next="/user_uploads/image"
)
@ -959,9 +966,10 @@ class SocialAuthBase(DesktopFlowTestingLib, ZulipTestCase, ABC):
def test_config_error_development(self) -> None:
if hasattr(self, "CLIENT_KEY_SETTING") and hasattr(self, "CLIENT_SECRET_SETTING"):
with self.settings(**{self.CLIENT_KEY_SETTING: None}), self.assertLogs(
"django.request", level="ERROR"
) as m:
with (
self.settings(**{self.CLIENT_KEY_SETTING: None}),
self.assertLogs("django.request", level="ERROR") as m,
):
result = self.client_get(self.LOGIN_URL)
self.assertEqual(result.status_code, 500)
self.assert_in_response("Configuration error", result)
@ -980,9 +988,10 @@ class SocialAuthBase(DesktopFlowTestingLib, ZulipTestCase, ABC):
@override_settings(DEVELOPMENT=False)
def test_config_error_production(self) -> None:
if hasattr(self, "CLIENT_KEY_SETTING") and hasattr(self, "CLIENT_SECRET_SETTING"):
with self.settings(**{self.CLIENT_KEY_SETTING: None}), self.assertLogs(
"django.request", level="ERROR"
) as m:
with (
self.settings(**{self.CLIENT_KEY_SETTING: None}),
self.assertLogs("django.request", level="ERROR") as m,
):
result = self.client_get(self.LOGIN_URL)
self.assertEqual(result.status_code, 500)
self.assert_in_response("Configuration error", result)
@ -1731,7 +1740,8 @@ class SocialAuthBase(DesktopFlowTestingLib, ZulipTestCase, ABC):
account_data_dict = self.get_account_data_dict(email=email, name=name)
backend_path = f"zproject.backends.{self.BACKEND_CLASS.__name__}"
with self.settings(
with (
self.settings(
POPULATE_PROFILE_VIA_LDAP=True,
LDAP_APPEND_DOMAIN="zulip.com",
AUTH_LDAP_USER_ATTR_MAP=ldap_user_attr_map,
@ -1740,7 +1750,9 @@ class SocialAuthBase(DesktopFlowTestingLib, ZulipTestCase, ABC):
"zproject.backends.ZulipLDAPUserPopulator",
"zproject.backends.ZulipDummyBackend",
),
), self.assertLogs(level="WARNING") as log_warn:
),
self.assertLogs(level="WARNING") as log_warn,
):
result = self.social_auth_test(
account_data_dict,
expect_choose_email_screen=True,
@ -1798,7 +1810,8 @@ class SocialAuthBase(DesktopFlowTestingLib, ZulipTestCase, ABC):
account_data_dict = self.get_account_data_dict(email=email, name=name)
backend_path = f"zproject.backends.{self.BACKEND_CLASS.__name__}"
with self.settings(
with (
self.settings(
POPULATE_PROFILE_VIA_LDAP=True,
LDAP_EMAIL_ATTR="mail",
AUTH_LDAP_USER_ATTR_MAP=ldap_user_attr_map,
@ -1807,9 +1820,10 @@ class SocialAuthBase(DesktopFlowTestingLib, ZulipTestCase, ABC):
"zproject.backends.ZulipLDAPAuthBackend",
"zproject.backends.ZulipDummyBackend",
),
), self.assertLogs("zulip.ldap", level="DEBUG") as log_debug, self.assertLogs(
level="WARNING"
) as log_warn:
),
self.assertLogs("zulip.ldap", level="DEBUG") as log_debug,
self.assertLogs(level="WARNING") as log_warn,
):
account_data_dict = self.get_account_data_dict(email=email, name=name)
result = self.social_auth_test(
account_data_dict,
@ -1839,10 +1853,13 @@ class SocialAuthBase(DesktopFlowTestingLib, ZulipTestCase, ABC):
)
def test_social_auth_complete(self) -> None:
with mock.patch(
with (
mock.patch(
"social_core.backends.oauth.BaseOAuth2.process_error",
side_effect=AuthFailed("Not found"),
), self.assertLogs(self.logger_string, level="INFO") as m:
),
self.assertLogs(self.logger_string, level="INFO") as m,
):
result = self.client_get(reverse("social:complete", args=[self.backend.name]))
self.assertEqual(result.status_code, 302)
self.assertIn("login", result["Location"])
@ -1853,10 +1870,13 @@ class SocialAuthBase(DesktopFlowTestingLib, ZulipTestCase, ABC):
],
)
with mock.patch(
with (
mock.patch(
"social_core.backends.oauth.BaseOAuth2.auth_complete",
side_effect=requests.exceptions.HTTPError,
), self.assertLogs(self.logger_string, level="INFO") as m:
),
self.assertLogs(self.logger_string, level="INFO") as m,
):
result = self.client_get(reverse("social:complete", args=[self.backend.name]))
self.assertEqual(result.status_code, 302)
self.assertIn("login", result["Location"])
@ -1868,10 +1888,13 @@ class SocialAuthBase(DesktopFlowTestingLib, ZulipTestCase, ABC):
)
def test_social_auth_complete_when_base_exc_is_raised(self) -> None:
with mock.patch(
with (
mock.patch(
"social_core.backends.oauth.BaseOAuth2.auth_complete",
side_effect=AuthStateForbidden("State forbidden"),
), self.assertLogs(self.logger_string, level="WARNING"):
),
self.assertLogs(self.logger_string, level="WARNING"),
):
result = self.client_get(reverse("social:complete", args=[self.backend.name]))
self.assertEqual(result.status_code, 302)
self.assertIn("login", result["Location"])
@ -2070,12 +2093,13 @@ class SAMLAuthBackendTest(SocialAuthBase):
if make_validity_checks_pass:
# It's hard to create fully-correct LogoutRequests with signatures in tests,
# so we rely on mocking the validating functions instead.
with mock.patch.object(
OneLogin_Saml2_Logout_Request, "is_valid", return_value=True
), mock.patch.object(
with (
mock.patch.object(OneLogin_Saml2_Logout_Request, "is_valid", return_value=True),
mock.patch.object(
OneLogin_Saml2_Auth,
"validate_request_signature",
return_value=True,
),
):
result = self.client_get("http://zulip.testserver/complete/saml/", parameters)
else:
@ -2130,12 +2154,13 @@ class SAMLAuthBackendTest(SocialAuthBase):
logout_response: str = base64.b64encode(unencoded_logout_response.encode()).decode()
# It's hard to create fully-correct LogoutResponse with signatures in tests,
# so we rely on mocking the validating functions instead.
with mock.patch.object(
OneLogin_Saml2_Logout_Response, "is_valid", return_value=True
), mock.patch.object(
with (
mock.patch.object(OneLogin_Saml2_Logout_Response, "is_valid", return_value=True),
mock.patch.object(
OneLogin_Saml2_Auth,
"validate_response_signature",
return_value=True,
),
):
result = self.client_get(
"/complete/saml/",
@ -2528,9 +2553,10 @@ class SAMLAuthBackendTest(SocialAuthBase):
@override
def test_social_auth_complete(self) -> None:
with mock.patch.object(OneLogin_Saml2_Response, "is_valid", return_value=True):
with mock.patch.object(
OneLogin_Saml2_Auth, "is_authenticated", return_value=False
), self.assertLogs(self.logger_string, level="INFO") as m:
with (
mock.patch.object(OneLogin_Saml2_Auth, "is_authenticated", return_value=False),
self.assertLogs(self.logger_string, level="INFO") as m,
):
# This mock causes AuthFailed to be raised.
saml_response = self.generate_saml_response(self.email, self.name)
relay_state = orjson.dumps(
@ -2554,10 +2580,13 @@ class SAMLAuthBackendTest(SocialAuthBase):
@override
def test_social_auth_complete_when_base_exc_is_raised(self) -> None:
with mock.patch.object(OneLogin_Saml2_Response, "is_valid", return_value=True):
with mock.patch(
with (
mock.patch(
"social_core.backends.saml.SAMLAuth.auth_complete",
side_effect=AuthStateForbidden("State forbidden"),
), self.assertLogs(self.logger_string, level="WARNING") as m:
),
self.assertLogs(self.logger_string, level="WARNING") as m,
):
saml_response = self.generate_saml_response(self.email, self.name)
relay_state = orjson.dumps(
dict(
@ -2776,8 +2805,9 @@ class SAMLAuthBackendTest(SocialAuthBase):
validation in the underlying libraries.
"""
with self.assertLogs(self.logger_string, level="INFO") as m, mock.patch.object(
SAMLDocument, "get_issuing_idp", return_value="test_idp"
with (
self.assertLogs(self.logger_string, level="INFO") as m,
mock.patch.object(SAMLDocument, "get_issuing_idp", return_value="test_idp"),
):
relay_state = orjson.dumps(
dict(
@ -3820,9 +3850,10 @@ class GenericOpenIdConnectTest(SocialAuthBase):
mock_oidc_setting_dict = copy.deepcopy(settings.SOCIAL_AUTH_OIDC_ENABLED_IDPS)
[idp_config_dict] = mock_oidc_setting_dict.values()
del idp_config_dict["client_id"]
with self.settings(SOCIAL_AUTH_OIDC_ENABLED_IDPS=mock_oidc_setting_dict), self.assertLogs(
"django.request", level="ERROR"
) as m:
with (
self.settings(SOCIAL_AUTH_OIDC_ENABLED_IDPS=mock_oidc_setting_dict),
self.assertLogs("django.request", level="ERROR") as m,
):
result = self.social_auth_test(
account_data_dict, subdomain="zulip", next="/user_uploads/image"
)
@ -3843,9 +3874,10 @@ class GenericOpenIdConnectTest(SocialAuthBase):
mock_oidc_setting_dict = copy.deepcopy(settings.SOCIAL_AUTH_OIDC_ENABLED_IDPS)
[idp_config_dict] = mock_oidc_setting_dict.values()
mock_oidc_setting_dict["secondprovider"] = idp_config_dict
with self.settings(SOCIAL_AUTH_OIDC_ENABLED_IDPS=mock_oidc_setting_dict), self.assertLogs(
"django.request", level="ERROR"
) as m:
with (
self.settings(SOCIAL_AUTH_OIDC_ENABLED_IDPS=mock_oidc_setting_dict),
self.assertLogs("django.request", level="ERROR") as m,
):
result = self.social_auth_test(
account_data_dict, subdomain="zulip", next="/user_uploads/image"
)
@ -4000,10 +4032,13 @@ class GitHubAuthBackendTest(SocialAuthBase):
account_data_dict = self.get_account_data_dict(email=self.email, name=self.name)
subdomain = "zulip"
realm = get_realm(subdomain)
with mock.patch(
with (
mock.patch(
"social_core.backends.github.GithubTeamOAuth2.user_data",
side_effect=AuthFailed("Not found"),
), self.assertLogs(self.logger_string, level="INFO") as mock_info:
),
self.assertLogs(self.logger_string, level="INFO") as mock_info,
):
result = self.social_auth_test(account_data_dict, subdomain=subdomain)
self.assertEqual(result.status_code, 302)
self.assertEqual(result["Location"], realm.url + "/login/")
@ -4036,10 +4071,13 @@ class GitHubAuthBackendTest(SocialAuthBase):
account_data_dict = self.get_account_data_dict(email=self.email, name=self.name)
subdomain = "zulip"
realm = get_realm(subdomain)
with mock.patch(
with (
mock.patch(
"social_core.backends.github.GithubOrganizationOAuth2.user_data",
side_effect=AuthFailed("Not found"),
), self.assertLogs(self.logger_string, level="INFO") as mock_info:
),
self.assertLogs(self.logger_string, level="INFO") as mock_info,
):
result = self.social_auth_test(account_data_dict, subdomain=subdomain)
self.assertEqual(result.status_code, 302)
self.assertEqual(result["Location"], realm.url + "/login/")
@ -5002,13 +5040,16 @@ class FetchAPIKeyTest(ZulipTestCase):
user_profile.set_password(password)
user_profile.save()
with self.settings(
with (
self.settings(
PASSWORD_HASHERS=(
"django.contrib.auth.hashers.MD5PasswordHasher",
"django.contrib.auth.hashers.SHA1PasswordHasher",
),
PASSWORD_MIN_LENGTH=30,
), self.assertLogs("zulip.auth.email", level="INFO"):
),
self.assertLogs("zulip.auth.email", level="INFO"),
):
result = self.client_post(
"/api/v1/fetch_api_key",
dict(username=self.email, password=password),
@ -5475,9 +5516,10 @@ class TestDevAuthBackend(ZulipTestCase):
def test_login_failure(self) -> None:
email = self.example_email("hamlet")
data = {"direct_email": email}
with self.settings(
AUTHENTICATION_BACKENDS=("zproject.backends.EmailAuthBackend",)
), self.assertLogs("django.request", level="ERROR") as m:
with (
self.settings(AUTHENTICATION_BACKENDS=("zproject.backends.EmailAuthBackend",)),
self.assertLogs("django.request", level="ERROR") as m,
):
response = self.client_post("/accounts/login/local/", data)
self.assertEqual(response.status_code, 500)
self.assert_in_response("Configuration error", response)
@ -5605,12 +5647,15 @@ class TestZulipRemoteUserBackend(DesktopFlowTestingLib, ZulipTestCase):
self.assert_json_error_contains(result, "Enter a valid email address.", 400)
def test_login_failure_due_to_missing_field(self) -> None:
with self.settings(
with (
self.settings(
AUTHENTICATION_BACKENDS=(
"zproject.backends.ZulipRemoteUserBackend",
"zproject.backends.ZulipDummyBackend",
)
), self.assertLogs("django.request", level="ERROR") as m:
),
self.assertLogs("django.request", level="ERROR") as m,
):
result = self.client_get("/accounts/login/sso/")
self.assertEqual(result.status_code, 500)
self.assert_in_response("Configuration error", result)
@ -6263,9 +6308,10 @@ class TestLDAP(ZulipLDAPTestCase):
@override_settings(AUTHENTICATION_BACKENDS=("zproject.backends.ZulipLDAPAuthBackend",))
def test_login_failure_due_to_nonexistent_user(self) -> None:
with self.settings(LDAP_APPEND_DOMAIN="zulip.com"), self.assertLogs(
"zulip.ldap", level="DEBUG"
) as log_debug:
with (
self.settings(LDAP_APPEND_DOMAIN="zulip.com"),
self.assertLogs("zulip.ldap", level="DEBUG") as log_debug,
):
user = self.backend.authenticate(
request=mock.MagicMock(),
username="nonexistent@zulip.com",
@ -6412,9 +6458,10 @@ class TestLDAP(ZulipLDAPTestCase):
@override_settings(AUTHENTICATION_BACKENDS=("zproject.backends.ZulipLDAPAuthBackend",))
def test_login_failure_when_domain_does_not_match(self) -> None:
with self.settings(LDAP_APPEND_DOMAIN="acme.com"), self.assertLogs(
"zulip.ldap", "DEBUG"
) as debug_log:
with (
self.settings(LDAP_APPEND_DOMAIN="acme.com"),
self.assertLogs("zulip.ldap", "DEBUG") as debug_log,
):
user_profile = self.backend.authenticate(
request=mock.MagicMock(),
username=self.example_email("hamlet"),
@ -6474,10 +6521,13 @@ class TestLDAP(ZulipLDAPTestCase):
def test_login_failure_user_account_control(self) -> None:
self.change_ldap_user_attr("hamlet", "userAccountControl", "2")
with self.settings(
with (
self.settings(
LDAP_APPEND_DOMAIN="zulip.com",
AUTH_LDAP_USER_ATTR_MAP={"userAccountControl": "userAccountControl"},
), self.assertLogs("django_auth_ldap", "DEBUG") as debug_log:
),
self.assertLogs("django_auth_ldap", "DEBUG") as debug_log,
):
user_profile = self.backend.authenticate(
request=mock.MagicMock(),
username=self.example_email("hamlet"),
@ -6635,9 +6685,10 @@ class TestZulipLDAPUserPopulator(ZulipLDAPTestCase):
def test_too_short_name(self) -> None:
self.change_ldap_user_attr("hamlet", "cn", "a")
with self.assertRaises(ZulipLDAPError), self.assertLogs(
"django_auth_ldap", "WARNING"
) as warn_log:
with (
self.assertRaises(ZulipLDAPError),
self.assertLogs("django_auth_ldap", "WARNING") as warn_log,
):
self.perform_ldap_sync(self.example_user("hamlet"))
self.assertEqual(
warn_log.output,
@ -6647,9 +6698,15 @@ class TestZulipLDAPUserPopulator(ZulipLDAPTestCase):
def test_deactivate_user_with_useraccountcontrol_attr(self) -> None:
self.change_ldap_user_attr("hamlet", "userAccountControl", "2")
with self.settings(
AUTH_LDAP_USER_ATTR_MAP={"full_name": "cn", "userAccountControl": "userAccountControl"}
), self.assertLogs("zulip.ldap") as info_logs:
with (
self.settings(
AUTH_LDAP_USER_ATTR_MAP={
"full_name": "cn",
"userAccountControl": "userAccountControl",
}
),
self.assertLogs("zulip.ldap") as info_logs,
):
self.perform_ldap_sync(self.example_user("hamlet"))
hamlet = self.example_user("hamlet")
self.assertFalse(hamlet.is_active)
@ -6663,9 +6720,12 @@ class TestZulipLDAPUserPopulator(ZulipLDAPTestCase):
def test_deactivate_reactivate_user_with_deactivated_attr(self) -> None:
self.change_ldap_user_attr("hamlet", "someCustomAttr", "TRUE")
with self.settings(
with (
self.settings(
AUTH_LDAP_USER_ATTR_MAP={"full_name": "cn", "deactivated": "someCustomAttr"}
), self.assertLogs("zulip.ldap") as info_logs:
),
self.assertLogs("zulip.ldap") as info_logs,
):
self.perform_ldap_sync(self.example_user("hamlet"))
hamlet = self.example_user("hamlet")
self.assertFalse(hamlet.is_active)
@ -6677,9 +6737,12 @@ class TestZulipLDAPUserPopulator(ZulipLDAPTestCase):
)
self.change_ldap_user_attr("hamlet", "someCustomAttr", "FALSE")
with self.settings(
with (
self.settings(
AUTH_LDAP_USER_ATTR_MAP={"full_name": "cn", "deactivated": "someCustomAttr"}
), self.assertLogs("zulip.ldap") as info_logs:
),
self.assertLogs("zulip.ldap") as info_logs,
):
self.perform_ldap_sync(self.example_user("hamlet"))
hamlet.refresh_from_db()
self.assertTrue(hamlet.is_active)
@ -6691,9 +6754,13 @@ class TestZulipLDAPUserPopulator(ZulipLDAPTestCase):
)
self.change_ldap_user_attr("hamlet", "someCustomAttr", "YESSS")
with self.settings(
with (
self.settings(
AUTH_LDAP_USER_ATTR_MAP={"full_name": "cn", "deactivated": "someCustomAttr"}
), self.assertLogs("django_auth_ldap") as ldap_logs, self.assertRaises(AssertionError):
),
self.assertLogs("django_auth_ldap") as ldap_logs,
self.assertRaises(AssertionError),
):
self.perform_ldap_sync(self.example_user("hamlet"))
hamlet.refresh_from_db()
self.assertTrue(hamlet.is_active)
@ -6708,9 +6775,15 @@ class TestZulipLDAPUserPopulator(ZulipLDAPTestCase):
def test_dont_sync_disabled_ldap_user(self, fake_sync: mock.MagicMock) -> None:
self.change_ldap_user_attr("hamlet", "userAccountControl", "2")
with self.settings(
AUTH_LDAP_USER_ATTR_MAP={"full_name": "cn", "userAccountControl": "userAccountControl"}
), self.assertLogs("zulip.ldap") as info_logs:
with (
self.settings(
AUTH_LDAP_USER_ATTR_MAP={
"full_name": "cn",
"userAccountControl": "userAccountControl",
}
),
self.assertLogs("zulip.ldap") as info_logs,
):
self.perform_ldap_sync(self.example_user("hamlet"))
fake_sync.assert_not_called()
self.assertEqual(
@ -6723,9 +6796,15 @@ class TestZulipLDAPUserPopulator(ZulipLDAPTestCase):
def test_reactivate_user(self) -> None:
do_deactivate_user(self.example_user("hamlet"), acting_user=None)
with self.settings(
AUTH_LDAP_USER_ATTR_MAP={"full_name": "cn", "userAccountControl": "userAccountControl"}
), self.assertLogs("zulip.ldap") as info_logs:
with (
self.settings(
AUTH_LDAP_USER_ATTR_MAP={
"full_name": "cn",
"userAccountControl": "userAccountControl",
}
),
self.assertLogs("zulip.ldap") as info_logs,
):
self.perform_ldap_sync(self.example_user("hamlet"))
hamlet = self.example_user("hamlet")
self.assertTrue(hamlet.is_active)
@ -6777,8 +6856,9 @@ class TestZulipLDAPUserPopulator(ZulipLDAPTestCase):
def test_update_user_avatar(self) -> None:
# Hamlet has jpegPhoto set in our test directory by default.
with mock.patch("zerver.lib.upload.upload_avatar_image") as fn, self.settings(
AUTH_LDAP_USER_ATTR_MAP={"full_name": "cn", "avatar": "jpegPhoto"}
with (
mock.patch("zerver.lib.upload.upload_avatar_image") as fn,
self.settings(AUTH_LDAP_USER_ATTR_MAP={"full_name": "cn", "avatar": "jpegPhoto"}),
):
self.perform_ldap_sync(self.example_user("hamlet"))
fn.assert_called_once()
@ -6796,8 +6876,9 @@ class TestZulipLDAPUserPopulator(ZulipLDAPTestCase):
self.change_ldap_user_attr(
"hamlet", "jpegPhoto", static_path("images/logo/zulip-icon-512x512.png"), binary=True
)
with mock.patch("zerver.lib.upload.upload_avatar_image") as fn, self.settings(
AUTH_LDAP_USER_ATTR_MAP={"full_name": "cn", "avatar": "jpegPhoto"}
with (
mock.patch("zerver.lib.upload.upload_avatar_image") as fn,
self.settings(AUTH_LDAP_USER_ATTR_MAP={"full_name": "cn", "avatar": "jpegPhoto"}),
):
self.perform_ldap_sync(self.example_user("hamlet"))
fn.assert_called_once()
@ -6884,9 +6965,12 @@ class TestZulipLDAPUserPopulator(ZulipLDAPTestCase):
"custom_profile_field__non_existent": "homePhone",
}
):
with self.assertRaisesRegex(
with (
self.assertRaisesRegex(
ZulipLDAPError, "Custom profile field with name non_existent not found"
), self.assertLogs("django_auth_ldap", "WARNING") as warn_log:
),
self.assertLogs("django_auth_ldap", "WARNING") as warn_log,
):
self.perform_ldap_sync(self.example_user("hamlet"))
self.assertEqual(
warn_log.output,
@ -6904,9 +6988,10 @@ class TestZulipLDAPUserPopulator(ZulipLDAPTestCase):
"custom_profile_field__birthday": "birthDate",
}
):
with self.assertRaisesRegex(
ZulipLDAPError, "Invalid data for birthday field"
), self.assertLogs("django_auth_ldap", "WARNING") as warn_log:
with (
self.assertRaisesRegex(ZulipLDAPError, "Invalid data for birthday field"),
self.assertLogs("django_auth_ldap", "WARNING") as warn_log,
):
self.perform_ldap_sync(self.example_user("hamlet"))
self.assertEqual(
warn_log.output,
@ -6971,12 +7056,15 @@ class TestZulipLDAPUserPopulator(ZulipLDAPTestCase):
user_profile=hamlet, field=no_op_field
).value
with self.settings(
with (
self.settings(
AUTH_LDAP_USER_ATTR_MAP={
"full_name": "cn",
"custom_profile_field__birthday": "nonExistentAttr",
}
), self.assertLogs("django_auth_ldap", "WARNING") as warn_log:
),
self.assertLogs("django_auth_ldap", "WARNING") as warn_log,
):
self.perform_ldap_sync(self.example_user("hamlet"))
actual_value = CustomProfileFieldValue.objects.get(
@ -7315,11 +7403,14 @@ class LDAPBackendTest(ZulipTestCase):
)
error_type = ZulipLDAPAuthBackend.REALM_IS_NONE_ERROR
error = ZulipLDAPConfigurationError("Realm is None", error_type)
with mock.patch(
with (
mock.patch(
"zproject.backends.ZulipLDAPAuthBackend.get_or_build_user", side_effect=error
), mock.patch("django_auth_ldap.backend._LDAPUser._authenticate_user_dn"), self.assertLogs(
"django_auth_ldap", "WARNING"
) as warn_log, self.assertLogs("django.request", level="ERROR"):
),
mock.patch("django_auth_ldap.backend._LDAPUser._authenticate_user_dn"),
self.assertLogs("django_auth_ldap", "WARNING") as warn_log,
self.assertLogs("django.request", level="ERROR"),
):
response = self.client_post("/login/", data)
self.assertEqual(response.status_code, 500)
self.assert_in_response("Configuration error", response)
@ -7510,7 +7601,8 @@ class LDAPGroupSyncTest(ZulipTestCase):
realm = get_realm("zulip")
with self.settings(
with (
self.settings(
AUTH_LDAP_GROUP_SEARCH=LDAPSearch(
"ou=groups,dc=zulip,dc=com",
ldap.SCOPE_ONELEVEL,
@ -7522,7 +7614,9 @@ class LDAPGroupSyncTest(ZulipTestCase):
]
},
LDAP_APPEND_DOMAIN="zulip.com",
), self.assertLogs("zulip.ldap", "DEBUG") as zulip_ldap_log:
),
self.assertLogs("zulip.ldap", "DEBUG") as zulip_ldap_log,
):
self.assertFalse(
NamedUserGroup.objects.filter(realm=realm, name="cool_test_group").exists()
)
@ -7599,7 +7693,8 @@ class LDAPGroupSyncTest(ZulipTestCase):
)
# Test an exception using a malformed ldap group search setting.
with self.settings(
with (
self.settings(
AUTH_LDAP_GROUP_SEARCH=LDAPSearch(
"ou=groups,dc=zulip,dc=com",
ldap.SCOPE_ONELEVEL,
@ -7611,9 +7706,10 @@ class LDAPGroupSyncTest(ZulipTestCase):
]
},
LDAP_APPEND_DOMAIN="zulip.com",
), self.assertLogs("django_auth_ldap", "WARN") as django_ldap_log, self.assertLogs(
"zulip.ldap", "DEBUG"
) as zulip_ldap_log:
),
self.assertLogs("django_auth_ldap", "WARN") as django_ldap_log,
self.assertLogs("zulip.ldap", "DEBUG") as zulip_ldap_log,
):
with self.assertRaisesRegex(
ZulipLDAPError,
"search_s.*",

View File

@ -230,8 +230,9 @@ class TestVideoCall(ZulipTestCase):
self.assert_json_success(response)
def test_create_bigbluebutton_link(self) -> None:
with mock.patch("zerver.views.video_calls.random.randint", return_value="1"), mock.patch(
"secrets.token_bytes", return_value=b"\x00" * 20
with (
mock.patch("zerver.views.video_calls.random.randint", return_value="1"),
mock.patch("secrets.token_bytes", return_value=b"\x00" * 20),
):
response = self.client_get(
"/json/calls/bigbluebutton/create?meeting_name=general > meeting"

View File

@ -122,9 +122,10 @@ class DecoratorTestCase(ZulipTestCase):
request = HttpRequest()
request.POST["param"] = "test"
request.META["HTTP_USER_AGENT"] = "mocked should fail"
with mock.patch(
"zerver.middleware.parse_client", side_effect=JsonableError("message")
) as m, self.assertLogs(level="ERROR"):
with (
mock.patch("zerver.middleware.parse_client", side_effect=JsonableError("message")) as m,
self.assertLogs(level="ERROR"),
):
LogRequests(lambda request: HttpResponse()).process_request(request)
request_notes = RequestNotes.get_notes(request)
self.assertEqual(request_notes.client_name, "Unparsable")
@ -490,11 +491,10 @@ class RateLimitTestCase(ZulipTestCase):
request = HostRequestMock(host="zulip.testserver", client_name=client_name, meta_data=META)
view_func = self.ratelimited_web_view if check_web_view else self.ratelimited_json_view
with mock.patch(
"zerver.lib.rate_limiter.RateLimitedUser"
) as rate_limit_user_mock, mock.patch(
"zerver.lib.rate_limiter.RateLimitedIPAddr"
) as rate_limit_ip_mock:
with (
mock.patch("zerver.lib.rate_limiter.RateLimitedUser") as rate_limit_user_mock,
mock.patch("zerver.lib.rate_limiter.RateLimitedIPAddr") as rate_limit_ip_mock,
):
self.assert_in_success_response(["some value"], view_func(request))
self.assertEqual(rate_limit_ip_mock.called, expect_rate_limit)
self.assertFalse(rate_limit_user_mock.called)
@ -506,11 +506,10 @@ class RateLimitTestCase(ZulipTestCase):
request = HostRequestMock(
user_profile=user, host="zulip.testserver", client_name=client_name, meta_data=META
)
with mock.patch(
"zerver.lib.rate_limiter.RateLimitedUser"
) as rate_limit_user_mock, mock.patch(
"zerver.lib.rate_limiter.RateLimitedIPAddr"
) as rate_limit_ip_mock:
with (
mock.patch("zerver.lib.rate_limiter.RateLimitedUser") as rate_limit_user_mock,
mock.patch("zerver.lib.rate_limiter.RateLimitedIPAddr") as rate_limit_ip_mock,
):
self.assert_in_success_response(["some value"], view_func(request))
self.assertEqual(rate_limit_user_mock.called, expect_rate_limit)
self.assertFalse(rate_limit_ip_mock.called)
@ -562,9 +561,10 @@ class RateLimitTestCase(ZulipTestCase):
)
server.save()
with self.settings(RATE_LIMITING=True), mock.patch(
"zilencer.auth.rate_limit_remote_server"
) as rate_limit_mock:
with (
self.settings(RATE_LIMITING=True),
mock.patch("zilencer.auth.rate_limit_remote_server") as rate_limit_mock,
):
result = self.uuid_post(
server_uuid,
"/api/v1/remotes/push/unregister/all",
@ -1165,8 +1165,9 @@ class TestAuthenticatedJsonViewDecorator(ZulipTestCase):
email = user.delivery_email
self.login_user(user)
with self.assertLogs(level="WARNING") as m, mock.patch(
"zerver.decorator.get_subdomain", return_value=""
with (
self.assertLogs(level="WARNING") as m,
mock.patch("zerver.decorator.get_subdomain", return_value=""),
):
self.assert_json_error_contains(
self._do_test(email), "Account is not associated with this subdomain"
@ -1180,8 +1181,9 @@ class TestAuthenticatedJsonViewDecorator(ZulipTestCase):
],
)
with self.assertLogs(level="WARNING") as m, mock.patch(
"zerver.decorator.get_subdomain", return_value="acme"
with (
self.assertLogs(level="WARNING") as m,
mock.patch("zerver.decorator.get_subdomain", return_value="acme"),
):
self.assert_json_error_contains(
self._do_test(email), "Account is not associated with this subdomain"

View File

@ -114,9 +114,10 @@ class TestDigestEmailMessages(ZulipTestCase):
do_deactivate_user(hamlet, acting_user=None)
with mock.patch("zerver.lib.digest.enough_traffic", return_value=True), mock.patch(
"zerver.lib.digest.send_future_email"
) as mock_send_email:
with (
mock.patch("zerver.lib.digest.enough_traffic", return_value=True),
mock.patch("zerver.lib.digest.send_future_email") as mock_send_email,
):
bulk_handle_digest_email(user_ids, 1)
emailed_user_ids = [

View File

@ -9,9 +9,14 @@ from zproject.email_backends import get_forward_address
class EmailLogTest(ZulipTestCase):
def test_generate_and_clear_email_log(self) -> None:
with self.settings(EMAIL_BACKEND="zproject.email_backends.EmailLogBackEnd"), mock.patch(
with (
self.settings(EMAIL_BACKEND="zproject.email_backends.EmailLogBackEnd"),
mock.patch(
"zproject.email_backends.EmailLogBackEnd._do_send_messages", lambda *args: 1
), self.assertLogs(level="INFO") as m, self.settings(DEVELOPMENT_LOG_EMAILS=True):
),
self.assertLogs(level="INFO") as m,
self.settings(DEVELOPMENT_LOG_EMAILS=True),
):
with self.captureOnCommitCallbacks(execute=True):
result = self.client_get("/emails/generate/")
self.assertEqual(result.status_code, 302)
@ -37,8 +42,11 @@ class EmailLogTest(ZulipTestCase):
self.assertEqual(get_forward_address(), forward_address)
with self.settings(EMAIL_BACKEND="zproject.email_backends.EmailLogBackEnd"), mock.patch(
with (
self.settings(EMAIL_BACKEND="zproject.email_backends.EmailLogBackEnd"),
mock.patch(
"zproject.email_backends.EmailLogBackEnd._do_send_messages", lambda *args: 1
),
):
result = self.client_get("/emails/generate/")
self.assertEqual(result.status_code, 302)

View File

@ -3324,9 +3324,10 @@ class NormalActionsTest(BaseAction):
)
self.login_user(self.user_profile)
with mock.patch(
"zerver.lib.export.do_export_realm", side_effect=Exception("Some failure")
), self.assertLogs(level="ERROR") as error_log:
with (
mock.patch("zerver.lib.export.do_export_realm", side_effect=Exception("Some failure")),
self.assertLogs(level="ERROR") as error_log,
):
with stdout_suppressed():
with self.verify_action(state_change_expected=False, num_events=2) as events:
self.client_post("/json/export/realm")

View File

@ -390,10 +390,13 @@ class TestDevelopmentEmailsLog(ZulipTestCase):
# and verify the log messages. That can be achieved with assertLogs()
# as you'll see below. Read more about assertLogs() at:
# https://docs.python.org/3/library/unittest.html#unittest.TestCase.assertLogs
with self.settings(EMAIL_BACKEND="zproject.email_backends.EmailLogBackEnd"), self.settings(
DEVELOPMENT_LOG_EMAILS=True
), self.assertLogs(level="INFO") as logger, mock.patch(
with (
self.settings(EMAIL_BACKEND="zproject.email_backends.EmailLogBackEnd"),
self.settings(DEVELOPMENT_LOG_EMAILS=True),
self.assertLogs(level="INFO") as logger,
mock.patch(
"zproject.email_backends.EmailLogBackEnd._do_send_messages", lambda *args: 1
),
):
# Parts of this endpoint use transactions, and use
# transaction.on_commit to run code when the transaction

View File

@ -13,11 +13,13 @@ class HealthTest(ZulipTestCase):
self.assert_json_success(result)
def test_database_failure(self) -> None:
with mock.patch(
with (
mock.patch(
"zerver.views.health.check_database",
side_effect=ServerNotReadyError("Cannot query postgresql"),
), self.assertLogs(level="ERROR") as logs, self.assertRaisesRegex(
ServerNotReadyError, r"^Cannot query postgresql$"
),
self.assertLogs(level="ERROR") as logs,
self.assertRaisesRegex(ServerNotReadyError, r"^Cannot query postgresql$"),
):
self.client_get("/health")
self.assertIn(

View File

@ -604,8 +604,9 @@ class HomeTest(ZulipTestCase):
self.assertIn("test_stream_7", html)
def _get_home_page(self, **kwargs: Any) -> "TestHttpResponse":
with patch("zerver.lib.events.request_event_queue", return_value=42), patch(
"zerver.lib.events.get_user_events", return_value=[]
with (
patch("zerver.lib.events.request_event_queue", return_value=42),
patch("zerver.lib.events.get_user_events", return_value=[]),
):
result = self.client_get("/", dict(**kwargs))
return result
@ -663,9 +664,10 @@ class HomeTest(ZulipTestCase):
user.tos_version = UserProfile.TOS_VERSION_BEFORE_FIRST_LOGIN
user.save()
with self.settings(
FIRST_TIME_TERMS_OF_SERVICE_TEMPLATE="corporate/hello.html"
), self.settings(TERMS_OF_SERVICE_VERSION="99.99"):
with (
self.settings(FIRST_TIME_TERMS_OF_SERVICE_TEMPLATE="corporate/hello.html"),
self.settings(TERMS_OF_SERVICE_VERSION="99.99"),
):
result = self.client_post("/accounts/accept_terms/")
self.assertEqual(result.status_code, 200)
self.assert_in_response("I agree to the", result)
@ -1378,8 +1380,9 @@ class HomeTest(ZulipTestCase):
self.login_user(user)
result = self._get_home_page()
self.check_rendered_logged_in_app(result)
with patch("zerver.lib.events.request_event_queue", return_value=42), patch(
"zerver.lib.events.get_user_events", return_value=[]
with (
patch("zerver.lib.events.request_event_queue", return_value=42),
patch("zerver.lib.events.get_user_events", return_value=[]),
):
result = self.client_get("/de/")
page_params = self._get_page_params(result)

View File

@ -1550,9 +1550,11 @@ class RealmImportExportTest(ExportFile):
self.export_realm_and_create_auditlog(original_realm)
with self.settings(BILLING_ENABLED=False), self.assertLogs(level="INFO"), patch(
"zerver.lib.remote_server.send_to_push_bouncer"
) as m:
with (
self.settings(BILLING_ENABLED=False),
self.assertLogs(level="INFO"),
patch("zerver.lib.remote_server.send_to_push_bouncer") as m,
):
get_response = {
"last_realm_count_id": 0,
"last_installation_count_id": 0,

View File

@ -24,9 +24,11 @@ class TestIntegrationsDevPanel(ZulipTestCase):
"custom_headers": "{}",
"is_json": "true",
}
with self.assertLogs(level="ERROR") as logs, self.settings(
TEST_SUITE=False
), self.assertRaises(ValidationError):
with (
self.assertLogs(level="ERROR") as logs,
self.settings(TEST_SUITE=False),
self.assertRaises(ValidationError),
):
self.client_post(target_url, data)
# Intention of this test looks like to trigger ValidationError

View File

@ -1931,9 +1931,10 @@ class InvitationsTestCase(InviteUserBase):
invite_expires_in_minutes=invite_expires_in_minutes,
)
with time_machine.travel(
(timezone_now() - timedelta(days=3)), tick=False
), self.captureOnCommitCallbacks(execute=True):
with (
time_machine.travel((timezone_now() - timedelta(days=3)), tick=False),
self.captureOnCommitCallbacks(execute=True),
):
do_invite_users(
user_profile,
["TestTwo@zulip.com"],
@ -1983,9 +1984,10 @@ class InvitationsTestCase(InviteUserBase):
get_stream(stream_name, user_profile.realm) for stream_name in ["Denmark", "Scotland"]
]
with time_machine.travel(
(timezone_now() - timedelta(days=1000)), tick=False
), self.captureOnCommitCallbacks(execute=True):
with (
time_machine.travel((timezone_now() - timedelta(days=1000)), tick=False),
self.captureOnCommitCallbacks(execute=True),
):
# Testing the invitation with expiry date set to "None" exists
# after a large amount of days.
do_invite_users(
@ -2624,9 +2626,10 @@ class MultiuseInviteTest(ZulipTestCase):
request = HttpRequest()
confirmation = Confirmation.objects.get(confirmation_key=key)
multiuse_object = confirmation.content_object
with patch(
"zerver.views.registration.get_subdomain", return_value="zulip"
), self.assertRaises(AssertionError):
with (
patch("zerver.views.registration.get_subdomain", return_value="zulip"),
self.assertRaises(AssertionError),
):
accounts_home(request, multiuse_object=multiuse_object)
def test_create_multiuse_link_api_call(self) -> None:

View File

@ -473,9 +473,10 @@ class TestConvertMattermostData(ZulipTestCase):
COMMAND_NAME = "convert_mattermost_data"
def test_if_command_calls_do_convert_data(self) -> None:
with patch(
"zerver.management.commands.convert_mattermost_data.do_convert_data"
) as m, patch("builtins.print") as mock_print:
with (
patch("zerver.management.commands.convert_mattermost_data.do_convert_data") as m,
patch("builtins.print") as mock_print,
):
mm_fixtures = self.fixture_file_name("", "mattermost_fixtures")
output_dir = self.make_import_output_dir("mattermost")
call_command(self.COMMAND_NAME, mm_fixtures, f"--output={output_dir}")
@ -532,9 +533,11 @@ class TestExport(ZulipTestCase):
self.example_user("hamlet"), message, "outbox", "1f4e4", Reaction.UNICODE_EMOJI
)
with patch("zerver.management.commands.export.export_realm_wrapper") as m, patch(
"builtins.print"
) as mock_print, patch("builtins.input", return_value="y") as mock_input:
with (
patch("zerver.management.commands.export.export_realm_wrapper") as m,
patch("builtins.print") as mock_print,
patch("builtins.input", return_value="y") as mock_input,
):
call_command(self.COMMAND_NAME, "-r=zulip", f"--consent-message-id={message.id}")
m.assert_called_once_with(
realm=realm,
@ -559,9 +562,10 @@ class TestExport(ZulipTestCase):
],
)
with self.assertRaisesRegex(CommandError, "Message with given ID does not"), patch(
"builtins.print"
) as mock_print:
with (
self.assertRaisesRegex(CommandError, "Message with given ID does not"),
patch("builtins.print") as mock_print,
):
call_command(self.COMMAND_NAME, "-r=zulip", "--consent-message-id=123456")
self.assertEqual(
mock_print.mock_calls,
@ -572,9 +576,10 @@ class TestExport(ZulipTestCase):
message.last_edit_time = timezone_now()
message.save()
with self.assertRaisesRegex(CommandError, "Message was edited. Aborting..."), patch(
"builtins.print"
) as mock_print:
with (
self.assertRaisesRegex(CommandError, "Message was edited. Aborting..."),
patch("builtins.print") as mock_print,
):
call_command(self.COMMAND_NAME, "-r=zulip", f"--consent-message-id={message.id}")
self.assertEqual(
mock_print.mock_calls,
@ -588,9 +593,12 @@ class TestExport(ZulipTestCase):
do_add_reaction(
self.mit_user("sipbtest"), message, "outbox", "1f4e4", Reaction.UNICODE_EMOJI
)
with self.assertRaisesRegex(
with (
self.assertRaisesRegex(
CommandError, "Users from a different realm reacted to message. Aborting..."
), patch("builtins.print") as mock_print:
),
patch("builtins.print") as mock_print,
):
call_command(self.COMMAND_NAME, "-r=zulip", f"--consent-message-id={message.id}")
self.assertEqual(

View File

@ -3307,8 +3307,9 @@ class MarkdownErrorTests(ZulipTestCase):
throws an exception"""
msg = "mock rendered message\n" * 10 * settings.MAX_MESSAGE_LENGTH
with mock.patch("zerver.lib.markdown.unsafe_timeout", return_value=msg), mock.patch(
"zerver.lib.markdown.markdown_logger"
with (
mock.patch("zerver.lib.markdown.unsafe_timeout", return_value=msg),
mock.patch("zerver.lib.markdown.markdown_logger"),
):
with self.assertRaises(MarkdownRenderingError):
markdown_convert_wrapper(msg)

View File

@ -120,9 +120,11 @@ class DeleteMessageTest(ZulipTestCase):
# Test handling of 500 error caused by multiple delete requests due to latency.
# see issue #11219.
with mock.patch("zerver.views.message_edit.do_delete_messages") as m, mock.patch(
"zerver.views.message_edit.validate_can_delete_message", return_value=None
), mock.patch("zerver.views.message_edit.access_message", return_value=(None, None)):
with (
mock.patch("zerver.views.message_edit.do_delete_messages") as m,
mock.patch("zerver.views.message_edit.validate_can_delete_message", return_value=None),
mock.patch("zerver.views.message_edit.access_message", return_value=(None, None)),
):
m.side_effect = IntegrityError()
result = test_delete_message_by_owner(msg_id=msg_id)
self.assert_json_error(result, "Message already deleted")

View File

@ -732,9 +732,10 @@ class FixUnreadTests(ZulipTestCase):
assert_unread(um_unsubscribed_id)
# fix unsubscribed
with connection.cursor() as cursor, self.assertLogs(
"zulip.fix_unreads", "INFO"
) as info_logs:
with (
connection.cursor() as cursor,
self.assertLogs("zulip.fix_unreads", "INFO") as info_logs,
):
fix_unsubscribed(cursor, user)
self.assertEqual(info_logs.output[0], "INFO:zulip.fix_unreads:get recipients")

View File

@ -37,11 +37,10 @@ class SlowQueryTest(ZulipTestCase):
def test_slow_query_log(self) -> None:
self.log_data["time_started"] = time.time() - self.SLOW_QUERY_TIME
with self.assertLogs(
"zulip.slow_queries", level="INFO"
) as slow_query_logger, self.assertLogs(
"zulip.requests", level="INFO"
) as middleware_normal_logger:
with (
self.assertLogs("zulip.slow_queries", level="INFO") as slow_query_logger,
self.assertLogs("zulip.requests", level="INFO") as middleware_normal_logger,
):
write_log_line(
self.log_data,
path="/some/endpoint/",

View File

@ -1,8 +1,8 @@
import zoneinfo
from collections.abc import Sequence
from datetime import datetime, timedelta, timezone
import time_machine
import zoneinfo
from django.conf import settings
from django.core import mail
from django.test import override_settings

View File

@ -83,9 +83,10 @@ class DoRestCallTests(ZulipTestCase):
def _helper(content: str | None) -> None:
expect_send_response = mock.patch("zerver.lib.outgoing_webhook.send_response_message")
with mock.patch.object(
service_handler, "session"
) as session, expect_send_response as mock_send:
with (
mock.patch.object(service_handler, "session") as session,
expect_send_response as mock_send,
):
session.post.return_value = ResponseMock(200, orjson.dumps(dict(content=content)))
with self.assertLogs(level="INFO") as logs:
do_rest_call("", mock_event, service_handler)
@ -120,9 +121,10 @@ class DoRestCallTests(ZulipTestCase):
mock_event = self.mock_event(bot_user)
service_handler = GenericOutgoingWebhookService("token", bot_user, "service")
with mock.patch.object(service_handler, "session") as session, self.assertLogs(
level="WARNING"
) as m:
with (
mock.patch.object(service_handler, "session") as session,
self.assertLogs(level="WARNING") as m,
):
session.post.return_value = ResponseMock(500)
final_response = do_rest_call("", mock_event, service_handler)
assert final_response is not None
@ -149,9 +151,11 @@ The webhook got a response with status code *500*.""",
service_handler = GenericOutgoingWebhookService("token", bot_user, "service")
mock_event["message"]["type"] = "unknown"
with mock.patch.object(service_handler, "session") as session, self.assertRaises(
JsonableError
), self.assertLogs(level="INFO"):
with (
mock.patch.object(service_handler, "session") as session,
self.assertRaises(JsonableError),
self.assertLogs(level="INFO"),
):
session.post.return_value = ResponseMock(200)
url = "http://somewhere.com/api/call"
with mock.patch("zerver.lib.outgoing_webhook.get_message_url", return_value=url):
@ -162,10 +166,13 @@ The webhook got a response with status code *500*.""",
mock_event = self.mock_event(bot_user)
service_handler = GenericOutgoingWebhookService("token", bot_user, "service")
with mock.patch(
with (
mock.patch(
"zerver.lib.outgoing_webhook.GenericOutgoingWebhookService.make_request",
return_value=None,
), self.assertLogs(level="INFO") as logs:
),
self.assertLogs(level="INFO") as logs,
):
resp = do_rest_call("", mock_event, service_handler)
self.assertEqual(resp, None)
self.assert_length(logs.output, 1)
@ -177,9 +184,11 @@ The webhook got a response with status code *500*.""",
expect_fail = mock.patch("zerver.lib.outgoing_webhook.fail_with_message")
with mock.patch.object(
service_handler, "session"
) as session, expect_fail as mock_fail, self.assertLogs(level="WARNING") as m:
with (
mock.patch.object(service_handler, "session") as session,
expect_fail as mock_fail,
self.assertLogs(level="WARNING") as m,
):
session.post.return_value = ResponseMock(400)
final_response = do_rest_call("", mock_event, service_handler)
assert final_response is not None
@ -269,9 +278,11 @@ The webhook got a response with status code *400*.""",
# Don't think that we should catch and assert whole log output(which is actually a very big error traceback).
# We are already asserting bot_owner_notification.content which verifies exception did occur.
with mock.patch.object(
service_handler, "session"
) as session, expect_logging_exception, expect_fail as mock_fail:
with (
mock.patch.object(service_handler, "session") as session,
expect_logging_exception,
expect_fail as mock_fail,
):
session.post.side_effect = request_exception_error
do_rest_call("", mock_event, service_handler)

View File

@ -115,9 +115,10 @@ class UserPresenceModelTests(ZulipTestCase):
cursor = connection.cursor()
return cursor
with mock.patch("zerver.actions.presence.connection") as mock_connection, self.assertLogs(
"zerver.actions.presence", level="INFO"
) as mock_logs:
with (
mock.patch("zerver.actions.presence.connection") as mock_connection,
self.assertLogs("zerver.actions.presence", level="INFO") as mock_logs,
):
# This is a tricky mock. We need to set things up so that connection.cursor()
# in do_update_user_presence runs our custom code when the caller tries to
# enter the context manager.

View File

@ -204,9 +204,12 @@ class SendTestPushNotificationEndpointTest(BouncerTestCase):
# 3. Then test without submitting a specific token,
# meaning both devices should get notified.
with mock.patch(
with (
mock.patch(
"zerver.lib.push_notifications.send_android_push_notification"
) as mock_send_android_push_notification, time_machine.travel(time_now, tick=False):
) as mock_send_android_push_notification,
time_machine.travel(time_now, tick=False),
):
result = self.api_post(user, endpoint, {"token": android_token}, subdomain="zulip")
expected_android_payload = {
@ -229,9 +232,12 @@ class SendTestPushNotificationEndpointTest(BouncerTestCase):
)
self.assert_json_success(result)
with mock.patch(
with (
mock.patch(
"zerver.lib.push_notifications.send_apple_push_notification"
) as mock_send_apple_push_notification, time_machine.travel(time_now, tick=False):
) as mock_send_apple_push_notification,
time_machine.travel(time_now, tick=False),
):
result = self.api_post(user, endpoint, {"token": apple_token}, subdomain="zulip")
expected_apple_payload = {
@ -261,11 +267,15 @@ class SendTestPushNotificationEndpointTest(BouncerTestCase):
self.assert_json_success(result)
# Test without submitting a token value. Both devices should get notified.
with mock.patch(
with (
mock.patch(
"zerver.lib.push_notifications.send_apple_push_notification"
) as mock_send_apple_push_notification, mock.patch(
) as mock_send_apple_push_notification,
mock.patch(
"zerver.lib.push_notifications.send_android_push_notification"
) as mock_send_android_push_notification, time_machine.travel(time_now, tick=False):
) as mock_send_android_push_notification,
time_machine.travel(time_now, tick=False),
):
result = self.api_post(user, endpoint, subdomain="zulip")
mock_send_android_push_notification.assert_called_once_with(
@ -313,9 +323,12 @@ class SendTestPushNotificationEndpointTest(BouncerTestCase):
endpoint = "/api/v1/mobile_push/test_notification"
time_now = now()
with mock.patch(
with (
mock.patch(
"zerver.lib.push_notifications.send_android_push_notification"
) as mock_send_android_push_notification, time_machine.travel(time_now, tick=False):
) as mock_send_android_push_notification,
time_machine.travel(time_now, tick=False),
):
result = self.api_post(user, endpoint, {"token": token}, subdomain="zulip")
expected_payload = {
"server": "testserver",
@ -603,12 +616,15 @@ class PushBouncerNotificationTest(BouncerTestCase):
"apns_payload": {},
"gcm_options": {},
}
with mock.patch(
"zilencer.views.send_android_push_notification", return_value=1
), mock.patch("zilencer.views.send_apple_push_notification", return_value=1), mock.patch(
with (
mock.patch("zilencer.views.send_android_push_notification", return_value=1),
mock.patch("zilencer.views.send_apple_push_notification", return_value=1),
mock.patch(
"corporate.lib.stripe.RemoteRealmBillingSession.current_count_for_billed_licenses",
return_value=10,
), self.assertLogs("zilencer.views", level="INFO"):
),
self.assertLogs("zilencer.views", level="INFO"),
):
result = self.uuid_post(
self.server_uuid,
"/api/v1/remotes/push/notify",
@ -673,16 +689,18 @@ class PushBouncerNotificationTest(BouncerTestCase):
}
time_sent = now()
with mock.patch(
with (
mock.patch(
"zilencer.views.send_android_push_notification", return_value=2
) as android_push, mock.patch(
"zilencer.views.send_apple_push_notification", return_value=1
) as apple_push, mock.patch(
) as android_push,
mock.patch("zilencer.views.send_apple_push_notification", return_value=1) as apple_push,
mock.patch(
"corporate.lib.stripe.RemoteRealmBillingSession.current_count_for_billed_licenses",
return_value=10,
), time_machine.travel(time_sent, tick=False), self.assertLogs(
"zilencer.views", level="INFO"
) as logger:
),
time_machine.travel(time_sent, tick=False),
self.assertLogs("zilencer.views", level="INFO") as logger,
):
result = self.uuid_post(
self.server_uuid,
"/api/v1/remotes/push/notify",
@ -995,12 +1013,16 @@ class PushBouncerNotificationTest(BouncerTestCase):
"gcm_options": gcm_options,
}
time_received = time_sent + timedelta(seconds=1, milliseconds=234)
with time_machine.travel(time_received, tick=False), mock.patch(
"zilencer.views.send_android_push_notification", return_value=1
), mock.patch("zilencer.views.send_apple_push_notification", return_value=1), mock.patch(
with (
time_machine.travel(time_received, tick=False),
mock.patch("zilencer.views.send_android_push_notification", return_value=1),
mock.patch("zilencer.views.send_apple_push_notification", return_value=1),
mock.patch(
"corporate.lib.stripe.RemoteServerBillingSession.current_count_for_billed_licenses",
return_value=10,
), self.assertLogs("zilencer.views", level="INFO") as logger:
),
self.assertLogs("zilencer.views", level="INFO") as logger,
):
result = self.uuid_post(
self.server_uuid,
"/api/v1/remotes/push/notify",
@ -1061,9 +1083,12 @@ class PushBouncerNotificationTest(BouncerTestCase):
realm = get_realm("zulip")
self.assertTrue(realm.push_notifications_enabled)
with mock.patch(
with (
mock.patch(
"zerver.lib.push_notifications.push_notifications_configured", return_value=False
), self.assertLogs("zerver.lib.push_notifications", level="WARNING") as warn_log:
),
self.assertLogs("zerver.lib.push_notifications", level="WARNING") as warn_log,
):
initialize_push_notifications()
not_configured_warn_log = (
@ -1313,10 +1338,13 @@ class PushBouncerNotificationTest(BouncerTestCase):
# Now we want to remove them using the bouncer after an API key change.
# First we test error handling in case of issues with the bouncer:
with mock.patch(
with (
mock.patch(
"zerver.worker.deferred_work.clear_push_device_tokens",
side_effect=PushNotificationBouncerRetryLaterError("test"),
), mock.patch("zerver.worker.deferred_work.retry_event") as mock_retry:
),
mock.patch("zerver.worker.deferred_work.retry_event") as mock_retry,
):
do_regenerate_api_key(user, user)
mock_retry.assert_called()
@ -1359,9 +1387,10 @@ class AnalyticsBouncerTest(BouncerTestCase):
ANALYTICS_URL = settings.PUSH_NOTIFICATION_BOUNCER_URL + "/api/v1/remotes/server/analytics"
ANALYTICS_STATUS_URL = ANALYTICS_URL + "/status"
with responses.RequestsMock() as resp, self.assertLogs(
"zulip.analytics", level="WARNING"
) as mock_warning:
with (
responses.RequestsMock() as resp,
self.assertLogs("zulip.analytics", level="WARNING") as mock_warning,
):
resp.add(responses.GET, ANALYTICS_STATUS_URL, body=ConnectionError())
Realm.objects.all().update(push_notifications_enabled=True)
send_server_data_to_push_bouncer()
@ -1374,9 +1403,10 @@ class AnalyticsBouncerTest(BouncerTestCase):
# Simulate ConnectionError again, but this time with a redis record indicating
# that push notifications have recently worked fine.
with responses.RequestsMock() as resp, self.assertLogs(
"zulip.analytics", level="WARNING"
) as mock_warning:
with (
responses.RequestsMock() as resp,
self.assertLogs("zulip.analytics", level="WARNING") as mock_warning,
):
resp.add(responses.GET, ANALYTICS_STATUS_URL, body=ConnectionError())
Realm.objects.all().update(push_notifications_enabled=True)
record_push_notifications_recently_working()
@ -1406,9 +1436,10 @@ class AnalyticsBouncerTest(BouncerTestCase):
redis_utils.REDIS_KEY_PREFIX + PUSH_NOTIFICATIONS_RECENTLY_WORKING_REDIS_KEY
)
with responses.RequestsMock() as resp, self.assertLogs(
"zulip.analytics", level="WARNING"
) as mock_warning:
with (
responses.RequestsMock() as resp,
self.assertLogs("zulip.analytics", level="WARNING") as mock_warning,
):
resp.add(responses.GET, ANALYTICS_STATUS_URL, body="This is not JSON")
Realm.objects.all().update(push_notifications_enabled=True)
send_server_data_to_push_bouncer()
@ -1431,9 +1462,10 @@ class AnalyticsBouncerTest(BouncerTestCase):
self.assertTrue(resp.assert_call_count(ANALYTICS_STATUS_URL, 1))
self.assertPushNotificationsAre(True)
with responses.RequestsMock() as resp, self.assertLogs(
"zulip.analytics", level="WARNING"
) as mock_warning:
with (
responses.RequestsMock() as resp,
self.assertLogs("zulip.analytics", level="WARNING") as mock_warning,
):
Realm.objects.all().update(push_notifications_enabled=True)
resp.add(
responses.GET,
@ -1449,9 +1481,10 @@ class AnalyticsBouncerTest(BouncerTestCase):
self.assertTrue(resp.assert_call_count(ANALYTICS_STATUS_URL, 1))
self.assertPushNotificationsAre(False)
with responses.RequestsMock() as resp, self.assertLogs(
"zulip.analytics", level="WARNING"
) as mock_warning:
with (
responses.RequestsMock() as resp,
self.assertLogs("zulip.analytics", level="WARNING") as mock_warning,
):
Realm.objects.all().update(push_notifications_enabled=True)
resp.add(
responses.GET,
@ -2406,10 +2439,13 @@ class AnalyticsBouncerTest(BouncerTestCase):
"corporate.lib.stripe.RemoteRealmBillingSession.current_count_for_billed_licenses",
return_value=11,
):
with mock.patch(
with (
mock.patch(
"corporate.lib.stripe.RemoteRealmBillingSession.get_next_billing_cycle",
return_value=dummy_date,
) as m, self.assertLogs("zulip.analytics", level="INFO") as info_log:
) as m,
self.assertLogs("zulip.analytics", level="INFO") as info_log,
):
send_server_data_to_push_bouncer(consider_usage_statistics=False)
m.assert_called()
realms = Realm.objects.all()
@ -2436,10 +2472,13 @@ class AnalyticsBouncerTest(BouncerTestCase):
"corporate.lib.stripe.RemoteRealmBillingSession.current_count_for_billed_licenses",
side_effect=MissingDataError,
):
with mock.patch(
with (
mock.patch(
"corporate.lib.stripe.RemoteRealmBillingSession.get_next_billing_cycle",
return_value=dummy_date,
) as m, self.assertLogs("zulip.analytics", level="INFO") as info_log:
) as m,
self.assertLogs("zulip.analytics", level="INFO") as info_log,
):
send_server_data_to_push_bouncer(consider_usage_statistics=False)
m.assert_called()
realms = Realm.objects.all()
@ -2514,18 +2553,21 @@ class AnalyticsBouncerTest(BouncerTestCase):
return dummy_remote_server_customer_plan
return None
with mock.patch(
with (
mock.patch(
"corporate.lib.stripe.RemoteRealmBillingSession.get_customer",
return_value=dummy_remote_realm_customer,
), mock.patch(
),
mock.patch(
"corporate.lib.stripe.RemoteServerBillingSession.get_customer",
return_value=dummy_remote_server_customer,
), mock.patch(
"zilencer.views.RemoteServerBillingSession.sync_license_ledger_if_needed"
), mock.patch(
),
mock.patch("zilencer.views.RemoteServerBillingSession.sync_license_ledger_if_needed"),
mock.patch(
"corporate.lib.stripe.get_current_plan_by_customer",
side_effect=get_current_plan_by_customer,
) as m:
) as m,
):
send_server_data_to_push_bouncer(consider_usage_statistics=False)
m.assert_called()
realms = Realm.objects.all()
@ -2536,9 +2578,10 @@ class AnalyticsBouncerTest(BouncerTestCase):
None,
)
with mock.patch("zerver.lib.remote_server.send_to_push_bouncer") as m, self.assertLogs(
"zulip.analytics", level="WARNING"
) as exception_log:
with (
mock.patch("zerver.lib.remote_server.send_to_push_bouncer") as m,
self.assertLogs("zulip.analytics", level="WARNING") as exception_log,
):
get_response = {
"last_realm_count_id": 0,
"last_installation_count_id": 0,
@ -2610,9 +2653,12 @@ class AnalyticsBouncerTest(BouncerTestCase):
# causing the bouncer to include its corresponding info in the response. Through
# that, we're testing our graceful handling of seeing a non-existent realm uuid
# in that response.
with mock.patch(
with (
mock.patch(
"zerver.lib.remote_server.get_realms_info_for_push_bouncer", return_value=realm_info
) as m, self.assertLogs(logger, level="WARNING") as analytics_logger:
) as m,
self.assertLogs(logger, level="WARNING") as analytics_logger,
):
send_server_data_to_push_bouncer(consider_usage_statistics=False)
m.assert_called()
realms = Realm.objects.all()
@ -2630,8 +2676,9 @@ class AnalyticsBouncerTest(BouncerTestCase):
# Now we want to test the other side of this - bouncer's handling
# of a deleted realm.
with self.assertLogs(logger, level="WARNING") as analytics_logger, mock.patch(
"zilencer.views.RemoteRealmBillingSession.on_paid_plan", return_value=True
with (
self.assertLogs(logger, level="WARNING") as analytics_logger,
mock.patch("zilencer.views.RemoteRealmBillingSession.on_paid_plan", return_value=True),
):
# This time the logger shouldn't get triggered - because the bouncer doesn't
# include .realm_locally_deleted realms in its response.
@ -2772,9 +2819,10 @@ class PushNotificationTest(BouncerTestCase):
@contextmanager
def mock_fcm(self) -> Iterator[tuple[mock.MagicMock, mock.MagicMock]]:
with mock.patch("zerver.lib.push_notifications.fcm_app") as mock_fcm_app, mock.patch(
"zerver.lib.push_notifications.firebase_messaging"
) as mock_fcm_messaging:
with (
mock.patch("zerver.lib.push_notifications.fcm_app") as mock_fcm_app,
mock.patch("zerver.lib.push_notifications.firebase_messaging") as mock_fcm_messaging,
):
yield mock_fcm_app, mock_fcm_messaging
def setup_fcm_tokens(self) -> None:
@ -2858,15 +2906,20 @@ class HandlePushNotificationTest(PushNotificationTest):
"message_id": message.id,
"trigger": NotificationTriggers.DIRECT_MESSAGE,
}
with time_machine.travel(time_received, tick=False), self.mock_fcm() as (
with (
time_machine.travel(time_received, tick=False),
self.mock_fcm() as (
mock_fcm_app,
mock_fcm_messaging,
), self.mock_apns() as (apns_context, send_notification), mock.patch(
),
self.mock_apns() as (apns_context, send_notification),
mock.patch(
"corporate.lib.stripe.RemoteRealmBillingSession.current_count_for_billed_licenses",
return_value=10,
), self.assertLogs(
"zerver.lib.push_notifications", level="INFO"
) as pn_logger, self.assertLogs("zilencer.views", level="INFO") as views_logger:
),
self.assertLogs("zerver.lib.push_notifications", level="INFO") as pn_logger,
self.assertLogs("zilencer.views", level="INFO") as views_logger,
):
apns_devices = [
(b64_to_hex(device.token), device.ios_app_id, device.token)
for device in RemotePushDeviceToken.objects.filter(kind=PushDeviceToken.APNS)
@ -2952,12 +3005,14 @@ class HandlePushNotificationTest(PushNotificationTest):
"message_id": message.id,
"trigger": NotificationTriggers.DIRECT_MESSAGE,
}
with mock.patch(
with (
mock.patch(
"corporate.lib.stripe.RemoteRealmBillingSession.current_count_for_billed_licenses",
return_value=100,
) as mock_current_count, self.assertLogs(
"zerver.lib.push_notifications", level="INFO"
) as pn_logger, self.assertLogs("zilencer.views", level="INFO"):
) as mock_current_count,
self.assertLogs("zerver.lib.push_notifications", level="INFO") as pn_logger,
self.assertLogs("zilencer.views", level="INFO"),
):
handle_push_notification(self.user_profile.id, missed_message)
self.assertEqual(
@ -3019,15 +3074,20 @@ class HandlePushNotificationTest(PushNotificationTest):
"message_id": message.id,
"trigger": NotificationTriggers.DIRECT_MESSAGE,
}
with time_machine.travel(time_received, tick=False), self.mock_fcm() as (
with (
time_machine.travel(time_received, tick=False),
self.mock_fcm() as (
mock_fcm_app,
mock_fcm_messaging,
), self.mock_apns() as (apns_context, send_notification), mock.patch(
),
self.mock_apns() as (apns_context, send_notification),
mock.patch(
"corporate.lib.stripe.RemoteRealmBillingSession.current_count_for_billed_licenses",
return_value=10,
), self.assertLogs(
"zerver.lib.push_notifications", level="INFO"
) as pn_logger, self.assertLogs("zilencer.views", level="INFO") as views_logger:
),
self.assertLogs("zerver.lib.push_notifications", level="INFO") as pn_logger,
self.assertLogs("zilencer.views", level="INFO") as views_logger,
):
apns_devices = [
(b64_to_hex(device.token), device.ios_app_id, device.token)
for device in RemotePushDeviceToken.objects.filter(kind=PushDeviceToken.APNS)
@ -3146,11 +3206,14 @@ class HandlePushNotificationTest(PushNotificationTest):
}
# If the message is unread, we should send push notifications.
with mock.patch(
with (
mock.patch(
"zerver.lib.push_notifications.send_apple_push_notification", return_value=1
) as mock_send_apple, mock.patch(
) as mock_send_apple,
mock.patch(
"zerver.lib.push_notifications.send_android_push_notification", return_value=1
) as mock_send_android:
) as mock_send_android,
):
handle_push_notification(user_profile.id, missed_message)
mock_send_apple.assert_called_once()
mock_send_android.assert_called_once()
@ -3158,11 +3221,14 @@ class HandlePushNotificationTest(PushNotificationTest):
# If the message has been read, don't send push notifications.
usermessage.flags.read = True
usermessage.save()
with mock.patch(
with (
mock.patch(
"zerver.lib.push_notifications.send_apple_push_notification", return_value=1
) as mock_send_apple, mock.patch(
) as mock_send_apple,
mock.patch(
"zerver.lib.push_notifications.send_android_push_notification", return_value=1
) as mock_send_android:
) as mock_send_android,
):
handle_push_notification(user_profile.id, missed_message)
mock_send_apple.assert_not_called()
mock_send_android.assert_not_called()
@ -3189,11 +3255,13 @@ class HandlePushNotificationTest(PushNotificationTest):
# This mock.patch() should be assertNoLogs once that feature
# is added to Python.
with mock.patch(
"zerver.lib.push_notifications.uses_notification_bouncer"
) as mock_check, mock.patch("logging.error") as mock_logging_error, mock.patch(
with (
mock.patch("zerver.lib.push_notifications.uses_notification_bouncer") as mock_check,
mock.patch("logging.error") as mock_logging_error,
mock.patch(
"zerver.lib.push_notifications.push_notifications_configured", return_value=True
) as mock_push_notifications:
) as mock_push_notifications,
):
handle_push_notification(user_profile.id, missed_message)
mock_push_notifications.assert_called_once()
# Check we didn't proceed through and didn't log anything.
@ -3221,11 +3289,13 @@ class HandlePushNotificationTest(PushNotificationTest):
message.delete()
# This should log an error
with mock.patch(
"zerver.lib.push_notifications.uses_notification_bouncer"
) as mock_check, self.assertLogs(level="INFO") as mock_logging_info, mock.patch(
with (
mock.patch("zerver.lib.push_notifications.uses_notification_bouncer") as mock_check,
self.assertLogs(level="INFO") as mock_logging_info,
mock.patch(
"zerver.lib.push_notifications.push_notifications_configured", return_value=True
) as mock_push_notifications:
) as mock_push_notifications,
):
handle_push_notification(user_profile.id, missed_message)
mock_push_notifications.assert_called_once()
# Check we didn't proceed through.
@ -3256,12 +3326,17 @@ class HandlePushNotificationTest(PushNotificationTest):
"message_id": message.id,
"trigger": NotificationTriggers.DIRECT_MESSAGE,
}
with self.settings(PUSH_NOTIFICATION_BOUNCER_URL=True), mock.patch(
"zerver.lib.push_notifications.get_message_payload_apns", return_value={"apns": True}
), mock.patch(
with (
self.settings(PUSH_NOTIFICATION_BOUNCER_URL=True),
mock.patch(
"zerver.lib.push_notifications.get_message_payload_apns",
return_value={"apns": True},
),
mock.patch(
"zerver.lib.push_notifications.get_message_payload_gcm",
return_value=({"gcm": True}, {}),
), mock.patch(
),
mock.patch(
"zerver.lib.push_notifications.send_json_to_push_bouncer",
return_value=dict(
total_android_devices=3,
@ -3269,9 +3344,9 @@ class HandlePushNotificationTest(PushNotificationTest):
deleted_devices=DevicesToCleanUpDict(android_devices=[], apple_devices=[]),
realm=None,
),
) as mock_send, self.assertLogs(
"zerver.lib.push_notifications", level="INFO"
) as mock_logging_info:
) as mock_send,
self.assertLogs("zerver.lib.push_notifications", level="INFO") as mock_logging_info,
):
handle_push_notification(user_profile.id, missed_message)
mock_send.assert_called_with(
"POST",
@ -3329,22 +3404,29 @@ class HandlePushNotificationTest(PushNotificationTest):
"message_id": message.id,
"trigger": NotificationTriggers.DIRECT_MESSAGE,
}
with mock.patch(
"zerver.lib.push_notifications.get_message_payload_apns", return_value={"apns": True}
), mock.patch(
with (
mock.patch(
"zerver.lib.push_notifications.get_message_payload_apns",
return_value={"apns": True},
),
mock.patch(
"zerver.lib.push_notifications.get_message_payload_gcm",
return_value=({"gcm": True}, {}),
), mock.patch(
),
mock.patch(
# Simulate the send...push_notification functions returning a number of successes
# lesser than the number of devices, so that we can verify correct CountStat counting.
"zerver.lib.push_notifications.send_apple_push_notification",
return_value=len(apple_devices) - 1,
) as mock_send_apple, mock.patch(
) as mock_send_apple,
mock.patch(
"zerver.lib.push_notifications.send_android_push_notification",
return_value=len(android_devices) - 1,
) as mock_send_android, mock.patch(
) as mock_send_android,
mock.patch(
"zerver.lib.push_notifications.push_notifications_configured", return_value=True
) as mock_push_notifications:
) as mock_push_notifications,
):
handle_push_notification(self.user_profile.id, missed_message)
user_identity = UserPushIdentityCompat(user_id=self.user_profile.id)
mock_send_apple.assert_called_with(user_identity, apple_devices, {"apns": True})
@ -3377,9 +3459,10 @@ class HandlePushNotificationTest(PushNotificationTest):
flags=UserMessage.flags.active_mobile_push_notification,
)
with self.settings(PUSH_NOTIFICATION_BOUNCER_URL=True), mock.patch(
"zerver.lib.push_notifications.send_notifications_to_bouncer"
) as mock_send:
with (
self.settings(PUSH_NOTIFICATION_BOUNCER_URL=True),
mock.patch("zerver.lib.push_notifications.send_notifications_to_bouncer") as mock_send,
):
handle_remove_push_notification(user_profile.id, [message.id])
mock_send.assert_called_with(
user_profile,
@ -3446,17 +3529,21 @@ class HandlePushNotificationTest(PushNotificationTest):
PushDeviceToken.objects.filter(user=self.user_profile, kind=PushDeviceToken.APNS)
)
with mock.patch(
with (
mock.patch(
"zerver.lib.push_notifications.push_notifications_configured", return_value=True
) as mock_push_notifications, mock.patch(
) as mock_push_notifications,
mock.patch(
# Simulate the send...push_notification functions returning a number of successes
# lesser than the number of devices, so that we can verify correct CountStat counting.
"zerver.lib.push_notifications.send_android_push_notification",
return_value=len(apple_devices) - 1,
) as mock_send_android, mock.patch(
) as mock_send_android,
mock.patch(
"zerver.lib.push_notifications.send_apple_push_notification",
return_value=len(apple_devices) - 1,
) as mock_send_apple:
) as mock_send_apple,
):
handle_remove_push_notification(self.user_profile.id, [message.id])
mock_push_notifications.assert_called_once()
user_identity = UserPushIdentityCompat(user_id=self.user_profile.id)
@ -3517,9 +3604,12 @@ class HandlePushNotificationTest(PushNotificationTest):
self.subscribe(sender, "public_stream")
message_id = self.send_stream_message(sender, "public_stream", "test")
missed_message = {"message_id": message_id}
with self.assertLogs("zerver.lib.push_notifications", level="ERROR") as logger, mock.patch(
with (
self.assertLogs("zerver.lib.push_notifications", level="ERROR") as logger,
mock.patch(
"zerver.lib.push_notifications.push_notifications_configured", return_value=True
) as mock_push_notifications:
) as mock_push_notifications,
):
handle_push_notification(self.user_profile.id, missed_message)
self.assertEqual(
"ERROR:zerver.lib.push_notifications:"
@ -3539,13 +3629,17 @@ class HandlePushNotificationTest(PushNotificationTest):
sender = self.example_user("iago")
self.subscribe(sender, "public_stream")
message_id = self.send_stream_message(sender, "public_stream", "test")
with mock.patch(
with (
mock.patch(
"zerver.lib.push_notifications.push_notifications_configured", return_value=True
) as mock_push_notifications, mock.patch(
) as mock_push_notifications,
mock.patch(
"zerver.lib.push_notifications.send_android_push_notification", return_value=1
) as mock_send_android, mock.patch(
) as mock_send_android,
mock.patch(
"zerver.lib.push_notifications.send_apple_push_notification", return_value=1
) as mock_send_apple:
) as mock_send_apple,
):
handle_remove_push_notification(self.user_profile.id, [message_id])
mock_push_notifications.assert_called_once()
mock_send_android.assert_called_once()
@ -3586,20 +3680,26 @@ class HandlePushNotificationTest(PushNotificationTest):
PushDeviceToken.objects.filter(user=self.user_profile, kind=PushDeviceToken.APNS)
)
with mock.patch(
"zerver.lib.push_notifications.get_message_payload_apns", return_value={"apns": True}
), mock.patch(
with (
mock.patch(
"zerver.lib.push_notifications.get_message_payload_apns",
return_value={"apns": True},
),
mock.patch(
"zerver.lib.push_notifications.get_message_payload_gcm",
return_value=({"gcm": True}, {}),
), mock.patch(
),
mock.patch(
"zerver.lib.push_notifications.send_apple_push_notification", return_value=1
) as mock_send_apple, mock.patch(
) as mock_send_apple,
mock.patch(
"zerver.lib.push_notifications.send_android_push_notification", return_value=1
) as mock_send_android, mock.patch(
"zerver.lib.push_notifications.logger.error"
) as mock_logger, mock.patch(
) as mock_send_android,
mock.patch("zerver.lib.push_notifications.logger.error") as mock_logger,
mock.patch(
"zerver.lib.push_notifications.push_notifications_configured", return_value=True
) as mock_push_notifications:
) as mock_push_notifications,
):
handle_push_notification(self.user_profile.id, missed_message)
mock_logger.assert_not_called()
user_identity = UserPushIdentityCompat(user_id=self.user_profile.id)
@ -3812,9 +3912,10 @@ class TestAPNs(PushNotificationTest):
zerver.lib.push_notifications.get_apns_context.cache_clear()
try:
with self.settings(APNS_CERT_FILE="/foo.pem"), mock.patch(
"ssl.SSLContext.load_cert_chain"
) as mock_load_cert_chain:
with (
self.settings(APNS_CERT_FILE="/foo.pem"),
mock.patch("ssl.SSLContext.load_cert_chain") as mock_load_cert_chain,
):
apns_context = get_apns_context()
assert apns_context is not None
try:
@ -3829,9 +3930,10 @@ class TestAPNs(PushNotificationTest):
def test_not_configured(self) -> None:
self.setup_apns_tokens()
with mock.patch(
"zerver.lib.push_notifications.get_apns_context"
) as mock_get, self.assertLogs("zerver.lib.push_notifications", level="DEBUG") as logger:
with (
mock.patch("zerver.lib.push_notifications.get_apns_context") as mock_get,
self.assertLogs("zerver.lib.push_notifications", level="DEBUG") as logger,
):
mock_get.return_value = None
self.send()
notification_drop_log = (
@ -3855,9 +3957,10 @@ class TestAPNs(PushNotificationTest):
def test_success(self) -> None:
self.setup_apns_tokens()
with self.mock_apns() as (apns_context, send_notification), self.assertLogs(
"zerver.lib.push_notifications", level="INFO"
) as logger:
with (
self.mock_apns() as (apns_context, send_notification),
self.assertLogs("zerver.lib.push_notifications", level="INFO") as logger,
):
send_notification.return_value.is_successful = True
self.send()
for device in self.devices():
@ -3868,9 +3971,10 @@ class TestAPNs(PushNotificationTest):
def test_http_retry_eventually_fails(self) -> None:
self.setup_apns_tokens()
with self.mock_apns() as (apns_context, send_notification), self.assertLogs(
"zerver.lib.push_notifications", level="INFO"
) as logger:
with (
self.mock_apns() as (apns_context, send_notification),
self.assertLogs("zerver.lib.push_notifications", level="INFO") as logger,
):
send_notification.side_effect = aioapns.exceptions.ConnectionError()
self.send(devices=self.devices()[0:1])
self.assertIn(
@ -3880,9 +3984,10 @@ class TestAPNs(PushNotificationTest):
def test_other_exception(self) -> None:
self.setup_apns_tokens()
with self.mock_apns() as (apns_context, send_notification), self.assertLogs(
"zerver.lib.push_notifications", level="INFO"
) as logger:
with (
self.mock_apns() as (apns_context, send_notification),
self.assertLogs("zerver.lib.push_notifications", level="INFO") as logger,
):
send_notification.side_effect = IOError
self.send(devices=self.devices()[0:1])
self.assertIn(
@ -3892,9 +3997,10 @@ class TestAPNs(PushNotificationTest):
def test_internal_server_error(self) -> None:
self.setup_apns_tokens()
with self.mock_apns() as (apns_context, send_notification), self.assertLogs(
"zerver.lib.push_notifications", level="INFO"
) as logger:
with (
self.mock_apns() as (apns_context, send_notification),
self.assertLogs("zerver.lib.push_notifications", level="INFO") as logger,
):
send_notification.return_value.is_successful = False
send_notification.return_value.description = "InternalServerError"
self.send(devices=self.devices()[0:1])
@ -3911,9 +4017,10 @@ class TestAPNs(PushNotificationTest):
user_id=self.user_profile.id,
server=self.server,
)
with self.mock_apns() as (apns_context, send_notification), self.assertLogs(
"zerver.lib.push_notifications", level="INFO"
) as logger:
with (
self.mock_apns() as (apns_context, send_notification),
self.assertLogs("zerver.lib.push_notifications", level="INFO") as logger,
):
send_notification.return_value.is_successful = True
self.send(devices=[device])
self.assertIn(
@ -4517,11 +4624,10 @@ class TestSendNotificationsToBouncer(PushNotificationTest):
def test_send_notifications_to_bouncer_when_no_devices(self) -> None:
user = self.example_user("hamlet")
with mock.patch(
"zerver.lib.remote_server.send_to_push_bouncer"
) as mock_send, self.assertLogs(
"zerver.lib.push_notifications", level="INFO"
) as mock_logging_info:
with (
mock.patch("zerver.lib.remote_server.send_to_push_bouncer") as mock_send,
self.assertLogs("zerver.lib.push_notifications", level="INFO") as mock_logging_info,
):
send_notifications_to_bouncer(
user, {"apns": True}, {"gcm": True}, {}, android_devices=[], apple_devices=[]
)
@ -4710,9 +4816,10 @@ class TestPushApi(BouncerTestCase):
self.assert_json_error(result, "Token does not exist")
# Use push notification bouncer and try to remove non-existing tokens.
with self.settings(
PUSH_NOTIFICATION_BOUNCER_URL="https://push.zulip.org.example.com"
), responses.RequestsMock() as resp:
with (
self.settings(PUSH_NOTIFICATION_BOUNCER_URL="https://push.zulip.org.example.com"),
responses.RequestsMock() as resp,
):
assert settings.PUSH_NOTIFICATION_BOUNCER_URL is not None
URL = settings.PUSH_NOTIFICATION_BOUNCER_URL + "/api/v1/remotes/push/unregister"
resp.add_callback(responses.POST, URL, callback=self.request_callback)

View File

@ -93,9 +93,10 @@ class TestQueueImplementation(ZulipTestCase):
raise AMQPConnectionError("test")
actual_publish(*args, **kwargs)
with mock.patch(
"zerver.lib.queue.SimpleQueueClient.publish", throw_connection_error_once
), self.assertLogs("zulip.queue", level="WARN") as warn_logs:
with (
mock.patch("zerver.lib.queue.SimpleQueueClient.publish", throw_connection_error_once),
self.assertLogs("zulip.queue", level="WARN") as warn_logs,
):
queue_json_publish("test_suite", {"event": "my_event"})
self.assertEqual(
warn_logs.output,

View File

@ -312,9 +312,10 @@ class WorkerTest(ZulipTestCase):
# Enqueues the events to the internal queue, as if from RabbitMQ
time_zero = datetime(2021, 1, 1, tzinfo=timezone.utc)
with time_machine.travel(time_zero, tick=False), patch.object(
mmw.cv, "notify"
) as notify_mock:
with (
time_machine.travel(time_zero, tick=False),
patch.object(mmw.cv, "notify") as notify_mock,
):
for event in events:
mmw.consume_single_event(event)
# All of these notify, because has_timeout is still false in
@ -351,9 +352,10 @@ class WorkerTest(ZulipTestCase):
# `expected_scheduled_timestamp` as the earlier events.
few_moments_later = time_zero + timedelta(seconds=3)
with time_machine.travel(few_moments_later, tick=False), patch.object(
mmw.cv, "notify"
) as notify_mock:
with (
time_machine.travel(few_moments_later, tick=False),
patch.object(mmw.cv, "notify") as notify_mock,
):
mmw.consume_single_event(bonus_event_hamlet)
self.assertEqual(notify_mock.call_count, 0)
@ -424,12 +426,14 @@ class WorkerTest(ZulipTestCase):
# details, but the summary is that IntegrityErrors due to database constraints are raised at
# the end of the test, not inside the `try` block. So, we have the code inside the `try` block
# raise `IntegrityError` by mocking.
with patch(
with (
patch(
"zerver.models.ScheduledMessageNotificationEmail.objects.create",
side_effect=IntegrityError,
), self.assertLogs(level="DEBUG") as debug_logs, patch.object(
mmw.cv, "notify"
) as notify_mock:
),
self.assertLogs(level="DEBUG") as debug_logs,
patch.object(mmw.cv, "notify") as notify_mock,
):
mmw.consume_single_event(hamlet_event1)
self.assertEqual(notify_mock.call_count, 0)
self.assertIn(
@ -441,9 +445,10 @@ class WorkerTest(ZulipTestCase):
# throws an exception. First, enqueue the messages, and get
# them to create database rows:
time_zero = datetime(2021, 1, 1, tzinfo=timezone.utc)
with time_machine.travel(time_zero, tick=False), patch.object(
mmw.cv, "notify"
) as notify_mock:
with (
time_machine.travel(time_zero, tick=False),
patch.object(mmw.cv, "notify") as notify_mock,
):
mmw.consume_single_event(hamlet_event1)
mmw.consume_single_event(hamlet_event2)
mmw.consume_single_event(othello_event)
@ -458,9 +463,11 @@ class WorkerTest(ZulipTestCase):
raise RuntimeError
one_minute_overdue = expected_scheduled_timestamp + timedelta(seconds=60)
with time_machine.travel(one_minute_overdue, tick=False), self.assertLogs(
level="ERROR"
) as error_logs, send_mock as sm:
with (
time_machine.travel(one_minute_overdue, tick=False),
self.assertLogs(level="ERROR") as error_logs,
send_mock as sm,
):
sm.side_effect = fail_some
has_timeout = advance()
self.assertTrue(has_timeout)
@ -500,12 +507,16 @@ class WorkerTest(ZulipTestCase):
with simulated_queue_client(fake_client):
worker = PushNotificationsWorker()
worker.setup()
with patch(
with (
patch(
"zerver.worker.missedmessage_mobile_notifications.handle_push_notification"
) as mock_handle_new, patch(
) as mock_handle_new,
patch(
"zerver.worker.missedmessage_mobile_notifications.handle_remove_push_notification"
) as mock_handle_remove, patch(
) as mock_handle_remove,
patch(
"zerver.worker.missedmessage_mobile_notifications.initialize_push_notifications"
),
):
event_new = generate_new_message_notification()
event_remove = generate_remove_notification()
@ -518,25 +529,32 @@ class WorkerTest(ZulipTestCase):
event_remove["user_profile_id"], event_remove["message_ids"]
)
with patch(
with (
patch(
"zerver.worker.missedmessage_mobile_notifications.handle_push_notification",
side_effect=PushNotificationBouncerRetryLaterError("test"),
) as mock_handle_new, patch(
) as mock_handle_new,
patch(
"zerver.worker.missedmessage_mobile_notifications.handle_remove_push_notification",
side_effect=PushNotificationBouncerRetryLaterError("test"),
) as mock_handle_remove, patch(
) as mock_handle_remove,
patch(
"zerver.worker.missedmessage_mobile_notifications.initialize_push_notifications"
),
):
event_new = generate_new_message_notification()
event_remove = generate_remove_notification()
fake_client.enqueue("missedmessage_mobile_notifications", event_new)
fake_client.enqueue("missedmessage_mobile_notifications", event_remove)
with mock_queue_publish(
with (
mock_queue_publish(
"zerver.lib.queue.queue_json_publish", side_effect=fake_publish
), self.assertLogs(
),
self.assertLogs(
"zerver.worker.missedmessage_mobile_notifications", "WARNING"
) as warn_logs:
) as warn_logs,
):
worker.start()
self.assertEqual(mock_handle_new.call_count, 1 + MAX_REQUEST_RETRIES)
self.assertEqual(mock_handle_remove.call_count, 1 + MAX_REQUEST_RETRIES)
@ -588,9 +606,10 @@ class WorkerTest(ZulipTestCase):
for element in data:
fake_client.enqueue("email_mirror", element)
with simulated_queue_client(fake_client), self.assertLogs(
"zerver.worker.email_mirror", level="WARNING"
) as warn_logs:
with (
simulated_queue_client(fake_client),
self.assertLogs("zerver.worker.email_mirror", level="WARNING") as warn_logs,
):
start_time = time.time()
with patch("time.time", return_value=start_time):
worker = MirrorWorker()
@ -668,11 +687,11 @@ class WorkerTest(ZulipTestCase):
with simulated_queue_client(fake_client):
worker = EmailSendingWorker()
worker.setup()
with patch(
"zerver.lib.send_email.build_email", side_effect=EmailNotDeliveredError
), mock_queue_publish(
"zerver.lib.queue.queue_json_publish", side_effect=fake_publish
), self.assertLogs(level="ERROR") as m:
with (
patch("zerver.lib.send_email.build_email", side_effect=EmailNotDeliveredError),
mock_queue_publish("zerver.lib.queue.queue_json_publish", side_effect=fake_publish),
self.assertLogs(level="ERROR") as m,
):
worker.start()
self.assertIn("failed due to exception EmailNotDeliveredError", m.output[0])

View File

@ -81,9 +81,12 @@ class RealmTest(ZulipTestCase):
def test_realm_creation_ensures_internal_realms(self) -> None:
with mock.patch("zerver.actions.create_realm.server_initialized", return_value=False):
with mock.patch(
with (
mock.patch(
"zerver.actions.create_realm.create_internal_realm"
) as mock_create_internal, self.assertLogs(level="INFO") as info_logs:
) as mock_create_internal,
self.assertLogs(level="INFO") as info_logs,
):
do_create_realm("testrealm", "Test Realm")
mock_create_internal.assert_called_once()
self.assertEqual(

View File

@ -45,9 +45,11 @@ class RealmExportTest(ZulipTestCase):
# Test the export logic.
with patch("zerver.lib.export.do_export_realm", return_value=tarball_path) as mock_export:
with self.settings(LOCAL_UPLOADS_DIR=None), stdout_suppressed(), self.assertLogs(
level="INFO"
) as info_logs:
with (
self.settings(LOCAL_UPLOADS_DIR=None),
stdout_suppressed(),
self.assertLogs(level="INFO") as info_logs,
):
with self.captureOnCommitCallbacks(execute=True):
result = self.client_post("/json/export/realm")
self.assertTrue("INFO:root:Completed data export for zulip in " in info_logs.output[0])

View File

@ -877,8 +877,9 @@ class RocketChatImporter(ZulipTestCase):
rocketchat_data_dir = self.fixture_file_name("", "rocketchat_fixtures")
output_dir = self.make_import_output_dir("rocketchat")
with self.assertLogs(level="INFO") as info_log, self.settings(
EXTERNAL_HOST="zulip.example.com"
with (
self.assertLogs(level="INFO") as info_log,
self.settings(EXTERNAL_HOST="zulip.example.com"),
):
# We need to mock EXTERNAL_HOST to be a valid domain because rocketchat's importer
# uses it to generate email addresses for users without an email specified.

View File

@ -100,11 +100,13 @@ class TestExceptionDetailsNotRevealedToClient(SCIMTestCase):
Verify that, unlike in default django-scim2 behavior, details of an exception
are not revealed in the HttpResponse.
"""
with mock.patch(
with (
mock.patch(
"zerver.lib.scim.ZulipSCIMUser.to_dict", side_effect=Exception("test exception")
), self.assertLogs("django_scim.views", "ERROR") as mock_scim_logger, self.assertLogs(
"django.request", "ERROR"
) as mock_request_logger:
),
self.assertLogs("django_scim.views", "ERROR") as mock_scim_logger,
self.assertLogs("django.request", "ERROR") as mock_request_logger,
):
result = self.client_get("/scim/v2/Users", {}, **self.scim_headers())
# Only a generic error message is returned:
self.assertEqual(

View File

@ -314,9 +314,12 @@ class ChangeSettingsTest(ZulipTestCase):
)
self.assert_json_error(result, "Your Zulip password is managed in LDAP")
with self.settings(
with (
self.settings(
LDAP_APPEND_DOMAIN="example.com", AUTH_LDAP_USER_ATTR_MAP=ldap_user_attr_map
), self.assertLogs("zulip.ldap", "DEBUG") as debug_log:
),
self.assertLogs("zulip.ldap", "DEBUG") as debug_log,
):
result = self.client_patch(
"/json/settings",
dict(

View File

@ -971,13 +971,16 @@ class LoginTest(ZulipTestCase):
user_profile.set_password(password)
user_profile.save()
with self.settings(
with (
self.settings(
PASSWORD_HASHERS=(
"django.contrib.auth.hashers.MD5PasswordHasher",
"django.contrib.auth.hashers.SHA1PasswordHasher",
),
PASSWORD_MIN_LENGTH=30,
), self.assertLogs("zulip.auth.email", level="INFO"):
),
self.assertLogs("zulip.auth.email", level="INFO"),
):
result = self.login_with_return(self.example_email("hamlet"), password)
self.assertEqual(result.status_code, 200)
self.assert_in_response(
@ -3305,13 +3308,15 @@ class UserSignUpTest(ZulipTestCase):
result = self.client_get(result["Location"])
self.assert_in_response("check your email", result)
with self.settings(
with (
self.settings(
POPULATE_PROFILE_VIA_LDAP=True,
LDAP_APPEND_DOMAIN="zulip.com",
AUTH_LDAP_USER_ATTR_MAP=ldap_user_attr_map,
), self.assertLogs("zulip.ldap", level="DEBUG") as ldap_logs, self.assertLogs(
level="WARNING"
) as root_logs:
),
self.assertLogs("zulip.ldap", level="DEBUG") as ldap_logs,
self.assertLogs(level="WARNING") as root_logs,
):
# Click confirmation link
result = self.submit_reg_form_for_user(
email,
@ -3537,9 +3542,12 @@ class UserSignUpTest(ZulipTestCase):
self.change_ldap_user_attr("newuser_with_email", "mail", "thisisnotavalidemail")
with self.settings(
with (
self.settings(
LDAP_EMAIL_ATTR="mail",
), self.assertLogs("zulip.auth.ldap", "WARNING") as mock_log:
),
self.assertLogs("zulip.auth.ldap", "WARNING") as mock_log,
):
original_user_count = UserProfile.objects.count()
self.login_with_return(username, password, HTTP_HOST=subdomain + ".testserver")
# Verify that the process failed as intended - no UserProfile is created.
@ -3688,11 +3696,14 @@ class UserSignUpTest(ZulipTestCase):
# If the user's email is not in the LDAP directory, but fits LDAP_APPEND_DOMAIN,
# we refuse to create the account.
with self.settings(
with (
self.settings(
POPULATE_PROFILE_VIA_LDAP=True,
LDAP_APPEND_DOMAIN="zulip.com",
AUTH_LDAP_USER_ATTR_MAP=ldap_user_attr_map,
), self.assertLogs("zulip.ldap", "DEBUG") as debug_log:
),
self.assertLogs("zulip.ldap", "DEBUG") as debug_log,
):
result = self.submit_reg_form_for_user(
email,
password,
@ -4099,9 +4110,10 @@ class UserSignUpTest(ZulipTestCase):
# (this is an invalid state, so it's a bug we got here):
change_user_is_active(user_profile, True)
with self.assertRaisesRegex(
AssertionError, "Mirror dummy user is already active!"
), self.assertLogs("django.request", "ERROR") as error_log:
with (
self.assertRaisesRegex(AssertionError, "Mirror dummy user is already active!"),
self.assertLogs("django.request", "ERROR") as error_log,
):
result = self.submit_reg_form_for_user(
email,
password,
@ -4153,9 +4165,10 @@ class UserSignUpTest(ZulipTestCase):
user_profile.save()
change_user_is_active(user_profile, True)
with self.assertRaisesRegex(
AssertionError, "Mirror dummy user is already active!"
), self.assertLogs("django.request", "ERROR") as error_log:
with (
self.assertRaisesRegex(AssertionError, "Mirror dummy user is already active!"),
self.assertLogs("django.request", "ERROR") as error_log,
):
self.client_post("/register/", {"email": email}, subdomain="zephyr")
self.assertTrue(
"ERROR:django.request:Internal Server Error: /register/" in error_log.output[0]

View File

@ -1,6 +1,6 @@
import zoneinfo
from datetime import datetime, timezone
import zoneinfo
from django.utils.timezone import now as timezone_now
from zerver.lib.test_classes import ZulipTestCase

View File

@ -1297,16 +1297,18 @@ class AvatarTest(UploadSerializeMixin, ZulipTestCase):
source_original_path_id = avatar_disk_path(source_user_profile, original=True)
target_original_path_id = avatar_disk_path(target_user_profile, original=True)
with open(source_original_path_id, "rb") as source, open(
target_original_path_id, "rb"
) as target:
with (
open(source_original_path_id, "rb") as source,
open(target_original_path_id, "rb") as target,
):
self.assertEqual(source.read(), target.read())
source_medium_path_id = avatar_disk_path(source_user_profile, medium=True)
target_medium_path_id = avatar_disk_path(target_user_profile, medium=True)
with open(source_medium_path_id, "rb") as source, open(
target_medium_path_id, "rb"
) as target:
with (
open(source_medium_path_id, "rb") as source,
open(target_medium_path_id, "rb") as target,
):
self.assertEqual(source.read(), target.read())
def test_delete_avatar_image(self) -> None:

View File

@ -51,9 +51,12 @@ class ZephyrTest(ZulipTestCase):
result = post("zephyr", cred=cred)
self.assert_json_error(result, "Invalid Kerberos cache")
with ccache_mock(return_value=b"1234"), ssh_mock(
side_effect=subprocess.CalledProcessError(1, [])
), mirror_mock(), self.assertLogs(level="ERROR") as log:
with (
ccache_mock(return_value=b"1234"),
ssh_mock(side_effect=subprocess.CalledProcessError(1, [])),
mirror_mock(),
self.assertLogs(level="ERROR") as log,
):
result = post("zephyr", cred=cred)
self.assert_json_error(result, "We were unable to set up mirroring for you")
@ -87,9 +90,12 @@ class ZephyrTest(ZulipTestCase):
)
cred = dict(cname=dict(nameString=["kerberos_alter_ego"]))
with ccache_mock(
return_value=b"1234"
), mirror_mock(), ssh_mock() as ssh, kerberos_alter_egos_mock():
with (
ccache_mock(return_value=b"1234"),
mirror_mock(),
ssh_mock() as ssh,
kerberos_alter_egos_mock(),
):
result = post("zephyr", cred=cred)
self.assert_json_success(result)

View File

@ -27,9 +27,10 @@ def dev_update_subgroups(
# The test is expected to set up the barrier before accessing this endpoint.
assert BARRIER is not None
try:
with transaction.atomic(), mock.patch(
"zerver.lib.user_groups.access_user_group_by_id"
) as m:
with (
transaction.atomic(),
mock.patch("zerver.lib.user_groups.access_user_group_by_id") as m,
):
def wait_after_recursive_query(*args: Any, **kwargs: Any) -> UserGroup:
# When updating the subgroups, we access the supergroup group