rabbitmq: Set a short TCP keepalive idle time on BlockingConnection.

The code comment explains this issue in some detail, but essentially
in Kubernetes and Docker Swarm systems, the container overlayer
network has a relatively short TCP idle lifetime (about 15 minutes),
which can lead to it killing the connection between Tornado and
RabbitMQ.

We fix this by setting a TCP keepalive on that connection shorter than
15 minutes.

Fixes #10776.
This commit is contained in:
Andrew Szeto 2019-07-03 19:08:19 -07:00 committed by Tim Abbott
parent 7fc72dff44
commit b312001fd9
1 changed files with 22 additions and 2 deletions

View File

@ -47,12 +47,32 @@ class SimpleQueueClient:
self._connect()
def _get_parameters(self) -> pika.ConnectionParameters:
# We explicitly disable the RabbitMQ heartbeat feature, since
# it doesn't make sense with BlockingConnection
credentials = pika.PlainCredentials(settings.RABBITMQ_USERNAME,
settings.RABBITMQ_PASSWORD)
# With BlockingConnection, we are passed
# self.rabbitmq_heartbeat=0, which asks to explicitly disable
# the RabbitMQ heartbeat feature. This is correct since that
# heartbeat doesn't make sense with BlockingConnection (we do
# need it for TornadoConnection).
#
# Where we've disabled RabbitMQ's heartbeat, the only
# keepalive on this connection is the TCP keepalive (defaults:
# `/proc/sys/net/ipv4/tcp_keepalive_*`). On most Linux
# systems, the default is to start sending keepalive packets
# after TCP_KEEPIDLE (7200 seconds) of inactivity; after that
# point, it send them every TCP_KEEPINTVL (typically 75s).
# Some Kubernetes / Docker Swarm networks can kill "idle" TCP
# connections after as little as ~15 minutes of inactivity.
# To avoid this killing our RabbitMQ connections, we set
# TCP_KEEPIDLE to something significantly below 15 minutes.
tcp_options = None
if self.rabbitmq_heartbeat == 0:
tcp_options = dict(TCP_KEEPIDLE=60 * 5)
return pika.ConnectionParameters(settings.RABBITMQ_HOST,
heartbeat=self.rabbitmq_heartbeat,
tcp_options=tcp_options,
credentials=credentials)
def _generate_ctag(self, queue_name: str) -> str: