zulip/tools/screenshots/generate-user-messages-scre...

213 lines
7.1 KiB
Python
Executable File

#!/usr/bin/env python3
"""Create or update a message thread screenshot using a thread file."""
import argparse
import os
import subprocess
import sys
from datetime import datetime, timezone
from typing import Dict, List, Optional
from pydantic import BaseModel, ConfigDict
SCREENSHOTS_DIR = os.path.abspath(os.path.dirname(__file__))
TOOLS_DIR = os.path.abspath(os.path.dirname(SCREENSHOTS_DIR))
ROOT_DIR = os.path.dirname(TOOLS_DIR)
sys.path.insert(0, ROOT_DIR)
# check for the venv
from tools.lib import sanity_check
sanity_check.check_venv(__file__)
from scripts.lib.setup_path import setup_path
setup_path()
os.environ["DJANGO_SETTINGS_MODULE"] = "zproject.settings"
import django
django.setup()
import json
from tools.lib.test_script import prepare_puppeteer_run
from zerver.actions.create_user import do_create_user
from zerver.actions.message_send import do_send_messages, internal_prep_stream_message
from zerver.actions.reactions import do_add_reaction
from zerver.actions.streams import bulk_add_subscriptions, do_change_subscription_property
from zerver.actions.user_settings import do_change_avatar_fields
from zerver.lib.emoji import get_emoji_data
from zerver.lib.message import SendMessageRequest, access_message
from zerver.lib.stream_subscription import get_active_subscriptions_for_stream_id
from zerver.lib.streams import access_stream_by_id, ensure_stream
from zerver.lib.timestamp import datetime_to_timestamp
from zerver.lib.upload import upload_avatar_image
from zerver.lib.url_encoding import topic_narrow_url
from zerver.models import Message, UserProfile
from zerver.models.realms import get_realm
from zerver.models.users import get_user_by_delivery_email
realm = get_realm("zulip")
DEFAULT_USER = get_user_by_delivery_email("iago@zulip.com", realm)
message_thread_ids: List[int] = []
class MessageThread(BaseModel):
model_config = ConfigDict(frozen=True)
sender: str
content: str
reactions: Dict[str, List[str]]
date: Dict[str, int]
def create_user(full_name: str, avatar_filename: str) -> None:
email = f'{full_name.replace(" ", "")}@zulip.com'
try:
user = UserProfile.objects.get(realm=realm, full_name=full_name)
except UserProfile.DoesNotExist:
user = do_create_user(email, "password", realm, full_name, acting_user=DEFAULT_USER)
if avatar_filename != "":
set_avatar(user, avatar_filename)
def set_avatar(user: UserProfile, filename: str) -> None:
with open(filename, "rb") as f:
upload_avatar_image(f, user)
do_change_avatar_fields(user, UserProfile.AVATAR_FROM_USER, acting_user=DEFAULT_USER)
def create_and_subscribe_stream(
stream_name: str, users: List[str], color: Optional[str] = None
) -> None:
stream = ensure_stream(realm, stream_name, acting_user=DEFAULT_USER)
bulk_add_subscriptions(
realm,
[stream],
list(UserProfile.objects.filter(realm=realm, full_name__in=users)),
acting_user=DEFAULT_USER,
)
(stream, sub) = access_stream_by_id(DEFAULT_USER, stream.id)
assert sub is not None
if color is not None:
do_change_subscription_property(
DEFAULT_USER, sub, stream, "color", color, acting_user=DEFAULT_USER
)
def send_stream_messages(
stream_name: str, topic: str, staged_messages_data: List[MessageThread]
) -> List[int]:
staged_messages = [dict(staged_message) for staged_message in staged_messages_data]
stream = ensure_stream(realm, stream_name, acting_user=DEFAULT_USER)
subscribers_query = get_active_subscriptions_for_stream_id(
stream.id, include_deactivated_users=False
).values_list("user_profile", flat=True)
subscribers: Dict[str, UserProfile] = {}
for subscriber_id in subscribers_query:
subscriber = UserProfile.objects.get(realm=realm, id=subscriber_id)
subscribers[subscriber.full_name] = subscriber
messages: List[Optional[SendMessageRequest]] = []
for message in staged_messages:
date_sent = message["date"]
message_request = internal_prep_stream_message(
subscribers[message["sender"]],
stream,
topic,
message["content"],
forged=True,
forged_timestamp=datetime_to_timestamp(
datetime(
date_sent["year"],
date_sent["month"],
date_sent["day"],
date_sent["hour"],
date_sent["minute"],
tzinfo=timezone.utc,
)
),
)
messages.append(message_request)
message_ids = [
sent_message_result.message_id for sent_message_result in do_send_messages(messages)
]
global message_thread_ids
message_thread_ids += message_ids
for message, message_id in zip(staged_messages, message_ids):
if message.get("reactions") is not None:
reactions = message["reactions"]
for reaction, user_names in reactions.items():
users = [subscribers[user_name] for user_name in user_names]
add_message_reactions(message_id, reaction, users)
return message_ids
def add_message_reactions(message_id: int, emoji: str, users: List[UserProfile]) -> None:
preview_message = access_message(user_profile=DEFAULT_USER, message_id=message_id)
emoji_data = get_emoji_data(realm.id, emoji)
for user in users:
do_add_reaction(
user, preview_message, emoji, emoji_data.emoji_code, emoji_data.reaction_type
)
def capture_streams_narrow_screenshot(
image_path: str, stream_name: str, topic: str, unread_msg_id: int
) -> None:
stream = ensure_stream(realm, stream_name, acting_user=DEFAULT_USER)
narrow_uri = topic_narrow_url(realm=realm, stream=stream, topic_name=topic)
narrow = f"{stream_name}/{topic}"
screenshot_script = os.path.join(SCREENSHOTS_DIR, "thread-screenshot.js")
subprocess.check_call(
["node", screenshot_script, narrow_uri, narrow, str(unread_msg_id), image_path, realm.url]
)
parser = argparse.ArgumentParser()
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument(
"--thread",
nargs="+",
type=str,
help="Path of the file where the thread for screenshot is present",
)
fixture_group = parser.add_argument_group("thread")
options = parser.parse_args()
prepare_puppeteer_run()
try:
realm = get_realm("zulip")
with open(options.thread[0]) as f:
threads = json.load(f)
for thread in threads:
for user, avatar in thread["users"].items():
create_user(user, avatar)
if thread["recipient_type"] == "channel":
users = list(thread["users"].keys())
users.append("Iago")
create_and_subscribe_stream(thread["channel"], users, thread["color"])
message_ids = send_stream_messages(
thread["channel"], thread["topic"], thread["messages"]
)
capture_streams_narrow_screenshot(
thread["screenshot"], thread["channel"], thread["topic"], min(message_ids)
)
finally:
Message.objects.filter(id__in=message_thread_ids).delete()