This low-level interface allows consuming from a queue with timeouts.
This can be used to either consume in batches (with an upper timeout),
or one-at-a-time. This is notably more performant than calling
`.get()` repeatedly (what json_drain_queue does under the hood), which
is "*highly discouraged* as it is *very inefficient*"[1].
Before this change:
```
$ ./manage.py queue_rate --count 10000 --batch
Purging queue...
Enqueue rate: 11158 / sec
Dequeue rate: 3075 / sec
```
After:
```
$ ./manage.py queue_rate --count 10000 --batch
Purging queue...
Enqueue rate: 11511 / sec
Dequeue rate: 19938 / sec
```
[1] https://www.rabbitmq.com/consumers.html#fetching
`loopworker_sleep_mock` is a file-level variable used to mock out the
sleep() call in LoopQueueProcessingWorker; don't reuse the variable
name for something else.
Despite its name, the `queue_size` method does not return the number
of items in the queue; it returns the number of items that the local
consumer has delivered but unprocessed. These are often, but not
always, the same.
RabbitMQ's queues maintain the queue of unacknowledged messages; when
a consumer connects, it sends to the consumer some number of messages
to handle, known as the "prefetch." This is a performance
optimization, to ensure the consumer code does not need to wait for a
network round-trip before having new data to consume.
The default prefetch is 0, which means that RabbitMQ immediately dumps
all outstanding messages to the consumer, which slowly processes and
acknowledges them. If a second consumer were to connect to the same
queue, they would receive no messages to process, as the first
consumer has already been allocated them. If the first consumer
disconnects or crashes, all prior events sent to it are then made
available for other consumers on the queue.
The consumer does not know the total size of the queue -- merely how
many messages it has been handed.
No change is made to the prefetch here; however, future changes may
wish to limit the prefetch, either for memory-saving, or to allow
multiple consumers to work the same queue.
Rename the method to make clear that it only contains information
about the local queue in the consumer, not the full RabbitMQ queue.
Also include the waiting message count, which is used by the
`consume()` iterator for similar purpose to the pending events list.
SIGALRM is the simplest way to set a specific maximum duration that
queue workers can take to handle a specific message. This only works
in non-threaded environments, however, as signal handlers are
per-process, not per-thread.
The MAX_CONSUME_SECONDS is set quite high, at 10s -- the longest
average worker consume time is embed_links, which hovers near 1s.
Since just knowing the recent mean does not give much information[1],
it is difficult to know how much variance is expected. As such, we
set the threshold to be such that only events which are significant
outliers will be timed out. This can be tuned downwards as more
statistics are gathered on the runtime of the workers.
The exception to this is DeferredWorker, which deals with quite-long
requests, and thus has no enforceable SLO.
[1] https://www.autodesk.com/research/publications/same-stats-different-graphs
Currently, drain_queue and json_drain_queue ack every message as it is
pulled off of the queue, until the queue is empty. This means that if
the consumer crashes between pulling a batch of messages off the
queue, and actually processing them, those messages will be
permanently lost. Sending an ACK on every message also results in a
significant amount lot of traffic to rabbitmq, with notable
performance implications.
Send a singular ACK after the processing has completed, by making
`drain_queue` into a contextmanager. Additionally, use the `multiple`
flag to ACK all of the messages at once -- or explicitly NACK the
messages if processing failed. Sending a NACK will re-queue them at
the front of the queue.
Performance of a no-op dequeue before this change:
```
$ ./manage.py queue_rate --count 50000 --batch
Purging queue...
Enqueue rate: 10847 / sec
Dequeue rate: 2479 / sec
```
Performance of a no-op dequeue after this change (a 25% increase):
```
$ ./manage.py queue_rate --count 50000 --batch
Purging queue...
Enqueue rate: 10752 / sec
Dequeue rate: 3079 / sec
```
This queue had a race condition with creation of another Timer while
maybe_send_batched_emails is still doing its work, which may cause
two or more threads to be running maybe_send_batched_emails
at the same time, mutating the shared data simultaneously.
Another less likely potential race condition was that
maybe_send_batched_emails after sending out its email, can call
ensure_timer(). If the consume function is run simultaneously
in the main thread, it will call ensure_timer() too, which,
given unfortunate timings, might lead to both calls setting a new Timer.
We add locking to the queue to avoid such race conditions.
Tested manually, by print debugging with the following setup:
1. Making handle_missedmessage_emails sleep 2 seconds for each email,
and changed BATCH_DURATION to 1s to make the queue start working
right after launching.
2. Putting a bunch of events in the queue.
3. ./manage.py process_queue --queue_name missedmessage_emails
4. Once maybe_send_batched_emails is called and while it's processing
the events, I pushed more events to the queue. That triggers the
consume() function and ensure_timer().
Before implementing the locking mechanism, this causes two threads
to run maybe_send_batched_emails at the same time, mutating each other's
shared data, causing a traceback such as
Exception in thread Thread-3:
Traceback (most recent call last):
File "/usr/lib/python3.6/threading.py", line 916, in _bootstrap_inner
self.run()
File "/usr/lib/python3.6/threading.py", line 1182, in run
self.function(*self.args, **self.kwargs)
File "/srv/zulip/zerver/worker/queue_processors.py", line 507, in maybe_send_batched_emails
del self.events_by_recipient[user_profile_id]
KeyError: '5'
With the locking mechanism, things get handled as expected, and
ensure_timer() exits if it can't obtain the lock due to
maybe_send_batched_emails still working.
Co-authored-by: Tim Abbott <tabbott@zulip.com>
The exception trace only goes from where the exception was thrown up
to where the `logging.exception` call is; any context as to where
_that_ was called from is lost, unless `stack_info` is passed as well.
Having the stack is particularly useful for Sentry exceptions, which
gain the full stack trace.
Add `stack_info=True` on all `logging.exception` calls with a
non-trivial stack; we omit `wsgi.py`. Adjusts tests to match.
Fixes#2665.
Regenerated by tabbott with `lint --fix` after a rebase and change in
parameters.
Note from tabbott: In a few cases, this converts technical debt in the
form of unsorted imports into different technical debt in the form of
our largest files having very long, ugly import sequences at the
start. I expect this change will increase pressure for us to split
those files, which isn't a bad thing.
Signed-off-by: Anders Kaseorg <anders@zulip.com>
Automatically generated by the following script, based on the output
of lint with flake8-comma:
import re
import sys
last_filename = None
last_row = None
lines = []
for msg in sys.stdin:
m = re.match(
r"\x1b\[35mflake8 \|\x1b\[0m \x1b\[1;31m(.+):(\d+):(\d+): (\w+)", msg
)
if m:
filename, row_str, col_str, err = m.groups()
row, col = int(row_str), int(col_str)
if filename == last_filename:
assert last_row != row
else:
if last_filename is not None:
with open(last_filename, "w") as f:
f.writelines(lines)
with open(filename) as f:
lines = f.readlines()
last_filename = filename
last_row = row
line = lines[row - 1]
if err in ["C812", "C815"]:
lines[row - 1] = line[: col - 1] + "," + line[col - 1 :]
elif err in ["C819"]:
assert line[col - 2] == ","
lines[row - 1] = line[: col - 2] + line[col - 1 :].lstrip(" ")
if last_filename is not None:
with open(last_filename, "w") as f:
f.writelines(lines)
Signed-off-by: Anders Kaseorg <anders@zulipchat.com>
zerver/lib/i18n.py:34:28: E741 ambiguous variable name 'l'
zerver/lib/webhooks/common.py:103:34: E225 missing whitespace around operator
zerver/tests/test_queue_worker.py:563:9: E306 expected 1 blank line before a nested definition, found 0
Signed-off-by: Anders Kaseorg <anders@zulip.com>
This saves the completely unnecessary work of mapping the Client name
to its ID. Because we had in-process caching of the immutable Client
objects, this isn't a material performance win, but it will eventually
let us delete that caching logic and have a simpler system.
mock is just a backport of the standard library’s unittest.mock now.
The SAMLAuthBackendTest change is needed because
MagicMock.call_args.args wasn’t introduced until Python
3.8 (https://bugs.python.org/issue21269).
The PROVISION_VERSION bump is skipped because mock is still an
indirect dev requirement via moto.
Signed-off-by: Anders Kaseorg <anders@zulip.com>
While this functionality to post slow queries to a Zulip stream was
very useful in the early days of Zulip, when there were only a few
hundred accounts, it's long since been useless since (1) the total
request volume on larger Zulip servers run by Zulip developers, and
(2) other server operators don't want real-time notifications of slow
backend queries. The right structure for this is just a log file.
We get rid of the queue and replace it with a "zulip.slow_queries"
logger, which will still log to /var/log/zulip/slow_queries.log for
ease of access to this information and propagate to the other logging
handlers. Reducing the amount of queues is good for lowering zulip's
memory footprint and restart performance, since we run at least one
dedicated queue worker process for each one in most configurations.
Refactored code in actions.py and streams.py to move stream related
functions into streams.py and remove the dependency on actions.py.
validate_sender_can_write_to_stream function in actions.py was renamed
to access_stream_for_send_message in streams.py.
Generated by `pyupgrade --py3-plus --keep-percent-format` on all our
Python code except `zthumbor` and `zulip-ec2-configure-interfaces`,
followed by manual indentation fixes.
Signed-off-by: Anders Kaseorg <anders@zulipchat.com>
type().__name__ is sufficient, and much readable than type(), so it's
better to use the former for keys.
We also make the classes consistent in forming the keys in the format
type(self).__name__:identifier and adjust logger.warning and statsd to
take advantage of that and simply log the key().
This commit mostly makes our tests less
noisy, since emails are no longer an important
detail of sending messages (they're not even
really used in the API).
It also sets us up to have more scrutiny
on delivery_email/email in the future
for things that actually matter. (This is
a prep commit for something along those
lines, kind of hard to explain the full
plan.)
Note that while the test mocks the actual message
send, we now have a `get_stream` call in the queue
worker, so we have to set up a real stream for
testing (or we could have mocked that as well, but
it didn't seem necessary). The setup queries add
to the amount of queries reported by the test,
plus the `get_stream` call. I just made the
query count a digits regex, which is a little bit
lame, but I don't think it's worth risking test
flakes for this.
QueueProcessingWorker and LoopQueueProcessingWorker are abstract classes
meant to be subclassed by a class that will define its own consume()
or consume_batch() method. ABCs are suited for that and we can tag
consume/consume_batch with the @abstractmethod wrapper which will
prevent subclasses that don't define these methods properly to be
impossible to even instantiate (as opposed to only crashing once
consume() is called). It's also nicely detected by mypy, which will
throw errors such as this on invalid use:
error: Only concrete class can be given where "Type[TestWorker]" is
expected
error: Cannot instantiate abstract class 'TestWorker' with abstract
attribute 'consume'
Due to it being detected by mypy, we can remove the test
test_worker_noconsume which just tested the old version of this -
raising an exception when the unimplemented consume() gets called. Now
it can be handled already on the linter level.
LoopQueueProcessingWorker can handle exceptions inside consume_batch in
a similar manner to how QueueProcessingWorker handles exceptions inside
consume.
We add PushNotificationBouncerRetryLaterError as an exception to signal
an error occurred when trying to communicate with the bouncer and it
should be retried. We use JsonableError as the base class, because this
signal will need to work in two roles:
1. When the push notification was being issued by the queue worker
PushNotificationsWorker, it will signal to the worker to requeue the
event and try again later.
2. The exception will also possibly be raised (this will be added in the
next commit) on codepaths coming from a request to an API endpoint (for
example to add a token, to users/me/apns_device_token). In that case,
it'll be needed to provide a good error to the API user - and basing
this exception on JsonableError will allow that.
A confirmation object is already created when
do_send_confirmation_email is called just above.
Tweaked by tabbott to remove an unnecessary somewhat hacky database
query.
This should dramatically improve the queue processor's performance in
cases where there's a very high volume of requests on a given endpoint
by a given user, as described in the new docstring.
Until we test this more broadly in production, we won't know if this
is a full solution to the problem, but I think it's likely. We've
never seen the UserActivityInterval worker end up backlogged without a
total queue processor outage, and it should have a similar workload.
Fixes#13180.
Instead of just mocking some fake events, we use the code
path that generates slow query events and publishes them
to SlowQueryWorker.
This test improvement would have got a recent potential regression
caught in code review.
Previous cleanups (mostly the removals of Python __future__ imports)
were done in a way that introduced leading newlines. Delete leading
newlines from all files, except static/assets/zulip-emoji/NOTICE,
which is a verbatim copy of the Apache 2.0 license.
Signed-off-by: Anders Kaseorg <anders@zulipchat.com>
We were seeing errors when pubishing typical events in the form of
`Dict[str, Any]` as the expected type to be a `Union`. So we instead
change the only non-dictionary call, to pass a dict instead of `str`.