ruff: Fix SIM105 Use `contextlib.suppress` instead of try-except-pass.

Signed-off-by: Anders Kaseorg <anders@zulip.com>
This commit is contained in:
Anders Kaseorg 2023-01-17 23:25:49 -05:00 committed by Tim Abbott
parent b0e569f07c
commit ff1971f5ad
22 changed files with 51 additions and 80 deletions

View File

@ -1401,27 +1401,23 @@ class TestLoggingCountStats(AnalyticsTestCase):
assertInviteCountEquals(4)
# Test mix of good and malformed invite emails
try:
with self.assertRaises(InvitationError):
do_invite_users(
user,
["user3@domain.tld", "malformed"],
[stream],
invite_expires_in_minutes=invite_expires_in_minutes,
)
except InvitationError:
pass
assertInviteCountEquals(4)
# Test inviting existing users
try:
with self.assertRaises(InvitationError):
do_invite_users(
user,
["first@domain.tld", "user4@domain.tld"],
[stream],
invite_expires_in_minutes=invite_expires_in_minutes,
)
except InvitationError:
pass
assertInviteCountEquals(5)
# Revoking invite should not give you credit

View File

@ -1,6 +1,7 @@
import itertools
import time
from collections import defaultdict
from contextlib import suppress
from datetime import datetime, timedelta
from typing import Callable, Dict, List, Optional, Sequence, Tuple, Union
@ -241,10 +242,8 @@ def realm_summary_table(realm_minutes: Dict[str, float]) -> str:
hours = minutes / 60.0
total_hours += hours
row["hours"] = str(int(hours))
try:
with suppress(Exception):
row["hours_per_user"] = "{:.1f}".format(hours / row["dau_count"])
except Exception:
pass
# formatting
for row in rows:

View File

@ -1,4 +1,5 @@
import urllib
from contextlib import suppress
from dataclasses import dataclass
from datetime import timedelta
from decimal import Decimal
@ -285,10 +286,8 @@ def support(
if parse_result.port:
hostname = f"{hostname}:{parse_result.port}"
subdomain = get_subdomain_from_hostname(hostname)
try:
with suppress(Realm.DoesNotExist):
realms.add(get_realm(subdomain))
except Realm.DoesNotExist:
pass
except ValidationError:
users.update(UserProfile.objects.filter(full_name__iexact=key_word))

View File

@ -1,4 +1,5 @@
import logging
from contextlib import suppress
from typing import Any, Callable, Dict, Union
import stripe
@ -86,14 +87,12 @@ def handle_checkout_session_completed_event(
session.payment_intent.status = PaymentIntent.PROCESSING
session.payment_intent.last_payment_error = ()
session.payment_intent.save(update_fields=["status", "last_payment_error"])
try:
with suppress(stripe.error.CardError):
stripe.PaymentIntent.confirm(
session.payment_intent.stripe_payment_intent_id,
payment_method=payment_method,
off_session=True,
)
except stripe.error.CardError:
pass
elif session.type in [
Session.FREE_TRIAL_UPGRADE_FROM_BILLING_PAGE,
Session.FREE_TRIAL_UPGRADE_FROM_ONBOARDING_PAGE,

View File

@ -2,6 +2,7 @@
# This tools generates /etc/zulip/zulip-secrets.conf
import os
import sys
from contextlib import suppress
from typing import Dict, List
BASE_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
@ -166,10 +167,8 @@ def generate_secrets(development: bool = False) -> None:
)
break
try:
with suppress(redis.exceptions.ConnectionError):
get_redis_client().config_set("requirepass", redis_password)
except redis.exceptions.ConnectionError:
pass
add_secret("redis_password", redis_password)

View File

@ -4,6 +4,7 @@ import html
import os
import sys
from collections import defaultdict
from contextlib import suppress
from typing import Any, Dict, List, Set
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
@ -72,10 +73,8 @@ def create_user_docs() -> None:
fn = "var/url_coverage.txt" # TODO: make path more robust, maybe use json suffix
out_dir = "var/api_docs"
try:
with suppress(OSError):
os.mkdir(out_dir)
except OSError:
pass
main_page = out_dir + "/index.html"

View File

@ -303,9 +303,8 @@ def do_update_bot_config_data(bot_profile: UserProfile, config_data: Dict[str, s
def get_service_dicts_for_bot(user_profile_id: int) -> List[Dict[str, Any]]:
user_profile = get_user_profile_by_id(user_profile_id)
services = get_bot_services(user_profile_id)
service_dicts: List[Dict[str, Any]] = []
if user_profile.bot_type == UserProfile.OUTGOING_WEBHOOK_BOT:
service_dicts = [
return [
{
"base_url": service.base_url,
"interface": service.interface,
@ -315,7 +314,7 @@ def get_service_dicts_for_bot(user_profile_id: int) -> List[Dict[str, Any]]:
]
elif user_profile.bot_type == UserProfile.EMBEDDED_BOT:
try:
service_dicts = [
return [
{
"config_data": get_bot_config(user_profile),
"service_name": services[0].name,
@ -323,8 +322,9 @@ def get_service_dicts_for_bot(user_profile_id: int) -> List[Dict[str, Any]]:
]
# A ConfigError just means that there are no config entries for user_profile.
except ConfigError:
pass
return service_dicts
return []
else:
return []
def get_service_dicts_for_bots(

View File

@ -353,11 +353,10 @@ class ZulipPasswordResetForm(PasswordResetForm):
# The view will handle the RateLimit exception and render an appropriate page
raise
user: Optional[UserProfile] = None
try:
user = get_user_by_delivery_email(email, realm)
except UserProfile.DoesNotExist:
pass
user = None
context = {
"email": email,

View File

@ -9,6 +9,7 @@ the level of detail we desire or do comparison with OpenAPI types
easily with the native Python type system.
"""
from contextlib import suppress
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple
@ -206,10 +207,8 @@ class UnionType:
def check_data(self, var_name: str, val: Any) -> None:
for sub_type in self.sub_types:
try:
with suppress(AssertionError):
check_data(sub_type, var_name, val)
except AssertionError:
pass
# We matched on one of our sub_types, so return
return

View File

@ -13,6 +13,7 @@ import os
import shutil
import subprocess
import tempfile
from contextlib import suppress
from functools import lru_cache
from typing import Any, Callable, Dict, Iterable, List, Mapping, Optional, Set, Tuple, TypedDict
@ -1941,10 +1942,8 @@ def create_soft_link(source: Path, in_progress: bool = True) -> None:
if in_progress:
new_target = in_progress_link
else:
try:
with suppress(FileNotFoundError):
os.remove(in_progress_link)
except FileNotFoundError:
pass
new_target = done_link
overwrite_symlink(source, new_target)

View File

@ -3,6 +3,7 @@ import hashlib
import logging
import threading
import traceback
from contextlib import suppress
from datetime import datetime, timedelta, timezone
from logging import Logger
from typing import Optional, Tuple, Union
@ -238,10 +239,8 @@ class ZulipWebhookFormatter(ZulipFormatter):
else:
payload = request.POST["payload"]
try:
with suppress(orjson.JSONDecodeError):
payload = orjson.dumps(orjson.loads(payload), option=orjson.OPT_INDENT_2).decode()
except orjson.JSONDecodeError:
pass
header_text = "".join(
f"{header}: {value}\n"

View File

@ -1378,7 +1378,6 @@ class CompiledInlineProcessor(markdown.inlinepatterns.InlineProcessor):
class Timestamp(markdown.inlinepatterns.Pattern):
def handleMatch(self, match: Match[str]) -> Optional[Element]:
time_input_string = match.group("time")
timestamp = None
try:
timestamp = dateutil.parser.parse(time_input_string, tzinfos=common_timezones)
except ValueError:
@ -1387,7 +1386,7 @@ class Timestamp(markdown.inlinepatterns.Pattern):
float(time_input_string), tz=datetime.timezone.utc
)
except ValueError:
pass
timestamp = None
if not timestamp:
error_element = Element("span")

View File

@ -1,6 +1,7 @@
import abc
import json
import logging
from contextlib import suppress
from time import perf_counter
from typing import Any, AnyStr, Dict, Optional
@ -232,12 +233,10 @@ def fail_with_message(event: Dict[str, Any], failure_message: str) -> None:
message_info = event["message"]
content = "Failure! " + failure_message
response_data = dict(content=content)
try:
# If the stream has vanished while we were failing, there's no
# reasonable place to report the error.
with suppress(StreamDoesNotExistError):
send_response_message(bot_id=bot_id, message_info=message_info, response_data=response_data)
except StreamDoesNotExistError:
# If the stream has vanished while we were failing, there's no
# reasonable place to report the error.
pass
def get_message_url(event: Dict[str, Any]) -> str:

View File

@ -3,6 +3,7 @@ import hashlib
import logging
import os
import smtplib
from contextlib import suppress
from email.headerregistry import Address
from email.parser import Parser
from email.policy import default
@ -552,7 +553,7 @@ def send_custom_email(
"realm_name": user_profile.realm.name,
"unsubscribe_link": one_click_unsubscribe_link(user_profile, "marketing"),
}
try:
with suppress(EmailNotDeliveredError):
send_email(
email_id,
to_user_ids=[user_profile.id],
@ -564,8 +565,6 @@ def send_custom_email(
context=context,
dry_run=options["dry_run"],
)
except EmailNotDeliveredError:
pass
if options["dry_run"]:
break

View File

@ -2,6 +2,7 @@ import datetime
import re
import secrets
import time
from contextlib import suppress
from datetime import timedelta
from email.headerregistry import Address
from typing import (
@ -1364,10 +1365,8 @@ def linkifiers_for_realm_remote_cache(realm_id: int) -> List[LinkifierDict]:
def flush_linkifiers(*, instance: RealmFilter, **kwargs: object) -> None:
realm_id = instance.realm_id
cache_delete(get_linkifiers_cache_key(realm_id))
try:
with suppress(KeyError):
per_request_linkifiers_cache.pop(realm_id)
except KeyError:
pass
post_save.connect(flush_linkifiers, sender=RealmFilter)

View File

@ -4,7 +4,7 @@ import os
import signal
import time
from collections import defaultdict
from contextlib import contextmanager
from contextlib import contextmanager, suppress
from inspect import isabstract
from typing import Any, Callable, Dict, Iterator, List, Mapping, Optional
from unittest.mock import MagicMock, patch
@ -620,10 +620,8 @@ class WorkerTest(ZulipTestCase):
fake_client.enqueue("unreliable_worker", {"type": msg})
fn = os.path.join(settings.QUEUE_ERROR_DIR, "unreliable_worker.errors")
try:
with suppress(FileNotFoundError):
os.remove(fn)
except OSError: # nocoverage # error handling for the directory not existing
pass
with simulated_queue_client(fake_client):
worker = UnreliableWorker()
@ -657,10 +655,8 @@ class WorkerTest(ZulipTestCase):
fake_client.enqueue("unreliable_loopworker", {"type": msg})
fn = os.path.join(settings.QUEUE_ERROR_DIR, "unreliable_loopworker.errors")
try:
with suppress(FileNotFoundError):
os.remove(fn)
except OSError: # nocoverage # error handling for the directory not existing
pass
with simulated_queue_client(fake_client):
loopworker = UnreliableLoopWorker()
@ -700,10 +696,8 @@ class WorkerTest(ZulipTestCase):
fake_client.enqueue("timeout_worker", {"type": msg})
fn = os.path.join(settings.QUEUE_ERROR_DIR, "timeout_worker.errors")
try:
with suppress(FileNotFoundError):
os.remove(fn)
except OSError: # nocoverage # error handling for the directory not existing
pass
with simulated_queue_client(fake_client):
worker = TimeoutWorker()

View File

@ -8,6 +8,7 @@ import time
import traceback
import uuid
from collections import deque
from contextlib import suppress
from dataclasses import asdict
from functools import lru_cache
from typing import (
@ -640,10 +641,8 @@ async def setup_event_queue(server: tornado.httpserver.HTTPServer, port: int) ->
load_event_queues(port)
autoreload.add_reload_hook(lambda: dump_event_queues(port))
try:
with suppress(OSError):
os.rename(persistent_queue_filename(port), persistent_queue_filename(port, last=True))
except OSError:
pass
# Set up event queue garbage collection
pc = tornado.ioloop.PeriodicCallback(lambda: gc_event_queues(port), EVENT_QUEUE_GC_FREQ_MSECS)

View File

@ -1,5 +1,6 @@
import logging
import urllib
from contextlib import suppress
from typing import Any, Dict, List, Optional
import tornado.web
@ -147,12 +148,10 @@ class AsyncDjangoHandler(tornado.web.RequestHandler):
self.write(response.content)
# Close the connection.
try:
# While writing the response, we might realize that the
# user already closed the connection; that is fine.
with suppress(StreamClosedError):
await self.finish()
except StreamClosedError:
# While writing the response, we might realize that the
# user already closed the connection; that is fine.
pass
async def get(self, *args: Any, **kwargs: Any) -> None:
request = await self.convert_tornado_request_to_django_request()

View File

@ -708,10 +708,8 @@ def redirect_to_deactivation_notice() -> HttpResponse:
def update_login_page_context(request: HttpRequest, context: Dict[str, Any]) -> None:
for key in ("email", "already_registered"):
try:
if key in request.GET:
context[key] = request.GET[key]
except KeyError:
pass
deactivated_email = request.GET.get("is_deactivated")
if deactivated_email is None:

View File

@ -1,6 +1,7 @@
import os
import subprocess
import urllib
from contextlib import suppress
from typing import Optional
import orjson
@ -44,10 +45,8 @@ def email_page(
def clear_emails(request: HttpRequest) -> HttpResponse:
try:
with suppress(FileNotFoundError):
os.remove(settings.EMAIL_CONTENT_LOG_PATH)
except FileNotFoundError: # nocoverage
pass
return redirect(email_page)

View File

@ -1,4 +1,5 @@
import os
from contextlib import suppress
from typing import TYPE_CHECKING, Any, Dict, List, Optional
import orjson
@ -84,10 +85,9 @@ def get_fixtures(request: HttpRequest, integration_name: str = REQ()) -> HttpRes
fixture_path = os.path.join(fixtures_dir, fixture)
with open(fixture_path) as f:
body = f.read()
try:
# The file extension will be used to determine the type.
with suppress(orjson.JSONDecodeError):
body = orjson.loads(body)
except orjson.JSONDecodeError:
pass # The file extension will be used to determine the type.
headers_raw = get_fixture_http_headers(
valid_integration_name, "".join(fixture.split(".")[:-1])

View File

@ -1,5 +1,6 @@
import logging
import urllib
from contextlib import suppress
from typing import Any, Dict, Iterable, List, Optional
from urllib.parse import urlencode
@ -227,10 +228,8 @@ def accounts_register(
require_ldap_password = False
if from_confirmation:
try:
with suppress(KeyError):
del request.session["authenticated_full_name"]
except KeyError:
pass
ldap_full_name = None
if settings.POPULATE_PROFILE_VIA_LDAP: