Python's behaviour on `sys.exit` is to wait for all non-daemon threads
to exit. In the context of the missedmessage_emails worker, if any
work is pending, a non-daemon Timer thread exists, which is waiting
for 5 seconds. As soon as that thread is serviced, it sets up another
5-second Timer, a process which repeats until all
ScheduledMessageNotificationEmail records have been handled. This
likely takes two minutes, but may theoretically take up to a week
until the thread exits, and thus sys.exit can complete.
Supervisor only gives the process 30 seconds to shut down, so
something else must prevent this endless Timer.
When `stop` is called, take the lock so we can mutate the timer.
However, since `stop` may have been called from a signal handler, our
thread may _already_ have the lock. As Python provides no way to know
if our thread is the one which has the lock, make the lock a
re-entrant one, allowing us to always try to take it.
With the lock in hand, cancel any outstanding timers. A race exists
where the timer may not be able to be canceled because it has
finished, maybe_send_batched_emails has been called, and is itself
blocked on the lock. Handle this case by timing out the thread join
in `stop()`, and signal the running thread to exit by unsetting the
timer event, which will be detected once it claims the lock.
RabbitMQ clients have a setting called prefetch[1], which controls how
many un-acknowledged events the server forwards to the local queue in
the client. The default is 0; this means that when clients first
connect, the server must send them every message in the queue.
This itself may cause unbounded memory usage in the client, but also
has other detrimental effects. While the client is attempting to
process the head of the queue, it may be unable to read from the TCP
socket at the rate that the server is sending to it -- filling the TCP
buffers, and causing the server's writes to block. If the server
blocks for more than 30 seconds, it times out the send, and closes the
connection with:
```
closing AMQP connection <0.30902.126> (127.0.0.1:53870 -> 127.0.0.1:5672):
{writer,send_failed,{error,timeout}}
```
This is https://github.com/pika/pika/issues/753#issuecomment-318119222.
Set a prefetch limit of 100 messages, or the batch size, to better
handle queues which start with large numbers of outstanding events.
Setting prefetch=1 causes significant performance degradation in the
no-op queue worker, to 30% of the prefetch=0 performance. Setting
prefetch=100 achieves 90% of the prefetch=0 performance, and higher
values offer only minor gains above that. For batch workers, their
performance is not notably degraded by prefetch equal to their batch
size, and they cannot function on smaller prefetches than their batch
size.
We also set a 100-count prefetch on Tornado workers, as they are
potentially susceptible to the same effect.
[1] https://www.rabbitmq.com/confirms.html#channel-qos-prefetch
The `current_queue_size` key in the queue monitoring stats file was
the local queue size, not the global queue size -- d5a6b0f99a
renamed the function, but did not adjust the queue monitoring JSON,
despite the last use of it having been removed in cd9b194d88.
The function is still used to mark "we emptied our queue," and it
remains a reasonable metric for that.
For export realm following changes have been made:
- `./manage.py export --upload` would delete `.tar.gz` and unpacked dir
- `./manage.py export` would only delete `unpacked dir`
Besides, we have removed `--delete-after-upload` as we have set it as
the default.
Fixes#20081
Since 3853285241, PushNotificationsWorker uses the aioapns library
to send Apple push notifications. This introduces an asyncio event
loop into this worker process, which, if unlucky, can respond poorly
when a SIGALRM is introduced to it:
```
[asyncio] Task exception was never retrieved
future: <Task finished coro=<send_apple_push_notification.<locals>.attempt_send() done, defined at /path/to/zerver/lib/push_notifications.py:166> exception=WorkerTimeoutException(30, 1)>
Traceback (most recent call last):
File "/path/to/zerver/lib/push_notifications.py", line 169, in attempt_send
result = await apns_context.apns.send_notification(request)
File "/path/to/zulip-py3-venv/lib/python3.6/site-packages/aioapns/client.py", line 57, in send_notification
response = await self.pool.send_notification(request)
File "/path/to/zulip-py3-venv/lib/python3.6/site-packages/aioapns/connection.py", line 407, in send_notification
response = await connection.send_notification(request)
File "/path/to/zulip-py3-venv/lib/python3.6/site-packages/aioapns/connection.py", line 189, in send_notification
data = json.dumps(request.message, ensure_ascii=False).encode()
File "/usr/lib/python3.6/json/__init__.py", line 238, in dumps
**kw).encode(obj)
File "/usr/lib/python3.6/json/encoder.py", line 199, in encode
chunks = self.iterencode(o, _one_shot=True)
File "/usr/lib/python3.6/json/encoder.py", line 257, in iterencode
return _iterencode(o, 0)
File "/path/to/zerver/worker/queue_processors.py", line 353, in timer_expired
raise WorkerTimeoutException(limit, len(events))
zerver.worker.queue_processors.WorkerTimeoutException: Timed out after 30 seconds processing 1 events
```
...which subsequently leads to the worker failing to make any progress
on the queue.
Remove the timeout on the worker. This may result in failing to make
forward progress if Apple/Google take overly long handling requests,
but is likely preferable to failing to make forward progress if _one_
request takes too long and gets unlucky with when the signal comes
through.
This extends the invite api endpoints to handle an extra
argument, expiration duration, which states the number of
days before the invitation link expires.
For prereg users, expiration info is attached to event
object to pass it to invite queue processor in order to
create and send confirmation link.
In case of multiuse invites, confirmation links are
created directly inside do_create_multiuse_invite_link(),
For filtering valid user invites, expiration info stored in
Confirmation object is used, which is accessed by a prereg
user using reverse generic relations.
Fixes#16359.
This information can be gleaned from the stacktrace, but making it
explicit in the stringification makes it much easier to differentiate
types of errors at a glance, particularly in Sentry.
maybe_send_batched_emails handles batches of emails from different
users at once; as it processes each user's batch, it enqueues messages
onto the `email_senders` queue. If `handle_missedmessage_emails`
raises an exception when processing a single user's email, no events
are marked as handled -- including those that were already handled and
enqueued onto `email_senders`. This results in an increasing number
of users being sent repeated emails about the same missed messages.
Catch and log any exceptions when handling an individual user's
events. This guarantees forward progress, and that notifications are
sent at-most-once, not at-least-once.
We only have one query which will change database state in this function,
and we already have a lock on the process itself, so there's no need for
a transaction.
This was added in ebb4eab0f9.
Previously, we stored up to 2 minutes worth of email events in memory
before processing them. So, if the server were to go down we would lose
those events.
To fix this, we store the events in the database.
This is a prep change for allowing users to set custom grace period for
email notifications, since the bug noted above will aggravate with
longer grace periods.
The `# nocoverage` was unnecessary apart from for the compatibility code,
so add a test for that code and remove the `# nocoverage`.
The `message_id` -> `message_ids` conversion was done in
9869153ae8.
This fixes a bug introduced in 95b46549e1
which made the worker simply log a warning about the timeout and then
continue consume()ing the event that should have also been interrupted.
The idea here is to introduce an exception which can be used to
interrupt the consume() process without triggering the regular handling
of exceptions that happens in _handle_consume_exception.
Throwing an exception is excessive in case of this worker, as it's
expected for it to time out sometimes if the urls take too long to
process.
With a test added by tabbott.
This allows specific queue workers to override the defaut behavior and
implement their own response to the timer expiring. We will want to use
this for embed_links queue at least.
This adds a new class called MessageRenderingResult to contain the
additional properties we added to the Message object (like alert_words)
as well as the rendered content to ensure typesafe reference. No
behavioral change is made except changes in typing.
This is a preparatory change for adding django-stubs to the backend.
Related: #18777
This is a prep commit for adding realm-level default for various
user settings. We add the language, in which the invite email will
be sent, to the dict added to queue itself to avoid making queries
in a loop when sending multiple emails from queue.
We also handle the case for old events in the queue.
Django's default SMTP implementation can raise various exceptions
when trying to send an email. In order to allow Zulip calling code
to catch fewer exceptions to handle any cause of "email not
sent", we translate most of them into EmailNotDeliveredException.
The non-translated exceptions concern the connection with the
SMTP server. They were not merged with the rest to keep some
details about the nature of these.
Tests are implemented in the test_send_email.py module.
This will stop dropping events in the case that the background
`maybe_send_batched_email` thread takes longer than 30s. However, see
also #15280 and the TODO comment about how we lose events upon
restart; this worker is still lossy.
Previously the outgoing emails were sent over several SMTP
connections through the EmailSendingWorker; establishing a new
connection each time adds notable overhead.
Redefine EmailSendingWorker worker to be a LoopQueueProcessingWorker,
which allows it to handle batches of events. At the same time, persist
the connection across email sending, if possible.
The connection is initialized in the constructor of the worker
in order to keep the same connection throughout the whole process.
The concrete implementation of the consume_batch function is simply
processing each email one at a time until they have all been sent.
In order to reuse the previously implemented decorator to retry
sending failures a new method that meets the decorator's required
arguments is declared inside the EmailSendingWorker class. This
allows to retry the sending process of a particular email inside
the batch if the caught exception leaves this process retriable.
A second retry mechanism is used inside the initialize_connection
function to redo the opening of the connection until it works or
until three attempts failed. For this purpose the backoff module
has been added to the dependencies and a test has been added to
ensure that this retry mechanism works well.
The connection is closed when the stop method is called.
Fixes: #17672.
This was introduced in 8321bd3f92 to serve as a sort of drop-in
replacement for zerver.lib.queue.queue_json_publish, but its use has
been subsequently cut out (e.g. `9fcdb6c83ac5`).
Remote its last callsite.
django.utils.translation.ugettext is a deprecated alias of
django.utils.translation.gettext as of Django 3.0, and will be removed
in Django 4.0.
Signed-off-by: Anders Kaseorg <anders@zulip.com>
The existing organization, of returning an opaque blob from
`build_bot_request`, which was later consumed by
`send_data_to_server`, is not particularly sensible; the steps become
oddly split between the OutgoingWebhookWorker, `do_rest_call`, and the
`OutgoingWebhookServiceInterface`.
Make the `OutgoingWebhookServiceInterface` in charge of building,
making, and returning the request in one method; another method
handles extracting content from a successful response. `do_rest_call`
is responsible for calling both halves of this, and doing common error
handling.
The `deployment` key was only set in `do_report_error`, which is now
only used in one codepath (the queue worker). The logging handlers on
staging call notify_server_error directly, which omits the
`deployment` key.
Remove the odd one-of key, and instead simply do dispatch in
`do_report_error`.
Not all of the workers are known to be safe to interrupt; they might
leave inconsistent state. As such, terminating them with timeouts
should currently only be a last-resort against stalled queues, not a
regular occurrence.
Since the exception can be triggered at arbitrary places in the stack
based on whenever the alarm happens to fire, they do not often group
together.
Explicitly group them together, grouped only by which queue the work
is in.
We already trust ids that are put on our queue
for deferred work. For example, see the code for
"mark_stream_messages_as_read_for_everyone"
We now pass stream_recipient_id when we queue
up work for do_mark_stream_messages_as_read.
This generally saves about 3 queries per
user when we unsubscribe them from a stream.
Since this was using repead individual get() calls previously, it
could not be monitored for having a consumer. Add it in, by marking
it of queue type "consumer" (the default), and adding Nagios lines for
it.
Also adjust missedmessage_emails to be monitored; it stopped using
LoopQueueProcessingWorker in 5cec566cb9, but was never added back
into the set of monitored consumers.
This low-level interface allows consuming from a queue with timeouts.
This can be used to either consume in batches (with an upper timeout),
or one-at-a-time. This is notably more performant than calling
`.get()` repeatedly (what json_drain_queue does under the hood), which
is "*highly discouraged* as it is *very inefficient*"[1].
Before this change:
```
$ ./manage.py queue_rate --count 10000 --batch
Purging queue...
Enqueue rate: 11158 / sec
Dequeue rate: 3075 / sec
```
After:
```
$ ./manage.py queue_rate --count 10000 --batch
Purging queue...
Enqueue rate: 11511 / sec
Dequeue rate: 19938 / sec
```
[1] https://www.rabbitmq.com/consumers.html#fetching
Despite its name, the `queue_size` method does not return the number
of items in the queue; it returns the number of items that the local
consumer has delivered but unprocessed. These are often, but not
always, the same.
RabbitMQ's queues maintain the queue of unacknowledged messages; when
a consumer connects, it sends to the consumer some number of messages
to handle, known as the "prefetch." This is a performance
optimization, to ensure the consumer code does not need to wait for a
network round-trip before having new data to consume.
The default prefetch is 0, which means that RabbitMQ immediately dumps
all outstanding messages to the consumer, which slowly processes and
acknowledges them. If a second consumer were to connect to the same
queue, they would receive no messages to process, as the first
consumer has already been allocated them. If the first consumer
disconnects or crashes, all prior events sent to it are then made
available for other consumers on the queue.
The consumer does not know the total size of the queue -- merely how
many messages it has been handed.
No change is made to the prefetch here; however, future changes may
wish to limit the prefetch, either for memory-saving, or to allow
multiple consumers to work the same queue.
Rename the method to make clear that it only contains information
about the local queue in the consumer, not the full RabbitMQ queue.
Also include the waiting message count, which is used by the
`consume()` iterator for similar purpose to the pending events list.
Otherwise, if consume_func raised an exception for any reason *other*
than the alarm being fired, the still-pending alarm would have fired
later at some arbitrary point in the calling code.
We need two try…finally blocks in case the signal arrives just before
signal.alarm(0).
Signed-off-by: Anders Kaseorg <anders@zulip.com>
SIGALRM is the simplest way to set a specific maximum duration that
queue workers can take to handle a specific message. This only works
in non-threaded environments, however, as signal handlers are
per-process, not per-thread.
The MAX_CONSUME_SECONDS is set quite high, at 10s -- the longest
average worker consume time is embed_links, which hovers near 1s.
Since just knowing the recent mean does not give much information[1],
it is difficult to know how much variance is expected. As such, we
set the threshold to be such that only events which are significant
outliers will be timed out. This can be tuned downwards as more
statistics are gathered on the runtime of the workers.
The exception to this is DeferredWorker, which deals with quite-long
requests, and thus has no enforceable SLO.
[1] https://www.autodesk.com/research/publications/same-stats-different-graphs
Currently, drain_queue and json_drain_queue ack every message as it is
pulled off of the queue, until the queue is empty. This means that if
the consumer crashes between pulling a batch of messages off the
queue, and actually processing them, those messages will be
permanently lost. Sending an ACK on every message also results in a
significant amount lot of traffic to rabbitmq, with notable
performance implications.
Send a singular ACK after the processing has completed, by making
`drain_queue` into a contextmanager. Additionally, use the `multiple`
flag to ACK all of the messages at once -- or explicitly NACK the
messages if processing failed. Sending a NACK will re-queue them at
the front of the queue.
Performance of a no-op dequeue before this change:
```
$ ./manage.py queue_rate --count 50000 --batch
Purging queue...
Enqueue rate: 10847 / sec
Dequeue rate: 2479 / sec
```
Performance of a no-op dequeue after this change (a 25% increase):
```
$ ./manage.py queue_rate --count 50000 --batch
Purging queue...
Enqueue rate: 10752 / sec
Dequeue rate: 3079 / sec
```
This system can't update stats while the queue is idle, without using
threads for this, but at least we ensure to update the file after
consuming an event if more than MAX_SECONDS_BEFORE_UPDATE_STATS passed
since the last update, regardless of the number of iterations done so
far.
The race condition is described in the comment block removed by this
commit. This leaves room for another, remaining race condition
that should be virtually impossible, but nevertheless it seems
worthwhile to have it documented in the code, so we put a new comment
describing it.
As a final note, this is not a new race condition,
it was hypothetically possible with the old code as well.
The query to finds and marks all unread UserMessages in the stream as read
can be quite expensive, so we'll move that work to the deferred_work
queue and split it into batches.
Fixes#15770.
This queue had a race condition with creation of another Timer while
maybe_send_batched_emails is still doing its work, which may cause
two or more threads to be running maybe_send_batched_emails
at the same time, mutating the shared data simultaneously.
Another less likely potential race condition was that
maybe_send_batched_emails after sending out its email, can call
ensure_timer(). If the consume function is run simultaneously
in the main thread, it will call ensure_timer() too, which,
given unfortunate timings, might lead to both calls setting a new Timer.
We add locking to the queue to avoid such race conditions.
Tested manually, by print debugging with the following setup:
1. Making handle_missedmessage_emails sleep 2 seconds for each email,
and changed BATCH_DURATION to 1s to make the queue start working
right after launching.
2. Putting a bunch of events in the queue.
3. ./manage.py process_queue --queue_name missedmessage_emails
4. Once maybe_send_batched_emails is called and while it's processing
the events, I pushed more events to the queue. That triggers the
consume() function and ensure_timer().
Before implementing the locking mechanism, this causes two threads
to run maybe_send_batched_emails at the same time, mutating each other's
shared data, causing a traceback such as
Exception in thread Thread-3:
Traceback (most recent call last):
File "/usr/lib/python3.6/threading.py", line 916, in _bootstrap_inner
self.run()
File "/usr/lib/python3.6/threading.py", line 1182, in run
self.function(*self.args, **self.kwargs)
File "/srv/zulip/zerver/worker/queue_processors.py", line 507, in maybe_send_batched_emails
del self.events_by_recipient[user_profile_id]
KeyError: '5'
With the locking mechanism, things get handled as expected, and
ensure_timer() exits if it can't obtain the lock due to
maybe_send_batched_emails still working.
Co-authored-by: Tim Abbott <tabbott@zulip.com>
The exception trace only goes from where the exception was thrown up
to where the `logging.exception` call is; any context as to where
_that_ was called from is lost, unless `stack_info` is passed as well.
Having the stack is particularly useful for Sentry exceptions, which
gain the full stack trace.
Add `stack_info=True` on all `logging.exception` calls with a
non-trivial stack; we omit `wsgi.py`. Adjusts tests to match.
consume_time_seconds wasn't properly defined at the beginning, so when
a BaseException that isn't a subclass of Exception is thrown, the
finally: block could be entered with it still undefined.
Without this change, pyflakes reports this exception:
pyflakes | zerver/worker/queue_processors.py:152:9 local variable 'e' is assigned to but never used
pyflakes | zerver/worker/queue_processors.py:155:81 undefined name 'e'
We use the EMAIL_TIMEOUT django setting to timeout after 15s of trying
to send an email. This will nicely lead to retries in the email_senders
queue, due to the retry_send_email_failures decorator.
smtlib documentation suggests that socket.timeout can be raised as the
result of timing out, so in attempts I'm getting
smtplib.SMTPServerDisconnected. Either way, seems appropriate to add
socket.timeout to the exception that we catch.
The most import change here is the one in maybe_send_to_registration
codepath, as the insufficient validation there could lead to fetching
an expired PreregistrationUser that was invited as an administrator
admin even years ago, leading to this registration ending up in the
new user being a realm administrator.
Combined with the buggy migration in
0198_preregistrationuser_invited_as.py, this led to users incorrectly
joining as organizations administrators by accident. But even without
that bug, this issue could have allowed a user who was invited as an
administrator but then had that invitation expire and then joined via
social authentication incorrectly join as an organization administrator.
The second change is in ConfirmationEmailWorker, where this wasn't a
security problem, but if the server was stopped for long enough, with
some invites to send out email for in the queue, then after starting it
up again, the queue worker would send out emails for invites that
had already expired.
Fixes#2665.
Regenerated by tabbott with `lint --fix` after a rebase and change in
parameters.
Note from tabbott: In a few cases, this converts technical debt in the
form of unsorted imports into different technical debt in the form of
our largest files having very long, ugly import sequences at the
start. I expect this change will increase pressure for us to split
those files, which isn't a bad thing.
Signed-off-by: Anders Kaseorg <anders@zulip.com>
Automatically generated by the following script, based on the output
of lint with flake8-comma:
import re
import sys
last_filename = None
last_row = None
lines = []
for msg in sys.stdin:
m = re.match(
r"\x1b\[35mflake8 \|\x1b\[0m \x1b\[1;31m(.+):(\d+):(\d+): (\w+)", msg
)
if m:
filename, row_str, col_str, err = m.groups()
row, col = int(row_str), int(col_str)
if filename == last_filename:
assert last_row != row
else:
if last_filename is not None:
with open(last_filename, "w") as f:
f.writelines(lines)
with open(filename) as f:
lines = f.readlines()
last_filename = filename
last_row = row
line = lines[row - 1]
if err in ["C812", "C815"]:
lines[row - 1] = line[: col - 1] + "," + line[col - 1 :]
elif err in ["C819"]:
assert line[col - 2] == ","
lines[row - 1] = line[: col - 2] + line[col - 1 :].lstrip(" ")
if last_filename is not None:
with open(last_filename, "w") as f:
f.writelines(lines)
Signed-off-by: Anders Kaseorg <anders@zulipchat.com>
This commit adds three `.pysa` model files: `false_positives.pysa`
for ruling out false positive flows with `Sanitize` annotations,
`req_lib.pysa` for educating pysa about Zulip's `REQ()` pattern for
extracting user input, and `redirects.pysa` for capturing the risk
of open redirects within Zulip code. Additionally, this commit
introduces `mark_sanitized`, an identity function which can be used
to selectively clear taint in cases where `Sanitize` models will not
work. This commit also puts `mark_sanitized` to work removing known
false postive flows.
Generated by pyupgrade --py36-plus --keep-percent-format, but with the
NamedTuple changes reverted (see commit
ba7906a3c6, #15132).
Signed-off-by: Anders Kaseorg <anders@zulip.com>
This saves the completely unnecessary work of mapping the Client name
to its ID. Because we had in-process caching of the immutable Client
objects, this isn't a material performance win, but it will eventually
let us delete that caching logic and have a simpler system.