streams: Pass stream_weekly_traffic field in stream objects.

This commit adds code to pass stream traffic data using
the "stream_weekly_traffic" field in stream objects.

We already include the traffic data in Subscription objects,
but the traffic data does not depend on the user to stream
relationship and is stream-only information, so it's better
to include it in Stream objects. We may remove the traffic
data and other stream information fields for Subscription
objects in future.

This will help clients to correctly display the stream
traffic data in case where client receives a stream
creation event and no subscription event, for an already
existing stream which the user did not have access to before.
This commit is contained in:
Sahil Batra 2023-07-27 20:12:21 +05:30 committed by Tim Abbott
parent 261fca11ec
commit ae72151ec1
15 changed files with 176 additions and 50 deletions

View File

@ -20,6 +20,14 @@ format used by the Zulip server that they are interacting with.
## Changes in Zulip 8.0
**Feature level 199**
* [`POST /register`](/api/register-queue), [`GET /events`][/api/get-events],
[`GET /streams`](/api/get-streams),
[`GET /streams/{stream_id}`](/api/get-stream-by-id): Stream objects now
include a `stream_weekly_traffic` field indicating the stream's level of
traffic.
**Feature level 198**
* [`GET /events`](/api/get-events), [`POST /register`](/api/register-queue),

View File

@ -33,7 +33,7 @@ DESKTOP_WARNING_VERSION = "5.9.3"
# Changes should be accompanied by documentation explaining what the
# new level means in api_docs/changelog.md, as well as "**Changes**"
# entries in the endpoint's documentation in `zulip.yaml`.
API_FEATURE_LEVEL = 198
API_FEATURE_LEVEL = 199
# Bump the minor PROVISION_VERSION to indicate that folks should provision
# only when going from an old version of the code to a newer version. Bump

View File

@ -650,13 +650,31 @@ exports.fixtures = {
stream__create: {
type: "stream",
op: "create",
streams: [streams.devel, streams.test],
streams: [
{
...streams.devel,
stream_weekly_traffic: null,
},
{
...streams.test,
stream_weekly_traffic: null,
},
],
},
stream__delete: {
type: "stream",
op: "delete",
streams: [streams.devel, streams.test],
streams: [
{
...streams.devel,
stream_weekly_traffic: null,
},
{
...streams.test,
stream_weekly_traffic: null,
},
],
},
stream__update: {

View File

@ -37,7 +37,7 @@ from zerver.lib.stream_subscription import (
get_bulk_stream_subscriber_info,
get_used_colors_for_user_ids,
)
from zerver.lib.stream_traffic import get_average_weekly_stream_traffic, get_streams_traffic
from zerver.lib.stream_traffic import get_streams_traffic
from zerver.lib.streams import (
can_access_stream_user_ids,
get_occupied_streams,
@ -210,10 +210,15 @@ def do_reactivate_stream(
event_time=timezone_now(),
)
recent_traffic = None
if not realm.is_zephyr_mirror_realm:
# We do not need stream traffic data for streams in zephyr mirroring realm.
recent_traffic = get_streams_traffic({stream.id})
# All admins always get to know about private streams' existence,
# but we only subscribe the realm owners.
send_stream_creation_event(
realm, stream, [user.id for user in realm.get_admin_users_and_bots()]
realm, stream, [user.id for user in realm.get_admin_users_and_bots()], recent_traffic
)
bulk_add_subscriptions(
realm=realm,
@ -313,7 +318,6 @@ def get_subscriber_ids(
@dataclass
class StreamInfo:
email_address: str
stream_weekly_traffic: Optional[int]
subscribers: List[int]
@ -336,16 +340,12 @@ def send_subscription_add_events(
stream = sub_info.stream
if stream.id not in stream_info_dict:
email_address = encode_email_address(stream, show_sender=True)
stream_weekly_traffic = get_average_weekly_stream_traffic(
stream.id, stream.date_created, recent_traffic
)
if stream.is_in_zephyr_realm and not stream.invite_only:
subscribers = []
else:
subscribers = list(subscriber_dict[stream.id])
stream_info_dict[stream.id] = StreamInfo(
email_address=email_address,
stream_weekly_traffic=stream_weekly_traffic,
subscribers=subscribers,
)
@ -355,7 +355,7 @@ def send_subscription_add_events(
stream = sub_info.stream
stream_info = stream_info_dict[stream.id]
subscription = sub_info.sub
stream_dict = stream_to_dict(stream)
stream_dict = stream_to_dict(stream, recent_traffic)
# This is verbose as we cannot unpack existing TypedDict
# to initialize another TypedDict while making mypy happy.
# https://github.com/python/mypy/issues/5382
@ -372,7 +372,7 @@ def send_subscription_add_events(
# Computed fields not present in Subscription.API_FIELDS
email_address=stream_info.email_address,
in_home_view=not subscription.is_muted,
stream_weekly_traffic=stream_info.stream_weekly_traffic,
stream_weekly_traffic=stream_dict["stream_weekly_traffic"],
subscribers=stream_info.subscribers,
# Fields from Stream.API_FIELDS
can_remove_subscribers_group=stream_dict["can_remove_subscribers_group"],
@ -452,6 +452,12 @@ def send_stream_creation_events_for_previously_inaccessible_streams(
altered_user_dict: Dict[int, Set[int]],
altered_guests: Set[int],
) -> None:
recent_traffic = None
if not realm.is_zephyr_mirror_realm:
# We do not need stream traffic data for streams in zephyr mirroring realm.
stream_ids = set(altered_user_dict.keys())
recent_traffic = get_streams_traffic(stream_ids)
for stream_id, stream_users_ids in altered_user_dict.items():
stream = stream_dict[stream_id]
@ -472,7 +478,7 @@ def send_stream_creation_events_for_previously_inaccessible_streams(
notify_user_ids = list(stream_users_ids & altered_guests)
if notify_user_ids:
send_stream_creation_event(realm, stream, notify_user_ids)
send_stream_creation_event(realm, stream, notify_user_ids, recent_traffic)
def send_peer_subscriber_events(
@ -1058,7 +1064,12 @@ def do_change_stream_permission(
)
non_guest_user_ids = set(active_non_guest_user_ids(stream.realm_id))
notify_stream_creation_ids = non_guest_user_ids - old_can_access_stream_user_ids
send_stream_creation_event(realm, stream, list(notify_stream_creation_ids))
recent_traffic = None
if not realm.is_zephyr_mirror_realm:
# We do not need stream traffic data for streams in zephyr mirroing realm.
recent_traffic = get_streams_traffic({stream.id})
send_stream_creation_event(realm, stream, list(notify_stream_creation_ids), recent_traffic)
# Add subscribers info to the stream object. We need to send peer_add
# events to users who were previously subscribed to the streams as

View File

@ -1,6 +1,6 @@
from typing import List, Set
from zerver.lib.types import APIStreamDict
from zerver.lib.types import DefaultStreamDict
from zerver.models import DefaultStream, Stream
@ -22,7 +22,7 @@ def get_default_stream_ids_for_realm(realm_id: int) -> Set[int]:
return set(DefaultStream.objects.filter(realm_id=realm_id).values_list("stream_id", flat=True))
def get_default_streams_for_realm_as_dicts(realm_id: int) -> List[APIStreamDict]:
def get_default_streams_for_realm_as_dicts(realm_id: int) -> List[DefaultStreamDict]:
"""
Return all the default streams for a realm using a list of dictionaries sorted
by stream name.

View File

@ -49,7 +49,7 @@ from zerver.models import Realm, RealmUserDefault, Stream, UserProfile
# These fields are used for "stream" events, and are included in the
# larger "subscription" events that also contain personal settings.
basic_stream_fields = [
default_stream_fields = [
("can_remove_subscribers_group", int),
("date_created", int),
("description", str),
@ -65,6 +65,11 @@ basic_stream_fields = [
("stream_post_policy", int),
]
basic_stream_fields = [
*default_stream_fields,
("stream_weekly_traffic", OptionalType(int)),
]
subscription_fields: Sequence[Tuple[str, object]] = [
*basic_stream_fields,
("audible_notifications", OptionalType(bool)),
@ -76,7 +81,6 @@ subscription_fields: Sequence[Tuple[str, object]] = [
("is_muted", bool),
("pin_to_top", bool),
("push_notifications", OptionalType(bool)),
("stream_weekly_traffic", OptionalType(int)),
# We may try to remove subscribers from some events in
# the future for clients that don't want subscriber
# info.
@ -185,7 +189,7 @@ _check_stream_group = DictType(
("name", str),
("id", int),
("description", str),
("streams", ListType(DictType(basic_stream_fields))),
("streams", ListType(DictType(default_stream_fields))),
]
)
@ -200,7 +204,7 @@ check_default_stream_groups = make_checker(default_stream_groups_event)
default_streams_event = event_dict_type(
required_keys=[
("type", Equals("default_streams")),
("default_streams", ListType(DictType(basic_stream_fields))),
("default_streams", ListType(DictType(default_stream_fields))),
]
)
check_default_streams = make_checker(default_streams_event)

View File

@ -998,12 +998,6 @@ def apply_event(
if include_subscribers:
stream_data["subscribers"] = []
# We know the stream has no traffic, and this
# field is not present in the event.
#
# TODO: Probably this should just be added to the event.
stream_data["stream_weekly_traffic"] = None
# Add stream to never_subscribed (if not invite_only)
state["never_subscribed"].append(stream_data)
if "streams" in state:

View File

@ -1,4 +1,4 @@
from typing import Collection, List, Optional, Set, Tuple, TypedDict, Union
from typing import Collection, Dict, List, Optional, Set, Tuple, TypedDict, Union
from django.db import transaction
from django.db.models import Exists, OuterRef, Q, QuerySet
@ -16,6 +16,7 @@ from zerver.lib.stream_subscription import (
get_active_subscriptions_for_stream_id,
get_subscribed_stream_ids_for_user,
)
from zerver.lib.stream_traffic import get_average_weekly_stream_traffic, get_streams_traffic
from zerver.lib.string_validation import check_stream_name
from zerver.lib.timestamp import datetime_to_timestamp
from zerver.lib.types import APIStreamDict
@ -111,8 +112,13 @@ def render_stream_description(text: str, realm: Realm) -> str:
return markdown_convert(text, message_realm=realm, no_previews=True).rendered_content
def send_stream_creation_event(realm: Realm, stream: Stream, user_ids: List[int]) -> None:
event = dict(type="stream", op="create", streams=[stream_to_dict(stream)])
def send_stream_creation_event(
realm: Realm,
stream: Stream,
user_ids: List[int],
recent_traffic: Optional[Dict[int, int]] = None,
) -> None:
event = dict(type="stream", op="create", streams=[stream_to_dict(stream, recent_traffic)])
send_event(realm, event, user_ids)
@ -826,7 +832,22 @@ def get_occupied_streams(realm: Realm) -> QuerySet[Stream]:
return occupied_streams
def stream_to_dict(stream: Stream) -> APIStreamDict:
def stream_to_dict(
stream: Stream, recent_traffic: Optional[Dict[int, int]] = None
) -> APIStreamDict:
if recent_traffic is not None:
stream_weekly_traffic = get_average_weekly_stream_traffic(
stream.id, stream.date_created, recent_traffic
)
else:
# We cannot compute the traffic data for a newly created
# stream, so we set "stream_weekly_traffic" field to
# "None" for the stream object in creation event.
# Also, there are some cases where we do not need to send
# the traffic data, like when deactivating a stream, and
# passing stream data to spectators.
stream_weekly_traffic = None
return APIStreamDict(
can_remove_subscribers_group=stream.can_remove_subscribers_group_id,
date_created=datetime_to_timestamp(stream.date_created),
@ -841,6 +862,7 @@ def stream_to_dict(stream: Stream) -> APIStreamDict:
stream_id=stream.id,
stream_post_policy=stream.stream_post_policy,
is_announcement_only=stream.stream_post_policy == Stream.STREAM_POST_POLICY_ADMINS,
stream_weekly_traffic=stream_weekly_traffic,
)
@ -914,7 +936,13 @@ def do_get_streams(
# Don't bother going to the database with no valid sources
return []
stream_dicts = [stream_to_dict(stream) for stream in streams]
recent_traffic = None
if not user_profile.realm.is_zephyr_mirror_realm:
# We do not need stream traffic data for streams in zephyr mirroring realm.
stream_ids = {stream.id for stream in streams}
recent_traffic = get_streams_traffic(stream_ids)
stream_dicts = [stream_to_dict(stream, recent_traffic) for stream in streams]
stream_dicts.sort(key=lambda elt: elt["name"])
if include_default:

View File

@ -217,7 +217,7 @@ class NeverSubscribedStreamDict(TypedDict):
subscribers: NotRequired[List[int]]
class APIStreamDict(TypedDict):
class DefaultStreamDict(TypedDict):
"""Stream information provided to Zulip clients as a dictionary via API.
It should contain all the fields specified in `zerver.models.Stream.API_FIELDS`
with few exceptions and possible additional fields.
@ -240,6 +240,10 @@ class APIStreamDict(TypedDict):
is_default: NotRequired[bool]
class APIStreamDict(DefaultStreamDict):
stream_weekly_traffic: Optional[int]
class APISubscriptionDict(APIStreamDict):
"""Similar to StreamClientDict, it should contain all the fields specified in
`zerver.models.Subscription.API_FIELDS` and several additional fields.
@ -256,7 +260,6 @@ class APISubscriptionDict(APIStreamDict):
# Computed fields not specified in `Subscription.API_FIELDS`
email_address: str
in_home_view: bool
stream_weekly_traffic: Optional[int]
subscribers: List[int]

View File

@ -86,7 +86,7 @@ from zerver.lib.exceptions import JsonableError, RateLimitedError
from zerver.lib.pysa import mark_sanitized
from zerver.lib.timestamp import datetime_to_timestamp
from zerver.lib.types import (
APIStreamDict,
DefaultStreamDict,
DisplayRecipientT,
ExtendedFieldElement,
ExtendedValidator,
@ -2701,8 +2701,8 @@ class Stream(models.Model):
"can_remove_subscribers_group_id",
]
def to_dict(self) -> APIStreamDict:
return APIStreamDict(
def to_dict(self) -> DefaultStreamDict:
return DefaultStreamDict(
can_remove_subscribers_group=self.can_remove_subscribers_group_id,
date_created=datetime_to_timestamp(self.date_created),
description=self.description,

View File

@ -1884,7 +1884,7 @@ paths:
An array of dictionaries where each dictionary
contains details about a single default stream.
items:
$ref: "#/components/schemas/BasicStream"
$ref: "#/components/schemas/DefaultStream"
example:
{
"type": "default_streams",
@ -11766,7 +11766,7 @@ paths:
realm_default_streams:
type: array
items:
$ref: "#/components/schemas/BasicStream"
$ref: "#/components/schemas/DefaultStream"
description: |
Present if `default_streams` is present in `fetch_event_types`.
@ -15689,6 +15689,21 @@ paths:
nullable: true
is_announcement_only: {}
can_remove_subscribers_group: {}
stream_weekly_traffic:
type: integer
nullable: true
description: |
The average number of messages sent to the stream per week, as
estimated based on recent weeks, rounded to the nearest integer.
If `null`, no information is provided on the average traffic.
This can be because the stream was recently created and there
is insufficient data to make an estimate, or because the server
wishes to omit this information for this client, this realm, or
this endpoint or type of event.
**Changes**: New in Zulip 8.0 (feature level 199). Previously,
this statistic was available only in subscription objects.
is_default:
type: boolean
description: |
@ -16976,6 +16991,41 @@ components:
The unique message ID. Messages should always be
displayed sorted by ID.
BasicStream:
allOf:
- $ref: "#/components/schemas/BasicStreamBase"
- additionalProperties: false
properties:
stream_id: {}
name: {}
description: {}
date_created: {}
invite_only: {}
rendered_description: {}
is_web_public: {}
stream_post_policy: {}
message_retention_days:
nullable: true
history_public_to_subscribers: {}
first_message_id:
nullable: true
is_announcement_only: {}
can_remove_subscribers_group: {}
stream_weekly_traffic:
type: integer
nullable: true
description: |
The average number of messages sent to the stream per week, as
estimated based on recent weeks, rounded to the nearest integer.
If `null`, no information is provided on the average traffic.
This can be because the stream was recently created and there
is insufficient data to make an estimate, or because the server
wishes to omit this information for this client, this realm, or
this endpoint or type of event.
**Changes**: New in Zulip 8.0 (feature level 199). Previously, this
statistic was available only in subscription objects.
DefaultStream:
allOf:
- $ref: "#/components/schemas/BasicStreamBase"
- additionalProperties: false
@ -17786,7 +17836,7 @@ components:
Array containing details about the streams
in the default stream group.
items:
$ref: "#/components/schemas/BasicStream"
$ref: "#/components/schemas/DefaultStream"
EmailAddressVisibility:
type: integer
description: |

View File

@ -1249,7 +1249,7 @@ class FetchQueriesTest(ZulipTestCase):
self.login_user(user)
flush_per_request_caches()
with self.assert_database_query_count(38):
with self.assert_database_query_count(39):
with mock.patch("zerver.lib.events.always_want") as want_mock:
fetch_initial_state_data(user)
@ -1279,7 +1279,7 @@ class FetchQueriesTest(ZulipTestCase):
recent_private_conversations=1,
scheduled_messages=1,
starred_messages=1,
stream=2,
stream=3,
stop_words=0,
subscription=4,
update_display_settings=0,

View File

@ -2476,6 +2476,7 @@ class NormalActionsTest(BaseAction):
action = lambda: do_deactivate_stream(stream, acting_user=None)
events = self.verify_action(action, include_streams=include_streams)
check_stream_delete("events[0]", events[0])
self.assertIsNone(events[0]["streams"][0]["stream_weekly_traffic"])
def test_subscribe_other_user_never_subscribed(self) -> None:
for i, include_streams in enumerate([True, False]):
@ -3266,6 +3267,7 @@ class SubscribeActionTest(BaseAction):
events[0]["streams"][0]["message_retention_days"],
10,
)
self.assertIsNone(events[0]["streams"][0]["stream_weekly_traffic"])
stream.invite_only = False
stream.save()

View File

@ -230,7 +230,7 @@ class TestMiscStuff(ZulipTestCase):
expected_fields |= {"can_remove_subscribers_group"}
stream_dict_fields = set(APIStreamDict.__annotations__.keys())
computed_fields = {"is_announcement_only", "is_default"}
computed_fields = {"is_announcement_only", "is_default", "stream_weekly_traffic"}
self.assertEqual(stream_dict_fields - computed_fields, expected_fields)
@ -262,6 +262,7 @@ class TestCreateStreams(ZulipTestCase):
# Send public stream creation event to all active users.
self.assertEqual(events[0]["users"], active_non_guest_user_ids(realm.id))
self.assertEqual(events[0]["event"]["streams"][0]["name"], "Public stream")
self.assertEqual(events[0]["event"]["streams"][0]["stream_weekly_traffic"], None)
with self.capture_send_event_calls(expected_num_events=1) as events:
ensure_stream(realm, "Private stream", invite_only=True, acting_user=None)
@ -273,6 +274,7 @@ class TestCreateStreams(ZulipTestCase):
self.assertTrue(self.example_user("iago").id in events[0]["users"])
self.assertTrue(self.example_user("desdemona").id in events[0]["users"])
self.assertEqual(events[0]["event"]["streams"][0]["name"], "Private stream")
self.assertEqual(events[0]["event"]["streams"][0]["stream_weekly_traffic"], None)
moderators_system_group = UserGroup.objects.get(
name="role:moderators", realm=realm, is_system_group=True
@ -4528,7 +4530,7 @@ class SubscriptionAPITest(ZulipTestCase):
streams_to_sub = ["multi_user_stream"]
flush_per_request_caches()
with self.capture_send_event_calls(expected_num_events=5) as events:
with self.assert_database_query_count(36):
with self.assert_database_query_count(37):
self.common_subscribe_to_streams(
self.test_user,
streams_to_sub,
@ -4552,7 +4554,7 @@ class SubscriptionAPITest(ZulipTestCase):
# Now add ourselves
with self.capture_send_event_calls(expected_num_events=2) as events:
with self.assert_database_query_count(12):
with self.assert_database_query_count(13):
self.common_subscribe_to_streams(
self.test_user,
streams_to_sub,
@ -4982,7 +4984,7 @@ class SubscriptionAPITest(ZulipTestCase):
test_user_ids = [user.id for user in test_users]
with self.assert_database_query_count(20):
with self.assert_database_query_count(21):
with cache_tries_captured() as cache_tries:
with mock.patch("zerver.views.streams.send_messages_for_new_subscribers"):
self.common_subscribe_to_streams(
@ -5456,7 +5458,7 @@ class SubscriptionAPITest(ZulipTestCase):
]
# Test creating a public stream when realm does not have a notification stream.
with self.assert_database_query_count(36):
with self.assert_database_query_count(37):
self.common_subscribe_to_streams(
self.test_user,
[new_streams[0]],
@ -5464,7 +5466,7 @@ class SubscriptionAPITest(ZulipTestCase):
)
# Test creating private stream.
with self.assert_database_query_count(35):
with self.assert_database_query_count(36):
self.common_subscribe_to_streams(
self.test_user,
[new_streams[1]],
@ -5476,7 +5478,7 @@ class SubscriptionAPITest(ZulipTestCase):
notifications_stream = get_stream(self.streams[0], self.test_realm)
self.test_realm.notifications_stream_id = notifications_stream.id
self.test_realm.save()
with self.assert_database_query_count(44):
with self.assert_database_query_count(45):
self.common_subscribe_to_streams(
self.test_user,
[new_streams[2]],
@ -5897,7 +5899,7 @@ class GetSubscribersTest(ZulipTestCase):
polonius.id,
]
with self.assert_database_query_count(47):
with self.assert_database_query_count(48):
self.common_subscribe_to_streams(
self.user_profile,
streams,

View File

@ -58,6 +58,7 @@ from zerver.lib.request import REQ, has_request_variables
from zerver.lib.response import json_partial_success, json_success
from zerver.lib.retention import STREAM_MESSAGE_BATCH_SIZE as RETENTION_STREAM_MESSAGE_BATCH_SIZE
from zerver.lib.retention import parse_message_retention_days
from zerver.lib.stream_traffic import get_streams_traffic
from zerver.lib.streams import (
StreamDict,
access_default_stream_group_by_id,
@ -874,7 +875,12 @@ def get_stream_backend(
stream_id: int,
) -> HttpResponse:
(stream, sub) = access_stream_by_id(user_profile, stream_id, allow_realm_admin=True)
return json_success(request, data={"stream": stream_to_dict(stream)})
recent_traffic = None
if not user_profile.realm.is_zephyr_mirror_realm:
# We do not need stream traffic data in zephyr mirroring realm.
recent_traffic = get_streams_traffic({stream.id})
return json_success(request, data={"stream": stream_to_dict(stream, recent_traffic)})
@has_request_variables