#!/usr/bin/env python3
"""Create or update a message thread screenshot using a thread file.

The thread file is a JSON list of thread descriptions.  As read by this
script, each thread is an object with the keys:

* ``users``: mapping of user full name to an avatar image path ("" means
  the avatar is left untouched).
* ``recipient_type``: only ``"channel"`` threads are processed.
* ``channel``, ``topic``, ``color``: where the messages are sent and the
  color applied to the default user's subscription.
* ``messages``: list of objects with ``sender``, ``content``, ``reactions``
  and ``date`` (``year``/``month``/``day``/``hour``/``minute``).
* ``screenshot``: output image path handed to the screenshot script.
"""

import argparse
import os
import subprocess
import sys
from datetime import datetime, timezone
from typing import Dict, List, Optional

from pydantic import BaseModel, ConfigDict

SCREENSHOTS_DIR = os.path.abspath(os.path.dirname(__file__))
TOOLS_DIR = os.path.abspath(os.path.dirname(SCREENSHOTS_DIR))
ROOT_DIR = os.path.dirname(TOOLS_DIR)
sys.path.insert(0, ROOT_DIR)

# check for the venv
from tools.lib import sanity_check

sanity_check.check_venv(__file__)

from scripts.lib.setup_path import setup_path

setup_path()

# Django must be configured before any of the zerver imports below.
os.environ["DJANGO_SETTINGS_MODULE"] = "zproject.settings"
import django

django.setup()

import json

from tools.lib.test_script import prepare_puppeteer_run
from zerver.actions.create_user import do_create_user
from zerver.actions.message_send import do_send_messages, internal_prep_stream_message
from zerver.actions.reactions import do_add_reaction
from zerver.actions.streams import bulk_add_subscriptions, do_change_subscription_property
from zerver.actions.user_settings import do_change_avatar_fields
from zerver.lib.emoji import get_emoji_data
from zerver.lib.message import SendMessageRequest, access_message
from zerver.lib.stream_subscription import get_active_subscriptions_for_stream_id
from zerver.lib.streams import access_stream_by_id, ensure_stream
from zerver.lib.timestamp import datetime_to_timestamp
from zerver.lib.upload import upload_avatar_image
from zerver.lib.url_encoding import topic_narrow_url
from zerver.models import Message, UserProfile
from zerver.models.realms import get_realm
from zerver.models.users import get_user_by_delivery_email

realm = get_realm("zulip")
# User that acts on behalf of the script (creates users/streams, reads messages).
DEFAULT_USER = get_user_by_delivery_email("iago@zulip.com", realm)
# IDs of every message sent by this run; they are deleted again on exit.
message_thread_ids: List[int] = []


class MessageThread(BaseModel):
    """One staged message of a thread, as described in the thread file."""

    model_config = ConfigDict(frozen=True)

    # Full name of the sending user.
    sender: str
    # Message body passed to the message-sending code.
    content: str
    # Emoji name -> full names of the users reacting with that emoji.
    reactions: Dict[str, List[str]]
    # Keys read by this script: year, month, day, hour, minute.
    date: Dict[str, int]


def create_user(full_name: str, avatar_filename: str) -> None:
    """Ensure a user named ``full_name`` exists in the realm and set its avatar.

    The user is looked up by full name; when missing it is created with the
    email ``<full name without spaces>@zulip.com``.  The avatar is updated
    (for new and pre-existing users alike) unless ``avatar_filename`` is "".
    """
    email = f'{full_name.replace(" ", "")}@zulip.com'
    try:
        user = UserProfile.objects.get(realm=realm, full_name=full_name)
    except UserProfile.DoesNotExist:
        user = do_create_user(email, "password", realm, full_name, acting_user=DEFAULT_USER)

    if avatar_filename != "":
        set_avatar(user, avatar_filename)


def set_avatar(user: UserProfile, filename: str) -> None:
    """Upload the image at ``filename`` as ``user``'s avatar and mark it user-provided."""
    with open(filename, "rb") as f:
        upload_avatar_image(f, user, user)
    do_change_avatar_fields(user, UserProfile.AVATAR_FROM_USER, acting_user=DEFAULT_USER)


def create_and_subscribe_stream(
    stream_name: str, users: List[str], color: Optional[str] = None
) -> None:
    """Ensure the stream exists and subscribe the named users to it.

    ``users`` are matched by full name within the realm.  When ``color`` is
    given, it is applied to DEFAULT_USER's subscription to the stream.
    """
    stream = ensure_stream(realm, stream_name, acting_user=DEFAULT_USER)
    bulk_add_subscriptions(
        realm,
        [stream],
        list(UserProfile.objects.filter(realm=realm, full_name__in=users)),
        acting_user=DEFAULT_USER,
    )

    # Re-fetch the stream together with DEFAULT_USER's subscription object.
    (stream, sub) = access_stream_by_id(DEFAULT_USER, stream.id)
    assert sub is not None
    if color is not None:
        do_change_subscription_property(
            DEFAULT_USER, sub, stream, "color", color, acting_user=DEFAULT_USER
        )


def send_stream_messages(
    stream_name: str, topic: str, staged_messages_data: List[MessageThread]
) -> List[int]:
    """Send the staged messages to ``stream_name`` > ``topic`` and add their reactions.

    Each message is sent as its ``sender`` with a timestamp built (in UTC)
    from its ``date`` mapping.  The IDs of the sent messages are recorded in
    the module-level ``message_thread_ids`` and returned.

    Raises KeyError when a sender or reacting user is not an active
    subscriber of the stream.
    """
    # dict() accepts both plain mappings and iterables of (key, value) pairs.
    staged_messages = [dict(staged_message) for staged_message in staged_messages_data]
    stream = ensure_stream(realm, stream_name, acting_user=DEFAULT_USER)

    subscribers_query = get_active_subscriptions_for_stream_id(
        stream.id, include_deactivated_users=False
    ).values_list("user_profile", flat=True)

    # Senders and reacting users are referenced by full name in the thread data.
    subscribers: Dict[str, UserProfile] = {}
    for subscriber_id in subscribers_query:
        subscriber = UserProfile.objects.get(realm=realm, id=subscriber_id)
        subscribers[subscriber.full_name] = subscriber

    messages: List[Optional[SendMessageRequest]] = []

    for message in staged_messages:
        date_sent = message["date"]
        message_request = internal_prep_stream_message(
            subscribers[message["sender"]],
            stream,
            topic,
            message["content"],
            forged=True,
            forged_timestamp=datetime_to_timestamp(
                datetime(
                    date_sent["year"],
                    date_sent["month"],
                    date_sent["day"],
                    date_sent["hour"],
                    date_sent["minute"],
                    tzinfo=timezone.utc,
                )
            ),
        )
        messages.append(message_request)

    message_ids = [
        sent_message_result.message_id for sent_message_result in do_send_messages(messages)
    ]

    # Remember what was sent so the script can clean up after itself.
    global message_thread_ids
    message_thread_ids += message_ids

    for message, message_id in zip(staged_messages, message_ids):
        if message.get("reactions") is not None:
            reactions = message["reactions"]
            for reaction, user_names in reactions.items():
                users = [subscribers[user_name] for user_name in user_names]
                add_message_reactions(message_id, reaction, users)

    return message_ids


def add_message_reactions(message_id: int, emoji: str, users: List[UserProfile]) -> None:
    """Add the ``emoji`` reaction to the message on behalf of each user in ``users``."""
    preview_message = access_message(user_profile=DEFAULT_USER, message_id=message_id)
    emoji_data = get_emoji_data(realm.id, emoji)
    for user in users:
        do_add_reaction(
            user, preview_message, emoji, emoji_data.emoji_code, emoji_data.reaction_type
        )


def capture_streams_narrow_screenshot(
    image_path: str, stream_name: str, topic: str, unread_msg_id: int
) -> None:
    """Run the Node ``thread-screenshot.js`` script for the given topic narrow.

    Raises subprocess.CalledProcessError when the script exits non-zero.
    """
    stream = ensure_stream(realm, stream_name, acting_user=DEFAULT_USER)
    narrow_uri = topic_narrow_url(realm=realm, stream=stream, topic_name=topic)
    narrow = f"{stream_name}/{topic}"
    screenshot_script = os.path.join(SCREENSHOTS_DIR, "thread-screenshot.js")
    subprocess.check_call(
        ["node", screenshot_script, narrow_uri, narrow, str(unread_msg_id), image_path, realm.url]
    )


parser = argparse.ArgumentParser()
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument(
    "--thread",
    nargs="+",
    type=str,
    help="Path of the file where the thread for screenshot is present",
)
# NOTE(review): this group has no arguments and is not referenced elsewhere
# in this file.
fixture_group = parser.add_argument_group("thread")

options = parser.parse_args()

prepare_puppeteer_run()

try:
    realm = get_realm("zulip")

    # Only the first --thread path is read.  JSON is UTF-8, so do not depend
    # on the locale's default encoding when decoding message content.
    with open(options.thread[0], encoding="utf-8") as f:
        threads = json.load(f)

    for thread in threads:
        for user, avatar in thread["users"].items():
            create_user(user, avatar)

        if thread["recipient_type"] == "channel":
            users = list(thread["users"].keys())
            # "Iago" is subscribed in addition to the thread's own users.
            users.append("Iago")
            create_and_subscribe_stream(thread["channel"], users, thread["color"])
            message_ids = send_stream_messages(
                thread["channel"], thread["topic"], thread["messages"]
            )
            if not message_ids:
                # min() below would otherwise fail with an opaque
                # "empty sequence" error; name the offending thread instead.
                channel_name = thread["channel"]
                topic_name = thread["topic"]
                raise ValueError(
                    f"Thread {channel_name!r} > {topic_name!r} has no messages to screenshot"
                )
            # The earliest message of the thread is passed as the unread message.
            capture_streams_narrow_screenshot(
                thread["screenshot"], thread["channel"], thread["topic"], min(message_ids)
            )
finally:
    # Always remove the messages this run sent, even when something failed.
    Message.objects.filter(id__in=message_thread_ids).delete()