analytics: Simplify frequency and measurement interval options.

Change the CountStat object to take an is_gauge variable instead of a
smallest_interval variable. Previously, (frequency, smallest_interval)
could be any of (hour, hour), (hour, day), (hour, gauge), (day, hour),
(day, day), or (day, gauge).
The current change is equivalent to excluding (hour, day) and (day, hour)
from the list above.

This change, along with other recent changes, allows us to simplify how we
handle time intervals. This commit also removes the TimeInterval object.
This commit is contained in:
Rishi Gupta 2016-10-13 15:15:46 -07:00 committed by Tim Abbott
parent 807520411b
commit 82b814a1cd
6 changed files with 72 additions and 143 deletions

View File

@ -1,23 +1,35 @@
from django.db import connection, models
from django.utils import timezone
from datetime import timedelta, datetime
from analytics.models import InstallationCount, RealmCount, \
UserCount, StreamCount, BaseCount, FillState, get_fill_state, installation_epoch
from analytics.lib.interval import TimeInterval, floor_to_day
from zerver.models import Realm, UserProfile, Message, Stream, models
from zerver.lib.timestamp import floor_to_day
from typing import Any, Optional, Type
from six import text_type
# First post office in Boston
MIN_TIME = datetime(1639, 1, 1, 0, 0, 0, tzinfo=timezone.utc)
class CountStat(object):
def __init__(self, property, zerver_count_query, filter_args, smallest_interval, frequency):
# type: (text_type, ZerverCountQuery, Dict[str, bool], str, str) -> None
HOUR = 'hour'
DAY = 'day'
FREQUENCIES = frozenset([HOUR, DAY])
# Allowed intervals are HOUR, DAY, and GAUGE
GAUGE = 'gauge'
def __init__(self, property, zerver_count_query, filter_args, frequency, is_gauge):
# type: (text_type, ZerverCountQuery, Dict[str, bool], str, bool) -> None
self.property = property
self.zerver_count_query = zerver_count_query
# might have to do something different for bitfields
self.filter_args = filter_args
self.smallest_interval = smallest_interval
if frequency not in self.FREQUENCIES:
raise ValueError("Unknown frequency: %s" % (frequency,))
self.frequency = frequency
self.interval = self.GAUGE if is_gauge else frequency
class ZerverCountQuery(object):
def __init__(self, zerver_table, analytics_table, query):
@ -52,18 +64,22 @@ def process_count_stat(stat, fill_to_time):
FillState.objects.filter(property = stat.property).update(state = FillState.DONE)
currently_filled = currently_filled + timedelta(hours = 1)
# We assume end_time is on an hour boundary. It is the caller's
# responsibility to enforce this!
# Note: very soon we are going to disallow CountStats with
# (smallest_interval, frequency) = (hour, day) or (day, hour). The logic below
# reflects that assumption.
# We assume end_time is on an hour boundary, and is timezone aware.
# It is the caller's responsibility to enforce this!
def do_fill_count_stat_at_hour(stat, end_time):
# type: (CountStat, datetime) -> None
if stat.frequency == 'day' and (end_time != floor_to_day(end_time)):
if stat.frequency == CountStat.DAY and (end_time != floor_to_day(end_time)):
return
time_interval = TimeInterval(stat.smallest_interval, end_time)
do_pull_from_zerver(stat, time_interval)
do_aggregate_to_summary_table(stat, time_interval)
if stat.interval == CountStat.HOUR:
start_time = end_time - timedelta(hours = 1)
elif stat.interval == CountStat.DAY:
start_time = end_time - timedelta(days = 1)
else: # stat.interval == CountStat.GAUGE
start_time = MIN_TIME
do_pull_from_zerver(stat, start_time, end_time, stat.interval)
do_aggregate_to_summary_table(stat, end_time, stat.interval)
def do_delete_count_stat_at_hour(stat, end_time):
# type: (CountStat, datetime) -> None
@ -72,8 +88,8 @@ def do_delete_count_stat_at_hour(stat, end_time):
RealmCount.objects.filter(property = stat.property, end_time = end_time).delete()
InstallationCount.objects.filter(property = stat.property, end_time = end_time).delete()
def do_aggregate_to_summary_table(stat, time_interval):
# type: (CountStat, TimeInterval) -> None
def do_aggregate_to_summary_table(stat, end_time, interval):
# type: (CountStat, datetime, str) -> None
cursor = connection.cursor()
# Aggregate into RealmCount
@ -96,9 +112,9 @@ def do_aggregate_to_summary_table(stat, time_interval):
GROUP BY zerver_realm.id
""" % {'analytics_table' : analytics_table._meta.db_table,
'property' : stat.property,
'interval' : time_interval.interval}
'interval' : interval}
cursor.execute(realmcount_query, {'end_time': time_interval.end})
cursor.execute(realmcount_query, {'end_time': end_time})
# Aggregate into InstallationCount
installationcount_query = """
@ -114,9 +130,9 @@ def do_aggregate_to_summary_table(stat, time_interval):
interval = '%(interval)s'
)
""" % {'property': stat.property,
'interval': time_interval.interval}
'interval': interval}
cursor.execute(installationcount_query, {'end_time': time_interval.end})
cursor.execute(installationcount_query, {'end_time': end_time})
cursor.close()
## methods that hit the prod databases directly
@ -124,8 +140,8 @@ def do_aggregate_to_summary_table(stat, time_interval):
# written in slightly more than needed generality, to reduce copy-paste errors
# as more of these are made / make it easy to extend to a pull_X_by_realm
def do_pull_from_zerver(stat, time_interval):
# type: (CountStat, TimeInterval) -> None
def do_pull_from_zerver(stat, start_time, end_time, interval):
# type: (CountStat, datetime, datetime, str) -> None
zerver_table = stat.zerver_count_query.zerver_table._meta.db_table # type: ignore
join_args = ' '.join('AND %s.%s = %s' % (zerver_table, key, value) \
for key, value in stat.filter_args.items())
@ -134,10 +150,10 @@ def do_pull_from_zerver(stat, time_interval):
# the string formatting prior so that cursor.execute runs it as sql
query_ = stat.zerver_count_query.query % {'zerver_table' : zerver_table,
'property' : stat.property,
'interval' : time_interval.interval,
'interval' : interval,
'join_args' : join_args}
cursor = connection.cursor()
cursor.execute(query_, {'time_start': time_interval.start, 'time_end': time_interval.end})
cursor.execute(query_, {'time_start': start_time, 'time_end': end_time})
cursor.close()
count_user_by_realm_query = """
@ -224,7 +240,7 @@ zerver_count_stream_by_realm = ZerverCountQuery(Stream, RealmCount, count_stream
COUNT_STATS = {
'active_humans': CountStat('active_humans', zerver_count_user_by_realm,
{'is_bot': False, 'is_active': True}, 'gauge', 'day'),
{'is_bot': False, 'is_active': True}, CountStat.DAY, True),
'active_bots': CountStat('active_bots', zerver_count_user_by_realm,
{'is_bot': True, 'is_active': True}, 'gauge', 'day'),
'messages_sent': CountStat('messages_sent', zerver_count_message_by_user, {}, 'hour', 'hour')}
{'is_bot': True, 'is_active': True}, CountStat.DAY, True),
'messages_sent': CountStat('messages_sent', zerver_count_message_by_user, {}, CountStat.HOUR, False)}

View File

@ -1,53 +0,0 @@
from django.utils import timezone
from datetime import datetime, timedelta, MINYEAR
from zerver.lib.timestamp import is_timezone_aware
from six import text_type
# Earliest timezone-aware datetime this module uses: Jan 1 of datetime.MINYEAR
# with tzinfo=timezone.utc. TimeInterval uses it as the start of every
# 'gauge' interval.
MIN_TIME = datetime(MINYEAR, 1, 1, 0, 0, 0, tzinfo=timezone.utc)
# Name isn't great .. fixedinterval? timerange? Trying to distinguish
# generic intervals like 'hour' or 'quarter' from fixed intervals like
# 'Aug 3 2016 from 9-10am'
class TimeInterval(object):
    """A fixed time range, e.g. 'Aug 3 2016 from 9-10am'.

    Attributes set by the constructor:
      interval -- the generic interval name passed in ('hour', 'day', 'gauge').
      end      -- the end of the range, floored to `floor_to_boundary`
                  unless that argument is None.
      start    -- MIN_TIME for 'gauge'; otherwise `end` minus one `interval`.
    """
    def __init__(self, interval, end = None, floor_to_boundary = 'hour'):
        # type: (str, datetime, str) -> None
        """Build the range that ends at `end`.

        interval          -- 'gauge', or any interval subtract_interval accepts.
        end               -- timezone-aware end of the range. None (the
                             default) means "now", evaluated at call time.
        floor_to_boundary -- interval name to floor `end` to, or None to use
                             `end` unchanged.

        Raises ValueError if `end` is not timezone aware. The helpers
        floor_to_interval_boundary and subtract_interval raise ValueError
        for interval names they do not recognize.
        """
        # Not the best logic for when we have intervals like 'quarter', but okay for now
        if end is None:
            # Resolve "now" on every call. A `timezone.now()` default in the
            # signature is evaluated once, when the module is imported, and
            # would hand the same stale timestamp to every later caller.
            end = timezone.now()
        if not is_timezone_aware(end):
            raise ValueError("end must be timezone aware")
        if floor_to_boundary is None:
            self.end = end
        else:
            self.end = floor_to_interval_boundary(end, floor_to_boundary)
        self.interval = interval
        if interval == 'gauge':
            # Gauge stats cover everything up to `end`, so start at the sentinel.
            self.start = MIN_TIME
        else:
            self.start = subtract_interval(self.end, interval)
# Perhaps the right way to do the next two functions is to have an interval class
# (subclassed to hourinterval, dayinterval, etc) with methods like floor,
# subtract, and subinterval. Seems like overkill for now, though.
def floor_to_interval_boundary(datetime_object, interval):
    # type: (datetime, text_type) -> datetime
    """Round `datetime_object` down to the start of its 'day' or 'hour'.

    The result keeps the tzinfo of the input. Any other `interval` raises
    ValueError.
    """
    # timetuple() starts with (year, month, day, hour, ...); the number of
    # leading fields kept decides how much of the timestamp is zeroed.
    if interval == 'day':
        kept_fields = 3
    elif interval == 'hour':
        kept_fields = 4
    else:
        raise ValueError("Unknown or unfloorable interval", interval)
    floored = datetime(*datetime_object.timetuple()[:kept_fields])
    return floored.replace(tzinfo=datetime_object.tzinfo)
def floor_to_day(datetime_object):
    # type: (datetime) -> datetime
    """Return midnight of `datetime_object`'s calendar day, keeping its tzinfo."""
    year, month, day = datetime_object.timetuple()[:3]
    midnight = datetime(year, month, day)
    return midnight.replace(tzinfo=datetime_object.tzinfo)
# Don't have to worry about leap seconds, since datetime doesn't support it
def subtract_interval(datetime_object, interval):
    # type: (datetime, str) -> datetime
    """Return `datetime_object` moved back by one 'day' or one 'hour'.

    Any other `interval` raises ValueError.
    """
    if interval == 'day':
        step = timedelta(days = 1)
    elif interval == 'hour':
        step = timedelta(seconds = 3600)
    else:
        raise ValueError("Unknown or unarithmetic interval", interval)
    return datetime_object - step

View File

@ -1,9 +1,10 @@
from django.db import models
from django.utils import timezone
from analytics.lib.interval import floor_to_day
from zerver.models import Realm, UserProfile, Stream, Recipient
from zerver.lib.str_utils import ModelReprMixin
from zerver.lib.timestamp import datetime_to_UTC, floor_to_day
import datetime
from six import text_type
@ -35,7 +36,8 @@ def get_fill_state(property):
# We assume there is at least one realm
def installation_epoch():
# type: () -> datetime.datetime
return floor_to_day(Realm.objects.aggregate(models.Min('date_created'))['date_created__min'])
earliest_realm_creation = Realm.objects.aggregate(models.Min('date_created'))['date_created__min']
return floor_to_day(datetime_to_UTC(earliest_realm_creation))
# would only ever make entries here by hand
class Anomaly(ModelReprMixin, models.Model):

View File

@ -2,7 +2,6 @@ from django.db import models
from django.test import TestCase
from django.utils import timezone
from analytics.lib.interval import TimeInterval
from analytics.lib.counts import CountStat, COUNT_STATS, process_count_stat, \
zerver_count_user_by_realm, zerver_count_message_by_user, \
zerver_count_message_by_stream, zerver_count_stream_by_realm, \
@ -26,7 +25,7 @@ class AnalyticsTestCase(TestCase):
TIME_LAST_HOUR = TIME_ZERO - HOUR
count_stat = CountStat('test stat', ZerverCountQuery(Recipient, UserCount, 'select 0'),
{}, 'hour', 'hour')
{}, CountStat.HOUR, False)
def setUp(self):
# type: () -> None
@ -72,7 +71,7 @@ class AnalyticsTestCase(TestCase):
# Note that this doesn't work for InstallationCount, since InstallationCount has no realm_id
# kwargs should only ever be a UserProfile or Stream.
def assertCountEquals(self, table, property, value, end_time = TIME_ZERO, interval = 'hour',
def assertCountEquals(self, table, property, value, end_time = TIME_ZERO, interval = CountStat.HOUR,
realm = None, **kwargs):
# type: (Type[BaseCount], text_type, int, datetime, str, Optional[Realm], **models.Model) -> None
if realm is None:
@ -91,7 +90,7 @@ class TestUpdateAnalyticsCounts(AnalyticsTestCase):
# might change if we refactor count_query
stat = CountStat('test_stat_write', zerver_count_stream_by_realm,
{'invite_only': False}, 'hour', 'hour')
{'invite_only': False}, CountStat.HOUR, False)
# add some stuff to zerver_*
self.create_stream(name='stream1')
@ -106,7 +105,7 @@ class TestUpdateAnalyticsCounts(AnalyticsTestCase):
def test_update_analytics_tables(self):
# type: () -> None
stat = CountStat('test_messages_sent', zerver_count_message_by_user, {}, 'hour', 'hour')
stat = CountStat('test_messages_sent', zerver_count_message_by_user, {}, CountStat.HOUR, False)
user1 = self.create_user('email1')
user2 = self.create_user('email2')
@ -115,7 +114,7 @@ class TestUpdateAnalyticsCounts(AnalyticsTestCase):
# run command
do_fill_count_stat_at_hour(stat, self.TIME_ZERO)
usercount_row = UserCount.objects.filter(realm=self.default_realm, interval='hour',
usercount_row = UserCount.objects.filter(realm=self.default_realm, interval=CountStat.HOUR,
property='test_messages_sent').values_list(
'value', flat=True)[0]
assert (usercount_row == 1)
@ -143,32 +142,32 @@ class TestProcessCountStat(AnalyticsTestCase):
process_count_stat(self.count_stat, current_time)
self.assertFillStateEquals(current_time)
self.assertEqual(InstallationCount.objects.filter(property = self.count_stat.property,
interval = 'hour').count(), 1)
interval = CountStat.HOUR).count(), 1)
# dirty stat
FillState.objects.filter(property=self.count_stat.property).update(state=FillState.STARTED)
process_count_stat(self.count_stat, current_time)
self.assertFillStateEquals(current_time)
self.assertEqual(InstallationCount.objects.filter(property = self.count_stat.property,
interval = 'hour').count(), 1)
interval = CountStat.HOUR).count(), 1)
# clean stat, no update
process_count_stat(self.count_stat, current_time)
self.assertFillStateEquals(current_time)
self.assertEqual(InstallationCount.objects.filter(property = self.count_stat.property,
interval = 'hour').count(), 1)
interval = CountStat.HOUR).count(), 1)
# clean stat, with update
current_time = current_time + self.HOUR
process_count_stat(self.count_stat, current_time)
self.assertFillStateEquals(current_time)
self.assertEqual(InstallationCount.objects.filter(property = self.count_stat.property,
interval = 'hour').count(), 2)
interval = CountStat.HOUR).count(), 2)
# test users added in last hour
def test_add_new_users(self):
# type: () -> None
stat = CountStat('add_new_user_test', zerver_count_user_by_realm, {}, 'hour', 'hour')
stat = CountStat('add_new_user_test', zerver_count_user_by_realm, {}, CountStat.HOUR, False)
# add new users to realm in last hour
self.create_user('email1')
@ -183,38 +182,10 @@ class TestProcessCountStat(AnalyticsTestCase):
self.assertCountEquals(RealmCount, 'add_new_user_test', 2)
def test_do_aggregate(self):
# type: () -> None
# write some entries to analytics.usercount with smallest interval as day
stat = CountStat('test_messages_aggregate', zerver_count_message_by_user, {}, 'day', 'hour')
# write some messages
user1 = self.create_user('email1')
user2 = self.create_user('email2')
recipient = Recipient.objects.create(type_id=user2.id, type=Recipient.PERSONAL)
self.create_message(user1, recipient)
self.create_message(user1, recipient)
self.create_message(user1, recipient)
# run command
do_fill_count_stat_at_hour(stat, self.TIME_ZERO)
# check no rows for hour interval on usercount granularity
self.assertFalse(UserCount.objects.filter(realm=self.default_realm, interval='hour').exists())
# see if aggregated correctly to realmcount and installationcount
self.assertCountEquals(RealmCount, 'test_messages_aggregate', 3, interval = 'day')
self.assertEquals(InstallationCount.objects.filter(interval='day',
property='test_messages_aggregate') \
.values_list('value', flat=True)[0], 3)
def test_count_before_realm_creation(self):
# type: () -> None
stat = CountStat('test_active_humans', zerver_count_user_by_realm,
{'is_bot': False, 'is_active': True}, 'hour', 'hour')
{'is_bot': False, 'is_active': True}, CountStat.HOUR, False)
realm = Realm.objects.create(domain='domain', name='name', date_created=self.TIME_ZERO)
self.create_user('email', realm=realm)
@ -227,7 +198,7 @@ class TestProcessCountStat(AnalyticsTestCase):
# type: () -> None
# test that rows with empty counts are returned if realm exists
stat = CountStat('test_active_humans', zerver_count_user_by_realm,
{'is_bot': False, 'is_active': True}, 'hour', 'hour')
{'is_bot': False, 'is_active': True}, CountStat.HOUR, False)
do_fill_count_stat_at_hour(stat, self.TIME_ZERO)
self.assertCountEquals(RealmCount, 'test_active_humans', 0)
@ -245,7 +216,7 @@ class TestAggregates(AnalyticsTestCase):
class TestXByYQueries(AnalyticsTestCase):
def test_message_to_stream_aggregation(self):
# type: () -> None
stat = CountStat('test_messages_to_stream', zerver_count_message_by_stream, {}, 'hour', 'hour')
stat = CountStat('test_messages_to_stream', zerver_count_message_by_stream, {}, CountStat.HOUR, False)
# write some messages
user = self.create_user('email')
@ -266,9 +237,9 @@ class TestCountStats(AnalyticsTestCase):
# type: () -> None
stats = [
CountStat('test_active_humans', zerver_count_user_by_realm, {'is_bot': False, 'is_active': True},
'hour', 'hour'),
CountStat.HOUR, False),
CountStat('test_active_bots', zerver_count_user_by_realm, {'is_bot': True, 'is_active': True},
'hour', 'hour')]
CountStat.HOUR, False)]
# TODO these dates should probably be explicit, since the default args for the commands are timezone.now() dependent.
self.create_user('email1-bot', is_bot=True)

View File

@ -1,18 +0,0 @@
from django.test import TestCase
from django.utils import timezone
from analytics.lib.interval import TimeInterval, floor_to_interval_boundary, subtract_interval
from datetime import datetime, timedelta
class TimeIntervalTest(TestCase):
    """Tests for TimeInterval construction and subtract_interval."""

    def test_time_interval_creation(self):
        # type: () -> None
        # A 'day' interval with the default hour flooring: the end is floored
        # to 03:00 and the start is exactly one day before it.
        unfloored_end = datetime(2016, 4, 29, 3, 14, 15, 926535).replace(tzinfo=timezone.utc)
        expected_start = datetime(2016, 4, 28, 3, 0, 0).replace(tzinfo=timezone.utc)
        expected_end = datetime(2016, 4, 29, 3, 0, 0).replace(tzinfo=timezone.utc)

        time_interval = TimeInterval('day', unfloored_end)

        self.assertEqual(time_interval.start, expected_start)
        self.assertEqual(time_interval.end, expected_end)

    def test_datetime_leap_second(self):
        # type: () -> None
        # Subtracting an hour from midnight on 2015-07-01 must give 23:00 on
        # 2015-06-30 exactly.
        after_leap = datetime(2015, 7, 1)
        one_hour_earlier = subtract_interval(after_leap, 'hour')
        self.assertEqual(one_hour_earlier, datetime(2015, 6, 30, 23))

View File

@ -8,6 +8,17 @@ def is_timezone_aware(datetime_object):
# type: (datetime.datetime) -> bool
return datetime_object.tzinfo is not None
def datetime_to_UTC(datetime_object):
    # type: (datetime.datetime) -> datetime.datetime
    """Return `datetime_object` as a timezone-aware datetime in UTC.

    Aware inputs are converted with astimezone(). Naive inputs get
    tzinfo=utc attached without shifting their clock fields.
    """
    if not is_timezone_aware(datetime_object):
        return datetime_object.replace(tzinfo=utc)
    return datetime_object.astimezone(utc)
def floor_to_day(datetime_object):
    # type: (datetime.datetime) -> datetime.datetime
    """Return midnight of `datetime_object`'s calendar day, keeping its tzinfo."""
    year, month, day = datetime_object.timetuple()[:3]
    midnight = datetime.datetime(year, month, day)
    return midnight.replace(tzinfo=datetime_object.tzinfo)
def timestamp_to_datetime(timestamp):
    # type: (float) -> datetime.datetime
    """Convert a POSIX timestamp into a datetime with tzinfo=utc.

    `timestamp` is passed through float() first.
    """
    seconds = float(timestamp)
    naive_utc = datetime.datetime.utcfromtimestamp(seconds)
    return naive_utc.replace(tzinfo=utc)