2020-06-11 00:54:34 +02:00
|
|
|
from typing import Any, List, Mapping
|
|
|
|
|
2020-08-07 01:09:47 +02:00
|
|
|
import orjson
|
2016-10-12 20:57:59 +02:00
|
|
|
|
2020-06-11 00:54:34 +02:00
|
|
|
from zerver.lib.test_classes import ZulipTestCase
|
|
|
|
from zerver.models import Huddle, get_huddle_hash
|
|
|
|
|
2016-10-12 20:57:59 +02:00
|
|
|
|
2020-02-22 13:38:09 +01:00
|
|
|
class TypingValidateOperatorTest(ZulipTestCase):
    """Requests to the typing endpoint with a missing or bad ``op`` are rejected."""

    def test_missing_parameter(self) -> None:
        """
        A typing notification that omits the op parameter is an error.
        """
        hamlet = self.example_user("hamlet")
        payload = {"to": orjson.dumps([hamlet.id]).decode()}
        response = self.api_post(hamlet, "/api/v1/typing", payload)
        self.assert_json_error(response, "Missing 'op' argument")

    def test_invalid_parameter_pm(self) -> None:
        """
        A typing notification whose op value is not recognised is an error.
        """
        hamlet = self.example_user("hamlet")
        payload = {
            "to": orjson.dumps([hamlet.id]).decode(),
            "op": "foo",
        }
        response = self.api_post(hamlet, "/api/v1/typing", payload)
        self.assert_json_error(response, "Invalid op")

    def test_invalid_parameter_stream(self) -> None:
        """
        The same unrecognised op is refused when stream fields are supplied.
        """
        hamlet = self.example_user("hamlet")
        payload = {"op": "foo", "stream_id": 1, "topic": "topic"}

        response = self.api_post(hamlet, "/api/v1/typing", payload)
        self.assert_json_error(response, "Invalid op")
|
|
|
|
|
2021-02-12 08:19:30 +01:00
|
|
|
|
2021-04-04 12:11:19 +02:00
|
|
|
class TypingMessagetypeTest(ZulipTestCase):
    """Requests to the typing endpoint with an unknown ``type`` are rejected."""

    def test_invalid_type(self) -> None:
        """
        Posting with type="invalid" yields the "Invalid type" error.
        """
        hamlet = self.example_user("hamlet")
        payload = {
            "to": orjson.dumps([hamlet.id]).decode(),
            "type": "invalid",
            "op": "start",
        }
        response = self.api_post(hamlet, "/api/v1/typing", payload)
        self.assert_json_error(response, "Invalid type")
|
|
|
|
|
|
|
|
|
2020-12-24 21:00:20 +01:00
|
|
|
class TypingValidateToArgumentsTest(ZulipTestCase):
    """Error cases for the ``to`` / ``topic`` arguments of the typing endpoint."""

    TYPING_URL = "/api/v1/typing"

    def test_empty_to_array_pms(self) -> None:
        """
        A PM typing notification with an empty recipient list is an error.
        """
        hamlet = self.example_user("hamlet")
        payload = {"op": "start", "to": "[]"}
        response = self.api_post(hamlet, self.TYPING_URL, payload)
        self.assert_json_error(response, "Empty 'to' list")

    def test_empty_to_array_stream(self) -> None:
        """
        A stream typing notification with an empty recipient list is an error.
        """
        hamlet = self.example_user("hamlet")
        payload = {"type": "stream", "op": "start", "to": "[]"}
        response = self.api_post(hamlet, self.TYPING_URL, payload)
        self.assert_json_error(response, "Empty 'to' list")

    def test_missing_recipient(self) -> None:
        """
        A typing notification with no "to" argument at all is an error.
        """
        hamlet = self.example_user("hamlet")
        response = self.api_post(hamlet, self.TYPING_URL, {"op": "start"})
        self.assert_json_error(response, "Missing 'to' argument")

    def test_argument_to_is_not_valid_json(self) -> None:
        """
        A "to" value that cannot be parsed as JSON is an error.
        """
        hamlet = self.example_user("hamlet")
        payload = {"op": "start", "to": "bad email"}
        response = self.api_post(hamlet, self.TYPING_URL, payload)
        self.assert_json_error(response, 'Argument "to" is not valid JSON.')

    def test_bogus_user_id(self) -> None:
        """
        A "to" list naming a user ID that does not exist is an error.
        """
        hamlet = self.example_user("hamlet")
        payload = {"op": "start", "to": "[9999999]"}
        response = self.api_post(hamlet, self.TYPING_URL, payload)
        self.assert_json_error(response, "Invalid user ID 9999999")

    def test_send_multiple_stream_ids(self) -> None:
        """
        A stream typing notification listing several stream IDs is an error.
        """
        hamlet = self.example_user("hamlet")
        payload = {"type": "stream", "op": "stop", "to": "[1, 2, 3]"}

        response = self.api_post(hamlet, self.TYPING_URL, payload)
        self.assert_json_error(response, "Cannot send to multiple streams")

    def test_includes_stream_id_but_not_topic(self) -> None:
        """
        A stream typing notification without a topic is an error.
        """
        hamlet = self.example_user("hamlet")
        general_id = self.get_stream_id("general")
        payload = {
            "type": "stream",
            "op": "start",
            "to": orjson.dumps([general_id]).decode(),
        }

        response = self.api_post(hamlet, self.TYPING_URL, payload)
        self.assert_json_error(response, "Missing topic")

    def test_stream_doesnt_exist(self) -> None:
        """
        A stream typing notification aimed at INVALID_STREAM_ID is an error.
        """
        hamlet = self.example_user("hamlet")
        payload = {
            "type": "stream",
            "op": "start",
            "to": orjson.dumps([self.INVALID_STREAM_ID]).decode(),
            "topic": "some topic",
        }

        response = self.api_post(hamlet, self.TYPING_URL, payload)
        self.assert_json_error(response, "Invalid stream ID")
|
2021-02-12 08:19:30 +01:00
|
|
|
|
2020-12-24 21:00:20 +01:00
|
|
|
|
|
|
|
class TypingHappyPathTestPMs(ZulipTestCase):
|
2020-02-22 13:38:09 +01:00
|
|
|
def test_start_to_single_recipient(self) -> None:
    """
    Posting op="start" addressed to one other user succeeds within the
    asserted query budget and produces exactly one typing event that
    lists both the sender and that user.
    """
    hamlet = self.example_user("hamlet")
    othello = self.example_user("othello")

    participants = {hamlet, othello}
    wanted_emails = {person.email for person in participants}
    wanted_ids = {person.id for person in participants}

    payload = {
        "to": orjson.dumps([othello.id]).decode(),
        "op": "start",
    }

    captured: List[Mapping[str, Any]] = []
    # One combined `with` is equivalent to nesting the two managers.
    with self.assert_database_query_count(4), self.tornado_redirected_to_list(
        captured, expected_num_events=1
    ):
        response = self.api_post(hamlet, "/api/v1/typing", payload)

    self.assert_json_success(response)
    self.assert_length(captured, 1)

    notice = captured[0]
    typing_event = notice["event"]
    seen_emails = {entry["email"] for entry in typing_event["recipients"]}
    notified_ids = set(notice["users"])
    seen_ids = {entry["user_id"] for entry in typing_event["recipients"]}

    self.assertEqual(wanted_ids, seen_ids)
    self.assertEqual(wanted_ids, notified_ids)
    self.assertEqual(typing_event["sender"]["email"], hamlet.email)
    self.assertEqual(seen_emails, wanted_emails)
    self.assertEqual(typing_event["type"], "typing")
    self.assertEqual(typing_event["op"], "start")
|
2018-08-28 21:48:47 +02:00
|
|
|
|
2020-02-22 13:38:09 +01:00
|
|
|
def test_start_to_multiple_recipients(self) -> None:
    """
    Posting op="start" addressed to two other users succeeds, produces
    one typing event covering all three participants, and leaves no
    Huddle row behind for that group.
    """
    hamlet = self.example_user("hamlet")
    others = [self.example_user("othello"), self.example_user("cordelia")]

    participants = set(others) | {hamlet}
    wanted_emails = {person.email for person in participants}
    wanted_ids = {person.id for person in participants}

    # Precondition: no Huddle exists yet for this set of users.
    group_hash = get_huddle_hash(list(wanted_ids))
    self.assertFalse(Huddle.objects.filter(huddle_hash=group_hash).exists())

    captured: List[Mapping[str, Any]] = []

    payload = {
        "to": orjson.dumps([person.id for person in others]).decode(),
        "op": "start",
    }

    # One combined `with` is equivalent to nesting the two managers.
    with self.assert_database_query_count(5), self.tornado_redirected_to_list(
        captured, expected_num_events=1
    ):
        response = self.api_post(hamlet, "/api/v1/typing", payload)
    self.assert_json_success(response)
    self.assert_length(captured, 1)

    # Merely typing in the compose box must not create a Huddle;
    # that should only happen once a real message is sent.
    self.assertFalse(Huddle.objects.filter(huddle_hash=group_hash).exists())

    notice = captured[0]
    typing_event = notice["event"]
    seen_emails = {entry["email"] for entry in typing_event["recipients"]}
    notified_ids = set(notice["users"])
    seen_ids = {entry["user_id"] for entry in typing_event["recipients"]}

    self.assertEqual(wanted_ids, seen_ids)
    self.assertEqual(wanted_ids, notified_ids)
    self.assertEqual(typing_event["sender"]["email"], hamlet.email)
    self.assertEqual(seen_emails, wanted_emails)
    self.assertEqual(typing_event["type"], "typing")
    self.assertEqual(typing_event["op"], "start")
|
2018-08-28 21:48:47 +02:00
|
|
|
|
2020-02-22 13:38:09 +01:00
|
|
|
def test_start_to_self(self) -> None:
    """
    A user may send a "start" typing notification addressed to their
    own user ID; the single resulting event names only that user.
    """
    hamlet = self.example_user("hamlet")
    own_email = hamlet.email
    wanted_emails = {own_email}
    wanted_ids = {hamlet.id}

    payload = {
        "to": orjson.dumps([hamlet.id]).decode(),
        "op": "start",
    }

    captured: List[Mapping[str, Any]] = []
    with self.tornado_redirected_to_list(captured, expected_num_events=1):
        response = self.api_post(hamlet, "/api/v1/typing", payload)
    self.assert_json_success(response)
    self.assert_length(captured, 1)

    notice = captured[0]
    typing_event = notice["event"]
    seen_emails = {entry["email"] for entry in typing_event["recipients"]}
    notified_ids = set(notice["users"])
    seen_ids = {entry["user_id"] for entry in typing_event["recipients"]}

    self.assertEqual(wanted_ids, seen_ids)
    self.assertEqual(wanted_ids, notified_ids)
    self.assertEqual(seen_emails, wanted_emails)
    self.assertEqual(typing_event["sender"]["email"], own_email)
    self.assertEqual(typing_event["type"], "typing")
    self.assertEqual(typing_event["op"], "start")
|
2016-10-12 20:57:59 +02:00
|
|
|
|
2020-02-22 13:38:09 +01:00
|
|
|
def test_start_to_another_user(self) -> None:
    """
    A "start" typing notification addressed to a different user
    succeeds and yields one event naming both parties.
    """
    hamlet = self.example_user("hamlet")
    othello = self.example_user("othello")

    participants = {hamlet, othello}
    wanted_emails = {person.email for person in participants}
    wanted_ids = {person.id for person in participants}

    payload = {
        "to": orjson.dumps([othello.id]).decode(),
        "op": "start",
    }

    captured: List[Mapping[str, Any]] = []
    with self.tornado_redirected_to_list(captured, expected_num_events=1):
        response = self.api_post(hamlet, "/api/v1/typing", payload)

    self.assert_json_success(response)
    self.assert_length(captured, 1)

    notice = captured[0]
    typing_event = notice["event"]
    seen_emails = {entry["email"] for entry in typing_event["recipients"]}
    notified_ids = set(notice["users"])
    seen_ids = {entry["user_id"] for entry in typing_event["recipients"]}

    self.assertEqual(wanted_ids, seen_ids)
    self.assertEqual(wanted_ids, notified_ids)
    self.assertEqual(seen_emails, wanted_emails)
    self.assertEqual(typing_event["sender"]["email"], hamlet.email)
    self.assertEqual(typing_event["type"], "typing")
    self.assertEqual(typing_event["op"], "start")
|
2016-10-12 20:57:59 +02:00
|
|
|
|
2020-02-22 13:38:09 +01:00
|
|
|
def test_stop_to_self(self) -> None:
    """
    A user may send a "stop" typing notification addressed to their
    own user ID; the single resulting event names only that user.
    """
    hamlet = self.example_user("hamlet")
    own_email = hamlet.email
    wanted_emails = {own_email}
    wanted_ids = {hamlet.id}

    # Building the payload does not depend on the event capture below.
    payload = {
        "to": orjson.dumps([hamlet.id]).decode(),
        "op": "stop",
    }

    captured: List[Mapping[str, Any]] = []
    with self.tornado_redirected_to_list(captured, expected_num_events=1):
        response = self.api_post(hamlet, "/api/v1/typing", payload)

    self.assert_json_success(response)
    self.assert_length(captured, 1)

    notice = captured[0]
    typing_event = notice["event"]
    seen_emails = {entry["email"] for entry in typing_event["recipients"]}
    notified_ids = set(notice["users"])
    seen_ids = {entry["user_id"] for entry in typing_event["recipients"]}

    self.assertEqual(wanted_ids, seen_ids)
    self.assertEqual(wanted_ids, notified_ids)
    self.assertEqual(seen_emails, wanted_emails)
    self.assertEqual(typing_event["sender"]["email"], own_email)
    self.assertEqual(typing_event["type"], "typing")
    self.assertEqual(typing_event["op"], "stop")
|
2016-10-12 20:57:59 +02:00
|
|
|
|
2020-02-22 13:38:09 +01:00
|
|
|
def test_stop_to_another_user(self) -> None:
    """
    A "stop" typing notification addressed to another user succeeds,
    and the resulting event lists both the sender and that user.
    """
    hamlet = self.example_user("hamlet")
    othello = self.example_user("othello")
    participants = {hamlet, othello}
    expected_recipient_emails = {person.email for person in participants}
    expected_recipient_ids = {person.id for person in participants}

    captured: List[Mapping[str, Any]] = []
    with self.tornado_redirected_to_list(captured, expected_num_events=1):
        payload = {
            "to": orjson.dumps([othello.id]).decode(),
            "op": "stop",
        }
        response = self.api_post(hamlet, "/api/v1/typing", payload)

    self.assert_json_success(response)
    self.assert_length(captured, 1)

    # Each captured entry carries the event payload plus the ids of the
    # users it was addressed to.
    notice = captured[0]
    event = notice["event"]
    recipients = event["recipients"]
    event_recipient_emails = {entry["email"] for entry in recipients}
    event_user_ids = set(notice["users"])
    event_recipient_user_ids = {entry["user_id"] for entry in recipients}

    self.assertEqual(expected_recipient_ids, event_recipient_user_ids)
    self.assertEqual(expected_recipient_ids, event_user_ids)
    self.assertEqual(event_recipient_emails, expected_recipient_emails)
    self.assertEqual(event["sender"]["email"], hamlet.email)
    self.assertEqual(event["type"], "typing")
    self.assertEqual(event["op"], "stop")
|
2020-12-24 21:00:20 +01:00
|
|
|
|
|
|
|
|
|
|
|
class TypingHappyPathTestStreams(ZulipTestCase):
    """Happy-path tests for typing notifications sent to a stream topic."""

    def _check_stream_typing_op(self, op: str) -> None:
        """
        Post a stream typing notification with the given ``op`` as hamlet
        and verify the single event that results.

        The "start" and "stop" cases are identical apart from the ``op``
        value sent in the request and expected back in the event, so both
        tests share this helper.
        """
        sender = self.example_user("hamlet")
        stream_name = self.get_streams(sender)[0]
        stream_id = self.get_stream_id(stream_name)
        topic = "Some topic"

        expected_user_ids = {
            user_profile.id
            for user_profile in self.users_subscribed_to_stream(stream_name, sender.realm)
        }

        params = dict(
            type="stream",
            op=op,
            to=orjson.dumps([stream_id]).decode(),
            topic=topic,
        )

        events: List[Mapping[str, Any]] = []
        # Only the request itself runs inside the query-count and
        # event-capture contexts; the response checks happen afterwards.
        with self.assert_database_query_count(5), self.tornado_redirected_to_list(
            events, expected_num_events=1
        ):
            result = self.api_post(sender, "/api/v1/typing", params)
        self.assert_json_success(result)
        self.assert_length(events, 1)

        event = events[0]["event"]
        event_user_ids = set(events[0]["users"])

        self.assertEqual(expected_user_ids, event_user_ids)
        self.assertEqual(sender.email, event["sender"]["email"])
        self.assertEqual(stream_id, event["stream_id"])
        self.assertEqual(topic, event["topic"])
        self.assertEqual("typing", event["type"])
        self.assertEqual(op, event["op"])

    def test_start(self) -> None:
        """A "start" typing notification to a stream topic is delivered."""
        self._check_stream_typing_op("start")

    def test_stop(self) -> None:
        """A "stop" typing notification to a stream topic is delivered."""
        self._check_stream_typing_op("stop")
|
2021-07-25 21:30:44 +02:00
|
|
|
|
|
|
|
|
|
|
|
class TestSendTypingNotificationsSettings(ZulipTestCase):
    """
    Checks that the sender's `send_private_typing_notifications` and
    `send_stream_typing_notifications` settings control whether a typing
    request produces an event.
    """

    def test_send_private_typing_notifications_setting(self) -> None:
        """
        With the setting on, a private typing request yields one event;
        once it is turned off, the request errors and no event is sent.
        """
        hamlet = self.example_user("hamlet")
        othello = self.example_user("othello")
        expected_recipient_ids = {hamlet.id, othello.id}

        request = {
            "to": orjson.dumps([othello.id]).decode(),
            "op": "start",
        }

        # Typing events are sent while `send_private_typing_notifications` is `True`.
        self.assertTrue(hamlet.send_private_typing_notifications)

        delivered: List[Mapping[str, Any]] = []
        with self.tornado_redirected_to_list(delivered, expected_num_events=1):
            response = self.api_post(hamlet, "/api/v1/typing", request)

        self.assert_json_success(response)
        self.assert_length(delivered, 1)
        self.assertEqual(expected_recipient_ids, set(delivered[0]["users"]))
        self.assertEqual(orjson.loads(response.content)["msg"], "")

        hamlet.send_private_typing_notifications = False
        hamlet.save()

        # The same request must now be rejected without emitting anything.
        delivered = []
        with self.tornado_redirected_to_list(delivered, expected_num_events=0):
            response = self.api_post(hamlet, "/api/v1/typing", request)

        self.assert_json_error(
            response, "User has disabled typing notifications for private messages"
        )
        self.assertEqual(delivered, [])

    def test_send_stream_typing_notifications_setting(self) -> None:
        """
        With the setting on, a stream typing request yields one event for
        the stream's subscribers; once it is turned off, the request
        errors and no event is sent.
        """
        hamlet = self.example_user("hamlet")
        stream_name = self.get_streams(hamlet)[0]
        stream_id = self.get_stream_id(stream_name)
        topic = "Some topic"

        subscribers = self.users_subscribed_to_stream(stream_name, hamlet.realm)
        expected_user_ids = {subscriber.id for subscriber in subscribers}

        request = {
            "type": "stream",
            "op": "start",
            "to": orjson.dumps([stream_id]).decode(),
            "topic": topic,
        }

        # Typing events are sent while `send_stream_typing_notifications` is `True`.
        self.assertTrue(hamlet.send_stream_typing_notifications)

        delivered: List[Mapping[str, Any]] = []
        with self.tornado_redirected_to_list(delivered, expected_num_events=1):
            response = self.api_post(hamlet, "/api/v1/typing", request)
        self.assert_json_success(response)
        self.assert_length(delivered, 1)
        self.assertEqual(orjson.loads(response.content)["msg"], "")
        self.assertEqual(expected_user_ids, set(delivered[0]["users"]))

        hamlet.send_stream_typing_notifications = False
        hamlet.save()

        # The same request must now be rejected without emitting anything.
        delivered = []
        with self.tornado_redirected_to_list(delivered, expected_num_events=0):
            response = self.api_post(hamlet, "/api/v1/typing", request)
        self.assert_json_error(
            response, "User has disabled typing notifications for stream messages"
        )
        self.assertEqual(delivered, [])
|