test_events: Bring zerver/lib/events.py to 100% coverage.

This commit is contained in:
Tim Abbott 2017-03-23 22:38:06 -07:00
parent 0c16cc1c1e
commit a6ae546f59
3 changed files with 42 additions and 7 deletions

View File

@ -75,9 +75,7 @@ not_yet_fully_covered = {
'confirmation/models.py', 'confirmation/models.py',
'zerver/decorator.py', 'zerver/decorator.py',
'zerver/lib/actions.py', 'zerver/lib/actions.py',
'zerver/lib/events.py',
'zerver/lib/bugdown/__init__.py', 'zerver/lib/bugdown/__init__.py',
'zerver/lib/events.py',
'zerver/lib/i18n.py', 'zerver/lib/i18n.py',
'zerver/lib/message.py', 'zerver/lib/message.py',
'zerver/lib/notifications.py', 'zerver/lib/notifications.py',

View File

@ -407,7 +407,7 @@ def apply_event(state, event, user_profile, include_subscribers):
elif event['notification_name'] == "enable_digest_emails": elif event['notification_name'] == "enable_digest_emails":
state['enable_digest_emails'] = event['setting'] state['enable_digest_emails'] = event['setting']
else: else:
raise ValueError("Unexpected event type %s" % (event['type'],)) raise AssertionError("Unexpected event type %s" % (event['type'],))
def do_events_register(user_profile, user_client, apply_markdown=True, def do_events_register(user_profile, user_client, apply_markdown=True,
event_types=None, queue_lifespan_secs=0, all_public_streams=False, event_types=None, queue_lifespan_secs=0, all_public_streams=False,

View File

@ -12,7 +12,7 @@ from django.utils import timezone
from zerver.models import ( from zerver.models import (
get_client, get_realm, get_recipient, get_stream, get_user_profile_by_email, get_client, get_realm, get_recipient, get_stream, get_user_profile_by_email,
Message, RealmAlias, Recipient, UserPresence, UserProfile Message, RealmAlias, Recipient, UserMessage, UserPresence, UserProfile
) )
from zerver.lib.actions import ( from zerver.lib.actions import (
@ -102,14 +102,42 @@ class EventsEndpointTest(ZulipTestCase):
def test_events_register_endpoint(self): def test_events_register_endpoint(self):
# type: () -> None # type: () -> None
# This test is intended to get minimal coverage on # This test is intended to get minimal coverage on the
# zerver.views.events_register.events_register_backend, so we can have # events_register code paths
# 100% views coverage.
email = 'hamlet@zulip.com' email = 'hamlet@zulip.com'
with mock.patch('zerver.views.events_register.do_events_register', return_value={}): with mock.patch('zerver.views.events_register.do_events_register', return_value={}):
result = self.client_post('/json/register', **self.api_auth(email)) result = self.client_post('/json/register', **self.api_auth(email))
self.assert_json_success(result) self.assert_json_success(result)
with mock.patch('zerver.lib.events.request_event_queue', return_value=None):
result = self.client_post('/json/register', **self.api_auth(email))
self.assert_json_error(result, "Could not allocate event queue")
with mock.patch('zerver.lib.events.request_event_queue', return_value='15:11'):
with mock.patch('zerver.lib.events.get_user_events',
return_value=[]):
result = self.client_post('/json/register', dict(event_types=ujson.dumps(['pointer'])),
**self.api_auth(email))
self.assert_json_success(result)
result_dict = ujson.loads(result.content)
self.assertEqual(result_dict['last_event_id'], -1)
self.assertEqual(result_dict['queue_id'], '15:11')
with mock.patch('zerver.lib.events.request_event_queue', return_value='15:12'):
with mock.patch('zerver.lib.events.get_user_events',
return_value=[{
'id': 6,
'type': 'pointer',
'pointer': 15,
}]):
result = self.client_post('/json/register', dict(event_types=ujson.dumps(['pointer'])),
**self.api_auth(email))
self.assert_json_success(result)
result_dict = ujson.loads(result.content)
self.assertEqual(result_dict['last_event_id'], 6)
self.assertEqual(result_dict['pointer'], 15)
self.assertEqual(result_dict['queue_id'], '15:12')
def test_tornado_endpoint(self): def test_tornado_endpoint(self):
# type: () -> None # type: () -> None
@ -1622,6 +1650,15 @@ class FetchInitialStateDataTest(ZulipTestCase):
result = fetch_initial_state_data(user_profile, None, "") result = fetch_initial_state_data(user_profile, None, "")
self.assertTrue(len(result['realm_bots']) > 5) self.assertTrue(len(result['realm_bots']) > 5)
def test_max_message_id_with_no_history(self):
# type: () -> None
email = 'aaron@zulip.com'
user_profile = get_user_profile_by_email(email)
# Delete all historical messages for this user
UserMessage.objects.filter(user_profile=user_profile).delete()
result = fetch_initial_state_data(user_profile, None, "")
self.assertEqual(result['max_message_id'], -1)
class EventQueueTest(TestCase): class EventQueueTest(TestCase):
def test_one_event(self): def test_one_event(self):
# type: () -> None # type: () -> None