invites: Remove invites worker, make confirmation object in-process.

The "invites" worker exists to do two things -- make a Confirmation
object, and send the outgoing email.  Making the Confirmation object
in a background process from where the PreregistrationUser is created
temporarily leaves the PreregistrationUser in invalid state, and
results in 500's, and the user not immediately seeing the sent
invitation.  That the "invites" worker also wants to create the
Confirmation object means that "resending" an invite invalidates the
URL in the previous email, which can be confusing to the user.

Moving the Confirmation creation to the same transaction solves both
of these issues, and leaves the "invites" worker with nothing to do
but send the email; as such, we remove it entirely, and use the
existing "email_senders" worker to send the invites.  The volume of
invites is small enough that this will not affect other uses of that
worker.

Fixes: #21306
Fixes: #24275
This commit is contained in:
Alex Vandiver 2024-04-30 20:12:34 +00:00 committed by Tim Abbott
parent 512d53d01a
commit 9dfaa83aa8
11 changed files with 116 additions and 235 deletions

View File

@ -42,11 +42,7 @@ from zerver.actions.create_user import (
do_create_user,
do_reactivate_user,
)
from zerver.actions.invites import (
do_invite_users,
do_resend_user_invite_email,
do_revoke_user_invite,
)
from zerver.actions.invites import do_invite_users, do_revoke_user_invite, do_send_user_invite_email
from zerver.actions.message_flags import (
do_mark_all_as_read,
do_mark_stream_messages_as_read,
@ -1719,7 +1715,7 @@ class TestLoggingCountStats(AnalyticsTestCase):
# Resending invite should cost you
with invite_context():
do_resend_user_invite_email(assert_is_not_none(PreregistrationUser.objects.first()))
do_send_user_invite_email(assert_is_not_none(PreregistrationUser.objects.first()))
assertInviteCountEquals(6)
def test_messages_read_hour(self) -> None:

View File

@ -124,14 +124,13 @@ def get_object_from_key(
return obj
def create_confirmation_link(
def create_confirmation_object(
obj: ConfirmationObjT,
confirmation_type: int,
*,
validity_in_minutes: Union[Optional[int], UnspecifiedValue] = UnspecifiedValue(),
url_args: Mapping[str, str] = {},
no_associated_realm_object: bool = False,
) -> str:
) -> "Confirmation":
# validity_in_minutes is an override for the default values which are
# determined by the confirmation_type - its main purpose is for use
# in tests which may want to have control over the exact expiration time.
@ -158,7 +157,7 @@ def create_confirmation_link(
else:
expiry_date = current_time + timedelta(days=_properties[confirmation_type].validity_in_days)
Confirmation.objects.create(
return Confirmation.objects.create(
content_object=obj,
date_sent=current_time,
confirmation_key=key,
@ -166,7 +165,31 @@ def create_confirmation_link(
expiry_date=expiry_date,
type=confirmation_type,
)
return confirmation_url(key, realm, confirmation_type, url_args)
def create_confirmation_link(
obj: ConfirmationObjT,
confirmation_type: int,
*,
validity_in_minutes: Union[Optional[int], UnspecifiedValue] = UnspecifiedValue(),
url_args: Mapping[str, str] = {},
no_associated_realm_object: bool = False,
) -> str:
return confirmation_url_for(
create_confirmation_object(
obj,
confirmation_type,
validity_in_minutes=validity_in_minutes,
no_associated_realm_object=no_associated_realm_object,
),
url_args=url_args,
)
def confirmation_url_for(confirmation_obj: "Confirmation", url_args: Mapping[str, str] = {}) -> str:
return confirmation_url(
confirmation_obj.confirmation_key, confirmation_obj.realm, confirmation_obj.type, url_args
)
def confirmation_url(

View File

@ -365,12 +365,6 @@ define service {
check_command check_rabbitmq_consumers!embedded_bots
}
define service {
use rabbitmq-consumer-service
service_description Check RabbitMQ invites consumers
check_command check_rabbitmq_consumers!invites
}
define service {
use rabbitmq-consumer-service
service_description Check RabbitMQ missedmessage_emails consumers

View File

@ -137,7 +137,6 @@ class zulip::app_frontend_base {
'email_mirror',
'embed_links',
'embedded_bots',
'invites',
'email_senders',
'missedmessage_emails',
'missedmessage_mobile_notifications',

View File

@ -15,7 +15,6 @@ normal_queues = [
"email_senders",
"embed_links",
"embedded_bots",
"invites",
"missedmessage_emails",
"missedmessage_mobile_notifications",
"outgoing_webhooks",

View File

@ -1,6 +1,6 @@
import logging
from datetime import timedelta
from typing import Any, Collection, Dict, List, Optional, Sequence, Set, Tuple, Union
from datetime import datetime, timedelta
from typing import Any, Collection, Dict, List, Optional, Sequence, Set, Tuple
from django.conf import settings
from django.contrib.contenttypes.models import ContentType
@ -13,7 +13,13 @@ from zxcvbn import zxcvbn
from analytics.lib.counts import COUNT_STATS, do_increment_logging_stat
from analytics.models import RealmCount
from confirmation import settings as confirmation_settings
from confirmation.models import Confirmation, confirmation_url, create_confirmation_link
from confirmation.models import (
Confirmation,
confirmation_url_for,
create_confirmation_link,
create_confirmation_object,
)
from zerver.context_processors import common_context
from zerver.lib.email_validation import (
get_existing_user_errors,
get_realm_email_validator,
@ -22,44 +28,13 @@ from zerver.lib.email_validation import (
from zerver.lib.exceptions import InvitationError
from zerver.lib.invites import notify_invites_changed
from zerver.lib.queue import queue_event_on_commit
from zerver.lib.send_email import FromAddress, clear_scheduled_invitation_emails, send_email
from zerver.lib.send_email import FromAddress, clear_scheduled_invitation_emails, send_future_email
from zerver.lib.timestamp import datetime_to_timestamp
from zerver.lib.types import UnspecifiedValue
from zerver.lib.utils import assert_is_not_none
from zerver.models import Message, MultiuseInvite, PreregistrationUser, Realm, Stream, UserProfile
from zerver.models.prereg_users import filter_to_valid_prereg_users
def do_send_confirmation_email(
invitee: PreregistrationUser,
referrer: UserProfile,
email_language: str,
invite_expires_in_minutes: Union[Optional[int], UnspecifiedValue] = UnspecifiedValue(),
) -> str:
"""
Send the confirmation/welcome e-mail to an invited user.
"""
activation_url = create_confirmation_link(
invitee, Confirmation.INVITATION, validity_in_minutes=invite_expires_in_minutes
)
context = {
"referrer_full_name": referrer.full_name,
"referrer_email": referrer.delivery_email,
"activate_url": activation_url,
"referrer_realm_name": referrer.realm.name,
"corporate_enabled": settings.CORPORATE_ENABLED,
}
send_email(
"zerver/emails/invitation",
to_emails=[invitee.email],
from_address=FromAddress.tokenized_no_reply_address(),
language=email_language,
context=context,
realm=referrer.realm,
)
return activation_url
def estimate_recent_invites(realms: Collection[Realm] | QuerySet[Realm], *, days: int) -> int:
"""An upper bound on the number of invites sent in the last `days` days"""
recent_invites = RealmCount.objects.filter(
@ -278,17 +253,6 @@ def do_invite_users(
_("We weren't able to invite anyone."), skipped, sent_invitations=False
)
# We do this here rather than in the invite queue processor since this
# is used for rate limiting invitations, rather than keeping track of
# when exactly invitations were sent
do_increment_logging_stat(
realm,
COUNT_STATS["invites_sent::day"],
None,
timezone_now(),
increment=len(validated_emails),
)
# Now that we are past all the possible errors, we actually create
# the PreregistrationUser objects and trigger the email invitations.
for email in validated_emails:
@ -300,13 +264,14 @@ def do_invite_users(
stream_ids = [stream.id for stream in streams]
prereg_user.streams.set(stream_ids)
event = {
"prereg_id": prereg_user.id,
"referrer_id": user_profile.id,
"email_language": realm.default_language,
"invite_expires_in_minutes": invite_expires_in_minutes,
}
queue_event_on_commit("invites", event)
confirmation = create_confirmation_object(
prereg_user, Confirmation.INVITATION, validity_in_minutes=invite_expires_in_minutes
)
do_send_user_invite_email(
prereg_user,
confirmation=confirmation,
invite_expires_in_minutes=invite_expires_in_minutes,
)
notify_invites_changed(realm, changed_invite_referrer=user_profile)
@ -377,11 +342,7 @@ def do_get_invites_controlled_by_user(user_profile: UserProfile) -> List[Dict[st
invited=datetime_to_timestamp(confirmation_obj.date_sent),
expiry_date=get_invitation_expiry_date(confirmation_obj),
id=invite.id,
link_url=confirmation_url(
confirmation_obj.confirmation_key,
user_profile.realm,
Confirmation.MULTIUSE_INVITE,
),
link_url=confirmation_url_for(confirmation_obj),
invited_as=invite.invited_as,
is_multiuse=True,
)
@ -438,31 +399,67 @@ def do_revoke_multi_use_invite(multiuse_invite: MultiuseInvite) -> None:
@transaction.atomic
def do_resend_user_invite_email(prereg_user: PreregistrationUser) -> None:
def do_send_user_invite_email(
prereg_user: PreregistrationUser,
*,
confirmation: Optional[Confirmation] = None,
event_time: Optional[datetime] = None,
invite_expires_in_minutes: Optional[int] = None,
) -> None:
# Take a lock on the realm, so we can check for invitation limits without races
realm_id = assert_is_not_none(prereg_user.realm_id)
realm = Realm.objects.select_for_update().get(id=realm_id)
check_invite_limit(realm, 1)
referrer = assert_is_not_none(prereg_user.referred_by)
assert prereg_user.referred_by is not None
if event_time is None:
event_time = prereg_user.invited_at
do_increment_logging_stat(realm, COUNT_STATS["invites_sent::day"], None, event_time)
expiry_date = prereg_user.confirmation.get().expiry_date
if expiry_date is None:
invite_expires_in_minutes = None
else:
# The resent invitation is reset to expire as long after the
# reminder is sent as it lasted originally.
invite_expires_in_minutes = (expiry_date - timezone_now()).total_seconds() / 60
prereg_user.confirmation.clear()
if confirmation is None:
confirmation = prereg_user.confirmation.get()
do_increment_logging_stat(realm, COUNT_STATS["invites_sent::day"], None, prereg_user.invited_at)
event = {
"template_prefix": "zerver/emails/invitation",
"to_emails": [prereg_user.email],
"from_address": FromAddress.tokenized_no_reply_address(),
"language": realm.default_language,
"context": {
"referrer_full_name": referrer.full_name,
"referrer_email": referrer.delivery_email,
"activate_url": confirmation_url_for(confirmation),
"referrer_realm_name": realm.name,
"corporate_enabled": settings.CORPORATE_ENABLED,
},
"realm_id": realm.id,
}
queue_event_on_commit("email_senders", event)
clear_scheduled_invitation_emails(prereg_user.email)
# We don't store the custom email body, so just set it to None
event = {
"prereg_id": prereg_user.id,
"referrer_id": prereg_user.referred_by.id,
"email_language": realm.default_language,
"invite_expires_in_minutes": invite_expires_in_minutes,
}
queue_event_on_commit("invites", event)
if invite_expires_in_minutes is None and confirmation.expiry_date is not None:
# Pull the remaining time from the confirmation object
invite_expires_in_minutes = (confirmation.expiry_date - event_time).total_seconds() / 60
if invite_expires_in_minutes is None or invite_expires_in_minutes < 4 * 24 * 60:
# We do not queue reminder email for never expiring
# invitations. This is probably a low importance bug; it
# would likely be more natural to send a reminder after 7
# days.
return
context = common_context(referrer)
context.update(
activate_url=confirmation_url_for(confirmation),
referrer_name=referrer.full_name,
referrer_email=referrer.delivery_email,
referrer_realm_name=realm.name,
)
send_future_email(
"zerver/emails/invitation_reminder",
realm,
to_emails=[prereg_user.email],
from_address=FromAddress.tokenized_no_reply_placeholder,
language=realm.default_language,
context=context,
delay=timedelta(minutes=invite_expires_in_minutes - (2 * 24 * 60)),
)

View File

@ -1,6 +1,3 @@
from datetime import timedelta
from typing import Optional, Union
from django.contrib.contenttypes.fields import GenericRelation
from django.db import models
from django.db.models import CASCADE, Q, QuerySet
@ -8,7 +5,6 @@ from django.db.models.functions import Upper
from django.utils.timezone import now as timezone_now
from confirmation import settings as confirmation_settings
from zerver.lib.types import UnspecifiedValue
from zerver.models.constants import MAX_LANGUAGE_ID_LENGTH
from zerver.models.realms import Realm
from zerver.models.users import UserProfile
@ -115,7 +111,6 @@ class PreregistrationUser(models.Model):
def filter_to_valid_prereg_users(
query: QuerySet[PreregistrationUser],
invite_expires_in_minutes: Union[Optional[int], UnspecifiedValue] = UnspecifiedValue(),
) -> QuerySet[PreregistrationUser]:
"""
If invite_expires_in_days is specified, we return only those PreregistrationUser
@ -125,17 +120,6 @@ def filter_to_valid_prereg_users(
revoked_value = confirmation_settings.STATUS_REVOKED
query = query.exclude(status__in=[used_value, revoked_value])
if invite_expires_in_minutes is None:
# Since invite_expires_in_minutes is None, we're invitation will never
# expire, we do not need to check anything else and can simply return
# after excluding objects with active and revoked status.
return query
assert invite_expires_in_minutes is not None
if not isinstance(invite_expires_in_minutes, UnspecifiedValue):
lowest_datetime = timezone_now() - timedelta(minutes=invite_expires_in_minutes)
return query.filter(invited_at__gte=lowest_datetime)
else:
return query.filter(
Q(confirmation__expiry_date=None) | Q(confirmation__expiry_date__gte=timezone_now())
)

View File

@ -23,12 +23,7 @@ from zerver.lib.remote_server import PushNotificationBouncerRetryLaterError
from zerver.lib.send_email import EmailNotDeliveredError, FromAddress
from zerver.lib.test_classes import ZulipTestCase
from zerver.lib.test_helpers import mock_queue_publish
from zerver.models import (
PreregistrationUser,
ScheduledMessageNotificationEmail,
UserActivity,
UserProfile,
)
from zerver.models import ScheduledMessageNotificationEmail, UserActivity, UserProfile
from zerver.models.clients import get_client
from zerver.models.realms import get_realm
from zerver.models.scheduled_jobs import NotificationTriggers
@ -38,7 +33,6 @@ from zerver.worker import base as base_worker
from zerver.worker.email_mirror import MirrorWorker
from zerver.worker.email_senders import EmailSendingWorker
from zerver.worker.embed_links import FetchLinksEmbedData
from zerver.worker.invites import ConfirmationEmailWorker
from zerver.worker.missedmessage_emails import MissedMessageWorker
from zerver.worker.missedmessage_mobile_notifications import PushNotificationsWorker
from zerver.worker.user_activity import UserActivityWorker
@ -683,47 +677,6 @@ class WorkerTest(ZulipTestCase):
self.assertEqual(data["failed_tries"], 1 + MAX_REQUEST_RETRIES)
def test_invites_worker(self) -> None:
fake_client = FakeClient()
inviter = self.example_user("iago")
prereg_alice = PreregistrationUser.objects.create(
email=self.nonreg_email("alice"), referred_by=inviter, realm=inviter.realm
)
PreregistrationUser.objects.create(
email=self.nonreg_email("bob"), referred_by=inviter, realm=inviter.realm
)
invite_expires_in_minutes = 4 * 24 * 60
data: List[Dict[str, Any]] = [
dict(
prereg_id=prereg_alice.id,
referrer_id=inviter.id,
invite_expires_in_minutes=invite_expires_in_minutes,
),
dict(
prereg_id=prereg_alice.id,
referrer_id=inviter.id,
email_language="en",
invite_expires_in_minutes=invite_expires_in_minutes,
),
# Nonexistent prereg_id, as if the invitation was deleted
dict(
prereg_id=-1,
referrer_id=inviter.id,
invite_expires_in_minutes=invite_expires_in_minutes,
),
]
for element in data:
fake_client.enqueue("invites", element)
with simulated_queue_client(fake_client):
worker = ConfirmationEmailWorker()
worker.setup()
with patch("zerver.actions.user_settings.send_email"), patch(
"zerver.worker.invites.send_future_email"
) as send_mock:
worker.start()
self.assertEqual(send_mock.call_count, 2)
def test_error_handling(self) -> None:
processed = []

View File

@ -3,6 +3,7 @@ from typing import List, Optional, Sequence, Set
from django.conf import settings
from django.http import HttpRequest, HttpResponse
from django.utils.timezone import now as timezone_now
from django.utils.translation import gettext as _
from confirmation import settings as confirmation_settings
@ -10,9 +11,9 @@ from zerver.actions.invites import (
do_create_multiuse_invite_link,
do_get_invites_controlled_by_user,
do_invite_users,
do_resend_user_invite_email,
do_revoke_multi_use_invite,
do_revoke_user_invite,
do_send_user_invite_email,
)
from zerver.decorator import require_member_or_admin
from zerver.lib.exceptions import InvitationError, JsonableError, OrganizationOwnerRequiredError
@ -193,7 +194,7 @@ def resend_user_invite_email(
if prereg_user.referred_by_id != user_profile.id:
check_role_based_permissions(prereg_user.invited_as, user_profile, require_admin=True)
do_resend_user_invite_email(prereg_user)
do_send_user_invite_email(prereg_user, event_time=timezone_now())
return json_success(request)

View File

@ -15,6 +15,7 @@ from zerver.lib.send_email import (
initialize_connection,
send_email,
)
from zerver.models import Realm
from zerver.worker.base import ConcreteQueueWorker, LoopQueueProcessingWorker, assign_queue
logger = logging.getLogger(__name__)
@ -58,6 +59,10 @@ class EmailSendingWorker(LoopQueueProcessingWorker):
if "failed_tries" in copied_event:
del copied_event["failed_tries"]
handle_send_email_format_changes(copied_event)
if "realm_id" in copied_event:
# "realm" does not serialize over the queue, so we send the realm_id
copied_event["realm"] = Realm.objects.get(id=copied_event["realm_id"])
del copied_event["realm_id"]
self.connection = initialize_connection(self.connection)
send_email(**copied_event, connection=self.connection)

View File

@ -1,70 +0,0 @@
# Documented in https://zulip.readthedocs.io/en/latest/subsystems/queuing.html
import logging
from datetime import timedelta
from typing import Any, Mapping
from typing_extensions import override
from zerver.actions.invites import do_send_confirmation_email
from zerver.context_processors import common_context
from zerver.lib.send_email import FromAddress, send_future_email
from zerver.models import PreregistrationUser
from zerver.models.prereg_users import filter_to_valid_prereg_users
from zerver.models.users import get_user_profile_by_id
from zerver.worker.base import QueueProcessingWorker, assign_queue
logger = logging.getLogger(__name__)
@assign_queue("invites")
class ConfirmationEmailWorker(QueueProcessingWorker):
@override
def consume(self, data: Mapping[str, Any]) -> None:
if "invite_expires_in_days" in data:
invite_expires_in_minutes = data["invite_expires_in_days"] * 24 * 60
elif "invite_expires_in_minutes" in data:
invite_expires_in_minutes = data["invite_expires_in_minutes"]
invitee = filter_to_valid_prereg_users(
PreregistrationUser.objects.filter(id=data["prereg_id"]), invite_expires_in_minutes
).first()
if invitee is None:
# The invitation could have been revoked
return
referrer = get_user_profile_by_id(data["referrer_id"])
logger.info(
"Sending invitation for realm %s to %s", referrer.realm.string_id, invitee.email
)
if "email_language" in data:
email_language = data["email_language"]
else:
email_language = referrer.realm.default_language
activate_url = do_send_confirmation_email(
invitee, referrer, email_language, invite_expires_in_minutes
)
if invite_expires_in_minutes is None:
# We do not queue reminder email for never expiring
# invitations. This is probably a low importance bug; it
# would likely be more natural to send a reminder after 7
# days.
return
# queue invitation reminder
if invite_expires_in_minutes >= 4 * 24 * 60:
context = common_context(referrer)
context.update(
activate_url=activate_url,
referrer_name=referrer.full_name,
referrer_email=referrer.delivery_email,
referrer_realm_name=referrer.realm.name,
)
send_future_email(
"zerver/emails/invitation_reminder",
referrer.realm,
to_emails=[invitee.email],
from_address=FromAddress.tokenized_no_reply_placeholder,
language=email_language,
context=context,
delay=timedelta(minutes=invite_expires_in_minutes - (2 * 24 * 60)),
)