mirror of https://github.com/zulip/zulip.git
473 lines
18 KiB
Python
473 lines
18 KiB
Python
import logging
|
|
from typing import Any, Dict, List, Mapping, Optional, Tuple, Union
|
|
from urllib.parse import urljoin
|
|
|
|
import orjson
|
|
import requests
|
|
from django.conf import settings
|
|
from django.db.models import QuerySet
|
|
from django.utils.timezone import now as timezone_now
|
|
from django.utils.translation import gettext as _
|
|
from pydantic import UUID4, BaseModel, ConfigDict, Field, Json, field_validator
|
|
|
|
from analytics.lib.counts import LOGGING_COUNT_STAT_PROPERTIES_NOT_SENT_TO_BOUNCER
|
|
from analytics.models import InstallationCount, RealmCount
|
|
from version import API_FEATURE_LEVEL, ZULIP_VERSION
|
|
from zerver.actions.realm_settings import (
|
|
do_set_push_notifications_enabled_end_timestamp,
|
|
do_set_realm_property,
|
|
)
|
|
from zerver.lib import redis_utils
|
|
from zerver.lib.exceptions import (
|
|
JsonableError,
|
|
MissingRemoteRealmError,
|
|
RemoteRealmServerMismatchError,
|
|
)
|
|
from zerver.lib.outgoing_http import OutgoingSession
|
|
from zerver.lib.queue import queue_event_on_commit
|
|
from zerver.lib.redis_utils import get_redis_client
|
|
from zerver.models import Realm, RealmAuditLog
|
|
from zerver.models.realms import OrgTypeEnum
|
|
|
|
redis_client = get_redis_client()
|
|
|
|
|
|
class PushBouncerSession(OutgoingSession):
|
|
def __init__(self, timeout: int = 15) -> None:
|
|
super().__init__(role="push_bouncer", timeout=timeout)
|
|
|
|
|
|
class PushNotificationBouncerError(Exception):
|
|
pass
|
|
|
|
|
|
class PushNotificationBouncerRetryLaterError(JsonableError):
|
|
http_status_code = 502
|
|
|
|
|
|
class PushNotificationBouncerServerError(PushNotificationBouncerRetryLaterError):
|
|
http_status_code = 502
|
|
|
|
|
|
class RealmCountDataForAnalytics(BaseModel):
|
|
property: str
|
|
realm: int
|
|
id: int
|
|
end_time: float
|
|
subgroup: Optional[str]
|
|
value: int
|
|
|
|
|
|
class InstallationCountDataForAnalytics(BaseModel):
|
|
property: str
|
|
id: int
|
|
end_time: float
|
|
subgroup: Optional[str]
|
|
value: int
|
|
|
|
|
|
class RealmAuditLogDataForAnalytics(BaseModel):
|
|
id: int
|
|
realm: int
|
|
event_time: float
|
|
backfilled: bool
|
|
extra_data: Optional[Union[str, Dict[str, Any]]]
|
|
event_type: int
|
|
|
|
|
|
class RealmDataForAnalytics(BaseModel):
|
|
model_config = ConfigDict(extra="forbid")
|
|
|
|
id: int
|
|
host: str
|
|
url: str
|
|
name: str = ""
|
|
org_type: int = 0
|
|
date_created: float
|
|
deactivated: bool
|
|
is_system_bot_realm: bool = False
|
|
|
|
authentication_methods: Dict[str, bool] = Field(default_factory=dict)
|
|
|
|
uuid: UUID4
|
|
uuid_owner_secret: str
|
|
|
|
@field_validator("org_type")
|
|
@classmethod
|
|
def check_is_allowed_value(cls, value: int) -> int:
|
|
if value not in [org_type.value for org_type in OrgTypeEnum]:
|
|
raise ValueError("Not a valid org_type value")
|
|
|
|
return value
|
|
|
|
|
|
class AnalyticsRequest(BaseModel):
|
|
realm_counts: Json[List[RealmCountDataForAnalytics]]
|
|
installation_counts: Json[List[InstallationCountDataForAnalytics]]
|
|
realmauditlog_rows: Optional[Json[List[RealmAuditLogDataForAnalytics]]] = None
|
|
realms: Json[List[RealmDataForAnalytics]]
|
|
version: Optional[Json[str]]
|
|
api_feature_level: Optional[Json[int]]
|
|
|
|
|
|
class UserDataForRemoteBilling(BaseModel):
|
|
uuid: UUID4
|
|
email: str
|
|
full_name: str
|
|
|
|
|
|
def send_to_push_bouncer(
|
|
method: str,
|
|
endpoint: str,
|
|
post_data: Union[bytes, Mapping[str, Union[str, int, None, bytes]]],
|
|
extra_headers: Mapping[str, str] = {},
|
|
) -> Dict[str, object]:
|
|
"""While it does actually send the notice, this function has a lot of
|
|
code and comments around error handling for the push notifications
|
|
bouncer. There are several classes of failures, each with its own
|
|
potential solution:
|
|
|
|
* Network errors with requests.request. We raise an exception to signal
|
|
it to the callers.
|
|
|
|
* 500 errors from the push bouncer or other unexpected responses;
|
|
we don't try to parse the response, but do make clear the cause.
|
|
|
|
* 400 errors from the push bouncer. Here there are 2 categories:
|
|
Our server failed to connect to the push bouncer (should throw)
|
|
vs. client-side errors like an invalid token.
|
|
|
|
"""
|
|
assert settings.PUSH_NOTIFICATION_BOUNCER_URL is not None
|
|
assert settings.ZULIP_ORG_ID is not None
|
|
assert settings.ZULIP_ORG_KEY is not None
|
|
url = urljoin(settings.PUSH_NOTIFICATION_BOUNCER_URL, "/api/v1/remotes/" + endpoint)
|
|
api_auth = requests.auth.HTTPBasicAuth(settings.ZULIP_ORG_ID, settings.ZULIP_ORG_KEY)
|
|
|
|
headers = {"User-agent": f"ZulipServer/{ZULIP_VERSION}"}
|
|
headers.update(extra_headers)
|
|
|
|
if endpoint == "server/analytics":
|
|
# Uploading audit log and/or analytics data can require the
|
|
# bouncer to do a significant chunk of work in a few
|
|
# situations; since this occurs in background jobs, set a long
|
|
# timeout.
|
|
session = PushBouncerSession(timeout=90)
|
|
else:
|
|
session = PushBouncerSession()
|
|
|
|
try:
|
|
res = session.request(
|
|
method,
|
|
url,
|
|
data=post_data,
|
|
auth=api_auth,
|
|
verify=True,
|
|
headers=headers,
|
|
)
|
|
except (
|
|
requests.exceptions.Timeout,
|
|
requests.exceptions.SSLError,
|
|
requests.exceptions.ConnectionError,
|
|
) as e:
|
|
raise PushNotificationBouncerRetryLaterError(
|
|
f"{type(e).__name__} while trying to connect to push notification bouncer"
|
|
)
|
|
|
|
if res.status_code >= 500:
|
|
# 5xx's should be resolved by the people who run the push
|
|
# notification bouncer service, and they'll get an appropriate
|
|
# error notification from the server. We raise an exception to signal
|
|
# to the callers that the attempt failed and they can retry.
|
|
error_msg = f"Received {res.status_code} from push notification bouncer"
|
|
logging.warning(error_msg)
|
|
raise PushNotificationBouncerServerError(error_msg)
|
|
elif res.status_code >= 400:
|
|
# If JSON parsing errors, just let that exception happen
|
|
result_dict = orjson.loads(res.content)
|
|
msg = result_dict["msg"]
|
|
if "code" in result_dict and result_dict["code"] == "INVALID_ZULIP_SERVER":
|
|
# Invalid Zulip server credentials should email this server's admins
|
|
raise PushNotificationBouncerError(
|
|
_("Push notifications bouncer error: {error}").format(error=msg)
|
|
)
|
|
elif "code" in result_dict and result_dict["code"] == "PUSH_NOTIFICATIONS_DISALLOWED":
|
|
from zerver.lib.push_notifications import PushNotificationsDisallowedByBouncerError
|
|
|
|
raise PushNotificationsDisallowedByBouncerError(reason=msg)
|
|
elif (
|
|
endpoint == "push/test_notification"
|
|
and "code" in result_dict
|
|
and result_dict["code"] == "INVALID_REMOTE_PUSH_DEVICE_TOKEN"
|
|
):
|
|
# This error from the notification debugging endpoint should just be directly
|
|
# communicated to the device.
|
|
# TODO: Extend this to use a more general mechanism when we add more such error responses.
|
|
from zerver.lib.push_notifications import InvalidRemotePushDeviceTokenError
|
|
|
|
raise InvalidRemotePushDeviceTokenError
|
|
elif (
|
|
endpoint == "server/billing"
|
|
and "code" in result_dict
|
|
and result_dict["code"] == "MISSING_REMOTE_REALM"
|
|
): # nocoverage
|
|
# The callers requesting this endpoint want the exception to propagate
|
|
# so they can catch it.
|
|
raise MissingRemoteRealmError
|
|
elif (
|
|
endpoint == "server/billing"
|
|
and "code" in result_dict
|
|
and result_dict["code"] == "REMOTE_REALM_SERVER_MISMATCH_ERROR"
|
|
): # nocoverage
|
|
# The callers requesting this endpoint want the exception to propagate
|
|
# so they can catch it.
|
|
raise RemoteRealmServerMismatchError
|
|
else:
|
|
# But most other errors coming from the push bouncer
|
|
# server are client errors (e.g. never-registered token)
|
|
# and should be handled as such.
|
|
raise JsonableError(msg)
|
|
elif res.status_code != 200:
|
|
# Anything else is unexpected and likely suggests a bug in
|
|
# this version of Zulip, so we throw an exception that will
|
|
# email the server admins.
|
|
raise PushNotificationBouncerError(
|
|
f"Push notification bouncer returned unexpected status code {res.status_code}"
|
|
)
|
|
|
|
# If we don't throw an exception, it's a successful bounce!
|
|
return orjson.loads(res.content)
|
|
|
|
|
|
def send_json_to_push_bouncer(
|
|
method: str, endpoint: str, post_data: Mapping[str, object]
|
|
) -> Dict[str, object]:
|
|
return send_to_push_bouncer(
|
|
method,
|
|
endpoint,
|
|
orjson.dumps(post_data),
|
|
extra_headers={"Content-type": "application/json"},
|
|
)
|
|
|
|
|
|
PUSH_NOTIFICATIONS_RECENTLY_WORKING_REDIS_KEY = "push_notifications_recently_working_ts"
|
|
|
|
|
|
def record_push_notifications_recently_working() -> None:
|
|
# Record the timestamp in redis, marking that push notifications
|
|
# were working as of this moment.
|
|
|
|
redis_key = redis_utils.REDIS_KEY_PREFIX + PUSH_NOTIFICATIONS_RECENTLY_WORKING_REDIS_KEY
|
|
# Keep this record around for 24h in case it's useful for debugging.
|
|
redis_client.set(redis_key, str(timezone_now().timestamp()), ex=60 * 60 * 24)
|
|
|
|
|
|
def check_push_notifications_recently_working() -> bool:
|
|
# Check in redis whether push notifications were working in the last hour.
|
|
redis_key = redis_utils.REDIS_KEY_PREFIX + PUSH_NOTIFICATIONS_RECENTLY_WORKING_REDIS_KEY
|
|
timestamp = redis_client.get(redis_key)
|
|
if timestamp is None:
|
|
return False
|
|
|
|
# If the timestamp is within the last hour, we consider push notifications to be working.
|
|
return timezone_now().timestamp() - float(timestamp) < 60 * 60
|
|
|
|
|
|
def maybe_mark_pushes_disabled(
|
|
e: Union[JsonableError, orjson.JSONDecodeError], logger: logging.Logger
|
|
) -> None:
|
|
if isinstance(e, PushNotificationBouncerServerError):
|
|
# We don't fall through and deactivate the flag, since this is
|
|
# not under the control of the caller.
|
|
return
|
|
|
|
if isinstance(e, JsonableError):
|
|
logger.warning(e.msg)
|
|
else:
|
|
logger.exception("Exception communicating with %s", settings.PUSH_NOTIFICATION_BOUNCER_URL)
|
|
|
|
# An exception was thrown talking to the push bouncer. There may
|
|
# be certain transient failures that we could ignore here -
|
|
# therefore we check whether push notifications were recently working
|
|
# and if so, the error can be treated as transient.
|
|
# Otherwise, the assumed explanation is that there is something wrong
|
|
# either with our credentials being corrupted or our ability to reach the
|
|
# bouncer service over the network, so we move to
|
|
# reporting push notifications as likely not working.
|
|
if check_push_notifications_recently_working():
|
|
# Push notifications were recently observed working, so we
|
|
# assume this is likely a transient failure.
|
|
return
|
|
|
|
for realm in Realm.objects.filter(push_notifications_enabled=True):
|
|
do_set_realm_property(realm, "push_notifications_enabled", False, acting_user=None)
|
|
do_set_push_notifications_enabled_end_timestamp(realm, None, acting_user=None)
|
|
|
|
|
|
def build_analytics_data(
|
|
realm_count_query: QuerySet[RealmCount],
|
|
installation_count_query: QuerySet[InstallationCount],
|
|
realmauditlog_query: QuerySet[RealmAuditLog],
|
|
) -> Tuple[
|
|
List[RealmCountDataForAnalytics],
|
|
List[InstallationCountDataForAnalytics],
|
|
List[RealmAuditLogDataForAnalytics],
|
|
]:
|
|
# We limit the batch size on the client side to avoid OOM kills timeouts, etc.
|
|
MAX_CLIENT_BATCH_SIZE = 10000
|
|
realm_count_data = [
|
|
RealmCountDataForAnalytics(
|
|
property=row.property,
|
|
realm=row.realm.id,
|
|
id=row.id,
|
|
end_time=row.end_time.timestamp(),
|
|
subgroup=row.subgroup,
|
|
value=row.value,
|
|
)
|
|
for row in realm_count_query.order_by("id")[0:MAX_CLIENT_BATCH_SIZE]
|
|
]
|
|
installation_count_data = [
|
|
InstallationCountDataForAnalytics(
|
|
property=row.property,
|
|
id=row.id,
|
|
end_time=row.end_time.timestamp(),
|
|
subgroup=row.subgroup,
|
|
value=row.value,
|
|
)
|
|
for row in installation_count_query.order_by("id")[0:MAX_CLIENT_BATCH_SIZE]
|
|
]
|
|
zerver_realmauditlog = [
|
|
RealmAuditLogDataForAnalytics(
|
|
id=row.id,
|
|
realm=row.realm.id,
|
|
event_time=row.event_time.timestamp(),
|
|
backfilled=row.backfilled,
|
|
# Note that we don't need to add extra_data_json here because
|
|
# the view remote_server_post_analytics populates extra_data_json
|
|
# from the provided extra_data.
|
|
extra_data=row.extra_data,
|
|
event_type=row.event_type,
|
|
)
|
|
for row in realmauditlog_query.order_by("id")[0:MAX_CLIENT_BATCH_SIZE]
|
|
]
|
|
|
|
return realm_count_data, installation_count_data, zerver_realmauditlog
|
|
|
|
|
|
def get_realms_info_for_push_bouncer(realm_id: Optional[int] = None) -> List[RealmDataForAnalytics]:
|
|
realms = Realm.objects.order_by("id")
|
|
if realm_id is not None: # nocoverage
|
|
realms = realms.filter(id=realm_id)
|
|
|
|
realm_info_list = [
|
|
RealmDataForAnalytics(
|
|
id=realm.id,
|
|
uuid=realm.uuid,
|
|
uuid_owner_secret=realm.uuid_owner_secret,
|
|
host=realm.host,
|
|
url=realm.url,
|
|
deactivated=realm.deactivated,
|
|
date_created=realm.date_created.timestamp(),
|
|
org_type=realm.org_type,
|
|
name=realm.name,
|
|
authentication_methods=realm.authentication_methods_dict(),
|
|
is_system_bot_realm=realm.string_id == settings.SYSTEM_BOT_REALM,
|
|
)
|
|
for realm in realms
|
|
]
|
|
|
|
return realm_info_list
|
|
|
|
|
|
def send_server_data_to_push_bouncer(consider_usage_statistics: bool = True) -> None:
|
|
logger = logging.getLogger("zulip.analytics")
|
|
# first, check what's latest
|
|
try:
|
|
result = send_to_push_bouncer("GET", "server/analytics/status", {})
|
|
except (JsonableError, orjson.JSONDecodeError) as e:
|
|
maybe_mark_pushes_disabled(e, logger)
|
|
return
|
|
|
|
# Gather only entries with IDs greater than the last ID received by the push bouncer.
|
|
# We don't re-send old data that's already been submitted.
|
|
last_acked_realm_count_id = result["last_realm_count_id"]
|
|
last_acked_installation_count_id = result["last_installation_count_id"]
|
|
last_acked_realmauditlog_id = result["last_realmauditlog_id"]
|
|
|
|
if settings.SUBMIT_USAGE_STATISTICS and consider_usage_statistics:
|
|
# Only upload usage statistics, which is relatively expensive,
|
|
# if called from the analytics cron job and the server has
|
|
# uploading such statistics enabled.
|
|
installation_count_query = InstallationCount.objects.filter(
|
|
id__gt=last_acked_installation_count_id
|
|
).exclude(property__in=LOGGING_COUNT_STAT_PROPERTIES_NOT_SENT_TO_BOUNCER)
|
|
realm_count_query = RealmCount.objects.filter(id__gt=last_acked_realm_count_id).exclude(
|
|
property__in=LOGGING_COUNT_STAT_PROPERTIES_NOT_SENT_TO_BOUNCER
|
|
)
|
|
else:
|
|
installation_count_query = InstallationCount.objects.none()
|
|
realm_count_query = RealmCount.objects.none()
|
|
|
|
(realm_count_data, installation_count_data, realmauditlog_data) = build_analytics_data(
|
|
realm_count_query=realm_count_query,
|
|
installation_count_query=installation_count_query,
|
|
realmauditlog_query=RealmAuditLog.objects.filter(
|
|
event_type__in=RealmAuditLog.SYNCED_BILLING_EVENTS, id__gt=last_acked_realmauditlog_id
|
|
),
|
|
)
|
|
|
|
record_count = len(realm_count_data) + len(installation_count_data) + len(realmauditlog_data)
|
|
request = AnalyticsRequest.model_construct(
|
|
realm_counts=realm_count_data,
|
|
installation_counts=installation_count_data,
|
|
realmauditlog_rows=realmauditlog_data,
|
|
realms=get_realms_info_for_push_bouncer(),
|
|
version=ZULIP_VERSION,
|
|
api_feature_level=API_FEATURE_LEVEL,
|
|
)
|
|
|
|
# Send the actual request, and process the response.
|
|
try:
|
|
response = send_to_push_bouncer(
|
|
"POST", "server/analytics", request.model_dump(round_trip=True)
|
|
)
|
|
except (JsonableError, orjson.JSONDecodeError) as e:
|
|
maybe_mark_pushes_disabled(e, logger)
|
|
return
|
|
|
|
assert isinstance(response["realms"], dict) # for mypy
|
|
realms = response["realms"]
|
|
for realm_uuid, data in realms.items():
|
|
try:
|
|
realm = Realm.objects.get(uuid=realm_uuid)
|
|
except Realm.DoesNotExist:
|
|
# This occurs if the installation's database was rebuilt
|
|
# from scratch or a realm was hard-deleted from the local
|
|
# database, after generating secrets and talking to the
|
|
# bouncer.
|
|
logger.warning("Received unexpected realm UUID from bouncer %s", realm_uuid)
|
|
continue
|
|
|
|
do_set_realm_property(
|
|
realm, "push_notifications_enabled", data["can_push"], acting_user=None
|
|
)
|
|
do_set_push_notifications_enabled_end_timestamp(
|
|
realm, data["expected_end_timestamp"], acting_user=None
|
|
)
|
|
|
|
logger.info("Reported %d records", record_count)
|
|
|
|
|
|
def maybe_enqueue_audit_log_upload(realm: Realm) -> None:
|
|
# Update the push notifications service, either with the fact that
|
|
# the realm now exists or updates to its audit log of users.
|
|
#
|
|
# Done via a queue worker so that networking failures cannot have
|
|
# any impact on the success operation of the local server's
|
|
# ability to do operations that trigger these updates.
|
|
from zerver.lib.push_notifications import uses_notification_bouncer
|
|
|
|
if uses_notification_bouncer():
|
|
event = {"type": "push_bouncer_update_for_realm", "realm_id": realm.id}
|
|
queue_event_on_commit("deferred_work", event)
|