mirror of https://github.com/zulip/zulip.git
241 lines
9.7 KiB
Python
Executable File
241 lines
9.7 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""Create or update a webhook integration screenshot using a test fixture."""
|
|
|
|
# check for the venv
|
|
from lib import sanity_check
|
|
|
|
sanity_check.check_venv(__file__)
|
|
|
|
import os
|
|
import sys
|
|
|
|
TOOLS_DIR = os.path.abspath(os.path.dirname(__file__))
|
|
ROOT_DIR = os.path.dirname(TOOLS_DIR)
|
|
|
|
sys.path.insert(0, ROOT_DIR)
|
|
from scripts.lib.setup_path import setup_path
|
|
|
|
setup_path()
|
|
|
|
os.environ["DJANGO_SETTINGS_MODULE"] = "zproject.settings"
|
|
import django
|
|
|
|
django.setup()
|
|
|
|
import argparse
|
|
import base64
|
|
import subprocess
|
|
from typing import Any, Dict, Optional
|
|
from urllib.parse import urlencode, parse_qsl
|
|
|
|
import requests
|
|
import ujson
|
|
|
|
from scripts.lib.zulip_tools import BOLDRED, ENDC
|
|
from tools.lib.test_script import prepare_puppeteer_run
|
|
from zerver.models import UserProfile, Message, get_user_by_delivery_email, get_realm
|
|
from zerver.lib.actions import (
|
|
do_create_user, notify_created_bot, bulk_add_subscriptions, do_change_avatar_fields)
|
|
from zerver.lib.streams import create_stream_if_needed
|
|
from zerver.lib.upload import upload_avatar_image
|
|
from zerver.lib.integrations import (
|
|
WebhookIntegration, INTEGRATIONS, split_fixture_path, ScreenshotConfig, get_fixture_and_image_paths,
|
|
DOC_SCREENSHOT_CONFIG)
|
|
from zerver.lib.webhooks.common import get_fixture_http_headers
|
|
from zerver.lib.storage import static_path
|
|
|
|
def create_integration_bot(integration: WebhookIntegration, bot_name: Optional[str]=None) -> UserProfile:
|
|
realm = get_realm('zulip')
|
|
owner = get_user_by_delivery_email("iago@zulip.com", realm)
|
|
bot_email = "{}-bot@example.com".format(integration.name)
|
|
if bot_name is None:
|
|
bot_name = "{} Bot".format(integration.name.capitalize())
|
|
try:
|
|
bot = UserProfile.objects.get(email=bot_email)
|
|
except UserProfile.DoesNotExist:
|
|
bot = do_create_user(
|
|
email=bot_email,
|
|
password="123",
|
|
realm=owner.realm,
|
|
full_name=bot_name,
|
|
short_name=bot_name,
|
|
bot_type=UserProfile.INCOMING_WEBHOOK_BOT,
|
|
bot_owner=owner,
|
|
)
|
|
notify_created_bot(bot)
|
|
|
|
bot_avatar_path = integration.get_bot_avatar_path()
|
|
if bot_avatar_path is not None:
|
|
bot_avatar_path = static_path(bot_avatar_path)
|
|
if os.path.isfile(bot_avatar_path):
|
|
with open(bot_avatar_path, "rb") as f:
|
|
upload_avatar_image(f, owner, bot)
|
|
do_change_avatar_fields(bot, UserProfile.AVATAR_FROM_USER)
|
|
|
|
return bot
|
|
|
|
def create_integration_stream(integration: WebhookIntegration, bot: UserProfile) -> None:
|
|
assert isinstance(bot.bot_owner, UserProfile)
|
|
stream, created = create_stream_if_needed(bot.bot_owner.realm, integration.stream_name)
|
|
bulk_add_subscriptions([stream], [bot, bot.bot_owner], from_stream_creation=created)
|
|
|
|
def get_integration(integration_name: str) -> WebhookIntegration:
|
|
integration = INTEGRATIONS[integration_name]
|
|
assert isinstance(integration, WebhookIntegration), "Not a WebhookIntegration"
|
|
return integration
|
|
|
|
def get_requests_headers(integration_name: str, fixture_name: str) -> Dict[str, Any]:
|
|
headers = get_fixture_http_headers(integration_name, fixture_name)
|
|
|
|
def fix_name(header: str) -> str:
|
|
header = header if not header.startswith('HTTP_') else header[len('HTTP_'):]
|
|
return header.replace('_', '-')
|
|
|
|
return {fix_name(k): v for k, v in headers.items()}
|
|
|
|
def custom_headers(headers_json: str) -> Dict[str, str]:
|
|
if not headers_json:
|
|
return {}
|
|
try:
|
|
return ujson.loads(headers_json)
|
|
except ValueError as ve:
|
|
raise argparse.ArgumentTypeError(
|
|
'Encountered an error while attempting to parse custom headers: {}\n'
|
|
'Note: all strings must be enclosed within "" instead of \'\''.format(ve))
|
|
|
|
def send_bot_payload_message(bot: UserProfile, integration: WebhookIntegration, fixture_path: str,
|
|
config: ScreenshotConfig) -> bool:
|
|
# Delete all messages, so new message is the only one it's message group
|
|
Message.objects.filter(sender=bot).delete()
|
|
json_fixture = fixture_path.endswith('.json')
|
|
_, fixture_name = split_fixture_path(fixture_path)
|
|
|
|
if fixture_name:
|
|
with open(fixture_path) as f:
|
|
if json_fixture:
|
|
data = ujson.load(f)
|
|
else:
|
|
data = f.read().strip()
|
|
else:
|
|
data = ''
|
|
|
|
headers = get_requests_headers(integration.name, fixture_name)
|
|
if config.custom_headers:
|
|
headers.update(config.custom_headers)
|
|
if config.use_basic_auth:
|
|
credentials = base64.b64encode('{}:{}'.format(bot.email, bot.api_key).encode('utf8')).decode('utf8')
|
|
auth = 'basic {}'.format(credentials)
|
|
headers.update(dict(Authorization=auth))
|
|
|
|
assert isinstance(bot.bot_owner, UserProfile)
|
|
stream = integration.stream_name or 'devel'
|
|
url = "{}/{}".format(bot.bot_owner.realm.uri, integration.url)
|
|
params = {'api_key': bot.api_key, 'stream': stream}
|
|
if config.extra_params:
|
|
params.update(config.extra_params)
|
|
|
|
extra_args = {}
|
|
if not json_fixture and data:
|
|
# We overwrite any params in fixture with our params. stream name, for
|
|
# example, may be defined in the fixture.
|
|
parsed_params = dict(parse_qsl(data))
|
|
parsed_params.update(params)
|
|
params = parsed_params
|
|
|
|
elif config.payload_as_query_param:
|
|
params[config.payload_param_name] = ujson.dumps(data)
|
|
|
|
else:
|
|
extra_args = {'json': data}
|
|
|
|
url = '{}?{}'.format(url, urlencode(params))
|
|
|
|
try:
|
|
response = requests.post(url=url, headers=headers, **extra_args)
|
|
except requests.exceptions.ConnectionError:
|
|
print('This tool needs the local dev server to be running. '
|
|
'Please start it using tools/run-dev.py before running this tool.')
|
|
sys.exit(1)
|
|
if response.status_code != 200:
|
|
print(response.json())
|
|
print('Failed to trigger webhook')
|
|
return False
|
|
else:
|
|
print('Triggered {} webhook'.format(integration.name))
|
|
return True
|
|
|
|
def capture_last_message_screenshot(bot: UserProfile, image_path: str) -> None:
|
|
message = Message.objects.filter(sender=bot).last()
|
|
if message is None:
|
|
print('No message found for {}'.format(bot.full_name))
|
|
return
|
|
message_id = str(message.id)
|
|
screenshot_script = os.path.join(TOOLS_DIR, 'message-screenshot.js')
|
|
subprocess.check_call(['node', screenshot_script, message_id, image_path])
|
|
|
|
def generate_screenshot_from_config(integration_name: str, screenshot_config: ScreenshotConfig) -> None:
|
|
integration = get_integration(integration_name)
|
|
fixture_path, image_path = get_fixture_and_image_paths(integration, screenshot_config)
|
|
bot = create_integration_bot(integration, screenshot_config.bot_name)
|
|
create_integration_stream(integration, bot)
|
|
message_sent = send_bot_payload_message(bot, integration, fixture_path, screenshot_config)
|
|
if message_sent:
|
|
capture_last_message_screenshot(bot, image_path)
|
|
print(f'Screenshot captured to: {BOLDRED}{image_path}{ENDC}')
|
|
|
|
parser = argparse.ArgumentParser()
|
|
group = parser.add_mutually_exclusive_group(required=True)
|
|
group.add_argument('--all', default=False, action='store_true')
|
|
group.add_argument('--integration', type=str, help='Name of the integration')
|
|
parser.add_argument('--fixture', type=str, help='Name of the fixture file to use')
|
|
parser.add_argument('--image-name', type=str, help='Name for the screenshot image')
|
|
parser.add_argument('--image-dir', type=str, help='Directory name where to save the screenshot image')
|
|
parser.add_argument('--bot-name', type=str, help='Name to use for the bot')
|
|
parser.add_argument('-A', '--use-basic-auth', action='store_true',
|
|
help='Add basic auth headers to the request')
|
|
parser.add_argument('-Q', '--payload-as-query-param', action='store_true',
|
|
help='Send payload as query param instead of body')
|
|
parser.add_argument('-P', '--payload-param-name', type=str,
|
|
help='Param name to use for the payload')
|
|
parser.add_argument('-H', '--custom-headers',
|
|
type=custom_headers,
|
|
help='Any additional headers to be sent with the request.')
|
|
|
|
options = parser.parse_args()
|
|
prepare_puppeteer_run()
|
|
|
|
if options.all:
|
|
for key, value in vars(options).items():
|
|
if key != 'all' and value:
|
|
print('Generating screenshots for all integrations. Ignoring all command-line options')
|
|
for integration_name, screenshot_configs in DOC_SCREENSHOT_CONFIG.items():
|
|
for screenshot_config in screenshot_configs:
|
|
generate_screenshot_from_config(integration_name, screenshot_config)
|
|
|
|
elif options.fixture:
|
|
config = dict(fixture_name=options.fixture, use_basic_auth=options.use_basic_auth,
|
|
custom_headers=options.custom_headers,
|
|
payload_as_query_param=options.payload_as_query_param)
|
|
if options.image_name:
|
|
config['image_name'] = options.image_name
|
|
if options.image_dir:
|
|
config['image_dir'] = options.image_dir
|
|
if options.bot_name:
|
|
config['bot_name'] = options.bot_name
|
|
if options.payload_param_name:
|
|
config['payload_param_name'] = options.payload_param_name
|
|
screenshot_config = ScreenshotConfig(**config)
|
|
generate_screenshot_from_config(options.integration, screenshot_config)
|
|
|
|
elif options.integration in DOC_SCREENSHOT_CONFIG:
|
|
configs = DOC_SCREENSHOT_CONFIG[options.integration]
|
|
for screenshot_config in configs:
|
|
generate_screenshot_from_config(options.integration, screenshot_config)
|
|
|
|
else:
|
|
parser.error(
|
|
"Could not find configuration for integration. "
|
|
"You can specify a fixture file to use, using the --fixture flag. "
|
|
"Or add a configuration to zerver.lib.integrations.DOC_SCREENSHOT_CONFIG"
|
|
)
|