Send an event when a stream is created, is deleted, becomes occupied, or becomes vacant

A stream is vacant when it has no subscribers and occupied when it has at least
one subscriber.

We have a slightly odd model where stream creation is conflated with
subscription creation.  Streams are created by attempting to subscribe to a
stream that doesn't exist.  We also hide streams with no subscribers from users
to make it seem like they've gone away.  However, we can't actually remove those
streams because we want to preserve history.

This commit moves us towards a separation of these two concepts.  By sending
events for stream creation, occupation, vacancy, and deletion, we allow clients
to directly observe the global state of streams rather than indirectly observing
subscription information.  A more complete solution would involve adding a view
for explicitly creating streams without subscribing to them.

This commit does not handle the intricacies of invite-only streams.  We
currently simply do not send these events for invite-only streams.

(imported from commit 5430e5a5eecefafcdba4f5d4f9aa665556fcc559)
This commit is contained in:
Zev Benjamin 2014-03-02 00:46:54 -05:00
parent 5c44fa9a29
commit 814aed7cbe
6 changed files with 124 additions and 40 deletions

View File

@ -216,6 +216,7 @@ def do_deactivate_stream(stream, log=True):
for user_profile in user_profiles: for user_profile in user_profiles:
do_remove_subscription(user_profile, stream) do_remove_subscription(user_profile, stream)
was_invite_only = stream.invite_only
stream.deactivated = True stream.deactivated = True
stream.invite_only = True stream.invite_only = True
# Preserve as much as possible the original stream name while giving it a # Preserve as much as possible the original stream name while giving it a
@ -242,6 +243,13 @@ def do_deactivate_stream(stream, log=True):
old_cache_key = get_stream_cache_key(old_name, stream.realm) old_cache_key = get_stream_cache_key(old_name, stream.realm)
cache_delete(old_cache_key) cache_delete(old_cache_key)
if not was_invite_only:
stream_dict = stream.to_dict()
stream_dict.update(dict(name=old_name, invite_only=was_invite_only))
event = dict(type="stream", op="delete",
streams=[stream_dict])
send_event(event, active_user_ids(stream.realm))
return return
def do_change_user_email(user_profile, new_email): def do_change_user_email(user_profile, new_email):
@ -475,6 +483,10 @@ def create_stream_if_needed(realm, stream_name, invite_only=False):
defaults={'name': stream_name, 'invite_only': invite_only}) defaults={'name': stream_name, 'invite_only': invite_only})
if created: if created:
Recipient.objects.create(type_id=stream.id, type=Recipient.STREAM) Recipient.objects.create(type_id=stream.id, type=Recipient.STREAM)
if not invite_only:
event = dict(type="stream", op="create",
streams=[stream.to_dict()])
send_event(event, active_user_ids(realm))
return stream, created return stream, created
def recipient_for_emails(emails, not_forged_mirror_message, def recipient_for_emails(emails, not_forged_mirror_message,
@ -977,8 +989,23 @@ def bulk_add_subscriptions(streams, users):
audible_notifications=user_profile.enable_stream_sounds) audible_notifications=user_profile.enable_stream_sounds)
subs_by_user[user_profile.id].append(sub_to_add) subs_by_user[user_profile.id].append(sub_to_add)
subs_to_add.append((sub_to_add, stream)) subs_to_add.append((sub_to_add, stream))
# TODO: XXX: This transaction really needs to be done at the serializeable
# transaction isolation level.
with transaction.atomic():
occupied_streams_before = list(get_occupied_streams(user_profile.realm))
Subscription.objects.bulk_create([sub for (sub, stream) in subs_to_add]) Subscription.objects.bulk_create([sub for (sub, stream) in subs_to_add])
Subscription.objects.filter(id__in=[sub.id for (sub, stream_name) in subs_to_activate]).update(active=True) Subscription.objects.filter(id__in=[sub.id for (sub, stream_name) in subs_to_activate]).update(active=True)
occupied_streams_after = list(get_occupied_streams(user_profile.realm))
new_occupied_streams = [stream for stream in
set(occupied_streams_after) - set(occupied_streams_before)
if not stream.invite_only]
if new_occupied_streams:
event = dict(type="stream", op="occupy",
streams=[stream.to_dict()
for stream in new_occupied_streams])
send_event(event, active_user_ids(user_profile.realm))
# Notify all existing users on streams that users have joined # Notify all existing users on streams that users have joined
@ -1036,6 +1063,10 @@ def bulk_add_subscriptions(streams, users):
def do_add_subscription(user_profile, stream, no_log=False): def do_add_subscription(user_profile, stream, no_log=False):
recipient = get_recipient(Recipient.STREAM, stream.id) recipient = get_recipient(Recipient.STREAM, stream.id)
color = pick_color(user_profile) color = pick_color(user_profile)
# TODO: XXX: This transaction really needs to be done at the serializeable
# transaction isolation level.
with transaction.atomic():
vacant_before = stream.num_subscribers() == 0
(subscription, created) = Subscription.objects.get_or_create( (subscription, created) = Subscription.objects.get_or_create(
user_profile=user_profile, recipient=recipient, user_profile=user_profile, recipient=recipient,
defaults={'active': True, 'color': color, defaults={'active': True, 'color': color,
@ -1046,8 +1077,12 @@ def do_add_subscription(user_profile, stream, no_log=False):
subscription.active = True subscription.active = True
subscription.save(update_fields=["active"]) subscription.save(update_fields=["active"])
if did_subscribe: if vacant_before and did_subscribe and not stream.invite_only:
event = dict(type="stream", op="occupy",
streams=[stream.to_dict()])
send_event(event, active_user_ids(user_profile.realm))
if did_subscribe:
emails_by_stream = {stream.id: maybe_get_subscriber_emails(stream)} emails_by_stream = {stream.id: maybe_get_subscriber_emails(stream)}
notify_subscriptions_added(user_profile, [(subscription, stream)], lambda stream: emails_by_stream[stream.id], no_log) notify_subscriptions_added(user_profile, [(subscription, stream)], lambda stream: emails_by_stream[stream.id], no_log)
@ -1112,8 +1147,22 @@ def bulk_remove_subscriptions(users, streams):
for recipient_id in recipients_to_unsub: for recipient_id in recipients_to_unsub:
not_subscribed.append((user_profile, stream_map[recipient_id])) not_subscribed.append((user_profile, stream_map[recipient_id]))
# TODO: XXX: This transaction really needs to be done at the serializeable
# transaction isolation level.
with transaction.atomic():
occupied_streams_before = list(get_occupied_streams(user_profile.realm))
Subscription.objects.filter(id__in=[sub.id for (sub, stream_name) in Subscription.objects.filter(id__in=[sub.id for (sub, stream_name) in
subs_to_deactivate]).update(active=False) subs_to_deactivate]).update(active=False)
occupied_streams_after = list(get_occupied_streams(user_profile.realm))
new_vacant_streams = [stream for stream in
set(occupied_streams_before) - set(occupied_streams_after)
if not stream.invite_only]
if new_vacant_streams:
event = dict(type="stream", op="vacate",
streams=[stream.to_dict()
for stream in new_vacant_streams])
send_event(event, active_user_ids(user_profile.realm))
streams_by_user = defaultdict(list) streams_by_user = defaultdict(list)
for (sub, stream) in subs_to_deactivate: for (sub, stream) in subs_to_deactivate:
@ -1136,7 +1185,15 @@ def do_remove_subscription(user_profile, stream, no_log=False):
subscription = maybe_sub[0] subscription = maybe_sub[0]
did_remove = subscription.active did_remove = subscription.active
subscription.active = False subscription.active = False
with transaction.atomic():
subscription.save(update_fields=["active"]) subscription.save(update_fields=["active"])
vacant_after = stream.num_subscribers() == 0
if vacant_after and did_remove and not stream.invite_only:
event = dict(type="stream", op="vacate",
streams=[stream.to_dict()])
send_event(event, active_user_ids(user_profile.realm))
if did_remove: if did_remove:
notify_subscriptions_removed(user_profile, [stream], no_log) notify_subscriptions_removed(user_profile, [stream], no_log)
@ -2090,6 +2147,9 @@ def fetch_initial_state_data(user_profile, event_types, queue_id):
# get any updates during a session from get_events() # get any updates during a session from get_events()
pass pass
if want('stream'):
state['streams'] = do_get_streams(user_profile)
return state return state
def apply_events(state, events, user_profile): def apply_events(state, events, user_profile):
@ -2119,6 +2179,18 @@ def apply_events(state, events, user_profile):
for obj in state['subscriptions']: for obj in state['subscriptions']:
if obj['name'].lower() == event['name'].lower(): if obj['name'].lower() == event['name'].lower():
obj[event['property']] = event['value'] obj[event['property']] = event['value']
# Also update the pure streams data
for stream in state['streams']:
if stream['name'].lower() == event['name'].lower():
prop = event['property']
if prop in stream:
stream[prop] = event['value']
elif event['op'] == "occupy":
state['streams'] += event['streams']
elif event['op'] == "vacate":
stream_ids = [s["stream_id"] for s in event['streams']]
state['streams'] = filter(lambda s: s["stream_id"] not in stream_ids,
state['streams'])
elif event['type'] == 'realm': elif event['type'] == 'realm':
field = 'realm_' + event['property'] field = 'realm_' + event['property']
state[field] = event['value'] state[field] = event['value']
@ -2481,6 +2553,14 @@ def get_emails_from_user_ids(user_ids):
def realm_aliases(realm): def realm_aliases(realm):
return [alias.domain for alias in realm.realmalias_set.all()] return [alias.domain for alias in realm.realmalias_set.all()]
def get_occupied_streams(realm):
""" Get streams with subscribers """
subs_filter = Subscription.objects.filter(active=True).values('recipient_id')
stream_ids = Recipient.objects.filter(
type=Recipient.STREAM, id__in=subs_filter).values('type_id')
return Stream.objects.filter(id__in=stream_ids, realm=realm, deactivated=False)
def do_get_streams(user_profile, include_public=True, include_subscribed=True, def do_get_streams(user_profile, include_public=True, include_subscribed=True,
include_all_active=False): include_all_active=False):
if include_all_active and not is_super_user(user_profile): if include_all_active and not is_super_user(user_profile):
@ -2490,14 +2570,8 @@ def do_get_streams(user_profile, include_public=True, include_subscribed=True,
# contractor for CUSTOMER5) and for the mit.edu realm. # contractor for CUSTOMER5) and for the mit.edu realm.
include_public = include_public and not (user_profile.public_streams_disabled or include_public = include_public and not (user_profile.public_streams_disabled or
user_profile.realm.domain == "mit.edu") user_profile.realm.domain == "mit.edu")
# Start out with all streams in the realm with subscribers
# Only get streams someone is currently subscribed to query = get_occupied_streams(user_profile.realm)
subs_filter = Subscription.objects.filter(active=True).values('recipient_id')
stream_ids = Recipient.objects.filter(
type=Recipient.STREAM, id__in=subs_filter).values('type_id')
# Start out with all active streams in the realm
query = Stream.objects.filter(id__in = stream_ids, realm=user_profile.realm)
if not include_all_active: if not include_all_active:
user_subs = Subscription.objects.select_related("recipient").filter( user_subs = Subscription.objects.select_related("recipient").filter(

View File

@ -305,9 +305,12 @@ class AuthedTestCase(TestCase):
self.assertEqual(self.get_json_error(result, status_code=status_code), msg) self.assertEqual(self.get_json_error(result, status_code=status_code), msg)
def assert_length(self, queries, count, exact=False): def assert_length(self, queries, count, exact=False):
actual_count = len(queries)
if exact: if exact:
return self.assertTrue(len(queries) == count, queries) return self.assertTrue(actual_count == count,
return self.assertTrue(len(queries) <= count, queries) "len(%s) == %s, != %s" % (queries, actual_count, count))
return self.assertTrue(actual_count <= count,
"len(%s) == %s, > %s" % (queries, actual_count, count))
def assert_json_error_contains(self, result, msg_substring): def assert_json_error_contains(self, result, msg_substring):
self.assertIn(msg_substring, self.get_json_error(result)) self.assertIn(msg_substring, self.get_json_error(result))

View File

@ -488,6 +488,13 @@ class Stream(models.Model):
active=True active=True
).count() ).count()
# This is stream information that is sent to clients
def to_dict(self):
return dict(name=self.name,
stream_id=self.id,
description=self.description,
invite_only=self.invite_only)
post_save.connect(flush_stream, sender=Stream) post_save.connect(flush_stream, sender=Stream)
post_delete.connect(flush_stream, sender=Stream) post_delete.connect(flush_stream, sender=Stream)

View File

@ -339,15 +339,15 @@ class EventsRegisterTest(AuthedTestCase):
action = lambda: do_remove_subscription(get_user_profile_by_email("hamlet@zulip.com"), stream) action = lambda: do_remove_subscription(get_user_profile_by_email("hamlet@zulip.com"), stream)
events = self.do_test(action) events = self.do_test(action)
error = remove_schema_checker('events[0]', events[0]) error = remove_schema_checker('events[1]', events[1])
self.assert_on_error(error) self.assert_on_error(error)
action = lambda: self.subscribe_to_stream("hamlet@zulip.com", "test_stream") action = lambda: self.subscribe_to_stream("hamlet@zulip.com", "test_stream")
events = self.do_test(action) events = self.do_test(action)
error = add_schema_checker('events[0]', events[0]) error = add_schema_checker('events[1]', events[1])
self.assert_on_error(error) self.assert_on_error(error)
action = lambda: do_change_stream_description(get_realm('zulip.com'), 'test_stream', 'new description') action = lambda: do_change_stream_description(get_realm('zulip.com'), 'test_stream', u'new description')
events = self.do_test(action) events = self.do_test(action)
error = stream_update_schema_checker('events[0]', events[0]) error = stream_update_schema_checker('events[0]', events[0])
self.assert_on_error(error) self.assert_on_error(error)

View File

@ -152,7 +152,7 @@ class LoginTest(AuthedTestCase):
with queries_captured() as queries: with queries_captured() as queries:
self.register("test", "test") self.register("test", "test")
# Ensure the number of queries we make is not O(streams) # Ensure the number of queries we make is not O(streams)
self.assert_length(queries, 59) self.assert_length(queries, 67)
user_profile = get_user_profile_by_email('test@zulip.com') user_profile = get_user_profile_by_email('test@zulip.com')
self.assertEqual(self.client.session['_auth_user_id'], user_profile.id) self.assertEqual(self.client.session['_auth_user_id'], user_profile.id)

View File

@ -226,9 +226,9 @@ class StreamAdminTest(AuthedTestCase):
result = self.client.delete('/json/streams/' + active_name) result = self.client.delete('/json/streams/' + active_name)
self.assert_json_success(result) self.assert_json_success(result)
deletion_events = [e['event'] for e in events if e['event']['type'] == 'subscription']
if subscribed: if subscribed:
deletion_event = events[0]['event'] self.assertEqual(deletion_events[0], dict(
self.assertEqual(deletion_event, dict(
op='remove', op='remove',
type='subscription', type='subscription',
subscriptions=[{'name': active_name, 'stream_id': stream.id}] subscriptions=[{'name': active_name, 'stream_id': stream.id}]
@ -236,7 +236,7 @@ class StreamAdminTest(AuthedTestCase):
else: else:
# You could delete the stream, but you weren't on it so you don't # You could delete the stream, but you weren't on it so you don't
# receive an unsubscription event. # receive an unsubscription event.
self.assertEqual(events, []) self.assertEqual(deletion_events, [])
with self.assertRaises(Stream.DoesNotExist): with self.assertRaises(Stream.DoesNotExist):
Stream.objects.get(realm=get_realm("zulip.com"), name=active_name) Stream.objects.get(realm=get_realm("zulip.com"), name=active_name)
@ -708,7 +708,7 @@ class SubscriptionAPITest(AuthedTestCase):
with tornado_redirected_to_list(events): with tornado_redirected_to_list(events):
self.helper_check_subs_before_and_after_add(self.streams + add_streams, {}, self.helper_check_subs_before_and_after_add(self.streams + add_streams, {},
add_streams, self.streams, self.test_email, self.streams + add_streams) add_streams, self.streams, self.test_email, self.streams + add_streams)
self.assert_length(events, 1, True) self.assert_length(events, 4, True)
def test_successful_subscriptions_notifies_pm(self): def test_successful_subscriptions_notifies_pm(self):
""" """
@ -892,10 +892,10 @@ class SubscriptionAPITest(AuthedTestCase):
streams_to_sub, streams_to_sub,
dict(principals=ujson.dumps([email1, email2])), dict(principals=ujson.dumps([email1, email2])),
) )
self.assert_length(queries, 37) self.assert_length(queries, 43)
self.assert_length(events, 4, exact=True) self.assert_length(events, 6, exact=True)
for ev in filter(lambda x: x['event']['type'] != 'message', events): for ev in filter(lambda x: x['event']['type'] not in ('message', 'stream'), events):
self.assertEqual(ev['event']['op'], 'add') self.assertEqual(ev['event']['op'], 'add')
self.assertEqual( self.assertEqual(
set(ev['event']['subscriptions'][0]['subscribers']), set(ev['event']['subscriptions'][0]['subscribers']),
@ -914,7 +914,7 @@ class SubscriptionAPITest(AuthedTestCase):
streams_to_sub, streams_to_sub,
dict(principals=ujson.dumps([self.test_email])), dict(principals=ujson.dumps([self.test_email])),
) )
self.assert_length(queries, 4) self.assert_length(queries, 8)
self.assert_length(events, 2, True) self.assert_length(events, 2, True)
add_event, add_peer_event = events add_event, add_peer_event = events
@ -975,7 +975,7 @@ class SubscriptionAPITest(AuthedTestCase):
) )
# Make sure MIT does not get any tornado subscription events # Make sure MIT does not get any tornado subscription events
self.assert_length(events, 0, True) self.assert_length(events, 0, True)
self.assert_length(queries, 5) self.assert_length(queries, 7)
def test_bulk_subscribe_many(self): def test_bulk_subscribe_many(self):
# Create a whole bunch of streams # Create a whole bunch of streams
@ -991,7 +991,7 @@ class SubscriptionAPITest(AuthedTestCase):
dict(principals=ujson.dumps([self.test_email])), dict(principals=ujson.dumps([self.test_email])),
) )
# Make sure we don't make O(streams) queries # Make sure we don't make O(streams) queries
self.assert_length(queries, 7) self.assert_length(queries, 9)
@slow(0.15, "common_subscribe_to_streams is slow") @slow(0.15, "common_subscribe_to_streams is slow")
def test_subscriptions_add_for_principal(self): def test_subscriptions_add_for_principal(self):