2013-04-23 18:51:17 +02:00
|
|
|
from __future__ import absolute_import
|
|
|
|
|
2013-07-26 21:28:41 +02:00
|
|
|
from django.conf import settings
|
2013-01-11 21:16:42 +01:00
|
|
|
from django.core.management.base import BaseCommand
|
2013-07-29 23:03:31 +02:00
|
|
|
from zerver.lib.actions import process_user_activity_event, \
|
2013-06-25 20:22:40 +02:00
|
|
|
process_user_presence_event
|
2013-07-29 23:03:31 +02:00
|
|
|
from zerver.lib.queue import SimpleQueueClient
|
2013-02-15 17:03:28 +01:00
|
|
|
import sys
|
|
|
|
import signal
|
2013-07-26 21:28:41 +02:00
|
|
|
import os
|
|
|
|
import traceback
|
|
|
|
import ujson
|
|
|
|
|
|
|
|
# Path of the file that receives one "Error Processing event" record (plus
# traceback) for every queue event whose processing raised an exception.
ERROR_LOG_FILE = os.path.join(settings.ERROR_LOG_DIR, "process_user_activity")
|
2013-01-11 21:16:42 +01:00
|
|
|
|
|
|
|
class Command(BaseCommand):
    """Management command that consumes and processes user activity events."""

    option_list = BaseCommand.option_list
    help = "Process UserActivity log messages."

    def handle(self, *args, **options):
        """Register a consumer for 'user_activity' and start consuming.

        Installs SIGTERM/SIGINT handlers that stop the queue client and
        exit the process with status 0.  Events whose processing raises
        are appended to ERROR_LOG_FILE rather than propagating.
        """
        activity_queue = SimpleQueueClient()

        def callback_activity(ch, method, properties, event):
            """Process one event; log it to ERROR_LOG_FILE on failure.

            ``ch``, ``method`` and ``properties`` are unused here
            (presumably the channel metadata supplied by the queue client
            -- TODO confirm against SimpleQueueClient).
            """
            print(" [x] Received activity %r" % (event,))
            try:
                process_event(event)
            except Exception:
                # Deliberate best-effort boundary: a bad event must not
                # take the consumer down, so record it and carry on.
                if not os.path.exists(settings.ERROR_LOG_DIR):
                    os.mkdir(settings.ERROR_LOG_DIR)
                # One can parse out just the JSON records from this log format using:
                #
                # grep "Error Processing" errors/process_user_activity | cut -f 2- -d:
                #
                # Use a context manager so the handle is flushed and closed
                # right away instead of whenever it gets garbage collected.
                with open(ERROR_LOG_FILE, "a") as error_log:
                    error_log.write(
                        "Error Processing event: " + ujson.dumps(event) + "\n" +
                        traceback.format_exc())

        def process_event(event):
            """Dispatch ``event`` to the handler matching ``event['type']``.

            Unknown types are reported on stdout and otherwise ignored.
            """
            msg_type = event['type']
            if msg_type == 'user_activity':
                process_user_activity_event(event)
            elif msg_type == 'user_presence':
                process_user_presence_event(event)
            else:
                print("[*] Unknown message type: %s" % (msg_type,))

        # The first parameter is named ``signum`` (not ``signal``) so that it
        # does not shadow the imported ``signal`` module.
        def signal_handler(signum, frame):
            """Stop consuming from the queue and exit with status 0."""
            print("[*] Closing and disconnecting from queues")
            activity_queue.stop_consuming()
            sys.exit(0)

        signal.signal(signal.SIGTERM, signal_handler)
        signal.signal(signal.SIGINT, signal_handler)

        print(' [*] Waiting for messages. To exit press CTRL+C')
        activity_queue.register_json_consumer('user_activity', callback_activity)
        activity_queue.start_consuming()