Commit Graph

105 Commits

Author SHA1 Message Date
Anders Kaseorg 93198a19ed requirements: Upgrade Python requirements.
Signed-off-by: Anders Kaseorg <anders@zulip.com>
2024-01-29 10:41:54 -08:00
Alex Vandiver 2b37a35f71 queue: Only NAK the events if the channel is still open.
If the exception was because the channel closed, attempting to NAK the
events will just raise another error; it is also pointless, as the server
has already marked the pending events as NAK'd.
2023-12-12 09:20:29 -08:00
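A minimal sketch of that guard, assuming a pika channel and a hypothetical `last_delivery_tag` bookkeeping variable:

```
def nack_pending_events(channel, last_delivery_tag):
    # Only NAK if the channel is still open: if it closed, the server
    # has already returned the unacknowledged deliveries to the queue,
    # and basic_nack would just raise another error.
    if channel.is_open:
        channel.basic_nack(delivery_tag=last_delivery_tag, multiple=True, requeue=True)
```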
Anders Kaseorg a50eb2e809 mypy: Enable new error explicit-override.
Signed-off-by: Anders Kaseorg <anders@zulip.com>
2023-10-12 12:28:41 -07:00
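With `explicit-override` enabled, mypy flags any override not marked with the `@override` decorator; a hedged illustration, borrowing class names from the commits below:

```
from typing_extensions import override

class QueueClient:
    def _connect(self) -> None: ...

class TornadoQueueClient(QueueClient):
    @override  # explicit-override errors if this decorator is missing
    def _connect(self) -> None: ...
```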
Sahil Batra 35d5609996 bots: Remove private stream subscriptions on changing bot owner.
When changing a bot's owner, we remove the bot's subscriptions to
private streams to which the new owner is not subscribed, and keep
those to which the new owner is subscribed.

This commit also changes the code for sending subscription
removal events to use transaction.on_commit, since we call
the function inside a transaction in do_change_bot_owner;
this also requires some changes to tests in test_events.
2023-08-16 15:37:37 -07:00
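The `transaction.on_commit` pattern in question, as a rough sketch (the event-sending helper is illustrative, not the actual function name):

```
from django.db import transaction

def do_change_bot_owner(bot, new_owner):
    with transaction.atomic():
        # ... update the bot's owner and prune its subscriptions ...
        # Defer the event until the transaction commits, so clients
        # never see a removal event for a rolled-back change.
        transaction.on_commit(lambda: send_subscription_remove_event(bot))
```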
Anders Kaseorg c2c96eb0cf python: Annotate type aliases with TypeAlias.
This is not strictly necessary but it’s clearer and improves mypy’s
error messages.

https://docs.python.org/3/library/typing.html#typing.TypeAlias
https://mypy.readthedocs.io/en/stable/kinds_of_types.html#type-aliases

Signed-off-by: Anders Kaseorg <anders@zulip.com>
2023-08-07 10:02:49 -07:00
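For example (alias name illustrative):

```
from typing import Any, Dict
from typing_extensions import TypeAlias

# Without the annotation, mypy must infer whether this is a type alias
# or an ordinary module-level variable; annotating it makes the intent
# explicit and sharpens error messages.
Event: TypeAlias = Dict[str, Any]
```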
Zixuan James Li b6d1e56cac queue_processors: Avoid queue worker timeouts in tests.
For tests that use the dev server, like test-api and test-js-with-puppeteer,
we don't have consumers for the queues. As these eventually time out,
we get unnecessary error messages. This adds a new flag, disable_timeout,
to disable this behavior for the test cases.
2023-06-28 11:06:24 -07:00
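A rough sketch of what the flag could look like with a SIGALRM-style watchdog (all names hypothetical; the real wiring lives in the queue worker code):

```
import signal

def consume_with_watchdog(consume, event, timeout=30, disable_timeout=False):
    # Test suites running against the dev server pass disable_timeout=True,
    # so idle queues without consumers don't log spurious timeout errors.
    if not disable_timeout:
        signal.alarm(timeout)  # deliver SIGALRM if consume() hangs
    try:
        consume(event)
    finally:
        signal.alarm(0)  # cancel any pending alarm
```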
Anders Kaseorg b907ad0dcb ruff: Fix more of RUF010 Use conversion in f-string.
Signed-off-by: Anders Kaseorg <anders@zulip.com>
2023-06-06 14:58:11 -07:00
Anders Kaseorg 9db3451333 Remove statsd support.
Signed-off-by: Anders Kaseorg <anders@zulip.com>
2023-04-25 19:58:16 -07:00
Alex Vandiver bf532de8bb queue: Allow enabling TLS for the RabbitMQ connection.
This allows using cloud-based RabbitMQ services like AmazonMQ.

Fixes: #24699.
2023-03-23 16:02:10 -07:00
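Connecting with TLS via pika might look roughly like this (host, port, and credentials are placeholders):

```
import ssl

import pika

# TLS-enabled connection parameters for a cloud broker such as AmazonMQ.
parameters = pika.ConnectionParameters(
    host="broker.example.com",
    port=5671,  # amqps
    credentials=pika.PlainCredentials("zulip", "secret"),
    ssl_options=pika.SSLOptions(ssl.create_default_context()),
)
connection = pika.BlockingConnection(parameters)
```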
Alex Vandiver 311a76ed1c Move QOS configuration into connection, not queue verification.
Prior to aa032bf62c, QOS prefetch was set on every `publish` and
before every `start_json_consumer` -- which had a large and
unnecessary effect on publishing rates, which don't care about the
prefetch QOS settings at all, much less re-setting them before every
publish.

Unfortunately, that change had the effect of causing prefetch settings
to almost never be respected -- since the configuration happened in
`ensure_queue`'s re-check that the connection was still live.  The
initial connection is established in `__init__` via `_connect`, and
the consumer only calls `ensure_queue` once, before setting up the
consumer.

Having no prefetch value set causes an unbounded prefetch; this
manifests itself as the server attempting to shove every event down to
the worker as soon as it starts consuming; if the client cannot keep
up, the server closes the connection.  The worker observes the
connection has been shut down, and restarts.  While this does make
forward progress, it causes large queues to make progress more slowly,
as they suffer from sporadic restarts.

Shift the QOS configuration to when the connection is set up, which is
a more sensible place for it in general -- and ensures that it is set
on consumers and producers alike, but only once per connection
establishment.
2023-03-20 11:28:29 -07:00
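In pika's asynchronous style, shifting QOS to connection setup means calling `basic_qos` from the channel-open callback rather than from `ensure_queue`; a simplified sketch:

```
def _on_connection_open(connection):
    connection.channel(on_open_callback=_on_channel_open)

def _on_channel_open(channel):
    # Set prefetch exactly once per (re)established connection: it
    # applies to the consumer, and costs publishers nothing per publish.
    channel.basic_qos(prefetch_count=100)
```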
Alex Vandiver aa032bf62c queue: Only set QOS on a newly-opened channel, once.
As written, the QOS parameters are (re)set every time ensure_queue is
called, which is every time a message is enqueued. This is wasteful --
particularly since QOS parameters only apply to consumers, and setting
them costs a round trip to the server.

Switch to only setting the QOS once, when a connection
is (re)established.  In profiling, this reduces the time to call
`queue_json_publish("noop", {})` from 878µs to 150µs.
2023-02-23 11:47:43 -08:00
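The 878µs → 150µs comparison could be reproduced with a micro-benchmark along these lines (illustrative; the import path is assumed, and this is not the actual profiling harness):

```
import timeit

from zerver.lib.queue import queue_json_publish  # assumed import path

# Average per-call latency of a no-op publish, in microseconds.
n = 1000
total = timeit.timeit(lambda: queue_json_publish("noop", {}), number=n)
print(f"{total / n * 1e6:.0f}µs per publish")
```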
Alex Vandiver d3403dde86 rabbitmq: Add a RABBITMQ_PORT setting. 2023-02-20 12:04:25 -08:00
Anders Kaseorg 59eca10a43 ruff: Fix G004 Logging statement uses f-string.
Signed-off-by: Anders Kaseorg <anders@zulip.com>
2023-02-04 16:36:20 -08:00
Anders Kaseorg df001db1a9 black: Reformat with Black 23.
Black 23 enforces some slightly more specific rules about empty line
counts and redundant parenthesis removal, but the result is still
compatible with Black 22.

(This does not actually upgrade our Python environment to Black 23
yet.)

Signed-off-by: Anders Kaseorg <anders@zulip.com>
2023-02-02 10:40:13 -08:00
Alex Vandiver eb7a2f2c38 queue: Do test retries in tests.
The lambda passed to `queue_json_publish` is used if
`settings.USING_RABBITMQ` is unset -- which is only true in tests.  As
such, this pattern means that failures are never actually retried within
tests.

This behaviour has existed ever since the outgoing webhook code was
introduced in 53a8b2ac87, with no explanation.  Not passing that
argument allows tests to verify the retry behaviour when webhooks
fail.
2022-11-04 14:46:17 -07:00
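A simplified sketch of the dispatch being described (structure only; helper names approximate and their definitions elided):

```
from django.conf import settings

def queue_json_publish(queue_name, event, processor=None):
    if settings.USING_RABBITMQ:
        get_queue_client().json_publish(queue_name, event)
    elif processor is not None:
        # Test-only shortcut: bypasses the worker, and with it the
        # worker's retry logic -- the problem described above.
        processor(event)
    else:
        # With no processor, tests exercise the real consume path,
        # including retry-on-failure.
        from zerver.worker.queue_processors import get_worker

        get_worker(queue_name).consume_single_event(event)
```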
Anders Kaseorg 7acb642fa5 requirements: Upgrade to Tornado 6.
Fixes #8913.

Signed-off-by: Anders Kaseorg <anders@zulip.com>
2022-05-02 17:41:49 -07:00
Anders Kaseorg 6fd1a558b7 runtornado: Switch to asyncio event loop.
Signed-off-by: Anders Kaseorg <anders@zulip.com>
2022-05-02 17:41:49 -07:00
Anders Kaseorg c263bfdb41 queue: Use a thread-local Pika connection.
According to the documentation: “Pika does not have any notion of
threading in the code. If you want to use Pika with threading, make
sure you have a Pika connection per thread, created in that thread. It
is not safe to share one Pika connection across threads, with one
exception: you may call the connection method add_callback_threadsafe
from another thread to schedule a callback within an active pika
connection.”

https://pika.readthedocs.io/en/stable/faq.html

This also means that synchronous Django code running in Tornado will
use its own synchronous SimpleQueueClient rather than sharing the
asynchronous TornadoQueueClient, which is unfortunate but necessary as
they’re about to be on different threads.

Signed-off-by: Anders Kaseorg <anders@zulip.com>
2022-05-02 17:41:49 -07:00
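One connection per thread, per the FAQ, can be arranged with `threading.local`; a sketch (`SimpleQueueClient` per the surrounding commits, import path assumed):

```
import threading

from zerver.lib.queue import SimpleQueueClient  # assumed import path

_thread_data = threading.local()

def get_queue_client():
    # Lazily create one pika-backed client per thread, in that thread,
    # since a pika connection must never be shared across threads.
    if not hasattr(_thread_data, "client"):
        _thread_data.client = SimpleQueueClient()
    return _thread_data.client
```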
Anders Kaseorg b0ce4f1bce docs: Fix many spelling mistakes.
Signed-off-by: Anders Kaseorg <anders@zulip.com>
2022-02-07 18:51:06 -08:00
Alex Vandiver faeffa2466 queue_processors: Set a bounded prefetch size on rabbitmq queues.
RabbitMQ clients have a setting called prefetch[1], which controls how
many un-acknowledged events the server forwards to the local queue in
the client.  The default is 0; this means that when clients first
connect, the server must send them every message in the queue.

This itself may cause unbounded memory usage in the client, but also
has other detrimental effects.  While the client is attempting to
process the head of the queue, it may be unable to read from the TCP
socket at the rate that the server is sending to it -- filling the TCP
buffers, and causing the server's writes to block.  If the server
blocks for more than 30 seconds, it times out the send, and closes the
connection with:

```
closing AMQP connection <0.30902.126> (127.0.0.1:53870 -> 127.0.0.1:5672):
{writer,send_failed,{error,timeout}}
```

This is https://github.com/pika/pika/issues/753#issuecomment-318119222.

Set a prefetch limit of 100 messages, or the batch size, to better
handle queues which start with large numbers of outstanding events.

Setting prefetch=1 causes significant performance degradation in the
no-op queue worker, to 30% of the prefetch=0 performance.  Setting
prefetch=100 achieves 90% of the prefetch=0 performance, and higher
values offer only minor gains above that.  For batch workers, their
performance is not notably degraded by prefetch equal to their batch
size, and they cannot function on smaller prefetches than their batch
size.

We also set a 100-count prefetch on Tornado workers, as they are
potentially susceptible to the same effect.

[1] https://www.rabbitmq.com/confirms.html#channel-qos-prefetch
2021-11-16 11:48:50 -08:00
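The resulting policy, roughly (a sketch, with an illustrative batch size):

```
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()

# Batch workers need a prefetch at least their batch size; other
# workers use the empirically chosen 100 (90% of the unbounded-
# prefetch throughput at a fraction of the memory risk).
batch_size = 500  # illustrative
channel.basic_qos(prefetch_count=max(100, batch_size))
```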
Alex Vandiver 7c3507feef queue: Allow passing down a prefetch count to pika. 2021-11-16 11:48:50 -08:00
PIG208 aa9d73c9f6 typing: Improve typing with assertions.
This fixes some mypy errors discovered with django-stubs.
2021-08-20 05:54:19 -07:00
Anders Kaseorg 04feadd917 mypy: Add pika-stubs.
Signed-off-by: Anders Kaseorg <anders@zulip.com>
2021-08-02 22:31:46 -07:00
Anders Kaseorg 9f8ba913fd queue: Fix _on_connection_open_error type to accept reason: str.
Signed-off-by: Anders Kaseorg <anders@zulip.com>
2021-08-02 22:31:46 -07:00
Anders Kaseorg f7e2426fc5 queue: Fix ensure_queue type to accept a callback returning any object.
channel.basic_consume actually returns str.

Signed-off-by: Anders Kaseorg <anders@zulip.com>
2021-08-02 22:31:46 -07:00
Anders Kaseorg 5e355abe2e queue: Add missing imports.
Signed-off-by: Anders Kaseorg <anders@zulip.com>
2021-08-02 22:31:46 -07:00
Anders Kaseorg 87799177b5 queue: Fix channel type for TornadoQueueClient.
The BlockingChannel annotations in TornadoQueueClient were flat-out
wrong.  BlockingChannel and Channel have no common base classes.

Signed-off-by: Anders Kaseorg <anders@zulip.com>
2021-08-02 22:31:46 -07:00
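The corrected annotation, schematically:

```
from pika.channel import Channel  # the async adapters' channel type

# TornadoQueueClient is asynchronous, so its callbacks receive a
# pika.channel.Channel -- not a BlockingChannel, which belongs to the
# blocking adapter and shares no base class with Channel.
def _on_channel_open(channel: Channel) -> None:
    channel.basic_qos(prefetch_count=100)
```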
Anders Kaseorg 5751479932 queue: Switch TornadoQueueClient to the new base QueueClient.
Signed-off-by: Anders Kaseorg <anders@zulip.com>
2021-08-02 22:31:46 -07:00
Anders Kaseorg bd6a2b149c queue: Split common part of SimpleQueueClient into new base class.
Signed-off-by: Anders Kaseorg <anders@zulip.com>
2021-08-02 22:31:46 -07:00
Anders Kaseorg 6e4c3e41dc python: Normalize quotes with Black.
Signed-off-by: Anders Kaseorg <anders@zulip.com>
2021-02-12 13:11:19 -08:00
Anders Kaseorg 11741543da python: Reformat with Black, except quotes.
Signed-off-by: Anders Kaseorg <anders@zulip.com>
2021-02-12 13:11:19 -08:00
Anders Kaseorg b7a94be152 python: Catch BaseException when we need to clean something up.
Signed-off-by: Anders Kaseorg <anders@zulip.com>
2020-10-11 16:16:16 -07:00
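The pattern, in a hedged sketch (names illustrative):

```
def consume_and_ack(channel, delivery_tag, process, event):
    try:
        process(event)
    except BaseException:
        # BaseException, not Exception: even KeyboardInterrupt or
        # SystemExit must not leave the delivery unacknowledged.
        channel.basic_nack(delivery_tag, requeue=True)
        raise
    channel.basic_ack(delivery_tag)
```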
Alex Vandiver c2132a4f9c queue: Drop register_json_consumer / json_drain_queue interface.
Now that all callsites use the same interface, drop the now-unused
ones, and their tests.
2020-10-11 14:19:42 -07:00
Alex Vandiver 179c387409 tornado: Switch to start_json_consumer interface. 2020-10-11 14:19:42 -07:00
Alex Vandiver f9358d5330 queue: Switch batch interface to use the channel.consume iterator.
This low-level interface allows consuming from a queue with timeouts.
This can be used to either consume in batches (with an upper timeout),
or one-at-a-time.  This is notably more performant than calling
`.get()` repeatedly (what json_drain_queue does under the hood), which
is "*highly discouraged* as it is *very inefficient*"[1].

Before this change:
```
$ ./manage.py queue_rate --count 10000 --batch
Purging queue...
Enqueue rate: 11158 / sec
Dequeue rate: 3075 / sec
```

After:
```
$ ./manage.py queue_rate --count 10000 --batch
Purging queue...
Enqueue rate: 11511 / sec
Dequeue rate: 19938 / sec
```

[1] https://www.rabbitmq.com/consumers.html#fetching
2020-10-11 14:19:40 -07:00
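The `channel.consume` iterator with an inactivity timeout, in sketch form (queue name and batch size illustrative):

```
import pika

channel = pika.BlockingConnection(pika.ConnectionParameters("localhost")).channel()

batch = []
for method, properties, body in channel.consume("events", inactivity_timeout=1.0):
    if method is None:
        break  # inactivity timeout: nothing arrived within 1s; flush partial batch
    batch.append(body)
    if len(batch) >= 100:
        break  # full batch collected
```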
Alex Vandiver 2547bdbf4a queue: Rename consume_wrapper to a better name. 2020-10-09 20:40:51 -07:00
Alex Vandiver d5a6b0f99a queue: Rename queue_size, and update for all local queues.
Despite its name, the `queue_size` method does not return the number
of items in the queue; it returns the number of items that have been
delivered to the local consumer but not yet processed.  These are
often, but not always, the same.

RabbitMQ's queues maintain the queue of unacknowledged messages; when
a consumer connects, it sends to the consumer some number of messages
to handle, known as the "prefetch."  This is a performance
optimization, to ensure the consumer code does not need to wait for a
network round-trip before having new data to consume.

The default prefetch is 0, which means that RabbitMQ immediately dumps
all outstanding messages to the consumer, which slowly processes and
acknowledges them.  If a second consumer were to connect to the same
queue, they would receive no messages to process, as the first
consumer has already been allocated them.  If the first consumer
disconnects or crashes, all prior events sent to it are then made
available for other consumers on the queue.

The consumer does not know the total size of the queue -- merely how
many messages it has been handed.

No change is made to the prefetch here; however, future changes may
wish to limit the prefetch, either for memory-saving, or to allow
multiple consumers to work the same queue.

Rename the method to make clear that it only contains information
about the local queue in the consumer, not the full RabbitMQ queue.
Also include the waiting message count, which is used by the
`consume()` iterator for similar purpose to the pending events list.
2020-10-09 20:40:39 -07:00
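A sketch of the renamed accessor (attribute names approximate):

```
def local_queue_size(self):
    # Delivered-but-unprocessed events held client-side: the pending
    # events list plus messages waiting in the consume() iterator's
    # local buffer.  This is *not* the RabbitMQ queue depth.
    return len(self.pending_events) + self.channel.get_waiting_message_count()
```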
Alex Vandiver a1ce1aca3b queue: Update comment to be more accurate about import errors. 2020-10-09 20:40:32 -07:00
Alex Vandiver baf882a133 queue: Only ACK drain_queue once it has completed work on the list.
Currently, drain_queue and json_drain_queue ack every message as it is
pulled off of the queue, until the queue is empty.  This means that if
the consumer crashes between pulling a batch of messages off the
queue, and actually processing them, those messages will be
permanently lost.  Sending an ACK on every message also results in a
significant amount of traffic to RabbitMQ, with notable
performance implications.

Send a singular ACK after the processing has completed, by making
`drain_queue` into a contextmanager.  Additionally, use the `multiple`
flag to ACK all of the messages at once -- or explicitly NACK the
messages if processing failed.  Sending a NACK will re-queue them at
the front of the queue.

Performance of a no-op dequeue before this change:
```
$ ./manage.py queue_rate --count 50000 --batch
Purging queue...
Enqueue rate: 10847 / sec
Dequeue rate: 2479 / sec
```
Performance of a no-op dequeue after this change (a 25% increase):
```
$ ./manage.py queue_rate --count 50000 --batch
Purging queue...
Enqueue rate: 10752 / sec
Dequeue rate: 3079 / sec
```
2020-10-06 17:26:14 -07:00
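A hedged sketch of the contextmanager shape described above (bookkeeping names illustrative):

```
from contextlib import contextmanager

@contextmanager
def drain_queue(channel, queue_name):
    messages, last_tag = [], None
    while True:
        method, properties, body = channel.basic_get(queue_name)
        if method is None:
            break  # queue is empty
        messages.append(body)
        last_tag = method.delivery_tag
    try:
        yield messages
    except Exception:
        if last_tag is not None:
            # Processing failed: requeue everything at the queue front.
            channel.basic_nack(last_tag, multiple=True, requeue=True)
        raise
    else:
        if last_tag is not None:
            # One ACK covers every delivery up to and including last_tag.
            channel.basic_ack(last_tag, multiple=True)
```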
Alex Vandiver 2b6989a40f queue: Remove a no-longer-correct comment.
This comment stopped being true in 5686821150, and very much stopped
being relevant in dd40649e04 when the middleware entirely stopped
publishing to a queue.
2020-08-14 11:30:13 -07:00
Anders Kaseorg 61d0417e75 python: Replace ujson with orjson.
Fixes #6507.

Signed-off-by: Anders Kaseorg <anders@zulip.com>
2020-08-11 10:55:12 -07:00
Anders Kaseorg 23b815bb50 queue: Fix types to reflect that Pika channels receive bytes.
Signed-off-by: Anders Kaseorg <anders@zulip.com>
2020-08-07 11:12:32 -07:00
Anders Kaseorg 489d73f63a queue: Fix strict_optional errors.
Signed-off-by: Anders Kaseorg <anders@zulip.com>
2020-07-06 11:25:48 -07:00
Anders Kaseorg 1ed2d9b4a0 logging: Use logging.exception and exc_info for unexpected exceptions.
logging.exception() and logging.debug(exc_info=True),
etc. automatically include a traceback.

Signed-off-by: Anders Kaseorg <anders@zulip.com>
2020-06-14 23:27:22 -07:00
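For instance:

```
import logging

try:
    1 / 0
except Exception:
    # logging.exception logs at ERROR and appends the traceback;
    # logging.debug("...", exc_info=True) does the same at DEBUG.
    logging.exception("Unexpected error while processing event")
```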
Anders Kaseorg 4b6d2cf25f logging: Pass more format arguments to logging.
Signed-off-by: Anders Kaseorg <anders@zulip.com>
2020-06-14 23:27:22 -07:00
Anders Kaseorg 365fe0b3d5 python: Sort imports with isort.
Fixes #2665.

Regenerated by tabbott with `lint --fix` after a rebase and change in
parameters.

Note from tabbott: In a few cases, this converts technical debt in the
form of unsorted imports into different technical debt in the form of
our largest files having very long, ugly import sequences at the
start.  I expect this change will increase pressure for us to split
those files, which isn't a bad thing.

Signed-off-by: Anders Kaseorg <anders@zulip.com>
2020-06-11 16:45:32 -07:00
Anders Kaseorg 67e7a3631d python: Convert percent formatting to Python 3.6 f-strings.
Generated by pyupgrade --py36-plus.

Signed-off-by: Anders Kaseorg <anders@zulip.com>
2020-06-10 15:02:09 -07:00
Anders Kaseorg 19cc22e5ab queue: Fix types to reflect that Pika channels transmit bytes.
Signed-off-by: Anders Kaseorg <anders@zulip.com>
2020-06-07 11:09:24 -07:00
Anders Kaseorg bdc365d0fe logging: Pass format arguments to logging.
https://docs.python.org/3/howto/logging.html#optimization

Signed-off-by: Anders Kaseorg <anders@zulip.com>
2020-05-02 10:18:02 -07:00
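That is, prefer lazy %-style arguments over eager string formatting (values illustrative):

```
import logging

count, queue_name = 42, "noop"  # illustrative values
# The message is interpolated only if this record is actually emitted,
# unlike an f-string, which is formatted before the call.
logging.info("Processed %d events from queue %s", count, queue_name)
```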
Anders Kaseorg fead14951c python: Convert assignment type annotations to Python 3.6 style.
This commit was split by tabbott; this piece covers the vast majority
of files in Zulip, but excludes scripts/, tools/, and puppet/ to help
ensure we at least show the right error messages for Xenial systems.

We can likely further refine the remaining pieces with some testing.

Generated by com2ann, with whitespace fixes and various manual fixes
for runtime issues:

-    invoiced_through: Optional[LicenseLedger] = models.ForeignKey(
+    invoiced_through: Optional["LicenseLedger"] = models.ForeignKey(

-_apns_client: Optional[APNsClient] = None
+_apns_client: Optional["APNsClient"] = None

-    notifications_stream: Optional[Stream] = models.ForeignKey('Stream', related_name='+', null=True, blank=True, on_delete=CASCADE)
-    signup_notifications_stream: Optional[Stream] = models.ForeignKey('Stream', related_name='+', null=True, blank=True, on_delete=CASCADE)
+    notifications_stream: Optional["Stream"] = models.ForeignKey('Stream', related_name='+', null=True, blank=True, on_delete=CASCADE)
+    signup_notifications_stream: Optional["Stream"] = models.ForeignKey('Stream', related_name='+', null=True, blank=True, on_delete=CASCADE)

-    author: Optional[UserProfile] = models.ForeignKey('UserProfile', blank=True, null=True, on_delete=CASCADE)
+    author: Optional["UserProfile"] = models.ForeignKey('UserProfile', blank=True, null=True, on_delete=CASCADE)

-    bot_owner: Optional[UserProfile] = models.ForeignKey('self', null=True, on_delete=models.SET_NULL)
+    bot_owner: Optional["UserProfile"] = models.ForeignKey('self', null=True, on_delete=models.SET_NULL)

-    default_sending_stream: Optional[Stream] = models.ForeignKey('zerver.Stream', null=True, related_name='+', on_delete=CASCADE)
-    default_events_register_stream: Optional[Stream] = models.ForeignKey('zerver.Stream', null=True, related_name='+', on_delete=CASCADE)
+    default_sending_stream: Optional["Stream"] = models.ForeignKey('zerver.Stream', null=True, related_name='+', on_delete=CASCADE)
+    default_events_register_stream: Optional["Stream"] = models.ForeignKey('zerver.Stream', null=True, related_name='+', on_delete=CASCADE)

-descriptors_by_handler_id: Dict[int, ClientDescriptor] = {}
+descriptors_by_handler_id: Dict[int, "ClientDescriptor"] = {}

-worker_classes: Dict[str, Type[QueueProcessingWorker]] = {}
-queues: Dict[str, Dict[str, Type[QueueProcessingWorker]]] = {}
+worker_classes: Dict[str, Type["QueueProcessingWorker"]] = {}
+queues: Dict[str, Dict[str, Type["QueueProcessingWorker"]]] = {}

-AUTH_LDAP_REVERSE_EMAIL_SEARCH: Optional[LDAPSearch] = None
+AUTH_LDAP_REVERSE_EMAIL_SEARCH: Optional["LDAPSearch"] = None

Signed-off-by: Anders Kaseorg <anders@zulipchat.com>
2020-04-22 11:02:32 -07:00