from functools import lru_cache
from typing import Any, Container, Dict, Iterable, List, Mapping, Optional, Sequence, Tuple, Union
from urllib.parse import urlparse

import orjson
import requests
from django.conf import settings
from requests.adapters import ConnectionError, HTTPAdapter
from requests.models import PreparedRequest, Response
from urllib3.util import Retry

from zerver.lib.queue import queue_json_publish
from zerver.models import Client, Realm, UserProfile
from zerver.tornado.sharding import get_tornado_port, get_tornado_uri, notify_tornado_queue_name


class TornadoAdapter(HTTPAdapter):
    def __init__(self) -> None:
        # Retry POST requests from Django to Tornado.
        #
        # While urllib3 retries all connection errors, it only retries a
        # subset of read errors, since not all requests are safe to retry
        # if they are not idempotent and the far side may have already
        # processed them once.  By default, the only methods for which
        # urllib3 retries read errors are GET, TRACE, DELETE, OPTIONS,
        # HEAD, and PUT; however, all of the requests into Tornado from
        # Django are POST requests, which limits the effectiveness of
        # bb754e09028a.
        #
        # POST requests to `/api/v1/events/internal` are safe to retry;
        # at worst, they will result in another event queue, which is low
        # cost and will be GC'd in short order.  POST requests to
        # `/notify_tornado` are _not_ safe to retry, but that codepath is
        # only used if USING_RABBITMQ is False, which only occurs during
        # testing.
        #
        # So we allow retries of read errors for all POSTs to Tornado
        # (which is not the default), to better handle Tornado restarts
        # without 500s.  See the usage sketch after this class.
        retry_methods = Retry.DEFAULT_ALLOWED_METHODS | {"POST"}
        retry = Retry(total=3, backoff_factor=1, allowed_methods=retry_methods)
        super().__init__(max_retries=retry)

    def send(
        self,
        request: PreparedRequest,
        stream: bool = False,
        timeout: Union[None, float, Tuple[float, float], Tuple[float, None]] = 0.5,
        verify: Union[bool, str] = True,
        cert: Union[None, bytes, str, Container[Union[bytes, str]]] = None,
        proxies: Optional[Mapping[str, str]] = None,
    ) -> Response:
        # Don't talk to Tornado through proxies, which only allow
        # requests to external hosts.
        proxies = {}
        try:
            resp = super().send(
                request, stream=stream, timeout=timeout, verify=verify, cert=cert, proxies=proxies
            )
        except ConnectionError:
            parsed_url = urlparse(request.url)
            logfile = (
                f"tornado-{parsed_url.port}.log"
                if settings.TORNADO_PROCESSES > 1
                else "tornado.log"
            )
            raise ConnectionError(
                f"Django cannot connect to Tornado server ({request.url}); "
                f"check {settings.ERROR_FILE_LOG_PATH} and {logfile}"
            )
        resp.raise_for_status()
        return resp
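
# A minimal usage sketch of this adapter (the URL and port are hypothetical;
# within this module the adapter is only used via requests_client() below):
#
#     session = requests.Session()
#     session.mount("http://", TornadoAdapter())
#     # Unlike a plain requests.post(), connection errors and read errors on
#     # this POST are retried (up to 3 times, with backoff) before failing:
#     session.post("http://127.0.0.1:9993/api/v1/events/internal", data={})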


@lru_cache(None)
def requests_client() -> requests.Session:
    c = requests.Session()
    adapter = TornadoAdapter()
    for scheme in ("https://", "http://"):
        c.mount(scheme, adapter)
    return c


def request_event_queue(
    user_profile: UserProfile,
    user_client: Client,
    apply_markdown: bool,
    client_gravatar: bool,
    slim_presence: bool,
    queue_lifespan_secs: int,
    event_types: Optional[Sequence[str]] = None,
    all_public_streams: bool = False,
    narrow: Iterable[Sequence[str]] = [],
    bulk_message_deletion: bool = False,
    stream_typing_notifications: bool = False,
    user_settings_object: bool = False,
) -> Optional[str]:
    if not settings.USING_TORNADO:
        return None

    tornado_uri = get_tornado_uri(user_profile.realm)
    req = {
        "dont_block": "true",
        "apply_markdown": orjson.dumps(apply_markdown),
        "client_gravatar": orjson.dumps(client_gravatar),
        "slim_presence": orjson.dumps(slim_presence),
        "all_public_streams": orjson.dumps(all_public_streams),
        "client": "internal",
        "user_profile_id": user_profile.id,
        "user_client": user_client.name,
        "narrow": orjson.dumps(narrow),
        "secret": settings.SHARED_SECRET,
        "lifespan_secs": queue_lifespan_secs,
        "bulk_message_deletion": orjson.dumps(bulk_message_deletion),
        "stream_typing_notifications": orjson.dumps(stream_typing_notifications),
        "user_settings_object": orjson.dumps(user_settings_object),
    }

    if event_types is not None:
        req["event_types"] = orjson.dumps(event_types)

    resp = requests_client().post(tornado_uri + "/api/v1/events/internal", data=req)
    return resp.json()["queue_id"]
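
# An illustrative (hypothetical) usage of request_event_queue (above) and
# get_user_events (below) from Django code, with made-up argument values:
#
#     queue_id = request_event_queue(
#         user_profile, client, apply_markdown=True, client_gravatar=True,
#         slim_presence=False, queue_lifespan_secs=600,
#     )
#     if queue_id is not None:
#         events = get_user_events(user_profile, queue_id, last_event_id=-1)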


def get_user_events(
    user_profile: UserProfile, queue_id: str, last_event_id: int
) -> List[Dict[str, Any]]:
    if not settings.USING_TORNADO:
        return []

    tornado_uri = get_tornado_uri(user_profile.realm)
    post_data: Dict[str, Any] = {
        "queue_id": queue_id,
        "last_event_id": last_event_id,
        "dont_block": "true",
        "user_profile_id": user_profile.id,
        "secret": settings.SHARED_SECRET,
        "client": "internal",
    }
    resp = requests_client().post(tornado_uri + "/api/v1/events/internal", data=post_data)
    return resp.json()["events"]


def send_notification_http(realm: Realm, data: Mapping[str, Any]) -> None:
    if not settings.USING_TORNADO or settings.RUNNING_INSIDE_TORNADO:
        # To allow the backend test suite to not require a separate
        # Tornado process, we simply call the process_notification
        # handler directly rather than making the notify_tornado HTTP
        # request. It would perhaps be better to instead implement
        # this via some sort of `responses` module configuration, but
        # perhaps it's more readable to have the logic live here.
        #
        # We use an import local to this function to prevent this hack
        # from creating import cycles.
        from zerver.tornado.event_queue import process_notification

        process_notification(data)
    else:
        tornado_uri = get_tornado_uri(realm)
        requests_client().post(
            tornado_uri + "/notify_tornado",
            data=dict(data=orjson.dumps(data), secret=settings.SHARED_SECRET),
        )


# The core function for sending an event from Django to Tornado (which
# will then push it to web and mobile clients for the target users).
# By convention, send_event should only be called from
# zerver/lib/actions.py, which helps make it easy to find event
# generation code.
#
# Every call point should be covered by a test in `test_events.py`,
# with the schema verified in `zerver/lib/event_schema.py`.
#
# See https://zulip.readthedocs.io/en/latest/subsystems/events-system.html
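#
# An illustrative (hypothetical) call site, roughly what a function in
# zerver/lib/actions.py might look like:
#
#     event = dict(type="realm", op="update", property="name", value=new_name)
#     send_event(realm, event, [user.id for user in users_to_notify])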
def send_event(
    realm: Realm, event: Mapping[str, Any], users: Union[Iterable[int], Iterable[Mapping[str, Any]]]
) -> None:
    """`users` is a list of user IDs, or in some special cases like message
    send/update or embeds, dictionaries containing extra data.

    For message sends, Django used to precompute the notification flags for
    every recipient and send Tornado one object per user, like so:

        {
            "id": 10,
            "flags": ["mentioned"],
            "mentioned": true,
            "online_push_enabled": false,
            "stream_push_notify": false,
            "stream_email_notify": false,
            "wildcard_mention_notify": false,
            "sender_is_muted": false,
        }

    Because those string keys were repeated for every recipient, the JSON
    blob could take up to ~190KB for 1000 users, degrading performance in
    large organizations.  We instead send just the lists of user IDs matching
    each notification criterion and compute the flags in Tornado, bringing
    the payload down to ~60KB for 1000 users.
    """
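    # Illustrative (hypothetical) shape of the payload that the call below
    # queues for Tornado:
    #
    #     {"event": {"type": ..., ...}, "users": [10, 11, 12]}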
    port = get_tornado_port(realm)
    queue_json_publish(
        notify_tornado_queue_name(port),
        dict(event=event, users=list(users)),
        lambda *args, **kwargs: send_notification_http(realm, *args, **kwargs),
    )