#!/usr/bin/env python3
"""Create or update a webhook integration screenshot using a test fixture."""

# check for the venv
from lib import sanity_check

sanity_check.check_venv(__file__)

import os
import sys

TOOLS_DIR = os.path.abspath(os.path.dirname(__file__))
ROOT_DIR = os.path.dirname(TOOLS_DIR)

# The repository root must be importable before any project module is loaded.
sys.path.insert(0, ROOT_DIR)
from scripts.lib.setup_path import setup_path

setup_path()

# Django has to be configured before the zerver imports below.
os.environ["DJANGO_SETTINGS_MODULE"] = "zproject.settings"
import django

django.setup()

import argparse
import base64
import subprocess
from typing import Any, Dict, Optional
from urllib.parse import parse_qsl, urlencode

import orjson
import requests

from scripts.lib.zulip_tools import BOLDRED, ENDC
from tools.lib.test_script import prepare_puppeteer_run
from zerver.lib.actions import (
    bulk_add_subscriptions,
    do_change_avatar_fields,
    do_create_user,
    notify_created_bot,
)
from zerver.lib.integrations import (
    DOC_SCREENSHOT_CONFIG,
    INTEGRATIONS,
    ScreenshotConfig,
    WebhookIntegration,
    get_fixture_and_image_paths,
    split_fixture_path,
)
from zerver.lib.storage import static_path
from zerver.lib.streams import create_stream_if_needed
from zerver.lib.upload import upload_avatar_image
from zerver.lib.webhooks.common import get_fixture_http_headers
from zerver.models import Message, UserProfile, get_realm, get_user_by_delivery_email


def create_integration_bot(
    integration: WebhookIntegration, bot_name: Optional[str] = None
) -> UserProfile:
    """Return the incoming-webhook bot for ``integration``, creating it if needed.

    The bot lives in the "zulip" realm, is owned by iago@zulip.com and uses the
    email ``<integration.name>-bot@example.com``.  When ``bot_name`` is None it
    defaults to ``"<Integration name capitalized> Bot"``.  A freshly created
    bot also gets the integration's avatar, when that file exists on disk.
    """
    realm = get_realm("zulip")
    owner = get_user_by_delivery_email("iago@zulip.com", realm)
    bot_email = f"{integration.name}-bot@example.com"
    if bot_name is None:
        bot_name = f"{integration.name.capitalize()} Bot"
    try:
        bot = UserProfile.objects.get(email=bot_email)
    except UserProfile.DoesNotExist:
        bot = do_create_user(
            email=bot_email,
            password="123",
            realm=owner.realm,
            full_name=bot_name,
            bot_type=UserProfile.INCOMING_WEBHOOK_BOT,
            bot_owner=owner,
            acting_user=owner,
        )
        notify_created_bot(bot)

        bot_avatar_path = integration.get_bot_avatar_path()
        if bot_avatar_path is not None:
            bot_avatar_path = static_path(bot_avatar_path)
            if os.path.isfile(bot_avatar_path):
                with open(bot_avatar_path, "rb") as f:
                    upload_avatar_image(f, owner, bot)
                    do_change_avatar_fields(bot, UserProfile.AVATAR_FROM_USER, acting_user=owner)

    return bot


def create_integration_stream(integration: WebhookIntegration, bot: UserProfile) -> None:
    """Ensure the integration's stream exists and subscribe the bot and its owner."""
    assert isinstance(bot.bot_owner, UserProfile)
    realm = bot.bot_owner.realm
    stream, _ = create_stream_if_needed(realm, integration.stream_name)
    bulk_add_subscriptions(realm, [stream], [bot, bot.bot_owner], acting_user=bot)


def get_integration(integration_name: str) -> WebhookIntegration:
    """Look up ``integration_name`` in INTEGRATIONS and check it is a webhook.

    Raises KeyError for an unknown name and AssertionError when the entry is
    not a WebhookIntegration.
    """
    integration = INTEGRATIONS[integration_name]
    assert isinstance(integration, WebhookIntegration), "Not a WebhookIntegration"
    return integration


def get_requests_headers(integration_name: str, fixture_name: str) -> Dict[str, Any]:
    """Return the fixture's HTTP headers renamed for use with ``requests``.

    A leading ``HTTP_`` is stripped from each header name and underscores are
    replaced with dashes (``HTTP_X_FOO`` becomes ``X-FOO``).
    """
    headers = get_fixture_http_headers(integration_name, fixture_name)

    def fix_name(header: str) -> str:
        header = header if not header.startswith("HTTP_") else header[len("HTTP_") :]
        return header.replace("_", "-")

    return {fix_name(k): v for k, v in headers.items()}


def custom_headers(headers_json: str) -> Dict[str, str]:
    """argparse ``type=`` callback: parse a JSON object of extra headers.

    An empty string yields ``{}``.  Invalid JSON, or JSON that is not an
    object, raises argparse.ArgumentTypeError so argparse reports a usage
    error instead of a traceback.
    """
    if not headers_json:
        return {}
    try:
        parsed = orjson.loads(headers_json)
    except orjson.JSONDecodeError as ve:
        raise argparse.ArgumentTypeError(
            "Encountered an error while attempting to parse custom headers: {}\n"
            "Note: all strings must be enclosed within \"\" instead of ''".format(ve)
        ) from ve
    if not isinstance(parsed, dict):
        # A JSON list or scalar would only fail later, inside headers.update().
        raise argparse.ArgumentTypeError("Custom headers must be a JSON object")
    return parsed


def send_bot_payload_message(
    bot: UserProfile, integration: WebhookIntegration, fixture_path: str, config: ScreenshotConfig
) -> bool:
    """POST the fixture at ``fixture_path`` to the integration's webhook URL as ``bot``.

    Every existing message sent by ``bot`` is deleted first.  JSON fixtures are
    sent as the JSON body, or as a query parameter when
    ``config.payload_as_query_param`` is set; non-JSON fixtures are parsed as a
    query string and merged into the URL parameters.

    Returns True when the server answers 200 and False otherwise.  Exits the
    process with status 1 when the dev server cannot be reached.
    """
    # Delete all messages, so the new message is the only one in its message group
    Message.objects.filter(sender=bot).delete()
    json_fixture = fixture_path.endswith(".json")
    _, fixture_name = split_fixture_path(fixture_path)

    if fixture_name:
        if json_fixture:
            with open(fixture_path, "rb") as fb:
                data = orjson.loads(fb.read())
        else:
            with open(fixture_path, encoding="utf-8") as f:
                data = f.read().strip()
    else:
        data = ""

    headers = get_requests_headers(integration.name, fixture_name)
    # custom_headers is None when the tool is run with --fixture but without -H.
    if config.custom_headers:
        headers.update(config.custom_headers)
    if config.use_basic_auth:
        credentials = base64.b64encode(f"{bot.email}:{bot.api_key}".encode("utf8")).decode("utf8")
        auth = f"basic {credentials}"
        headers.update(dict(Authorization=auth))

    assert isinstance(bot.bot_owner, UserProfile)
    stream = integration.stream_name or "devel"
    url = f"{bot.bot_owner.realm.uri}/{integration.url}"
    params = {"api_key": bot.api_key, "stream": stream}
    params.update(config.extra_params)

    extra_args: Dict[str, Any] = {}
    if not json_fixture and data:
        # We overwrite any params in fixture with our params. stream name, for
        # example, may be defined in the fixture.
        parsed_params = dict(parse_qsl(data))
        parsed_params.update(params)
        params = parsed_params
    elif config.payload_as_query_param:
        params[config.payload_param_name] = orjson.dumps(data).decode()
    else:
        extra_args = {"json": data}

    url = f"{url}?{urlencode(params)}"

    try:
        response = requests.post(url=url, headers=headers, **extra_args)
    except requests.exceptions.ConnectionError:
        print(
            "This tool needs the local dev server to be running. "
            "Please start it using tools/run-dev.py before running this tool."
        )
        sys.exit(1)

    if response.status_code != 200:
        try:
            print(response.json())
        except ValueError:
            # Error pages are not always JSON; show the raw body instead of
            # masking the real failure with a decode error.
            print(response.text)
        print("Failed to trigger webhook")
        return False

    print(f"Triggered {integration.name} webhook")
    return True


def capture_last_message_screenshot(bot: UserProfile, image_path: str) -> None:
    """Screenshot the last message sent by ``bot`` into ``image_path``.

    Runs tools/message-screenshot.js under node; raises
    subprocess.CalledProcessError when that script fails.  Prints a notice and
    returns when the bot has no messages.
    """
    message = Message.objects.filter(sender=bot).last()
    if message is None:
        print(f"No message found for {bot.full_name}")
        return
    message_id = str(message.id)
    screenshot_script = os.path.join(TOOLS_DIR, "message-screenshot.js")
    subprocess.check_call(["node", screenshot_script, message_id, image_path])


def generate_screenshot_from_config(
    integration_name: str, screenshot_config: ScreenshotConfig
) -> None:
    """Set up the bot and stream, trigger the webhook, and capture the screenshot."""
    integration = get_integration(integration_name)
    fixture_path, image_path = get_fixture_and_image_paths(integration, screenshot_config)
    bot = create_integration_bot(integration, screenshot_config.bot_name)
    create_integration_stream(integration, bot)
    message_sent = send_bot_payload_message(bot, integration, fixture_path, screenshot_config)
    if message_sent:
        capture_last_message_screenshot(bot, image_path)
        print(f"Screenshot captured to: {BOLDRED}{image_path}{ENDC}")


parser = argparse.ArgumentParser()
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument("--all", action="store_true")
group.add_argument("--integration", help="Name of the integration")
parser.add_argument("--fixture", help="Name of the fixture file to use")
parser.add_argument("--image-name", help="Name for the screenshot image")
parser.add_argument("--image-dir", help="Directory name where to save the screenshot image")
parser.add_argument("--bot-name", help="Name to use for the bot")
parser.add_argument(
    "-A", "--use-basic-auth", action="store_true", help="Add basic auth headers to the request"
)
parser.add_argument(
    "-Q",
    "--payload-as-query-param",
    action="store_true",
    help="Send payload as query param instead of body",
)
parser.add_argument("-P", "--payload-param-name", help="Param name to use for the payload")
parser.add_argument(
    "-H",
    "--custom-headers",
    type=custom_headers,
    help="Any additional headers to be sent with the request.",
)

options = parser.parse_args()
prepare_puppeteer_run()

if options.all:
    # Warn once, however many other options were supplied alongside --all.
    if any(value for key, value in vars(options).items() if key != "all"):
        print("Generating screenshots for all integrations. Ignoring all command-line options")

    for integration_name, screenshot_configs in DOC_SCREENSHOT_CONFIG.items():
        for screenshot_config in screenshot_configs:
            generate_screenshot_from_config(integration_name, screenshot_config)

elif options.fixture:
    config = dict(
        fixture_name=options.fixture,
        use_basic_auth=options.use_basic_auth,
        custom_headers=options.custom_headers,
        payload_as_query_param=options.payload_as_query_param,
    )
    # Optional settings are passed only when given, so that ScreenshotConfig's
    # own defaults apply otherwise.
    if options.image_name:
        config["image_name"] = options.image_name
    if options.image_dir:
        config["image_dir"] = options.image_dir
    if options.bot_name:
        config["bot_name"] = options.bot_name
    if options.payload_param_name:
        config["payload_param_name"] = options.payload_param_name
    screenshot_config = ScreenshotConfig(**config)
    generate_screenshot_from_config(options.integration, screenshot_config)

elif options.integration in DOC_SCREENSHOT_CONFIG:
    configs = DOC_SCREENSHOT_CONFIG[options.integration]
    for screenshot_config in configs:
        generate_screenshot_from_config(options.integration, screenshot_config)

else:
    parser.error(
        "Could not find configuration for integration. "
        "You can specify a fixture file to use, using the --fixture flag. "
        "Or add a configuration to zerver.lib.integrations.DOC_SCREENSHOT_CONFIG",
    )