[manual] Use rabbitmq for asynchronous presence updating

Note: When deploying, restarting the process-user-activity-commandline script is needed

(imported from commit 63ee795c9c7a7db4a40170cff5636dc1dd0b46a8)
This commit is contained in:
Leo Franchi 2013-02-11 15:47:45 -05:00
parent 31f87481d0
commit 0a0c4bb9a0
5 changed files with 99 additions and 35 deletions

View File

@ -180,6 +180,8 @@ define service {
# critical that it arrive to avoid falsely telling MIT folks
# their mirrors are down, so this should be changed to
# page_admins sometime after a week of no false alerts
# This service is also responsible for active/idle
# status.
contact_groups admins
}

View File

@ -28,18 +28,28 @@ def asynchronous(method):
wrapper.csrf_exempt = True
return wrapper
def update_user_activity(request, user_profile, client):
event={'query': request.META["PATH_INFO"],
'user_profile_id': user_profile.id,
'time': datetime_to_timestamp(now()),
'client': client.name}
if not settings.USING_RABBITMQ:
# Don't try to publish messages to rabbitmq if we're not using
# it. UserActivity updates aren't really important for most
# local development, so skipping publishing them here is
# reasonable.
return
SimpleQueueClient.get_instance().json_publish("user_activity", event)
if settings.USING_RABBITMQ:
# Don't try to publish messages to rabbitmq if we're not using
# it. UserActivity updates aren't really important for most
# local development, so skipping publishing them here is
# reasonable.
#
# update_active_status also pushes to rabbitmq, and we don't
# want to log it
activity_queue = SimpleQueueClient()
def update_user_activity(request, user_profile, client):
if request.META["PATH_INFO"] == '/json/update_active_status':
return
event={'query': request.META["PATH_INFO"],
'user_profile_id': user_profile.id,
'time': datetime_to_timestamp(now()),
'client': client.name}
activity_queue.json_publish("user_activity", event)
else:
update_user_activity = lambda request, user_profile, client: None
# I like the all-lowercase name better
require_post = require_POST

View File

@ -8,8 +8,9 @@ from zephyr.models import Realm, Stream, UserProfile, UserActivity, \
from django.db import transaction, IntegrityError
from zephyr.lib.initial_password import initial_password
from zephyr.lib.cache import cache_with_key
from zephyr.lib.timestamp import timestamp_to_datetime
from zephyr.lib.timestamp import timestamp_to_datetime, datetime_to_timestamp
from zephyr.lib.message_cache import cache_save_message
from zephyr.lib.queue import SimpleQueueClient
from django.utils import timezone
from django.contrib.auth.models import UserManager
@ -352,8 +353,15 @@ def do_update_user_activity(user_profile, client, query, log_time):
activity.last_visit = log_time
activity.save()
def process_user_activity_event(event):
user_profile = UserProfile.objects.get(id=event["user_profile_id"])
client = get_client(event["client"])
log_time = timestamp_to_datetime(event["time"])
query = event["query"]
return do_update_user_activity(user_profile, client, query, log_time)
@transaction.commit_on_success
def do_update_user_idle(user_profile, client, log_time, status):
def do_update_user_presence(user_profile, client, log_time, status):
try:
(presence, created) = UserPresence.objects.get_or_create(
user_profile = user_profile,
@ -367,12 +375,29 @@ def do_update_user_idle(user_profile, client, log_time, status):
presence.status = status
presence.save()
def process_user_activity_event(event):
if settings.USING_RABBITMQ or settings.TEST_SUITE:
# RabbitMQ is required for idle functionality
presence_queue = SimpleQueueClient()
def update_user_presence(user_profile, client, log_time, status):
event={'user_profile_id': user_profile.id,
'status': status,
'time': datetime_to_timestamp(log_time),
'client': client.name}
if settings.USING_RABBITMQ:
presence_queue.json_publish("user_presence", event)
elif settings.TEST_SUITE:
process_user_presence_event(event)
else:
update_user_presence = lambda user_profile, client, log_time, status: None
def process_user_presence_event(event):
user_profile = UserProfile.objects.get(id=event["user_profile_id"])
client = get_client(event["client"])
log_time = timestamp_to_datetime(event["time"])
query = event["query"]
return do_update_user_activity(user_profile, client, query, log_time)
status = event["status"]
return do_update_user_presence(user_profile, client, log_time, status)
def subscribed_to_stream(user_profile, stream):
try:

View File

@ -2,21 +2,26 @@ from optparse import make_option
from django.core.management.base import BaseCommand
import simplejson
import pika
from zephyr.lib.actions import process_user_activity_event
from zephyr.lib.actions import process_user_activity_event, process_user_presence_event
from zephyr.lib.queue import SimpleQueueClient
class Command(BaseCommand):
option_list = BaseCommand.option_list
help = "Process UserActivity log messages."
help = "Process UserActivity & UserPresence log messages."
def handle(self, *args, **options):
activity_queue = SimpleQueueClient.get_instance()
activity_queue = SimpleQueueClient()
def callback(ch, method, properties, event):
print " [x] Received %r" % (event,)
def callback_activity(ch, method, properties, event):
print " [x] Received activity %r" % (event,)
process_user_activity_event(event)
def callback_presence(ch, method, properties, event):
print " [x] Received presence %r" % (event,)
process_user_presence_event(event)
print ' [*] Waiting for messages. To exit press CTRL+C'
activity_queue.register_json_consumer('user_activity', callback)
activity_queue.register_json_consumer('user_activity', callback_activity)
activity_queue.register_json_consumer('user_presence', callback_presence)
activity_queue.start_consuming()

View File

@ -21,7 +21,7 @@ from zephyr.lib.actions import do_add_subscription, do_remove_subscription, \
do_activate_user, add_default_subs, do_create_user, do_send_message, \
log_subscription_property_change, internal_send_message, \
create_stream_if_needed, gather_subscriptions, subscribed_to_stream, \
do_update_user_idle
update_user_presence
from zephyr.forms import RegistrationForm, HomepageForm, ToSForm, is_unique, \
is_active, isnt_mit
from django.views.decorators.csrf import csrf_exempt
@ -1323,21 +1323,43 @@ def api_github_landing(request, user_profile, event=POST,
forged=False, subject_name=subject,
message_content=content)
def get_status_list(requesting_user_profile):
def presence_to_dict(presence):
if presence.status == UserPresence.ACTIVE:
presence_val = 'active'
elif presence.status == UserPresence.IDLE:
presence_val = 'idle'
else:
raise JsonableError("Invalid presence value in db: %s" % (presence,))
return {'status' : presence_val,
'timestamp': datetime_to_timestamp(presence.timestamp)}
user_statuses = defaultdict(dict)
for presence in UserPresence.objects.filter(
user_profile__realm=requesting_user_profile.realm).select_related('user_profile', 'client'):
user_statuses[presence.user_profile.user.email][presence.client.name] = \
presence_to_dict(presence)
return json_success({'presences': user_statuses})
@authenticated_json_post_view
@has_request_variables
def json_update_active_status(request, user_profile,
status=POST):
do_update_user_idle(user_profile, request._client, now(), status)
return json_success()
if status == 'active':
status_val = UserPresence.ACTIVE
elif status == 'idle':
status_val = UserPresence.IDLE
else:
raise JsonableError("Invalid presence status: %s" % (status,))
update_user_presence(user_profile, request._client, now(), status_val)
return get_status_list(user_profile)
@authenticated_json_post_view
def json_get_active_statuses(request, user_profile):
def presence_to_dict(presence):
return {'status': presence.status, 'timestamp': datetime_to_timestamp(presence.timestamp)}
user_statuses = {}
for user_profile in UserProfile.objects.filter(realm=user_profile.realm):
statuses = dict((presence.client.name, presence_to_dict(presence)) for presence in UserPresence.objects.filter(user_profile=user_profile))
user_statuses[user_profile.user.email] = statuses
return json_success({'presences': user_statuses})
return get_status_list(user_profile)