2022-09-22 22:09:34 +02:00
|
|
|
from collections import defaultdict
|
2020-08-10 19:40:36 +02:00
|
|
|
from functools import lru_cache
|
2022-05-03 03:58:44 +02:00
|
|
|
from typing import Any, Dict, Iterable, List, Mapping, Optional, Sequence, Tuple, Union
|
2020-09-14 22:34:44 +02:00
|
|
|
from urllib.parse import urlparse
|
2020-08-10 18:40:38 +02:00
|
|
|
|
2020-08-07 01:09:47 +02:00
|
|
|
import orjson
|
2020-08-10 18:40:38 +02:00
|
|
|
import requests
|
|
|
|
from django.conf import settings
|
django_api: Extract send_event_on_commit helper.
django-stubs 4.2.1 gives transaction.on_commit a more accurate type
annotation, but this exposed that mypy can’t handle the lambda default
parameters that we use to recapture loop variables such as
for stream_id in public_stream_ids:
peer_user_ids = …
event = …
transaction.on_commit(
lambda event=event, peer_user_ids=peer_user_ids: send_event(
realm, event, peer_user_ids
)
)
https://github.com/python/mypy/issues/15459
A workaround that mypy accepts is
transaction.on_commit(
(
lambda event, peer_user_ids: lambda: send_event(
realm, event, peer_user_ids
)
)(event, peer_user_ids)
)
But that’s kind of ugly and potentially error-prone, so let’s make a
helper function for this very common pattern.
send_event_on_commit(realm, event, peer_user_ids)
Signed-off-by: Anders Kaseorg <anders@zulip.com>
2023-06-17 20:53:07 +02:00
|
|
|
from django.db import transaction
|
2020-08-10 20:34:05 +02:00
|
|
|
from requests.adapters import ConnectionError, HTTPAdapter
|
|
|
|
from requests.models import PreparedRequest, Response
|
2023-04-13 02:05:54 +02:00
|
|
|
from returns.curry import partial
|
2023-10-12 19:43:45 +02:00
|
|
|
from typing_extensions import override
|
2022-01-13 22:02:54 +01:00
|
|
|
from urllib3.util import Retry
|
2020-08-10 18:40:38 +02:00
|
|
|
|
|
|
|
from zerver.lib.queue import queue_json_publish
|
|
|
|
from zerver.models import Client, Realm, UserProfile
|
2022-09-22 22:09:34 +02:00
|
|
|
from zerver.tornado.sharding import (
|
|
|
|
get_realm_tornado_ports,
|
2023-04-26 02:40:54 +02:00
|
|
|
get_tornado_url,
|
2022-09-22 22:09:34 +02:00
|
|
|
get_user_id_tornado_port,
|
|
|
|
get_user_tornado_port,
|
|
|
|
notify_tornado_queue_name,
|
|
|
|
)
|
2020-08-10 18:40:38 +02:00
|
|
|
|
2020-08-10 19:40:36 +02:00
|
|
|
|
2020-08-10 20:34:05 +02:00
|
|
|
class TornadoAdapter(HTTPAdapter):
    """Transport adapter used for the HTTP requests Django makes to Tornado.

    On top of the stock ``HTTPAdapter`` behavior, it allows POST requests
    to be retried, never sends requests through a proxy, defaults to a
    0.5 second timeout, and replaces connection failures with a
    ``ConnectionError`` whose message names the log files to inspect.
    """

    def __init__(self) -> None:
        # All of the POST requests we make to Tornado are safe to
        # retry; allow retries of them, which is not the default.
        super().__init__(
            max_retries=Retry(
                total=3,
                backoff_factor=1,
                allowed_methods=Retry.DEFAULT_ALLOWED_METHODS | {"POST"},
            )
        )

    @override
    def send(
        self,
        request: PreparedRequest,
        stream: bool = False,
        timeout: Union[None, float, Tuple[float, float], Tuple[float, None]] = 0.5,
        verify: Union[bool, str] = True,
        cert: Union[None, bytes, str, Tuple[Union[bytes, str], Union[bytes, str]]] = None,
        proxies: Optional[Mapping[str, str]] = None,
    ) -> Response:
        """Send `request` to Tornado and return its response.

        The `proxies` argument is accepted for signature compatibility
        but ignored: an empty proxy mapping is always used.

        Raises ConnectionError, with a message pointing at the relevant
        log files, if the underlying adapter fails to connect.  Any
        response received is passed through ``raise_for_status()``
        before being returned.
        """
        try:
            response = super().send(
                request,
                stream=stream,
                timeout=timeout,
                verify=verify,
                cert=cert,
                # Don't talk to Tornado through proxies, which only allow
                # requests to external hosts.
                proxies={},
            )
        except ConnectionError:
            parsed_url = urlparse(request.url)
            # With several Tornado processes, each one is assumed to log
            # to a file named after its port.
            if settings.TORNADO_PROCESSES > 1:
                logfile = f"tornado-{parsed_url.port}.log"
            else:
                logfile = "tornado.log"
            raise ConnectionError(
                f"Django cannot connect to Tornado server ({request.url}); "
                f"check {settings.ERROR_FILE_LOG_PATH} and {logfile}"
            )

        response.raise_for_status()
        return response
|
|
|
|
|
2021-02-12 08:19:30 +01:00
|
|
|
|
2020-08-10 19:40:36 +02:00
|
|
|
@lru_cache(None)
def requests_client() -> requests.Session:
    """Return the memoized requests session used to talk to Tornado.

    The session is built on the first call and reused afterwards; a
    single TornadoAdapter instance handles both HTTPS and HTTP URLs.
    """
    session = requests.Session()
    tornado_adapter = TornadoAdapter()
    session.mount("https://", tornado_adapter)
    session.mount("http://", tornado_adapter)
    return session
|
2020-08-10 18:40:38 +02:00
|
|
|
|
2021-02-12 08:19:30 +01:00
|
|
|
|
|
|
|
def request_event_queue(
    user_profile: UserProfile,
    user_client: Client,
    apply_markdown: bool,
    client_gravatar: bool,
    slim_presence: bool,
    queue_lifespan_secs: int,
    event_types: Optional[Sequence[str]] = None,
    all_public_streams: bool = False,
    narrow: Iterable[Sequence[str]] = [],
    bulk_message_deletion: bool = False,
    stream_typing_notifications: bool = False,
    user_settings_object: bool = False,
    pronouns_field_type_supported: bool = True,
    linkifier_url_template: bool = False,
) -> Optional[str]:
    """Ask Tornado to register an event queue for `user_profile`.

    The request is POSTed to the internal events endpoint of the Tornado
    port that serves this user.  Boolean options, `narrow`, and
    `event_types` are sent JSON-encoded; `event_types` is only included
    when it is not None.

    Returns the "queue_id" field of Tornado's JSON response, or None
    (without making any request) when settings.USING_TORNADO is false.
    """
    if not settings.USING_TORNADO:
        return None

    port = get_user_tornado_port(user_profile)
    endpoint = get_tornado_url(port) + "/api/v1/events/internal"

    # Note: `narrow` is only ever serialized here, never mutated, so the
    # shared list default in the signature is not modified by this code.
    payload: Dict[str, Any] = {
        "dont_block": "true",
        "apply_markdown": orjson.dumps(apply_markdown),
        "client_gravatar": orjson.dumps(client_gravatar),
        "slim_presence": orjson.dumps(slim_presence),
        "all_public_streams": orjson.dumps(all_public_streams),
        "client": "internal",
        "user_profile_id": user_profile.id,
        "user_client": user_client.name,
        "narrow": orjson.dumps(narrow),
        "secret": settings.SHARED_SECRET,
        "lifespan_secs": queue_lifespan_secs,
        "bulk_message_deletion": orjson.dumps(bulk_message_deletion),
        "stream_typing_notifications": orjson.dumps(stream_typing_notifications),
        "user_settings_object": orjson.dumps(user_settings_object),
        "pronouns_field_type_supported": orjson.dumps(pronouns_field_type_supported),
        "linkifier_url_template": orjson.dumps(linkifier_url_template),
    }
    if event_types is not None:
        payload["event_types"] = orjson.dumps(event_types)

    response = requests_client().post(endpoint, data=payload)
    return response.json()["queue_id"]
|
2020-08-10 18:40:38 +02:00
|
|
|
|
2021-02-12 08:19:30 +01:00
|
|
|
|
|
|
|
def get_user_events(
    user_profile: UserProfile, queue_id: str, last_event_id: int
) -> List[Dict[str, Any]]:
    """Fetch events from an existing Tornado event queue without blocking.

    POSTs `queue_id` and `last_event_id` to the internal events endpoint
    of the Tornado port serving `user_profile` and returns the "events"
    field of the JSON response.  Returns an empty list, without making
    any request, when settings.USING_TORNADO is false.
    """
    if not settings.USING_TORNADO:
        return []

    port = get_user_tornado_port(user_profile)
    endpoint = get_tornado_url(port) + "/api/v1/events/internal"
    payload: Dict[str, Any] = {
        "queue_id": queue_id,
        "last_event_id": last_event_id,
        "dont_block": "true",
        "user_profile_id": user_profile.id,
        "secret": settings.SHARED_SECRET,
        "client": "internal",
    }
    response = requests_client().post(endpoint, data=payload)
    return response.json()["events"]
|
2020-08-10 18:40:38 +02:00
|
|
|
|
2021-02-12 08:19:30 +01:00
|
|
|
|
2022-09-22 22:09:34 +02:00
|
|
|
def send_notification_http(port: int, data: Mapping[str, Any]) -> None:
    """Deliver a notification to the Tornado process listening on `port`.

    When Tornado is in use and we are not already running inside it,
    `data` is JSON-encoded and POSTed, together with the shared secret,
    to that process's /notify_tornado endpoint.  Otherwise the
    notification is handed directly to process_notification in this
    process.
    """
    if settings.USING_TORNADO and not settings.RUNNING_INSIDE_TORNADO:
        notify_url = get_tornado_url(port) + "/notify_tornado"
        requests_client().post(
            notify_url,
            data={"data": orjson.dumps(data), "secret": settings.SHARED_SECRET},
        )
        return

    # To allow the backend test suite to not require a separate
    # Tornado process, we simply call the process_notification
    # handler directly rather than making the notify_tornado HTTP
    # request.  It would perhaps be better to instead implement
    # this via some sort of `responses` module configuration, but
    # perhaps it's more readable to have the logic live here.
    #
    # We use an import local to this function to prevent this hack
    # from creating import cycles.
    from zerver.tornado.event_queue import process_notification

    process_notification(data)
|
2020-08-10 18:40:38 +02:00
|
|
|
|
2021-02-12 08:19:30 +01:00
|
|
|
|
2020-09-25 21:53:00 +02:00
|
|
|
# The core function for sending an event from Django to Tornado (which
# will then push it to web and mobile clients for the target users).
# By convention, send_event should only be called from
# zerver/actions/*.py, which helps make it easy to find event
# generation code.
#
# Every call point should be covered by a test in `test_events.py`,
# with the schema verified in `zerver/lib/event_schema.py`.
#
# See https://zulip.readthedocs.io/en/latest/subsystems/events-system.html
|
2021-02-12 08:19:30 +01:00
|
|
|
def send_event(
    realm: Realm, event: Mapping[str, Any], users: Union[Iterable[int], Iterable[Mapping[str, Any]]]
) -> None:
    """Publish `event` to the Tornado queue(s) responsible for `users`.

    `users` contains user IDs or, in some special cases like message
    send/update or embeds, dictionaries (with an "id" key) carrying
    extra data.  Users are grouped by the Tornado port that serves them,
    and one message is published per port.
    """
    tornado_ports = get_realm_tornado_ports(realm)
    if len(tornado_ports) == 1:
        # Only one Tornado port serves this realm, so every user goes to
        # it; no per-user port lookup is needed.
        users_by_port = {tornado_ports[0]: list(users)}
    else:
        users_by_port = defaultdict(list)
        for user in users:
            if isinstance(user, int):
                user_id = user
            else:
                user_id = user["id"]
            target_port = get_user_id_tornado_port(tornado_ports, user_id)
            users_by_port[target_port].append(user)

    for port, recipients in users_by_port.items():
        # send_notification_http, pre-bound to this port, is handed to
        # queue_json_publish as its third argument (presumably the
        # direct-delivery path when the queue is unavailable -- confirm
        # against queue_json_publish).
        queue_json_publish(
            notify_tornado_queue_name(port),
            dict(event=event, users=recipients),
            partial(send_notification_http, port),
        )
|
django_api: Extract send_event_on_commit helper.
django-stubs 4.2.1 gives transaction.on_commit a more accurate type
annotation, but this exposed that mypy can’t handle the lambda default
parameters that we use to recapture loop variables such as
for stream_id in public_stream_ids:
peer_user_ids = …
event = …
transaction.on_commit(
lambda event=event, peer_user_ids=peer_user_ids: send_event(
realm, event, peer_user_ids
)
)
https://github.com/python/mypy/issues/15459
A workaround that mypy accepts is
transaction.on_commit(
(
lambda event, peer_user_ids: lambda: send_event(
realm, event, peer_user_ids
)
)(event, peer_user_ids)
)
But that’s kind of ugly and potentially error-prone, so let’s make a
helper function for this very common pattern.
send_event_on_commit(realm, event, peer_user_ids)
Signed-off-by: Anders Kaseorg <anders@zulip.com>
2023-06-17 20:53:07 +02:00
|
|
|
|
|
|
|
|
|
|
|
def send_event_on_commit(
    realm: Realm, event: Mapping[str, Any], users: Union[Iterable[int], Iterable[Mapping[str, Any]]]
) -> None:
    """Register send_event(realm, event, users) as an on-commit callback.

    The arguments are captured when this function is called, so it can
    safely be used inside loops that rebind `event` or `users`.
    """

    def deliver() -> None:
        send_event(realm, event, users)

    transaction.on_commit(deliver)
|