mirror of https://github.com/zulip/zulip.git
[manual] Use rabbitmq queue to process UserActivity.
Before this is deployed, we need to install rabbitmq and pika on the target server (see the puppet part of this commit for how). When this is deployed, we need to start the new user activity bot: ./manage.py process_user_activity in the screen session on the relevant server, or user_activity logs won't be processed (which will eventually result in all users getting notifications about how their mirrors are out of date). (imported from commit 44d605aca0290bef2c94fb99267e15e26b21673b)
This commit is contained in:
parent
6a2bf7c4f3
commit
e592e71515
|
@ -8,6 +8,7 @@ STAGING_DEPLOYED = (platform.node() == 'staging.humbughq.com')
|
||||||
|
|
||||||
DEBUG = not DEPLOYED
|
DEBUG = not DEPLOYED
|
||||||
TEMPLATE_DEBUG = DEBUG
|
TEMPLATE_DEBUG = DEBUG
|
||||||
|
TEST_SUITE = False
|
||||||
|
|
||||||
if DEBUG:
|
if DEBUG:
|
||||||
INTERNAL_IPS = ('127.0.0.1',)
|
INTERNAL_IPS = ('127.0.0.1',)
|
||||||
|
@ -174,6 +175,10 @@ INSTALLED_APPS = (
|
||||||
'zephyr',
|
'zephyr',
|
||||||
)
|
)
|
||||||
|
|
||||||
|
USING_RABBITMQ = DEPLOYED
|
||||||
|
# This password also appears in servers/configure-rabbitmq
|
||||||
|
RABBITMQ_PASSWORD = 'xxxxxxxxxxxxxxxx'
|
||||||
|
|
||||||
# Caching
|
# Caching
|
||||||
if DEPLOYED:
|
if DEPLOYED:
|
||||||
CACHES = { 'default': {
|
CACHES = { 'default': {
|
||||||
|
|
|
@ -26,6 +26,11 @@ EMAIL_BACKEND = 'django.core.mail.backends.locmem.EmailBackend'
|
||||||
|
|
||||||
TEST_SUITE = True
|
TEST_SUITE = True
|
||||||
|
|
||||||
|
# Don't use rabbitmq from the test suite -- the user_profile_ids for
|
||||||
|
# any generated queue elements won't match those being used by the
|
||||||
|
# real app.
|
||||||
|
USING_RABBITMQ = False
|
||||||
|
|
||||||
# Disable use of memcached for caching
|
# Disable use of memcached for caching
|
||||||
CACHES = { 'default': {
|
CACHES = { 'default': {
|
||||||
'BACKEND': 'django.core.cache.backends.locmem.LocMemCache',
|
'BACKEND': 'django.core.cache.backends.locmem.LocMemCache',
|
||||||
|
|
|
@ -0,0 +1,9 @@
|
||||||
|
#!/bin/sh
|
||||||
|
#
|
||||||
|
# Delete the "guest" default user and replace it with a Humbug user
|
||||||
|
# with a real password
|
||||||
|
|
||||||
|
rabbitmqctl delete_user guest
|
||||||
|
rabbitmqctl add_user humbug 'xxxxxxxxxxxxxxxx'
|
||||||
|
rabbitmqctl set_user_tags humbug administrator
|
||||||
|
rabbitmqctl set_permissions -p / humbug '.*' '.*' '.*'
|
|
@ -49,6 +49,10 @@ class humbug_base {
|
||||||
command => "pip install requests",
|
command => "pip install requests",
|
||||||
onlyif => "test ! -d /usr/local/lib/python2.6/dist-packages/requests"
|
onlyif => "test ! -d /usr/local/lib/python2.6/dist-packages/requests"
|
||||||
}
|
}
|
||||||
|
exec {"pip4":
|
||||||
|
command => "pip install pika",
|
||||||
|
onlyif => "test ! -d /usr/local/lib/python2.6/dist-packages/pika"
|
||||||
|
}
|
||||||
|
|
||||||
group { 'humbug':
|
group { 'humbug':
|
||||||
ensure => present,
|
ensure => present,
|
||||||
|
@ -372,6 +376,13 @@ class humbug_postgres {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
class humbug_rabbit {
|
||||||
|
$rabbit_packages = [ "rabbitmq-server" ]
|
||||||
|
package { $rabbit_packages: ensure => "installed" }
|
||||||
|
|
||||||
|
# TODO: Should also call exactly once "servers/configure-rabbitmq"
|
||||||
|
}
|
||||||
|
|
||||||
class humbug_bots {
|
class humbug_bots {
|
||||||
$bots_packages = [ "supervisor" ]
|
$bots_packages = [ "supervisor" ]
|
||||||
package { $bots_packages: ensure => "installed" }
|
package { $bots_packages: ensure => "installed" }
|
||||||
|
@ -394,3 +405,4 @@ class { "humbug_base": }
|
||||||
#class { "humbug_postgres": }
|
#class { "humbug_postgres": }
|
||||||
#class { "humbug_zmirror": }
|
#class { "humbug_zmirror": }
|
||||||
#class { "humbug_bots": }
|
#class { "humbug_bots": }
|
||||||
|
#class { "humbug_rabbit": }
|
||||||
|
|
|
@ -7,7 +7,8 @@ from django.db import transaction, IntegrityError
|
||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
import simplejson
|
import simplejson
|
||||||
from zephyr.lib.cache import cache_with_key
|
from zephyr.lib.cache import cache_with_key
|
||||||
from zephyr.lib.actions import update_user_activity
|
from zephyr.lib.queue import SimpleQueue
|
||||||
|
from zephyr.lib.timestamp import datetime_to_timestamp
|
||||||
|
|
||||||
from functools import wraps
|
from functools import wraps
|
||||||
|
|
||||||
|
@ -27,6 +28,20 @@ def asynchronous(method):
|
||||||
wrapper.csrf_exempt = True
|
wrapper.csrf_exempt = True
|
||||||
return wrapper
|
return wrapper
|
||||||
|
|
||||||
|
activity_queue = SimpleQueue()
|
||||||
|
def update_user_activity(request, user_profile, client):
|
||||||
|
event={'query': request.META["PATH_INFO"],
|
||||||
|
'user_profile_id': user_profile.id,
|
||||||
|
'time': datetime_to_timestamp(now()),
|
||||||
|
'client': client.name}
|
||||||
|
if not settings.USING_RABBITMQ:
|
||||||
|
# Don't try to publish messages to rabbitmq if we're not using
|
||||||
|
# it. UserActivity updates aren't really important for most
|
||||||
|
# local development, so skipping publishing them here is
|
||||||
|
# reasonable.
|
||||||
|
return
|
||||||
|
activity_queue.json_publish("user_activity", event)
|
||||||
|
|
||||||
# I like the all-lowercase name better
|
# I like the all-lowercase name better
|
||||||
require_post = require_POST
|
require_post = require_POST
|
||||||
|
|
||||||
|
|
|
@ -8,6 +8,7 @@ from zephyr.models import Realm, Stream, UserProfile, UserActivity, \
|
||||||
from django.db import transaction, IntegrityError
|
from django.db import transaction, IntegrityError
|
||||||
from zephyr.lib.initial_password import initial_password
|
from zephyr.lib.initial_password import initial_password
|
||||||
from zephyr.lib.cache import cache_with_key
|
from zephyr.lib.cache import cache_with_key
|
||||||
|
from zephyr.lib.timestamp import timestamp_to_datetime
|
||||||
from zephyr.lib.message_cache import cache_save_message
|
from zephyr.lib.message_cache import cache_save_message
|
||||||
from django.utils import timezone
|
from django.utils import timezone
|
||||||
from django.contrib.auth.models import UserManager
|
from django.contrib.auth.models import UserManager
|
||||||
|
@ -297,19 +298,25 @@ def add_default_subs(user_profile):
|
||||||
do_add_subscription(user_profile, default.stream)
|
do_add_subscription(user_profile, default.stream)
|
||||||
|
|
||||||
@transaction.commit_on_success
|
@transaction.commit_on_success
|
||||||
def update_user_activity(request, user_profile, client):
|
def do_update_user_activity(user_profile, client, query, log_time):
|
||||||
current_time = timezone.now()
|
|
||||||
try:
|
try:
|
||||||
(activity, created) = UserActivity.objects.get_or_create(
|
(activity, created) = UserActivity.objects.get_or_create(
|
||||||
user_profile = user_profile,
|
user_profile = user_profile,
|
||||||
client = client,
|
client = client,
|
||||||
query = request.META["PATH_INFO"],
|
query = query,
|
||||||
defaults={'last_visit': current_time, 'count': 0})
|
defaults={'last_visit': log_time, 'count': 0})
|
||||||
except IntegrityError:
|
except IntegrityError:
|
||||||
transaction.commit()
|
transaction.commit()
|
||||||
activity = UserActivity.objects.get(user_profile = user_profile,
|
activity = UserActivity.objects.get(user_profile = user_profile,
|
||||||
client = client,
|
client = client,
|
||||||
query = request.META["PATH_INFO"])
|
query = query)
|
||||||
activity.count += 1
|
activity.count += 1
|
||||||
activity.last_visit = current_time
|
activity.last_visit = log_time
|
||||||
activity.save()
|
activity.save()
|
||||||
|
|
||||||
|
def process_user_activity_event(event):
|
||||||
|
user_profile = UserProfile.objects.get(id=event["user_profile_id"])
|
||||||
|
client = get_client(event["client"])
|
||||||
|
log_time = timestamp_to_datetime(event["time"])
|
||||||
|
query = event["query"]
|
||||||
|
return do_update_user_activity(user_profile, client, query, log_time)
|
||||||
|
|
|
@ -0,0 +1,64 @@
|
||||||
|
#!/usr/bin/python
|
||||||
|
from django.conf import settings
|
||||||
|
import pika
|
||||||
|
import simplejson
|
||||||
|
|
||||||
|
# This simple queuing library doesn't expose much of the power of
|
||||||
|
# rabbitmq/pika's queuing system; its purpose is to just provide an
|
||||||
|
# interface for external files to put things into queues and take them
|
||||||
|
# out from bots without having to import pika code all over our codebase.
|
||||||
|
class SimpleQueue(object):
|
||||||
|
connection = None
|
||||||
|
channel = None
|
||||||
|
queues = set()
|
||||||
|
_inited = False
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def initialize(self):
|
||||||
|
# Initialize the connection
|
||||||
|
credentials = pika.PlainCredentials('humbug', settings.RABBITMQ_PASSWORD)
|
||||||
|
parameters = pika.ConnectionParameters('localhost',
|
||||||
|
credentials=credentials)
|
||||||
|
self.connection = pika.BlockingConnection(parameters)
|
||||||
|
self.channel = self.connection.channel()
|
||||||
|
self._inited = True
|
||||||
|
|
||||||
|
def create_queue(self, queue_name):
|
||||||
|
# Initialize the queues we need
|
||||||
|
self.channel.queue_declare(queue=queue_name, durable=True)
|
||||||
|
self.queues.add(queue_name)
|
||||||
|
|
||||||
|
def publish(self, queue_name, body):
|
||||||
|
if not self._inited:
|
||||||
|
self.initialize()
|
||||||
|
if queue_name not in self.queues:
|
||||||
|
self.create_queue(queue_name)
|
||||||
|
self.channel.basic_publish(exchange='',
|
||||||
|
routing_key=queue_name,
|
||||||
|
properties=pika.BasicProperties(delivery_mode = 2,),
|
||||||
|
body=body)
|
||||||
|
|
||||||
|
def json_publish(self, queue_name, body):
|
||||||
|
return self.publish(queue_name, simplejson.dumps(body))
|
||||||
|
|
||||||
|
def register_consumer(self, queue_name, callback):
|
||||||
|
if not self._inited:
|
||||||
|
self.initialize()
|
||||||
|
if queue_name not in self.queues:
|
||||||
|
self.create_queue(queue_name)
|
||||||
|
|
||||||
|
def wrapped_callback(ch, method, properties, body):
|
||||||
|
callback(ch, method, properties, body)
|
||||||
|
ch.basic_ack(delivery_tag=method.delivery_tag)
|
||||||
|
|
||||||
|
self.channel.basic_consume(wrapped_callback, queue=queue_name)
|
||||||
|
|
||||||
|
def register_json_consumer(self, queue_name, callback):
|
||||||
|
def wrapped_callback(ch, method, properties, body):
|
||||||
|
return callback(ch, method, properties, simplejson.loads(body))
|
||||||
|
return self.register_consumer(queue_name, wrapped_callback)
|
||||||
|
|
||||||
|
def start_consuming(self):
|
||||||
|
self.channel.start_consuming()
|
|
@ -0,0 +1,22 @@
|
||||||
|
from optparse import make_option
|
||||||
|
from django.core.management.base import BaseCommand
|
||||||
|
import simplejson
|
||||||
|
import pika
|
||||||
|
from zephyr.lib.actions import process_user_activity_event
|
||||||
|
from zephyr.lib.queue import SimpleQueue
|
||||||
|
|
||||||
|
class Command(BaseCommand):
|
||||||
|
option_list = BaseCommand.option_list
|
||||||
|
help = "Process UserActivity log messages."
|
||||||
|
|
||||||
|
def handle(self, *args, **options):
|
||||||
|
activity_queue = SimpleQueue()
|
||||||
|
|
||||||
|
def callback(ch, method, properties, event):
|
||||||
|
print " [x] Received %r" % (event,)
|
||||||
|
process_user_activity_event(event)
|
||||||
|
|
||||||
|
print ' [*] Waiting for messages. To exit press CTRL+C'
|
||||||
|
activity_queue.register_json_consumer('user_activity', callback)
|
||||||
|
activity_queue.start_consuming()
|
||||||
|
|
Loading…
Reference in New Issue