backend: Add None-checks with assertions and if-elses.

This fixes a batch of mypy errors of the following format:
'Item "None" of "Optional[Something]" has no attribute "abc"'
This commit is contained in:
PIG208 2021-07-25 00:16:48 +08:00 committed by Tim Abbott
parent fffd4ed8d5
commit 66b1a4e7ca
15 changed files with 55 additions and 24 deletions

View File

@ -60,6 +60,7 @@ class Command(BaseCommand):
return
fill_to_time = parse_datetime(options["time"])
assert fill_to_time is not None
if options["utc"]:
fill_to_time = fill_to_time.replace(tzinfo=timezone.utc)
if fill_to_time.tzinfo is None:

View File

@ -313,9 +313,13 @@ def get_chart_data(
_("No analytics data available. Please contact your server administrator.")
)
if start is None:
start = aggregate_table.objects.filter(server=server).first().end_time
first = aggregate_table.objects.filter(server=server).first()
assert first is not None
start = first.end_time
if end is None:
end = aggregate_table.objects.filter(server=server).last().end_time
last = aggregate_table.objects.filter(server=server).last()
assert last is not None
end = last.end_time
else:
# Otherwise, we can use tables on the current server to
# determine a nice range, and some additional validation.

View File

@ -74,6 +74,7 @@ def get_confirmations(
days_to_activate = _properties[type].validity_in_days
expiry_date = confirmation.date_sent + timedelta(days=days_to_activate)
assert content_object is not None
if hasattr(content_object, "status"):
if content_object.status == STATUS_ACTIVE:
link_status = "Link has been clicked"

View File

@ -66,6 +66,7 @@ def get_object_from_key(
raise ConfirmationKeyException(ConfirmationKeyException.EXPIRED)
obj = confirmation.content_object
assert obj is not None
if activate_object and hasattr(obj, "status"):
obj.status = getattr(settings, "STATUS_ACTIVE", 1)
obj.save(update_fields=["status"])

View File

@ -367,18 +367,22 @@ def make_end_of_cycle_updates_if_needed(
plan: CustomerPlan, event_time: datetime
) -> Tuple[Optional[CustomerPlan], Optional[LicenseLedger]]:
last_ledger_entry = LicenseLedger.objects.filter(plan=plan).order_by("-id").first()
last_renewal = (
LicenseLedger.objects.filter(plan=plan, is_renewal=True).order_by("-id").first().event_time
last_ledger_renewal = (
LicenseLedger.objects.filter(plan=plan, is_renewal=True).order_by("-id").first()
)
assert last_ledger_renewal is not None
last_renewal = last_ledger_renewal.event_time
next_billing_cycle = start_of_next_billing_cycle(plan, last_renewal)
if next_billing_cycle <= event_time:
if next_billing_cycle <= event_time and last_ledger_entry is not None:
licenses_at_next_renewal = last_ledger_entry.licenses_at_next_renewal
assert licenses_at_next_renewal is not None
if plan.status == CustomerPlan.ACTIVE:
return None, LicenseLedger.objects.create(
plan=plan,
is_renewal=True,
event_time=next_billing_cycle,
licenses=last_ledger_entry.licenses_at_next_renewal,
licenses_at_next_renewal=last_ledger_entry.licenses_at_next_renewal,
licenses=licenses_at_next_renewal,
licenses_at_next_renewal=licenses_at_next_renewal,
)
if plan.is_free_trial():
plan.invoiced_through = last_ledger_entry
@ -390,8 +394,8 @@ def make_end_of_cycle_updates_if_needed(
plan=plan,
is_renewal=True,
event_time=next_billing_cycle,
licenses=last_ledger_entry.licenses_at_next_renewal,
licenses_at_next_renewal=last_ledger_entry.licenses_at_next_renewal,
licenses=licenses_at_next_renewal,
licenses_at_next_renewal=licenses_at_next_renewal,
)
if plan.status == CustomerPlan.SWITCH_TO_ANNUAL_AT_END_OF_CYCLE:
@ -427,8 +431,8 @@ def make_end_of_cycle_updates_if_needed(
plan=new_plan,
is_renewal=True,
event_time=next_billing_cycle,
licenses=last_ledger_entry.licenses_at_next_renewal,
licenses_at_next_renewal=last_ledger_entry.licenses_at_next_renewal,
licenses=licenses_at_next_renewal,
licenses_at_next_renewal=licenses_at_next_renewal,
)
RealmAuditLog.objects.create(
@ -748,15 +752,16 @@ def invoice_plan(plan: CustomerPlan, event_time: datetime) -> None:
}
description = "Zulip Standard - renewal"
elif licenses_base is not None and ledger_entry.licenses != licenses_base:
assert plan.price_per_license
last_renewal = (
assert plan.price_per_license and ledger_entry is not None
last_ledger_entry_renewal = (
LicenseLedger.objects.filter(
plan=plan, is_renewal=True, event_time__lte=ledger_entry.event_time
)
.order_by("-id")
.first()
.event_time
)
assert last_ledger_entry_renewal is not None
last_renewal = last_ledger_entry_renewal.event_time
period_end = start_of_next_billing_cycle(plan, ledger_entry.event_time)
proration_fraction = (period_end - ledger_entry.event_time) / (
period_end - last_renewal

View File

@ -131,14 +131,16 @@ class CustomerPlan(models.Model):
}[self.status]
def licenses(self) -> int:
return LicenseLedger.objects.filter(plan=self).order_by("id").last().licenses
ledger_entry = LicenseLedger.objects.filter(plan=self).order_by("id").last()
assert ledger_entry is not None
return ledger_entry.licenses
def licenses_at_next_renewal(self) -> Optional[int]:
if self.status == CustomerPlan.DOWNGRADE_AT_END_OF_CYCLE:
return None
return (
LicenseLedger.objects.filter(plan=self).order_by("id").last().licenses_at_next_renewal
)
ledger_entry = LicenseLedger.objects.filter(plan=self).order_by("id").last()
assert ledger_entry is not None
return ledger_entry.licenses_at_next_renewal
def is_free_trial(self) -> bool:
return self.status == CustomerPlan.FREE_TRIAL

View File

@ -3557,7 +3557,7 @@ class InvoiceTest(StripeTestCase):
stripe_customer_id = plan.customer.stripe_customer_id
assert stripe_customer_id is not None
[invoice0, invoice1] = stripe.Invoice.list(customer=stripe_customer_id)
self.assertEqual(invoice0.billing, "send_invoice")
self.assertEqual(invoice0.collection_method, "send_invoice")
[item] = invoice0.lines
line_item_params = {
"amount": 100,

View File

@ -687,6 +687,7 @@ def do_create_user(
.order_by("id")
.last()
)
assert realm_creation_audit_log is not None
realm_creation_audit_log.acting_user = user_profile
realm_creation_audit_log.save(update_fields=["acting_user"])
@ -1143,6 +1144,7 @@ def do_delete_user(user_profile: UserProfile) -> None:
# Recipient objects don't get deleted through CASCADE, so we need to handle
# the user's personal recipient manually. This will also delete all Messages pointing
# to this recipient (all private messages sent to the user).
assert personal_recipient is not None
personal_recipient.delete()
replacement_user = create_user(
force_id=user_id,
@ -2378,7 +2380,9 @@ def check_add_reaction(
# vote any old reaction they see in the UI even if that is a
# deactivated custom emoji, so we just use the emoji name from
# the existing reaction with no further validation.
emoji_name = query.first().emoji_name
reaction = query.first()
assert reaction is not None
emoji_name = reaction.emoji_name
else:
# Otherwise, use the name provided in this request, but verify
# it is valid in the user's realm (e.g. not a deactivated
@ -6949,6 +6953,7 @@ def do_get_user_invites(user_profile: UserProfile) -> List[Dict[str, Any]]:
)
for confirmation_obj in multiuse_confirmation_objs:
invite = confirmation_obj.content_object
assert invite is not None
invites.append(
dict(
invited_by_user_id=invite.referred_by.id,

View File

@ -449,6 +449,7 @@ def process_missed_message(to: str, message: EmailMessage) -> None:
body = construct_zulip_body(message, user_profile.realm)
assert recipient is not None
if recipient.type == Recipient.STREAM:
stream = get_stream_by_id_in_realm(recipient.type_id, user_profile.realm)
send_mm_reply_to_stream(user_profile, stream, topic, body)

View File

@ -1546,7 +1546,7 @@ def export_emoji_from_local(realm: Realm, local_dir: Path, output_dir: Path) ->
author = realm_emoji.author
author_id = None
if author:
author_id = realm_emoji.author.id
author_id = author.id
record = dict(
realm_id=realm.id,
author=author_id,
@ -1959,6 +1959,7 @@ def get_realm_exports_serialized(user: UserProfile) -> List[Dict[str, Any]]:
export_url = None
deleted_timestamp = None
failed_timestamp = None
acting_user = export.acting_user
if export.extra_data is not None:
pending = False
@ -1973,10 +1974,11 @@ def get_realm_exports_serialized(user: UserProfile) -> List[Dict[str, Any]]:
user.realm, export_path
)
assert acting_user is not None
exports_dict[export.id] = dict(
id=export.id,
export_time=export.event_time.timestamp(),
acting_user_id=export.acting_user.id,
acting_user_id=acting_user.id,
export_url=export_url,
deleted_timestamp=deleted_timestamp,
failed_timestamp=failed_timestamp,

View File

@ -275,6 +275,7 @@ def send_email(
def initialize_connection(connection: Optional[BaseEmailBackend] = None) -> BaseEmailBackend:
if not connection:
connection = get_connection()
assert connection is not None
if connection.open():
# If it's a new connection, no need to no-op to check connectivity
@ -293,6 +294,7 @@ def initialize_connection(connection: Optional[BaseEmailBackend] = None) -> Base
# closed by the mail server.
if isinstance(connection, EmailBackend):
try:
assert connection.connection is not None
status = connection.connection.noop()[0]
except Exception:
status = -1

View File

@ -239,7 +239,9 @@ def do_soft_deactivate_user(user_profile: UserProfile) -> None:
except IndexError: # nocoverage
# In the unlikely event that a user somehow has never received
# a message, we just use the overall max message ID.
user_profile.last_active_message_id = Message.objects.last().id
last_message = Message.objects.last()
assert last_message is not None
user_profile.last_active_message_id = last_message.id
user_profile.long_term_idle = True
user_profile.save(update_fields=["long_term_idle", "last_active_message_id"])
logger.info("Soft deactivated user %s", user_profile.id)

View File

@ -175,6 +175,7 @@ def maybe_send_to_registration(
)
multiuse_obj: Optional[MultiuseInvite] = None
realm: Optional[Realm] = None
from_multiuse_invite = False
if multiuse_object_key:
from_multiuse_invite = True
@ -183,13 +184,14 @@ def maybe_send_to_registration(
except ConfirmationKeyException:
return render(request, "zerver/confirmation_link_expired_error.html", status=404)
assert multiuse_obj is not None
realm = multiuse_obj.realm
invited_as = multiuse_obj.invited_as
else:
try:
realm = get_realm(get_subdomain(request))
except Realm.DoesNotExist:
realm = None
pass
invited_as = PreregistrationUser.INVITE_AS["MEMBER"]
form = HomepageForm({"email": email}, realm=realm, from_multiuse_invite=from_multiuse_invite)

View File

@ -82,6 +82,7 @@ def generate_all_emails(request: HttpRequest) -> HttpResponse:
assert result.status_code == 302
change_user_is_active(user, True)
# account on different realm
assert other_realm is not None
result = client.post(
"/accounts/password/reset/", {"email": registered_email}, HTTP_HOST=other_realm.host
)

View File

@ -103,6 +103,7 @@ def check_prereg_key_and_redirect(request: HttpRequest, confirmation_key: str) -
)
prereg_user = confirmation.content_object
assert prereg_user is not None
if prereg_user.status == confirmation_settings.STATUS_REVOKED:
return render(request, "zerver/confirmation_link_expired_error.html")
@ -130,6 +131,7 @@ def accounts_register(request: HttpRequest) -> HttpResponse:
return render(request, "zerver/confirmation_link_expired_error.html", status=404)
prereg_user = confirmation.content_object
assert prereg_user is not None
if prereg_user.status == confirmation_settings.STATUS_REVOKED:
return render(request, "zerver/confirmation_link_expired_error.html", status=404)
email = prereg_user.email