mirror of https://github.com/zulip/zulip.git
tools: Revert to Python 2 typing syntax for now.
This reverts commit 66261f1cc. See parent commit for reason; here,
provision worked but `tools/run-dev.py` would give errors.
We need to figure out a test that reproduces these issues, then make a
version of these changes that keeps that test working, before we
re-merge them.
parent 17a6632c43
commit 137c0e65bb
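For context, the conversion being reverted looks roughly like the sketch below. It is an illustration only, reusing the `validate` signature from the first hunk rather than quoting the diff verbatim: Python 3 function annotations are swapped back for PEP 484 type comments, which remain valid syntax under Python 2.

# Illustrative sketch only -- not a line-for-line excerpt from this commit.

# Python 3 annotation style (introduced by the reverted commit):
def validate(fn: str) -> None:
    ...

# Python 2-compatible type-comment style (restored by this commit):
def validate(fn):
    # type: (str) -> None
    ...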
@@ -9,7 +9,8 @@ import subprocess
 from lib import sanity_check
 sanity_check.check_venv(__file__)
 
-def validate(fn: str) -> None:
+def validate(fn):
+    # type: (str) -> None
     text = open(fn).read()
     section_list = parse(text)
     if text != section_list.text():
@@ -18,7 +19,8 @@ def validate(fn: str) -> None:
         subprocess.call(['diff', fn, '/var/tmp/pretty_css.txt'], stderr=subprocess.STDOUT)
         sys.exit(1)
 
-def check_our_files(filenames: Iterable[str]) -> None:
+def check_our_files(filenames):
+    # type: (Iterable[str]) -> None
     for filename in filenames:
         if 'pygments.css' in filename:
             # This just has really strange formatting that our

@@ -14,7 +14,8 @@ import subprocess
 sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
 from scripts.lib.zulip_tools import WARNING, FAIL, ENDC
 
-def find_handlebars(translatable_strings: List[Text]) -> List[Text]:
+def find_handlebars(translatable_strings):
+    # type: (List[Text]) -> List[Text]
     errored = []
     for string in translatable_strings:
         if '{{' in string:

@@ -19,7 +19,8 @@ EXCLUDED_FILES = [
     "docs/_templates",
 ]
 
-def check_our_files(modified_only: bool, all_dups: bool, targets: List[str]) -> None:
+def check_our_files(modified_only, all_dups, targets):
+    # type: (bool, bool, List[str]) -> None
     by_lang = cast(
         Dict[str, List[str]],
         lister.list_files(
@@ -31,7 +32,8 @@ def check_our_files(modified_only: bool, all_dups: bool, targets: List[str]) -> None:
     check_handlebar_templates(by_lang['handlebars'])
     check_html_templates(by_lang['html'], args.all_dups)
 
-def check_html_templates(templates: Iterable[str], all_dups: bool) -> None:
+def check_html_templates(templates, all_dups):
+    # type: (Iterable[str], bool) -> None
     # Our files with .html extensions are usually for Django, but we also
     # have a few static .html files.
     #
@@ -128,7 +130,8 @@ def check_html_templates(templates: Iterable[str], all_dups: bool) -> None:
         if not validate_indent_html(fn):
             sys.exit(1)
 
-def check_handlebar_templates(templates: Iterable[str]) -> None:
+def check_handlebar_templates(templates):
+    # type: (Iterable[str]) -> None
     # Check all our handlebars templates.
     templates = [fn for fn in templates if fn.endswith('.handlebars')]
 

@@ -19,30 +19,35 @@ from typing import Dict, List
 os.chdir(settings.DEPLOY_ROOT)
 STATIC_PATH = 'static/'
 
-def get_templates() -> List[str]:
+def get_templates():
+    # type: () -> List[str]
     return (glob.glob(os.path.join(STATIC_PATH, 'templates/*.handlebars')) +
             glob.glob(os.path.join(STATIC_PATH, 'templates/settings/*.handlebars')))
 
-def run() -> None:
+def run():
+    # type: () -> None
     subprocess.check_call(['node', 'node_modules/.bin/handlebars'] +
                           get_templates() +
                           ['--output', os.path.join(STATIC_PATH, 'templates/compiled.js'),
                            '--known', 'if,unless,each,with'])
 
 
-def add_error_stamp_file(file_path: str) -> None:
+def add_error_stamp_file(file_path):
+    # type: (str) -> None
     file_dir = os.path.dirname(file_path)
     if not os.path.exists(file_dir):
         os.makedirs(file_dir)
     open(file_path, 'a').close()
 
 
-def remove_error_stamp_file(file_path: str) -> None:
+def remove_error_stamp_file(file_path):
+    # type: (str) -> None
     if os.path.exists(file_path):
         os.remove(file_path)
 
 
-def run_forever() -> None:
+def run_forever():
+    # type: () -> None
     # Keep polling for file changes, similar to how Django does it in
     # django/utils/autoreload.py. If any of our templates change, rebuild
     # compiled.js

@@ -15,7 +15,8 @@ import ujson
 
 Call = Dict[str, Any]
 
-def clean_up_pattern(s: str) -> str:
+def clean_up_pattern(s):
+    # type: (str) -> str
     paren_level = 0
     in_braces = False
     result = ''
@@ -35,7 +36,8 @@ def clean_up_pattern(s: str) -> str:
         prior_char = c
     return result
 
-def encode_info(info: Any) -> str:
+def encode_info(info):
+    # type: (Any) -> str
     try:
         result = ''
         try:
@@ -53,10 +55,12 @@ def encode_info(info: Any) -> str:
         pass
     return 'NOT ENCODABLE'
 
-def fix_test_name(s: str) -> str:
+def fix_test_name(s):
+    # type: (str) -> str
     return s.replace('zerver.tests.', '')
 
-def create_single_page(pattern: str, out_dir: str, href: str, calls: List[Call]) -> None:
+def create_single_page(pattern, out_dir, href, calls):
+    # type: (str, str, str, List[Call]) -> None
     fn = out_dir + '/' + href
     with open(fn, 'w') as f:
         f.write('''
@@ -82,7 +86,8 @@ def create_single_page(pattern: str, out_dir: str, href: str, calls: List[Call]) -> None:
             f.write('<br>')
         f.write('</div>')
 
-def create_user_docs() -> None:
+def create_user_docs():
+    # type: () -> None
     fn = 'var/url_coverage.txt'  # TODO: make path more robust, maybe use json suffix
 
     out_dir = 'var/api_docs'

@@ -9,7 +9,8 @@ import sys
 from lib import sanity_check
 sanity_check.check_venv(__file__)
 
-def process_files() -> None:
+def process_files():
+    # type: () -> None
 
     description = '''
     Use this tool to find HTML classes that we use in our JS code.

@@ -9,17 +9,20 @@ sanity_check.check_venv(__file__)
 
 from typing import Any, Dict, List
 
-def debug(obj: Any) -> None:
+def debug(obj):
+    # type: (Any) -> None
     print(json.dumps(obj, indent=4))
 
-def parse_file(fn: str) -> Dict[str, Any]:
+def parse_file(fn):
+    # type: (str) -> Dict[str, Any]
     text = open(fn).read()
     tags = re.findall('{+\s*(.*?)\s*}+', text)
     root = {}  # type: Dict[str, Any]
     context = root
     stack = []  # type: List[Dict[str, Any]]
 
-    def set_var(var: str, val: Any) -> None:
+    def set_var(var, val):
+        # type: (str, Any) -> None
         num_levels_up = len(re.findall('\.\.', var))
         if num_levels_up:
             var = var.split('/')[-1]
@@ -91,7 +94,8 @@ def parse_file(fn: str) -> Dict[str, Any]:
 
         set_var(tag, '')
 
-def clean_this(obj: Any) -> Any:
+def clean_this(obj):
+    # type: (Any) -> Any
     if isinstance(obj, list):
         return [clean_this(item) for item in obj]
     if isinstance(obj, dict):

@@ -46,7 +46,8 @@ USAGE = '''
 
 TODO: allow specific files to be searched.'''
 
-def check_our_files() -> None:
+def check_our_files():
+    # type: () -> None
 
     parser = argparse.ArgumentParser(description=USAGE,
                                      formatter_class=argparse.RawTextHelpFormatter)

@@ -6,7 +6,8 @@ import time
 
 from contextlib import contextmanager
 
-from typing import Any, Iterator, Optional
+if False:
+    from typing import (Any, Iterator, Optional)
 
 # Verify the Zulip venv is available.
 from tools.lib import sanity_check
@@ -21,14 +22,16 @@ if TOOLS_DIR not in sys.path:
 
 from zerver.lib.test_fixtures import is_template_database_current
 
-def set_up_django(external_host: str) -> None:
+def set_up_django(external_host):
+    # type: (str) -> None
     os.environ['EXTERNAL_HOST'] = external_host
     os.environ["TORNADO_SERVER"] = "http://127.0.0.1:9983"
     os.environ['DJANGO_SETTINGS_MODULE'] = 'zproject.test_settings'
     django.setup()
     os.environ['PYTHONUNBUFFERED'] = 'y'
 
-def assert_server_running(server: subprocess.Popen, log_file: Optional[str]) -> None:
+def assert_server_running(server, log_file):
+    # type: (subprocess.Popen, Optional[str]) -> None
     """Get the exit code of the server, or None if it is still running."""
     if server.poll() is not None:
         message = 'Server died unexpectedly!'
@@ -36,7 +39,8 @@ def assert_server_running(server: subprocess.Popen, log_file: Optional[str]) -> None:
             message += '\nSee %s\n' % (log_file,)
         raise RuntimeError(message)
 
-def server_is_up(server: subprocess.Popen, log_file: Optional[str]) -> bool:
+def server_is_up(server, log_file):
+    # type: (subprocess.Popen, Optional[str]) -> bool
     assert_server_running(server, log_file)
     try:
         # We could get a 501 error if the reverse proxy is up but the Django app isn't.

tools/lint
@@ -17,7 +17,8 @@ import lister
 from typing import cast, Callable, Dict, Iterator, List
 
 
-def run_parallel(lint_functions: Dict[str, Callable[[], int]]) -> bool:
+def run_parallel(lint_functions):
+    # type: (Dict[str, Callable[[], int]]) -> bool
     pids = []
     for name, func in lint_functions.items():
         pid = os.fork()
@@ -37,7 +38,8 @@ def run_parallel(lint_functions: Dict[str, Callable[[], int]]) -> bool:
             failed = True
     return failed
 
-def run() -> None:
+def run():
+    # type: () -> None
     parser = argparse.ArgumentParser()
     parser.add_argument('--force', default=False,
                        action="store_true",
@@ -115,11 +117,13 @@ def run() -> None:
 
     lint_functions = {}  # type: Dict[str, Callable[[], int]]
 
-    def lint(func: Callable[[], int]) -> Callable[[], int]:
+    def lint(func):
+        # type: (Callable[[], int]) -> Callable[[], int]
         lint_functions[func.__name__] = func
         return func
 
-    def external_linter(name: str, command: List[str], target_langs: List[str]=[]) -> None:
+    def external_linter(name, command, target_langs=[]):
+        # type: (str, List[str], List[str]) -> None
         """Registers an external linter program to be run as part of the
         linter. This program will be passed the subset of files being
         linted that have extensions in target_langs. If there are no
@@ -129,7 +133,8 @@ def run() -> None:
         """
         color = next(colors)
 
-        def run_linter() -> int:
+        def run_linter():
+            # type: () -> int
             targets = []  # type: List[str]
             if len(target_langs) != 0:
                 targets = [target for lang in target_langs for target in by_lang[lang]]
@@ -167,22 +172,26 @@ def run() -> None:
     # external_linter('commit_messages', ['tools/commit-message-lint'])
 
     @lint
-    def custom_py() -> int:
+    def custom_py():
+        # type: () -> int
         failed = check_custom_checks_py()
         return 1 if failed else 0
 
     @lint
-    def custom_nonpy() -> int:
+    def custom_nonpy():
+        # type: () -> int
         failed = check_custom_checks_nonpy()
         return 1 if failed else 0
 
     @lint
-    def pyflakes() -> int:
+    def pyflakes():
+        # type: () -> int
         failed = check_pyflakes(args, by_lang)
         return 1 if failed else 0
 
     @lint
-    def pep8() -> int:
+    def pep8():
+        # type: () -> int
         failed = check_pep8(by_lang['py'])
         return 1 if failed else 0
 

@@ -171,7 +171,8 @@ for cmd in cmds:
     subprocess.Popen(cmd)
 
 
-def transform_url(protocol: str, path: str, query: str, target_port: int, target_host: str) -> str:
+def transform_url(protocol, path, query, target_port, target_host):
+    # type: (str, str, str, int, str) -> str
     # generate url with target host
     host = ":".join((target_host, str(target_port)))
     newpath = urlunparse((protocol, host, path, '', query, ''))
@@ -179,7 +180,8 @@ def transform_url(protocol: str, path: str, query: str, target_port: int, target_host: str) -> str:
 
 
 @gen.engine
-def fetch_request(url: str, callback: Any, **kwargs: Any) -> Generator[Callable[..., Any], Any, None]:
+def fetch_request(url, callback, **kwargs):
+    # type: (str, Any, **Any) -> Generator[Callable[..., Any], Any, None]
     # use large timeouts to handle polling requests
     req = httpclient.HTTPRequest(url, connect_timeout=240.0, request_timeout=240.0, **kwargs)
     client = httpclient.AsyncHTTPClient()
@@ -194,16 +196,19 @@ class BaseWebsocketHandler(WebSocketHandler):
     # target server port
     target_port = None  # type: int
 
-    def __init__(self, *args: Any, **kwargs: Any) -> None:
+    def __init__(self, *args, **kwargs):
+        # type: (*Any, **Any) -> None
         super().__init__(*args, **kwargs)
         # define client for target websocket server
         self.client = None  # type: Any
 
-    def get(self, *args: Any, **kwargs: Any) -> Optional[Callable[..., Any]]:
+    def get(self, *args, **kwargs):
+        # type: (*Any, **Any) -> Optional[Callable[..., Any]]
         # use get method from WebsocketHandler
         return super().get(*args, **kwargs)
 
-    def open(self) -> None:
+    def open(self):
+        # type: () -> None
         # setup connection with target websocket server
         websocket_url = "ws://{host}:{port}{uri}".format(
             host=self.target_host,
@@ -215,11 +220,13 @@ class BaseWebsocketHandler(WebSocketHandler):
         websocket_connect(request, callback=self.open_callback,
                           on_message_callback=self.on_client_message)
 
-    def open_callback(self, future: Any) -> None:
+    def open_callback(self, future):
+        # type: (Any) -> None
         # callback on connect with target websocket server
         self.client = future.result()
 
-    def on_client_message(self, message: str) -> None:
+    def on_client_message(self, message):
+        # type: (str) -> None
         if not message:
             # if message empty -> target websocket server close connection
             return self.close()
@@ -227,18 +234,20 @@ class BaseWebsocketHandler(WebSocketHandler):
             # send message to client if connection exists
             self.write_message(message, False)
 
-    def on_message(self, message: str, binary: bool=False) -> Optional[Callable[..., Any]]:
+    def on_message(self, message, binary=False):
+        # type: (str, bool) -> Optional[Callable[..., Any]]
         if not self.client:
             # close websocket proxy connection if no connection with target websocket server
             return self.close()
         self.client.write_message(message, binary)
         return None
 
-    def check_origin(self, origin: str) -> bool:
+    def check_origin(self, origin):
+        # type: (str) -> bool
         return True
 
-    def _add_request_headers(self,
-                             exclude_lower_headers_list: Optional[List[str]]=None) -> httputil.HTTPHeaders:
+    def _add_request_headers(self, exclude_lower_headers_list=None):
+        # type: (Optional[List[str]]) -> httputil.HTTPHeaders
         exclude_lower_headers_list = exclude_lower_headers_list or []
         headers = httputil.HTTPHeaders()
         for header, v in self.request.headers.get_all():
@@ -249,30 +258,38 @@ class BaseWebsocketHandler(WebSocketHandler):
 
 class CombineHandler(BaseWebsocketHandler):
 
-    def get(self, *args: Any, **kwargs: Any) -> Optional[Callable[..., Any]]:
+    def get(self, *args, **kwargs):
+        # type: (*Any, **Any) -> Optional[Callable[..., Any]]
         if self.request.headers.get("Upgrade", "").lower() == 'websocket':
             return super().get(*args, **kwargs)
         return None
 
-    def head(self) -> None:
+    def head(self):
+        # type: () -> None
         pass
 
-    def post(self) -> None:
+    def post(self):
+        # type: () -> None
         pass
 
-    def put(self) -> None:
+    def put(self):
+        # type: () -> None
         pass
 
-    def patch(self) -> None:
+    def patch(self):
+        # type: () -> None
         pass
 
-    def options(self) -> None:
+    def options(self):
+        # type: () -> None
         pass
 
-    def delete(self) -> None:
+    def delete(self):
+        # type: () -> None
        pass
 
-    def handle_response(self, response: Any) -> None:
+    def handle_response(self, response):
+        # type: (Any) -> None
         if response.error and not isinstance(response.error, httpclient.HTTPError):
             self.set_status(500)
             self.write('Internal server error:\n' + str(response.error))
@@ -291,7 +308,8 @@ class CombineHandler(BaseWebsocketHandler):
         self.finish()
 
     @web.asynchronous
-    def prepare(self) -> None:
+    def prepare(self):
+        # type: () -> None
         if 'X-REAL-IP' not in self.request.headers:
             self.request.headers['X-REAL-IP'] = self.request.remote_ip
         if self.request.headers.get("Upgrade", "").lower() == 'websocket':
@@ -335,7 +353,8 @@ class TornadoHandler(CombineHandler):
 
 
 class Application(web.Application):
-    def __init__(self, enable_logging: bool=False) -> None:
+    def __init__(self, enable_logging=False):
+        # type: (bool) -> None
         handlers = [
             (r"/json/events.*", TornadoHandler),
             (r"/api/v1/events.*", TornadoHandler),
@@ -345,16 +364,19 @@ class Application(web.Application):
         ]
         super().__init__(handlers, enable_logging=enable_logging)
 
-    def log_request(self, handler: BaseWebsocketHandler) -> None:
+    def log_request(self, handler):
+        # type: (BaseWebsocketHandler) -> None
         if self.settings['enable_logging']:
             super().log_request(handler)
 
 
-def on_shutdown() -> None:
+def on_shutdown():
+    # type: () -> None
     IOLoop.instance().stop()
 
 
-def shutdown_handler(*args: Any, **kwargs: Any) -> None:
+def shutdown_handler(*args, **kwargs):
+    # type: (*Any, **Any) -> None
     io_loop = IOLoop.instance()
     if io_loop._callbacks:
         io_loop.call_later(1, shutdown_handler)

@@ -138,7 +138,8 @@ enforce_fully_covered = sorted(target_fully_covered - not_yet_fully_covered)
 
 FAILED_TEST_PATH = 'var/last_test_failure.json'
 
-def get_failed_tests() -> List[str]:
+def get_failed_tests():
+    # type: () -> List[str]
     try:
         with open(FAILED_TEST_PATH, 'r') as f:
             return ujson.load(f)
@@ -146,15 +147,18 @@ def get_failed_tests() -> List[str]:
         print("var/last_test_failure.json doesn't exist; running all tests.")
         return []
 
-def write_failed_tests(failed_tests: List[str]) -> None:
+def write_failed_tests(failed_tests):
+    # type: (List[str]) -> None
     if failed_tests:
         with open(FAILED_TEST_PATH, 'w') as f:
             ujson.dump(failed_tests, f)
 
-def block_internet() -> None:
+def block_internet():
+    # type: () -> None
     # We are blocking internet currently by assuming mostly any test would use
     # httplib2 to access internet.
-    def internet_guard(*args: Any, **kwargs: Any) -> None:
+    def internet_guard(*args, **kwargs):
+        # type: (*Any, **Any) -> None
         raise Exception("Outgoing network requests are not allowed in the Zulip tests. "
                         "More details and advice are available here:"
                         "https://zulip.readthedocs.io/en/latest/testing/testing.html#internet-access-inside-test-suites")
@@ -271,7 +275,8 @@ if __name__ == "__main__":
         for suite in args:
             args[args.index(suite)] = suite.rstrip('/').replace("/", ".")
 
-        def rewrite_arguments(search_key: str) -> None:
+        def rewrite_arguments(search_key):
+            # type: (str) -> None
             for root, dirs, files_names in os.walk(zerver_test_dir, topdown=False):
                 for file_name in files_names:
                     # Check for files starting with alphanumeric characters and ending with '.py'

@@ -110,7 +110,8 @@ except OSError:
     print('Bad command: %s' % (command,))
     raise
 
-def check_line_coverage(line_coverage: Dict[Any, Any], line_mapping: Dict[Any, Any], log: bool=True) -> bool:
+def check_line_coverage(line_coverage, line_mapping, log=True):
+    # type: (Dict[Any, Any], Dict[Any, Any], bool) -> bool
     missing_lines = []
     for line in line_coverage:
         if line_coverage[line] == 0:

@@ -35,8 +35,10 @@ successful_worker_launches = [
     'launching queue worker thread user_activity'
 ]
 
-def check_worker_launch(logfile: IO[str]) -> Text:
-    def check(content: str) -> bool:
+def check_worker_launch(logfile):
+    # type: (IO[str]) -> Text
+    def check(content):
+        # type: (str) -> bool
         flag = True
         for entry in successful_worker_launches:
             flag = flag and entry in content

@@ -16,7 +16,8 @@ from typing import IO
 TOOLS_DIR = os.path.dirname(os.path.abspath(__file__))
 
 
-def start_server(logfile: IO[str]) -> bool:
+def start_server(logfile):
+    # type: (IO[str]) -> bool
     failure = True
     key = "Quit the server with CTRL-C."
     for i in range(200):
@@ -30,7 +31,8 @@ def start_server(logfile: IO[str]) -> bool:
     return failure
 
 
-def test_nagios(nagios_logfile: IO[str]) -> bool:
+def test_nagios(nagios_logfile):
+    # type: (IO[str]) -> bool
     ZULIP_DIR = os.path.join(TOOLS_DIR, '..')
     API_DIR = os.path.join(ZULIP_DIR, 'api')
     os.chdir(API_DIR)
@@ -55,7 +57,8 @@ def test_nagios(nagios_logfile: IO[str]) -> bool:
         return False
 
 
-def close_and_get_content(file_handle: IO[str]) -> str:
+def close_and_get_content(file_handle):
+    # type: (IO[str]) -> str
     file_handle.seek(0)
     content = file_handle.read()
     file_handle.close()

@@ -17,7 +17,8 @@ if __name__ == '__main__':
                         default=False, help='compute test coverage')
     args = parser.parse_args()
 
-    def dir_join(dir1: str, dir2: str) -> str:
+    def dir_join(dir1, dir2):
+        # type: (str, str) -> str
         return os.path.abspath(os.path.join(dir1, dir2))
 
     tools_dir = os.path.dirname(os.path.abspath(__file__))

@@ -18,13 +18,15 @@ os.chdir(settings.DEPLOY_ROOT)
 STATIC_PATH = 'static/'
 
 
-def run() -> None:
+def run():
+    # type: () -> None
     """Builds for production, writing the output to disk"""
     subprocess.check_call(['node', 'node_modules/.bin/webpack'] +
                           ['--config', 'tools/webpack.config.ts', '-p'] +
                           ['--env', 'production'])
 
-def run_watch(host: str, port: str, minify: bool) -> None:
+def run_watch(host, port, minify):
+    # type: (str, str, bool) -> None
     """watches and rebuilds on changes, serving files from memory via webpack-dev-server"""
     webpack_args = ['node', 'node_modules/.bin/webpack-dev-server']
     webpack_args += ['--config', 'tools/webpack.config.ts', '--watch-poll', '--port', port, "--host", host]
@@ -32,7 +34,8 @@ def run_watch(host: str, port: str, minify: bool) -> None:
         webpack_args.append('--optimize-minimize')
     subprocess.Popen(webpack_args)
 
-def run_test() -> None:
+def run_test():
+    # type: () -> None
     """Generates a stub asset stat file for django so backend test can render a page"""
     entries = {}
     with open('tools/webpack.assets.json') as json_data: