import re
from functools import partial
from inspect import signature
from typing import Any, Dict, Optional

from django.http import HttpRequest, HttpResponse

from zerver.decorator import api_key_only_webhook_view
from zerver.lib.request import REQ, has_request_variables
from zerver.lib.response import json_success
from zerver.lib.webhooks.common import (
    UnexpectedWebhookEventType,
    check_send_webhook_message,
    get_http_headers_from_filename,
    validate_extract_webhook_http_header,
)
from zerver.lib.webhooks.git import (
    CONTENT_MESSAGE_TEMPLATE,
    TOPIC_WITH_BRANCH_TEMPLATE,
    TOPIC_WITH_PR_OR_ISSUE_INFO_TEMPLATE,
    get_commits_comment_action_message,
    get_issue_event_message,
    get_pull_request_event_message,
    get_push_commits_event_message,
    get_push_tag_event_message,
    get_release_event_message,
    get_setup_webhook_message,
)
from zerver.models import UserProfile

fixture_to_headers = get_http_headers_from_filename("HTTP_X_GITHUB_EVENT")


class UnknownEventType(Exception):
    """Exception type kept for callers; nothing in this module raises it."""


def get_opened_or_update_pull_request_body(payload: Dict[str, Any],
                                           include_title: bool=False) -> str:
    """Build the message body for an opened/updated pull request.

    The 'synchronize' action is reported as 'updated'.  The PR title is
    only passed along when ``include_title`` is true.
    """
    pull_request = payload['pull_request']
    action = payload['action']
    if action == 'synchronize':
        action = 'updated'

    # 'assignee' may be absent or null in the payload.
    assignee = None
    if pull_request.get('assignee'):
        assignee = pull_request['assignee']['login']

    return get_pull_request_event_message(
        get_sender_name(payload),
        action,
        pull_request['html_url'],
        target_branch=pull_request['head']['ref'],
        base_branch=pull_request['base']['ref'],
        message=pull_request['body'],
        assignee=assignee,
        number=pull_request['number'],
        title=pull_request['title'] if include_title else None,
    )


def get_assigned_or_unassigned_pull_request_body(payload: Dict[str, Any],
                                                 include_title: bool=False) -> str:
    """Build the message body for a pull request (un)assignment.

    When the payload carries an assignee login, the last character of the
    base message is replaced by " to <assignee>.".
    """
    pull_request = payload['pull_request']
    assignee = pull_request.get('assignee')
    if assignee is not None:
        assignee = assignee.get('login')

    base_message = get_pull_request_event_message(
        get_sender_name(payload),
        payload['action'],
        pull_request['html_url'],
        number=pull_request['number'],
        title=pull_request['title'] if include_title else None,
    )
    if assignee is not None:
        # Drop the final character of the base message (presumably its
        # trailing period) before appending the assignee.
        return f"{base_message[:-1]} to {assignee}."
    return base_message


def get_closed_pull_request_body(payload: Dict[str, Any],
                                 include_title: bool=False) -> str:
    """Build the message body for a closed pull request (merged or not)."""
    pull_request = payload['pull_request']
    action = 'merged' if pull_request['merged'] else 'closed without merge'
    return get_pull_request_event_message(
        get_sender_name(payload),
        action,
        pull_request['html_url'],
        number=pull_request['number'],
        title=pull_request['title'] if include_title else None,
    )


def get_membership_body(payload: Dict[str, Any]) -> str:
    """Build the message body for a team membership change."""
    action = payload['action']
    member = payload['member']
    team_name = payload['team']['name']

    return "{sender} {action} [{username}]({html_url}) {preposition} the {team_name} team.".format(
        sender=get_sender_name(payload),
        action=action,
        username=member['login'],
        html_url=member['html_url'],
        preposition='from' if action == 'removed' else 'to',
        team_name=team_name,
    )


def get_member_body(payload: Dict[str, Any]) -> str:
    """Build the message body for a repository member event."""
    return "{} {} [{}]({}) to [{}]({}).".format(
        get_sender_name(payload),
        payload['action'],
        payload['member']['login'],
        payload['member']['html_url'],
        get_repository_name(payload),
        payload['repository']['html_url'],
    )


def get_issue_body(payload: Dict[str, Any],
                   include_title: bool=False) -> str:
    """Build the message body for an issue event."""
    action = payload['action']
    issue = payload['issue']
    assignee = issue['assignee']
    return get_issue_event_message(
        get_sender_name(payload),
        action,
        issue['html_url'],
        issue['number'],
        issue['body'],
        assignee=assignee['login'] if assignee else None,
        title=issue['title'] if include_title else None,
    )


def get_issue_comment_body(payload: Dict[str, Any],
                           include_title: bool=False) -> str:
    """Build the message body for an issue comment event.

    The action text is turned into a Markdown link to the comment.
    """
    action = payload['action']
    comment = payload['comment']
    issue = payload['issue']

    if action == 'created':
        action = '[commented]'
    else:
        action = f'{action} a [comment]'
    action += '({}) on'.format(comment['html_url'])

    return get_issue_event_message(
        get_sender_name(payload),
        action,
        issue['html_url'],
        issue['number'],
        comment['body'],
        title=issue['title'] if include_title else None,
    )


def get_fork_body(payload: Dict[str, Any]) -> str:
    """Build the message body for a fork event."""
    forkee = payload['forkee']
    return "{} forked [{}]({}).".format(
        get_sender_name(payload),
        forkee['name'],
        forkee['html_url'],
    )


def get_deployment_body(payload: Dict[str, Any]) -> str:
    """Build the message body for a deployment event."""
    return f'{get_sender_name(payload)} created new deployment.'


def get_change_deployment_status_body(payload: Dict[str, Any]) -> str:
    """Build the message body for a deployment status change."""
    return 'Deployment changed status to {}.'.format(
        payload['deployment_status']['state'],
    )


def get_create_or_delete_body(payload: Dict[str, Any], action: str) -> str:
    """Build the message body for a ref create/delete event.

    ``action`` is the verb to report (bound via ``partial`` in
    ``EVENT_FUNCTION_MAPPER``).
    """
    ref_type = payload['ref_type']
    return '{} {} {} {}.'.format(
        get_sender_name(payload),
        action,
        ref_type,
        payload['ref'],
    ).rstrip()


def get_commit_comment_body(payload: Dict[str, Any]) -> str:
    """Build the message body for a commit comment event."""
    comment = payload['comment']
    comment_url = comment['html_url']
    # The commit URL is the comment URL without its '#...' fragment.
    commit_url = comment_url.split('#', 1)[0]
    action = f'[commented]({comment_url})'
    return get_commits_comment_action_message(
        get_sender_name(payload),
        action,
        commit_url,
        comment.get('commit_id'),
        comment['body'],
    )


def get_push_tags_body(payload: Dict[str, Any]) -> str:
    """Build the message body for a tag push (created or removed)."""
    return get_push_tag_event_message(
        get_sender_name(payload),
        get_tag_name_from_ref(payload['ref']),
        action='pushed' if payload.get('created') else 'removed',
    )


def get_push_commits_body(payload: Dict[str, Any]) -> str:
    """Build the message body for a push of commits to a branch."""
    commits_data = [{
        # Prefer the author's username; fall back to the author's name.
        'name': (commit.get('author').get('username') or
                 commit.get('author').get('name')),
        'sha': commit['id'],
        'url': commit['url'],
        'message': commit['message'],
    } for commit in payload['commits']]
    return get_push_commits_event_message(
        get_sender_name(payload),
        payload['compare'],
        get_branch_name_from_ref(payload['ref']),
        commits_data,
        deleted=payload['deleted'],
    )


def get_public_body(payload: Dict[str, Any]) -> str:
    """Build the message body for a repository being made public."""
    return "{} made the repository [{}]({}) public.".format(
        get_sender_name(payload),
        get_repository_full_name(payload),
        payload['repository']['html_url'],
    )


def get_wiki_pages_body(payload: Dict[str, Any]) -> str:
    """Build the message body for a wiki ('gollum') event.

    Produces one bullet per page in ``payload['pages']`` under a header
    line naming the sender.
    """
    wiki_page_info_template = "* {action} [{title}]({url})"
    wiki_info = "\n".join(
        wiki_page_info_template.format(
            action=page['action'],
            title=page['title'],
            url=page['html_url'],
        )
        for page in payload['pages']
    )
    return f"{get_sender_name(payload)}:\n{wiki_info.rstrip()}"


def get_watch_body(payload: Dict[str, Any]) -> str:
    """Build the message body for a watch event (reported as 'starred')."""
    return "{} starred the repository [{}]({}).".format(
        get_sender_name(payload),
        get_repository_full_name(payload),
        payload['repository']['html_url'],
    )


def get_repository_body(payload: Dict[str, Any]) -> str:
    """Build the message body for a repository event."""
    return "{} {} the repository [{}]({}).".format(
        get_sender_name(payload),
        payload.get('action'),
        get_repository_full_name(payload),
        payload['repository']['html_url'],
    )


def get_add_team_body(payload: Dict[str, Any]) -> str:
    """Build the message body for a repository being added to a team."""
    return "The repository [{}]({}) was added to team {}.".format(
        get_repository_full_name(payload),
        payload['repository']['html_url'],
        payload['team']['name'],
    )


def get_team_body(payload: Dict[str, Any]) -> str:
    """Build the message body for a team edit event.

    Handles description, name and privacy changes (checked in that order).

    Raises:
        UnexpectedWebhookEventType: if ``payload['changes']`` contains none
            of the handled keys.
    """
    changes = payload["changes"]
    if "description" in changes:
        actor = payload["sender"]["login"]
        new_description = payload["team"]["description"]
        return f"**{actor}** changed the team description to:\n```quote\n{new_description}\n```"
    if "name" in changes:
        original_name = changes["name"]["from"]
        new_name = payload["team"]["name"]
        return f"Team `{original_name}` was renamed to `{new_name}`."
    if "privacy" in changes:
        new_visibility = payload["team"]["privacy"]
        return f"Team visibility changed to `{new_visibility}`"
    else:  # nocoverage
        raise UnexpectedWebhookEventType("GitHub", f"Team Edited: {changes.keys()}")


def get_release_body(payload: Dict[str, Any]) -> str:
    """Build the message body for a release event."""
    data = {
        'user_name': get_sender_name(payload),
        'action': payload['action'],
        'tagname': payload['release']['tag_name'],
        # Not every GitHub release has a "name" set; if not there, use the tag name.
        'release_name': payload['release']['name'] or payload['release']['tag_name'],
        'url': payload['release']['html_url'],
    }

    return get_release_event_message(**data)


def get_page_build_body(payload: Dict[str, Any]) -> str:
    """Build the message body for a GitHub Pages build event.

    For an 'errored' build the error message (when the payload provides
    one) is appended via ``CONTENT_MESSAGE_TEMPLATE``.  Unknown statuses are
    reported as "is <status>".
    """
    build = payload['build']
    status = build['status']
    actions = {
        'null': 'has yet to be built',
        'building': 'is being built',
        'built': 'has finished building',
    }

    if status == 'errored':
        action = 'has failed'
        # Previously the formatted string was computed and then discarded,
        # leaving a literal "{}" in the output and never showing the error.
        error_message = (build.get('error') or {}).get('message')
        if error_message:
            action += CONTENT_MESSAGE_TEMPLATE.format(message=error_message)
    else:
        action = actions.get(status, f'is {status}')

    return "Github Pages build, triggered by {}, {}.".format(
        build['pusher']['login'],
        action,
    )


def get_status_body(payload: Dict[str, Any]) -> str:
    """Build the message body for a commit status event.

    The state is linked to ``target_url`` when one is present.
    """
    if payload['target_url']:
        status = '[{}]({})'.format(
            payload['state'],
            payload['target_url'],
        )
    else:
        status = payload['state']
    return "[{}]({}) changed its status to {}.".format(
        payload['sha'][:7],  # TODO
        payload['commit']['html_url'],
        status,
    )


def get_pull_request_ready_for_review_body(payload: Dict[str, Any],
                                           include_title: bool=False) -> str:
    """Build the message body for a PR marked as ready for review.

    ``include_title`` is accepted for signature parity with the other
    pull request body builders; it is not used here.
    """
    message = "**{sender}** has marked [PR #{pr_number}]({pr_url}) as ready for review."
    return message.format(
        sender=get_sender_name(payload),
        pr_number=payload['pull_request']['number'],
        pr_url=payload['pull_request']['html_url'],
    )


def get_pull_request_review_body(payload: Dict[str, Any],
                                 include_title: bool=False) -> str:
    """Build the message body for a submitted pull request review."""
    title = "for #{} {}".format(
        payload['pull_request']['number'],
        payload['pull_request']['title'],
    )
    return get_pull_request_event_message(
        get_sender_name(payload),
        'submitted',
        payload['review']['html_url'],
        type='PR Review',
        title=title if include_title else None,
    )


def get_pull_request_review_comment_body(payload: Dict[str, Any],
                                         include_title: bool=False) -> str:
    """Build the message body for a pull request review comment event.

    The comment text is only included for the 'created' action.
    """
    action = payload['action']
    message = None
    if action == 'created':
        message = payload['comment']['body']

    title = "on #{} {}".format(
        payload['pull_request']['number'],
        payload['pull_request']['title'],
    )

    return get_pull_request_event_message(
        get_sender_name(payload),
        action,
        payload['comment']['html_url'],
        message=message,
        type='PR Review Comment',
        title=title if include_title else None,
    )


def get_pull_request_review_requested_body(payload: Dict[str, Any],
                                           include_title: bool=False) -> str:
    """Build the message body for a review request on a pull request.

    Reviewers come from the pull request's ``requested_reviewers`` /
    ``requested_teams`` lists; when one of those is empty, the top-level
    ``requested_reviewer`` / ``requested_team`` entry (if any) is used
    instead.  Users are listed before teams.
    """
    requested_reviewer = [payload['requested_reviewer']] if 'requested_reviewer' in payload else []
    requested_reviewers = (payload['pull_request']['requested_reviewers'] or requested_reviewer)

    requested_team = [payload['requested_team']] if 'requested_team' in payload else []
    requested_team_reviewers = (payload['pull_request']['requested_teams'] or requested_team)

    sender = get_sender_name(payload)
    pr_number = payload['pull_request']['number']
    pr_url = payload['pull_request']['html_url']

    message = "**{sender}** requested {reviewers} for a review on [PR #{pr_number}]({pr_url})."
    message_with_title = ("**{sender}** requested {reviewers} for a review on "
                          "[PR #{pr_number} {title}]({pr_url}).")
    body = message_with_title if include_title else message

    all_reviewers = [
        "[{login}]({html_url})".format(**reviewer)
        for reviewer in requested_reviewers
    ] + [
        "[{name}]({html_url})".format(**team_reviewer)
        for team_reviewer in requested_team_reviewers
    ]

    if len(all_reviewers) == 1:
        reviewers = all_reviewers[0]
    else:
        # "a, b and c"; as before, this raises IndexError when the payload
        # names no reviewers at all.
        reviewers = "{} and {}".format(', '.join(all_reviewers[:-1]), all_reviewers[-1])

    return body.format(
        sender=sender,
        reviewers=reviewers,
        pr_number=pr_number,
        pr_url=pr_url,
        title=payload['pull_request']['title'] if include_title else None,
    )


def get_check_run_body(payload: Dict[str, Any]) -> str:
    """Build the message body for a check run event."""
    template = "Check [{name}]({html_url}) {status} ({conclusion}). ([{short_hash}]({commit_url}))"

    check_run = payload['check_run']
    kwargs = {
        'name': check_run['name'],
        'html_url': check_run['html_url'],
        'status': check_run['status'],
        'short_hash': check_run['head_sha'][:7],
        'commit_url': "{}/commit/{}".format(
            payload['repository']['html_url'],
            check_run['head_sha'],
        ),
        'conclusion': check_run['conclusion'],
    }

    return template.format(**kwargs)


def get_star_body(payload: Dict[str, Any]) -> str:
    """Build the message body for a star event ('created' means starred)."""
    template = "{user} {action} the repository [{repo}]({url})."
    return template.format(
        user=payload['sender']['login'],
        action='starred' if payload['action'] == 'created' else 'unstarred',
        repo=get_repository_full_name(payload),
        url=payload['repository']['html_url'],
    )


def get_ping_body(payload: Dict[str, Any]) -> str:
    """Build the message body for a ping event."""
    return get_setup_webhook_message('GitHub', get_sender_name(payload))


def get_repository_name(payload: Dict[str, Any]) -> str:
    """Return ``payload['repository']['name']``."""
    return payload['repository']['name']


def get_repository_full_name(payload: Dict[str, Any]) -> str:
    """Return ``payload['repository']['full_name']``."""
    return payload['repository']['full_name']


def get_organization_name(payload: Dict[str, Any]) -> str:
    """Return ``payload['organization']['login']``."""
    return payload['organization']['login']


def get_sender_name(payload: Dict[str, Any]) -> str:
    """Return ``payload['sender']['login']``."""
    return payload['sender']['login']


def get_branch_name_from_ref(ref_string: str) -> str:
    """Strip a leading 'refs/heads/' from ``ref_string``."""
    return re.sub(r'^refs/heads/', '', ref_string)


def get_tag_name_from_ref(ref_string: str) -> str:
    """Strip a leading 'refs/tags/' from ``ref_string``."""
    return re.sub(r'^refs/tags/', '', ref_string)


def is_commit_push_event(payload: Dict[str, Any]) -> bool:
    """Return True when ``payload['ref']`` starts with 'refs/heads/'."""
    return bool(re.match(r'^refs/heads/', payload['ref']))


def get_subject_based_on_type(payload: Dict[str, Any], event: str) -> str:
    """Return the message topic for ``event``.

    ``event`` is an internal event name as returned by ``get_event``.
    Events without a dedicated topic use the repository name.
    """
    if 'pull_request' in event:
        return TOPIC_WITH_PR_OR_ISSUE_INFO_TEMPLATE.format(
            repo=get_repository_name(payload),
            type='PR',
            id=payload['pull_request']['number'],
            title=payload['pull_request']['title'],
        )
    elif event.startswith('issue'):
        return TOPIC_WITH_PR_OR_ISSUE_INFO_TEMPLATE.format(
            repo=get_repository_name(payload),
            type='Issue',
            id=payload['issue']['number'],
            title=payload['issue']['title'],
        )
    elif event.startswith('deployment'):
        return "{} / Deployment on {}".format(
            get_repository_name(payload),
            payload['deployment']['environment'],
        )
    elif event == 'membership':
        return "{} organization".format(payload['organization']['login'])
    elif event == 'team':
        return "team {}".format(payload['team']['name'])
    elif event == 'push_commits':
        return TOPIC_WITH_BRANCH_TEMPLATE.format(
            repo=get_repository_name(payload),
            branch=get_branch_name_from_ref(payload['ref']),
        )
    elif event == 'gollum':
        return TOPIC_WITH_BRANCH_TEMPLATE.format(
            repo=get_repository_name(payload),
            branch='Wiki Pages',
        )
    elif event == 'ping':
        # A ping without a repository falls back to the organization name;
        # otherwise it falls through to the repository name below.
        if payload.get('repository') is None:
            return get_organization_name(payload)
    elif event == 'check_run':
        return f"{get_repository_name(payload)} / checks"

    return get_repository_name(payload)


EVENT_FUNCTION_MAPPER = {
    'commit_comment': get_commit_comment_body,
    'closed_pull_request': get_closed_pull_request_body,
    'create': partial(get_create_or_delete_body, action='created'),
    'check_run': get_check_run_body,
    'delete': partial(get_create_or_delete_body, action='deleted'),
    'deployment': get_deployment_body,
    'deployment_status': get_change_deployment_status_body,
    'fork': get_fork_body,
    'gollum': get_wiki_pages_body,
    'issue_comment': get_issue_comment_body,
    'issues': get_issue_body,
    'member': get_member_body,
    'membership': get_membership_body,
    'opened_or_update_pull_request': get_opened_or_update_pull_request_body,
    'assigned_or_unassigned_pull_request': get_assigned_or_unassigned_pull_request_body,
    'page_build': get_page_build_body,
    'ping': get_ping_body,
    'public': get_public_body,
    'pull_request_ready_for_review': get_pull_request_ready_for_review_body,
    'pull_request_review': get_pull_request_review_body,
    'pull_request_review_comment': get_pull_request_review_comment_body,
    'pull_request_review_requested': get_pull_request_review_requested_body,
    'push_commits': get_push_commits_body,
    'push_tags': get_push_tags_body,
    'release': get_release_body,
    'repository': get_repository_body,
    'star': get_star_body,
    'status': get_status_body,
    'team': get_team_body,
    'team_add': get_add_team_body,
    'watch': get_watch_body,
}

IGNORED_EVENTS = [
    'repository_vulnerability_alert',
    'project_card',
    'check_suite',
    'organization',
    'milestone',
    'meta',
    'label',
]


@api_key_only_webhook_view('GitHub', notify_bot_owner_on_invalid_json=True)
@has_request_variables
def api_github_webhook(
        request: HttpRequest, user_profile: UserProfile,
        payload: Dict[str, Any]=REQ(argument_type='body'),
        branches: Optional[str]=REQ(default=None),
        user_specified_topic: Optional[str]=REQ("topic", default=None)) -> HttpResponse:
    """Webhook entry point: turn a GitHub event into a message.

    Events for which ``get_event`` returns None are acknowledged without
    sending anything.  Body builders that accept ``include_title`` are
    asked to include the title whenever the request supplied a ``topic``.
    """
    event = get_event(request, payload, branches)
    if event is not None:
        subject = get_subject_based_on_type(payload, event)
        body_function = get_body_function_based_on_type(event)
        if 'include_title' in signature(body_function).parameters:
            body = body_function(
                payload,
                include_title=user_specified_topic is not None,
            )
        else:
            body = body_function(payload)
        check_send_webhook_message(request, user_profile, subject, body)
    return json_success()


def _is_branch_notifiable(branch: str, branches: str) -> bool:
    """Return True when ``branch`` is one of the comma-separated ``branches``.

    Names are compared exactly after stripping surrounding whitespace, so
    that e.g. branch "dev" does not match a filter of "development".
    """
    return branch in {name.strip() for name in branches.split(',')}


def get_event(request: HttpRequest, payload: Dict[str, Any], branches: Optional[str]) -> Optional[str]:
    """Map the X-GitHub-Event header and payload to an internal event name.

    Returns:
        A key of ``EVENT_FUNCTION_MAPPER``, or None when the event should be
        silently ignored (unsupported pull request actions, pushes to
        branches outside the ``branches`` filter, check runs that are not
        completed, and everything in ``IGNORED_EVENTS``).

    Raises:
        UnexpectedWebhookEventType: for any event not handled above.
    """
    event = validate_extract_webhook_http_header(request, 'X_GITHUB_EVENT', 'GitHub')
    if event == 'pull_request':
        action = payload['action']
        if action in ('opened', 'synchronize', 'reopened', 'edited'):
            return 'opened_or_update_pull_request'
        if action in ('assigned', 'unassigned'):
            return 'assigned_or_unassigned_pull_request'
        if action == 'closed':
            return 'closed_pull_request'
        if action == 'review_requested':
            return f'{event}_{action}'
        if action == 'ready_for_review':
            return 'pull_request_ready_for_review'
        # Unsupported pull_request events
        if action in ('labeled', 'unlabeled', 'review_request_removed'):
            return None
        # Any other pull_request action falls through to the error below.
    elif event == 'push':
        if is_commit_push_event(payload):
            if branches is not None:
                branch = get_branch_name_from_ref(payload['ref'])
                if not _is_branch_notifiable(branch, branches):
                    return None
            return "push_commits"
        else:
            return "push_tags"
    elif event == 'check_run':
        if payload['check_run']['status'] != 'completed':
            return None
        return event
    elif event in EVENT_FUNCTION_MAPPER:
        # 'ping' is itself a key of EVENT_FUNCTION_MAPPER.
        return event
    elif event in IGNORED_EVENTS:
        return None

    complete_event = "{}:{}".format(event, payload.get("action", "???"))  # nocoverage
    raise UnexpectedWebhookEventType('GitHub', complete_event)


def get_body_function_based_on_type(type: str) -> Any:
    """Return the body builder registered for ``type``, or None if unknown.

    The parameter name shadows the builtin ``type``; it is kept unchanged
    so that keyword callers continue to work.
    """
    return EVENT_FUNCTION_MAPPER.get(type)