tornado: Extract functions for Tornado queue names.

This moves all control for what queue to use for which realm in our
Tornado system to just the sharding.py file; no actual sharding is
done yet.
This commit is contained in:
Tim Abbott 2018-11-02 16:06:13 -07:00
parent 152c44b6d2
commit 0cac7e1cd3
5 changed files with 24 additions and 7 deletions

View File

@ -54,13 +54,14 @@ queues = {
'email_senders',
'missedmessage_mobile_notifications',
'outgoing_webhooks',
'notify_tornado',
'signups',
'slow_queries',
'tornado_return',
'user_activity'
'user_activity_interval',
'user_presence',
# These queues may not be present if settings.TORNADO_PROCESSES > 1
'notify_tornado',
'tornado_return',
}
for queue_name in queues:

View File

@ -25,6 +25,7 @@ from zerver.tornado.application import create_tornado_application, \
from zerver.tornado.autoreload import start as zulip_autoreload_start
from zerver.tornado.event_queue import add_client_gc_hook, \
missedmessage_hook, process_notification, setup_event_queue
from zerver.tornado.sharding import notify_tornado_queue_name, tornado_return_queue_name
from zerver.tornado.socket import respond_send_message
if settings.USING_RABBITMQ:
@ -90,8 +91,10 @@ class Command(BaseCommand):
if settings.USING_RABBITMQ:
queue_client = get_queue_client()
# Process notifications received via RabbitMQ
queue_client.register_json_consumer('notify_tornado', process_notification)
queue_client.register_json_consumer('tornado_return', respond_send_message)
queue_client.register_json_consumer(notify_tornado_queue_name(int(port)),
process_notification)
queue_client.register_json_consumer(tornado_return_queue_name(int(port)),
respond_send_message)
try:
# Application is an instance of Django's standard wsgi handler.

View File

@ -30,7 +30,8 @@ from zerver.lib.queue import queue_json_publish
from zerver.lib.request import JsonableError
from zerver.tornado.descriptors import clear_descriptor_by_handler_id, set_descriptor_by_handler_id
from zerver.tornado.exceptions import BadEventQueueIdError
from zerver.tornado.sharding import get_tornado_uri
from zerver.tornado.sharding import get_tornado_uri, get_tornado_port, \
notify_tornado_queue_name, tornado_return_queue_name
import copy
requests_client = requests.Session()
@ -1005,6 +1006,7 @@ def send_event(realm: Realm, event: Mapping[str, Any],
"""`users` is a list of user IDs, or in the case of `message` type
events, a list of dicts describing the users and metadata about
the user/message pair."""
queue_json_publish("notify_tornado",
port = get_tornado_port(realm)
queue_json_publish(notify_tornado_queue_name(port),
dict(event=event, users=users),
lambda *args, **kwargs: send_notification_http(realm, *args, **kwargs))

View File

@ -13,3 +13,13 @@ def get_tornado_uri(realm: Realm) -> str:
port = get_tornado_port(realm)
return "http://127.0.0.1:%d" % (port,)
def notify_tornado_queue_name(port: int) -> str:
if settings.TORNADO_PROCESSES == 1:
return "notify_tornado"
return "notify_tornado_port_%d" % (port,)
def tornado_return_queue_name(port: int) -> str:
if settings.TORNADO_PROCESSES == 1:
return "tornado_return"
return "tornado_return_port_%d" % (port,)

View File

@ -29,6 +29,7 @@ from zerver.lib.redis_utils import get_redis_client
from zerver.lib.sessions import get_session_user
from zerver.tornado.event_queue import get_client_descriptor
from zerver.tornado.exceptions import BadEventQueueIdError
from zerver.tornado.sharding import tornado_return_queue_name
logger = logging.getLogger('zulip.socket')
@ -220,7 +221,7 @@ class SocketConnection(sockjs.tornado.SockJSConnection):
req_id=msg['req_id'],
server_meta=dict(user_id=self.session.user_profile.id,
client_id=self.client_id,
return_queue="tornado_return",
return_queue=tornado_return_queue_name(self.port),
log_data=log_data,
request_environ=request_environ)))