[manual] Send new-stream invitations using a bulk database query

Messages that get sent out when someone subscribes many people to a new stream each
cause individual database queries (and their associated transactions). With the patched
bulk_create (which sets the .id on created objects), we can reduce this query down to a constant
number of queries on the Message and UserMessage tables.

Note for deployment (local dev, staging and prod):
you must be running a patched django, found here: https://github.com/acrefoot/django/branches
use this branch: acrefoot-bulk_create_with_id-1.5.1
on acrefoot-bulk_create_with_id-1.5.1
relevant sha1: ac6d885b811f7e2e34f0db0da217983f7dfd357f

(imported from commit b0dab9dac784d3ff47751e65bf22c2dddc22edf5)
This commit is contained in:
acrefoot 2013-05-10 16:56:22 -04:00 committed by Tim Abbott
parent c749323e84
commit 9454683108
2 changed files with 128 additions and 66 deletions

View File

@ -190,64 +190,90 @@ def log_message(message):
if not message.sending_client.name.startswith("test:"):
log_event(message.to_log_dict())
def do_send_message(message, rendered_content=None, no_log=False,
stream=None):
# Log the message to our message log for populate_db to refill
if not no_log:
log_message(message)
# Helper function. Defaults here are overriden by those set in do_send_messages
def do_send_message(message, rendered_content = None, no_log = False, stream = None):
do_send_messages([{'message': message,
'rendered_content': rendered_content,
'no_log': no_log,
'stream': stream}])
if message.recipient.type == Recipient.PERSONAL:
recipients = list(set([get_user_profile_by_id(message.recipient.type_id),
get_user_profile_by_id(message.sender_id)]))
# For personals, you send out either 1 or 2 copies of the message, for
# personals to yourself or to someone else, respectively.
assert((len(recipients) == 1) or (len(recipients) == 2))
elif (message.recipient.type == Recipient.STREAM or
message.recipient.type == Recipient.HUDDLE):
recipients = [s.user_profile for
s in Subscription.objects.select_related(
"user_profile").filter(recipient=message.recipient, active=True)]
else:
raise ValueError('Bad recipient type')
def do_send_messages(messages):
# Filter out messages which didn't pass internal_prep_message properly
messages = [message for message in messages if message is not None]
# Filter out zephyr mirror anomalies where the message was already sent
messages = [message for message in messages if message['message'] is not None]
# For consistency, changes to the default values for these gets should also be applied
# to the default args in do_send_message
for message in messages:
message['rendered_content'] = message.get('rendered_content', None)
message['no_log'] = message.get('no_log', False)
message['stream'] = message.get('stream', None)
# Log the message to our message log for populate_db to refill
for message in messages:
if not message['no_log']:
log_message(message['message'])
for message in messages:
if message['message'].recipient.type == Recipient.PERSONAL:
message['recipients'] = list(set([get_user_profile_by_id(message['message'].recipient.type_id),
get_user_profile_by_id(message['message'].sender_id)]))
# For personals, you send out either 1 or 2 copies of the message, for
# personals to yourself or to someone else, respectively.
assert((len(message['recipients']) == 1) or (len(message['recipients']) == 2))
elif (message['message'].recipient.type == Recipient.STREAM or
message['message'].recipient.type == Recipient.HUDDLE):
message['recipients'] = [s.user_profile for
s in Subscription.objects.select_related(
"user_profile").filter(recipient=message['message'].recipient, active=True)]
else:
raise ValueError('Bad recipient type')
# Save the message receipts in the database
with transaction.commit_on_success():
message.save()
ums_to_create = [UserMessage(user_profile=user_profile, message=message)
for user_profile in recipients
if user_profile.is_active]
for um in ums_to_create:
sent_by_human = message.sending_client.name.lower() in \
['website', 'iphone', 'android']
if um.user_profile == message.sender and sent_by_human:
um.flags |= UserMessage.flags.read
UserMessage.objects.bulk_create(ums_to_create)
Message.objects.bulk_create([message['message'] for message in messages])
ums = []
for message in messages:
ums_to_create = [UserMessage(user_profile=user_profile, message=message['message'])
for user_profile in message['recipients']
if user_profile.is_active]
for um in ums_to_create:
sent_by_human = message['message'].sending_client.name.lower() in \
['website', 'iphone', 'android']
if um.user_profile == message['message'].sender and sent_by_human:
um.flags |= UserMessage.flags.read
ums.extend(ums_to_create)
UserMessage.objects.bulk_create(ums)
cache_save_message(message)
for message in messages:
cache_save_message(message['message'])
# We can only publish messages to longpolling clients if the Tornado server is running.
if settings.TORNADO_SERVER:
# Render Markdown etc. here and store (automatically) in
# memcached, so that the single-threaded Tornado server
# doesn't have to.
message.to_dict(apply_markdown=True, rendered_content=rendered_content)
message.to_dict(apply_markdown=False)
data = dict(
type = 'new_message',
message = message.id,
users = [user.id for user in recipients])
if message.recipient.type == Recipient.STREAM:
# Note: This is where authorization for single-stream
# get_updates happens! We only attach stream data to the
# notify new_message request if it's a public stream,
# ensuring that in the tornado server, non-public stream
# messages are only associated to their subscribed users.
if stream is None:
stream = Stream.objects.select_related("realm").get(id=message.recipient.type_id)
if stream.is_public():
data['realm_id'] = stream.realm.id
data['stream_name'] = stream.name
tornado_callbacks.send_notification(data)
for message in messages:
# Render Markdown etc. here and store (automatically) in
# memcached, so that the single-threaded Tornado server
# doesn't have to.
message['message'].to_dict(apply_markdown=True, rendered_content=message['rendered_content'])
message['message'].to_dict(apply_markdown=False)
data = dict(
type = 'new_message',
message = message['message'].id,
users = [user.id for user in message['recipients']])
if message['message'].recipient.type == Recipient.STREAM:
# Note: This is where authorization for single-stream
# get_updates happens! We only attach stream data to the
# notify new_message request if it's a public stream,
# ensuring that in the tornado server, non-public stream
# messages are only associated to their subscribed users.
if message['stream'] is None:
message['stream'] = Stream.objects.select_related("realm").get(id=message['message'].recipient.type_id)
if message['stream'].is_public():
data['realm_id'] = message['stream'].realm.id
data['stream_name'] = message['stream'].name
tornado_callbacks.send_notification(data)
def create_stream_if_needed(realm, stream_name, invite_only=False):
(stream, created) = Stream.objects.get_or_create(
@ -317,9 +343,20 @@ def extract_recipients(raw_recipients):
# check_send_message:
# Returns None on success or the error message on error.
def check_send_message(sender, client, message_type_name, message_to,
subject_name, message_content, realm=None, forged=False,
forged_timestamp=None, forwarder_user_profile=None):
# has same argspec as check_message
def check_send_message(*args, **kwargs):
message = check_message(*args, **kwargs)
if(type(message) != dict):
assert isinstance(message, basestring)
return message
do_send_messages([message])
return None
# check_message:
# Returns message ready for sending with do_send_message on success or the error message (string) on error.
def check_message(sender, client, message_type_name, message_to,
subject_name, message_content, realm=None, forged=False,
forged_timestamp=None, forwarder_user_profile=None):
stream = None
if len(message_to) == 0:
return "Message must have recipients."
@ -368,6 +405,7 @@ def check_send_message(sender, client, message_type_name, message_to,
recipient = recipient_for_emails(message_to, not_forged_zephyr_mirror,
forwarder_user_profile, sender)
except ValidationError, e:
assert isinstance(e.messages[0], basestring)
return e.messages[0]
else:
return "Invalid message type"
@ -392,15 +430,19 @@ def check_send_message(sender, client, message_type_name, message_to,
message.sending_client = client
if client.name == "zephyr_mirror" and already_sent_mirrored_message(message):
return None
return {'message': None}
do_send_message(message, rendered_content=rendered_content,
stream=stream)
return {'message': message, 'rendered_content': rendered_content,
'stream': stream}
return None
def internal_send_message(sender_email, recipient_type_name, recipients,
def internal_prep_message(sender_email, recipient_type_name, recipients,
subject, content, realm=None):
"""
Create a message object and checks it, but doesn't send it or save it to the database.
The internal function that calls this can therefore batch send a bunch of created
messages together as one database query.
Call do_send_messages with a list of the return values of this method.
"""
if len(content) > MAX_MESSAGE_LENGTH:
content = content[0:3900] + "\n\n[message was too long and has been truncated]"
@ -411,10 +453,26 @@ def internal_send_message(sender_email, recipient_type_name, recipients,
if recipient_type_name == "stream":
stream, _ = create_stream_if_needed(realm, parsed_recipients[0])
ret = check_send_message(sender, get_client("Internal"), recipient_type_name,
parsed_recipients, subject, content, realm)
if ret is not None:
logging.error("Error sending internal message by %s: %s" % (sender_email, ret))
ret = check_message(sender, get_client("Internal"), recipient_type_name,
parsed_recipients, subject, content, realm)
if isinstance(ret, basestring):
logging.error("Error queueing internal message by %s: %s" % (sender_email, ret))
elif isinstance(ret, dict):
return ret
else:
logging.error("Error queueing internal message; check message return unexpected type: %s" \
% (repr(ret),))
def internal_send_message(sender_email, recipient_type_name, recipients,
subject, content, realm=None):
msg = internal_prep_message(sender_email, recipient_type_name, recipients,
subject, content, realm)
# internal_prep_message encountered an error
if msg is None:
return
do_send_messages([msg])
def pick_color(user_profile):
subs = Subscription.objects.filter(user_profile=user_profile,

View File

@ -30,7 +30,8 @@ from zephyr.lib.actions import do_add_subscription, do_remove_subscription, \
update_user_presence, bulk_add_subscriptions, update_message_flags, \
recipient_for_emails, extract_recipients, do_events_register, do_finish_tutorial, \
get_status_dict, do_change_enable_offline_email_notifications, \
do_update_onboarding_steps, do_update_message
do_update_onboarding_steps, do_update_message, internal_prep_message, \
do_send_messages
from zephyr.forms import RegistrationForm, HomepageForm, ToSForm, CreateBotForm, \
is_unique, is_inactive, isnt_mit
from django.views.decorators.csrf import csrf_exempt
@ -1227,8 +1228,10 @@ def add_subscriptions_backend(request, user_profile,
result["already_subscribed"][subscriber.email].append(stream.name)
private_streams[stream.name] = stream.invite_only
# Inform the user if someone else subscribed them to stuff
if principals and result["subscribed"]:
notifications = []
for email, subscriptions in result["subscribed"].iteritems():
if email == user_profile.email:
# Don't send a Humbug if you invited yourself.
@ -1249,8 +1252,9 @@ def add_subscriptions_backend(request, user_profile,
stream,
" (**invite-only**)" if private_streams[stream] else "")
msg += "\nYou can see historical content on a non-invite-only stream by narrowing to it."
internal_send_message("humbug+notifications@humbughq.com",
"private", email, "", msg)
notifications.append(internal_prep_message("humbug+notifications@humbughq.com",
"private", email, "", msg))
do_send_messages(notifications)
result["subscribed"] = dict(result["subscribed"])
result["already_subscribed"] = dict(result["already_subscribed"])