From 2f097c524e11d52001d282f15801e1dfdc8ea698 Mon Sep 17 00:00:00 2001 From: Christopher Chong Date: Sat, 2 Apr 2022 17:25:30 +0000 Subject: [PATCH] gitea, gogs: Strengthen types using WildValue. --- zerver/webhooks/gitea/view.py | 44 +++++---- zerver/webhooks/gogs/view.py | 176 +++++++++++++++++++--------------- 2 files changed, 125 insertions(+), 95 deletions(-) diff --git a/zerver/webhooks/gitea/view.py b/zerver/webhooks/gitea/view.py index ff1ac5690d..5b1dd76682 100644 --- a/zerver/webhooks/gitea/view.py +++ b/zerver/webhooks/gitea/view.py @@ -1,10 +1,11 @@ # vim:fenc=utf-8 -from typing import Any, Dict, Optional +from typing import Optional from django.http import HttpRequest, HttpResponse from zerver.decorator import webhook_view from zerver.lib.request import REQ, has_request_variables +from zerver.lib.validator import WildValue, check_bool, check_int, check_string, to_wild_value from zerver.lib.webhooks.common import get_http_headers_from_filename from zerver.lib.webhooks.git import get_pull_request_event_message from zerver.models import UserProfile @@ -15,24 +16,33 @@ from zerver.webhooks.gogs.view import gogs_webhook_main fixture_to_headers = get_http_headers_from_filename("HTTP_X_GITEA_EVENT") -def format_pull_request_event(payload: Dict[str, Any], include_title: bool = False) -> str: +def format_pull_request_event(payload: WildValue, include_title: bool = False) -> str: assignee = payload["pull_request"]["assignee"] - data = { - "user_name": payload["pull_request"]["user"]["username"], - "action": payload["action"], - "url": payload["pull_request"]["html_url"], - "number": payload["pull_request"]["number"], - "target_branch": payload["pull_request"]["head"]["ref"], - "base_branch": payload["pull_request"]["base"]["ref"], - "title": payload["pull_request"]["title"] if include_title else None, - "assignee": assignee["login"] if assignee else None, - } - if payload["pull_request"]["merged"]: - data["user_name"] = payload["pull_request"]["merged_by"]["username"] - data["action"] = "merged" + if payload["pull_request"]["merged"].tame(check_bool): + user_name = payload["pull_request"]["merged_by"]["username"].tame(check_string) + action = "merged" + else: + user_name = payload["pull_request"]["user"]["username"].tame(check_string) + action = payload["action"].tame(check_string) - return get_pull_request_event_message(**data) + url = payload["pull_request"]["html_url"].tame(check_string) + number = payload["pull_request"]["number"].tame(check_int) + target_branch = payload["pull_request"]["head"]["ref"].tame(check_string) + base_branch = payload["pull_request"]["base"]["ref"].tame(check_string) + title = payload["pull_request"]["title"].tame(check_string) if include_title else None + stringified_assignee = assignee["login"].tame(check_string) if assignee else None + + return get_pull_request_event_message( + user_name=user_name, + action=action, + url=url, + number=number, + target_branch=target_branch, + base_branch=base_branch, + title=title, + assignee=stringified_assignee, + ) @webhook_view("Gitea") @@ -40,7 +50,7 @@ def format_pull_request_event(payload: Dict[str, Any], include_title: bool = Fal def api_gitea_webhook( request: HttpRequest, user_profile: UserProfile, - payload: Dict[str, Any] = REQ(argument_type="body"), + payload: WildValue = REQ(argument_type="body", converter=to_wild_value), branches: Optional[str] = REQ(default=None), user_specified_topic: Optional[str] = REQ("topic", default=None), ) -> HttpResponse: diff --git a/zerver/webhooks/gogs/view.py b/zerver/webhooks/gogs/view.py index b6393c8e2c..99c607d918 100644 --- a/zerver/webhooks/gogs/view.py +++ b/zerver/webhooks/gogs/view.py @@ -1,5 +1,5 @@ # vim:fenc=utf-8 -from typing import Any, Dict, Optional, Protocol +from typing import Dict, List, Optional, Protocol from django.http import HttpRequest, HttpResponse @@ -7,6 +7,7 @@ from zerver.decorator import webhook_view from zerver.lib.exceptions import UnsupportedWebhookEventType from zerver.lib.request import REQ, has_request_variables from zerver.lib.response import json_success +from zerver.lib.validator import WildValue, check_bool, check_int, check_string, to_wild_value from zerver.lib.webhooks.common import ( check_send_webhook_message, get_http_headers_from_filename, @@ -31,70 +32,87 @@ def get_issue_url(repo_url: str, issue_nr: int) -> str: return f"{repo_url}/issues/{issue_nr}" -def format_push_event(payload: Dict[str, Any]) -> str: +def format_push_event(payload: WildValue) -> str: + user_name = payload["sender"]["username"].tame(check_string) + compare_url = payload["compare_url"].tame(check_string) + branch_name = payload["ref"].tame(check_string).replace("refs/heads/", "") + commits_data = _transform_commits_list_to_common_format(payload["commits"]) + return get_push_commits_event_message( + user_name=user_name, + compare_url=compare_url, + branch_name=branch_name, + commits_data=commits_data, + ) - for commit in payload["commits"]: - commit["sha"] = commit["id"] - commit["name"] = commit["author"]["username"] or commit["author"]["name"].split()[0] + +def _transform_commits_list_to_common_format(commits: WildValue) -> List[Dict[str, str]]: + new_commits_list = [] + for commit in commits: + new_commits_list.append( + { + "name": commit["author"]["username"].tame(check_string) + or commit["author"]["name"].tame(check_string).split()[0], + "sha": commit["id"].tame(check_string), + "url": commit["url"].tame(check_string), + "message": commit["message"].tame(check_string), + } + ) + return new_commits_list + + +def format_new_branch_event(payload: WildValue) -> str: + + branch_name = payload["ref"].tame(check_string) + url = "{}/src/{}".format(payload["repository"]["html_url"].tame(check_string), branch_name) data = { - "user_name": payload["sender"]["username"], - "compare_url": payload["compare_url"], - "branch_name": payload["ref"].replace("refs/heads/", ""), - "commits_data": payload["commits"], - } - - return get_push_commits_event_message(**data) - - -def format_new_branch_event(payload: Dict[str, Any]) -> str: - - branch_name = payload["ref"] - url = "{}/src/{}".format(payload["repository"]["html_url"], branch_name) - - data = { - "user_name": payload["sender"]["username"], + "user_name": payload["sender"]["username"].tame(check_string), "url": url, "branch_name": branch_name, } return get_create_branch_event_message(**data) -def format_pull_request_event(payload: Dict[str, Any], include_title: bool = False) -> str: +def format_pull_request_event(payload: WildValue, include_title: bool = False) -> str: + if payload["pull_request"]["merged"].tame(check_bool): + user_name = payload["pull_request"]["merged_by"]["username"].tame(check_string) + action = "merged" + else: + user_name = payload["pull_request"]["user"]["username"].tame(check_string) + action = payload["action"].tame(check_string) + url = payload["pull_request"]["html_url"].tame(check_string) + number = payload["pull_request"]["number"].tame(check_int) + target_branch = payload["pull_request"]["head_branch"].tame(check_string) + base_branch = payload["pull_request"]["base_branch"].tame(check_string) + title = payload["pull_request"]["title"].tame(check_string) if include_title else None - data = { - "user_name": payload["pull_request"]["user"]["username"], - "action": payload["action"], - "url": payload["pull_request"]["html_url"], - "number": payload["pull_request"]["number"], - "target_branch": payload["pull_request"]["head_branch"], - "base_branch": payload["pull_request"]["base_branch"], - "title": payload["pull_request"]["title"] if include_title else None, - } - - if payload["pull_request"]["merged"]: - data["user_name"] = payload["pull_request"]["merged_by"]["username"] - data["action"] = "merged" - - return get_pull_request_event_message(**data) - - -def format_issues_event(payload: Dict[str, Any], include_title: bool = False) -> str: - issue_nr = payload["issue"]["number"] - assignee = payload["issue"]["assignee"] - return get_issue_event_message( - payload["sender"]["login"], - payload["action"], - get_issue_url(payload["repository"]["html_url"], issue_nr), - issue_nr, - payload["issue"]["body"], - assignee=assignee["login"] if assignee else None, - title=payload["issue"]["title"] if include_title else None, + return get_pull_request_event_message( + user_name=user_name, + action=action, + url=url, + number=number, + target_branch=target_branch, + base_branch=base_branch, + title=title, ) -def format_issue_comment_event(payload: Dict[str, Any], include_title: bool = False) -> str: - action = payload["action"] +def format_issues_event(payload: WildValue, include_title: bool = False) -> str: + issue_nr = payload["issue"]["number"].tame(check_int) + assignee = payload["issue"]["assignee"] + return get_issue_event_message( + payload["sender"]["login"].tame(check_string), + payload["action"].tame(check_string), + get_issue_url(payload["repository"]["html_url"].tame(check_string), issue_nr), + issue_nr, + payload["issue"]["body"].tame(check_string), + assignee=assignee["login"].tame(check_string) if assignee else None, + title=payload["issue"]["title"].tame(check_string) if include_title else None, + ) + + +def format_issue_comment_event(payload: WildValue, include_title: bool = False) -> str: + action = payload["action"].tame(check_string) comment = payload["comment"] issue = payload["issue"] @@ -102,25 +120,27 @@ def format_issue_comment_event(payload: Dict[str, Any], include_title: bool = Fa action = "[commented]" else: action = f"{action} a [comment]" - action += "({}) on".format(comment["html_url"]) + action += "({}) on".format(comment["html_url"].tame(check_string)) return get_issue_event_message( - payload["sender"]["login"], + payload["sender"]["login"].tame(check_string), action, - get_issue_url(payload["repository"]["html_url"], issue["number"]), - issue["number"], - comment["body"], - title=issue["title"] if include_title else None, + get_issue_url( + payload["repository"]["html_url"].tame(check_string), issue["number"].tame(check_int) + ), + issue["number"].tame(check_int), + comment["body"].tame(check_string), + title=issue["title"].tame(check_string) if include_title else None, ) -def format_release_event(payload: Dict[str, Any], include_title: bool = False) -> str: +def format_release_event(payload: WildValue, include_title: bool = False) -> str: data = { - "user_name": payload["release"]["author"]["username"], - "action": payload["action"], - "tagname": payload["release"]["tag_name"], - "release_name": payload["release"]["name"], - "url": payload["repository"]["html_url"], + "user_name": payload["release"]["author"]["username"].tame(check_string), + "action": payload["action"].tame(check_string), + "tagname": payload["release"]["tag_name"].tame(check_string), + "release_name": payload["release"]["name"].tame(check_string), + "url": payload["repository"]["html_url"].tame(check_string), } return get_release_event_message(**data) @@ -134,7 +154,7 @@ ALL_EVENT_TYPES = ["issue_comment", "issues", "create", "pull_request", "push", def api_gogs_webhook( request: HttpRequest, user_profile: UserProfile, - payload: Dict[str, Any] = REQ(argument_type="body"), + payload: WildValue = REQ(argument_type="body", converter=to_wild_value), branches: Optional[str] = REQ(default=None), user_specified_topic: Optional[str] = REQ("topic", default=None), ) -> HttpResponse: @@ -151,7 +171,7 @@ def api_gogs_webhook( class FormatPullRequestEvent(Protocol): - def __call__(self, payload: Dict[str, Any], include_title: bool) -> str: + def __call__(self, payload: WildValue, include_title: bool) -> str: ... @@ -161,14 +181,14 @@ def gogs_webhook_main( format_pull_request_event: FormatPullRequestEvent, request: HttpRequest, user_profile: UserProfile, - payload: Dict[str, Any], + payload: WildValue, branches: Optional[str], user_specified_topic: Optional[str], ) -> HttpResponse: - repo = payload["repository"]["name"] + repo = payload["repository"]["name"].tame(check_string) event = validate_extract_webhook_http_header(request, http_header_name, integration_name) if event == "push": - branch = payload["ref"].replace("refs/heads/", "") + branch = payload["ref"].tame(check_string).replace("refs/heads/", "") if branches is not None and branch not in branches.split(","): return json_success(request) body = format_push_event(payload) @@ -180,7 +200,7 @@ def gogs_webhook_main( body = format_new_branch_event(payload) topic = TOPIC_WITH_BRANCH_TEMPLATE.format( repo=repo, - branch=payload["ref"], + branch=payload["ref"].tame(check_string), ) elif event == "pull_request": body = format_pull_request_event( @@ -190,8 +210,8 @@ def gogs_webhook_main( topic = TOPIC_WITH_PR_OR_ISSUE_INFO_TEMPLATE.format( repo=repo, type="PR", - id=payload["pull_request"]["id"], - title=payload["pull_request"]["title"], + id=payload["pull_request"]["id"].tame(check_int), + title=payload["pull_request"]["title"].tame(check_string), ) elif event == "issues": body = format_issues_event( @@ -201,8 +221,8 @@ def gogs_webhook_main( topic = TOPIC_WITH_PR_OR_ISSUE_INFO_TEMPLATE.format( repo=repo, type="issue", - id=payload["issue"]["number"], - title=payload["issue"]["title"], + id=payload["issue"]["number"].tame(check_int), + title=payload["issue"]["title"].tame(check_string), ) elif event == "issue_comment": body = format_issue_comment_event( @@ -212,8 +232,8 @@ def gogs_webhook_main( topic = TOPIC_WITH_PR_OR_ISSUE_INFO_TEMPLATE.format( repo=repo, type="issue", - id=payload["issue"]["number"], - title=payload["issue"]["title"], + id=payload["issue"]["number"].tame(check_int), + title=payload["issue"]["title"].tame(check_string), ) elif event == "release": body = format_release_event( @@ -222,8 +242,8 @@ def gogs_webhook_main( ) topic = TOPIC_WITH_RELEASE_TEMPLATE.format( repo=repo, - tag=payload["release"]["tag_name"], - title=payload["release"]["name"], + tag=payload["release"]["tag_name"].tame(check_string), + title=payload["release"]["name"].tame(check_string), ) else: