2013-10-21 23:25:53 +02:00
|
|
|
import datetime
|
2017-12-13 01:45:57 +01:00
|
|
|
import logging
|
2020-06-11 00:54:34 +02:00
|
|
|
from collections import defaultdict
|
2020-11-03 17:13:22 +01:00
|
|
|
from dataclasses import dataclass
|
2020-11-03 20:48:57 +01:00
|
|
|
from typing import Any, Dict, List, Set, Tuple
|
2013-10-21 23:25:53 +02:00
|
|
|
|
2013-11-16 00:54:12 +01:00
|
|
|
from django.conf import settings
|
2017-08-27 16:30:48 +02:00
|
|
|
from django.utils.timezone import now as timezone_now
|
2013-10-21 23:25:53 +02:00
|
|
|
|
2018-11-08 22:40:27 +01:00
|
|
|
from confirmation.models import one_click_unsubscribe_link
|
2016-11-08 10:07:47 +01:00
|
|
|
from zerver.context_processors import common_context
|
2020-06-11 00:54:34 +02:00
|
|
|
from zerver.lib.email_notifications import build_message_list
|
2017-12-13 01:45:57 +01:00
|
|
|
from zerver.lib.logging_util import log_to_file
|
2020-06-11 00:54:34 +02:00
|
|
|
from zerver.lib.queue import queue_json_publish
|
|
|
|
from zerver.lib.send_email import FromAddress, send_future_email
|
|
|
|
from zerver.lib.url_encoding import encode_stream
|
|
|
|
from zerver.models import (
|
|
|
|
Message,
|
|
|
|
Realm,
|
|
|
|
RealmAuditLog,
|
|
|
|
Recipient,
|
|
|
|
Subscription,
|
|
|
|
UserActivity,
|
|
|
|
UserProfile,
|
|
|
|
get_active_streams,
|
|
|
|
get_user_profile_by_id,
|
|
|
|
)
|
2013-10-21 23:25:53 +02:00
|
|
|
|
2017-12-13 01:45:57 +01:00
|
|
|
# Module-level logger; log_to_file is handed the path configured in
# settings.DIGEST_LOG_PATH.
logger = logging.getLogger(__name__)
log_to_file(logger, settings.DIGEST_LOG_PATH)

# Inactivity window in days, going by the comment in inactive_since().
# NOTE(review): nothing in this file reads the constant; callers elsewhere
# presumably derive the cutoff datetime from it -- confirm.
DIGEST_CUTOFF = 5

# A topic is identified by (recipient type_id of the stream, topic name).
TopicKey = Tuple[int, str]
|
|
|
|
|
2020-11-03 17:13:22 +01:00
|
|
|
@dataclass
class TopicActivity:
    # Per-topic activity summary built by get_recent_topic_activity() and
    # consumed by get_hot_topics() / gather_hot_topics().  The field order
    # below is also the positional order of the generated __init__.

    # Topic keys ranked for the "longest conversation" criterion.
    topics_by_length: List[TopicKey]
    # Topic keys ranked for the "most distinct senders" criterion.
    topics_by_diversity: List[TopicKey]
    # Topic key -> names of the humans who posted in the topic.
    topic_senders: Dict[TopicKey, Set[str]]  # full_name
    # Topic key -> number of human-sent messages in the topic.
    topic_length: Dict[TopicKey, int]
    # Topic key -> every message seen for the topic, automated ones included.
    topic_messages: Dict[TopicKey, List[Message]]
|
|
|
|
|
2019-05-02 05:54:48 +02:00
|
|
|
# Digests accumulate 2 types of interesting traffic for a user:
|
|
|
|
# 1. New streams
|
|
|
|
# 2. Interesting stream traffic, as determined by the longest and most
|
2013-10-21 23:25:53 +02:00
|
|
|
#    diversely commented upon topics.
|
|
|
|
|
2017-11-05 11:15:10 +01:00
|
|
|
def inactive_since(user_profile: UserProfile, cutoff: datetime.datetime) -> bool:
    """Report whether the user's most recent recorded visit predates ``cutoff``.

    Every UserActivity row for the user is consulted; a user with no rows
    at all counts as inactive.
    """
    # Hasn't used the app in the last DIGEST_CUTOFF (5) days.
    visit_times = [
        activity.last_visit
        for activity in UserActivity.objects.filter(user_profile=user_profile)
    ]

    if visit_times:
        # Only the newest visit matters for the inactivity check.
        return max(visit_times) < cutoff

    # This person has never used the app.
    return True
|
|
|
|
|
2017-11-05 11:15:10 +01:00
|
|
|
def should_process_digest(realm_str: str) -> bool:
    """Return False for realms listed in settings.SYSTEM_ONLY_REALMS, else True."""
    # Don't try to send emails to system-only realms.
    is_system_only = realm_str in settings.SYSTEM_ONLY_REALMS
    return not is_system_only
|
|
|
|
|
2017-08-25 21:52:47 +02:00
|
|
|
# Changes to this should also be reflected in
|
|
|
|
# zerver/worker/queue_processors.py:DigestWorker.consume()
|
2017-11-05 11:15:10 +01:00
|
|
|
def queue_digest_recipient(user_profile: UserProfile, cutoff: datetime.datetime) -> None:
    """Publish a "digest_emails" queue event for a single user.

    The event carries the user's id and ``cutoff`` encoded as a string of
    whole epoch seconds.
    """
    # Convert cutoff to epoch seconds for transit.
    #
    # strftime('%s') is not among Python's documented format codes: it is a
    # platform libc extension that is missing on some systems and that
    # ignores the datetime's tzinfo.  datetime.timestamp() is portable and
    # honors the time zone of an aware datetime.  The value is still sent as
    # a string of whole seconds, so the event format is unchanged.
    epoch_seconds = str(int(cutoff.timestamp()))

    event = {
        "user_profile_id": user_profile.id,
        "cutoff": epoch_seconds,
    }
    queue_json_publish("digest_emails", event)
|
2017-08-25 21:52:47 +02:00
|
|
|
|
2017-11-05 11:15:10 +01:00
|
|
|
def enqueue_emails(cutoff: datetime.datetime) -> None:
    """Queue a potential digest for each eligible user inactive since ``cutoff``.

    Does nothing when settings.SEND_DIGEST_EMAILS is falsy.  Only realms that
    are not deactivated, have digest emails enabled, and whose digest_weekday
    equals today's weekday are scanned; within them, only active, non-bot
    users with enable_digest_emails set are considered.
    """
    if not settings.SEND_DIGEST_EMAILS:
        return

    today = timezone_now().weekday()
    realms = Realm.objects.filter(
        deactivated=False,
        digest_emails_enabled=True,
        digest_weekday=today,
    )

    for realm in realms:
        if not should_process_digest(realm.string_id):
            continue

        candidates = UserProfile.objects.filter(
            realm=realm,
            is_active=True,
            is_bot=False,
            enable_digest_emails=True,
        )

        for user_profile in candidates:
            if not inactive_since(user_profile, cutoff):
                continue

            queue_digest_recipient(user_profile, cutoff)
            logger.info(
                "User %s is inactive, queuing for potential digest",
                user_profile.id,
            )
|
2017-08-27 16:30:48 +02:00
|
|
|
|
2020-11-03 17:13:22 +01:00
|
|
|
def get_recent_topic_activity(
    stream_ids: List[int],
    cutoff_date: datetime.datetime,
) -> TopicActivity:
    """Summarize per-topic activity in ``stream_ids`` after ``cutoff_date``.

    Gathers information about topic conversations, then classifies by:
      * topic length (number of human-sent messages)
      * number of distinct human senders

    Every message is recorded in ``topic_messages``, but only messages for
    which ``sent_by_human()`` is true contribute to the sender and length
    tallies, so a topic with no human messages is absent from both rankings.
    """
    messages = Message.objects.filter(
        recipient__type=Recipient.STREAM,
        recipient__type_id__in=stream_ids,
        date_sent__gt=cutoff_date,
    ).select_related('recipient', 'sender', 'sending_client')

    topic_length: Dict[TopicKey, int] = defaultdict(int)
    topic_messages: Dict[TopicKey, List[Message]] = defaultdict(list)
    topic_senders: Dict[TopicKey, Set[str]] = defaultdict(set)

    for message in messages:
        key = (message.recipient.type_id, message.topic_name())

        topic_messages[key].append(message)

        if not message.sent_by_human():
            # Don't include automated messages in the count.
            continue

        topic_senders[key].add(message.sender.full_name)
        topic_length[key] += 1

    # Rank by how many distinct people took part.  The key must be the size
    # of the sender set: sets themselves only have a subset partial order,
    # so sorting on the set does not rank topics by diversity.
    topics_by_diversity = sorted(
        topic_senders,
        key=lambda key: len(topic_senders[key]),
        reverse=True,
    )

    # Rank by human message count.  This sorts topics_by_length itself; the
    # previous code re-sorted topics_by_diversity here, leaving this list
    # unsorted and discarding the diversity ranking.
    topics_by_length = sorted(
        topic_length,
        key=lambda key: topic_length[key],
        reverse=True,
    )

    return TopicActivity(
        topics_by_diversity=topics_by_diversity,
        topics_by_length=topics_by_length,
        topic_senders=topic_senders,
        topic_length=topic_length,
        topic_messages=topic_messages,
    )
|
|
|
|
|
2020-11-03 18:03:42 +01:00
|
|
|
def get_hot_topics(
    topic_activity: TopicActivity,
) -> List[TopicKey]:
    """Pick our top (at most 4) hottest topics.

    The first two entries of ``topics_by_diversity`` are taken as-is; the
    result is then padded from ``topics_by_length``, skipping topics already
    chosen, until it holds 4 topics or the candidates run out.  Neither input
    list is modified.
    """
    by_diversity = topic_activity.topics_by_diversity
    by_length = topic_activity.topics_by_length

    # Both rankings must cover exactly the same topics.
    assert set(by_diversity) == set(by_length)

    # Start with the two most diverse topics.
    hottest = by_diversity[:2]

    # Pad out our list up to 4 items, using the topics' length (aka message
    # count) as the secondary filter.
    for candidate in by_length:
        if candidate in hottest:
            continue
        hottest.append(candidate)
        if len(hottest) >= 4:
            break

    return hottest
|
|
|
|
|
|
|
|
def gather_hot_topics(
    user_profile: UserProfile,
    hot_topics: List[TopicKey],
    topic_activity: TopicActivity,
) -> List[Dict[str, Any]]:
    """Return the templating information for each hot topic, in input order.

    Each entry is a dict with the keys "participants", "count" and
    "first_few_messages".
    """
    senders_by_topic = topic_activity.topic_senders
    length_by_topic = topic_activity.topic_length
    messages_by_topic = topic_activity.topic_messages

    def build_payload(topic_key: TopicKey) -> Dict[str, Any]:
        participants = list(senders_by_topic[topic_key])
        human_message_count = length_by_topic[topic_key]

        # We'll display up to 2 messages from the topic.
        sample = messages_by_topic[topic_key][:2]

        return {
            "participants": participants,
            # Human messages beyond the displayed sample.
            # NOTE(review): the sample is drawn from all messages, including
            # automated ones, so this difference can drop below zero.
            "count": human_message_count - len(sample),
            "first_few_messages": build_message_list(user_profile, sample),
        }

    return [build_payload(topic_key) for topic_key in hot_topics]
|
2013-10-21 23:25:53 +02:00
|
|
|
|
2017-11-05 11:15:10 +01:00
|
|
|
def gather_new_streams(user_profile: UserProfile,
                       threshold: datetime.datetime) -> Tuple[int, Dict[str, List[str]]]:
    """Collect streams created after ``threshold`` that the user may be shown.

    Guests are offered only web-public streams; users for whom
    ``can_access_public_streams()`` is true are offered every stream that is
    not invite-only; everyone else is offered nothing.

    Returns ``(count, {"html": [...], "plain": [...]})`` where "html" holds
    one anchor tag per stream and "plain" holds the raw stream names.
    """
    # Local import keeps this block self-contained; html is stdlib.
    from html import escape as escape_html

    if user_profile.is_guest:
        new_streams = list(get_active_streams(user_profile.realm).filter(
            is_web_public=True, date_created__gt=threshold))
    elif user_profile.can_access_public_streams():
        new_streams = list(get_active_streams(user_profile.realm).filter(
            invite_only=False, date_created__gt=threshold))
    else:
        # Neither branch applied.  Without this default the name was never
        # bound and the loop below raised UnboundLocalError.
        new_streams = []

    base_url = f"{user_profile.realm.uri}/#narrow/stream/"

    streams_html = []
    streams_plain = []

    for stream in new_streams:
        narrow_url = base_url + encode_stream(stream.id, stream.name)
        # Stream names are user-supplied text and this string is HTML markup,
        # so the name must be escaped before it is embedded in the link.
        stream_link = f"<a href='{narrow_url}'>{escape_html(stream.name)}</a>"
        streams_html.append(stream_link)
        streams_plain.append(stream.name)

    return len(new_streams), {"html": streams_html, "plain": streams_plain}
|
|
|
|
|
2019-05-02 05:54:48 +02:00
|
|
|
def enough_traffic(hot_conversations: List[Dict[str, Any]], new_streams: int) -> bool:
    """Return True when there is at least one hot conversation or new stream.

    ``hot_conversations`` is the list of teaser payloads produced by
    gather_hot_topics (it was previously mis-annotated as ``str``);
    ``new_streams`` is the number of new streams.  Only truthiness is
    inspected, so any empty container or zero counts as "nothing".
    """
    return bool(hot_conversations or new_streams)
|
2013-10-21 23:25:53 +02:00
|
|
|
|
2020-11-04 23:40:29 +01:00
|
|
|
def bulk_get_digest_context(users: List[UserProfile], cutoff: float) -> Dict[int, Dict[str, Any]]:
    """Build the digest email template context for each user.

    ``cutoff`` is in epoch seconds.  The result maps user id to a context
    dict that starts from common_context(user) and adds "unsubscribe_link",
    "hot_conversations", "new_streams" and "new_streams_count".
    """
    # Convert from epoch seconds to a datetime object.
    cutoff_date = datetime.datetime.fromtimestamp(int(cutoff), tz=datetime.timezone.utc)

    # Active, unmuted stream subscriptions for all the users in one query.
    subscription_rows = Subscription.objects.filter(
        user_profile_id__in=[user.id for user in users],
        recipient__type=Recipient.STREAM,
        active=True,
        is_muted=False,
    ).values('user_profile_id', 'recipient__type_id')

    # maps user_id -> {stream_id, stream_id, ...}
    streams_by_user: Dict[int, Set[int]] = defaultdict(set)
    for row in subscription_rows:
        streams_by_user[row['user_profile_id']].add(row['recipient__type_id'])

    contexts: Dict[int, Dict[str, Any]] = {}

    for user in users:
        context = common_context(user)

        # Start building email template data.
        context.update(unsubscribe_link=one_click_unsubscribe_link(user, "digest"))

        stream_ids = streams_by_user[user.id]
        if user.long_term_idle:
            # Drop streams whose subscription changed after the cutoff.
            stream_ids -= streams_recently_modified_for_user(user, cutoff_date)

        topic_activity = get_recent_topic_activity(sorted(stream_ids), cutoff_date)

        # Gather hot conversations.
        hot_conversations = gather_hot_topics(
            user, get_hot_topics(topic_activity), topic_activity)

        # Gather new streams.
        new_streams_count, new_streams = gather_new_streams(user, cutoff_date)

        context.update(
            hot_conversations=hot_conversations,
            new_streams=new_streams,
            new_streams_count=new_streams_count,
        )
        contexts[user.id] = context

    return contexts
|
2020-11-03 20:48:57 +01:00
|
|
|
|
2020-11-04 23:40:29 +01:00
|
|
|
def get_digest_context(user: UserProfile, cutoff: float) -> Dict[str, Any]:
    """Single-user wrapper: return bulk_get_digest_context's entry for ``user``."""
    contexts_by_user_id = bulk_get_digest_context([user], cutoff)
    return contexts_by_user_id[user.id]
|
|
|
|
|
|
|
|
def bulk_handle_digest_email(user_ids: List[int], cutoff: float) -> None:
    """Build digest contexts for ``user_ids`` and send a digest where warranted.

    A user gets an email only if enough_traffic() accepts their hot
    conversations and new-stream count.
    """
    users = [get_user_profile_by_id(user_id) for user_id in user_ids]
    context_map = bulk_get_digest_context(users, cutoff)

    for user in users:
        context = context_map[user.id]

        # We don't want to send emails containing almost no information.
        worth_sending = enough_traffic(
            context["hot_conversations"], context["new_streams_count"])
        if not worth_sending:
            continue

        logger.info("Sending digest email for user %s", user.id)
        # Send now, as a ScheduledEmail
        send_future_email(
            'zerver/emails/digest',
            user.realm,
            to_user_ids=[user.id],
            from_name="Zulip Digest",
            from_address=FromAddress.no_reply_placeholder,
            context=context,
        )
|
|
|
|
|
|
|
|
def handle_digest_email(user_id: int, cutoff: float) -> None:
    """Single-user wrapper around bulk_handle_digest_email."""
    single_user_batch = [user_id]
    bulk_handle_digest_email(single_user_batch, cutoff)
|
2019-03-03 07:14:58 +01:00
|
|
|
|
2020-11-05 14:55:45 +01:00
|
|
|
def streams_recently_modified_for_user(user: UserProfile, cutoff_date: datetime.datetime) -> Set[int]:
    """Return ids of streams whose subscription for ``user`` changed after ``cutoff_date``.

    A change is a RealmAuditLog entry of type SUBSCRIPTION_CREATED,
    SUBSCRIPTION_ACTIVATED or SUBSCRIPTION_DEACTIVATED in the user's realm.
    """
    subscription_events = [
        RealmAuditLog.SUBSCRIPTION_CREATED,
        RealmAuditLog.SUBSCRIPTION_ACTIVATED,
        RealmAuditLog.SUBSCRIPTION_DEACTIVATED,
    ]

    # Streams where the user's subscription was changed
    audit_rows = RealmAuditLog.objects.filter(
        realm=user.realm,
        modified_user=user,
        event_time__gt=cutoff_date,
        event_type__in=subscription_events,
    )
    modified_stream_ids = audit_rows.values_list('modified_stream_id', flat=True)

    return set(modified_stream_ids)
|