2017-01-28 18:42:59 +01:00
|
|
|
# vim:fenc=utf-8
|
2022-04-02 19:25:30 +02:00
|
|
|
from typing import Dict, List, Optional, Protocol
|
2017-11-16 00:43:10 +01:00
|
|
|
|
|
|
|
from django.http import HttpRequest, HttpResponse
|
|
|
|
|
2020-08-20 00:32:15 +02:00
|
|
|
from zerver.decorator import webhook_view
|
2022-11-17 09:30:48 +01:00
|
|
|
from zerver.lib.exceptions import UnsupportedWebhookEventTypeError
|
2019-02-02 23:53:55 +01:00
|
|
|
from zerver.lib.response import json_success
|
2023-09-27 19:01:31 +02:00
|
|
|
from zerver.lib.typed_endpoint import JsonBodyPayload, typed_endpoint
|
2023-08-12 09:34:31 +02:00
|
|
|
from zerver.lib.validator import WildValue, check_bool, check_int, check_string
|
2020-06-11 00:54:34 +02:00
|
|
|
from zerver.lib.webhooks.common import (
|
2023-08-12 09:34:31 +02:00
|
|
|
OptionalUserSpecifiedTopicStr,
|
2020-06-11 00:54:34 +02:00
|
|
|
check_send_webhook_message,
|
|
|
|
get_http_headers_from_filename,
|
|
|
|
validate_extract_webhook_http_header,
|
|
|
|
)
|
|
|
|
from zerver.lib.webhooks.git import (
|
|
|
|
TOPIC_WITH_BRANCH_TEMPLATE,
|
|
|
|
TOPIC_WITH_PR_OR_ISSUE_INFO_TEMPLATE,
|
|
|
|
TOPIC_WITH_RELEASE_TEMPLATE,
|
|
|
|
get_create_branch_event_message,
|
|
|
|
get_issue_event_message,
|
|
|
|
get_pull_request_event_message,
|
|
|
|
get_push_commits_event_message,
|
|
|
|
get_release_event_message,
|
|
|
|
)
|
2017-10-31 04:25:48 +01:00
|
|
|
from zerver.models import UserProfile
|
2017-01-28 18:42:59 +01:00
|
|
|
|
2019-07-04 20:48:51 +02:00
|
|
|
# Built from the "HTTP_X_GOGS_EVENT" header key. Nothing in this module reads
# it; presumably the webhook test tooling imports it by name to derive the
# event header from a fixture's filename -- TODO confirm.
fixture_to_headers = get_http_headers_from_filename("HTTP_X_GOGS_EVENT")
|
|
|
|
|
2021-02-12 08:19:30 +01:00
|
|
|
|
2019-11-18 03:41:13 +01:00
|
|
|
def get_issue_url(repo_url: str, issue_nr: int) -> str:
    """Return the URL of issue ``issue_nr`` under the repository at ``repo_url``."""
    return "{}/issues/{}".format(repo_url, issue_nr)
|
2019-11-18 03:41:13 +01:00
|
|
|
|
2021-02-12 08:19:30 +01:00
|
|
|
|
2022-04-02 19:25:30 +02:00
|
|
|
def format_push_event(payload: WildValue) -> str:
    """Render the message body for a push event.

    Reads the sender's username, the compare URL, the pushed ref and the
    commit list from ``payload`` and passes them to
    ``get_push_commits_event_message``.
    """
    return get_push_commits_event_message(
        user_name=payload["sender"]["username"].tame(check_string),
        compare_url=payload["compare_url"].tame(check_string),
        # Drop the "refs/heads/" part of the ref so only the branch name remains.
        branch_name=payload["ref"].tame(check_string).replace("refs/heads/", ""),
        commits_data=_transform_commits_list_to_common_format(payload["commits"]),
    )
|
2017-01-28 18:42:59 +01:00
|
|
|
|
|
|
|
|
2022-04-02 19:25:30 +02:00
|
|
|
def _transform_commits_list_to_common_format(commits: WildValue) -> List[Dict[str, str]]:
    """Convert the payload's commit entries into name/sha/url/message dicts.

    The result is what ``format_push_event`` passes as ``commits_data`` to
    ``get_push_commits_event_message``.
    """
    transformed: List[Dict[str, str]] = []
    for commit in commits:
        author = commit["author"]
        author_name = author["username"].tame(check_string)
        if not author_name:
            # Empty username: fall back to the first whitespace-separated
            # word of the author's name.
            author_name = author["name"].tame(check_string).split()[0]
        transformed.append(
            {
                "name": author_name,
                "sha": commit["id"].tame(check_string),
                "url": commit["url"].tame(check_string),
                "message": commit["message"].tame(check_string),
            }
        )
    return transformed
|
2017-01-28 18:42:59 +01:00
|
|
|
|
2021-02-12 08:19:30 +01:00
|
|
|
|
2022-04-02 19:25:30 +02:00
|
|
|
def format_new_branch_event(payload: WildValue) -> str:
    """Render the message body for a "create" (new branch) event.

    The branch link is the repository's ``html_url`` followed by
    ``/src/<ref>``.
    """
    branch_name = payload["ref"].tame(check_string)
    repo_url = payload["repository"]["html_url"].tame(check_string)
    return get_create_branch_event_message(
        user_name=payload["sender"]["username"].tame(check_string),
        url=f"{repo_url}/src/{branch_name}",
        branch_name=branch_name,
    )
|
|
|
|
|
2021-02-12 08:19:30 +01:00
|
|
|
|
2022-04-02 19:25:30 +02:00
|
|
|
def format_pull_request_event(payload: WildValue, include_title: bool = False) -> str:
    """Render the message body for a pull request event.

    A merged pull request is reported with the action "merged" and attributed
    to the user who merged it; otherwise the payload's own action and the
    pull request's author are used.  The title is only passed on when
    ``include_title`` is true.
    """
    pull_request = payload["pull_request"]

    if pull_request["merged"].tame(check_bool):
        user_name = pull_request["merged_by"]["username"].tame(check_string)
        action = "merged"
    else:
        user_name = pull_request["user"]["username"].tame(check_string)
        action = payload["action"].tame(check_string)

    url = pull_request["html_url"].tame(check_string)
    number = pull_request["number"].tame(check_int)

    # Branch names are not read from the payload for "edited" events.
    if action == "edited":
        target_branch = None
        base_branch = None
    else:
        target_branch = pull_request["head_branch"].tame(check_string)
        base_branch = pull_request["base_branch"].tame(check_string)

    title = None
    if include_title:
        title = pull_request["title"].tame(check_string)

    return get_pull_request_event_message(
        user_name=user_name,
        action=action,
        url=url,
        number=number,
        target_branch=target_branch,
        base_branch=base_branch,
        title=title,
    )
|
2017-01-28 18:42:59 +01:00
|
|
|
|
2021-02-12 08:19:30 +01:00
|
|
|
|
2022-04-02 19:25:30 +02:00
|
|
|
def format_issues_event(payload: WildValue, include_title: bool = False) -> str:
    """Render the message body for an "issues" event.

    The assignee's login is passed on only when the payload carries a truthy
    assignee, and the issue title only when ``include_title`` is true.
    """
    issue = payload["issue"]
    issue_nr = issue["number"].tame(check_int)
    assignee = issue["assignee"]

    sender_login = payload["sender"]["login"].tame(check_string)
    action = payload["action"].tame(check_string)
    issue_url = get_issue_url(payload["repository"]["html_url"].tame(check_string), issue_nr)
    description = issue["body"].tame(check_string)

    if assignee:
        assignee_login = assignee["login"].tame(check_string)
    else:
        assignee_login = None

    if include_title:
        title = issue["title"].tame(check_string)
    else:
        title = None

    return get_issue_event_message(
        user_name=sender_login,
        action=action,
        url=issue_url,
        number=issue_nr,
        message=description,
        assignee=assignee_login,
        title=title,
    )
|
|
|
|
|
2021-02-12 08:19:30 +01:00
|
|
|
|
2022-04-02 19:25:30 +02:00
|
|
|
def format_issue_comment_event(payload: WildValue, include_title: bool = False) -> str:
    """Render the message body for an "issue_comment" event.

    The action text is a Markdown link to the comment: "[commented](url) on"
    for newly created comments, "<action> a [comment](url) on" for any other
    action.  The issue title is only passed on when ``include_title`` is true.
    """
    event_action = payload["action"].tame(check_string)
    comment = payload["comment"]
    issue = payload["issue"]

    link_text = "[commented]" if event_action == "created" else f"{event_action} a [comment]"
    comment_url = comment["html_url"].tame(check_string)
    action = f"{link_text}({comment_url}) on"

    sender_login = payload["sender"]["login"].tame(check_string)
    repo_url = payload["repository"]["html_url"].tame(check_string)
    issue_nr = issue["number"].tame(check_int)
    comment_body = comment["body"].tame(check_string)

    title = None
    if include_title:
        title = issue["title"].tame(check_string)

    return get_issue_event_message(
        user_name=sender_login,
        action=action,
        url=get_issue_url(repo_url, issue_nr),
        number=issue_nr,
        message=comment_body,
        title=title,
    )
|
|
|
|
|
2021-02-12 08:19:30 +01:00
|
|
|
|
2022-04-02 19:25:30 +02:00
|
|
|
def format_release_event(payload: WildValue, include_title: bool = False) -> str:
    """Render the message body for a "release" event.

    ``include_title`` is accepted for signature parity with the other
    formatters but is not used here.  The URL passed on is the repository's
    ``html_url``.
    """
    release = payload["release"]
    return get_release_event_message(
        user_name=release["author"]["username"].tame(check_string),
        action=payload["action"].tame(check_string),
        tagname=release["tag_name"].tame(check_string),
        release_name=release["name"].tame(check_string),
        url=payload["repository"]["html_url"].tame(check_string),
    )
|
|
|
|
|
2021-02-12 08:19:30 +01:00
|
|
|
|
2021-07-16 11:40:46 +02:00
|
|
|
# Event names registered with @webhook_view for the Gogs endpoint; each one
# has a matching branch in gogs_webhook_main.
ALL_EVENT_TYPES = ["issue_comment", "issues", "create", "pull_request", "push", "release"]
|
|
|
|
|
|
|
|
|
|
|
|
@webhook_view("Gogs", all_event_types=ALL_EVENT_TYPES)
@typed_endpoint
def api_gogs_webhook(
    request: HttpRequest,
    user_profile: UserProfile,
    *,
    payload: JsonBodyPayload[WildValue],
    branches: Optional[str] = None,
    user_specified_topic: OptionalUserSpecifiedTopicStr = None,
) -> HttpResponse:
    """Entry point for the Gogs webhook.

    Delegates to ``gogs_webhook_main`` with the Gogs integration name, the
    "X-Gogs-Event" header name and this module's pull request formatter.
    """
    return gogs_webhook_main(
        integration_name="Gogs",
        http_header_name="X-Gogs-Event",
        format_pull_request_event=format_pull_request_event,
        request=request,
        user_profile=user_profile,
        payload=payload,
        branches=branches,
        user_specified_topic=user_specified_topic,
    )
|
|
|
|
|
|
|
|
|
2021-02-16 01:25:55 +01:00
|
|
|
class FormatPullRequestEvent(Protocol):
    """Call signature of the pull request formatter accepted by ``gogs_webhook_main``."""

    def __call__(self, payload: WildValue, include_title: bool) -> str:
        """Return the message body for the pull request described by ``payload``."""
        ...
|
|
|
|
|
|
|
|
|
2021-02-12 08:19:30 +01:00
|
|
|
def gogs_webhook_main(
    integration_name: str,
    http_header_name: str,
    format_pull_request_event: FormatPullRequestEvent,
    request: HttpRequest,
    user_profile: UserProfile,
    payload: WildValue,
    branches: Optional[str],
    user_specified_topic: Optional[str],
) -> HttpResponse:
    """Dispatch a webhook payload by event type and send the resulting message.

    The event name is read from the ``http_header_name`` request header.  For
    each supported event a message body and a topic are built and handed to
    ``check_send_webhook_message``.

    Push events whose branch is not in the comma-separated ``branches`` filter
    (when one is given) are acknowledged without sending anything.

    Raises:
        UnsupportedWebhookEventTypeError: if the event is not one of push,
            create, pull_request, issues, issue_comment or release.
    """
    repo = payload["repository"]["name"].tame(check_string)
    event = validate_extract_webhook_http_header(request, http_header_name, integration_name)
    # Formatters are asked to include the title whenever a topic was supplied.
    include_title = user_specified_topic is not None

    if event == "push":
        branch = payload["ref"].tame(check_string).replace("refs/heads/", "")
        if branches is not None:
            allowed_branches = branches.split(",")
            if branch not in allowed_branches:
                # Filtered-out branch: report success but send no message.
                return json_success(request)
        body = format_push_event(payload)
        topic = TOPIC_WITH_BRANCH_TEMPLATE.format(repo=repo, branch=branch)
    elif event == "create":
        body = format_new_branch_event(payload)
        topic = TOPIC_WITH_BRANCH_TEMPLATE.format(
            repo=repo,
            branch=payload["ref"].tame(check_string),
        )
    elif event == "pull_request":
        body = format_pull_request_event(payload, include_title=include_title)
        pull_request = payload["pull_request"]
        topic = TOPIC_WITH_PR_OR_ISSUE_INFO_TEMPLATE.format(
            repo=repo,
            type="PR",
            id=pull_request["id"].tame(check_int),
            title=pull_request["title"].tame(check_string),
        )
    elif event in ("issues", "issue_comment"):
        # Both issue events share the same topic; only the body formatter differs.
        issue_formatter = format_issues_event if event == "issues" else format_issue_comment_event
        body = issue_formatter(payload, include_title=include_title)
        issue = payload["issue"]
        topic = TOPIC_WITH_PR_OR_ISSUE_INFO_TEMPLATE.format(
            repo=repo,
            type="issue",
            id=issue["number"].tame(check_int),
            title=issue["title"].tame(check_string),
        )
    elif event == "release":
        body = format_release_event(payload, include_title=include_title)
        release = payload["release"]
        topic = TOPIC_WITH_RELEASE_TEMPLATE.format(
            repo=repo,
            tag=release["tag_name"].tame(check_string),
            title=release["name"].tame(check_string),
        )
    else:
        raise UnsupportedWebhookEventTypeError(event)

    check_send_webhook_message(request, user_profile, topic, body, event)
    return json_success(request)
|