#!/usr/bin/env python3
"""
Fetch contributors data from GitHub using their API, convert it to structured
JSON data for the /team/ page contributors section.
"""

import argparse
import json
import logging
import os
import sys
import unicodedata
from datetime import datetime, timezone
from typing import Dict, List, Optional, Union

sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from scripts.lib.setup_path import setup_path

setup_path()
os.environ["DJANGO_SETTINGS_MODULE"] = "zproject.settings"

from typing import TypedDict

import django
from django.conf import settings
from urllib3.util import Retry

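# django.setup() must run before the zerver/zproject imports below, so that
# those modules are loaded against a fully configured Django settings environment.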
django.setup()

from zerver.lib.avatar_hash import gravatar_hash
from zerver.lib.github import GithubSession
from zproject.config import get_secret

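# duplicate_commits.json records per-committer commit counts that are
# double-counted because of a repository split; they are subtracted from the
# zulip repository totals in update_contributor_data_file() below.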
duplicate_commits_file = os.path.join(os.path.dirname(__file__), "duplicate_commits.json")

parser = argparse.ArgumentParser()
parser.add_argument(
    "--max-retries", type=int, default=10, help="Number of times to retry fetching data from GitHub"
)
args = parser.parse_args()


class ContributorsJSON(TypedDict):
    date: str
    contributors: List[Dict[str, Union[int, str]]]
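    # Illustrative shape of the emitted file (values here are made up):
    #   {"date": "2024-01-01",
    #    "contributors": [{"github_username": "alice", "avatar": "...", "zulip": 42}, ...]}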


class Contributor(TypedDict):
    avatar_url: Optional[str]
    contributions: int
    login: Optional[str]
    email: Optional[str]
    name: Optional[str]


logger = logging.getLogger("zulip.fetch_contributors_json")


def fetch_contributors(repo_name: str, max_retries: int) -> List[Contributor]:
    contributors: List[Contributor] = []
    page_index = 1

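    # GitHub's "list repository contributors" endpoint; "anon=1" asks GitHub to
    # include anonymous contributors (identified only by email) in the results.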
    api_link = f"https://api.github.com/repos/zulip/{repo_name}/contributors"
    api_data = {"anon": "1"}
    certificates = os.environ.get("CUSTOM_CA_CERTIFICATES")

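    # A personal access token is optional, but authenticated requests get a
    # much higher GitHub API rate limit than anonymous ones.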
    headers: Dict[str, str] = {}
    personal_access_token = get_secret("github_personal_access_token")
    if personal_access_token is not None:
        headers = {"Authorization": f"token {personal_access_token}"}

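    # Retry transient failures with exponential backoff (factor 2.0, capped at
    # 64 seconds) for the status codes listed below.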
    Retry.DEFAULT_BACKOFF_MAX = 64
    retry = Retry(
        total=max_retries,
        backoff_factor=2.0,
        status_forcelist={
            403,  # Github does unauth rate-limiting via 403's
            429,  # The formal rate-limiting response code
            502,  # Bad gateway
            503,  # Service unavailable
        },
    )
    session = GithubSession(max_retries=retry)
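    # The contributors endpoint is paginated; keep requesting successive pages
    # until GitHub returns an empty page.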
    while True:
        response = session.get(
            api_link,
            params={**api_data, "page": f"{page_index}"},
            verify=certificates,
            headers=headers,
        )
        response.raise_for_status()
        data = response.json()
        if len(data) == 0:
            return contributors
        contributors.extend(data)
        page_index += 1


def write_to_disk(json_data: ContributorsJSON, out_file: str) -> None:
    with open(out_file, "w") as f:
        json.dump(json_data, f, indent=2, sort_keys=True)
        f.write("\n")


def update_contributor_data_file() -> None:
    # This list should hold all repositories that should be included in
    # the total count, including those that should *not* have tabs on the team
    # page (e.g. if they are deprecated).
    repo_names = [
        "python-zulip-api",
        "zulip",
        "zulip-android-legacy",
        "zulip-desktop",
        "zulip-flutter",
        "zulip-ios-legacy",
        "zulip-js",
        "zulip-mobile",
        "zulip-terminal",
        "zulipbot",
    ]

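    # Aggregate per-contributor data across all repositories, keyed by GitHub
    # login when available and by email for anonymous contributors.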
    data: ContributorsJSON = dict(date=str(datetime.now(tz=timezone.utc).date()), contributors=[])
    contributor_username_to_data: Dict[str, Dict[str, Union[str, int]]] = {}

    for repo_name in repo_names:
        contributors = fetch_contributors(repo_name, args.max_retries)
        for contributor in contributors:
            username = contributor.get("login") or contributor.get("email")
            assert username is not None
            if username in contributor_username_to_data:
                contributor_username_to_data[username][repo_name] = contributor["contributions"]
            else:
                contributor_username_to_data[username] = {repo_name: contributor["contributions"]}

            avatar_url = contributor.get("avatar_url")
            if avatar_url is not None:
                contributor_username_to_data[username]["avatar"] = avatar_url

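            # Contributors reported with an email (anonymous contributors) get a
            # Gravatar-based avatar URL derived from that email instead.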
            email = contributor.get("email")
            if email is not None:
                contributor_username_to_data[username]["email"] = email
                hash_key = gravatar_hash(email)
                gravatar_url = f"https://secure.gravatar.com/avatar/{hash_key}?d=identicon"
                contributor_username_to_data[username]["avatar"] = gravatar_url

            login = contributor.get("login")
            if login is not None:
                contributor_username_to_data[username]["github_username"] = login

            name = contributor.get("name")
            if name is not None:
                contributor_username_to_data[username]["name"] = unicodedata.normalize(
                    "NFC", name
                )

    # Remove double-counted contributions: commits duplicated at the time of
    # the repository split are recorded in duplicate_commits.json and
    # subtracted from the zulip repository totals here.
    with open(duplicate_commits_file) as f:
        duplicate_commits = json.load(f)
        for committer in duplicate_commits:
            if committer in contributor_username_to_data and contributor_username_to_data[
                committer
            ].get("zulip"):
                total_commits = contributor_username_to_data[committer]["zulip"]
                assert isinstance(total_commits, int)
                duplicate_commits_count = duplicate_commits[committer]
                original_commits = total_commits - duplicate_commits_count
                contributor_username_to_data[committer]["zulip"] = original_commits

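    # Flatten the aggregated mapping and write the JSON file to the path
    # configured in Django settings (CONTRIBUTOR_DATA_FILE_PATH).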
    data["contributors"] = list(contributor_username_to_data.values())
    write_to_disk(data, settings.CONTRIBUTOR_DATA_FILE_PATH)


if __name__ == "__main__":
    update_contributor_data_file()