ruff: Fix SIM115 Use context handler for opening files.

Signed-off-by: Anders Kaseorg <anders@zulip.com>
This commit is contained in:
Anders Kaseorg 2023-01-17 22:06:51 -05:00 committed by Tim Abbott
parent c6b6004432
commit 6303ebfc2f
4 changed files with 71 additions and 69 deletions

View File

@ -2,7 +2,7 @@ import os
import subprocess
import sys
import time
from contextlib import contextmanager
from contextlib import ExitStack, contextmanager
from typing import Iterator, Optional
# Verify the Zulip venv is available.
@ -58,45 +58,46 @@ def test_server_running(
log_file: Optional[str] = None,
dots: bool = False,
) -> Iterator[None]:
log = sys.stdout
if log_file:
if os.path.exists(log_file) and os.path.getsize(log_file) < 100000:
log = open(log_file, "a")
log.write("\n\n")
else:
log = open(log_file, "w")
with ExitStack() as stack:
log = sys.stdout
if log_file:
if os.path.exists(log_file) and os.path.getsize(log_file) < 100000:
log = stack.enter_context(open(log_file, "a"))
log.write("\n\n")
else:
log = stack.enter_context(open(log_file, "w"))
set_up_django(external_host)
set_up_django(external_host)
update_test_databases_if_required(rebuild_test_database=True)
update_test_databases_if_required(rebuild_test_database=True)
# Run this not through the shell, so that we have the actual PID.
run_dev_server_command = ["tools/run-dev.py", "--test", "--streamlined"]
if skip_provision_check:
run_dev_server_command.append("--skip-provision-check")
server = subprocess.Popen(run_dev_server_command, stdout=log, stderr=log)
# Run this not through the shell, so that we have the actual PID.
run_dev_server_command = ["tools/run-dev.py", "--test", "--streamlined"]
if skip_provision_check:
run_dev_server_command.append("--skip-provision-check")
server = subprocess.Popen(run_dev_server_command, stdout=log, stderr=log)
try:
# Wait for the server to start up.
print(end="\nWaiting for test server (may take a while)")
if not dots:
print("\n", flush=True)
t = time.time()
while not server_is_up(server, log_file):
if dots:
print(end=".", flush=True)
time.sleep(0.4)
if time.time() - t > MAX_SERVER_WAIT:
raise Exception("Timeout waiting for server")
print("\n\n--- SERVER IS UP! ---\n", flush=True)
try:
# Wait for the server to start up.
print(end="\nWaiting for test server (may take a while)")
if not dots:
print("\n", flush=True)
t = time.time()
while not server_is_up(server, log_file):
if dots:
print(end=".", flush=True)
time.sleep(0.4)
if time.time() - t > MAX_SERVER_WAIT:
raise Exception("Timeout waiting for server")
print("\n\n--- SERVER IS UP! ---\n", flush=True)
# DO OUR ACTUAL TESTING HERE!!!
yield
# DO OUR ACTUAL TESTING HERE!!!
yield
finally:
assert_server_running(server, log_file)
server.terminate()
server.wait()
finally:
assert_server_running(server, log_file)
server.terminate()
server.wait()
if __name__ == "__main__":

View File

@ -314,12 +314,13 @@ def main() -> None:
if not file_name[0].isalnum() or not file_name.endswith(".py"):
continue
filepath = os.path.join(root, file_name)
for line in open(filepath):
if search_key not in line:
continue
new_suite = filepath.replace(".py", ".") + suite
args[i] = new_suite
return
with open(filepath) as f:
for line in f:
if search_key not in line:
continue
new_suite = filepath.replace(".py", ".") + suite
args[i] = new_suite
return
for suite in args:
if suite[0].isupper() and "test_" in suite:

View File

@ -12,6 +12,24 @@ def error(*args: Any) -> None:
raise Exception("We cannot enqueue because settings.USING_RABBITMQ is False.")
def enqueue_file(queue_name: str, f: IO[str]) -> None:
for line in f:
line = line.strip()
try:
payload = line.split("\t")[1]
except IndexError:
payload = line
print(f"Queueing to queue {queue_name}: {payload}")
# Verify that payload is valid json.
data = orjson.loads(payload)
# This is designed to use the `error` method rather than
# the call_consume_in_tests flow.
queue_json_publish(queue_name, data, error)
class Command(BaseCommand):
help = """Read JSON lines from a file and enqueue them to a worker queue.
@ -36,26 +54,7 @@ You can use "-" to represent stdin.
file_name = options["file_name"]
if file_name == "-":
f: IO[str] = sys.stdin
enqueue_file(queue_name, sys.stdin)
else:
f = open(file_name)
while True:
line = f.readline()
if not line:
break
line = line.strip()
try:
payload = line.split("\t")[1]
except IndexError:
payload = line
print(f"Queueing to queue {queue_name}: {payload}")
# Verify that payload is valid json.
data = orjson.loads(payload)
# This is designed to use the `error` method rather than
# the call_consume_in_tests flow.
queue_json_publish(queue_name, data, error)
with open(file_name) as f:
enqueue_file(queue_name, f)

View File

@ -41,7 +41,8 @@ def test_generated_curl_examples_for_success(client: Client) -> None:
rest_endpoints_path = os.path.join(
settings.DEPLOY_ROOT, "templates/zerver/api/include/rest-endpoints.md"
)
rest_endpoints_raw = open(rest_endpoints_path).read()
with open(rest_endpoints_path) as f:
rest_endpoints_raw = f.read()
ENDPOINT_REGEXP = re.compile(r"/api/\s*(.*?)\)")
endpoint_list = sorted(set(re.findall(ENDPOINT_REGEXP, rest_endpoints_raw)))
@ -51,12 +52,12 @@ def test_generated_curl_examples_for_success(client: Client) -> None:
curl_commands_to_test = []
if os.path.exists(file_name):
f = open(file_name)
for line in f:
# A typical example from the Markdown source looks like this:
# {generate_code_example(curl)|...|...}
if line.startswith("{generate_code_example(curl"):
curl_commands_to_test.append(line)
with open(file_name) as f:
for line in f:
# A typical example from the Markdown source looks like this:
# {generate_code_example(curl)|...|...}
if line.startswith("{generate_code_example(curl"):
curl_commands_to_test.append(line)
else:
# If the file doesn't exist, then it has been
# deleted and its page is generated by the