mirror of https://github.com/zulip/zulip.git
346 lines
14 KiB
Python
346 lines
14 KiB
Python
from typing import Any, Dict, List, Optional
|
|
from unittest.mock import Mock, patch
|
|
|
|
from django.conf import settings
|
|
|
|
from zerver.apps import flush_cache
|
|
from zerver.lib.cache import (
|
|
MEMCACHED_MAX_KEY_LENGTH,
|
|
InvalidCacheKeyException,
|
|
NotFoundInCache,
|
|
bulk_cached_fetch,
|
|
cache_delete,
|
|
cache_delete_many,
|
|
cache_get,
|
|
cache_get_many,
|
|
cache_set,
|
|
cache_set_many,
|
|
cache_with_key,
|
|
get_cache_with_key,
|
|
safe_cache_get_many,
|
|
safe_cache_set_many,
|
|
user_profile_by_id_cache_key,
|
|
validate_cache_key,
|
|
)
|
|
from zerver.lib.test_classes import ZulipTestCase
|
|
from zerver.lib.test_helpers import queries_captured
|
|
from zerver.models import UserProfile, get_realm, get_system_bot, get_user, get_user_profile_by_id
|
|
|
|
|
|
class AppsTest(ZulipTestCase):
|
|
def test_cache_gets_flushed(self) -> None:
|
|
with self.assertLogs(level="INFO") as m:
|
|
with patch("zerver.apps.cache.clear") as mock:
|
|
# The argument to flush_cache doesn't matter
|
|
flush_cache(Mock())
|
|
mock.assert_called_once()
|
|
self.assertEqual(m.output, ["INFO:root:Clearing memcached cache after migrations"])
|
|
self.assertEqual(len(m.output), 1)
|
|
|
|
|
|
class CacheKeyValidationTest(ZulipTestCase):
|
|
def test_validate_cache_key(self) -> None:
|
|
validate_cache_key("nice_Ascii:string!~")
|
|
with self.assertRaises(InvalidCacheKeyException):
|
|
validate_cache_key("utf8_character:ą")
|
|
with self.assertRaises(InvalidCacheKeyException):
|
|
validate_cache_key("new_line_character:\n")
|
|
with self.assertRaises(InvalidCacheKeyException):
|
|
validate_cache_key("control_character:\r")
|
|
with self.assertRaises(InvalidCacheKeyException):
|
|
validate_cache_key("whitespace_character: ")
|
|
with self.assertRaises(InvalidCacheKeyException):
|
|
validate_cache_key("too_long:" + "X" * MEMCACHED_MAX_KEY_LENGTH)
|
|
|
|
with self.assertRaises(InvalidCacheKeyException):
|
|
# validate_cache_key does validation on a key with the
|
|
# KEY_PREFIX appended to the start, so even though we're
|
|
# passing something "short enough" here, it becomes too
|
|
# long after appending KEY_PREFIX.
|
|
validate_cache_key("X" * (MEMCACHED_MAX_KEY_LENGTH - 2))
|
|
|
|
def test_cache_functions_raise_exception(self) -> None:
|
|
invalid_key = "invalid_character:\n"
|
|
good_key = "good_key"
|
|
with self.assertRaises(InvalidCacheKeyException):
|
|
cache_get(invalid_key)
|
|
with self.assertRaises(InvalidCacheKeyException):
|
|
cache_set(invalid_key, 0)
|
|
with self.assertRaises(InvalidCacheKeyException):
|
|
cache_delete(invalid_key)
|
|
|
|
with self.assertRaises(InvalidCacheKeyException):
|
|
cache_get_many([good_key, invalid_key])
|
|
with self.assertRaises(InvalidCacheKeyException):
|
|
cache_set_many({good_key: 0, invalid_key: 1})
|
|
with self.assertRaises(InvalidCacheKeyException):
|
|
cache_delete_many([good_key, invalid_key])
|
|
|
|
|
|
class CacheWithKeyDecoratorTest(ZulipTestCase):
|
|
def test_cache_with_key_invalid_character(self) -> None:
|
|
def invalid_characters_cache_key_function(user_id: int) -> str:
|
|
return f"CacheWithKeyDecoratorTest:invalid_character:ą:{user_id}"
|
|
|
|
@cache_with_key(invalid_characters_cache_key_function, timeout=1000)
|
|
def get_user_function_with_bad_cache_keys(user_id: int) -> UserProfile:
|
|
return UserProfile.objects.get(id=user_id)
|
|
|
|
hamlet = self.example_user("hamlet")
|
|
with patch("zerver.lib.cache.cache_set") as mock_set, self.assertLogs(level="WARNING") as m:
|
|
with queries_captured() as queries:
|
|
result = get_user_function_with_bad_cache_keys(hamlet.id)
|
|
|
|
self.assertEqual(len(m.output), 1)
|
|
self.assertEqual(result, hamlet)
|
|
self.assert_length(queries, 1)
|
|
mock_set.assert_not_called()
|
|
|
|
def test_cache_with_key_key_too_long(self) -> None:
|
|
def too_long_cache_key_function(user_id: int) -> str:
|
|
return "CacheWithKeyDecoratorTest:very_long_key:{}:{}".format("a" * 250, user_id)
|
|
|
|
@cache_with_key(too_long_cache_key_function, timeout=1000)
|
|
def get_user_function_with_bad_cache_keys(user_id: int) -> UserProfile:
|
|
return UserProfile.objects.get(id=user_id)
|
|
|
|
hamlet = self.example_user("hamlet")
|
|
|
|
with patch("zerver.lib.cache.cache_set") as mock_set, self.assertLogs(level="WARNING") as m:
|
|
with queries_captured() as queries:
|
|
result = get_user_function_with_bad_cache_keys(hamlet.id)
|
|
|
|
self.assertEqual(len(m.output), 1)
|
|
self.assertEqual(result, hamlet)
|
|
self.assert_length(queries, 1)
|
|
mock_set.assert_not_called()
|
|
|
|
def test_cache_with_key_good_key(self) -> None:
|
|
def good_cache_key_function(user_id: int) -> str:
|
|
return f"CacheWithKeyDecoratorTest:good_cache_key:{user_id}"
|
|
|
|
@cache_with_key(good_cache_key_function, timeout=1000)
|
|
def get_user_function_with_good_cache_keys(user_id: int) -> UserProfile:
|
|
return UserProfile.objects.get(id=user_id)
|
|
|
|
hamlet = self.example_user("hamlet")
|
|
|
|
with queries_captured() as queries:
|
|
result = get_user_function_with_good_cache_keys(hamlet.id)
|
|
|
|
self.assertEqual(result, hamlet)
|
|
self.assert_length(queries, 1)
|
|
|
|
# The previous function call should have cached the result correctly, so now
|
|
# no database queries should happen:
|
|
with queries_captured(keep_cache_warm=True) as queries_two:
|
|
result_two = get_user_function_with_good_cache_keys(hamlet.id)
|
|
|
|
self.assertEqual(result_two, hamlet)
|
|
self.assert_length(queries_two, 0)
|
|
|
|
def test_cache_with_key_none_values(self) -> None:
|
|
def cache_key_function(user_id: int) -> str:
|
|
return f"CacheWithKeyDecoratorTest:test_cache_with_key_none_values:{user_id}"
|
|
|
|
@cache_with_key(cache_key_function, timeout=1000)
|
|
def get_user_function_can_return_none(user_id: int) -> Optional[UserProfile]:
|
|
try:
|
|
return UserProfile.objects.get(id=user_id)
|
|
except UserProfile.DoesNotExist:
|
|
return None
|
|
|
|
last_user_id = UserProfile.objects.last().id
|
|
with queries_captured() as queries:
|
|
result = get_user_function_can_return_none(last_user_id + 1)
|
|
|
|
self.assertEqual(result, None)
|
|
self.assert_length(queries, 1)
|
|
|
|
with queries_captured(keep_cache_warm=True) as queries:
|
|
result_two = get_user_function_can_return_none(last_user_id + 1)
|
|
|
|
self.assertEqual(result_two, None)
|
|
self.assert_length(queries, 0)
|
|
|
|
|
|
class GetCacheWithKeyDecoratorTest(ZulipTestCase):
|
|
def test_get_cache_with_good_key(self) -> None:
|
|
# Test with a good cache key function, but a get_user function
|
|
# that always returns None just to make it convenient to tell
|
|
# whether the cache was used (whatever we put in the cache) or
|
|
# we got the result from calling the function (None)
|
|
|
|
def good_cache_key_function(user_id: int) -> str:
|
|
return f"CacheWithKeyDecoratorTest:good_cache_key:{user_id}"
|
|
|
|
@get_cache_with_key(good_cache_key_function)
|
|
def get_user_function_with_good_cache_keys(user_id: int) -> Any: # nocoverage
|
|
return
|
|
|
|
hamlet = self.example_user("hamlet")
|
|
with self.assertRaises(NotFoundInCache):
|
|
get_user_function_with_good_cache_keys(hamlet.id)
|
|
|
|
cache_set(good_cache_key_function(hamlet.id), hamlet)
|
|
result = get_user_function_with_good_cache_keys(hamlet.id)
|
|
self.assertEqual(result, hamlet)
|
|
|
|
def test_get_cache_with_bad_key(self) -> None:
|
|
def bad_cache_key_function(user_id: int) -> str:
|
|
return f"CacheWithKeyDecoratorTest:invalid_character:ą:{user_id}"
|
|
|
|
@get_cache_with_key(bad_cache_key_function)
|
|
def get_user_function_with_bad_cache_keys(user_id: int) -> Any: # nocoverage
|
|
return
|
|
|
|
hamlet = self.example_user("hamlet")
|
|
with self.assertLogs(level="WARNING") as m:
|
|
with self.assertRaises(NotFoundInCache):
|
|
get_user_function_with_bad_cache_keys(hamlet.id)
|
|
self.assertEqual(len(m.output), 1)
|
|
|
|
|
|
class SafeCacheFunctionsTest(ZulipTestCase):
|
|
def test_safe_cache_functions_with_all_good_keys(self) -> None:
|
|
items = {
|
|
"SafeFunctionsTest:key1": 1,
|
|
"SafeFunctionsTest:key2": 2,
|
|
"SafeFunctionsTest:key3": 3,
|
|
}
|
|
safe_cache_set_many(items)
|
|
|
|
result = safe_cache_get_many(list(items.keys()))
|
|
for key, value in result.items():
|
|
self.assertEqual(value, items[key])
|
|
|
|
def test_safe_cache_functions_with_all_bad_keys(self) -> None:
|
|
items = {"SafeFunctionsTest:\nbadkey1": 1, "SafeFunctionsTest:\nbadkey2": 2}
|
|
with self.assertLogs(level="WARNING") as m:
|
|
safe_cache_set_many(items)
|
|
self.assertIn(
|
|
"WARNING:root:Invalid cache key used: ['SafeFunctionsTest:\\nbadkey1', 'SafeFunctionsTest:\\nbadkey2']",
|
|
m.output[0],
|
|
)
|
|
self.assertEqual(len(m.output), 1)
|
|
|
|
with self.assertLogs(level="WARNING") as m:
|
|
result = safe_cache_get_many(list(items.keys()))
|
|
self.assertEqual(result, {})
|
|
self.assertIn(
|
|
"WARNING:root:Invalid cache key used: ['SafeFunctionsTest:\\nbadkey1', 'SafeFunctionsTest:\\nbadkey2']",
|
|
m.output[0],
|
|
)
|
|
self.assertEqual(len(m.output), 1)
|
|
|
|
def test_safe_cache_functions_with_good_and_bad_keys(self) -> None:
|
|
bad_items = {"SafeFunctionsTest:\nbadkey1": 1, "SafeFunctionsTest:\nbadkey2": 2}
|
|
good_items = {"SafeFunctionsTest:goodkey1": 3, "SafeFunctionsTest:goodkey2": 4}
|
|
items = {**good_items, **bad_items}
|
|
|
|
with self.assertLogs(level="WARNING") as m:
|
|
safe_cache_set_many(items)
|
|
self.assertIn(
|
|
"WARNING:root:Invalid cache key used: ['SafeFunctionsTest:\\nbadkey1', 'SafeFunctionsTest:\\nbadkey2']",
|
|
m.output[0],
|
|
)
|
|
self.assertEqual(len(m.output), 1)
|
|
|
|
with self.assertLogs(level="WARNING") as m:
|
|
result = safe_cache_get_many(list(items.keys()))
|
|
self.assertEqual(result, good_items)
|
|
self.assertIn(
|
|
"WARNING:root:Invalid cache key used: ['SafeFunctionsTest:\\nbadkey1', 'SafeFunctionsTest:\\nbadkey2']",
|
|
m.output[0],
|
|
)
|
|
self.assertEqual(len(m.output), 1)
|
|
|
|
|
|
class BotCacheKeyTest(ZulipTestCase):
|
|
def test_bot_profile_key_deleted_on_save(self) -> None:
|
|
realm = get_realm(settings.SYSTEM_BOT_REALM)
|
|
# Get the profile cached on both cache keys:
|
|
user_profile = get_user(settings.EMAIL_GATEWAY_BOT, realm)
|
|
bot_profile = get_system_bot(settings.EMAIL_GATEWAY_BOT)
|
|
self.assertEqual(user_profile, bot_profile)
|
|
|
|
# Flip the setting and save:
|
|
flipped_setting = not bot_profile.can_forge_sender
|
|
bot_profile.can_forge_sender = flipped_setting
|
|
bot_profile.save()
|
|
|
|
# The .save() should have deleted cache keys, so if we fetch again,
|
|
# the returned objects should have can_forge_sender set correctly.
|
|
bot_profile2 = get_system_bot(settings.EMAIL_GATEWAY_BOT)
|
|
self.assertEqual(bot_profile2.can_forge_sender, flipped_setting)
|
|
|
|
user_profile2 = get_user(settings.EMAIL_GATEWAY_BOT, realm)
|
|
self.assertEqual(user_profile2.can_forge_sender, flipped_setting)
|
|
|
|
|
|
def get_user_id(user: UserProfile) -> int:
|
|
return user.id # nocoverage
|
|
|
|
|
|
def get_user_email(user: UserProfile) -> str:
|
|
return user.email # nocoverage
|
|
|
|
|
|
class GenericBulkCachedFetchTest(ZulipTestCase):
|
|
def test_query_function_called_only_if_needed(self) -> None:
|
|
hamlet = self.example_user("hamlet")
|
|
# Get the user cached:
|
|
get_user_profile_by_id(hamlet.id)
|
|
|
|
class CustomException(Exception):
|
|
pass
|
|
|
|
def query_function(ids: List[int]) -> List[UserProfile]:
|
|
raise CustomException("The query function was called")
|
|
|
|
# query_function shouldn't be called, because the only requested object
|
|
# is already cached:
|
|
result: Dict[int, UserProfile] = bulk_cached_fetch(
|
|
cache_key_function=user_profile_by_id_cache_key,
|
|
query_function=query_function,
|
|
object_ids=[hamlet.id],
|
|
id_fetcher=get_user_id,
|
|
)
|
|
self.assertEqual(result, {hamlet.id: hamlet})
|
|
with self.assertLogs(level="INFO") as info_log:
|
|
flush_cache(Mock())
|
|
self.assertEqual(info_log.output, ["INFO:root:Clearing memcached cache after migrations"])
|
|
|
|
# With the cache flushed, the query_function should get called:
|
|
with self.assertRaises(CustomException):
|
|
result = bulk_cached_fetch(
|
|
cache_key_function=user_profile_by_id_cache_key,
|
|
query_function=query_function,
|
|
object_ids=[hamlet.id],
|
|
id_fetcher=get_user_id,
|
|
)
|
|
|
|
def test_empty_object_ids_list(self) -> None:
|
|
class CustomException(Exception):
|
|
pass
|
|
|
|
def cache_key_function(
|
|
email: str,
|
|
) -> str: # nocoverage -- this is just here to make sure it's not called
|
|
raise CustomException("The cache key function was called")
|
|
|
|
def query_function(
|
|
emails: List[str],
|
|
) -> List[UserProfile]: # nocoverage -- this is just here to make sure it's not called
|
|
raise CustomException("The query function was called")
|
|
|
|
# query_function and cache_key_function shouldn't be called, because
|
|
# objects_ids is empty, so there's nothing to do.
|
|
result: Dict[str, UserProfile] = bulk_cached_fetch(
|
|
cache_key_function=cache_key_function,
|
|
query_function=query_function,
|
|
object_ids=[],
|
|
id_fetcher=get_user_email,
|
|
)
|
|
self.assertEqual(result, {})
|