zulip/zerver/tests/test_messages.py

87 lines
3.2 KiB
Python
Raw Normal View History

import datetime
from typing import Dict, List
from django.utils.timezone import now as timezone_now
from zerver.lib.actions import get_active_presence_idle_user_ids, get_client
from zerver.lib.test_classes import ZulipTestCase
from zerver.models import (
Message,
UserPresence,
UserProfile,
bulk_get_huddle_user_ids,
get_huddle_user_ids,
)
class MissedMessageTest(ZulipTestCase):
def test_presence_idle_user_ids(self) -> None:
UserPresence.objects.all().delete()
sender = self.example_user('cordelia')
realm = sender.realm
hamlet = self.example_user('hamlet')
othello = self.example_user('othello')
recipient_ids = {hamlet.id, othello.id}
message_type = 'stream'
python: Convert assignment type annotations to Python 3.6 style. This commit was split by tabbott; this piece covers the vast majority of files in Zulip, but excludes scripts/, tools/, and puppet/ to help ensure we at least show the right error messages for Xenial systems. We can likely further refine the remaining pieces with some testing. Generated by com2ann, with whitespace fixes and various manual fixes for runtime issues: - invoiced_through: Optional[LicenseLedger] = models.ForeignKey( + invoiced_through: Optional["LicenseLedger"] = models.ForeignKey( -_apns_client: Optional[APNsClient] = None +_apns_client: Optional["APNsClient"] = None - notifications_stream: Optional[Stream] = models.ForeignKey('Stream', related_name='+', null=True, blank=True, on_delete=CASCADE) - signup_notifications_stream: Optional[Stream] = models.ForeignKey('Stream', related_name='+', null=True, blank=True, on_delete=CASCADE) + notifications_stream: Optional["Stream"] = models.ForeignKey('Stream', related_name='+', null=True, blank=True, on_delete=CASCADE) + signup_notifications_stream: Optional["Stream"] = models.ForeignKey('Stream', related_name='+', null=True, blank=True, on_delete=CASCADE) - author: Optional[UserProfile] = models.ForeignKey('UserProfile', blank=True, null=True, on_delete=CASCADE) + author: Optional["UserProfile"] = models.ForeignKey('UserProfile', blank=True, null=True, on_delete=CASCADE) - bot_owner: Optional[UserProfile] = models.ForeignKey('self', null=True, on_delete=models.SET_NULL) + bot_owner: Optional["UserProfile"] = models.ForeignKey('self', null=True, on_delete=models.SET_NULL) - default_sending_stream: Optional[Stream] = models.ForeignKey('zerver.Stream', null=True, related_name='+', on_delete=CASCADE) - default_events_register_stream: Optional[Stream] = models.ForeignKey('zerver.Stream', null=True, related_name='+', on_delete=CASCADE) + default_sending_stream: Optional["Stream"] = models.ForeignKey('zerver.Stream', null=True, related_name='+', on_delete=CASCADE) + default_events_register_stream: Optional["Stream"] = models.ForeignKey('zerver.Stream', null=True, related_name='+', on_delete=CASCADE) -descriptors_by_handler_id: Dict[int, ClientDescriptor] = {} +descriptors_by_handler_id: Dict[int, "ClientDescriptor"] = {} -worker_classes: Dict[str, Type[QueueProcessingWorker]] = {} -queues: Dict[str, Dict[str, Type[QueueProcessingWorker]]] = {} +worker_classes: Dict[str, Type["QueueProcessingWorker"]] = {} +queues: Dict[str, Dict[str, Type["QueueProcessingWorker"]]] = {} -AUTH_LDAP_REVERSE_EMAIL_SEARCH: Optional[LDAPSearch] = None +AUTH_LDAP_REVERSE_EMAIL_SEARCH: Optional["LDAPSearch"] = None Signed-off-by: Anders Kaseorg <anders@zulipchat.com>
2020-04-22 01:09:50 +02:00
user_flags: Dict[int, List[str]] = {}
def assert_missing(user_ids: List[int]) -> None:
presence_idle_user_ids = get_active_presence_idle_user_ids(
realm=realm,
sender_id=sender.id,
message_type=message_type,
active_user_ids=recipient_ids,
user_flags=user_flags,
)
self.assertEqual(sorted(user_ids), sorted(presence_idle_user_ids))
def set_presence(user: UserProfile, client_name: str, ago: int) -> None:
when = timezone_now() - datetime.timedelta(seconds=ago)
UserPresence.objects.create(
user_profile_id=user.id,
realm_id=user.realm_id,
client=get_client(client_name),
timestamp=when,
)
message_type = 'private'
assert_missing([hamlet.id, othello.id])
message_type = 'stream'
user_flags[hamlet.id] = ['mentioned']
assert_missing([hamlet.id])
set_presence(hamlet, 'iPhone', ago=5000)
assert_missing([hamlet.id])
set_presence(hamlet, 'webapp', ago=15)
assert_missing([])
message_type = 'private'
assert_missing([othello.id])
class TestBulkGetHuddleUserIds(ZulipTestCase):
def test_bulk_get_huddle_user_ids(self) -> None:
hamlet = self.example_user('hamlet')
cordelia = self.example_user('cordelia')
othello = self.example_user('othello')
iago = self.example_user('iago')
message_ids = [
self.send_huddle_message(hamlet, [cordelia, othello], 'test'),
self.send_huddle_message(cordelia, [hamlet, othello, iago], 'test'),
]
messages = Message.objects.filter(id__in=message_ids).order_by("id")
first_huddle_recipient = messages[0].recipient
first_huddle_user_ids = list(get_huddle_user_ids(first_huddle_recipient))
second_huddle_recipient = messages[1].recipient
second_huddle_user_ids = list(get_huddle_user_ids(second_huddle_recipient))
huddle_user_ids = bulk_get_huddle_user_ids([first_huddle_recipient, second_huddle_recipient])
self.assertEqual(huddle_user_ids[first_huddle_recipient.id], first_huddle_user_ids)
self.assertEqual(huddle_user_ids[second_huddle_recipient.id], second_huddle_user_ids)
def test_bulk_get_huddle_user_ids_empty_list(self) -> None:
self.assertEqual(bulk_get_huddle_user_ids([]), {})