2017-11-30 00:03:59 +01:00
|
|
|
import logging
|
|
|
|
import random
|
2023-03-23 20:04:19 +01:00
|
|
|
import ssl
|
2017-11-30 00:03:59 +01:00
|
|
|
import threading
|
|
|
|
import time
|
2021-08-03 02:30:24 +02:00
|
|
|
from abc import ABCMeta, abstractmethod
|
2020-06-11 00:54:34 +02:00
|
|
|
from collections import defaultdict
|
2023-03-23 20:04:19 +01:00
|
|
|
from typing import Any, Callable, Dict, Generic, List, Mapping, Optional, Set, Type, TypeVar, Union
|
2017-11-30 00:03:59 +01:00
|
|
|
|
2020-08-07 01:09:47 +02:00
|
|
|
import orjson
|
2013-01-11 21:16:42 +01:00
|
|
|
import pika
|
2019-01-31 08:10:34 +01:00
|
|
|
import pika.adapters.tornado_connection
|
2021-08-03 03:53:44 +02:00
|
|
|
import pika.connection
|
|
|
|
import pika.exceptions
|
2020-06-11 00:54:34 +02:00
|
|
|
from django.conf import settings
|
2016-07-03 15:58:27 +02:00
|
|
|
from pika.adapters.blocking_connection import BlockingChannel
|
2021-08-03 03:42:32 +02:00
|
|
|
from pika.channel import Channel
|
2016-07-03 15:58:27 +02:00
|
|
|
from pika.spec import Basic
|
2017-11-30 00:40:45 +01:00
|
|
|
from tornado import ioloop
|
2013-01-11 21:16:42 +01:00
|
|
|
|
2021-08-18 17:54:22 +02:00
|
|
|
from zerver.lib.utils import assert_is_not_none, statsd
|
2016-07-03 15:58:27 +02:00
|
|
|
|
2017-08-18 07:56:53 +02:00
|
|
|
# Maximum number of attempts for a queued event before retry_event gives
# up and invokes the failure processor instead.
MAX_REQUEST_RETRIES = 3

# Channel type of a concrete QueueClient subclass: the Tornado client uses
# pika's asynchronous Channel, the simple client uses BlockingChannel.
ChannelT = TypeVar("ChannelT", Channel, BlockingChannel)
# Signature of a consumer callback as pika invokes it:
# (channel, delivery method, message properties, message body).
Consumer = Callable[[ChannelT, Basic.Deliver, pika.BasicProperties, bytes], None]
|
2013-04-16 22:58:21 +02:00
|
|
|
|
2023-02-02 04:35:24 +01:00
|
|
|
|
2013-01-11 21:16:42 +01:00
|
|
|
# This simple queuing library doesn't expose much of the power of
# RabbitMQ/Pika's queuing system; its purpose is to just provide an
# interface for external files to put things into queues and take them
# out from bots without having to import pika code all over our codebase.
class QueueClient(Generic[ChannelT], metaclass=ABCMeta):
    def __init__(
        self,
        # Disable RabbitMQ heartbeats by default because BlockingConnection can't process them
        rabbitmq_heartbeat: Optional[int] = 0,
        prefetch: int = 0,
    ) -> None:
        """Initialize client state and open the initial connection.

        rabbitmq_heartbeat: heartbeat interval in seconds; 0 disables
            heartbeats, None lets the server negotiate an interval.
        prefetch: per-channel basic_qos prefetch count; 0 is unbounded.
        """
        self.log = logging.getLogger("zulip.queue")
        # Queue names already declared on the current connection.
        self.queues: Set[str] = set()
        # The open channel, or None until the connection is established.
        self.channel: Optional[ChannelT] = None
        self.prefetch = prefetch
        # Registered consumers per queue, kept so they can be re-attached
        # after a reconnect (see _reconnect_consumer_callbacks).
        self.consumers: Dict[str, Set[Consumer[ChannelT]]] = defaultdict(set)
        self.rabbitmq_heartbeat = rabbitmq_heartbeat
        self.is_consuming = False
        # Subclasses implement _connect; the Tornado variant only *starts*
        # an asynchronous connection attempt here.
        self._connect()
|
2013-01-11 21:16:42 +01:00
|
|
|
|
2021-08-03 02:30:24 +02:00
|
|
|
    @abstractmethod
    def _connect(self) -> None:
        """Establish the connection to RabbitMQ and open a channel."""
        raise NotImplementedError

    @abstractmethod
    def _reconnect(self) -> None:
        """Discard any existing connection state and connect again."""
        raise NotImplementedError
|
2013-03-11 19:53:41 +01:00
|
|
|
|
2017-11-05 11:15:10 +01:00
|
|
|
def _get_parameters(self) -> pika.ConnectionParameters:
|
2021-08-18 17:54:22 +02:00
|
|
|
credentials = pika.PlainCredentials(
|
|
|
|
settings.RABBITMQ_USERNAME, assert_is_not_none(settings.RABBITMQ_PASSWORD)
|
|
|
|
)
|
2019-07-04 04:08:19 +02:00
|
|
|
|
|
|
|
# With BlockingConnection, we are passed
|
|
|
|
# self.rabbitmq_heartbeat=0, which asks to explicitly disable
|
|
|
|
# the RabbitMQ heartbeat feature. This is correct since that
|
|
|
|
# heartbeat doesn't make sense with BlockingConnection (we do
|
|
|
|
# need it for TornadoConnection).
|
|
|
|
#
|
|
|
|
# Where we've disabled RabbitMQ's heartbeat, the only
|
|
|
|
# keepalive on this connection is the TCP keepalive (defaults:
|
|
|
|
# `/proc/sys/net/ipv4/tcp_keepalive_*`). On most Linux
|
|
|
|
# systems, the default is to start sending keepalive packets
|
|
|
|
# after TCP_KEEPIDLE (7200 seconds) of inactivity; after that
|
|
|
|
# point, it send them every TCP_KEEPINTVL (typically 75s).
|
|
|
|
# Some Kubernetes / Docker Swarm networks can kill "idle" TCP
|
|
|
|
# connections after as little as ~15 minutes of inactivity.
|
|
|
|
# To avoid this killing our RabbitMQ connections, we set
|
|
|
|
# TCP_KEEPIDLE to something significantly below 15 minutes.
|
|
|
|
tcp_options = None
|
|
|
|
if self.rabbitmq_heartbeat == 0:
|
|
|
|
tcp_options = dict(TCP_KEEPIDLE=60 * 5)
|
|
|
|
|
2023-03-23 20:04:19 +01:00
|
|
|
ssl_options: Union[
|
|
|
|
Type[pika.ConnectionParameters._DEFAULT], pika.SSLOptions
|
|
|
|
] = pika.ConnectionParameters._DEFAULT
|
|
|
|
if settings.RABBITMQ_USE_TLS:
|
|
|
|
ssl_options = pika.SSLOptions(context=ssl.create_default_context())
|
|
|
|
|
2021-02-12 08:19:30 +01:00
|
|
|
return pika.ConnectionParameters(
|
|
|
|
settings.RABBITMQ_HOST,
|
2023-02-16 17:19:57 +01:00
|
|
|
port=settings.RABBITMQ_PORT,
|
2021-02-12 08:19:30 +01:00
|
|
|
heartbeat=self.rabbitmq_heartbeat,
|
|
|
|
tcp_options=tcp_options,
|
2023-03-23 20:04:19 +01:00
|
|
|
ssl_options=ssl_options,
|
2021-02-12 08:19:30 +01:00
|
|
|
credentials=credentials,
|
|
|
|
)
|
2013-01-17 23:15:40 +01:00
|
|
|
|
2017-11-05 11:15:10 +01:00
|
|
|
def _generate_ctag(self, queue_name: str) -> str:
|
2020-06-10 06:41:04 +02:00
|
|
|
return f"{queue_name}_{str(random.getrandbits(16))}"
|
2013-02-15 17:01:28 +01:00
|
|
|
|
2021-08-03 03:42:32 +02:00
|
|
|
def _reconnect_consumer_callback(self, queue: str, consumer: Consumer[ChannelT]) -> None:
|
2023-02-04 01:42:19 +01:00
|
|
|
self.log.info("Queue reconnecting saved consumer %r to queue %s", consumer, queue)
|
2020-07-05 03:18:11 +02:00
|
|
|
self.ensure_queue(
|
|
|
|
queue,
|
|
|
|
lambda channel: channel.basic_consume(
|
|
|
|
queue,
|
|
|
|
consumer,
|
|
|
|
consumer_tag=self._generate_ctag(queue),
|
|
|
|
),
|
|
|
|
)
|
2013-09-09 20:13:40 +02:00
|
|
|
|
2017-11-05 11:15:10 +01:00
|
|
|
def _reconnect_consumer_callbacks(self) -> None:
|
2013-03-11 19:53:41 +01:00
|
|
|
for queue, consumers in self.consumers.items():
|
|
|
|
for consumer in consumers:
|
2013-09-09 20:13:40 +02:00
|
|
|
self._reconnect_consumer_callback(queue, consumer)
|
2013-03-11 19:53:41 +01:00
|
|
|
|
2017-11-05 11:15:10 +01:00
|
|
|
def ready(self) -> bool:
|
2013-01-18 19:15:09 +01:00
|
|
|
return self.channel is not None
|
|
|
|
|
2021-08-03 02:30:24 +02:00
|
|
|
    @abstractmethod
    def ensure_queue(self, queue_name: str, callback: Callable[[ChannelT], object]) -> None:
        """Declare queue_name if needed, then invoke callback(channel)."""
        raise NotImplementedError
|
2013-01-11 21:16:42 +01:00
|
|
|
|
2020-06-06 04:22:15 +02:00
|
|
|
def publish(self, queue_name: str, body: bytes) -> None:
|
2021-08-03 03:42:32 +02:00
|
|
|
def do_publish(channel: ChannelT) -> None:
|
2020-07-05 03:18:11 +02:00
|
|
|
channel.basic_publish(
|
2021-02-12 08:20:45 +01:00
|
|
|
exchange="",
|
2017-01-24 07:06:13 +01:00
|
|
|
routing_key=queue_name,
|
|
|
|
properties=pika.BasicProperties(delivery_mode=2),
|
2021-02-12 08:19:30 +01:00
|
|
|
body=body,
|
|
|
|
)
|
2013-04-16 22:58:21 +02:00
|
|
|
|
2020-06-10 06:41:04 +02:00
|
|
|
statsd.incr(f"rabbitmq.publish.{queue_name}")
|
2013-04-16 22:58:21 +02:00
|
|
|
|
|
|
|
self.ensure_queue(queue_name, do_publish)
|
2013-01-11 21:16:42 +01:00
|
|
|
|
2020-06-06 04:22:15 +02:00
|
|
|
    def json_publish(self, queue_name: str, body: Mapping[str, Any]) -> None:
        """Serialize body as JSON and publish it, reconnecting once on
        connection failure."""
        data = orjson.dumps(body)
        try:
            self.publish(queue_name, data)
            return
        except pika.exceptions.AMQPConnectionError:
            self.log.warning("Failed to send to rabbitmq, trying to reconnect and send again")

        # The retry deliberately lives outside the except block: if it
        # also fails, the two exceptions are not chained together in the
        # resulting traceback.
        self._reconnect()
        self.publish(queue_name, data)
|
2013-01-11 21:16:42 +01:00
|
|
|
|
2021-08-03 02:30:24 +02:00
|
|
|
|
2021-08-03 03:42:32 +02:00
|
|
|
class SimpleQueueClient(QueueClient[BlockingChannel]):
    """QueueClient implementation on top of pika's BlockingConnection."""

    # The blocking connection, or None between a disconnect and reconnect.
    connection: Optional[pika.BlockingConnection]

    def _connect(self) -> None:
        start = time.time()
        self.connection = pika.BlockingConnection(self._get_parameters())
        self.channel = self.connection.channel()
        # Configure QOS once, at connection time, so the prefetch limit
        # applies to consumers as well as producers (per the history of
        # this file, setting it per-publish was wasteful and was never
        # actually applied to consumers).
        self.channel.basic_qos(prefetch_count=self.prefetch)
        self.log.info("SimpleQueueClient connected (connecting took %.3fs)", time.time() - start)
|
2021-08-03 02:30:24 +02:00
|
|
|
|
|
|
|
def _reconnect(self) -> None:
|
|
|
|
self.connection = None
|
|
|
|
self.channel = None
|
|
|
|
self.queues = set()
|
|
|
|
self._connect()
|
|
|
|
|
|
|
|
def close(self) -> None:
|
|
|
|
if self.connection is not None:
|
|
|
|
self.connection.close()
|
|
|
|
|
2021-08-03 03:54:30 +02:00
|
|
|
def ensure_queue(self, queue_name: str, callback: Callable[[BlockingChannel], object]) -> None:
|
2021-08-03 02:30:24 +02:00
|
|
|
"""Ensure that a given queue has been declared, and then call
|
|
|
|
the callback with no arguments."""
|
|
|
|
if self.connection is None or not self.connection.is_open:
|
|
|
|
self._connect()
|
2023-02-23 05:36:14 +01:00
|
|
|
assert self.channel is not None
|
|
|
|
else:
|
|
|
|
assert self.channel is not None
|
2021-11-15 21:03:55 +01:00
|
|
|
|
2021-08-03 02:30:24 +02:00
|
|
|
if queue_name not in self.queues:
|
|
|
|
self.channel.queue_declare(queue=queue_name, durable=True)
|
|
|
|
self.queues.add(queue_name)
|
2021-11-15 21:03:55 +01:00
|
|
|
|
2021-08-03 02:30:24 +02:00
|
|
|
callback(self.channel)
|
|
|
|
|
2021-02-12 08:19:30 +01:00
|
|
|
    def start_json_consumer(
        self,
        queue_name: str,
        callback: Callable[[List[Dict[str, Any]]], None],
        batch_size: int = 1,
        timeout: Optional[int] = None,
    ) -> None:
        """Consume JSON events from queue_name in batches, blocking forever.

        callback receives a list of up to batch_size decoded events; the
        whole batch is acked after callback returns, or nacked (returned
        to the queue) if it raises.  timeout bounds how long we wait to
        fill a partial batch before delivering it.
        """
        if batch_size == 1:
            # Single-event batches are delivered as soon as they arrive;
            # no inactivity timeout is needed.
            timeout = None

        def do_consume(channel: BlockingChannel) -> None:
            events: List[Dict[str, Any]] = []
            last_process = time.time()
            # Highest delivery tag in the current batch, for multiple-ack.
            max_processed: Optional[int] = None
            self.is_consuming = True

            # This iterator technique will iteratively collect up to
            # batch_size events from the RabbitMQ queue (if present)
            # before calling the callback with the batch.  If not
            # enough events are present, it will sleep for at most
            # timeout seconds before calling the callback with the
            # batch of events it has.
            for method, properties, body in channel.consume(queue_name, inactivity_timeout=timeout):
                # body is None on an inactivity timeout tick.
                if body is not None:
                    assert method is not None
                    events.append(orjson.loads(body))
                    max_processed = method.delivery_tag
                now = time.time()
                if len(events) >= batch_size or (timeout and now >= last_process + timeout):
                    if events:
                        assert max_processed is not None
                        try:
                            callback(events)
                            # One multiple-ack covers the whole batch.
                            channel.basic_ack(max_processed, multiple=True)
                        except BaseException:
                            # Requeue the whole batch before propagating
                            # (covers exceptions like SIGTERM-driven exits
                            # as well as bugs in callback).
                            channel.basic_nack(max_processed, multiple=True)
                            raise
                        events = []
                    last_process = now
                if not self.is_consuming:
                    break

        self.ensure_queue(queue_name, do_consume)
|
|
|
|
|
queue: Rename queue_size, and update for all local queues.
Despite its name, the `queue_size` method does not return the number
of items in the queue; it returns the number of items that the local
consumer has delivered but unprocessed. These are often, but not
always, the same.
RabbitMQ's queues maintain the queue of unacknowledged messages; when
a consumer connects, it sends to the consumer some number of messages
to handle, known as the "prefetch." This is a performance
optimization, to ensure the consumer code does not need to wait for a
network round-trip before having new data to consume.
The default prefetch is 0, which means that RabbitMQ immediately dumps
all outstanding messages to the consumer, which slowly processes and
acknowledges them. If a second consumer were to connect to the same
queue, they would receive no messages to process, as the first
consumer has already been allocated them. If the first consumer
disconnects or crashes, all prior events sent to it are then made
available for other consumers on the queue.
The consumer does not know the total size of the queue -- merely how
many messages it has been handed.
No change is made to the prefetch here; however, future changes may
wish to limit the prefetch, either for memory-saving, or to allow
multiple consumers to work the same queue.
Rename the method to make clear that it only contains information
about the local queue in the consumer, not the full RabbitMQ queue.
Also include the waiting message count, which is used by the
`consume()` iterator for similar purpose to the pending events list.
2020-10-09 22:12:55 +02:00
|
|
|
    def local_queue_size(self) -> int:
        """Number of messages delivered to this consumer but not yet
        processed -- NOT the total size of the RabbitMQ queue."""
        assert self.channel is not None
        return self.channel.get_waiting_message_count() + len(
            self.channel._pending_events  # type: ignore[attr-defined] # private member missing from stubs
        )
|
2020-03-18 20:48:49 +01:00
|
|
|
|
2017-11-05 11:15:10 +01:00
|
|
|
def stop_consuming(self) -> None:
|
2020-07-05 03:18:11 +02:00
|
|
|
assert self.channel is not None
|
2020-10-09 22:50:53 +02:00
|
|
|
assert self.is_consuming
|
|
|
|
self.is_consuming = False
|
2013-02-15 17:03:28 +01:00
|
|
|
self.channel.stop_consuming()
|
2013-01-18 23:15:23 +01:00
|
|
|
|
2021-02-12 08:19:30 +01:00
|
|
|
|
2019-01-31 08:10:34 +01:00
|
|
|
# Patch pika.adapters.tornado_connection.TornadoConnection so that a socket error doesn't
# throw an exception and disconnect the tornado process from the rabbitmq
# queue. Instead, just re-connect as usual
class ExceptionFreeTornadoConnection(pika.adapters.tornado_connection.TornadoConnection):
    def _adapter_disconnect(self) -> None:
        # NOTE: overrides a private pika method; the swallowed exception
        # types below are the ones we expect during an unclean teardown.
        try:
            super()._adapter_disconnect()  # type: ignore[misc] # private method missing from stubs
        except (
            pika.exceptions.ProbableAuthenticationError,
            pika.exceptions.ProbableAccessDeniedError,
            pika.exceptions.IncompatibleProtocolError,
        ):
            logging.warning(
                "Caught exception in ExceptionFreeTornadoConnection when \
calling _adapter_disconnect, ignoring",
                exc_info=True,
            )
|
2013-04-17 16:11:43 +02:00
|
|
|
|
|
|
|
|
2021-08-03 03:42:32 +02:00
|
|
|
class TornadoQueueClient(QueueClient[Channel]):
    """Asynchronous QueueClient for use inside the Tornado event loop."""

    # The async connection, or None once close() has been called.
    connection: Optional[ExceptionFreeTornadoConnection]

    # Based on:
    # https://pika.readthedocs.io/en/0.9.8/examples/asynchronous_consumer_example.html
    def __init__(self) -> None:
        super().__init__(
            # TornadoConnection can process heartbeats, so enable them.
            rabbitmq_heartbeat=None,
            # Only ask for 100 un-acknowledged messages at once from
            # the server, rather than an unbounded number.
            prefetch=100,
        )
        # Callbacks to run once the channel has been (re)opened.
        self._on_open_cbs: List[Callable[[Channel], None]] = []
        # Consecutive failed connection attempts; controls whether a
        # failure is logged at WARNING or CRITICAL level.
        self._connection_failure_count = 0
|
2013-01-18 23:15:23 +01:00
|
|
|
|
2018-03-20 02:04:01 +01:00
|
|
|
def _connect(self) -> None:
|
2013-03-11 19:53:41 +01:00
|
|
|
self.log.info("Beginning TornadoQueueClient connection")
|
2013-04-17 16:11:43 +02:00
|
|
|
self.connection = ExceptionFreeTornadoConnection(
|
2013-01-18 23:15:23 +01:00
|
|
|
self._get_parameters(),
|
2021-02-12 08:19:30 +01:00
|
|
|
on_open_callback=self._on_open,
|
|
|
|
on_open_error_callback=self._on_connection_open_error,
|
|
|
|
on_close_callback=self._on_connection_closed,
|
2017-11-29 23:54:38 +01:00
|
|
|
)
|
2013-01-18 23:15:23 +01:00
|
|
|
|
2017-11-05 11:15:10 +01:00
|
|
|
def _reconnect(self) -> None:
|
2013-03-11 19:53:41 +01:00
|
|
|
self.connection = None
|
|
|
|
self.channel = None
|
|
|
|
self.queues = set()
|
2017-11-30 00:40:45 +01:00
|
|
|
self.log.warning("TornadoQueueClient attempting to reconnect to RabbitMQ")
|
2013-04-17 18:11:28 +02:00
|
|
|
self._connect()
|
2013-03-11 19:53:41 +01:00
|
|
|
|
2017-11-30 00:40:45 +01:00
|
|
|
    # Seconds to wait before retrying after a failed/lost connection.
    CONNECTION_RETRY_SECS = 2

    # When the RabbitMQ server is restarted, it's normal for it to
    # take a few seconds to come back; we'll retry a few times and all
    # will be well. So for the first few failures, we report only at
    # "warning" level, avoiding an email to the server admin.
    #
    # A loss of an existing connection starts a retry loop just like a
    # failed connection attempt, so it counts as the first failure.
    #
    # On an unloaded test system, a RabbitMQ restart takes about 6s,
    # potentially causing 4 failures. We add some headroom above that.
    CONNECTION_FAILURES_BEFORE_NOTIFY = 10
|
|
|
|
|
2021-02-12 08:19:30 +01:00
|
|
|
def _on_connection_open_error(
|
2021-08-03 03:55:53 +02:00
|
|
|
self, connection: pika.connection.Connection, reason: Union[str, Exception]
|
2021-02-12 08:19:30 +01:00
|
|
|
) -> None:
|
2018-03-21 00:37:04 +01:00
|
|
|
self._connection_failure_count += 1
|
2017-11-30 00:40:45 +01:00
|
|
|
retry_secs = self.CONNECTION_RETRY_SECS
|
2020-06-14 04:27:02 +02:00
|
|
|
self.log.log(
|
|
|
|
logging.CRITICAL
|
|
|
|
if self._connection_failure_count > self.CONNECTION_FAILURES_BEFORE_NOTIFY
|
|
|
|
else logging.WARNING,
|
|
|
|
"TornadoQueueClient couldn't connect to RabbitMQ, retrying in %d secs...",
|
|
|
|
retry_secs,
|
|
|
|
)
|
2022-03-17 21:42:25 +01:00
|
|
|
ioloop.IOLoop.current().call_later(retry_secs, self._reconnect)
|
2013-03-11 19:53:41 +01:00
|
|
|
|
2021-02-12 08:19:30 +01:00
|
|
|
    def _on_connection_closed(
        self, connection: pika.connection.Connection, reason: Exception
    ) -> None:
        """An established connection dropped: schedule a reconnect --
        unless close() already cleared self.connection intentionally."""
        if self.connection is None:
            return
        # A lost connection counts as the first failure of a retry loop.
        self._connection_failure_count = 1
        retry_secs = self.CONNECTION_RETRY_SECS
        self.log.warning(
            "TornadoQueueClient lost connection to RabbitMQ, reconnecting in %d secs...",
            retry_secs,
        )
        ioloop.IOLoop.current().call_later(retry_secs, self._reconnect)
|
2013-03-11 19:53:41 +01:00
|
|
|
|
2017-11-30 00:58:52 +01:00
|
|
|
    def _on_open(self, connection: pika.connection.Connection) -> None:
        """Connection established: reset the failure count and open a channel."""
        assert self.connection is not None
        self._connection_failure_count = 0
        try:
            self.connection.channel(on_open_callback=self._on_channel_open)
        except pika.exceptions.ConnectionClosed:
            # The connection didn't stay open long enough for this code to get to it.
            # Let _on_connection_closed deal with trying again.
            self.log.warning("TornadoQueueClient couldn't open channel: connection already closed")
|
2017-11-30 00:58:52 +01:00
|
|
|
|
2021-08-03 03:42:32 +02:00
|
|
|
def _on_channel_open(self, channel: Channel) -> None:
|
2017-11-30 00:58:52 +01:00
|
|
|
self.channel = channel
|
|
|
|
for callback in self._on_open_cbs:
|
2020-07-05 03:18:11 +02:00
|
|
|
callback(channel)
|
2017-11-30 00:58:52 +01:00
|
|
|
self._reconnect_consumer_callbacks()
|
2021-02-12 08:20:45 +01:00
|
|
|
self.log.info("TornadoQueueClient connected")
|
2017-11-30 00:58:52 +01:00
|
|
|
|
2021-08-03 03:49:54 +02:00
|
|
|
def close(self) -> None:
|
|
|
|
if self.connection is not None:
|
|
|
|
self.connection.close()
|
2022-03-18 08:34:10 +01:00
|
|
|
self.connection = None
|
2021-08-03 03:49:54 +02:00
|
|
|
|
2021-08-03 03:54:30 +02:00
|
|
|
    def ensure_queue(self, queue_name: str, callback: Callable[[Channel], object]) -> None:
        """Declare queue_name if needed, then invoke callback(channel).

        Asynchronous: the first use of a queue chains
        queue_declare -> set_qos -> finish before running callback.
        """

        def set_qos(frame: Any) -> None:
            # The queue now exists; record it and apply the prefetch limit
            # before handing the channel to the caller.
            assert self.channel is not None
            self.queues.add(queue_name)
            self.channel.basic_qos(prefetch_count=self.prefetch, callback=finish)

        def finish(frame: Any) -> None:
            assert self.channel is not None
            callback(self.channel)

        if queue_name not in self.queues:
            # If we're not connected yet, send this message
            # once we have created the channel
            if not self.ready():
                self._on_open_cbs.append(lambda channel: self.ensure_queue(queue_name, callback))
                return

            assert self.channel is not None
            self.channel.queue_declare(queue=queue_name, durable=True, callback=set_qos)
        else:
            assert self.channel is not None
            callback(self.channel)
|
2013-03-19 19:29:22 +01:00
|
|
|
|
2021-02-12 08:19:30 +01:00
|
|
|
    def start_json_consumer(
        self,
        queue_name: str,
        callback: Callable[[List[Dict[str, Any]]], None],
        batch_size: int = 1,
        timeout: Optional[int] = None,
    ) -> None:
        """Register a JSON consumer on queue_name.

        The Tornado client only supports single-event batches
        (batch_size=1, timeout=None); each event is acked after callback
        returns.
        """

        def wrapped_consumer(
            ch: Channel,
            method: Basic.Deliver,
            properties: pika.BasicProperties,
            body: bytes,
        ) -> None:
            assert method.delivery_tag is not None
            callback([orjson.loads(body)])
            ch.basic_ack(delivery_tag=method.delivery_tag)

        # Batching is not implemented for the asynchronous client.
        assert batch_size == 1
        assert timeout is None
        # Remember the consumer so it can be re-attached on reconnect.
        self.consumers[queue_name].add(wrapped_consumer)

        # Not connected yet: _on_channel_open will attach the saved
        # consumer via _reconnect_consumer_callbacks.
        if not self.ready():
            return

        self.ensure_queue(
            queue_name,
            lambda channel: channel.basic_consume(
                queue_name,
                wrapped_consumer,
                consumer_tag=self._generate_ctag(queue_name),
            ),
        )
|
2013-03-25 20:37:00 +01:00
|
|
|
|
2021-02-12 08:19:30 +01:00
|
|
|
|
2022-04-16 02:30:58 +02:00
|
|
|
# Per-thread storage for the queue client, so each thread gets its own
# RabbitMQ connection (see get_queue_client / set_queue_client).
thread_data = threading.local()
|
2021-02-12 08:19:30 +01:00
|
|
|
|
|
|
|
|
2021-08-03 03:49:54 +02:00
|
|
|
def get_queue_client() -> Union[SimpleQueueClient, TornadoQueueClient]:
    """Return this thread's queue client, creating a SimpleQueueClient on
    first use; requires USING_RABBITMQ."""
    if hasattr(thread_data, "queue_client"):
        return thread_data.queue_client

    if not settings.USING_RABBITMQ:
        raise RuntimeError("Cannot get a queue client without USING_RABBITMQ")
    thread_data.queue_client = SimpleQueueClient()
    return thread_data.queue_client
|
2013-03-25 20:37:00 +01:00
|
|
|
|
2021-02-12 08:19:30 +01:00
|
|
|
|
2022-04-16 02:30:58 +02:00
|
|
|
def set_queue_client(queue_client: Union[SimpleQueueClient, TornadoQueueClient]) -> None:
    # Install an externally constructed client (e.g. the Tornado client)
    # as this thread's queue client.
    thread_data.queue_client = queue_client
|
2013-03-25 20:37:00 +01:00
|
|
|
|
2021-02-12 08:19:30 +01:00
|
|
|
|
2020-07-05 03:18:11 +02:00
|
|
|
def queue_json_publish(
    queue_name: str,
    event: Dict[str, Any],
    processor: Optional[Callable[[Any], None]] = None,
) -> None:
    """Publish event to queue_name, or process it synchronously.

    With RabbitMQ enabled, the event is JSON-published.  Otherwise it is
    handed to processor if given, or else consumed immediately by the
    in-process worker registered for queue_name.
    """
    if settings.USING_RABBITMQ:
        get_queue_client().json_publish(queue_name, event)
    elif processor:
        processor(event)
    else:
        # Must be imported here: A top section import leads to circular imports
        from zerver.worker.queue_processors import get_worker

        get_worker(queue_name).consume_single_event(event)
|
2017-08-18 07:56:53 +02:00
|
|
|
|
2021-02-12 08:19:30 +01:00
|
|
|
|
|
|
|
def retry_event(
    queue_name: str, event: Dict[str, Any], failure_processor: Callable[[Dict[str, Any]], None]
) -> None:
    """Requeue event, tracking attempts in event["failed_tries"]; after
    MAX_REQUEST_RETRIES failed tries, hand it to failure_processor."""
    event.setdefault("failed_tries", 0)
    event["failed_tries"] += 1
    if event["failed_tries"] > MAX_REQUEST_RETRIES:
        failure_processor(event)
    else:
        queue_json_publish(queue_name, event)
|