If the exception was because the channel closed, attempting to NAK the
events will just raise another error, and is pointless, as the server
already marked the pending events as NAK'd.
We remove bot's subscriptions for private streams to which the
new owner is not subscribed and keep the ones to which the new
owner is subscribed on changing owner.
This commit also changes the code for sending subscription
remove events to use transaction.on_commit since we call
the function inside a transactopn in do_change_bot_owner and
this also requires some changes in tests in test_events.
For tests that use the dev server, like test-api, test-js-with-puppeteer,
we don't have the consumers for the queues. As they eventually timeout,
we get unnecessary error messages. This adds a new flag, disable_timeout,
to disable this behavior for the test cases.
Prior to aa032bf62c, QOS prefetch was set on every `publish` and
before every `start_json_consumer` -- which had a large and
unnecessary effect on publishing rates, which don't care about the
prefetch QOS settings at all, much less re-setting them before every
publish.
Unfortunately, that change had the effect of causing prefetch settings
to almost never be respected -- since the configuration happened in
`ensure_queue`s re-check that the connection was still live. The
initial connection is established in `__init__` via `_connect`, and
the consumer only calls `ensure_queue` once, before setting up the
consumer.
Having no prefetch value set causes an unbounded prefetch; this
manifests itself as the server attempting to shove every event down to
the worker as soon as it starts consuming; if the client cannot keep
up, the server closes the connection. The worker observes the
connection has been shut down, and restarts. While this does make
forward progress, it causes large queues to make progress more slowly,
as they suffer from sporadic restarts.
Shift the QOS configuration to when the connection is set up, which is
a more sensible place for it in general -- and ensures that it is set
on consumers and producers alike, but only once per connection
establishment.
As written, the QOS parameters are (re)set every time ensure_queue is
called, which is every time a message is enqueued. This is wasteful --
particularly QOS parameters only apply for consumers, and setting them
takes a RTT to the server.
Switch to only setting the QOS once, when a connection
is (re)established. In profiling, this reduces the time to call
`queue_json_publish("noop", {})` from 878µs to 150µs.
Black 23 enforces some slightly more specific rules about empty line
counts and redundant parenthesis removal, but the result is still
compatible with Black 22.
(This does not actually upgrade our Python environment to Black 23
yet.)
Signed-off-by: Anders Kaseorg <anders@zulip.com>
The lambda passed to `queue_json_publish` is used if
`settings.USING_RABBITMQ` is unset -- which is only true in tests. As
such, this pattern causes failures to never actually retry within
tests.
This behaviour has existed ever since the outgoing webhook code was
introduced in 53a8b2ac87, with no explanation. Not passing that
argument allows tests to verify the retry behaviour when webhooks
fail.
According to the documentation: “Pika does not have any notion of
threading in the code. If you want to use Pika with threading, make
sure you have a Pika connection per thread, created in that thread. It
is not safe to share one Pika connection across threads, with one
exception: you may call the connection method add_callback_threadsafe
from another thread to schedule a callback within an active pika
connection.”
https://pika.readthedocs.io/en/stable/faq.html
This also means that synchronous Django code running in Tornado will
use its own synchronous SimpleQueueClient rather than sharing the
asynchronous TornadoQueueClient, which is unfortunate but necessary as
they’re about to be on different threads.
Signed-off-by: Anders Kaseorg <anders@zulip.com>
RabbitMQ clients have a setting called prefetch[1], which controls how
many un-acknowledged events the server forwards to the local queue in
the client. The default is 0; this means that when clients first
connect, the server must send them every message in the queue.
This itself may cause unbounded memory usage in the client, but also
has other detrimental effects. While the client is attempting to
process the head of the queue, it may be unable to read from the TCP
socket at the rate that the server is sending to it -- filling the TCP
buffers, and causing the server's writes to block. If the server
blocks for more than 30 seconds, it times out the send, and closes the
connection with:
```
closing AMQP connection <0.30902.126> (127.0.0.1:53870 -> 127.0.0.1:5672):
{writer,send_failed,{error,timeout}}
```
This is https://github.com/pika/pika/issues/753#issuecomment-318119222.
Set a prefetch limit of 100 messages, or the batch size, to better
handle queues which start with large numbers of outstanding events.
Setting prefetch=1 causes significant performance degradation in the
no-op queue worker, to 30% of the prefetch=0 performance. Setting
prefetch=100 achieves 90% of the prefetch=0 performance, and higher
values offer only minor gains above that. For batch workers, their
performance is not notably degraded by prefetch equal to their batch
size, and they cannot function on smaller prefetches than their batch
size.
We also set a 100-count prefetch on Tornado workers, as they are
potentially susceptible to the same effect.
[1] https://www.rabbitmq.com/confirms.html#channel-qos-prefetch
The BlockingChannel annotations in TornadoQueueClient were flat-out
wrong. BlockingChannel and Channel have no common base classes.
Signed-off-by: Anders Kaseorg <anders@zulip.com>
This low-level interface allows consuming from a queue with timeouts.
This can be used to either consume in batches (with an upper timeout),
or one-at-a-time. This is notably more performant than calling
`.get()` repeatedly (what json_drain_queue does under the hood), which
is "*highly discouraged* as it is *very inefficient*"[1].
Before this change:
```
$ ./manage.py queue_rate --count 10000 --batch
Purging queue...
Enqueue rate: 11158 / sec
Dequeue rate: 3075 / sec
```
After:
```
$ ./manage.py queue_rate --count 10000 --batch
Purging queue...
Enqueue rate: 11511 / sec
Dequeue rate: 19938 / sec
```
[1] https://www.rabbitmq.com/consumers.html#fetching
Despite its name, the `queue_size` method does not return the number
of items in the queue; it returns the number of items that the local
consumer has delivered but unprocessed. These are often, but not
always, the same.
RabbitMQ's queues maintain the queue of unacknowledged messages; when
a consumer connects, it sends to the consumer some number of messages
to handle, known as the "prefetch." This is a performance
optimization, to ensure the consumer code does not need to wait for a
network round-trip before having new data to consume.
The default prefetch is 0, which means that RabbitMQ immediately dumps
all outstanding messages to the consumer, which slowly processes and
acknowledges them. If a second consumer were to connect to the same
queue, they would receive no messages to process, as the first
consumer has already been allocated them. If the first consumer
disconnects or crashes, all prior events sent to it are then made
available for other consumers on the queue.
The consumer does not know the total size of the queue -- merely how
many messages it has been handed.
No change is made to the prefetch here; however, future changes may
wish to limit the prefetch, either for memory-saving, or to allow
multiple consumers to work the same queue.
Rename the method to make clear that it only contains information
about the local queue in the consumer, not the full RabbitMQ queue.
Also include the waiting message count, which is used by the
`consume()` iterator for similar purpose to the pending events list.
Currently, drain_queue and json_drain_queue ack every message as it is
pulled off of the queue, until the queue is empty. This means that if
the consumer crashes between pulling a batch of messages off the
queue, and actually processing them, those messages will be
permanently lost. Sending an ACK on every message also results in a
significant amount lot of traffic to rabbitmq, with notable
performance implications.
Send a singular ACK after the processing has completed, by making
`drain_queue` into a contextmanager. Additionally, use the `multiple`
flag to ACK all of the messages at once -- or explicitly NACK the
messages if processing failed. Sending a NACK will re-queue them at
the front of the queue.
Performance of a no-op dequeue before this change:
```
$ ./manage.py queue_rate --count 50000 --batch
Purging queue...
Enqueue rate: 10847 / sec
Dequeue rate: 2479 / sec
```
Performance of a no-op dequeue after this change (a 25% increase):
```
$ ./manage.py queue_rate --count 50000 --batch
Purging queue...
Enqueue rate: 10752 / sec
Dequeue rate: 3079 / sec
```
This comment stopped being true in 5686821150, and very much stopped
being relevant in dd40649e04 when the middleware entirely stopped
publishing to a queue.