email_mirror: Set a short timeout on parsing incoming emails.

This timeout needs to be short enough that we don't drop the RabbitMQ
connection. Also drop the offending message (by returning with no
further exception) so we don't hit a head-of-queue failure situation.

Ideally, the parser would just be lightning-fast, so this would never
happen.
This commit is contained in:
Alex Vandiver 2023-03-17 22:10:15 +00:00 committed by Tim Abbott
parent c134cc3136
commit 19d115a9da
1 changed files with 15 additions and 3 deletions

View File

@ -16,18 +16,21 @@ from zerver.lib.email_mirror import (
)
from zerver.lib.email_mirror import process_message as mirror_email
from zerver.lib.exceptions import RateLimitedError
from zerver.worker.base import QueueProcessingWorker, assign_queue
from zerver.worker.base import QueueProcessingWorker, WorkerTimeoutError, assign_queue
logger = logging.getLogger(__name__)
@assign_queue("email_mirror")
class MirrorWorker(QueueProcessingWorker):
MAX_CONSUME_SECONDS = 5
@override
def consume(self, event: Mapping[str, Any]) -> None:
rcpt_to = event["rcpt_to"]
content = base64.b64decode(event["msg_base64"])
msg = email.message_from_bytes(
base64.b64decode(event["msg_base64"]),
content,
policy=email.policy.default,
)
assert isinstance(msg, EmailMessage) # https://github.com/python/typeshed/issues/2417
@ -45,4 +48,13 @@ class MirrorWorker(QueueProcessingWorker):
)
return
mirror_email(msg, rcpt_to=rcpt_to)
try:
mirror_email(msg, rcpt_to=rcpt_to)
except WorkerTimeoutError: # nocoverage
logging.error(
"Timed out ingesting message-id %s to %s (%d bytes) -- dropping!",
msg["Message-ID"] or "<?>",
rcpt_to,
len(content),
)
return