RabbitMQ clients have a setting called prefetch[1], which controls how
many un-acknowledged events the server forwards to the local queue in
the client. The default is 0; this means that when clients first
connect, the server must send them every message in the queue.
This itself may cause unbounded memory usage in the client, but also
has other detrimental effects. While the client is attempting to
process the head of the queue, it may be unable to read from the TCP
socket at the rate that the server is sending to it -- filling the TCP
buffers, and causing the server's writes to block. If the server
blocks for more than 30 seconds, it times out the send, and closes the
connection with:
```
closing AMQP connection <0.30902.126> (127.0.0.1:53870 -> 127.0.0.1:5672):
{writer,send_failed,{error,timeout}}
```
This is https://github.com/pika/pika/issues/753#issuecomment-318119222.
Set a prefetch limit of 100 messages, or the batch size, to better
handle queues which start with large numbers of outstanding events.
Setting prefetch=1 causes significant performance degradation in the
no-op queue worker, to 30% of the prefetch=0 performance. Setting
prefetch=100 achieves 90% of the prefetch=0 performance, and higher
values offer only minor gains above that. For batch workers, their
performance is not notably degraded by prefetch equal to their batch
size, and they cannot function on smaller prefetches than their batch
size.
We also set a 100-count prefetch on Tornado workers, as they are
potentially susceptible to the same effect.
[1] https://www.rabbitmq.com/confirms.html#channel-qos-prefetch
The BlockingChannel annotations in TornadoQueueClient were flat-out
wrong. BlockingChannel and Channel have no common base classes.
Signed-off-by: Anders Kaseorg <anders@zulip.com>
This low-level interface allows consuming from a queue with timeouts.
This can be used to either consume in batches (with an upper timeout),
or one-at-a-time. This is notably more performant than calling
`.get()` repeatedly (what json_drain_queue does under the hood), which
is "*highly discouraged* as it is *very inefficient*"[1].
Before this change:
```
$ ./manage.py queue_rate --count 10000 --batch
Purging queue...
Enqueue rate: 11158 / sec
Dequeue rate: 3075 / sec
```
After:
```
$ ./manage.py queue_rate --count 10000 --batch
Purging queue...
Enqueue rate: 11511 / sec
Dequeue rate: 19938 / sec
```
[1] https://www.rabbitmq.com/consumers.html#fetching
Despite its name, the `queue_size` method does not return the number
of items in the queue; it returns the number of items that the local
consumer has delivered but unprocessed. These are often, but not
always, the same.
RabbitMQ's queues maintain the queue of unacknowledged messages; when
a consumer connects, it sends to the consumer some number of messages
to handle, known as the "prefetch." This is a performance
optimization, to ensure the consumer code does not need to wait for a
network round-trip before having new data to consume.
The default prefetch is 0, which means that RabbitMQ immediately dumps
all outstanding messages to the consumer, which slowly processes and
acknowledges them. If a second consumer were to connect to the same
queue, they would receive no messages to process, as the first
consumer has already been allocated them. If the first consumer
disconnects or crashes, all prior events sent to it are then made
available for other consumers on the queue.
The consumer does not know the total size of the queue -- merely how
many messages it has been handed.
No change is made to the prefetch here; however, future changes may
wish to limit the prefetch, either for memory-saving, or to allow
multiple consumers to work the same queue.
Rename the method to make clear that it only contains information
about the local queue in the consumer, not the full RabbitMQ queue.
Also include the waiting message count, which is used by the
`consume()` iterator for similar purpose to the pending events list.
Currently, drain_queue and json_drain_queue ack every message as it is
pulled off of the queue, until the queue is empty. This means that if
the consumer crashes between pulling a batch of messages off the
queue, and actually processing them, those messages will be
permanently lost. Sending an ACK on every message also results in a
significant amount lot of traffic to rabbitmq, with notable
performance implications.
Send a singular ACK after the processing has completed, by making
`drain_queue` into a contextmanager. Additionally, use the `multiple`
flag to ACK all of the messages at once -- or explicitly NACK the
messages if processing failed. Sending a NACK will re-queue them at
the front of the queue.
Performance of a no-op dequeue before this change:
```
$ ./manage.py queue_rate --count 50000 --batch
Purging queue...
Enqueue rate: 10847 / sec
Dequeue rate: 2479 / sec
```
Performance of a no-op dequeue after this change (a 25% increase):
```
$ ./manage.py queue_rate --count 50000 --batch
Purging queue...
Enqueue rate: 10752 / sec
Dequeue rate: 3079 / sec
```
This comment stopped being true in 5686821150, and very much stopped
being relevant in dd40649e04 when the middleware entirely stopped
publishing to a queue.
Fixes#2665.
Regenerated by tabbott with `lint --fix` after a rebase and change in
parameters.
Note from tabbott: In a few cases, this converts technical debt in the
form of unsorted imports into different technical debt in the form of
our largest files having very long, ugly import sequences at the
start. I expect this change will increase pressure for us to split
those files, which isn't a bad thing.
Signed-off-by: Anders Kaseorg <anders@zulip.com>
The expected signatures for these callbacks seem to have changed
somewhere in https://github.com/pika/pika/pull/1002.
Signed-off-by: Anders Kaseorg <anders@zulipchat.com>
The code comment explains this issue in some detail, but essentially
in Kubernetes and Docker Swarm systems, the container overlayer
network has a relatively short TCP idle lifetime (about 15 minutes),
which can lead to it killing the connection between Tornado and
RabbitMQ.
We fix this by setting a TCP keepalive on that connection shorter than
15 minutes.
Fixes#10776.
We were seeing errors when pubishing typical events in the form of
`Dict[str, Any]` as the expected type to be a `Union`. So we instead
change the only non-dictionary call, to pass a dict instead of `str`.
Details in comment. Together with a few previous commits, this should
completely eliminate sending error mail to admins when the RabbitMQ
server is simply restarted and comes back up normally.
Because the base class's __init__ calls `_connect`, when we set the
value after that call has already returned, our new value only takes
effect if the first connection fails and we have to reconnect.
Make it take effect from the beginning.
This parameter isn't used anywhere. A good thing, because if it were,
the code would immediately raise an exception -- `self._on_open_cbs`
hasn't been initialized yet when we first call `_connect`, from the
base class's `__init__`.
So, just cut it. If we later need something like this, it's easy to
add a working version then.
Empirically, the retry in `_on_connection_closed` didn't actually work
-- if a reconnect failed, that was it, and the exception handler
didn't get run. A traceback would get logged, but all its frames were
in Tornado or Pika, not our own code; presumably something magic and
async was happening to the exception.
Moreover, though we would make one attempt to reconnect if we had a
connection that got closed, we didn't have any form of retry if the
original attempt at connecting failed in the first place.
Happily, upstream offers a perfectly reasonable bit of API that avoids
both of these problems: the on-open-error callback. So use that.
This method was new in Tornado 4.0. It saves us from having to get
the time ourselves and do the arithmetic -- which not only makes the
code a bit shorter, but also easier to get right. Tornado docs (see
http://www.tornadoweb.org/en/stable/ioloop.html) say we should have
been getting the time from `ioloop.time()` rather than hardcoding
`time.time()`, because the loop could e.g. be running on the
`time.monotonic()` clock.
Adding it afterward is inherently racy, and upstream's API is quite
reasonable for avoiding that -- just like we can pass an on-open
callback up front, we can do the same with the on-close callback.
This is a more thorough version of 4adf2d5c2 from back in 2013-04.
The default value of this parameter is already False upstream.
(It was already False in pika version 0.9.6, which we were
supposedly using when we introduced this in 4baeaaa52; not sure
what the story was there.)