diff --git a/analytics/lib/counts.py b/analytics/lib/counts.py index 704c48751b..85337dd48d 100644 --- a/analytics/lib/counts.py +++ b/analytics/lib/counts.py @@ -1,23 +1,35 @@ from django.db import connection, models +from django.utils import timezone from datetime import timedelta, datetime from analytics.models import InstallationCount, RealmCount, \ UserCount, StreamCount, BaseCount, FillState, get_fill_state, installation_epoch -from analytics.lib.interval import TimeInterval, floor_to_day from zerver.models import Realm, UserProfile, Message, Stream, models +from zerver.lib.timestamp import floor_to_day from typing import Any, Optional, Type from six import text_type +# First post office in Boston +MIN_TIME = datetime(1639, 1, 1, 0, 0, 0, tzinfo=timezone.utc) + class CountStat(object): - def __init__(self, property, zerver_count_query, filter_args, smallest_interval, frequency): - # type: (text_type, ZerverCountQuery, Dict[str, bool], str, str) -> None + HOUR = 'hour' + DAY = 'day' + FREQUENCIES = frozenset([HOUR, DAY]) + # Allowed intervals are HOUR, DAY, and, GAUGE + GAUGE = 'gauge' + + def __init__(self, property, zerver_count_query, filter_args, frequency, is_gauge): + # type: (text_type, ZerverCountQuery, Dict[str, bool], str, bool) -> None self.property = property self.zerver_count_query = zerver_count_query # might have to do something different for bitfields self.filter_args = filter_args - self.smallest_interval = smallest_interval + if frequency not in self.FREQUENCIES: + raise ValueError("Unknown frequency: %s" % (frequency,)) self.frequency = frequency + self.interval = self.GAUGE if is_gauge else frequency class ZerverCountQuery(object): def __init__(self, zerver_table, analytics_table, query): @@ -52,18 +64,22 @@ def process_count_stat(stat, fill_to_time): FillState.objects.filter(property = stat.property).update(state = FillState.DONE) currently_filled = currently_filled + timedelta(hours = 1) -# We assume end_time is on an hour boundary. It is the caller's -# responsibility to enforce this! -# Note: very soon we are going to disallow CountStats with -# (smallest_interval, frequency) = (hour, day) or (day, hour). The logic below -# reflects that assumption. +# We assume end_time is on an hour boundary, and is timezone aware. +# It is the caller's responsibility to enforce this! def do_fill_count_stat_at_hour(stat, end_time): # type: (CountStat, datetime) -> None - if stat.frequency == 'day' and (end_time != floor_to_day(end_time)): + if stat.frequency == CountStat.DAY and (end_time != floor_to_day(end_time)): return - time_interval = TimeInterval(stat.smallest_interval, end_time) - do_pull_from_zerver(stat, time_interval) - do_aggregate_to_summary_table(stat, time_interval) + + if stat.interval == CountStat.HOUR: + start_time = end_time - timedelta(hours = 1) + elif stat.interval == CountStat.DAY: + start_time = end_time - timedelta(days = 1) + else: # stat.interval == CountStat.GAUGE + start_time = MIN_TIME + + do_pull_from_zerver(stat, start_time, end_time, stat.interval) + do_aggregate_to_summary_table(stat, end_time, stat.interval) def do_delete_count_stat_at_hour(stat, end_time): # type: (CountStat, datetime) -> None @@ -72,8 +88,8 @@ def do_delete_count_stat_at_hour(stat, end_time): RealmCount.objects.filter(property = stat.property, end_time = end_time).delete() InstallationCount.objects.filter(property = stat.property, end_time = end_time).delete() -def do_aggregate_to_summary_table(stat, time_interval): - # type: (CountStat, TimeInterval) -> None +def do_aggregate_to_summary_table(stat, end_time, interval): + # type: (CountStat, datetime, str) -> None cursor = connection.cursor() # Aggregate into RealmCount @@ -96,9 +112,9 @@ def do_aggregate_to_summary_table(stat, time_interval): GROUP BY zerver_realm.id """ % {'analytics_table' : analytics_table._meta.db_table, 'property' : stat.property, - 'interval' : time_interval.interval} + 'interval' : interval} - cursor.execute(realmcount_query, {'end_time': time_interval.end}) + cursor.execute(realmcount_query, {'end_time': end_time}) # Aggregate into InstallationCount installationcount_query = """ @@ -114,9 +130,9 @@ def do_aggregate_to_summary_table(stat, time_interval): interval = '%(interval)s' ) """ % {'property': stat.property, - 'interval': time_interval.interval} + 'interval': interval} - cursor.execute(installationcount_query, {'end_time': time_interval.end}) + cursor.execute(installationcount_query, {'end_time': end_time}) cursor.close() ## methods that hit the prod databases directly @@ -124,8 +140,8 @@ def do_aggregate_to_summary_table(stat, time_interval): # written in slightly more than needed generality, to reduce copy-paste errors # as more of these are made / make it easy to extend to a pull_X_by_realm -def do_pull_from_zerver(stat, time_interval): - # type: (CountStat, TimeInterval) -> None +def do_pull_from_zerver(stat, start_time, end_time, interval): + # type: (CountStat, datetime, datetime, str) -> None zerver_table = stat.zerver_count_query.zerver_table._meta.db_table # type: ignore join_args = ' '.join('AND %s.%s = %s' % (zerver_table, key, value) \ for key, value in stat.filter_args.items()) @@ -134,10 +150,10 @@ def do_pull_from_zerver(stat, time_interval): # the string formatting prior so that cursor.execute runs it as sql query_ = stat.zerver_count_query.query % {'zerver_table' : zerver_table, 'property' : stat.property, - 'interval' : time_interval.interval, + 'interval' : interval, 'join_args' : join_args} cursor = connection.cursor() - cursor.execute(query_, {'time_start': time_interval.start, 'time_end': time_interval.end}) + cursor.execute(query_, {'time_start': start_time, 'time_end': end_time}) cursor.close() count_user_by_realm_query = """ @@ -224,7 +240,7 @@ zerver_count_stream_by_realm = ZerverCountQuery(Stream, RealmCount, count_stream COUNT_STATS = { 'active_humans': CountStat('active_humans', zerver_count_user_by_realm, - {'is_bot': False, 'is_active': True}, 'gauge', 'day'), + {'is_bot': False, 'is_active': True}, CountStat.DAY, True), 'active_bots': CountStat('active_bots', zerver_count_user_by_realm, - {'is_bot': True, 'is_active': True}, 'gauge', 'day'), - 'messages_sent': CountStat('messages_sent', zerver_count_message_by_user, {}, 'hour', 'hour')} + {'is_bot': True, 'is_active': True}, CountStat.DAY, True), + 'messages_sent': CountStat('messages_sent', zerver_count_message_by_user, {}, CountStat.HOUR, False)} diff --git a/analytics/lib/interval.py b/analytics/lib/interval.py deleted file mode 100644 index f2eac6215a..0000000000 --- a/analytics/lib/interval.py +++ /dev/null @@ -1,53 +0,0 @@ -from django.utils import timezone -from datetime import datetime, timedelta, MINYEAR - -from zerver.lib.timestamp import is_timezone_aware -from six import text_type - -MIN_TIME = datetime(MINYEAR, 1, 1, 0, 0, 0, tzinfo=timezone.utc) - -# Name isn't great .. fixedinterval? timerange? Trying to distinguish -# generic intervals like 'hour' or 'quarter' from fixed intervals like -# 'Aug 3 2016 from 9-10am' -class TimeInterval(object): - def __init__(self, interval, end = timezone.now(), floor_to_boundary = 'hour'): - # type: (str, datetime, str) -> None - # Not the best logic for when we have intervals like 'quarter', but okay for now - if not is_timezone_aware(end): - raise ValueError("end must be timezone aware") - if floor_to_boundary is None: - self.end = end - else: - self.end = floor_to_interval_boundary(end, floor_to_boundary) - self.interval = interval - if interval == 'gauge': - self.start = MIN_TIME - else: - self.start = subtract_interval(self.end, interval) - -# Perhaps the right way to do the next two functions is to have an interval class -# (subclassed to hourinterval, dayinterval, etc) with methods like floor, -# subtract, and subinterval. Seems like overkill for now, though. -def floor_to_interval_boundary(datetime_object, interval): - # type: (datetime, text_type) -> datetime - # datetime objects are (year, month, day, hour, minutes, seconds, microseconds) - if interval == 'day': - return datetime(*datetime_object.timetuple()[:3]).replace(tzinfo=datetime_object.tzinfo) - elif interval == 'hour': - return datetime(*datetime_object.timetuple()[:4]).replace(tzinfo=datetime_object.tzinfo) - else: - raise ValueError("Unknown or unfloorable interval", interval) - -def floor_to_day(datetime_object): - # type: (datetime) -> datetime - return datetime(*datetime_object.timetuple()[:3]).replace(tzinfo=datetime_object.tzinfo) - -# Don't have to worry about leap seconds, since datetime doesn't support it -def subtract_interval(datetime_object, interval): - # type: (datetime, str) -> datetime - if interval == 'day': - return datetime_object - timedelta(days = 1) - elif interval == 'hour': - return datetime_object - timedelta(seconds = 3600) - else: - raise ValueError("Unknown or unarithmetic interval", interval) diff --git a/analytics/models.py b/analytics/models.py index 68c5871418..d6e6e007ab 100644 --- a/analytics/models.py +++ b/analytics/models.py @@ -1,9 +1,10 @@ from django.db import models from django.utils import timezone -from analytics.lib.interval import floor_to_day from zerver.models import Realm, UserProfile, Stream, Recipient from zerver.lib.str_utils import ModelReprMixin +from zerver.lib.timestamp import datetime_to_UTC, floor_to_day + import datetime from six import text_type @@ -35,7 +36,8 @@ def get_fill_state(property): # We assume there is at least one realm def installation_epoch(): # type: () -> datetime.datetime - return floor_to_day(Realm.objects.aggregate(models.Min('date_created'))['date_created__min']) + earliest_realm_creation = Realm.objects.aggregate(models.Min('date_created'))['date_created__min'] + return floor_to_day(datetime_to_UTC(earliest_realm_creation)) # would only ever make entries here by hand class Anomaly(ModelReprMixin, models.Model): diff --git a/analytics/tests/test_counts.py b/analytics/tests/test_counts.py index 0fa3075958..25f2a5430d 100644 --- a/analytics/tests/test_counts.py +++ b/analytics/tests/test_counts.py @@ -2,7 +2,6 @@ from django.db import models from django.test import TestCase from django.utils import timezone -from analytics.lib.interval import TimeInterval from analytics.lib.counts import CountStat, COUNT_STATS, process_count_stat, \ zerver_count_user_by_realm, zerver_count_message_by_user, \ zerver_count_message_by_stream, zerver_count_stream_by_realm, \ @@ -26,7 +25,7 @@ class AnalyticsTestCase(TestCase): TIME_LAST_HOUR = TIME_ZERO - HOUR count_stat = CountStat('test stat', ZerverCountQuery(Recipient, UserCount, 'select 0'), - {}, 'hour', 'hour') + {}, CountStat.HOUR, False) def setUp(self): # type: () -> None @@ -72,7 +71,7 @@ class AnalyticsTestCase(TestCase): # Note that this doesn't work for InstallationCount, since InstallationCount has no realm_id # kwargs should only ever be a UserProfile or Stream. - def assertCountEquals(self, table, property, value, end_time = TIME_ZERO, interval = 'hour', + def assertCountEquals(self, table, property, value, end_time = TIME_ZERO, interval = CountStat.HOUR, realm = None, **kwargs): # type: (Type[BaseCount], text_type, int, datetime, str, Optional[Realm], **models.Model) -> None if realm is None: @@ -91,7 +90,7 @@ class TestUpdateAnalyticsCounts(AnalyticsTestCase): # might change if we refactor count_query stat = CountStat('test_stat_write', zerver_count_stream_by_realm, - {'invite_only': False}, 'hour', 'hour') + {'invite_only': False}, CountStat.HOUR, False) # add some stuff to zerver_* self.create_stream(name='stream1') @@ -106,7 +105,7 @@ class TestUpdateAnalyticsCounts(AnalyticsTestCase): def test_update_analytics_tables(self): # type: () -> None - stat = CountStat('test_messages_sent', zerver_count_message_by_user, {}, 'hour', 'hour') + stat = CountStat('test_messages_sent', zerver_count_message_by_user, {}, CountStat.HOUR, False) user1 = self.create_user('email1') user2 = self.create_user('email2') @@ -115,7 +114,7 @@ class TestUpdateAnalyticsCounts(AnalyticsTestCase): # run command do_fill_count_stat_at_hour(stat, self.TIME_ZERO) - usercount_row = UserCount.objects.filter(realm=self.default_realm, interval='hour', + usercount_row = UserCount.objects.filter(realm=self.default_realm, interval=CountStat.HOUR, property='test_messages_sent').values_list( 'value', flat=True)[0] assert (usercount_row == 1) @@ -143,32 +142,32 @@ class TestProcessCountStat(AnalyticsTestCase): process_count_stat(self.count_stat, current_time) self.assertFillStateEquals(current_time) self.assertEqual(InstallationCount.objects.filter(property = self.count_stat.property, - interval = 'hour').count(), 1) + interval = CountStat.HOUR).count(), 1) # dirty stat FillState.objects.filter(property=self.count_stat.property).update(state=FillState.STARTED) process_count_stat(self.count_stat, current_time) self.assertFillStateEquals(current_time) self.assertEqual(InstallationCount.objects.filter(property = self.count_stat.property, - interval = 'hour').count(), 1) + interval = CountStat.HOUR).count(), 1) # clean stat, no update process_count_stat(self.count_stat, current_time) self.assertFillStateEquals(current_time) self.assertEqual(InstallationCount.objects.filter(property = self.count_stat.property, - interval = 'hour').count(), 1) + interval = CountStat.HOUR).count(), 1) # clean stat, with update current_time = current_time + self.HOUR process_count_stat(self.count_stat, current_time) self.assertFillStateEquals(current_time) self.assertEqual(InstallationCount.objects.filter(property = self.count_stat.property, - interval = 'hour').count(), 2) + interval = CountStat.HOUR).count(), 2) # test users added in last hour def test_add_new_users(self): # type: () -> None - stat = CountStat('add_new_user_test', zerver_count_user_by_realm, {}, 'hour', 'hour') + stat = CountStat('add_new_user_test', zerver_count_user_by_realm, {}, CountStat.HOUR, False) # add new users to realm in last hour self.create_user('email1') @@ -183,38 +182,10 @@ class TestProcessCountStat(AnalyticsTestCase): self.assertCountEquals(RealmCount, 'add_new_user_test', 2) - def test_do_aggregate(self): - # type: () -> None - - # write some entries to analytics.usercount with smallest interval as day - stat = CountStat('test_messages_aggregate', zerver_count_message_by_user, {}, 'day', 'hour') - - # write some messages - user1 = self.create_user('email1') - user2 = self.create_user('email2') - recipient = Recipient.objects.create(type_id=user2.id, type=Recipient.PERSONAL) - - self.create_message(user1, recipient) - self.create_message(user1, recipient) - self.create_message(user1, recipient) - - # run command - do_fill_count_stat_at_hour(stat, self.TIME_ZERO) - - # check no rows for hour interval on usercount granularity - self.assertFalse(UserCount.objects.filter(realm=self.default_realm, interval='hour').exists()) - - # see if aggregated correctly to realmcount and installationcount - self.assertCountEquals(RealmCount, 'test_messages_aggregate', 3, interval = 'day') - - self.assertEquals(InstallationCount.objects.filter(interval='day', - property='test_messages_aggregate') \ - .values_list('value', flat=True)[0], 3) - def test_count_before_realm_creation(self): # type: () -> None stat = CountStat('test_active_humans', zerver_count_user_by_realm, - {'is_bot': False, 'is_active': True}, 'hour', 'hour') + {'is_bot': False, 'is_active': True}, CountStat.HOUR, False) realm = Realm.objects.create(domain='domain', name='name', date_created=self.TIME_ZERO) self.create_user('email', realm=realm) @@ -227,7 +198,7 @@ class TestProcessCountStat(AnalyticsTestCase): # type: () -> None # test that rows with empty counts are returned if realm exists stat = CountStat('test_active_humans', zerver_count_user_by_realm, - {'is_bot': False, 'is_active': True}, 'hour', 'hour') + {'is_bot': False, 'is_active': True}, CountStat.HOUR, False) do_fill_count_stat_at_hour(stat, self.TIME_ZERO) self.assertCountEquals(RealmCount, 'test_active_humans', 0) @@ -245,7 +216,7 @@ class TestAggregates(AnalyticsTestCase): class TestXByYQueries(AnalyticsTestCase): def test_message_to_stream_aggregation(self): # type: () -> None - stat = CountStat('test_messages_to_stream', zerver_count_message_by_stream, {}, 'hour', 'hour') + stat = CountStat('test_messages_to_stream', zerver_count_message_by_stream, {}, CountStat.HOUR, False) # write some messages user = self.create_user('email') @@ -266,9 +237,9 @@ class TestCountStats(AnalyticsTestCase): # type: () -> None stats = [ CountStat('test_active_humans', zerver_count_user_by_realm, {'is_bot': False, 'is_active': True}, - 'hour', 'hour'), + CountStat.HOUR, False), CountStat('test_active_bots', zerver_count_user_by_realm, {'is_bot': True, 'is_active': True}, - 'hour', 'hour')] + CountStat.HOUR, False)] # TODO these dates should probably be explicit, since the default args for the commands are timezone.now() dependent. self.create_user('email1-bot', is_bot=True) diff --git a/analytics/tests/test_interval.py b/analytics/tests/test_interval.py deleted file mode 100644 index f7d1249fd5..0000000000 --- a/analytics/tests/test_interval.py +++ /dev/null @@ -1,18 +0,0 @@ -from django.test import TestCase -from django.utils import timezone - -from analytics.lib.interval import TimeInterval, floor_to_interval_boundary, subtract_interval - -from datetime import datetime, timedelta - -class TimeIntervalTest(TestCase): - def test_time_interval_creation(self): - # type: () -> None - time_interval = TimeInterval('day', datetime(2016, 4, 29, 3, 14, 15, 926535).replace(tzinfo=timezone.utc)) - self.assertEqual(time_interval.start, datetime(2016, 4, 28, 3, 0, 0).replace(tzinfo=timezone.utc)) - self.assertEqual(time_interval.end, datetime(2016, 4, 29, 3, 0, 0).replace(tzinfo=timezone.utc)) - - def test_datetime_leap_second(self): - # type: () -> None - after_leap = datetime(2015, 7, 1) - self.assertEqual(subtract_interval(after_leap, 'hour'), datetime(2015, 6, 30, 23)) diff --git a/zerver/lib/timestamp.py b/zerver/lib/timestamp.py index 1cb029f9ce..9416a79272 100644 --- a/zerver/lib/timestamp.py +++ b/zerver/lib/timestamp.py @@ -8,6 +8,17 @@ def is_timezone_aware(datetime_object): # type: (datetime.datetime) -> bool return datetime_object.tzinfo is not None +def datetime_to_UTC(datetime_object): + # type: (datetime.datetime) -> datetime.datetime + if is_timezone_aware(datetime_object): + return datetime_object.astimezone(utc) + return datetime_object.replace(tzinfo=utc) + +def floor_to_day(datetime_object): + # type: (datetime.datetime) -> datetime.datetime + return datetime.datetime(*datetime_object.timetuple()[:3]) \ + .replace(tzinfo=datetime_object.tzinfo) + def timestamp_to_datetime(timestamp): # type: (float) -> datetime.datetime return datetime.datetime.utcfromtimestamp(float(timestamp)).replace(tzinfo=utc)