tools: Use Python 3 syntax for typing in many files.

This commit is contained in:
rht 2017-11-28 00:24:00 +00:00 committed by Greg Price
parent 5a1869901d
commit 66261f1cc3
17 changed files with 74 additions and 146 deletions

View File

@ -9,8 +9,7 @@ import subprocess
from lib import sanity_check from lib import sanity_check
sanity_check.check_venv(__file__) sanity_check.check_venv(__file__)
def validate(fn): def validate(fn: str) -> None:
# type: (str) -> None
text = open(fn).read() text = open(fn).read()
section_list = parse(text) section_list = parse(text)
if text != section_list.text(): if text != section_list.text():
@ -19,8 +18,7 @@ def validate(fn):
subprocess.call(['diff', fn, '/var/tmp/pretty_css.txt'], stderr=subprocess.STDOUT) subprocess.call(['diff', fn, '/var/tmp/pretty_css.txt'], stderr=subprocess.STDOUT)
sys.exit(1) sys.exit(1)
def check_our_files(filenames): def check_our_files(filenames: Iterable[str]) -> None:
# type: (Iterable[str]) -> None
for filename in filenames: for filename in filenames:
if 'pygments.css' in filename: if 'pygments.css' in filename:
# This just has really strange formatting that our # This just has really strange formatting that our

View File

@ -14,8 +14,7 @@ import subprocess
sys.path.append(os.path.join(os.path.dirname(__file__), '..')) sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
from scripts.lib.zulip_tools import WARNING, FAIL, ENDC from scripts.lib.zulip_tools import WARNING, FAIL, ENDC
def find_handlebars(translatable_strings): def find_handlebars(translatable_strings: List[Text]) -> List[Text]:
# type: (List[Text]) -> List[Text]
errored = [] errored = []
for string in translatable_strings: for string in translatable_strings:
if '{{' in string: if '{{' in string:

View File

@ -19,8 +19,7 @@ EXCLUDED_FILES = [
"docs/_templates", "docs/_templates",
] ]
def check_our_files(modified_only, all_dups, targets): def check_our_files(modified_only: bool, all_dups: bool, targets: List[str]) -> None:
# type: (bool, bool, List[str]) -> None
by_lang = cast( by_lang = cast(
Dict[str, List[str]], Dict[str, List[str]],
lister.list_files( lister.list_files(
@ -32,8 +31,7 @@ def check_our_files(modified_only, all_dups, targets):
check_handlebar_templates(by_lang['handlebars']) check_handlebar_templates(by_lang['handlebars'])
check_html_templates(by_lang['html'], args.all_dups) check_html_templates(by_lang['html'], args.all_dups)
def check_html_templates(templates, all_dups): def check_html_templates(templates: Iterable[str], all_dups: bool) -> None:
# type: (Iterable[str], bool) -> None
# Our files with .html extensions are usually for Django, but we also # Our files with .html extensions are usually for Django, but we also
# have a few static .html files. # have a few static .html files.
# #
@ -130,8 +128,7 @@ def check_html_templates(templates, all_dups):
if not validate_indent_html(fn): if not validate_indent_html(fn):
sys.exit(1) sys.exit(1)
def check_handlebar_templates(templates): def check_handlebar_templates(templates: Iterable[str]) -> None:
# type: (Iterable[str]) -> None
# Check all our handlebars templates. # Check all our handlebars templates.
templates = [fn for fn in templates if fn.endswith('.handlebars')] templates = [fn for fn in templates if fn.endswith('.handlebars')]

View File

@ -19,35 +19,30 @@ from typing import Dict, List
os.chdir(settings.DEPLOY_ROOT) os.chdir(settings.DEPLOY_ROOT)
STATIC_PATH = 'static/' STATIC_PATH = 'static/'
def get_templates(): def get_templates() -> List[str]:
# type: () -> List[str]
return (glob.glob(os.path.join(STATIC_PATH, 'templates/*.handlebars')) + return (glob.glob(os.path.join(STATIC_PATH, 'templates/*.handlebars')) +
glob.glob(os.path.join(STATIC_PATH, 'templates/settings/*.handlebars'))) glob.glob(os.path.join(STATIC_PATH, 'templates/settings/*.handlebars')))
def run(): def run() -> None:
# type: () -> None
subprocess.check_call(['node', 'node_modules/.bin/handlebars'] + subprocess.check_call(['node', 'node_modules/.bin/handlebars'] +
get_templates() + get_templates() +
['--output', os.path.join(STATIC_PATH, 'templates/compiled.js'), ['--output', os.path.join(STATIC_PATH, 'templates/compiled.js'),
'--known', 'if,unless,each,with']) '--known', 'if,unless,each,with'])
def add_error_stamp_file(file_path): def add_error_stamp_file(file_path: str) -> None:
# type: (str) -> None
file_dir = os.path.dirname(file_path) file_dir = os.path.dirname(file_path)
if not os.path.exists(file_dir): if not os.path.exists(file_dir):
os.makedirs(file_dir) os.makedirs(file_dir)
open(file_path, 'a').close() open(file_path, 'a').close()
def remove_error_stamp_file(file_path): def remove_error_stamp_file(file_path: str) -> None:
# type: (str) -> None
if os.path.exists(file_path): if os.path.exists(file_path):
os.remove(file_path) os.remove(file_path)
def run_forever(): def run_forever() -> None:
# type: () -> None
# Keep polling for file changes, similar to how Django does it in # Keep polling for file changes, similar to how Django does it in
# django/utils/autoreload.py. If any of our templates change, rebuild # django/utils/autoreload.py. If any of our templates change, rebuild
# compiled.js # compiled.js

View File

@ -15,8 +15,7 @@ import ujson
Call = Dict[str, Any] Call = Dict[str, Any]
def clean_up_pattern(s): def clean_up_pattern(s: str) -> str:
# type: (str) -> str
paren_level = 0 paren_level = 0
in_braces = False in_braces = False
result = '' result = ''
@ -36,8 +35,7 @@ def clean_up_pattern(s):
prior_char = c prior_char = c
return result return result
def encode_info(info): def encode_info(info: Any) -> str:
# type: (Any) -> str
try: try:
result = '' result = ''
try: try:
@ -55,12 +53,10 @@ def encode_info(info):
pass pass
return 'NOT ENCODABLE' return 'NOT ENCODABLE'
def fix_test_name(s): def fix_test_name(s: str) -> str:
# type: (str) -> str
return s.replace('zerver.tests.', '') return s.replace('zerver.tests.', '')
def create_single_page(pattern, out_dir, href, calls): def create_single_page(pattern: str, out_dir: str, href: str, calls: List[Call]) -> None:
# type: (str, str, str, List[Call]) -> None
fn = out_dir + '/' + href fn = out_dir + '/' + href
with open(fn, 'w') as f: with open(fn, 'w') as f:
f.write(''' f.write('''
@ -86,8 +82,7 @@ def create_single_page(pattern, out_dir, href, calls):
f.write('<br>') f.write('<br>')
f.write('</div>') f.write('</div>')
def create_user_docs(): def create_user_docs() -> None:
# type: () -> None
fn = 'var/url_coverage.txt' # TODO: make path more robust, maybe use json suffix fn = 'var/url_coverage.txt' # TODO: make path more robust, maybe use json suffix
out_dir = 'var/api_docs' out_dir = 'var/api_docs'

View File

@ -9,8 +9,7 @@ import sys
from lib import sanity_check from lib import sanity_check
sanity_check.check_venv(__file__) sanity_check.check_venv(__file__)
def process_files(): def process_files() -> None:
# type: () -> None
description = ''' description = '''
Use this tool to find HTML classes that we use in our JS code. Use this tool to find HTML classes that we use in our JS code.

View File

@ -9,20 +9,17 @@ sanity_check.check_venv(__file__)
from typing import Any, Dict, List from typing import Any, Dict, List
def debug(obj): def debug(obj: Any) -> None:
# type: (Any) -> None
print(json.dumps(obj, indent=4)) print(json.dumps(obj, indent=4))
def parse_file(fn): def parse_file(fn: str) -> Dict[str, Any]:
# type: (str) -> Dict[str, Any]
text = open(fn).read() text = open(fn).read()
tags = re.findall('{+\s*(.*?)\s*}+', text) tags = re.findall('{+\s*(.*?)\s*}+', text)
root = {} # type: Dict[str, Any] root = {} # type: Dict[str, Any]
context = root context = root
stack = [] # type: List[Dict[str, Any]] stack = [] # type: List[Dict[str, Any]]
def set_var(var, val): def set_var(var: str, val: Any) -> None:
# type: (str, Any) -> None
num_levels_up = len(re.findall('\.\.', var)) num_levels_up = len(re.findall('\.\.', var))
if num_levels_up: if num_levels_up:
var = var.split('/')[-1] var = var.split('/')[-1]
@ -94,8 +91,7 @@ def parse_file(fn):
set_var(tag, '') set_var(tag, '')
def clean_this(obj): def clean_this(obj: Any) -> Any:
# type: (Any) -> Any
if isinstance(obj, list): if isinstance(obj, list):
return [clean_this(item) for item in obj] return [clean_this(item) for item in obj]
if isinstance(obj, dict): if isinstance(obj, dict):

View File

@ -46,8 +46,7 @@ USAGE = '''
TODO: allow specific files to be searched.''' TODO: allow specific files to be searched.'''
def check_our_files(): def check_our_files() -> None:
# type: () -> None
parser = argparse.ArgumentParser(description=USAGE, parser = argparse.ArgumentParser(description=USAGE,
formatter_class=argparse.RawTextHelpFormatter) formatter_class=argparse.RawTextHelpFormatter)

View File

@ -6,8 +6,7 @@ import time
from contextlib import contextmanager from contextlib import contextmanager
if False: from typing import Any, Iterator, Optional
from typing import (Any, Iterator, Optional)
# Verify the Zulip venv is available. # Verify the Zulip venv is available.
from tools.lib import sanity_check from tools.lib import sanity_check
@ -22,16 +21,14 @@ if TOOLS_DIR not in sys.path:
from zerver.lib.test_fixtures import is_template_database_current from zerver.lib.test_fixtures import is_template_database_current
def set_up_django(external_host): def set_up_django(external_host: str) -> None:
# type: (str) -> None
os.environ['EXTERNAL_HOST'] = external_host os.environ['EXTERNAL_HOST'] = external_host
os.environ["TORNADO_SERVER"] = "http://127.0.0.1:9983" os.environ["TORNADO_SERVER"] = "http://127.0.0.1:9983"
os.environ['DJANGO_SETTINGS_MODULE'] = 'zproject.test_settings' os.environ['DJANGO_SETTINGS_MODULE'] = 'zproject.test_settings'
django.setup() django.setup()
os.environ['PYTHONUNBUFFERED'] = 'y' os.environ['PYTHONUNBUFFERED'] = 'y'
def assert_server_running(server, log_file): def assert_server_running(server: subprocess.Popen, log_file: Optional[str]) -> None:
# type: (subprocess.Popen, Optional[str]) -> None
"""Get the exit code of the server, or None if it is still running.""" """Get the exit code of the server, or None if it is still running."""
if server.poll() is not None: if server.poll() is not None:
message = 'Server died unexpectedly!' message = 'Server died unexpectedly!'
@ -39,8 +36,7 @@ def assert_server_running(server, log_file):
message += '\nSee %s\n' % (log_file,) message += '\nSee %s\n' % (log_file,)
raise RuntimeError(message) raise RuntimeError(message)
def server_is_up(server, log_file): def server_is_up(server: subprocess.Popen, log_file: Optional[str]) -> bool:
# type: (subprocess.Popen, Optional[str]) -> bool
assert_server_running(server, log_file) assert_server_running(server, log_file)
try: try:
# We could get a 501 error if the reverse proxy is up but the Django app isn't. # We could get a 501 error if the reverse proxy is up but the Django app isn't.

View File

@ -17,8 +17,7 @@ import lister
from typing import cast, Callable, Dict, Iterator, List from typing import cast, Callable, Dict, Iterator, List
def run_parallel(lint_functions): def run_parallel(lint_functions: Dict[str, Callable[[], int]]) -> bool:
# type: (Dict[str, Callable[[], int]]) -> bool
pids = [] pids = []
for name, func in lint_functions.items(): for name, func in lint_functions.items():
pid = os.fork() pid = os.fork()
@ -38,8 +37,7 @@ def run_parallel(lint_functions):
failed = True failed = True
return failed return failed
def run(): def run() -> None:
# type: () -> None
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument('--force', default=False, parser.add_argument('--force', default=False,
action="store_true", action="store_true",
@ -117,13 +115,11 @@ def run():
lint_functions = {} # type: Dict[str, Callable[[], int]] lint_functions = {} # type: Dict[str, Callable[[], int]]
def lint(func): def lint(func: Callable[[], int]) -> Callable[[], int]:
# type: (Callable[[], int]) -> Callable[[], int]
lint_functions[func.__name__] = func lint_functions[func.__name__] = func
return func return func
def external_linter(name, command, target_langs=[]): def external_linter(name: str, command: List[str], target_langs: List[str]=[]) -> None:
# type: (str, List[str], List[str]) -> None
"""Registers an external linter program to be run as part of the """Registers an external linter program to be run as part of the
linter. This program will be passed the subset of files being linter. This program will be passed the subset of files being
linted that have extensions in target_langs. If there are no linted that have extensions in target_langs. If there are no
@ -133,8 +129,7 @@ def run():
""" """
color = next(colors) color = next(colors)
def run_linter(): def run_linter() -> int:
# type: () -> int
targets = [] # type: List[str] targets = [] # type: List[str]
if len(target_langs) != 0: if len(target_langs) != 0:
targets = [target for lang in target_langs for target in by_lang[lang]] targets = [target for lang in target_langs for target in by_lang[lang]]
@ -172,26 +167,22 @@ def run():
# external_linter('commit_messages', ['tools/commit-message-lint']) # external_linter('commit_messages', ['tools/commit-message-lint'])
@lint @lint
def custom_py(): def custom_py() -> int:
# type: () -> int
failed = check_custom_checks_py() failed = check_custom_checks_py()
return 1 if failed else 0 return 1 if failed else 0
@lint @lint
def custom_nonpy(): def custom_nonpy() -> int:
# type: () -> int
failed = check_custom_checks_nonpy() failed = check_custom_checks_nonpy()
return 1 if failed else 0 return 1 if failed else 0
@lint @lint
def pyflakes(): def pyflakes() -> int:
# type: () -> int
failed = check_pyflakes(args, by_lang) failed = check_pyflakes(args, by_lang)
return 1 if failed else 0 return 1 if failed else 0
@lint @lint
def pep8(): def pep8() -> int:
# type: () -> int
failed = check_pep8(by_lang['py']) failed = check_pep8(by_lang['py'])
return 1 if failed else 0 return 1 if failed else 0

View File

@ -172,8 +172,7 @@ for cmd in cmds:
subprocess.Popen(cmd) subprocess.Popen(cmd)
def transform_url(protocol, path, query, target_port, target_host): def transform_url(protocol: str, path: str, query: str, target_port: int, target_host: str) -> str:
# type: (str, str, str, int, str) -> str
# generate url with target host # generate url with target host
host = ":".join((target_host, str(target_port))) host = ":".join((target_host, str(target_port)))
newpath = urlunparse((protocol, host, path, '', query, '')) newpath = urlunparse((protocol, host, path, '', query, ''))
@ -181,8 +180,7 @@ def transform_url(protocol, path, query, target_port, target_host):
@gen.engine @gen.engine
def fetch_request(url, callback, **kwargs): def fetch_request(url: str, callback: Any, **kwargs: Any) -> Generator[Callable[..., Any], Any, None]:
# type: (str, Any, **Any) -> Generator[Callable[..., Any], Any, None]
# use large timeouts to handle polling requests # use large timeouts to handle polling requests
req = httpclient.HTTPRequest(url, connect_timeout=240.0, request_timeout=240.0, **kwargs) req = httpclient.HTTPRequest(url, connect_timeout=240.0, request_timeout=240.0, **kwargs)
client = httpclient.AsyncHTTPClient() client = httpclient.AsyncHTTPClient()
@ -197,19 +195,16 @@ class BaseWebsocketHandler(WebSocketHandler):
# target server port # target server port
target_port = None # type: int target_port = None # type: int
def __init__(self, *args, **kwargs): def __init__(self, *args: Any, **kwargs: Any) -> None:
# type: (*Any, **Any) -> None
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
# define client for target websocket server # define client for target websocket server
self.client = None # type: Any self.client = None # type: Any
def get(self, *args, **kwargs): def get(self, *args: Any, **kwargs: Any) -> Optional[Callable[..., Any]]:
# type: (*Any, **Any) -> Optional[Callable[..., Any]]
# use get method from WebsocketHandler # use get method from WebsocketHandler
return super().get(*args, **kwargs) return super().get(*args, **kwargs)
def open(self): def open(self) -> None:
# type: () -> None
# setup connection with target websocket server # setup connection with target websocket server
websocket_url = "ws://{host}:{port}{uri}".format( websocket_url = "ws://{host}:{port}{uri}".format(
host=self.target_host, host=self.target_host,
@ -221,13 +216,11 @@ class BaseWebsocketHandler(WebSocketHandler):
websocket_connect(request, callback=self.open_callback, websocket_connect(request, callback=self.open_callback,
on_message_callback=self.on_client_message) on_message_callback=self.on_client_message)
def open_callback(self, future): def open_callback(self, future: Any) -> None:
# type: (Any) -> None
# callback on connect with target websocket server # callback on connect with target websocket server
self.client = future.result() self.client = future.result()
def on_client_message(self, message): def on_client_message(self, message: str) -> None:
# type: (str) -> None
if not message: if not message:
# if message empty -> target websocket server close connection # if message empty -> target websocket server close connection
return self.close() return self.close()
@ -235,20 +228,18 @@ class BaseWebsocketHandler(WebSocketHandler):
# send message to client if connection exists # send message to client if connection exists
self.write_message(message, False) self.write_message(message, False)
def on_message(self, message, binary=False): def on_message(self, message: str, binary: bool=False) -> Optional[Callable[..., Any]]:
# type: (str, bool) -> Optional[Callable[..., Any]]
if not self.client: if not self.client:
# close websocket proxy connection if no connection with target websocket server # close websocket proxy connection if no connection with target websocket server
return self.close() return self.close()
self.client.write_message(message, binary) self.client.write_message(message, binary)
return None return None
def check_origin(self, origin): def check_origin(self, origin: str) -> bool:
# type: (str) -> bool
return True return True
def _add_request_headers(self, exclude_lower_headers_list=None): def _add_request_headers(self,
# type: (Optional[List[str]]) -> httputil.HTTPHeaders exclude_lower_headers_list: Optional[List[str]]=None) -> httputil.HTTPHeaders:
exclude_lower_headers_list = exclude_lower_headers_list or [] exclude_lower_headers_list = exclude_lower_headers_list or []
headers = httputil.HTTPHeaders() headers = httputil.HTTPHeaders()
for header, v in self.request.headers.get_all(): for header, v in self.request.headers.get_all():
@ -259,38 +250,30 @@ class BaseWebsocketHandler(WebSocketHandler):
class CombineHandler(BaseWebsocketHandler): class CombineHandler(BaseWebsocketHandler):
def get(self, *args, **kwargs): def get(self, *args: Any, **kwargs: Any) -> Optional[Callable[..., Any]]:
# type: (*Any, **Any) -> Optional[Callable[..., Any]]
if self.request.headers.get("Upgrade", "").lower() == 'websocket': if self.request.headers.get("Upgrade", "").lower() == 'websocket':
return super().get(*args, **kwargs) return super().get(*args, **kwargs)
return None return None
def head(self): def head(self) -> None:
# type: () -> None
pass pass
def post(self): def post(self) -> None:
# type: () -> None
pass pass
def put(self): def put(self) -> None:
# type: () -> None
pass pass
def patch(self): def patch(self) -> None:
# type: () -> None
pass pass
def options(self): def options(self) -> None:
# type: () -> None
pass pass
def delete(self): def delete(self) -> None:
# type: () -> None
pass pass
def handle_response(self, response): def handle_response(self, response: Any) -> None:
# type: (Any) -> None
if response.error and not isinstance(response.error, httpclient.HTTPError): if response.error and not isinstance(response.error, httpclient.HTTPError):
self.set_status(500) self.set_status(500)
self.write('Internal server error:\n' + str(response.error)) self.write('Internal server error:\n' + str(response.error))
@ -309,8 +292,7 @@ class CombineHandler(BaseWebsocketHandler):
self.finish() self.finish()
@web.asynchronous @web.asynchronous
def prepare(self): def prepare(self) -> None:
# type: () -> None
if 'X-REAL-IP' not in self.request.headers: if 'X-REAL-IP' not in self.request.headers:
self.request.headers['X-REAL-IP'] = self.request.remote_ip self.request.headers['X-REAL-IP'] = self.request.remote_ip
if self.request.headers.get("Upgrade", "").lower() == 'websocket': if self.request.headers.get("Upgrade", "").lower() == 'websocket':
@ -354,8 +336,7 @@ class TornadoHandler(CombineHandler):
class Application(web.Application): class Application(web.Application):
def __init__(self, enable_logging=False): def __init__(self, enable_logging: bool=False) -> None:
# type: (bool) -> None
handlers = [ handlers = [
(r"/json/events.*", TornadoHandler), (r"/json/events.*", TornadoHandler),
(r"/api/v1/events.*", TornadoHandler), (r"/api/v1/events.*", TornadoHandler),
@ -365,19 +346,16 @@ class Application(web.Application):
] ]
super().__init__(handlers, enable_logging=enable_logging) super().__init__(handlers, enable_logging=enable_logging)
def log_request(self, handler): def log_request(self, handler: BaseWebsocketHandler) -> None:
# type: (BaseWebsocketHandler) -> None
if self.settings['enable_logging']: if self.settings['enable_logging']:
super().log_request(handler) super().log_request(handler)
def on_shutdown(): def on_shutdown() -> None:
# type: () -> None
IOLoop.instance().stop() IOLoop.instance().stop()
def shutdown_handler(*args, **kwargs): def shutdown_handler(*args: Any, **kwargs: Any) -> None:
# type: (*Any, **Any) -> None
io_loop = IOLoop.instance() io_loop = IOLoop.instance()
if io_loop._callbacks: if io_loop._callbacks:
io_loop.call_later(1, shutdown_handler) io_loop.call_later(1, shutdown_handler)

View File

@ -138,8 +138,7 @@ enforce_fully_covered = sorted(target_fully_covered - not_yet_fully_covered)
FAILED_TEST_PATH = 'var/last_test_failure.json' FAILED_TEST_PATH = 'var/last_test_failure.json'
def get_failed_tests(): def get_failed_tests() -> List[str]:
# type: () -> List[str]
try: try:
with open(FAILED_TEST_PATH, 'r') as f: with open(FAILED_TEST_PATH, 'r') as f:
return ujson.load(f) return ujson.load(f)
@ -147,18 +146,15 @@ def get_failed_tests():
print("var/last_test_failure.json doesn't exist; running all tests.") print("var/last_test_failure.json doesn't exist; running all tests.")
return [] return []
def write_failed_tests(failed_tests): def write_failed_tests(failed_tests: List[str]) -> None:
# type: (List[str]) -> None
if failed_tests: if failed_tests:
with open(FAILED_TEST_PATH, 'w') as f: with open(FAILED_TEST_PATH, 'w') as f:
ujson.dump(failed_tests, f) ujson.dump(failed_tests, f)
def block_internet(): def block_internet() -> None:
# type: () -> None
# We are blocking internet currently by assuming mostly any test would use # We are blocking internet currently by assuming mostly any test would use
# httplib2 to access internet. # httplib2 to access internet.
def internet_guard(*args, **kwargs): def internet_guard(*args: Any, **kwargs: Any) -> None:
# type: (*Any, **Any) -> None
raise Exception("Outgoing network requests are not allowed in the Zulip tests. " raise Exception("Outgoing network requests are not allowed in the Zulip tests. "
"More details and advice are available here:" "More details and advice are available here:"
"https://zulip.readthedocs.io/en/latest/testing/testing.html#internet-access-inside-test-suites") "https://zulip.readthedocs.io/en/latest/testing/testing.html#internet-access-inside-test-suites")
@ -275,8 +271,7 @@ if __name__ == "__main__":
for suite in args: for suite in args:
args[args.index(suite)] = suite.rstrip('/').replace("/", ".") args[args.index(suite)] = suite.rstrip('/').replace("/", ".")
def rewrite_arguments(search_key): def rewrite_arguments(search_key: str) -> None:
# type: (str) -> None
for root, dirs, files_names in os.walk(zerver_test_dir, topdown=False): for root, dirs, files_names in os.walk(zerver_test_dir, topdown=False):
for file_name in files_names: for file_name in files_names:
# Check for files starting with alphanumeric characters and ending with '.py' # Check for files starting with alphanumeric characters and ending with '.py'

View File

@ -110,8 +110,7 @@ except OSError:
print('Bad command: %s' % (command,)) print('Bad command: %s' % (command,))
raise raise
def check_line_coverage(line_coverage, line_mapping, log=True): def check_line_coverage(line_coverage: Dict[Any, Any], line_mapping: Dict[Any, Any], log: bool=True) -> bool:
# type: (Dict[Any, Any], Dict[Any, Any], bool) -> bool
missing_lines = [] missing_lines = []
for line in line_coverage: for line in line_coverage:
if line_coverage[line] == 0: if line_coverage[line] == 0:

View File

@ -35,10 +35,8 @@ successful_worker_launches = [
'launching queue worker thread user_activity' 'launching queue worker thread user_activity'
] ]
def check_worker_launch(logfile): def check_worker_launch(logfile: IO[str]) -> Text:
# type: (IO[str]) -> Text def check(content: str) -> bool:
def check(content):
# type: (str) -> bool
flag = True flag = True
for entry in successful_worker_launches: for entry in successful_worker_launches:
flag = flag and entry in content flag = flag and entry in content

View File

@ -16,8 +16,7 @@ from typing import IO
TOOLS_DIR = os.path.dirname(os.path.abspath(__file__)) TOOLS_DIR = os.path.dirname(os.path.abspath(__file__))
def start_server(logfile): def start_server(logfile: IO[str]) -> bool:
# type: (IO[str]) -> bool
failure = True failure = True
key = "Quit the server with CTRL-C." key = "Quit the server with CTRL-C."
for i in range(200): for i in range(200):
@ -31,8 +30,7 @@ def start_server(logfile):
return failure return failure
def test_nagios(nagios_logfile): def test_nagios(nagios_logfile: IO[str]) -> bool:
# type: (IO[str]) -> bool
ZULIP_DIR = os.path.join(TOOLS_DIR, '..') ZULIP_DIR = os.path.join(TOOLS_DIR, '..')
API_DIR = os.path.join(ZULIP_DIR, 'api') API_DIR = os.path.join(ZULIP_DIR, 'api')
os.chdir(API_DIR) os.chdir(API_DIR)
@ -57,8 +55,7 @@ def test_nagios(nagios_logfile):
return False return False
def close_and_get_content(file_handle): def close_and_get_content(file_handle: IO[str]) -> str:
# type: (IO[str]) -> str
file_handle.seek(0) file_handle.seek(0)
content = file_handle.read() content = file_handle.read()
file_handle.close() file_handle.close()

View File

@ -17,8 +17,7 @@ if __name__ == '__main__':
default=False, help='compute test coverage') default=False, help='compute test coverage')
args = parser.parse_args() args = parser.parse_args()
def dir_join(dir1, dir2): def dir_join(dir1: str, dir2: str) -> str:
# type: (str, str) -> str
return os.path.abspath(os.path.join(dir1, dir2)) return os.path.abspath(os.path.join(dir1, dir2))
tools_dir = os.path.dirname(os.path.abspath(__file__)) tools_dir = os.path.dirname(os.path.abspath(__file__))

View File

@ -18,15 +18,13 @@ os.chdir(settings.DEPLOY_ROOT)
STATIC_PATH = 'static/' STATIC_PATH = 'static/'
def run(): def run() -> None:
# type: () -> None
"""Builds for production, writing the output to disk""" """Builds for production, writing the output to disk"""
subprocess.check_call(['node', 'node_modules/.bin/webpack'] + subprocess.check_call(['node', 'node_modules/.bin/webpack'] +
['--config', 'tools/webpack.config.ts', '-p'] + ['--config', 'tools/webpack.config.ts', '-p'] +
['--env', 'production']) ['--env', 'production'])
def run_watch(host, port, minify): def run_watch(host: str, port: str, minify: bool) -> None:
# type: (str, str, bool) -> None
"""watches and rebuilds on changes, serving files from memory via webpack-dev-server""" """watches and rebuilds on changes, serving files from memory via webpack-dev-server"""
webpack_args = ['node', 'node_modules/.bin/webpack-dev-server'] webpack_args = ['node', 'node_modules/.bin/webpack-dev-server']
webpack_args += ['--config', 'tools/webpack.config.ts', '--watch-poll', '--port', port, "--host", host] webpack_args += ['--config', 'tools/webpack.config.ts', '--watch-poll', '--port', port, "--host", host]
@ -34,8 +32,7 @@ def run_watch(host, port, minify):
webpack_args.append('--optimize-minimize') webpack_args.append('--optimize-minimize')
subprocess.Popen(webpack_args) subprocess.Popen(webpack_args)
def run_test(): def run_test() -> None:
# type: () -> None
"""Generates a stub asset stat file for django so backend test can render a page""" """Generates a stub asset stat file for django so backend test can render a page"""
entries = {} entries = {}
with open('tools/webpack.assets.json') as json_data: with open('tools/webpack.assets.json') as json_data: