#!/usr/bin/env python3 """Create or update a webhook integration screenshot using a test fixture.""" # check for the venv from lib import sanity_check sanity_check.check_venv(__file__) import os import sys TOOLS_DIR = os.path.abspath(os.path.dirname(__file__)) ROOT_DIR = os.path.dirname(TOOLS_DIR) sys.path.insert(0, ROOT_DIR) from scripts.lib.setup_path import setup_path setup_path() os.environ["DJANGO_SETTINGS_MODULE"] = "zproject.settings" import django django.setup() import argparse import base64 import subprocess from typing import Any, Dict, Optional from urllib.parse import parse_qsl, urlencode import orjson import requests from scripts.lib.zulip_tools import BOLDRED, ENDC from tools.lib.test_script import prepare_puppeteer_run from zerver.lib.actions import ( bulk_add_subscriptions, do_change_avatar_fields, do_create_user, notify_created_bot, ) from zerver.lib.integrations import ( DOC_SCREENSHOT_CONFIG, INTEGRATIONS, ScreenshotConfig, WebhookIntegration, get_fixture_and_image_paths, split_fixture_path, ) from zerver.lib.storage import static_path from zerver.lib.streams import create_stream_if_needed from zerver.lib.upload import upload_avatar_image from zerver.lib.webhooks.common import get_fixture_http_headers from zerver.models import Message, UserProfile, get_realm, get_user_by_delivery_email def create_integration_bot(integration: WebhookIntegration, bot_name: Optional[str]=None) -> UserProfile: realm = get_realm('zulip') owner = get_user_by_delivery_email("iago@zulip.com", realm) bot_email = f"{integration.name}-bot@example.com" if bot_name is None: bot_name = f"{integration.name.capitalize()} Bot" try: bot = UserProfile.objects.get(email=bot_email) except UserProfile.DoesNotExist: bot = do_create_user( email=bot_email, password="123", realm=owner.realm, full_name=bot_name, bot_type=UserProfile.INCOMING_WEBHOOK_BOT, bot_owner=owner, acting_user=owner, ) notify_created_bot(bot) bot_avatar_path = integration.get_bot_avatar_path() if bot_avatar_path is not None: bot_avatar_path = static_path(bot_avatar_path) if os.path.isfile(bot_avatar_path): with open(bot_avatar_path, "rb") as f: upload_avatar_image(f, owner, bot) do_change_avatar_fields(bot, UserProfile.AVATAR_FROM_USER, acting_user=owner) return bot def create_integration_stream(integration: WebhookIntegration, bot: UserProfile) -> None: assert isinstance(bot.bot_owner, UserProfile) realm = bot.bot_owner.realm stream, created = create_stream_if_needed(realm, integration.stream_name) bulk_add_subscriptions(realm, [stream], [bot, bot.bot_owner]) def get_integration(integration_name: str) -> WebhookIntegration: integration = INTEGRATIONS[integration_name] assert isinstance(integration, WebhookIntegration), "Not a WebhookIntegration" return integration def get_requests_headers(integration_name: str, fixture_name: str) -> Dict[str, Any]: headers = get_fixture_http_headers(integration_name, fixture_name) def fix_name(header: str) -> str: header = header if not header.startswith('HTTP_') else header[len('HTTP_'):] return header.replace('_', '-') return {fix_name(k): v for k, v in headers.items()} def custom_headers(headers_json: str) -> Dict[str, str]: if not headers_json: return {} try: return orjson.loads(headers_json) except orjson.JSONDecodeError as ve: raise argparse.ArgumentTypeError( 'Encountered an error while attempting to parse custom headers: {}\n' 'Note: all strings must be enclosed within "" instead of \'\''.format(ve)) def send_bot_payload_message(bot: UserProfile, integration: WebhookIntegration, fixture_path: str, config: ScreenshotConfig) -> bool: # Delete all messages, so new message is the only one it's message group Message.objects.filter(sender=bot).delete() json_fixture = fixture_path.endswith('.json') _, fixture_name = split_fixture_path(fixture_path) if fixture_name: if json_fixture: with open(fixture_path, "rb") as fb: data = orjson.loads(fb.read()) else: with open(fixture_path) as f: data = f.read().strip() else: data = '' headers = get_requests_headers(integration.name, fixture_name) headers.update(config.custom_headers) if config.use_basic_auth: credentials = base64.b64encode(f'{bot.email}:{bot.api_key}'.encode('utf8')).decode('utf8') auth = f'basic {credentials}' headers.update(dict(Authorization=auth)) assert isinstance(bot.bot_owner, UserProfile) stream = integration.stream_name or 'devel' url = f"{bot.bot_owner.realm.uri}/{integration.url}" params = {'api_key': bot.api_key, 'stream': stream} params.update(config.extra_params) extra_args = {} if not json_fixture and data: # We overwrite any params in fixture with our params. stream name, for # example, may be defined in the fixture. parsed_params = dict(parse_qsl(data)) parsed_params.update(params) params = parsed_params elif config.payload_as_query_param: params[config.payload_param_name] = orjson.dumps(data).decode() else: extra_args = {'json': data} url = f'{url}?{urlencode(params)}' try: response = requests.post(url=url, headers=headers, **extra_args) except requests.exceptions.ConnectionError: print('This tool needs the local dev server to be running. ' 'Please start it using tools/run-dev.py before running this tool.') sys.exit(1) if response.status_code != 200: print(response.json()) print('Failed to trigger webhook') return False else: print(f'Triggered {integration.name} webhook') return True def capture_last_message_screenshot(bot: UserProfile, image_path: str) -> None: message = Message.objects.filter(sender=bot).last() if message is None: print(f'No message found for {bot.full_name}') return message_id = str(message.id) screenshot_script = os.path.join(TOOLS_DIR, 'message-screenshot.js') subprocess.check_call(['node', screenshot_script, message_id, image_path]) def generate_screenshot_from_config(integration_name: str, screenshot_config: ScreenshotConfig) -> None: integration = get_integration(integration_name) fixture_path, image_path = get_fixture_and_image_paths(integration, screenshot_config) bot = create_integration_bot(integration, screenshot_config.bot_name) create_integration_stream(integration, bot) message_sent = send_bot_payload_message(bot, integration, fixture_path, screenshot_config) if message_sent: capture_last_message_screenshot(bot, image_path) print(f'Screenshot captured to: {BOLDRED}{image_path}{ENDC}') parser = argparse.ArgumentParser() group = parser.add_mutually_exclusive_group(required=True) group.add_argument('--all', action='store_true') group.add_argument('--integration', help='Name of the integration') parser.add_argument('--fixture', help='Name of the fixture file to use') parser.add_argument('--image-name', help='Name for the screenshot image') parser.add_argument('--image-dir', help='Directory name where to save the screenshot image') parser.add_argument('--bot-name', help='Name to use for the bot') parser.add_argument('-A', '--use-basic-auth', action='store_true', help='Add basic auth headers to the request') parser.add_argument('-Q', '--payload-as-query-param', action='store_true', help='Send payload as query param instead of body') parser.add_argument('-P', '--payload-param-name', help='Param name to use for the payload') parser.add_argument('-H', '--custom-headers', type=custom_headers, help='Any additional headers to be sent with the request.') options = parser.parse_args() prepare_puppeteer_run() if options.all: for key, value in vars(options).items(): if key != 'all' and value: print('Generating screenshots for all integrations. Ignoring all command-line options') for integration_name, screenshot_configs in DOC_SCREENSHOT_CONFIG.items(): for screenshot_config in screenshot_configs: generate_screenshot_from_config(integration_name, screenshot_config) elif options.fixture: config = dict(fixture_name=options.fixture, use_basic_auth=options.use_basic_auth, custom_headers=options.custom_headers, payload_as_query_param=options.payload_as_query_param) if options.image_name: config['image_name'] = options.image_name if options.image_dir: config['image_dir'] = options.image_dir if options.bot_name: config['bot_name'] = options.bot_name if options.payload_param_name: config['payload_param_name'] = options.payload_param_name screenshot_config = ScreenshotConfig(**config) generate_screenshot_from_config(options.integration, screenshot_config) elif options.integration in DOC_SCREENSHOT_CONFIG: configs = DOC_SCREENSHOT_CONFIG[options.integration] for screenshot_config in configs: generate_screenshot_from_config(options.integration, screenshot_config) else: parser.error( "Could not find configuration for integration. " "You can specify a fixture file to use, using the --fixture flag. " "Or add a configuration to zerver.lib.integrations.DOC_SCREENSHOT_CONFIG", )