tools: Support sending mock messages for non-webhook integrations.

Non-webhook integrations should have fixtures containing mock messages
in json format with fields "subject" and "body" indicating the topic
and content respectively.
This commit is contained in:
PIG208 2021-03-29 22:35:59 +08:00 committed by Tim Abbott
parent 9d7fc01622
commit 540072e860
1 changed files with 35 additions and 2 deletions

View File

@ -29,6 +29,7 @@ from urllib.parse import parse_qsl, urlencode
import orjson import orjson
import requests import requests
import zulip
from scripts.lib.zulip_tools import BOLDRED, ENDC from scripts.lib.zulip_tools import BOLDRED, ENDC
from tools.lib.test_script import prepare_puppeteer_run from tools.lib.test_script import prepare_puppeteer_run
@ -137,6 +138,35 @@ def custom_headers(headers_json: str) -> Dict[str, str]:
) )
def send_bot_mock_message(
bot: UserProfile, integration: Integration, fixture_path: str, config: BaseScreenshotConfig
) -> None:
# Delete all messages, so new message is the only one it's message group
Message.objects.filter(sender=bot).delete()
data, _, _ = get_fixture_info(fixture_path)
assert bot.bot_owner is not None
url = f"{bot.bot_owner.realm.uri}"
client = zulip.Client(email=bot.email, api_key=bot.api_key, site=url)
try:
request = {
"type": "stream",
"to": integration.stream_name,
"topic": data["subject"],
"content": data["body"],
}
client.send_message(request)
except KeyError:
print(
"{} contains invalid configuration. "
'Fields "subject" and "body" are required for non-webhook integrations.'.format(
fixture_path
)
)
sys.exit(1)
def send_bot_payload_message( def send_bot_payload_message(
bot: UserProfile, integration: WebhookIntegration, fixture_path: str, config: ScreenshotConfig bot: UserProfile, integration: WebhookIntegration, fixture_path: str, config: ScreenshotConfig
) -> bool: ) -> bool:
@ -210,10 +240,13 @@ def generate_screenshot_from_config(
bot = create_integration_bot(integration, screenshot_config.bot_name) bot = create_integration_bot(integration, screenshot_config.bot_name)
create_integration_stream(integration, bot) create_integration_stream(integration, bot)
if isinstance(integration, WebhookIntegration): if isinstance(integration, WebhookIntegration):
assert isinstance(screenshot_config, ScreenshotConfig) assert isinstance(
screenshot_config, ScreenshotConfig
), "Webhook integrations require ScreenshotConfig"
message_sent = send_bot_payload_message(bot, integration, fixture_path, screenshot_config) message_sent = send_bot_payload_message(bot, integration, fixture_path, screenshot_config)
else: else:
raise AssertionError("Not yet implemented") send_bot_mock_message(bot, integration, fixture_path, screenshot_config)
message_sent = True
if message_sent: if message_sent:
capture_last_message_screenshot(bot, image_path) capture_last_message_screenshot(bot, image_path)
print(f"Screenshot captured to: {BOLDRED}{image_path}{ENDC}") print(f"Screenshot captured to: {BOLDRED}{image_path}{ENDC}")