ruff: Fix SIM117 Use a single `with` statement with multiple contexts.

Signed-off-by: Anders Kaseorg <anders@zulip.com>
This commit is contained in:
Anders Kaseorg 2024-07-14 11:30:42 -07:00 committed by Tim Abbott
parent b0f144327d
commit b96feb34f6
47 changed files with 1380 additions and 1141 deletions

View File

@ -560,11 +560,14 @@ class RemoteBillingAuthenticationTest(RemoteRealmBillingTestCase):
)
# Try the case where the identity dict is simultaneously expired.
with time_machine.travel(
with (
time_machine.travel(
now + timedelta(seconds=REMOTE_BILLING_SESSION_VALIDITY_SECONDS + 30),
tick=False,
),
self.assertLogs("django.request", "ERROR") as m,
self.assertRaises(AssertionError),
):
with self.assertLogs("django.request", "ERROR") as m, self.assertRaises(AssertionError):
self.client_get(final_url, subdomain="selfhosting")
# The django.request log should be a traceback, mentioning the relevant
# exceptions that occurred.

View File

@ -1415,8 +1415,10 @@ class StripeTest(StripeTestCase):
self.assertFalse(Customer.objects.filter(realm=user.realm).exists())
# Require free trial users to add a credit card.
with time_machine.travel(self.now, tick=False):
with self.assertLogs("corporate.stripe", "WARNING"):
with (
time_machine.travel(self.now, tick=False),
self.assertLogs("corporate.stripe", "WARNING"),
):
response = self.upgrade()
self.assert_json_error(
response, "Please add a credit card before starting your free trial."
@ -1953,10 +1955,12 @@ class StripeTest(StripeTestCase):
initial_upgrade_request
)
# Change the seat count while the user is going through the upgrade flow
with patch("corporate.lib.stripe.get_latest_seat_count", return_value=new_seat_count):
with patch(
with (
patch("corporate.lib.stripe.get_latest_seat_count", return_value=new_seat_count),
patch(
"corporate.lib.stripe.RealmBillingSession.get_initial_upgrade_context",
return_value=(_, context_when_upgrade_page_is_rendered),
),
):
self.add_card_and_upgrade(hamlet)
@ -2072,8 +2076,10 @@ class StripeTest(StripeTestCase):
hamlet = self.example_user("hamlet")
self.login_user(hamlet)
self.local_upgrade(self.seat_count, True, CustomerPlan.BILLING_SCHEDULE_ANNUAL, True, False)
with self.assertLogs("corporate.stripe", "WARNING") as m:
with self.assertRaises(BillingError) as context:
with (
self.assertLogs("corporate.stripe", "WARNING") as m,
self.assertRaises(BillingError) as context,
):
self.local_upgrade(
self.seat_count, True, CustomerPlan.BILLING_SCHEDULE_ANNUAL, True, False
)
@ -2197,10 +2203,12 @@ class StripeTest(StripeTestCase):
else:
del_args = []
upgrade_params["licenses"] = licenses
with patch("corporate.lib.stripe.BillingSession.process_initial_upgrade"):
with patch(
with (
patch("corporate.lib.stripe.BillingSession.process_initial_upgrade"),
patch(
"corporate.lib.stripe.BillingSession.create_stripe_invoice_and_charge",
return_value="fake_stripe_invoice_id",
),
):
response = self.upgrade(
invoice=invoice, talk_to_stripe=False, del_args=del_args, **upgrade_params
@ -2911,8 +2919,10 @@ class StripeTest(StripeTestCase):
assert plan is not None
self.assertEqual(plan.licenses(), self.seat_count)
self.assertEqual(plan.licenses_at_next_renewal(), self.seat_count)
with self.assertLogs("corporate.stripe", "INFO") as m:
with time_machine.travel(self.now, tick=False):
with (
self.assertLogs("corporate.stripe", "INFO") as m,
time_machine.travel(self.now, tick=False),
):
response = self.client_billing_patch(
"/billing/plan",
{"status": CustomerPlan.DOWNGRADE_AT_END_OF_CYCLE},
@ -3034,8 +3044,10 @@ class StripeTest(StripeTestCase):
new_plan = get_current_plan_by_realm(user.realm)
assert new_plan is not None
with self.assertLogs("corporate.stripe", "INFO") as m:
with time_machine.travel(self.now, tick=False):
with (
self.assertLogs("corporate.stripe", "INFO") as m,
time_machine.travel(self.now, tick=False),
):
response = self.client_billing_patch(
"/billing/plan",
{"status": CustomerPlan.SWITCH_TO_ANNUAL_AT_END_OF_CYCLE},
@ -3062,8 +3074,10 @@ class StripeTest(StripeTestCase):
(20, 20),
)
with time_machine.travel(self.next_month, tick=False):
with patch("corporate.lib.stripe.get_latest_seat_count", return_value=25):
with (
time_machine.travel(self.next_month, tick=False),
patch("corporate.lib.stripe.get_latest_seat_count", return_value=25),
):
billing_session.update_license_ledger_if_needed(self.next_month)
self.assertEqual(LicenseLedger.objects.filter(plan=monthly_plan).count(), 2)
customer = get_customer_by_realm(user.realm)
@ -3230,8 +3244,10 @@ class StripeTest(StripeTestCase):
stripe_customer_id = Customer.objects.get(realm=user.realm).id
new_plan = get_current_plan_by_realm(user.realm)
assert new_plan is not None
with self.assertLogs("corporate.stripe", "INFO") as m:
with time_machine.travel(self.now, tick=False):
with (
self.assertLogs("corporate.stripe", "INFO") as m,
time_machine.travel(self.now, tick=False),
):
response = self.client_billing_patch(
"/billing/plan",
{"status": CustomerPlan.SWITCH_TO_ANNUAL_AT_END_OF_CYCLE},
@ -3343,8 +3359,10 @@ class StripeTest(StripeTestCase):
assert new_plan is not None
assert self.now is not None
with self.assertLogs("corporate.stripe", "INFO") as m:
with time_machine.travel(self.now, tick=False):
with (
self.assertLogs("corporate.stripe", "INFO") as m,
time_machine.travel(self.now, tick=False),
):
response = self.client_billing_patch(
"/billing/plan",
{"status": CustomerPlan.SWITCH_TO_MONTHLY_AT_END_OF_CYCLE},
@ -3375,8 +3393,10 @@ class StripeTest(StripeTestCase):
# additional licenses) but at the end of current billing cycle.
self.assertEqual(annual_plan.next_invoice_date, self.next_month)
assert annual_plan.next_invoice_date is not None
with time_machine.travel(annual_plan.next_invoice_date, tick=False):
with patch("corporate.lib.stripe.get_latest_seat_count", return_value=25):
with (
time_machine.travel(annual_plan.next_invoice_date, tick=False),
patch("corporate.lib.stripe.get_latest_seat_count", return_value=25),
):
billing_session.update_license_ledger_if_needed(annual_plan.next_invoice_date)
annual_plan.refresh_from_db()
@ -3430,8 +3450,10 @@ class StripeTest(StripeTestCase):
self.assertEqual(invoice_item2[key], value)
# Check that we switch to monthly plan at the end of current billing cycle.
with time_machine.travel(self.next_year, tick=False):
with patch("corporate.lib.stripe.get_latest_seat_count", return_value=25):
with (
time_machine.travel(self.next_year, tick=False),
patch("corporate.lib.stripe.get_latest_seat_count", return_value=25),
):
billing_session.update_license_ledger_if_needed(self.next_year)
self.assertEqual(LicenseLedger.objects.filter(plan=annual_plan).count(), 3)
customer = get_customer_by_realm(user.realm)
@ -3513,8 +3535,10 @@ class StripeTest(StripeTestCase):
self.local_upgrade(
self.seat_count, True, CustomerPlan.BILLING_SCHEDULE_ANNUAL, True, False
)
with self.assertLogs("corporate.stripe", "INFO") as m:
with time_machine.travel(self.now, tick=False):
with (
self.assertLogs("corporate.stripe", "INFO") as m,
time_machine.travel(self.now, tick=False),
):
response = self.client_billing_patch(
"/billing/plan",
{"status": CustomerPlan.DOWNGRADE_AT_END_OF_CYCLE},
@ -3528,8 +3552,10 @@ class StripeTest(StripeTestCase):
plan = CustomerPlan.objects.first()
assert plan is not None
self.assertEqual(plan.status, CustomerPlan.DOWNGRADE_AT_END_OF_CYCLE)
with self.assertLogs("corporate.stripe", "INFO") as m:
with time_machine.travel(self.now, tick=False):
with (
self.assertLogs("corporate.stripe", "INFO") as m,
time_machine.travel(self.now, tick=False),
):
response = self.client_billing_patch(
"/billing/plan",
{"status": CustomerPlan.ACTIVE},
@ -3587,8 +3613,7 @@ class StripeTest(StripeTestCase):
self.login_user(user)
free_trial_end_date = self.now + timedelta(days=60)
with self.settings(CLOUD_FREE_TRIAL_DAYS=60):
with time_machine.travel(self.now, tick=False):
with self.settings(CLOUD_FREE_TRIAL_DAYS=60), time_machine.travel(self.now, tick=False):
self.add_card_and_upgrade(user, schedule="monthly")
plan = CustomerPlan.objects.get()
self.assertEqual(plan.next_invoice_date, free_trial_end_date)
@ -3643,8 +3668,7 @@ class StripeTest(StripeTestCase):
self.login_user(user)
free_trial_end_date = self.now + timedelta(days=60)
with self.settings(CLOUD_FREE_TRIAL_DAYS=60):
with time_machine.travel(self.now, tick=False):
with self.settings(CLOUD_FREE_TRIAL_DAYS=60), time_machine.travel(self.now, tick=False):
self.add_card_and_upgrade(user, schedule="annual")
plan = CustomerPlan.objects.get()
self.assertEqual(plan.next_invoice_date, free_trial_end_date)
@ -3764,8 +3788,10 @@ class StripeTest(StripeTestCase):
self.assertEqual(plan.licenses_at_next_renewal(), self.seat_count)
# Schedule downgrade
with self.assertLogs("corporate.stripe", "INFO") as m:
with time_machine.travel(self.now, tick=False):
with (
self.assertLogs("corporate.stripe", "INFO") as m,
time_machine.travel(self.now, tick=False),
):
response = self.client_billing_patch(
"/billing/plan",
{"status": CustomerPlan.DOWNGRADE_AT_END_OF_FREE_TRIAL},
@ -3874,8 +3900,10 @@ class StripeTest(StripeTestCase):
self.assertEqual(plan.licenses_at_next_renewal(), self.seat_count)
# Schedule downgrade
with self.assertLogs("corporate.stripe", "INFO") as m:
with time_machine.travel(self.now, tick=False):
with (
self.assertLogs("corporate.stripe", "INFO") as m,
time_machine.travel(self.now, tick=False),
):
response = self.client_billing_patch(
"/billing/plan",
{"status": CustomerPlan.DOWNGRADE_AT_END_OF_FREE_TRIAL},
@ -3894,8 +3922,10 @@ class StripeTest(StripeTestCase):
self.assertEqual(plan.licenses_at_next_renewal(), None)
# Cancel downgrade
with self.assertLogs("corporate.stripe", "INFO") as m:
with time_machine.travel(self.now, tick=False):
with (
self.assertLogs("corporate.stripe", "INFO") as m,
time_machine.travel(self.now, tick=False),
):
response = self.client_billing_patch(
"/billing/plan",
{"status": CustomerPlan.FREE_TRIAL},
@ -3937,8 +3967,8 @@ class StripeTest(StripeTestCase):
with (
self.assertRaises(BillingError) as context,
self.assertLogs("corporate.stripe", "WARNING") as m,
time_machine.travel(self.now, tick=False),
):
with time_machine.travel(self.now, tick=False):
self.local_upgrade(
self.seat_count, True, CustomerPlan.BILLING_SCHEDULE_ANNUAL, True, False
)
@ -4242,8 +4272,10 @@ class StripeTest(StripeTestCase):
)
self.login_user(self.example_user("hamlet"))
with self.assertLogs("corporate.stripe", "INFO") as m:
with time_machine.travel(self.now, tick=False):
with (
self.assertLogs("corporate.stripe", "INFO") as m,
time_machine.travel(self.now, tick=False),
):
result = self.client_billing_patch(
"/billing/plan",
{"status": CustomerPlan.DOWNGRADE_AT_END_OF_CYCLE},
@ -4270,8 +4302,10 @@ class StripeTest(StripeTestCase):
)
self.login_user(self.example_user("hamlet"))
with self.assertLogs("corporate.stripe", "INFO") as m:
with time_machine.travel(self.now, tick=False):
with (
self.assertLogs("corporate.stripe", "INFO") as m,
time_machine.travel(self.now, tick=False),
):
result = self.client_billing_patch(
"/billing/plan",
{"status": CustomerPlan.SWITCH_TO_ANNUAL_AT_END_OF_CYCLE},
@ -5602,8 +5636,10 @@ class LicenseLedgerTest(StripeTestCase):
self.assertEqual(plan.licenses(), self.seat_count + 3)
self.assertEqual(plan.licenses_at_next_renewal(), self.seat_count + 3)
with patch("corporate.lib.stripe.get_latest_seat_count", return_value=self.seat_count):
with self.assertRaises(AssertionError):
with (
patch("corporate.lib.stripe.get_latest_seat_count", return_value=self.seat_count),
self.assertRaises(AssertionError),
):
billing_session.update_license_ledger_for_manual_plan(
plan, self.now, licenses=self.seat_count
)
@ -5615,8 +5651,10 @@ class LicenseLedgerTest(StripeTestCase):
self.assertEqual(plan.licenses(), self.seat_count + 3)
self.assertEqual(plan.licenses_at_next_renewal(), self.seat_count)
with patch("corporate.lib.stripe.get_latest_seat_count", return_value=self.seat_count):
with self.assertRaises(AssertionError):
with (
patch("corporate.lib.stripe.get_latest_seat_count", return_value=self.seat_count),
self.assertRaises(AssertionError),
):
billing_session.update_license_ledger_for_manual_plan(
plan, self.now, licenses_at_next_renewal=self.seat_count - 1
)
@ -6614,8 +6652,10 @@ class TestRemoteRealmBillingFlow(StripeTestCase, RemoteRealmBillingTestCase):
# Same result even with free trial enabled for self hosted customers since we don't
# offer free trial for business plan.
with self.settings(SELF_HOSTING_FREE_TRIAL_DAYS=30):
with time_machine.travel(self.now, tick=False):
with (
self.settings(SELF_HOSTING_FREE_TRIAL_DAYS=30),
time_machine.travel(self.now, tick=False),
):
result = self.client_get(
f"{self.billing_session.billing_base_url}/upgrade/", subdomain="selfhosting"
)
@ -6631,8 +6671,7 @@ class TestRemoteRealmBillingFlow(StripeTestCase, RemoteRealmBillingTestCase):
)
# Check that cloud free trials don't affect self hosted customers.
with self.settings(CLOUD_FREE_TRIAL_DAYS=30):
with time_machine.travel(self.now, tick=False):
with self.settings(CLOUD_FREE_TRIAL_DAYS=30), time_machine.travel(self.now, tick=False):
result = self.client_get(
f"{self.billing_session.billing_base_url}/upgrade/", subdomain="selfhosting"
)
@ -8018,8 +8057,10 @@ class TestRemoteRealmBillingFlow(StripeTestCase, RemoteRealmBillingTestCase):
self.assertEqual(result["Location"], f"{billing_base_url}/billing/")
# Downgrade
with self.assertLogs("corporate.stripe", "INFO") as m:
with time_machine.travel(self.now + timedelta(days=7), tick=False):
with (
self.assertLogs("corporate.stripe", "INFO") as m,
time_machine.travel(self.now + timedelta(days=7), tick=False),
):
response = self.client_billing_patch(
"/billing/plan",
{"status": CustomerPlan.DOWNGRADE_AT_END_OF_CYCLE},
@ -8323,8 +8364,10 @@ class TestRemoteServerBillingFlow(StripeTestCase, RemoteServerTestCase):
# Same result even with free trial enabled for self hosted customers since we don't
# offer free trial for business plan.
with self.settings(SELF_HOSTING_FREE_TRIAL_DAYS=30):
with time_machine.travel(self.now, tick=False):
with (
self.settings(SELF_HOSTING_FREE_TRIAL_DAYS=30),
time_machine.travel(self.now, tick=False),
):
result = self.client_get(f"{billing_base_url}/upgrade/", subdomain="selfhosting")
self.assert_in_success_response(["Add card", "Purchase Zulip Business"], result)
@ -8390,8 +8433,10 @@ class TestRemoteServerBillingFlow(StripeTestCase, RemoteServerTestCase):
self.assertEqual(result["Location"], f"{billing_base_url}/billing/")
# Downgrade
with self.assertLogs("corporate.stripe", "INFO") as m:
with time_machine.travel(self.now + timedelta(days=7), tick=False):
with (
self.assertLogs("corporate.stripe", "INFO") as m,
time_machine.travel(self.now + timedelta(days=7), tick=False),
):
response = self.client_billing_patch(
"/billing/plan",
{"status": CustomerPlan.DOWNGRADE_AT_END_OF_CYCLE},
@ -8599,8 +8644,10 @@ class TestRemoteServerBillingFlow(StripeTestCase, RemoteServerTestCase):
self.assertEqual(result["Location"], f"{billing_base_url}/billing/")
# Downgrade
with self.assertLogs("corporate.stripe", "INFO") as m:
with time_machine.travel(self.now + timedelta(days=7), tick=False):
with (
self.assertLogs("corporate.stripe", "INFO") as m,
time_machine.travel(self.now + timedelta(days=7), tick=False),
):
response = self.client_billing_patch(
"/billing/plan",
{"status": CustomerPlan.ACTIVE},

View File

@ -186,7 +186,6 @@ ignore = [
"SIM103", # Return the condition directly
"SIM108", # Use ternary operator `action = "[commented]" if action == "created" else f"{action} a [comment]"` instead of if-else-block
"SIM114", # Combine `if` branches using logical `or` operator
"SIM117", # Use a single `with` statement with multiple contexts instead of nested `with` statements
"SIM401", # Use `d.get(key, default)` instead of an `if` block
"TCH001", # Move application import into a type-checking block
"TCH002", # Move third-party import into a type-checking block

View File

@ -66,8 +66,10 @@ def main() -> None:
args = parser.parse_args()
sns_topic_arn = get_ses_arn(session, args)
with our_sqs_queue(session, sns_topic_arn) as (queue_arn, queue_url):
with our_sns_subscription(session, sns_topic_arn, queue_arn):
with (
our_sqs_queue(session, sns_topic_arn) as (queue_arn, queue_url),
our_sns_subscription(session, sns_topic_arn, queue_arn),
):
print_messages(session, queue_url)

View File

@ -168,10 +168,12 @@ def get_failed_tests() -> list[str]:
def block_internet() -> Iterator[None]:
# Monkey-patching - responses library raises requests.ConnectionError when access to an unregistered URL
# is attempted. We want to replace that with our own exception, so that it propagates all the way:
with mock.patch.object(responses, "ConnectionError", new=ZulipInternetBlockedError):
with (
mock.patch.object(responses, "ConnectionError", new=ZulipInternetBlockedError),
# We'll run all tests in this context manager. It'll cause an error to be raised (see above comment),
# if any code attempts to access the internet.
with responses.RequestsMock():
responses.RequestsMock(),
):
yield

View File

@ -19,8 +19,7 @@ CACHE_FILE = os.path.join(CACHE_DIR, "requirements_hashes")
def print_diff(path_file1: str, path_file2: str) -> None:
with open(path_file1) as file1:
with open(path_file2) as file2:
with open(path_file1) as file1, open(path_file2) as file2:
diff = difflib.unified_diff(
file1.readlines(),
file2.readlines(),

View File

@ -1347,8 +1347,10 @@ def fetch_team_icons(
)
resized_icon_output_path = os.path.join(output_dir, str(realm_id), "icon.png")
with open(resized_icon_output_path, "wb") as output_file:
with open(original_icon_output_path, "rb") as original_file:
with (
open(resized_icon_output_path, "wb") as output_file,
open(original_icon_output_path, "rb") as original_file,
):
resized_data = resize_logo(original_file.read())
output_file.write(resized_data)
records.append(

View File

@ -28,8 +28,7 @@ def lockfile(filename: str, shared: bool = False) -> Iterator[None]:
If shared is True, use a LOCK_SH lock, otherwise LOCK_EX.
The file is given by name and will be created if it does not exist."""
with open(filename, "w") as lock:
with flock(lock, shared=shared):
with open(filename, "w") as lock, flock(lock, shared=shared):
yield

View File

@ -548,8 +548,10 @@ def custom_email_sender(
rendered_input = render_markdown_path(plain_text_template_path.replace("templates/", ""))
# And then extend it with our standard email headers.
with open(html_template_path, "w") as f:
with open(markdown_email_base_template_path) as base_template:
with (
open(html_template_path, "w") as f,
open(markdown_email_base_template_path) as base_template,
):
# We use an ugly string substitution here, because we want to:
# 1. Only run Jinja once on the supplied content
# 2. Allow the supplied content to have jinja interpolation in it

View File

@ -2018,13 +2018,15 @@ class ZulipTestCase(ZulipTestCaseMixin, TestCase):
# Some code might call process_notification using keyword arguments,
# so mypy doesn't allow assigning lst.append to process_notification
# So explicitly change parameter name to 'notice' to work around this problem
with mock.patch("zerver.tornado.event_queue.process_notification", lst.append):
with (
mock.patch("zerver.tornado.event_queue.process_notification", lst.append),
# Some `send_event` calls need to be executed only after the current transaction
# commits (using `on_commit` hooks). Because the transaction in Django tests never
# commits (rather, gets rolled back after the test completes), such events would
# never be sent in tests, and we would be unable to verify them. Hence, we use
# this helper to make sure the `send_event` calls actually run.
with self.captureOnCommitCallbacks(execute=True):
self.captureOnCommitCallbacks(execute=True),
):
yield lst
self.assert_length(lst, expected_num_events)

View File

@ -71,8 +71,10 @@ class MockLDAP(fakeldap.MockLDAP):
def stub_event_queue_user_events(
event_queue_return: Any, user_events_return: Any
) -> Iterator[None]:
with mock.patch("zerver.lib.events.request_event_queue", return_value=event_queue_return):
with mock.patch("zerver.lib.events.get_user_events", return_value=user_events_return):
with (
mock.patch("zerver.lib.events.request_event_queue", return_value=event_queue_return),
mock.patch("zerver.lib.events.get_user_events", return_value=user_events_return),
):
yield

View File

@ -186,8 +186,10 @@ def thumbnail_local_emoji(apps: StateApps) -> None:
)
new_file_name = get_emoji_file_name("image/png", emoji.id)
try:
with open(f"{settings.DEPLOY_ROOT}/static/images/bad-emoji.png", "rb") as f:
with open(f"{base_path}/{new_file_name}", "wb") as new_f:
with (
open(f"{settings.DEPLOY_ROOT}/static/images/bad-emoji.png", "rb") as f,
open(f"{base_path}/{new_file_name}", "wb") as new_f,
):
new_f.write(f.read())
emoji.deactivated = True
emoji.is_animated = False

View File

@ -3415,8 +3415,10 @@ class AppleIdAuthBackendTest(AppleAuthMixin, SocialAuthBase):
def test_id_token_verification_failure(self) -> None:
account_data_dict = self.get_account_data_dict(email=self.email, name=self.name)
with self.assertLogs(self.logger_string, level="INFO") as m:
with mock.patch("jwt.decode", side_effect=PyJWTError):
with (
self.assertLogs(self.logger_string, level="INFO") as m,
mock.patch("jwt.decode", side_effect=PyJWTError),
):
result = self.social_auth_test(
account_data_dict,
expect_choose_email_screen=True,
@ -4583,8 +4585,10 @@ class GoogleAuthBackendTest(SocialAuthBase):
"redirect_to": next,
}
user_profile = self.example_user("hamlet")
with mock.patch("zerver.views.auth.authenticate", return_value=user_profile):
with mock.patch("zerver.views.auth.do_login"):
with (
mock.patch("zerver.views.auth.authenticate", return_value=user_profile),
mock.patch("zerver.views.auth.do_login"),
):
result = self.get_log_into_subdomain(data)
return result
@ -5666,13 +5670,15 @@ class TestZulipRemoteUserBackend(DesktopFlowTestingLib, ZulipTestCase):
def test_login_failure_due_to_wrong_subdomain(self) -> None:
email = self.example_email("hamlet")
with self.settings(
with (
self.settings(
AUTHENTICATION_BACKENDS=(
"zproject.backends.ZulipRemoteUserBackend",
"zproject.backends.ZulipDummyBackend",
)
),
mock.patch("zerver.views.auth.get_subdomain", return_value="acme"),
):
with mock.patch("zerver.views.auth.get_subdomain", return_value="acme"):
result = self.client_get(
"http://testserver:9080/accounts/login/sso/", REMOTE_USER=email
)
@ -5682,13 +5688,15 @@ class TestZulipRemoteUserBackend(DesktopFlowTestingLib, ZulipTestCase):
def test_login_failure_due_to_empty_subdomain(self) -> None:
email = self.example_email("hamlet")
with self.settings(
with (
self.settings(
AUTHENTICATION_BACKENDS=(
"zproject.backends.ZulipRemoteUserBackend",
"zproject.backends.ZulipDummyBackend",
)
),
mock.patch("zerver.views.auth.get_subdomain", return_value=""),
):
with mock.patch("zerver.views.auth.get_subdomain", return_value=""):
result = self.client_get(
"http://testserver:9080/accounts/login/sso/", REMOTE_USER=email
)
@ -5699,12 +5707,14 @@ class TestZulipRemoteUserBackend(DesktopFlowTestingLib, ZulipTestCase):
def test_login_success_under_subdomains(self) -> None:
user_profile = self.example_user("hamlet")
email = user_profile.delivery_email
with mock.patch("zerver.views.auth.get_subdomain", return_value="zulip"):
with self.settings(
with (
mock.patch("zerver.views.auth.get_subdomain", return_value="zulip"),
self.settings(
AUTHENTICATION_BACKENDS=(
"zproject.backends.ZulipRemoteUserBackend",
"zproject.backends.ZulipDummyBackend",
)
),
):
result = self.client_get("/accounts/login/sso/", REMOTE_USER=email)
self.assertEqual(result.status_code, 302)
@ -5974,8 +5984,10 @@ class TestJWTLogin(ZulipTestCase):
def test_login_failure_due_to_wrong_subdomain(self) -> None:
payload = {"email": "hamlet@zulip.com"}
with self.settings(JWT_AUTH_KEYS={"acme": {"key": "key", "algorithms": ["HS256"]}}):
with mock.patch("zerver.views.auth.get_realm_from_request", return_value=None):
with (
self.settings(JWT_AUTH_KEYS={"acme": {"key": "key", "algorithms": ["HS256"]}}),
mock.patch("zerver.views.auth.get_realm_from_request", return_value=None),
):
key = settings.JWT_AUTH_KEYS["acme"]["key"]
[algorithm] = settings.JWT_AUTH_KEYS["acme"]["algorithms"]
web_token = jwt.encode(payload, key, algorithm)
@ -5987,8 +5999,10 @@ class TestJWTLogin(ZulipTestCase):
def test_login_success_under_subdomains(self) -> None:
payload = {"email": "hamlet@zulip.com"}
with self.settings(JWT_AUTH_KEYS={"zulip": {"key": "key", "algorithms": ["HS256"]}}):
with mock.patch("zerver.views.auth.get_subdomain", return_value="zulip"):
with (
self.settings(JWT_AUTH_KEYS={"zulip": {"key": "key", "algorithms": ["HS256"]}}),
mock.patch("zerver.views.auth.get_subdomain", return_value="zulip"),
):
key = settings.JWT_AUTH_KEYS["zulip"]["key"]
[algorithm] = settings.JWT_AUTH_KEYS["zulip"]["algorithms"]
web_token = jwt.encode(payload, key, algorithm)
@ -6046,8 +6060,7 @@ class DjangoToLDAPUsernameTests(ZulipTestCase):
self.backend.django_to_ldap_username("aaron@zulip.com"), self.ldap_username("aaron")
)
with self.assertLogs(level="WARNING") as m:
with self.assertRaises(NoMatchingLDAPUserError):
with self.assertLogs(level="WARNING") as m, self.assertRaises(NoMatchingLDAPUserError):
self.backend.django_to_ldap_username("shared_email@zulip.com")
self.assertEqual(
m.output,
@ -6641,8 +6654,10 @@ class TestZulipLDAPUserPopulator(ZulipLDAPTestCase):
@override_settings(LDAP_EMAIL_ATTR="mail")
def test_populate_user_returns_none(self) -> None:
with mock.patch.object(ZulipLDAPUser, "populate_user", return_value=None):
with self.assertRaises(PopulateUserLDAPError):
with (
mock.patch.object(ZulipLDAPUser, "populate_user", return_value=None),
self.assertRaises(PopulateUserLDAPError),
):
sync_user_from_ldap(self.example_user("hamlet"), mock.Mock())
def test_update_full_name(self) -> None:
@ -6823,8 +6838,10 @@ class TestZulipLDAPUserPopulator(ZulipLDAPTestCase):
self.change_ldap_user_attr("hamlet", "cn", "Second Hamlet")
expected_call_args = [hamlet2, "Second Hamlet", None]
with self.settings(AUTH_LDAP_USER_ATTR_MAP={"full_name": "cn"}):
with mock.patch("zerver.actions.user_settings.do_change_full_name") as f:
with (
self.settings(AUTH_LDAP_USER_ATTR_MAP={"full_name": "cn"}),
mock.patch("zerver.actions.user_settings.do_change_full_name") as f,
):
self.perform_ldap_sync(hamlet2)
f.assert_called_once_with(*expected_call_args)
@ -7038,14 +7055,16 @@ class TestZulipLDAPUserPopulator(ZulipLDAPTestCase):
},
],
]
with self.settings(
with (
self.settings(
AUTH_LDAP_USER_ATTR_MAP={
"full_name": "cn",
"custom_profile_field__birthday": "birthDate",
"custom_profile_field__phone_number": "homePhone",
}
),
mock.patch("zproject.backends.do_update_user_custom_profile_data_if_changed") as f,
):
with mock.patch("zproject.backends.do_update_user_custom_profile_data_if_changed") as f:
self.perform_ldap_sync(self.example_user("hamlet"))
f.assert_called_once_with(*expected_call_args)
@ -7489,9 +7508,11 @@ class JWTFetchAPIKeyTest(ZulipTestCase):
self.assert_json_error_contains(result, "Invalid subdomain", 404)
def test_jwt_key_not_found_failure(self) -> None:
with self.settings(JWT_AUTH_KEYS={"zulip": {"key": "key1", "algorithms": ["HS256"]}}):
with mock.patch(
with (
self.settings(JWT_AUTH_KEYS={"zulip": {"key": "key1", "algorithms": ["HS256"]}}),
mock.patch(
"zerver.views.auth.get_realm_from_request", return_value=get_realm("zephyr")
),
):
result = self.client_post("/api/v1/jwt/fetch_api_key")
self.assert_json_error_contains(
@ -7709,10 +7730,10 @@ class LDAPGroupSyncTest(ZulipTestCase):
),
self.assertLogs("django_auth_ldap", "WARN") as django_ldap_log,
self.assertLogs("zulip.ldap", "DEBUG") as zulip_ldap_log,
):
with self.assertRaisesRegex(
self.assertRaisesRegex(
ZulipLDAPError,
"search_s.*",
),
):
sync_user_from_ldap(cordelia, mock.Mock())

View File

@ -165,9 +165,9 @@ class DecoratorTestCase(ZulipTestCase):
# Start a valid request here
request = HostRequestMock()
request.POST["api_key"] = webhook_bot_api_key
with self.assertLogs(level="WARNING") as m:
with self.assertRaisesRegex(
JsonableError, "Account is not associated with this subdomain"
with (
self.assertLogs(level="WARNING") as m,
self.assertRaisesRegex(JsonableError, "Account is not associated with this subdomain"),
):
api_result = my_webhook(request)
self.assertEqual(
@ -181,9 +181,9 @@ class DecoratorTestCase(ZulipTestCase):
request = HostRequestMock()
request.POST["api_key"] = webhook_bot_api_key
with self.assertLogs(level="WARNING") as m:
with self.assertRaisesRegex(
JsonableError, "Account is not associated with this subdomain"
with (
self.assertLogs(level="WARNING") as m,
self.assertRaisesRegex(JsonableError, "Account is not associated with this subdomain"),
):
request.host = "acme." + settings.EXTERNAL_HOST
api_result = my_webhook(request)
@ -203,8 +203,10 @@ class DecoratorTestCase(ZulipTestCase):
request = HostRequestMock()
request.host = "zulip.testserver"
request.POST["api_key"] = webhook_bot_api_key
with self.assertLogs("zulip.zerver.webhooks", level="INFO") as log:
with self.assertRaisesRegex(Exception, "raised by webhook function"):
with (
self.assertLogs("zulip.zerver.webhooks", level="INFO") as log,
self.assertRaisesRegex(Exception, "raised by webhook function"),
):
request._body = b"{}"
request.content_type = "application/json"
my_webhook_raises_exception(request)
@ -215,8 +217,10 @@ class DecoratorTestCase(ZulipTestCase):
request = HostRequestMock()
request.host = "zulip.testserver"
request.POST["api_key"] = webhook_bot_api_key
with self.assertLogs("zulip.zerver.webhooks", level="INFO") as log:
with self.assertRaisesRegex(Exception, "raised by webhook function"):
with (
self.assertLogs("zulip.zerver.webhooks", level="INFO") as log,
self.assertRaisesRegex(Exception, "raised by webhook function"),
):
request._body = b"notjson"
request.content_type = "text/plain"
my_webhook_raises_exception(request)
@ -227,8 +231,10 @@ class DecoratorTestCase(ZulipTestCase):
request = HostRequestMock()
request.host = "zulip.testserver"
request.POST["api_key"] = webhook_bot_api_key
with self.assertLogs("zulip.zerver.webhooks", level="ERROR") as log:
with self.assertRaisesRegex(Exception, "raised by webhook function"):
with (
self.assertLogs("zulip.zerver.webhooks", level="ERROR") as log,
self.assertRaisesRegex(Exception, "raised by webhook function"),
):
request._body = b"invalidjson"
request.content_type = "application/json"
request.META["HTTP_X_CUSTOM_HEADER"] = "custom_value"
@ -245,8 +251,10 @@ class DecoratorTestCase(ZulipTestCase):
exception_msg = (
"The 'test_event' event isn't currently supported by the ClientName webhook; ignoring"
)
with self.assertLogs("zulip.zerver.webhooks.unsupported", level="ERROR") as log:
with self.assertRaisesRegex(UnsupportedWebhookEventTypeError, exception_msg):
with (
self.assertLogs("zulip.zerver.webhooks.unsupported", level="ERROR") as log,
self.assertRaisesRegex(UnsupportedWebhookEventTypeError, exception_msg),
):
request._body = b"invalidjson"
request.content_type = "application/json"
request.META["HTTP_X_CUSTOM_HEADER"] = "custom_value"
@ -259,8 +267,10 @@ class DecoratorTestCase(ZulipTestCase):
request = HostRequestMock()
request.host = "zulip.testserver"
request.POST["api_key"] = webhook_bot_api_key
with self.settings(RATE_LIMITING=True):
with mock.patch("zerver.decorator.rate_limit_user") as rate_limit_mock:
with (
self.settings(RATE_LIMITING=True),
mock.patch("zerver.decorator.rate_limit_user") as rate_limit_mock,
):
api_result = orjson.loads(my_webhook(request).content).get("msg")
# Verify rate limiting was attempted.
@ -389,8 +399,10 @@ class DecoratorLoggingTestCase(ZulipTestCase):
request._body = b"{}"
request.content_type = "text/plain"
with self.assertLogs("zulip.zerver.webhooks") as logger:
with self.assertRaisesRegex(Exception, "raised by webhook function"):
with (
self.assertLogs("zulip.zerver.webhooks") as logger,
self.assertRaisesRegex(Exception, "raised by webhook function"),
):
my_webhook_raises_exception(request)
self.assertIn("raised by webhook function", logger.output[0])
@ -440,8 +452,10 @@ class DecoratorLoggingTestCase(ZulipTestCase):
request._body = b"{}"
request.content_type = "application/json"
with mock.patch("zerver.decorator.webhook_logger.exception") as mock_exception:
with self.assertRaisesRegex(Exception, "raised by a non-webhook view"):
with (
mock.patch("zerver.decorator.webhook_logger.exception") as mock_exception,
self.assertRaisesRegex(Exception, "raised by a non-webhook view"),
):
non_webhook_view_raises_exception(request)
self.assertFalse(mock_exception.called)
@ -964,9 +978,11 @@ class TestValidateApiKey(ZulipTestCase):
def test_valid_api_key_if_user_is_on_wrong_subdomain(self) -> None:
with self.settings(RUNNING_INSIDE_TORNADO=False):
api_key = get_api_key(self.default_bot)
with self.assertLogs(level="WARNING") as m:
with self.assertRaisesRegex(
with (
self.assertLogs(level="WARNING") as m,
self.assertRaisesRegex(
JsonableError, "Account is not associated with this subdomain"
),
):
validate_api_key(
HostRequestMock(host=settings.EXTERNAL_HOST),
@ -982,9 +998,11 @@ class TestValidateApiKey(ZulipTestCase):
],
)
with self.assertLogs(level="WARNING") as m:
with self.assertRaisesRegex(
with (
self.assertLogs(level="WARNING") as m,
self.assertRaisesRegex(
JsonableError, "Account is not associated with this subdomain"
),
):
validate_api_key(
HostRequestMock(host="acme." + settings.EXTERNAL_HOST),

View File

@ -241,8 +241,7 @@ class TestDigestEmailMessages(ZulipTestCase):
digest_user_ids = [user.id for user in digest_users]
get_recent_topics.cache_clear()
with self.assert_database_query_count(16):
with self.assert_memcached_count(0):
with self.assert_database_query_count(16), self.assert_memcached_count(0):
bulk_handle_digest_email(digest_user_ids, cutoff)
self.assert_length(digest_users, mock_send_future_email.call_count)
@ -441,8 +440,10 @@ class TestDigestEmailMessages(ZulipTestCase):
tuesday = self.tuesday()
cutoff = tuesday - timedelta(days=5)
with time_machine.travel(tuesday, tick=False):
with mock.patch("zerver.lib.digest.queue_digest_user_ids") as queue_mock:
with (
time_machine.travel(tuesday, tick=False),
mock.patch("zerver.lib.digest.queue_digest_user_ids") as queue_mock,
):
enqueue_emails(cutoff)
queue_mock.assert_not_called()
@ -453,8 +454,10 @@ class TestDigestEmailMessages(ZulipTestCase):
not_tuesday = datetime(year=2016, month=1, day=6, tzinfo=timezone.utc)
cutoff = not_tuesday - timedelta(days=5)
with time_machine.travel(not_tuesday, tick=False):
with mock.patch("zerver.lib.digest.queue_digest_user_ids") as queue_mock:
with (
time_machine.travel(not_tuesday, tick=False),
mock.patch("zerver.lib.digest.queue_digest_user_ids") as queue_mock,
):
enqueue_emails(cutoff)
queue_mock.assert_not_called()

View File

@ -72,11 +72,13 @@ class TestEmbeddedBotMessaging(ZulipTestCase):
def test_embedded_bot_quit_exception(self) -> None:
assert self.bot_profile is not None
with patch(
with (
patch(
"zulip_bots.bots.helloworld.helloworld.HelloWorldHandler.handle_message",
side_effect=EmbeddedBotQuitError("I'm quitting!"),
),
self.assertLogs(level="WARNING") as m,
):
with self.assertLogs(level="WARNING") as m:
self.send_stream_message(
self.user_profile,
"Denmark",

View File

@ -86,8 +86,10 @@ class EventsEndpointTest(ZulipTestCase):
test_event = dict(id=6, type=event_type, realm_emoji=empty_realm_emoji_dict)
# Test that call is made to deal with a returning soft deactivated user.
with mock.patch("zerver.lib.events.reactivate_user_if_soft_deactivated") as fa:
with stub_event_queue_user_events(return_event_queue, return_user_events):
with (
mock.patch("zerver.lib.events.reactivate_user_if_soft_deactivated") as fa,
stub_event_queue_user_events(return_event_queue, return_user_events),
):
result = self.api_post(
user, "/api/v1/register", dict(event_types=orjson.dumps([event_type]).decode())
)
@ -1171,8 +1173,10 @@ class FetchQueriesTest(ZulipTestCase):
# count in production.
realm = get_realm_with_settings(realm_id=user.realm_id)
with self.assert_database_query_count(43):
with mock.patch("zerver.lib.events.always_want") as want_mock:
with (
self.assert_database_query_count(43),
mock.patch("zerver.lib.events.always_want") as want_mock,
):
fetch_initial_state_data(user, realm=realm)
expected_counts = dict(

View File

@ -1742,8 +1742,10 @@ class NormalActionsTest(BaseAction):
cordelia.save()
away_val = False
with self.settings(CAN_ACCESS_ALL_USERS_GROUP_LIMITS_PRESENCE=True):
with self.verify_action(num_events=0, state_change_expected=False) as events:
with (
self.settings(CAN_ACCESS_ALL_USERS_GROUP_LIMITS_PRESENCE=True),
self.verify_action(num_events=0, state_change_expected=False) as events,
):
do_update_user_status(
user_profile=cordelia,
away=away_val,
@ -2128,8 +2130,7 @@ class NormalActionsTest(BaseAction):
{"Google": False, "Email": False, "GitHub": True, "LDAP": False, "Dev": True},
{"Google": False, "Email": True, "GitHub": True, "LDAP": True, "Dev": False},
):
with fake_backends():
with self.verify_action() as events:
with fake_backends(), self.verify_action() as events:
do_set_realm_authentication_methods(
self.user_profile.realm,
auth_method_dict,
@ -2664,8 +2665,7 @@ class NormalActionsTest(BaseAction):
def test_realm_emoji_events(self) -> None:
author = self.example_user("iago")
with get_test_image_file("img.png") as img_file:
with self.verify_action() as events:
with get_test_image_file("img.png") as img_file, self.verify_action() as events:
check_add_realm_emoji(
self.user_profile.realm, "my_emoji", author, img_file, "image/png"
)
@ -3278,8 +3278,11 @@ class NormalActionsTest(BaseAction):
"zerver.lib.export.do_export_realm",
return_value=create_dummy_file("test-export.tar.gz"),
):
with stdout_suppressed(), self.assertLogs(level="INFO") as info_logs:
with self.verify_action(state_change_expected=True, num_events=3) as events:
with (
stdout_suppressed(),
self.assertLogs(level="INFO") as info_logs,
self.verify_action(state_change_expected=True, num_events=3) as events,
):
self.client_post("/json/export/realm")
self.assertTrue("INFO:root:Completed data export for zulip in" in info_logs.output[0])
@ -3328,8 +3331,10 @@ class NormalActionsTest(BaseAction):
mock.patch("zerver.lib.export.do_export_realm", side_effect=Exception("Some failure")),
self.assertLogs(level="ERROR") as error_log,
):
with stdout_suppressed():
with self.verify_action(state_change_expected=False, num_events=2) as events:
with (
stdout_suppressed(),
self.verify_action(state_change_expected=False, num_events=2) as events,
):
self.client_post("/json/export/realm")
# Log is of following format: "ERROR:root:Data export for zulip failed after 0.004499673843383789"

View File

@ -298,10 +298,12 @@ class RateLimitTests(ZulipTestCase):
# We need to reset the circuitbreaker before starting. We
# patch the .opened property to be false, then call the
# function, so it resets to closed.
with mock.patch("builtins.open", mock.mock_open(read_data=orjson.dumps(["1.2.3.4"]))):
with mock.patch(
with (
mock.patch("builtins.open", mock.mock_open(read_data=orjson.dumps(["1.2.3.4"]))),
mock.patch(
"circuitbreaker.CircuitBreaker.opened", new_callable=mock.PropertyMock
) as mock_opened:
) as mock_opened,
):
mock_opened.return_value = False
get_tor_ips()
@ -354,8 +356,10 @@ class RateLimitTests(ZulipTestCase):
# An empty list of IPs is treated as some error in parsing the
# input, and as such should not be cached; rate-limiting
# should work as normal, per-IP
with self.tor_mock(read_data=[]) as tor_open:
with self.assertLogs("zerver.lib.rate_limiter", level="WARNING"):
with (
self.tor_mock(read_data=[]) as tor_open,
self.assertLogs("zerver.lib.rate_limiter", level="WARNING"),
):
self.do_test_hit_ratelimits(
lambda: self.send_unauthed_api_request(REMOTE_ADDR="1.2.3.4")
)
@ -372,10 +376,12 @@ class RateLimitTests(ZulipTestCase):
for ip in ["1.2.3.4", "5.6.7.8", "tor-exit-node"]:
RateLimitedIPAddr(ip, domain="api_by_ip").clear_history()
with self.tor_mock(side_effect=FileNotFoundError("File not found")) as tor_open:
with (
self.tor_mock(side_effect=FileNotFoundError("File not found")) as tor_open,
# If we cannot get a list of TOR exit nodes, then
# rate-limiting works as normal, per-IP
with self.assertLogs("zerver.lib.rate_limiter", level="WARNING") as log_mock:
self.assertLogs("zerver.lib.rate_limiter", level="WARNING") as log_mock,
):
self.do_test_hit_ratelimits(
lambda: self.send_unauthed_api_request(REMOTE_ADDR="1.2.3.4")
)

View File

@ -261,8 +261,10 @@ class HomeTest(ZulipTestCase):
self.client_post("/json/bots", bot_info)
# Verify succeeds once logged-in
with self.assert_database_query_count(54):
with patch("zerver.lib.cache.cache_set") as cache_mock:
with (
self.assert_database_query_count(54),
patch("zerver.lib.cache.cache_set") as cache_mock,
):
result = self._get_home_page(stream="Denmark")
self.check_rendered_logged_in_app(result)
self.assertEqual(
@ -312,8 +314,7 @@ class HomeTest(ZulipTestCase):
self.login("hamlet")
# Verify succeeds once logged-in
with queries_captured():
with patch("zerver.lib.cache.cache_set"):
with queries_captured(), patch("zerver.lib.cache.cache_set"):
result = self._get_home_page(stream="Denmark")
self.check_rendered_logged_in_app(result)
@ -565,8 +566,10 @@ class HomeTest(ZulipTestCase):
def test_num_queries_for_realm_admin(self) -> None:
# Verify number of queries for Realm admin isn't much higher than for normal users.
self.login("iago")
with self.assert_database_query_count(54):
with patch("zerver.lib.cache.cache_set") as cache_mock:
with (
self.assert_database_query_count(54),
patch("zerver.lib.cache.cache_set") as cache_mock,
):
result = self._get_home_page()
self.check_rendered_logged_in_app(result)
self.assert_length(cache_mock.call_args_list, 7)

View File

@ -2547,8 +2547,10 @@ class MultiuseInviteTest(ZulipTestCase):
email = self.nonreg_email("newuser")
invite_link = "/join/invalid_key/"
with patch("zerver.views.registration.get_realm_from_request", return_value=self.realm):
with patch("zerver.views.registration.get_realm", return_value=self.realm):
with (
patch("zerver.views.registration.get_realm_from_request", return_value=self.realm),
patch("zerver.views.registration.get_realm", return_value=self.realm),
):
self.check_user_able_to_register(email, invite_link)
def test_multiuse_link_with_specified_streams(self) -> None:

View File

@ -438,8 +438,7 @@ class PreviewTestCase(ZulipTestCase):
self.create_mock_response(original_url)
self.create_mock_response(edited_url)
with self.settings(TEST_SUITE=False):
with self.assertLogs(level="INFO") as info_logs:
with self.settings(TEST_SUITE=False), self.assertLogs(level="INFO") as info_logs:
# Run the queue processor. This will simulate the event for original_url being
# processed after the message has been edited.
FetchLinksEmbedData().consume(event)
@ -457,8 +456,7 @@ class PreviewTestCase(ZulipTestCase):
self.assertTrue(responses.assert_call_count(edited_url, 0))
with self.settings(TEST_SUITE=False):
with self.assertLogs(level="INFO") as info_logs:
with self.settings(TEST_SUITE=False), self.assertLogs(level="INFO") as info_logs:
# Now proceed with the original queue_json_publish and call the
# up-to-date event for edited_url.
queue_json_publish(*args, **kwargs)
@ -503,8 +501,7 @@ class PreviewTestCase(ZulipTestCase):
# We do still fetch the URL, as we don't want to incur the
# cost of locking the row while we do the HTTP fetches.
self.create_mock_response(url)
with self.settings(TEST_SUITE=False):
with self.assertLogs(level="INFO") as info_logs:
with self.settings(TEST_SUITE=False), self.assertLogs(level="INFO") as info_logs:
# Run the queue processor. This will simulate the event for original_url being
# processed after the message has been deleted.
FetchLinksEmbedData().consume(event)
@ -852,14 +849,16 @@ class PreviewTestCase(ZulipTestCase):
self.create_mock_response(url, body=ConnectionError())
with mock.patch(
with (
mock.patch(
"zerver.lib.url_preview.preview.get_oembed_data",
side_effect=lambda *args, **kwargs: None,
):
with mock.patch(
),
mock.patch(
"zerver.lib.url_preview.preview.valid_content_type", side_effect=lambda k: True
),
self.settings(TEST_SUITE=False),
):
with self.settings(TEST_SUITE=False):
with self.assertLogs(level="INFO") as info_logs:
FetchLinksEmbedData().consume(event)
self.assertTrue(
@ -939,10 +938,12 @@ class PreviewTestCase(ZulipTestCase):
)
self.create_mock_response(url)
with self.settings(TEST_SUITE=False):
with self.assertLogs(level="INFO") as info_logs:
with mock.patch(
with (
self.assertLogs(level="INFO") as info_logs,
mock.patch(
"zerver.lib.url_preview.preview.get_oembed_data",
lambda *args, **kwargs: mocked_data,
),
):
FetchLinksEmbedData().consume(event)
cached_data = cache_get(preview_url_cache_key(url))[0]
@ -979,10 +980,12 @@ class PreviewTestCase(ZulipTestCase):
)
self.create_mock_response(url)
with self.settings(TEST_SUITE=False):
with self.assertLogs(level="INFO") as info_logs:
with mock.patch(
with (
self.assertLogs(level="INFO") as info_logs,
mock.patch(
"zerver.worker.embed_links.url_preview.get_link_embed_data",
lambda *args, **kwargs: mocked_data,
),
):
FetchLinksEmbedData().consume(event)
self.assertTrue(
@ -1017,10 +1020,12 @@ class PreviewTestCase(ZulipTestCase):
)
self.create_mock_response(url)
with self.settings(TEST_SUITE=False):
with self.assertLogs(level="INFO") as info_logs:
with mock.patch(
with (
self.assertLogs(level="INFO") as info_logs,
mock.patch(
"zerver.worker.embed_links.url_preview.get_link_embed_data",
lambda *args, **kwargs: mocked_data,
),
):
FetchLinksEmbedData().consume(event)
self.assertTrue(

View File

@ -29,9 +29,11 @@ from zerver.models.users import get_user_profile_by_email
class TestCheckConfig(ZulipTestCase):
def test_check_config(self) -> None:
check_config()
with self.settings(REQUIRED_SETTINGS=[("asdf", "not asdf")]):
with self.assertRaisesRegex(
with (
self.settings(REQUIRED_SETTINGS=[("asdf", "not asdf")]),
self.assertRaisesRegex(
CommandError, "Error: You must set asdf in /etc/zulip/settings.py."
),
):
check_config()
@ -210,8 +212,7 @@ class TestCommandsCanStart(ZulipTestCase):
def test_management_commands_show_help(self) -> None:
with stdout_suppressed():
for command in self.commands:
with self.subTest(management_command=command):
with self.assertRaises(SystemExit):
with self.subTest(management_command=command), self.assertRaises(SystemExit):
call_command(command, "--help")
# zerver/management/commands/runtornado.py sets this to True;
# we need to reset it here. See #3685 for details.

View File

@ -1104,8 +1104,10 @@ class MarkdownTest(ZulipTestCase):
)
def test_fetch_tweet_data_settings_validation(self) -> None:
with self.settings(TEST_SUITE=False, TWITTER_CONSUMER_KEY=None):
with self.assertRaises(NotImplementedError):
with (
self.settings(TEST_SUITE=False, TWITTER_CONSUMER_KEY=None),
self.assertRaises(NotImplementedError),
):
fetch_tweet_data("287977969287315459")
def test_content_has_emoji(self) -> None:
@ -1710,8 +1712,10 @@ class MarkdownTest(ZulipTestCase):
self.assertEqual(linkifiers_for_realm(realm.id), [])
# Verify that our in-memory cache avoids round trips.
with self.assert_database_query_count(0, keep_cache_warm=True):
with self.assert_memcached_count(0):
with (
self.assert_database_query_count(0, keep_cache_warm=True),
self.assert_memcached_count(0),
):
self.assertEqual(linkifiers_for_realm(realm.id), [])
linkifier = RealmFilter(realm=realm, pattern=r"whatever", url_template="whatever")
@ -1724,8 +1728,10 @@ class MarkdownTest(ZulipTestCase):
)
# And the in-process cache works again.
with self.assert_database_query_count(0, keep_cache_warm=True):
with self.assert_memcached_count(0):
with (
self.assert_database_query_count(0, keep_cache_warm=True),
self.assert_memcached_count(0),
):
self.assertEqual(
linkifiers_for_realm(realm.id),
[{"id": linkifier.id, "pattern": "whatever", "url_template": "whatever"}],
@ -3289,16 +3295,17 @@ class MarkdownApiTests(ZulipTestCase):
class MarkdownErrorTests(ZulipTestCase):
def test_markdown_error_handling(self) -> None:
with self.simulated_markdown_failure():
with self.assertRaises(MarkdownRenderingError):
with self.simulated_markdown_failure(), self.assertRaises(MarkdownRenderingError):
markdown_convert_wrapper("")
def test_send_message_errors(self) -> None:
message = "whatever"
with self.simulated_markdown_failure():
with (
self.simulated_markdown_failure(),
# We don't use assertRaisesRegex because it seems to not
# handle i18n properly here on some systems.
with self.assertRaises(JsonableError):
self.assertRaises(JsonableError),
):
self.send_stream_message(self.example_user("othello"), "Denmark", message)
@override_settings(MAX_MESSAGE_LENGTH=10)
@ -3310,8 +3317,8 @@ class MarkdownErrorTests(ZulipTestCase):
with (
mock.patch("zerver.lib.markdown.unsafe_timeout", return_value=msg),
mock.patch("zerver.lib.markdown.markdown_logger"),
self.assertRaises(MarkdownRenderingError),
):
with self.assertRaises(MarkdownRenderingError):
markdown_convert_wrapper(msg)
def test_curl_code_block_validation(self) -> None:

View File

@ -301,8 +301,10 @@ class DeleteMessageTest(ZulipTestCase):
self.send_stream_message(hamlet, "Denmark")
message = self.get_last_message()
with self.capture_send_event_calls(expected_num_events=1):
with mock.patch("zerver.tornado.django_api.queue_json_publish") as m:
with (
self.capture_send_event_calls(expected_num_events=1),
mock.patch("zerver.tornado.django_api.queue_json_publish") as m,
):
m.side_effect = AssertionError(
"Events should be sent only after the transaction commits."
)

View File

@ -100,8 +100,10 @@ class EditMessageSideEffectsTest(ZulipTestCase):
content=content,
)
with mock.patch("zerver.tornado.event_queue.maybe_enqueue_notifications") as m:
with self.captureOnCommitCallbacks(execute=True):
with (
mock.patch("zerver.tornado.event_queue.maybe_enqueue_notifications") as m,
self.captureOnCommitCallbacks(execute=True),
):
result = self.client_patch(url, request)
cordelia = self.example_user("cordelia")

View File

@ -4203,8 +4203,7 @@ class GetOldMessagesTest(ZulipTestCase):
request = HostRequestMock(query_params, user_profile)
first_visible_message_id = first_unread_message_id + 2
with first_visible_id_as(first_visible_message_id):
with queries_captured() as all_queries:
with first_visible_id_as(first_visible_message_id), queries_captured() as all_queries:
get_messages_backend(
request,
user_profile,

View File

@ -2118,8 +2118,10 @@ class StreamMessagesTest(ZulipTestCase):
self.subscribe(cordelia, "test_stream")
do_set_realm_property(cordelia.realm, "wildcard_mention_policy", 10, acting_user=None)
content = "@**all** test wildcard mention"
with mock.patch("zerver.lib.message.num_subscribers_for_stream_id", return_value=16):
with self.assertRaisesRegex(AssertionError, "Invalid wildcard mention policy"):
with (
mock.patch("zerver.lib.message.num_subscribers_for_stream_id", return_value=16),
self.assertRaisesRegex(AssertionError, "Invalid wildcard mention policy"),
):
self.send_stream_message(cordelia, "test_stream", content)
def test_user_group_mention_restrictions(self) -> None:

View File

@ -630,10 +630,12 @@ class TestOutgoingWebhookMessaging(ZulipTestCase):
"https://bot.example.com/",
body=requests.exceptions.Timeout("Time is up!"),
)
with mock.patch(
with (
mock.patch(
"zerver.lib.outgoing_webhook.fail_with_message", side_effect=wrapped
) as fail:
with self.assertLogs(level="INFO") as logs:
) as fail,
self.assertLogs(level="INFO") as logs,
):
self.send_stream_message(
bot_owner, "Denmark", content=f"@**{bot.full_name}** foo", topic_name="bar"
)

View File

@ -1103,10 +1103,12 @@ class PushBouncerNotificationTest(BouncerTestCase):
not_configured_warn_log,
)
with mock.patch(
with (
mock.patch(
"zerver.lib.push_notifications.uses_notification_bouncer", return_value=True
),
mock.patch("zerver.lib.remote_server.send_to_push_bouncer") as m,
):
with mock.patch("zerver.lib.remote_server.send_to_push_bouncer") as m:
post_response = {
"realms": {realm.uuid: {"can_push": True, "expected_end_timestamp": None}}
}
@ -2340,12 +2342,14 @@ class AnalyticsBouncerTest(BouncerTestCase):
def test_realm_properties_after_send_analytics(self) -> None:
self.add_mock_response()
with mock.patch(
with (
mock.patch(
"corporate.lib.stripe.RemoteRealmBillingSession.get_customer", return_value=None
) as m:
with mock.patch(
) as m,
mock.patch(
"corporate.lib.stripe.RemoteServerBillingSession.current_count_for_billed_licenses",
return_value=10,
),
):
send_server_data_to_push_bouncer(consider_usage_statistics=False)
m.assert_called()
@ -2354,12 +2358,14 @@ class AnalyticsBouncerTest(BouncerTestCase):
self.assertEqual(realm.push_notifications_enabled, True)
self.assertEqual(realm.push_notifications_enabled_end_timestamp, None)
with mock.patch(
with (
mock.patch(
"zilencer.views.RemoteRealmBillingSession.get_customer", return_value=None
) as m:
with mock.patch(
) as m,
mock.patch(
"corporate.lib.stripe.RemoteRealmBillingSession.current_count_for_billed_licenses",
return_value=11,
),
):
send_server_data_to_push_bouncer(consider_usage_statistics=False)
m.assert_called()
@ -2369,13 +2375,13 @@ class AnalyticsBouncerTest(BouncerTestCase):
self.assertEqual(realm.push_notifications_enabled_end_timestamp, None)
dummy_customer = mock.MagicMock()
with mock.patch(
with (
mock.patch(
"corporate.lib.stripe.RemoteRealmBillingSession.get_customer",
return_value=dummy_customer,
),
mock.patch("corporate.lib.stripe.get_current_plan_by_customer", return_value=None) as m,
):
with mock.patch(
"corporate.lib.stripe.get_current_plan_by_customer", return_value=None
) as m:
send_server_data_to_push_bouncer(consider_usage_statistics=False)
m.assert_called()
realms = Realm.objects.all()
@ -2384,15 +2390,15 @@ class AnalyticsBouncerTest(BouncerTestCase):
self.assertEqual(realm.push_notifications_enabled_end_timestamp, None)
dummy_customer = mock.MagicMock()
with mock.patch(
with (
mock.patch(
"zilencer.views.RemoteRealmBillingSession.get_customer", return_value=dummy_customer
):
with mock.patch(
"corporate.lib.stripe.get_current_plan_by_customer", return_value=None
) as m:
with mock.patch(
),
mock.patch("corporate.lib.stripe.get_current_plan_by_customer", return_value=None) as m,
mock.patch(
"corporate.lib.stripe.RemoteRealmBillingSession.current_count_for_billed_licenses",
return_value=11,
),
):
send_server_data_to_push_bouncer(consider_usage_statistics=False)
m.assert_called()
@ -2405,13 +2411,15 @@ class AnalyticsBouncerTest(BouncerTestCase):
plan_type=RemoteRealm.PLAN_TYPE_COMMUNITY
)
with mock.patch(
with (
mock.patch(
"zilencer.views.RemoteRealmBillingSession.get_customer", return_value=dummy_customer
):
with mock.patch("corporate.lib.stripe.get_current_plan_by_customer", return_value=None):
with mock.patch(
),
mock.patch("corporate.lib.stripe.get_current_plan_by_customer", return_value=None),
mock.patch(
"corporate.lib.stripe.RemoteRealmBillingSession.current_count_for_billed_licenses"
) as m:
) as m,
):
send_server_data_to_push_bouncer(consider_usage_statistics=False)
m.assert_not_called()
realms = Realm.objects.all()
@ -2427,19 +2435,19 @@ class AnalyticsBouncerTest(BouncerTestCase):
dummy_customer_plan = mock.MagicMock()
dummy_customer_plan.status = CustomerPlan.DOWNGRADE_AT_END_OF_CYCLE
dummy_date = datetime(year=2023, month=12, day=3, tzinfo=timezone.utc)
with mock.patch(
with (
mock.patch(
"corporate.lib.stripe.RemoteRealmBillingSession.get_customer",
return_value=dummy_customer,
):
with mock.patch(
),
mock.patch(
"corporate.lib.stripe.get_current_plan_by_customer",
return_value=dummy_customer_plan,
):
with mock.patch(
),
mock.patch(
"corporate.lib.stripe.RemoteRealmBillingSession.current_count_for_billed_licenses",
return_value=11,
):
with (
),
mock.patch(
"corporate.lib.stripe.RemoteRealmBillingSession.get_next_billing_cycle",
return_value=dummy_date,
@ -2460,19 +2468,19 @@ class AnalyticsBouncerTest(BouncerTestCase):
info_log.output[0],
)
with mock.patch(
with (
mock.patch(
"corporate.lib.stripe.RemoteRealmBillingSession.get_customer",
return_value=dummy_customer,
):
with mock.patch(
),
mock.patch(
"corporate.lib.stripe.get_current_plan_by_customer",
return_value=dummy_customer_plan,
):
with mock.patch(
),
mock.patch(
"corporate.lib.stripe.RemoteRealmBillingSession.current_count_for_billed_licenses",
side_effect=MissingDataError,
):
with (
),
mock.patch(
"corporate.lib.stripe.RemoteRealmBillingSession.get_next_billing_cycle",
return_value=dummy_date,
@ -2493,17 +2501,19 @@ class AnalyticsBouncerTest(BouncerTestCase):
info_log.output[0],
)
with mock.patch(
with (
mock.patch(
"corporate.lib.stripe.RemoteRealmBillingSession.get_customer",
return_value=dummy_customer,
):
with mock.patch(
),
mock.patch(
"corporate.lib.stripe.get_current_plan_by_customer",
return_value=dummy_customer_plan,
):
with mock.patch(
),
mock.patch(
"corporate.lib.stripe.RemoteRealmBillingSession.current_count_for_billed_licenses",
return_value=10,
),
):
send_server_data_to_push_bouncer(consider_usage_statistics=False)
m.assert_called()
@ -2517,15 +2527,17 @@ class AnalyticsBouncerTest(BouncerTestCase):
dummy_customer_plan = mock.MagicMock()
dummy_customer_plan.status = CustomerPlan.ACTIVE
with mock.patch(
with (
mock.patch(
"corporate.lib.stripe.RemoteRealmBillingSession.get_customer",
return_value=dummy_customer,
):
with mock.patch(
),
mock.patch(
"corporate.lib.stripe.get_current_plan_by_customer",
return_value=dummy_customer_plan,
),
self.assertLogs("zulip.analytics", level="INFO") as info_log,
):
with self.assertLogs("zulip.analytics", level="INFO") as info_log:
send_server_data_to_push_bouncer(consider_usage_statistics=False)
m.assert_called()
realms = Realm.objects.all()

View File

@ -377,8 +377,11 @@ class WorkerTest(ZulipTestCase):
# If called after `expected_scheduled_timestamp`, it should process all emails.
one_minute_overdue = expected_scheduled_timestamp + timedelta(seconds=60)
with time_machine.travel(one_minute_overdue, tick=True):
with send_mock as sm, self.assertLogs(level="INFO") as info_logs:
with (
time_machine.travel(one_minute_overdue, tick=True),
send_mock as sm,
self.assertLogs(level="INFO") as info_logs,
):
has_timeout = advance()
self.assertTrue(has_timeout)
self.assertEqual(ScheduledMessageNotificationEmail.objects.count(), 0)
@ -643,11 +646,13 @@ class WorkerTest(ZulipTestCase):
self.assertEqual(mock_mirror_email.call_count, 4)
# If RateLimiterLockingError is thrown, we rate-limit the new message:
with patch(
with (
patch(
"zerver.lib.rate_limiter.RedisRateLimiterBackend.incr_ratelimit",
side_effect=RateLimiterLockingError,
),
self.assertLogs("zerver.lib.rate_limiter", "WARNING") as mock_warn,
):
with self.assertLogs("zerver.lib.rate_limiter", "WARNING") as mock_warn:
fake_client.enqueue("email_mirror", data[0])
worker.start()
self.assertEqual(mock_mirror_email.call_count, 4)

View File

@ -1054,8 +1054,10 @@ class ReactionAPIEventTest(EmojiReactionBase):
"emoji_code": "1f354",
"reaction_type": "unicode_emoji",
}
with self.capture_send_event_calls(expected_num_events=1) as events:
with mock.patch("zerver.tornado.django_api.queue_json_publish") as m:
with (
self.capture_send_event_calls(expected_num_events=1) as events,
mock.patch("zerver.tornado.django_api.queue_json_publish") as m,
):
m.side_effect = AssertionError(
"Events should be sent only after the transaction commits!"
)
@ -1137,8 +1139,10 @@ class ReactionAPIEventTest(EmojiReactionBase):
reaction_type="whatever",
)
with self.capture_send_event_calls(expected_num_events=1):
with mock.patch("zerver.tornado.django_api.queue_json_publish") as m:
with (
self.capture_send_event_calls(expected_num_events=1),
mock.patch("zerver.tornado.django_api.queue_json_publish") as m,
):
m.side_effect = AssertionError(
"Events should be sent only after the transaction commits."
)

View File

@ -95,12 +95,13 @@ class RealmTest(ZulipTestCase):
)
def test_realm_creation_on_special_subdomains_disallowed(self) -> None:
with self.settings(SOCIAL_AUTH_SUBDOMAIN="zulipauth"):
with self.assertRaises(AssertionError):
with self.settings(SOCIAL_AUTH_SUBDOMAIN="zulipauth"), self.assertRaises(AssertionError):
do_create_realm("zulipauth", "Test Realm")
with self.settings(SELF_HOSTING_MANAGEMENT_SUBDOMAIN="zulipselfhosting"):
with self.assertRaises(AssertionError):
with (
self.settings(SELF_HOSTING_MANAGEMENT_SUBDOMAIN="zulipselfhosting"),
self.assertRaises(AssertionError),
):
do_create_realm("zulipselfhosting", "Test Realm")
def test_permission_for_education_non_profit_organization(self) -> None:

View File

@ -315,8 +315,7 @@ class RealmEmojiTest(ZulipTestCase):
def test_emoji_upload_file_size_error(self) -> None:
self.login("iago")
with get_test_image_file("img.png") as fp:
with self.settings(MAX_EMOJI_FILE_SIZE_MIB=0):
with get_test_image_file("img.png") as fp, self.settings(MAX_EMOJI_FILE_SIZE_MIB=0):
result = self.client_post("/json/realm/emoji/my_emoji", {"file": fp})
self.assert_json_error(result, "Uploaded file is larger than the allowed limit of 0 MiB")
@ -355,10 +354,12 @@ class RealmEmojiTest(ZulipTestCase):
def test_failed_file_upload(self) -> None:
self.login("iago")
with mock.patch(
with (
mock.patch(
"zerver.lib.upload.local.write_local_file", side_effect=BadImageError(msg="Broken")
),
get_test_image_file("img.png") as fp1,
):
with get_test_image_file("img.png") as fp1:
emoji_data = {"f1": fp1}
result = self.client_post("/json/realm/emoji/my_emoji", info=emoji_data)
self.assert_json_error(result, "Broken")

View File

@ -49,8 +49,8 @@ class RealmExportTest(ZulipTestCase):
self.settings(LOCAL_UPLOADS_DIR=None),
stdout_suppressed(),
self.assertLogs(level="INFO") as info_logs,
self.captureOnCommitCallbacks(execute=True),
):
with self.captureOnCommitCallbacks(execute=True):
result = self.client_post("/json/export/realm")
self.assertTrue("INFO:root:Completed data export for zulip in " in info_logs.output[0])
self.assert_json_success(result)
@ -150,8 +150,11 @@ class RealmExportTest(ZulipTestCase):
with patch(
"zerver.lib.export.do_export_realm", side_effect=fake_export_realm
) as mock_export:
with stdout_suppressed(), self.assertLogs(level="INFO") as info_logs:
with self.captureOnCommitCallbacks(execute=True):
with (
stdout_suppressed(),
self.assertLogs(level="INFO") as info_logs,
self.captureOnCommitCallbacks(execute=True),
):
result = self.client_post("/json/export/realm")
self.assertTrue("INFO:root:Completed data export for zulip in " in info_logs.output[0])
mock_export.assert_called_once()
@ -208,11 +211,14 @@ class RealmExportTest(ZulipTestCase):
admin = self.example_user("iago")
self.login_user(admin)
with patch(
with (
patch(
"zerver.lib.export.do_export_realm", side_effect=Exception("failure")
) as mock_export:
with stdout_suppressed(), self.assertLogs(level="INFO") as info_logs:
with self.captureOnCommitCallbacks(execute=True):
) as mock_export,
stdout_suppressed(),
self.assertLogs(level="INFO") as info_logs,
self.captureOnCommitCallbacks(execute=True),
):
result = self.client_post("/json/export/realm")
self.assertTrue(
info_logs.output[0].startswith("ERROR:root:Data export for zulip failed after ")
@ -240,8 +246,10 @@ class RealmExportTest(ZulipTestCase):
# If the queue worker sees the same export-id again, it aborts
# instead of retrying
with patch("zerver.lib.export.do_export_realm") as mock_export:
with self.assertLogs(level="INFO") as info_logs:
with (
patch("zerver.lib.export.do_export_realm") as mock_export,
self.assertLogs(level="INFO") as info_logs,
):
queue_json_publish(
"deferred_work",
{

View File

@ -132,8 +132,10 @@ class TestSendEmail(ZulipTestCase):
for message, side_effect in errors.items():
with mock.patch.object(EmailBackend, "send_messages", side_effect=side_effect):
with self.assertLogs(logger=logger) as info_log:
with self.assertRaises(EmailNotDeliveredError):
with (
self.assertLogs(logger=logger) as info_log,
self.assertRaises(EmailNotDeliveredError),
):
send_email(
"zerver/emails/password_reset",
to_emails=[hamlet.email],
@ -151,8 +153,10 @@ class TestSendEmail(ZulipTestCase):
def test_send_email_config_error_logging(self) -> None:
hamlet = self.example_user("hamlet")
with self.settings(EMAIL_HOST_USER="test", EMAIL_HOST_PASSWORD=None):
with self.assertLogs(logger=logger, level="ERROR") as error_log:
with (
self.settings(EMAIL_HOST_USER="test", EMAIL_HOST_PASSWORD=None),
self.assertLogs(logger=logger, level="ERROR") as error_log,
):
send_email(
"zerver/emails/password_reset",
to_emails=[hamlet.email],

View File

@ -1050,8 +1050,11 @@ class LoginTest(ZulipTestCase):
# seem to be any O(N) behavior. Some of the cache hits are related
# to sending messages, such as getting the welcome bot, looking up
# the alert words for a realm, etc.
with self.assert_database_query_count(94), self.assert_memcached_count(14):
with self.captureOnCommitCallbacks(execute=True):
with (
self.assert_database_query_count(94),
self.assert_memcached_count(14),
self.captureOnCommitCallbacks(execute=True),
):
self.register(self.nonreg_email("test"), "test")
user_profile = self.nonreg_user("test")
@ -2946,8 +2949,10 @@ class UserSignUpTest(ZulipTestCase):
return_data = kwargs.get("return_data", {})
return_data["invalid_subdomain"] = True
with patch("zerver.views.registration.authenticate", side_effect=invalid_subdomain):
with self.assertLogs(level="ERROR") as m:
with (
patch("zerver.views.registration.authenticate", side_effect=invalid_subdomain),
self.assertLogs(level="ERROR") as m,
):
result = self.client_post(
"/accounts/register/",
{

View File

@ -273,8 +273,10 @@ class UserSoftDeactivationTests(ZulipTestCase):
).count()
self.assertEqual(0, received_count)
with self.settings(AUTO_CATCH_UP_SOFT_DEACTIVATED_USERS=False):
with self.assertLogs(logger_string, level="INFO") as m:
with (
self.settings(AUTO_CATCH_UP_SOFT_DEACTIVATED_USERS=False),
self.assertLogs(logger_string, level="INFO") as m,
):
users_deactivated = do_auto_soft_deactivate_users(-1, realm)
self.assertEqual(
m.output,

View File

@ -194,8 +194,10 @@ class TestBasics(ZulipTestCase):
hamlet = self.example_user("hamlet")
message_id = self.send_stream_message(hamlet, "Denmark")
with self.capture_send_event_calls(expected_num_events=1):
with mock.patch("zerver.tornado.django_api.queue_json_publish") as m:
with (
self.capture_send_event_calls(expected_num_events=1),
mock.patch("zerver.tornado.django_api.queue_json_publish") as m,
):
m.side_effect = AssertionError(
"Events should be sent only after the transaction commits."
)

View File

@ -2607,9 +2607,11 @@ class StreamAdminTest(ZulipTestCase):
for user in other_sub_users:
self.subscribe(user, stream_name)
with self.assert_database_query_count(query_count):
with cache_tries_captured() as cache_tries:
with self.captureOnCommitCallbacks(execute=True):
with (
self.assert_database_query_count(query_count),
cache_tries_captured() as cache_tries,
self.captureOnCommitCallbacks(execute=True),
):
result = self.client_delete(
"/json/users/me/subscriptions",
{
@ -4744,8 +4746,10 @@ class SubscriptionAPITest(ZulipTestCase):
user2 = self.example_user("iago")
realm = get_realm("zulip")
streams_to_sub = ["multi_user_stream"]
with self.capture_send_event_calls(expected_num_events=5) as events:
with self.assert_database_query_count(38):
with (
self.capture_send_event_calls(expected_num_events=5) as events,
self.assert_database_query_count(38),
):
self.common_subscribe_to_streams(
self.test_user,
streams_to_sub,
@ -4768,8 +4772,10 @@ class SubscriptionAPITest(ZulipTestCase):
self.assertEqual(num_subscribers_for_stream_id(stream.id), 2)
# Now add ourselves
with self.capture_send_event_calls(expected_num_events=2) as events:
with self.assert_database_query_count(14):
with (
self.capture_send_event_calls(expected_num_events=2) as events,
self.assert_database_query_count(14),
):
self.common_subscribe_to_streams(
self.test_user,
streams_to_sub,
@ -5061,9 +5067,11 @@ class SubscriptionAPITest(ZulipTestCase):
# Sends 3 peer-remove events, 2 unsubscribe events
# and 2 stream delete events for private streams.
with self.assert_database_query_count(16):
with self.assert_memcached_count(3):
with self.capture_send_event_calls(expected_num_events=7) as events:
with (
self.assert_database_query_count(16),
self.assert_memcached_count(3),
self.capture_send_event_calls(expected_num_events=7) as events,
):
bulk_remove_subscriptions(
realm,
[user1, user2],
@ -5214,9 +5222,11 @@ class SubscriptionAPITest(ZulipTestCase):
# The only known O(N) behavior here is that we call
# principal_to_user_profile for each of our users, but it
# should be cached.
with self.assert_database_query_count(21):
with self.assert_memcached_count(3):
with mock.patch("zerver.views.streams.send_messages_for_new_subscribers"):
with (
self.assert_database_query_count(21),
self.assert_memcached_count(3),
mock.patch("zerver.views.streams.send_messages_for_new_subscribers"),
):
self.common_subscribe_to_streams(
desdemona,
streams,

View File

@ -176,8 +176,10 @@ class TypingHappyPathTestDirectMessages(ZulipTestCase):
op="start",
)
with self.assert_database_query_count(4):
with self.capture_send_event_calls(expected_num_events=1) as events:
with (
self.assert_database_query_count(4),
self.capture_send_event_calls(expected_num_events=1) as events,
):
result = self.api_post(sender, "/api/v1/typing", params)
self.assert_json_success(result)
@ -212,8 +214,10 @@ class TypingHappyPathTestDirectMessages(ZulipTestCase):
op="start",
)
with self.assert_database_query_count(5):
with self.capture_send_event_calls(expected_num_events=1) as events:
with (
self.assert_database_query_count(5),
self.capture_send_event_calls(expected_num_events=1) as events,
):
result = self.api_post(sender, "/api/v1/typing", params)
self.assert_json_success(result)
self.assert_length(events, 1)
@ -406,8 +410,10 @@ class TypingHappyPathTestStreams(ZulipTestCase):
topic=topic_name,
)
with self.assert_database_query_count(6):
with self.capture_send_event_calls(expected_num_events=1) as events:
with (
self.assert_database_query_count(6),
self.capture_send_event_calls(expected_num_events=1) as events,
):
result = self.api_post(sender, "/api/v1/typing", params)
self.assert_json_success(result)
self.assert_length(events, 1)
@ -437,8 +443,10 @@ class TypingHappyPathTestStreams(ZulipTestCase):
topic=topic_name,
)
with self.assert_database_query_count(6):
with self.capture_send_event_calls(expected_num_events=1) as events:
with (
self.assert_database_query_count(6),
self.capture_send_event_calls(expected_num_events=1) as events,
):
result = self.api_post(sender, "/api/v1/typing", params)
self.assert_json_success(result)
self.assert_length(events, 1)
@ -470,8 +478,10 @@ class TypingHappyPathTestStreams(ZulipTestCase):
topic=topic_name,
)
with self.settings(MAX_STREAM_SIZE_FOR_TYPING_NOTIFICATIONS=5):
with self.assert_database_query_count(5):
with self.capture_send_event_calls(expected_num_events=0) as events:
with (
self.assert_database_query_count(5),
self.capture_send_event_calls(expected_num_events=0) as events,
):
result = self.api_post(sender, "/api/v1/typing", params)
self.assert_json_success(result)
self.assert_length(events, 0)
@ -501,8 +511,10 @@ class TypingHappyPathTestStreams(ZulipTestCase):
topic=topic_name,
)
with self.assert_database_query_count(6):
with self.capture_send_event_calls(expected_num_events=1) as events:
with (
self.assert_database_query_count(6),
self.capture_send_event_calls(expected_num_events=1) as events,
):
result = self.api_post(sender, "/api/v1/typing", params)
self.assert_json_success(result)
self.assert_length(events, 1)

View File

@ -1390,8 +1390,10 @@ class AvatarTest(UploadSerializeMixin, ZulipTestCase):
def test_avatar_upload_file_size_error(self) -> None:
self.login("hamlet")
with get_test_image_file(self.correct_files[0][0]) as fp:
with self.settings(MAX_AVATAR_FILE_SIZE_MIB=0):
with (
get_test_image_file(self.correct_files[0][0]) as fp,
self.settings(MAX_AVATAR_FILE_SIZE_MIB=0),
):
result = self.client_post("/json/users/me/avatar", {"file": fp})
self.assert_json_error(result, "Uploaded file is larger than the allowed limit of 0 MiB")
@ -1537,8 +1539,10 @@ class RealmIconTest(UploadSerializeMixin, ZulipTestCase):
def test_realm_icon_upload_file_size_error(self) -> None:
self.login("iago")
with get_test_image_file(self.correct_files[0][0]) as fp:
with self.settings(MAX_ICON_FILE_SIZE_MIB=0):
with (
get_test_image_file(self.correct_files[0][0]) as fp,
self.settings(MAX_ICON_FILE_SIZE_MIB=0),
):
result = self.client_post("/json/realm/icon", {"file": fp})
self.assert_json_error(result, "Uploaded file is larger than the allowed limit of 0 MiB")
@ -1743,8 +1747,10 @@ class RealmLogoTest(UploadSerializeMixin, ZulipTestCase):
def test_logo_upload_file_size_error(self) -> None:
self.login("iago")
with get_test_image_file(self.correct_files[0][0]) as fp:
with self.settings(MAX_LOGO_FILE_SIZE_MIB=0):
with (
get_test_image_file(self.correct_files[0][0]) as fp,
self.settings(MAX_LOGO_FILE_SIZE_MIB=0),
):
result = self.client_post(
"/json/realm/logo", {"file": fp, "night": orjson.dumps(self.night).decode()}
)
@ -1766,8 +1772,10 @@ class EmojiTest(UploadSerializeMixin, ZulipTestCase):
def test_non_image(self) -> None:
"""Non-image is not resized"""
self.login("iago")
with get_test_image_file("text.txt") as f:
with patch("zerver.lib.upload.resize_emoji", return_value=(b"a", None)) as resize_mock:
with (
get_test_image_file("text.txt") as f,
patch("zerver.lib.upload.resize_emoji", return_value=(b"a", None)) as resize_mock,
):
result = self.client_post("/json/realm/emoji/new", {"f1": f})
self.assert_json_error(result, "Invalid image format")
resize_mock.assert_not_called()
@ -1775,8 +1783,10 @@ class EmojiTest(UploadSerializeMixin, ZulipTestCase):
def test_upsupported_format(self) -> None:
"""Invalid format is not resized"""
self.login("iago")
with get_test_image_file("img.bmp") as f:
with patch("zerver.lib.upload.resize_emoji", return_value=(b"a", None)) as resize_mock:
with (
get_test_image_file("img.bmp") as f,
patch("zerver.lib.upload.resize_emoji", return_value=(b"a", None)) as resize_mock,
):
result = self.client_post("/json/realm/emoji/new", {"f1": f})
self.assert_json_error(result, "Invalid image format")
resize_mock.assert_not_called()
@ -1784,10 +1794,12 @@ class EmojiTest(UploadSerializeMixin, ZulipTestCase):
def test_upload_too_big_after_resize(self) -> None:
"""Non-animated image is too big after resizing"""
self.login("iago")
with get_test_image_file("img.png") as f:
with patch(
with (
get_test_image_file("img.png") as f,
patch(
"zerver.lib.upload.resize_emoji", return_value=(b"a" * (200 * 1024), None)
) as resize_mock:
) as resize_mock,
):
result = self.client_post("/json/realm/emoji/new", {"f1": f})
self.assert_json_error(result, "Image size exceeds limit")
resize_mock.assert_called_once()
@ -1795,10 +1807,12 @@ class EmojiTest(UploadSerializeMixin, ZulipTestCase):
def test_upload_big_after_animated_resize(self) -> None:
"""A big animated image is fine as long as the still is small"""
self.login("iago")
with get_test_image_file("animated_img.gif") as f:
with patch(
with (
get_test_image_file("animated_img.gif") as f,
patch(
"zerver.lib.upload.resize_emoji", return_value=(b"a" * (200 * 1024), b"aaa")
) as resize_mock:
) as resize_mock,
):
result = self.client_post("/json/realm/emoji/new", {"f1": f})
self.assert_json_success(result)
resize_mock.assert_called_once()
@ -1806,10 +1820,12 @@ class EmojiTest(UploadSerializeMixin, ZulipTestCase):
def test_upload_too_big_after_animated_resize_still(self) -> None:
"""Still of animated image is too big after resizing"""
self.login("iago")
with get_test_image_file("animated_img.gif") as f:
with patch(
with (
get_test_image_file("animated_img.gif") as f,
patch(
"zerver.lib.upload.resize_emoji", return_value=(b"aaa", b"a" * (200 * 1024))
) as resize_mock:
) as resize_mock,
):
result = self.client_post("/json/realm/emoji/new", {"f1": f})
self.assert_json_error(result, "Image size exceeds limit")
resize_mock.assert_called_once()

View File

@ -1156,8 +1156,10 @@ class UserGroupAPITestCase(UserGroupTestCase):
munge = lambda obj: orjson.dumps(obj).decode()
params = dict(add=munge(new_user_ids))
with mock.patch("zerver.views.user_groups.notify_for_user_group_subscription_changes"):
with self.assert_database_query_count(11):
with (
mock.patch("zerver.views.user_groups.notify_for_user_group_subscription_changes"),
self.assert_database_query_count(11),
):
result = self.client_post(f"/json/user_groups/{user_group.id}/members", info=params)
self.assert_json_success(result)

View File

@ -338,8 +338,10 @@ class MutedTopicsTests(ZulipTestCase):
mock_date_muted = datetime(2020, 1, 1, tzinfo=timezone.utc).timestamp()
with self.capture_send_event_calls(expected_num_events=2) as events:
with time_machine.travel(datetime(2020, 1, 1, tzinfo=timezone.utc), tick=False):
with (
self.capture_send_event_calls(expected_num_events=2) as events,
time_machine.travel(datetime(2020, 1, 1, tzinfo=timezone.utc), tick=False),
):
result = self.api_post(user, url, data)
self.assert_json_success(result)
@ -404,8 +406,10 @@ class MutedTopicsTests(ZulipTestCase):
mock_date_mute_removed = datetime(2020, 1, 1, tzinfo=timezone.utc).timestamp()
with self.capture_send_event_calls(expected_num_events=2) as events:
with time_machine.travel(datetime(2020, 1, 1, tzinfo=timezone.utc), tick=False):
with (
self.capture_send_event_calls(expected_num_events=2) as events,
time_machine.travel(datetime(2020, 1, 1, tzinfo=timezone.utc), tick=False),
):
result = self.api_post(user, url, data)
self.assert_json_success(result)
@ -553,8 +557,10 @@ class UnmutedTopicsTests(ZulipTestCase):
mock_date_unmuted = datetime(2020, 1, 1, tzinfo=timezone.utc).timestamp()
with self.capture_send_event_calls(expected_num_events=2) as events:
with time_machine.travel(datetime(2020, 1, 1, tzinfo=timezone.utc), tick=False):
with (
self.capture_send_event_calls(expected_num_events=2) as events,
time_machine.travel(datetime(2020, 1, 1, tzinfo=timezone.utc), tick=False),
):
result = self.api_post(user, url, data)
self.assert_json_success(result)
@ -619,8 +625,10 @@ class UnmutedTopicsTests(ZulipTestCase):
mock_date_unmute_removed = datetime(2020, 1, 1, tzinfo=timezone.utc).timestamp()
with self.capture_send_event_calls(expected_num_events=2) as events:
with time_machine.travel(datetime(2020, 1, 1, tzinfo=timezone.utc), tick=False):
with (
self.capture_send_event_calls(expected_num_events=2) as events,
time_machine.travel(datetime(2020, 1, 1, tzinfo=timezone.utc), tick=False),
):
result = self.api_post(user, url, data)
self.assert_json_success(result)

View File

@ -909,9 +909,11 @@ class QueryCountTest(ZulipTestCase):
prereg_user = PreregistrationUser.objects.get(email="fred@zulip.com")
with self.assert_database_query_count(84):
with self.assert_memcached_count(19):
with self.capture_send_event_calls(expected_num_events=10) as events:
with (
self.assert_database_query_count(84),
self.assert_memcached_count(19),
self.capture_send_event_calls(expected_num_events=10) as events,
):
fred = do_create_user(
email="fred@zulip.com",
password="password",
@ -2404,8 +2406,7 @@ class GetProfileTest(ZulipTestCase):
"""
realm = get_realm("zulip")
email = self.example_user("hamlet").email
with self.assert_database_query_count(1):
with simulated_empty_cache() as cache_queries:
with self.assert_database_query_count(1), simulated_empty_cache() as cache_queries:
user_profile = get_user(email, realm)
self.assert_length(cache_queries, 1)

View File

@ -210,9 +210,9 @@ Try again next time
def test_bad_payload(self) -> None:
bad = ("foo", None, "bar")
with self.assertRaisesRegex(AssertionError, "Unable to handle Pivotal payload"):
with mock.patch(
"zerver.webhooks.pivotal.view.api_pivotal_webhook_v3", return_value=bad
with (
self.assertRaisesRegex(AssertionError, "Unable to handle Pivotal payload"),
mock.patch("zerver.webhooks.pivotal.view.api_pivotal_webhook_v3", return_value=bad),
):
self.check_webhook("accepted", expect_topic="foo")
@ -226,8 +226,10 @@ Try again next time
self.assertEqual(result[0], "#0: ")
bad = orjson.loads(self.get_body("bad_kind"))
with self.assertRaisesRegex(UnsupportedWebhookEventTypeError, "'unknown_kind'.* supported"):
with mock.patch("zerver.webhooks.pivotal.view.orjson.loads", return_value=bad):
with (
self.assertRaisesRegex(UnsupportedWebhookEventTypeError, "'unknown_kind'.* supported"),
mock.patch("zerver.webhooks.pivotal.view.orjson.loads", return_value=bad),
):
api_pivotal_webhook_v5(request, hamlet)
@override

View File

@ -276,8 +276,7 @@ class QueueProcessingWorker(ABC):
fn = os.path.join(settings.QUEUE_ERROR_DIR, fname)
line = f"{time.asctime()}\t{orjson.dumps(events).decode()}\n"
lock_fn = fn + ".lock"
with lockfile(lock_fn):
with open(fn, "a") as f:
with lockfile(lock_fn), open(fn, "a") as f:
f.write(line)
check_and_send_restart_signal()