Commit Graph

359 Commits

Author SHA1 Message Date
Mateusz Mandera 58d9975cca embed_links: Interrupt consume() function on worker timeout.
This fixes a bug introduced in 95b46549e1
which made the worker simply log a warning about the timeout and then
continue consume()ing the event that should have also been interrupted.

The idea here is to introduce an exception which can be used to
interrupt the consume() process without triggering the regular handling
of exceptions that happens in _handle_consume_exception.
2021-07-07 09:24:50 -07:00
Mateusz Mandera 95b46549e1 embed_links: Only log warning if worker times out.
Throwing an exception is excessive in case of this worker, as it's
expected for it to time out sometimes if the urls take too long to
process.

With a test added by tabbott.
2021-07-06 14:17:24 -07:00
Mateusz Mandera d9ab70bdde queue_processors: Make timer_expired receive list of events as argument.
This will give queue workers more flexibility when defining their own
override of the method.
2021-07-06 13:46:48 -07:00
Mateusz Mandera c101f3acd6 queue_processors: Make timer_expired() a method.
This allows specific queue workers to override the defaut behavior and
implement their own response to the timer expiring. We will want to use
this for embed_links queue at least.
2021-07-06 13:46:48 -07:00
PIG208 75cea329b4 markdown: Refactor out additional properties added to Message.
This adds a new class called MessageRenderingResult to contain the
additional properties we added to the Message object (like alert_words)
as well as the rendered content to ensure typesafe reference. No
behavioral change is made except changes in typing.

This is a preparatory change for adding django-stubs to the backend.

Related: #18777
2021-06-24 18:14:53 -07:00
sahil839 37bf160298 queue_processor: Add langauge to the events added to invites queue.
This is a prep commit for adding realm-level default for various
user settings. We add the language, in which the invite email will
be sent, to the dict added to queue itself to avoid making queries
in a loop when sending multiple emails from queue.

We also handle the case for old events in the queue.
2021-06-22 16:55:32 -07:00
Mateusz Mandera 496e744053 queue_processors: Log more detailed info when marking messages as read. 2021-05-26 11:17:21 -07:00
PIG208 7150fe5dc5 backend: Extract check_update_message from update_message_backend. 2021-05-09 20:44:04 -07:00
Cyril Pletinckx e4ff372fc3 emails: Transform SMTPException into EmailNotDeliveredException.
Django's default SMTP implementation can raise various exceptions
when trying to send an email. In order to allow Zulip calling code
to catch fewer exceptions to handle any cause of "email not
sent", we translate most of them into EmailNotDeliveredException.
The non-translated exceptions concern the connection with the
SMTP server. They were not merged with the rest to keep some
details about the nature of these.

Tests are implemented in the test_send_email.py module.
2021-05-05 20:16:11 -07:00
Alex Vandiver a9688ceb75 worker: Allow long MissedMessageWorker consumes.
This will stop dropping events in the case that the background
`maybe_send_batched_email` thread takes longer than 30s.  However, see
also #15280 and the TODO comment about how we lose events upon
restart; this worker is still lossy.
2021-05-04 08:45:48 -07:00
Cyril Pletinckx 9afde790c6 email: Open a single SMTP connection to send email batches.
Previously the outgoing emails were sent over several SMTP
connections through the EmailSendingWorker; establishing a new
connection each time adds notable overhead.

Redefine EmailSendingWorker worker to be a LoopQueueProcessingWorker,
which allows it to handle batches of events. At the same time, persist
the connection across email sending, if possible.

The connection is initialized in the constructor of the worker
in order to keep the same connection throughout the whole process.
The concrete implementation of the consume_batch function is simply
processing each email one at a time until they have all been sent.

In order to reuse the previously implemented decorator to retry
sending failures a new method that meets the decorator's required
arguments is declared inside the EmailSendingWorker class. This
allows to retry the sending process of a particular email inside
the batch if the caught exception leaves this process retriable.

A second retry mechanism is used inside the initialize_connection
function to redo the opening of the connection until it works or
until three attempts failed. For this purpose the backoff module
has been added to the dependencies and a test has been added to
ensure that this retry mechanism works well.

The connection is closed when the stop method is called.

Fixes: #17672.
2021-04-26 17:27:22 -07:00
Alex Vandiver 0ad17925eb send_email: Remove unnecessary send_email_from_dict.
This was introduced in 8321bd3f92 to serve as a sort of drop-in
replacement for zerver.lib.queue.queue_json_publish, but its use has
been subsequently cut out (e.g. `9fcdb6c83ac5`).

Remote its last callsite.
2021-04-26 17:27:22 -07:00
Anders Kaseorg 178736c8eb docs: Fix spelling errors caught by codespell.
Signed-off-by: Anders Kaseorg <anders@zulip.com>
2021-04-26 09:31:08 -07:00
Tim Abbott 260861426c queue_processors: Document when can remove compatibility code. 2021-04-16 09:55:14 -07:00
Anders Kaseorg e7ed907cf6 python: Convert deprecated Django ugettext alias to gettext.
django.utils.translation.ugettext is a deprecated alias of
django.utils.translation.gettext as of Django 3.0, and will be removed
in Django 4.0.

Signed-off-by: Anders Kaseorg <anders@zulip.com>
2021-04-15 18:01:34 -07:00
Anders Kaseorg 1fe29aad42 queue_processors: Simplify unnecessary use of Optional.
Signed-off-by: Anders Kaseorg <anders@zulip.com>
2021-04-13 08:54:26 -07:00
Alex Vandiver a280905a89 outgoing_webhook: Join build_bot_request and send_data_to_server.
The existing organization, of returning an opaque blob from
`build_bot_request`, which was later consumed by
`send_data_to_server`, is not particularly sensible; the steps become
oddly split between the OutgoingWebhookWorker, `do_rest_call`, and the
`OutgoingWebhookServiceInterface`.

Make the `OutgoingWebhookServiceInterface` in charge of building,
making, and returning the request in one method; another method
handles extracting content from a successful response.  `do_rest_call`
is responsible for calling both halves of this, and doing common error
handling.
2021-03-29 18:24:44 -07:00
Anders Kaseorg d55dc6f8f1 requirements: Upgrade python-zulip-api from Git.
Signed-off-by: Anders Kaseorg <anders@zulip.com>
2021-03-26 16:31:03 -07:00
Mateusz Mandera b9c1fed18c invites: Delete old compat code in the invites queue worker.
1.7.* is old enough at this point that we can clean up this code.
2021-02-26 08:26:43 -08:00
Mateusz Mandera 09fc79f911 actions: Remove realm argument to internal_send_private_message.
The argument is redundant.
2021-02-23 15:26:47 -08:00
Anders Kaseorg 6e4c3e41dc python: Normalize quotes with Black.
Signed-off-by: Anders Kaseorg <anders@zulip.com>
2021-02-12 13:11:19 -08:00
Anders Kaseorg 11741543da python: Reformat with Black, except quotes.
Signed-off-by: Anders Kaseorg <anders@zulip.com>
2021-02-12 13:11:19 -08:00
Anders Kaseorg 5028c081cb python: Merge concatenated string literals that Black would uglify.
Signed-off-by: Anders Kaseorg <anders@zulip.com>
2021-02-12 13:11:19 -08:00
Alex Vandiver d0f0c2f2ed digest: Fix the structure that we enqueue across when digesting.
This rename was missed in bfa0bdf3d6.
Without this fix, digest messages fail to send.
2021-02-08 17:28:59 -08:00
Anders Kaseorg 454144c35f queue_processors: Fix retry_send_email_failures type.
Signed-off-by: Anders Kaseorg <anders@zulip.com>
2021-01-26 13:27:50 -08:00
Steve Howell bfa0bdf3d6 email digests: Process users in chunks of 30.
This should make the queue empty more quickly,
because we do bulk queries to prevent database
hops.
2021-01-17 11:28:30 -08:00
Alex Vandiver c2526844e9 worker: Remove SignupWorker and friends.
ZULIP_FRIENDS_LIST_ID and MAILCHIMP_API_KEY are not currently used in
production.

This removes the unused 'signups' queue and worker.
2021-01-17 11:16:35 -08:00
Alex Vandiver d688e18de2 errors: Remove references to "deployment", use "host".
The `deployment` key was only set in `do_report_error`, which is now
only used in one codepath (the queue worker).  The logging handlers on
staging call notify_server_error directly, which omits the
`deployment` key.

Remove the odd one-of key, and instead simply do dispatch in
`do_report_error`.
2021-01-17 11:08:12 -08:00
Anders Kaseorg cc55393671 python: Open text files as text to skip decode operations.
Signed-off-by: Anders Kaseorg <anders@zulip.com>
2020-10-30 11:36:38 -07:00
Abhijeet Prasad Bodas e98a8856c7 logging: Add logging in deferred_work queue processor.
Adds logging statements in deferred_work queue consume.
2020-10-29 10:34:53 -07:00
Alex Vandiver 142de0f670 queue: Increase default timeout to 30s, from 10s.
Not all of the workers are known to be safe to interrupt; they might
leave inconsistent state.  As such, terminating them with timeouts
should currently only be a last-resort against stalled queues, not a
regular occurrence.
2020-10-27 16:39:31 -07:00
Alex Vandiver c73dd194f0 sentry: Group all worker timeouts together, by queue.
Since the exception can be triggered at arbitrary places in the stack
based on whenever the alarm happens to fire, they do not often group
together.

Explicitly group them together, grouped only by which queue the work
is in.
2020-10-27 16:39:31 -07:00
Alex Vandiver 7cf737988d queue: Be more explicit about test/real queue division. 2020-10-26 12:32:47 -07:00
Mateusz Mandera 716df658fa queue_processors: Don't run test queues with run-dev.py. 2020-10-18 14:07:31 -07:00
Steve Howell 378062cc83 performance: Avoid call to access_stream_by_id.
We already trust ids that are put on our queue
for deferred work. For example, see the code for
"mark_stream_messages_as_read_for_everyone"

We now pass stream_recipient_id when we queue
up work for do_mark_stream_messages_as_read.

This generally saves about 3 queries per
user when we unsubscribe them from a stream.
2020-10-16 12:58:11 -07:00
Steve Howell 31eb97ddde performance: Fix do_mark_stream_messages_as_read.
This function no longer asks for data that it
doesn't need.
2020-10-16 12:58:11 -07:00
Anders Kaseorg 6564540d15 docs: Fix some spelling errors.
Signed-off-by: Anders Kaseorg <anders@zulip.com>
2020-10-13 15:47:13 -07:00
Alex Vandiver f0b23b0752 queue: Switch non-batch consumer to also use start_json_consumer.
This has no effect on consumption rate, but unifies the codepaths.
Before:
```
$ ./manage.py queue_rate --count 50000
Purging queue...
Enqueue rate: 11187 / sec
Dequeue rate: 4158 / sec
```

After:
```
$ ./manage.py queue_rate --count 50000
Purging queue...
Enqueue rate: 11010 / sec
Dequeue rate: 4113 / sec
```
2020-10-11 14:19:42 -07:00
Alex Vandiver 45c9c3cc30 queue: Monitor user_activity queue, now that it has a consumer.
Since this was using repead individual get() calls previously, it
could not be monitored for having a consumer.  Add it in, by marking
it of queue type "consumer" (the default), and adding Nagios lines for
it.

Also adjust missedmessage_emails to be monitored; it stopped using
LoopQueueProcessingWorker in 5cec566cb9, but was never added back
into the set of monitored consumers.
2020-10-11 14:19:42 -07:00
Alex Vandiver f9358d5330 queue: Switch batch interface to use the channel.consume iterator.
This low-level interface allows consuming from a queue with timeouts.
This can be used to either consume in batches (with an upper timeout),
or one-at-a-time.  This is notably more performant than calling
`.get()` repeatedly (what json_drain_queue does under the hood), which
is "*highly discouraged* as it is *very inefficient*"[1].

Before this change:
```
$ ./manage.py queue_rate --count 10000 --batch
Purging queue...
Enqueue rate: 11158 / sec
Dequeue rate: 3075 / sec
```

After:
```
$ ./manage.py queue_rate --count 10000 --batch
Purging queue...
Enqueue rate: 11511 / sec
Dequeue rate: 19938 / sec
```

[1] https://www.rabbitmq.com/consumers.html#fetching
2020-10-11 14:19:40 -07:00
Alex Vandiver 2547bdbf4a queue: Rename consume_wrapper to a better name. 2020-10-09 20:40:51 -07:00
Alex Vandiver d5a6b0f99a queue: Rename queue_size, and update for all local queues.
Despite its name, the `queue_size` method does not return the number
of items in the queue; it returns the number of items that the local
consumer has delivered but unprocessed.  These are often, but not
always, the same.

RabbitMQ's queues maintain the queue of unacknowledged messages; when
a consumer connects, it sends to the consumer some number of messages
to handle, known as the "prefetch."  This is a performance
optimization, to ensure the consumer code does not need to wait for a
network round-trip before having new data to consume.

The default prefetch is 0, which means that RabbitMQ immediately dumps
all outstanding messages to the consumer, which slowly processes and
acknowledges them.  If a second consumer were to connect to the same
queue, they would receive no messages to process, as the first
consumer has already been allocated them.  If the first consumer
disconnects or crashes, all prior events sent to it are then made
available for other consumers on the queue.

The consumer does not know the total size of the queue -- merely how
many messages it has been handed.

No change is made to the prefetch here; however, future changes may
wish to limit the prefetch, either for memory-saving, or to allow
multiple consumers to work the same queue.

Rename the method to make clear that it only contains information
about the local queue in the consumer, not the full RabbitMQ queue.
Also include the waiting message count, which is used by the
`consume()` iterator for similar purpose to the pending events list.
2020-10-09 20:40:39 -07:00
Anders Kaseorg 9bfbb29763 queue_processors: Use try…finally to prevent leaking an alarm.
Otherwise, if consume_func raised an exception for any reason *other*
than the alarm being fired, the still-pending alarm would have fired
later at some arbitrary point in the calling code.

We need two try…finally blocks in case the signal arrives just before
signal.alarm(0).

Signed-off-by: Anders Kaseorg <anders@zulip.com>
2020-10-07 15:37:46 -07:00
Alex Vandiver d47637fa40 queue: Set a max consume timeout with SIGALRM.
SIGALRM is the simplest way to set a specific maximum duration that
queue workers can take to handle a specific message.  This only works
in non-threaded environments, however, as signal handlers are
per-process, not per-thread.

The MAX_CONSUME_SECONDS is set quite high, at 10s -- the longest
average worker consume time is embed_links, which hovers near 1s.
Since just knowing the recent mean does not give much information[1],
it is difficult to know how much variance is expected.  As such, we
set the threshold to be such that only events which are significant
outliers will be timed out.  This can be tuned downwards as more
statistics are gathered on the runtime of the workers.

The exception to this is DeferredWorker, which deals with quite-long
requests, and thus has no enforceable SLO.

[1] https://www.autodesk.com/research/publications/same-stats-different-graphs
2020-10-06 17:26:14 -07:00
Alex Vandiver baf882a133 queue: Only ACK drain_queue once it has completed work on the list.
Currently, drain_queue and json_drain_queue ack every message as it is
pulled off of the queue, until the queue is empty.  This means that if
the consumer crashes between pulling a batch of messages off the
queue, and actually processing them, those messages will be
permanently lost.  Sending an ACK on every message also results in a
significant amount lot of traffic to rabbitmq, with notable
performance implications.

Send a singular ACK after the processing has completed, by making
`drain_queue` into a contextmanager.  Additionally, use the `multiple`
flag to ACK all of the messages at once -- or explicitly NACK the
messages if processing failed.  Sending a NACK will re-queue them at
the front of the queue.

Performance of a no-op dequeue before this change:
```
$ ./manage.py queue_rate --count 50000 --batch
Purging queue...
Enqueue rate: 10847 / sec
Dequeue rate: 2479 / sec
```
Performance of a no-op dequeue after this change (a 25% increase):
```
$ ./manage.py queue_rate --count 50000 --batch
Purging queue...
Enqueue rate: 10752 / sec
Dequeue rate: 3079 / sec
```
2020-10-06 17:26:14 -07:00
Alex Vandiver df86a564dc queue: Let stop() work with LoopQueueProcessingWorker. 2020-10-06 17:26:14 -07:00
Alex Vandiver 8cf37a0d4b queue: Add a tool to profile no-op enqueue and dequeue actions. 2020-10-06 17:26:14 -07:00
Tim Abbott 7fa8bafe81 lint: Fix type of initial 0 in queue monitoring. 2020-09-21 15:47:30 -07:00
Mateusz Mandera 810514dd9d queue: Update stats file every 30 seconds.
This system can't update stats while the queue is idle, without using
threads for this, but at least we ensure to update the file after
consuming an event if more than MAX_SECONDS_BEFORE_UPDATE_STATS passed
since the last update, regardless of the number of iterations done so
far.
2020-09-21 15:24:02 -07:00
Mateusz Mandera 40c4511a9c queue: Fix misspelled consume_iteration_counter variable. 2020-09-21 15:22:58 -07:00