2020-09-05 04:02:13 +02:00
|
|
|
import secrets
|
2020-06-11 00:54:34 +02:00
|
|
|
import time
|
|
|
|
from typing import Dict, List, Tuple, Type
|
|
|
|
from unittest import mock
|
|
|
|
|
2020-04-02 20:40:10 +02:00
|
|
|
from zerver.lib.rate_limiter import (
|
|
|
|
RateLimitedObject,
|
|
|
|
RateLimitedUser,
|
|
|
|
RateLimiterBackend,
|
|
|
|
RedisRateLimiterBackend,
|
2020-03-19 16:10:31 +01:00
|
|
|
TornadoInMemoryRateLimiterBackend,
|
2020-06-11 00:54:34 +02:00
|
|
|
add_ratelimit_rule,
|
|
|
|
remove_ratelimit_rule,
|
2020-04-02 20:40:10 +02:00
|
|
|
)
|
|
|
|
from zerver.lib.test_classes import ZulipTestCase
|
|
|
|
|
2020-09-05 04:02:13 +02:00
|
|
|
RANDOM_KEY_PREFIX = secrets.token_hex(16)
|
2020-04-02 20:40:10 +02:00
|
|
|
|
|
|
|
class RateLimitedTestObject(RateLimitedObject):
|
|
|
|
def __init__(self, name: str, rules: List[Tuple[int, int]],
|
|
|
|
backend: Type[RateLimiterBackend]) -> None:
|
|
|
|
self.name = name
|
|
|
|
self._rules = rules
|
|
|
|
self._rules.sort(key=lambda x: x[0])
|
|
|
|
super().__init__(backend)
|
|
|
|
|
|
|
|
def key(self) -> str:
|
|
|
|
return RANDOM_KEY_PREFIX + self.name
|
|
|
|
|
|
|
|
def rules(self) -> List[Tuple[int, int]]:
|
|
|
|
return self._rules
|
|
|
|
|
|
|
|
class RateLimiterBackendBase(ZulipTestCase):
|
|
|
|
__unittest_skip__ = True
|
|
|
|
|
|
|
|
def setUp(self) -> None:
|
python: Convert assignment type annotations to Python 3.6 style.
This commit was split by tabbott; this piece covers the vast majority
of files in Zulip, but excludes scripts/, tools/, and puppet/ to help
ensure we at least show the right error messages for Xenial systems.
We can likely further refine the remaining pieces with some testing.
Generated by com2ann, with whitespace fixes and various manual fixes
for runtime issues:
- invoiced_through: Optional[LicenseLedger] = models.ForeignKey(
+ invoiced_through: Optional["LicenseLedger"] = models.ForeignKey(
-_apns_client: Optional[APNsClient] = None
+_apns_client: Optional["APNsClient"] = None
- notifications_stream: Optional[Stream] = models.ForeignKey('Stream', related_name='+', null=True, blank=True, on_delete=CASCADE)
- signup_notifications_stream: Optional[Stream] = models.ForeignKey('Stream', related_name='+', null=True, blank=True, on_delete=CASCADE)
+ notifications_stream: Optional["Stream"] = models.ForeignKey('Stream', related_name='+', null=True, blank=True, on_delete=CASCADE)
+ signup_notifications_stream: Optional["Stream"] = models.ForeignKey('Stream', related_name='+', null=True, blank=True, on_delete=CASCADE)
- author: Optional[UserProfile] = models.ForeignKey('UserProfile', blank=True, null=True, on_delete=CASCADE)
+ author: Optional["UserProfile"] = models.ForeignKey('UserProfile', blank=True, null=True, on_delete=CASCADE)
- bot_owner: Optional[UserProfile] = models.ForeignKey('self', null=True, on_delete=models.SET_NULL)
+ bot_owner: Optional["UserProfile"] = models.ForeignKey('self', null=True, on_delete=models.SET_NULL)
- default_sending_stream: Optional[Stream] = models.ForeignKey('zerver.Stream', null=True, related_name='+', on_delete=CASCADE)
- default_events_register_stream: Optional[Stream] = models.ForeignKey('zerver.Stream', null=True, related_name='+', on_delete=CASCADE)
+ default_sending_stream: Optional["Stream"] = models.ForeignKey('zerver.Stream', null=True, related_name='+', on_delete=CASCADE)
+ default_events_register_stream: Optional["Stream"] = models.ForeignKey('zerver.Stream', null=True, related_name='+', on_delete=CASCADE)
-descriptors_by_handler_id: Dict[int, ClientDescriptor] = {}
+descriptors_by_handler_id: Dict[int, "ClientDescriptor"] = {}
-worker_classes: Dict[str, Type[QueueProcessingWorker]] = {}
-queues: Dict[str, Dict[str, Type[QueueProcessingWorker]]] = {}
+worker_classes: Dict[str, Type["QueueProcessingWorker"]] = {}
+queues: Dict[str, Dict[str, Type["QueueProcessingWorker"]]] = {}
-AUTH_LDAP_REVERSE_EMAIL_SEARCH: Optional[LDAPSearch] = None
+AUTH_LDAP_REVERSE_EMAIL_SEARCH: Optional["LDAPSearch"] = None
Signed-off-by: Anders Kaseorg <anders@zulipchat.com>
2020-04-22 01:09:50 +02:00
|
|
|
self.requests_record: Dict[str, List[float]] = {}
|
2020-04-02 20:40:10 +02:00
|
|
|
|
|
|
|
def create_object(self, name: str, rules: List[Tuple[int, int]]) -> RateLimitedTestObject:
|
|
|
|
obj = RateLimitedTestObject(name, rules, self.backend)
|
|
|
|
obj.clear_history()
|
|
|
|
|
|
|
|
return obj
|
|
|
|
|
|
|
|
def make_request(self, obj: RateLimitedTestObject, expect_ratelimited: bool=False,
|
|
|
|
verify_api_calls_left: bool=True) -> None:
|
|
|
|
key = obj.key()
|
|
|
|
if key not in self.requests_record:
|
|
|
|
self.requests_record[key] = []
|
|
|
|
|
|
|
|
ratelimited, secs_to_freedom = obj.rate_limit()
|
|
|
|
if not ratelimited:
|
|
|
|
self.requests_record[key].append(time.time())
|
|
|
|
|
|
|
|
self.assertEqual(ratelimited, expect_ratelimited)
|
|
|
|
|
|
|
|
if verify_api_calls_left:
|
|
|
|
self.verify_api_calls_left(obj)
|
|
|
|
|
|
|
|
def verify_api_calls_left(self, obj: RateLimitedTestObject) -> None:
|
|
|
|
now = time.time()
|
|
|
|
with mock.patch('time.time', return_value=now):
|
|
|
|
calls_remaining, time_till_reset = obj.api_calls_left()
|
|
|
|
|
|
|
|
expected_calls_remaining, expected_time_till_reset = self.expected_api_calls_left(obj, now)
|
|
|
|
self.assertEqual(expected_calls_remaining, calls_remaining)
|
|
|
|
self.assertEqual(expected_time_till_reset, time_till_reset)
|
|
|
|
|
|
|
|
def expected_api_calls_left(self, obj: RateLimitedTestObject, now: float) -> Tuple[int, float]:
|
2020-04-02 22:23:20 +02:00
|
|
|
longest_rule = obj.get_rules()[-1]
|
2020-04-02 20:40:10 +02:00
|
|
|
max_window, max_calls = longest_rule
|
|
|
|
history = self.requests_record.get(obj.key())
|
|
|
|
if history is None:
|
|
|
|
return max_calls, 0
|
|
|
|
history.sort()
|
|
|
|
|
|
|
|
return self.api_calls_left_from_history(history, max_window, max_calls, now)
|
|
|
|
|
|
|
|
def api_calls_left_from_history(self, history: List[float], max_window: int,
|
|
|
|
max_calls: int, now: float) -> Tuple[int, float]:
|
|
|
|
"""
|
|
|
|
This depends on the algorithm used in the backend, and should be defined by the test class.
|
|
|
|
"""
|
2020-04-02 23:00:56 +02:00
|
|
|
raise NotImplementedError()
|
2020-04-02 20:40:10 +02:00
|
|
|
|
|
|
|
def test_hit_ratelimits(self) -> None:
|
python: Use trailing commas consistently.
Automatically generated by the following script, based on the output
of lint with flake8-comma:
import re
import sys
last_filename = None
last_row = None
lines = []
for msg in sys.stdin:
m = re.match(
r"\x1b\[35mflake8 \|\x1b\[0m \x1b\[1;31m(.+):(\d+):(\d+): (\w+)", msg
)
if m:
filename, row_str, col_str, err = m.groups()
row, col = int(row_str), int(col_str)
if filename == last_filename:
assert last_row != row
else:
if last_filename is not None:
with open(last_filename, "w") as f:
f.writelines(lines)
with open(filename) as f:
lines = f.readlines()
last_filename = filename
last_row = row
line = lines[row - 1]
if err in ["C812", "C815"]:
lines[row - 1] = line[: col - 1] + "," + line[col - 1 :]
elif err in ["C819"]:
assert line[col - 2] == ","
lines[row - 1] = line[: col - 2] + line[col - 1 :].lstrip(" ")
if last_filename is not None:
with open(last_filename, "w") as f:
f.writelines(lines)
Signed-off-by: Anders Kaseorg <anders@zulipchat.com>
2020-04-10 05:23:40 +02:00
|
|
|
obj = self.create_object('test', [(2, 3)])
|
2020-04-02 20:40:10 +02:00
|
|
|
|
|
|
|
start_time = time.time()
|
|
|
|
for i in range(3):
|
|
|
|
with mock.patch('time.time', return_value=(start_time + i * 0.1)):
|
|
|
|
self.make_request(obj, expect_ratelimited=False)
|
|
|
|
|
|
|
|
with mock.patch('time.time', return_value=(start_time + 0.4)):
|
|
|
|
self.make_request(obj, expect_ratelimited=True)
|
|
|
|
|
|
|
|
with mock.patch('time.time', return_value=(start_time + 2.01)):
|
|
|
|
self.make_request(obj, expect_ratelimited=False)
|
|
|
|
|
|
|
|
def test_clear_history(self) -> None:
|
python: Use trailing commas consistently.
Automatically generated by the following script, based on the output
of lint with flake8-comma:
import re
import sys
last_filename = None
last_row = None
lines = []
for msg in sys.stdin:
m = re.match(
r"\x1b\[35mflake8 \|\x1b\[0m \x1b\[1;31m(.+):(\d+):(\d+): (\w+)", msg
)
if m:
filename, row_str, col_str, err = m.groups()
row, col = int(row_str), int(col_str)
if filename == last_filename:
assert last_row != row
else:
if last_filename is not None:
with open(last_filename, "w") as f:
f.writelines(lines)
with open(filename) as f:
lines = f.readlines()
last_filename = filename
last_row = row
line = lines[row - 1]
if err in ["C812", "C815"]:
lines[row - 1] = line[: col - 1] + "," + line[col - 1 :]
elif err in ["C819"]:
assert line[col - 2] == ","
lines[row - 1] = line[: col - 2] + line[col - 1 :].lstrip(" ")
if last_filename is not None:
with open(last_filename, "w") as f:
f.writelines(lines)
Signed-off-by: Anders Kaseorg <anders@zulipchat.com>
2020-04-10 05:23:40 +02:00
|
|
|
obj = self.create_object('test', [(2, 3)])
|
2020-04-02 20:40:10 +02:00
|
|
|
start_time = time.time()
|
|
|
|
for i in range(3):
|
|
|
|
with mock.patch('time.time', return_value=(start_time + i * 0.1)):
|
|
|
|
self.make_request(obj, expect_ratelimited=False)
|
|
|
|
with mock.patch('time.time', return_value=(start_time + 0.4)):
|
|
|
|
self.make_request(obj, expect_ratelimited=True)
|
|
|
|
|
|
|
|
obj.clear_history()
|
|
|
|
self.requests_record[obj.key()] = []
|
|
|
|
for i in range(3):
|
|
|
|
with mock.patch('time.time', return_value=(start_time + i * 0.1)):
|
|
|
|
self.make_request(obj, expect_ratelimited=False)
|
|
|
|
|
|
|
|
def test_block_unblock_access(self) -> None:
|
python: Use trailing commas consistently.
Automatically generated by the following script, based on the output
of lint with flake8-comma:
import re
import sys
last_filename = None
last_row = None
lines = []
for msg in sys.stdin:
m = re.match(
r"\x1b\[35mflake8 \|\x1b\[0m \x1b\[1;31m(.+):(\d+):(\d+): (\w+)", msg
)
if m:
filename, row_str, col_str, err = m.groups()
row, col = int(row_str), int(col_str)
if filename == last_filename:
assert last_row != row
else:
if last_filename is not None:
with open(last_filename, "w") as f:
f.writelines(lines)
with open(filename) as f:
lines = f.readlines()
last_filename = filename
last_row = row
line = lines[row - 1]
if err in ["C812", "C815"]:
lines[row - 1] = line[: col - 1] + "," + line[col - 1 :]
elif err in ["C819"]:
assert line[col - 2] == ","
lines[row - 1] = line[: col - 2] + line[col - 1 :].lstrip(" ")
if last_filename is not None:
with open(last_filename, "w") as f:
f.writelines(lines)
Signed-off-by: Anders Kaseorg <anders@zulipchat.com>
2020-04-10 05:23:40 +02:00
|
|
|
obj = self.create_object('test', [(2, 5)])
|
2020-04-02 20:40:10 +02:00
|
|
|
start_time = time.time()
|
|
|
|
|
|
|
|
obj.block_access(1)
|
|
|
|
with mock.patch('time.time', return_value=(start_time)):
|
|
|
|
self.make_request(obj, expect_ratelimited=True, verify_api_calls_left=False)
|
|
|
|
|
|
|
|
obj.unblock_access()
|
|
|
|
with mock.patch('time.time', return_value=(start_time)):
|
|
|
|
self.make_request(obj, expect_ratelimited=False, verify_api_calls_left=False)
|
|
|
|
|
|
|
|
def test_api_calls_left(self) -> None:
|
|
|
|
obj = self.create_object('test', [(2, 5), (3, 6)])
|
|
|
|
start_time = time.time()
|
|
|
|
|
|
|
|
# Check the edge case when no requests have been made yet.
|
|
|
|
with mock.patch('time.time', return_value=(start_time)):
|
|
|
|
self.verify_api_calls_left(obj)
|
|
|
|
|
|
|
|
with mock.patch('time.time', return_value=(start_time)):
|
|
|
|
self.make_request(obj)
|
|
|
|
|
|
|
|
# Check the correct default values again, after the reset has happened on the first rule,
|
|
|
|
# but not the other.
|
|
|
|
with mock.patch('time.time', return_value=(start_time + 2.1)):
|
|
|
|
self.make_request(obj)
|
|
|
|
|
|
|
|
class RedisRateLimiterBackendTest(RateLimiterBackendBase):
|
|
|
|
__unittest_skip__ = False
|
|
|
|
backend = RedisRateLimiterBackend
|
|
|
|
|
|
|
|
def api_calls_left_from_history(self, history: List[float], max_window: int,
|
|
|
|
max_calls: int, now: float) -> Tuple[int, float]:
|
|
|
|
latest_timestamp = history[-1]
|
|
|
|
relevant_requests = [t for t in history if (t >= now - max_window)]
|
|
|
|
relevant_requests_amount = len(relevant_requests)
|
|
|
|
|
|
|
|
return max_calls - relevant_requests_amount, latest_timestamp + max_window - now
|
|
|
|
|
|
|
|
def test_block_access(self) -> None:
|
|
|
|
"""
|
|
|
|
This test cannot verify that the user will get unblocked
|
|
|
|
after the correct amount of time, because that event happens
|
2020-10-23 02:43:28 +02:00
|
|
|
inside Redis, so we're not able to mock the timer. Making the test
|
2020-04-02 20:40:10 +02:00
|
|
|
sleep for 1s is also too costly to be worth it.
|
|
|
|
"""
|
python: Use trailing commas consistently.
Automatically generated by the following script, based on the output
of lint with flake8-comma:
import re
import sys
last_filename = None
last_row = None
lines = []
for msg in sys.stdin:
m = re.match(
r"\x1b\[35mflake8 \|\x1b\[0m \x1b\[1;31m(.+):(\d+):(\d+): (\w+)", msg
)
if m:
filename, row_str, col_str, err = m.groups()
row, col = int(row_str), int(col_str)
if filename == last_filename:
assert last_row != row
else:
if last_filename is not None:
with open(last_filename, "w") as f:
f.writelines(lines)
with open(filename) as f:
lines = f.readlines()
last_filename = filename
last_row = row
line = lines[row - 1]
if err in ["C812", "C815"]:
lines[row - 1] = line[: col - 1] + "," + line[col - 1 :]
elif err in ["C819"]:
assert line[col - 2] == ","
lines[row - 1] = line[: col - 2] + line[col - 1 :].lstrip(" ")
if last_filename is not None:
with open(last_filename, "w") as f:
f.writelines(lines)
Signed-off-by: Anders Kaseorg <anders@zulipchat.com>
2020-04-10 05:23:40 +02:00
|
|
|
obj = self.create_object('test', [(2, 5)])
|
2020-04-02 20:40:10 +02:00
|
|
|
|
|
|
|
obj.block_access(1)
|
|
|
|
self.make_request(obj, expect_ratelimited=True, verify_api_calls_left=False)
|
|
|
|
|
2020-03-19 16:10:31 +01:00
|
|
|
class TornadoInMemoryRateLimiterBackendTest(RateLimiterBackendBase):
|
|
|
|
__unittest_skip__ = False
|
|
|
|
backend = TornadoInMemoryRateLimiterBackend
|
|
|
|
|
|
|
|
def api_calls_left_from_history(self, history: List[float], max_window: int,
|
|
|
|
max_calls: int, now: float) -> Tuple[int, float]:
|
|
|
|
reset_time = 0.0
|
|
|
|
for timestamp in history:
|
|
|
|
reset_time = max(reset_time, timestamp) + (max_window / max_calls)
|
|
|
|
|
|
|
|
calls_left = (now + max_window - reset_time) * max_calls // max_window
|
|
|
|
calls_left = int(calls_left)
|
|
|
|
|
|
|
|
return calls_left, reset_time - now
|
|
|
|
|
|
|
|
def test_used_in_tornado(self) -> None:
|
|
|
|
user_profile = self.example_user("hamlet")
|
|
|
|
with self.settings(RUNNING_INSIDE_TORNADO=True):
|
2020-05-25 21:50:07 +02:00
|
|
|
obj = RateLimitedUser(user_profile, domain='api_by_user')
|
2020-03-19 16:10:31 +01:00
|
|
|
self.assertEqual(obj.backend, TornadoInMemoryRateLimiterBackend)
|
|
|
|
|
2020-05-25 21:50:07 +02:00
|
|
|
with self.settings(RUNNING_INSIDE_TORNADO=True):
|
|
|
|
obj = RateLimitedUser(user_profile, domain='some_domain')
|
|
|
|
self.assertEqual(obj.backend, RedisRateLimiterBackend)
|
|
|
|
|
2020-03-19 16:10:31 +01:00
|
|
|
def test_block_access(self) -> None:
|
python: Use trailing commas consistently.
Automatically generated by the following script, based on the output
of lint with flake8-comma:
import re
import sys
last_filename = None
last_row = None
lines = []
for msg in sys.stdin:
m = re.match(
r"\x1b\[35mflake8 \|\x1b\[0m \x1b\[1;31m(.+):(\d+):(\d+): (\w+)", msg
)
if m:
filename, row_str, col_str, err = m.groups()
row, col = int(row_str), int(col_str)
if filename == last_filename:
assert last_row != row
else:
if last_filename is not None:
with open(last_filename, "w") as f:
f.writelines(lines)
with open(filename) as f:
lines = f.readlines()
last_filename = filename
last_row = row
line = lines[row - 1]
if err in ["C812", "C815"]:
lines[row - 1] = line[: col - 1] + "," + line[col - 1 :]
elif err in ["C819"]:
assert line[col - 2] == ","
lines[row - 1] = line[: col - 2] + line[col - 1 :].lstrip(" ")
if last_filename is not None:
with open(last_filename, "w") as f:
f.writelines(lines)
Signed-off-by: Anders Kaseorg <anders@zulipchat.com>
2020-04-10 05:23:40 +02:00
|
|
|
obj = self.create_object('test', [(2, 5)])
|
2020-03-19 16:10:31 +01:00
|
|
|
start_time = time.time()
|
|
|
|
|
|
|
|
obj.block_access(1)
|
|
|
|
with mock.patch('time.time', return_value=(start_time)):
|
|
|
|
self.make_request(obj, expect_ratelimited=True, verify_api_calls_left=False)
|
|
|
|
|
|
|
|
with mock.patch('time.time', return_value=(start_time + 1.01)):
|
|
|
|
self.make_request(obj, expect_ratelimited=False, verify_api_calls_left=False)
|
|
|
|
|
2020-04-02 22:23:20 +02:00
|
|
|
class RateLimitedObjectsTest(ZulipTestCase):
|
2020-04-02 20:40:10 +02:00
|
|
|
def test_user_rate_limits(self) -> None:
|
|
|
|
user_profile = self.example_user("hamlet")
|
|
|
|
user_profile.rate_limits = "1:3,2:4"
|
|
|
|
obj = RateLimitedUser(user_profile)
|
|
|
|
|
2020-04-02 22:23:20 +02:00
|
|
|
self.assertEqual(obj.get_rules(), [(1, 3), (2, 4)])
|
2020-04-02 20:40:10 +02:00
|
|
|
|
|
|
|
def test_add_remove_rule(self) -> None:
|
|
|
|
user_profile = self.example_user("hamlet")
|
|
|
|
add_ratelimit_rule(1, 2)
|
|
|
|
add_ratelimit_rule(4, 5, domain='some_new_domain')
|
|
|
|
add_ratelimit_rule(10, 100, domain='some_new_domain')
|
|
|
|
obj = RateLimitedUser(user_profile)
|
|
|
|
|
python: Use trailing commas consistently.
Automatically generated by the following script, based on the output
of lint with flake8-comma:
import re
import sys
last_filename = None
last_row = None
lines = []
for msg in sys.stdin:
m = re.match(
r"\x1b\[35mflake8 \|\x1b\[0m \x1b\[1;31m(.+):(\d+):(\d+): (\w+)", msg
)
if m:
filename, row_str, col_str, err = m.groups()
row, col = int(row_str), int(col_str)
if filename == last_filename:
assert last_row != row
else:
if last_filename is not None:
with open(last_filename, "w") as f:
f.writelines(lines)
with open(filename) as f:
lines = f.readlines()
last_filename = filename
last_row = row
line = lines[row - 1]
if err in ["C812", "C815"]:
lines[row - 1] = line[: col - 1] + "," + line[col - 1 :]
elif err in ["C819"]:
assert line[col - 2] == ","
lines[row - 1] = line[: col - 2] + line[col - 1 :].lstrip(" ")
if last_filename is not None:
with open(last_filename, "w") as f:
f.writelines(lines)
Signed-off-by: Anders Kaseorg <anders@zulipchat.com>
2020-04-10 05:23:40 +02:00
|
|
|
self.assertEqual(obj.get_rules(), [(1, 2)])
|
2020-04-02 20:40:10 +02:00
|
|
|
obj.domain = 'some_new_domain'
|
2020-04-02 22:23:20 +02:00
|
|
|
self.assertEqual(obj.get_rules(), [(4, 5), (10, 100)])
|
2020-04-02 20:40:10 +02:00
|
|
|
|
|
|
|
remove_ratelimit_rule(10, 100, domain='some_new_domain')
|
python: Use trailing commas consistently.
Automatically generated by the following script, based on the output
of lint with flake8-comma:
import re
import sys
last_filename = None
last_row = None
lines = []
for msg in sys.stdin:
m = re.match(
r"\x1b\[35mflake8 \|\x1b\[0m \x1b\[1;31m(.+):(\d+):(\d+): (\w+)", msg
)
if m:
filename, row_str, col_str, err = m.groups()
row, col = int(row_str), int(col_str)
if filename == last_filename:
assert last_row != row
else:
if last_filename is not None:
with open(last_filename, "w") as f:
f.writelines(lines)
with open(filename) as f:
lines = f.readlines()
last_filename = filename
last_row = row
line = lines[row - 1]
if err in ["C812", "C815"]:
lines[row - 1] = line[: col - 1] + "," + line[col - 1 :]
elif err in ["C819"]:
assert line[col - 2] == ","
lines[row - 1] = line[: col - 2] + line[col - 1 :].lstrip(" ")
if last_filename is not None:
with open(last_filename, "w") as f:
f.writelines(lines)
Signed-off-by: Anders Kaseorg <anders@zulipchat.com>
2020-04-10 05:23:40 +02:00
|
|
|
self.assertEqual(obj.get_rules(), [(4, 5)])
|
2020-04-02 22:23:20 +02:00
|
|
|
|
|
|
|
def test_empty_rules_edge_case(self) -> None:
|
|
|
|
obj = RateLimitedTestObject("test", rules=[], backend=RedisRateLimiterBackend)
|
python: Use trailing commas consistently.
Automatically generated by the following script, based on the output
of lint with flake8-comma:
import re
import sys
last_filename = None
last_row = None
lines = []
for msg in sys.stdin:
m = re.match(
r"\x1b\[35mflake8 \|\x1b\[0m \x1b\[1;31m(.+):(\d+):(\d+): (\w+)", msg
)
if m:
filename, row_str, col_str, err = m.groups()
row, col = int(row_str), int(col_str)
if filename == last_filename:
assert last_row != row
else:
if last_filename is not None:
with open(last_filename, "w") as f:
f.writelines(lines)
with open(filename) as f:
lines = f.readlines()
last_filename = filename
last_row = row
line = lines[row - 1]
if err in ["C812", "C815"]:
lines[row - 1] = line[: col - 1] + "," + line[col - 1 :]
elif err in ["C819"]:
assert line[col - 2] == ","
lines[row - 1] = line[: col - 2] + line[col - 1 :].lstrip(" ")
if last_filename is not None:
with open(last_filename, "w") as f:
f.writelines(lines)
Signed-off-by: Anders Kaseorg <anders@zulipchat.com>
2020-04-10 05:23:40 +02:00
|
|
|
self.assertEqual(obj.get_rules(), [(1, 9999)])
|