The MissedMessage queue worker is the single callsite of
`handle_missedmessage_emails`, which immediately transforms the list
of events into a dict keyed by message-id.
Skip the intermediate list step, and use a defaultdict and a dataclass
to simplify the structure and make its pieces explicit. This removes
the unused user_profile_id and message_id pieces of the data structure.
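A rough sketch of the resulting shape (the field and class names here
are illustrative, not the worker's actual ones):
```
from collections import defaultdict
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

@dataclass
class EventInfo:
    # Illustrative fields; the real dataclass keeps only what the email
    # batching code actually consumes.
    trigger: str
    mentioned_user_group_id: Optional[int] = None

def group_events(events: List[Dict[str, Any]]) -> Dict[int, List[EventInfo]]:
    # Group incoming queue events directly by message id, with no
    # intermediate list, dropping fields downstream code never reads.
    by_message: Dict[int, List[EventInfo]] = defaultdict(list)
    for event in events:
        by_message[event["message_id"]].append(
            EventInfo(
                trigger=event["trigger"],
                mentioned_user_group_id=event.get("mentioned_user_group_id"),
            )
        )
    return by_message
```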
For tests that use the dev server, such as test-api and
test-js-with-puppeteer, we don't have the consumers for the queues. As
they eventually time out, we get unnecessary error messages. This adds
a new flag, disable_timeout, to disable this behavior for the test
cases.
Realm exports may OOM on deployments with low memory; to ensure
forward progress, log the start time in the RealmAuditLog entry, and
key off the existence of that entry to prevent re-attempting an
export which was already tried once.
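The approach is roughly the following (helper and field names here are
illustrative sketches of the idea, worked against the export's
RealmAuditLog row):
```
import logging
import time

def consume_export_event(audit_log_row, do_export) -> None:
    # Illustrative sketch: skip the export if a start time was already
    # recorded, otherwise record one before doing any heavy work.
    extra_data = audit_log_row.extra_data or {}
    if "started_timestamp" in extra_data:
        logging.warning(
            "Marking export as failed; it was already attempted at %s",
            extra_data["started_timestamp"],
        )
        return
    extra_data["started_timestamp"] = time.time()
    audit_log_row.extra_data = extra_data
    audit_log_row.save(update_fields=["extra_data"])
    do_export()
```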
We previously hard-coded 6 threads for the realm export; in low-memory
environments, spawning 6 threads for an export can lead to an OOM,
which kills the process and leaves a partial export on disk -- which
is then tried again, since the export was never completed. This leads
to excessive disk consumption and brief repeated outages of all other
workers, until the failing export job is manually de-queued somehow.
Lower the export to use only one thread if it is already running in a
multi-threaded environment. Note that this does not guarantee forward
progress; it merely makes it more likely that exports will succeed in
low-memory deployments.
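A minimal sketch of that choice, assuming the export entry point takes
an optional thread count (the helper name is made up):
```
import threading

def default_export_threads() -> int:
    # If we are already running inside a multi-threaded process (such as a
    # queue worker), fall back to a single thread to limit peak memory;
    # otherwise keep the historical default of 6.
    return 1 if threading.active_count() > 1 else 6
```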
The previous implementation leaked database connections, as a new
thread (and thus a new thread-local database connection) was made for
each timer execution. While these connections were relatively
lightweight in Python, they also incur memory overhead in the
PostgreSQL server itself. The logic for managing the timer was also
unclear, and the unavoidable deadlock in the stopping logic was rather
unfortunate.
Rewrite with one explicit worker thread which handles the delayed
message sending. The RabbitMQ consumer creates the database rows, and
notifies the worker to start its 5s timeout. Because it is controlled
by a condition variable, it does not hold the lock while waiting, and
can be notified to exit.
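A compressed sketch of that shape (simplified; not the actual worker
code):
```
import threading

class DelayedMessageSender:
    def __init__(self) -> None:
        self.cv = threading.Condition()
        self.has_work = False
        self.stopping = False
        self.worker = threading.Thread(target=self.work)
        self.worker.start()

    def advance(self) -> None:
        # Called by the RabbitMQ consumer after creating the database rows.
        with self.cv:
            self.has_work = True
            self.cv.notify()

    def stop(self) -> None:
        with self.cv:
            self.stopping = True
            self.cv.notify()
        self.worker.join()

    def work(self) -> None:
        with self.cv:
            while not self.stopping:
                # wait() drops the lock while sleeping, so the consumer (or
                # stop()) can always notify this thread.
                self.cv.wait(timeout=5)
                if self.has_work and not self.stopping:
                    self.has_work = False
                    self.send_due_messages()

    def send_due_messages(self) -> None:
        pass  # placeholder for sending whatever rows are now due
```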
This is helpful for debugging -- generally these tasks are in a worker
queue because they take a long time to run, so knowing what long task
is about to start before it does, rather than just after, is useful.
`return` inside `finally` blocks causes exceptions to be silenced.
Although these blocks follow blanket `except Exception` handlers, they
do not seem to have a goal of silencing BaseException and exceptions
thrown by the exception handler, so rewrite them to avoid it.
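For illustration, the problematic shape and the rewritten one:
```
# Problematic: the `return` in `finally` runs even while an exception is
# propagating, and silently discards it -- here a KeyboardInterrupt.
def before() -> str:
    try:
        raise KeyboardInterrupt
    except Exception:
        return "handled"
    finally:
        return "done"

# Rewritten: the cleanup still runs, but a BaseException (or an exception
# raised by the handler itself) propagates normally.
def after() -> str:
    try:
        raise KeyboardInterrupt
    except Exception:
        return "handled"
    finally:
        cleanup()
    return "done"

def cleanup() -> None:
    pass  # whatever the finally block actually needs to do

print(before())  # prints "done"; the KeyboardInterrupt never surfaces
# after() raises KeyboardInterrupt, as expected
```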
Signed-off-by: Anders Kaseorg <anders@zulip.com>
Both of these compatibility blocks can be deleted, since you can't
upgrade directly to any supported release from the versions where the
old event formats would be used.
Previously, an active production Zulip server would experience a class
of deadlocks caused by two or more concurrent bulk update operations
on the UserMessage table.
This is because UPDATE ... SET ... WHERE statements that execute in
parallel take row-level UPDATE locks as they get results; since the
query plans may result in getting rows in different orders between two
queries, this can result in deadlocks.
Some databases allow ORDER BY on their UPDATE ... WHERE statements;
PostgreSQL does not. In PostgreSQL, the answer is to do a sub-select
with an ORDER BY ... FOR UPDATE to ensure consistent ordering on row
locks.
We do this in all code paths using bitand or bitor as part of bulk
editing message flags, which should ensure that these concurrent
operations obtain row level locks on the table in the same order.
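The resulting queries have roughly this shape (illustrative only, with
a flag-add shown; the real code builds the equivalent for each
bitand/bitor path):
```
from django.db import connection

def add_flag_to_user_messages(user_profile_id: int, flag_mask: int) -> None:
    # Illustrative only: lock the matching rows in a consistent order (by id)
    # before applying the bitor, so concurrent bulk updates cannot take row
    # locks in conflicting orders and deadlock.
    with connection.cursor() as cursor:
        cursor.execute(
            """
            UPDATE zerver_usermessage
            SET flags = flags | %(mask)s
            WHERE id IN (
                SELECT id FROM zerver_usermessage
                WHERE user_profile_id = %(user_id)s
                ORDER BY id
                FOR UPDATE
            )
            """,
            {"mask": flag_mask, "user_id": user_profile_id},
        )
```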
Fixes #19054.
cfcbf58cd1 rightly removed the use of `user_ids` in
`render_markdown`, which in turn makes it unnecessary in
`render_incoming_message`.
Remove the unnecessary parameter from `render_incoming_message`.
We leave the fetching of links outside of the lock, as they could take
seconds, which is an unreasonable amount of time to hold a lock on the
message row. This may result in unnecessary work, in the case that
the message has since been edited, but the unnecessary work is preferable
to blocking other work on the message row for the duration.
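As a toy illustration of that ordering (all names are stand-ins):
```
import threading
from typing import Dict

row_lock = threading.Lock()                       # stand-in for the row lock
messages: Dict[int, str] = {42: "see https://example.com"}
previews: Dict[int, Dict[str, dict]] = {}

def fetch_preview(url: str) -> dict:
    return {"title": "Example Domain"}            # stand-in for a slow fetch

def update_preview(message_id: int, url: str) -> None:
    data = fetch_preview(url)                     # may take seconds; no lock held
    with row_lock:                                # only now take the short lock
        content = messages.get(message_id, "")
        if url in content:                        # message may have been edited
            previews.setdefault(message_id, {})[url] = data
        # else: the fetch was wasted work, which is cheaper than holding the
        # lock for the duration of the network request

update_preview(42, "https://example.com")
```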
This commit changes the invite API to accept the invitation expiration
time in minutes, since further commits will add a custom option
allowing a user to set the expiration time in minutes, hours, and
weeks as well.
The `get_link_embed_data` / `link_embed_data_from_cache` pair as
introduced in c93f1d4eda uses the cache
as a temporary store inside of the `embed_links` worker; this means
that it must be durable storage, or the worker will stall and re-fetch
the same links to preview them.
Switch to plumbing through the fetched URL embed data as a parameter
to the Markdown evaluation which uses them, rather than using the
cache as an intermediary. This frees up the cache to be merely a
non-durable cache.
As a side-effect, this removes `get_cache_with_key` and its only
callsite, `link_embed_data_from_cache`.
The database value of expiry_date is None for an invite that will
never expire, and clients send -1 as the value in the API, similar to
the message retention setting.
Also, when invite_expire_in_days is passed as an argument to various
functions, it is passed as -1 for the "Never expires" option, since
invite_expire_in_days is an optional argument in some functions and
thus we cannot pass a value of None.
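The mapping is roughly the following (the helper name is made up):
```
from datetime import datetime, timedelta, timezone
from typing import Optional

def expiry_date_from_api_value(invite_expire_in_days: int) -> Optional[datetime]:
    # -1 from the API (or from internal callers that cannot pass None) means
    # "never expires", and is stored as a NULL expiry_date.
    if invite_expire_in_days == -1:
        return None
    return datetime.now(timezone.utc) + timedelta(days=invite_expire_in_days)
```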
As a preparatory step to refactoring json_success to accept
request as a parameter, change `do_report_error`, which is
called from the events queue for "error_reports", to return
None instead of json_success.
Adds an assertion error to `ErrorReporter` queue processor
and removes `JsonableError` from `do_report_error`.
It is likely that `do_report_error` was moved from a view in a
previous refactor, but was not updated to no longer return an
HttpResponse.
Python's behaviour on `sys.exit` is to wait for all non-daemon threads
to exit. In the context of the missedmessage_emails worker, if any
work is pending, a non-daemon Timer thread exists, which is waiting
for 5 seconds. As soon as that thread is serviced, it sets up another
5-second Timer, a process which repeats until all
ScheduledMessageNotificationEmail records have been handled. This
likely takes two minutes, but may theoretically take up to a week
until the thread exits, and thus sys.exit can complete.
Supervisor only gives the process 30 seconds to shut down, so
something else must prevent this endless Timer.
When `stop` is called, take the lock so we can mutate the timer.
However, since `stop` may have been called from a signal handler, our
thread may _already_ have the lock. As Python provides no way to know
if our thread is the one which has the lock, make the lock a
re-entrant one, allowing us to always try to take it.
With the lock in hand, cancel any outstanding timers. A race exists
where the timer may not be able to be canceled because it has
finished, maybe_send_batched_emails has been called, and is itself
blocked on the lock. Handle this case by timing out the thread join
in `stop()`, and signal the running thread to exit by unsetting the
timer event, which will be detected once it claims the lock.
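A compressed sketch of that locking scheme (the real worker tracks
more state; names are simplified):
```
import threading
from typing import Optional

class MissedMessageBatcher:
    def __init__(self) -> None:
        # Re-entrant, because stop() can run from a signal handler on the
        # very thread that already holds the lock.
        self.lock = threading.RLock()
        self.timer: Optional[threading.Timer] = None

    def ensure_timer(self) -> None:
        with self.lock:
            if self.timer is None:
                self.timer = threading.Timer(5, self.maybe_send_batched_emails)
                self.timer.start()

    def maybe_send_batched_emails(self) -> None:
        with self.lock:
            if self.timer is None:
                return            # stop() already ran; exit without re-arming
            self.timer = None
            # ... send any due rows, then call ensure_timer() if work remains

    def stop(self) -> None:
        with self.lock:
            if self.timer is not None:
                self.timer.cancel()        # a no-op if the timer already fired
                # The callback may itself be blocked on this lock, so don't
                # wait forever for the thread to finish.
                self.timer.join(timeout=5)
                self.timer = None          # tells a racing callback to exit
```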
RabbitMQ clients have a setting called prefetch[1], which controls how
many un-acknowledged events the server forwards to the local queue in
the client. The default is 0; this means that when clients first
connect, the server must send them every message in the queue.
This itself may cause unbounded memory usage in the client, but also
has other detrimental effects. While the client is attempting to
process the head of the queue, it may be unable to read from the TCP
socket at the rate that the server is sending to it -- filling the TCP
buffers, and causing the server's writes to block. If the server
blocks for more than 30 seconds, it times out the send, and closes the
connection with:
```
closing AMQP connection <0.30902.126> (127.0.0.1:53870 -> 127.0.0.1:5672):
{writer,send_failed,{error,timeout}}
```
This is https://github.com/pika/pika/issues/753#issuecomment-318119222.
Set a prefetch limit of 100 messages, or the batch size, to better
handle queues which start with large numbers of outstanding events.
Setting prefetch=1 causes significant performance degradation in the
no-op queue worker, to 30% of the prefetch=0 performance. Setting
prefetch=100 achieves 90% of the prefetch=0 performance, and higher
values offer only minor gains above that. For batch workers, their
performance is not notably degraded by prefetch equal to their batch
size, and they cannot function on smaller prefetches than their batch
size.
We also set a 100-count prefetch on Tornado workers, as they are
potentially susceptible to the same effect.
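With pika, the prefetch window is a single basic_qos call on the
channel; a minimal consumer looks roughly like this (the queue name is
just an example):
```
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
# Cap un-acknowledged deliveries per consumer, so a newly connected worker
# is not flooded with the entire backlog at once.
channel.basic_qos(prefetch_count=100)

def handle(ch, method, properties, body):
    # ... process the event ...
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(queue="missedmessage_emails", on_message_callback=handle)
channel.start_consuming()
```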
[1] https://www.rabbitmq.com/confirms.html#channel-qos-prefetch
The `current_queue_size` key in the queue monitoring stats file was
the local queue size, not the global queue size -- d5a6b0f99a
renamed the function, but did not adjust the queue monitoring JSON,
despite the last use of it having been removed in cd9b194d88.
The function is still used to mark "we emptied our queue," and it
remains a reasonable metric for that.
For realm export, the following changes have been made:
- `./manage.py export --upload` now deletes both the `.tar.gz` archive and the unpacked directory.
- `./manage.py export` now deletes only the unpacked directory.
Additionally, `--delete-after-upload` has been removed, since it is
now the default behavior.
Fixes #20081
Since 3853285241, PushNotificationsWorker uses the aioapns library
to send Apple push notifications. This introduces an asyncio event
loop into this worker process, which, if unlucky, can respond poorly
when a SIGALRM is introduced to it:
```
[asyncio] Task exception was never retrieved
future: <Task finished coro=<send_apple_push_notification.<locals>.attempt_send() done, defined at /path/to/zerver/lib/push_notifications.py:166> exception=WorkerTimeoutException(30, 1)>
Traceback (most recent call last):
File "/path/to/zerver/lib/push_notifications.py", line 169, in attempt_send
result = await apns_context.apns.send_notification(request)
File "/path/to/zulip-py3-venv/lib/python3.6/site-packages/aioapns/client.py", line 57, in send_notification
response = await self.pool.send_notification(request)
File "/path/to/zulip-py3-venv/lib/python3.6/site-packages/aioapns/connection.py", line 407, in send_notification
response = await connection.send_notification(request)
File "/path/to/zulip-py3-venv/lib/python3.6/site-packages/aioapns/connection.py", line 189, in send_notification
data = json.dumps(request.message, ensure_ascii=False).encode()
File "/usr/lib/python3.6/json/__init__.py", line 238, in dumps
**kw).encode(obj)
File "/usr/lib/python3.6/json/encoder.py", line 199, in encode
chunks = self.iterencode(o, _one_shot=True)
File "/usr/lib/python3.6/json/encoder.py", line 257, in iterencode
return _iterencode(o, 0)
File "/path/to/zerver/worker/queue_processors.py", line 353, in timer_expired
raise WorkerTimeoutException(limit, len(events))
zerver.worker.queue_processors.WorkerTimeoutException: Timed out after 30 seconds processing 1 events
```
...which subsequently leads to the worker failing to make any progress
on the queue.
Remove the timeout on the worker. This may result in failing to make
forward progress if Apple/Google take overly long handling requests,
but is likely preferable to failing to make forward progress if _one_
request takes too long and gets unlucky with when the signal comes
through.