From dd954749bea1814d5664981f78033cb357b0eb8e Mon Sep 17 00:00:00 2001 From: Alex Vandiver Date: Tue, 21 Nov 2023 16:59:13 +0000 Subject: [PATCH] zilencer: Log, and drop, duplicated rows from remote servers. This may happen if there are multiple servers with the same UUID submitting data (e.g. if they were cloned after initial creation), or if there is one server, but `./manage.py clear_analytics_tables` was used to truncate the analytics tables. In the case of `clear_analytics_tables`, the data submitted likely has identical historical values with new remote `id` values; preserving the originally-submitted contemporaneous data is the best option. For the case of submissions from multiple servers, there is no completely sensible outcome, so the best we can do is detect the case and move on. Since we have a lock on the RemoteZulipServer, we know that no other inserts are happening, so counting before and after will return the true number of rows inserted (which `bulk_create` cannot do in the face of `ignore_conflicts`[^1]). We compare this to the expected number of new inserted rows to detect dropped duplicates. [^1]: See https://code.djangoproject.com/ticket/30138. --- zerver/tests/test_push_notifications.py | 41 ++++++++++--------------- zilencer/views.py | 32 +++++++++++-------- 2 files changed, 36 insertions(+), 37 deletions(-) diff --git a/zerver/tests/test_push_notifications.py b/zerver/tests/test_push_notifications.py index eefe121c7e..386ba4b73a 100644 --- a/zerver/tests/test_push_notifications.py +++ b/zerver/tests/test_push_notifications.py @@ -1201,30 +1201,23 @@ class AnalyticsBouncerTest(BouncerTestCase): ) self.assert_json_error(result, "Data is out of order.") - with mock.patch("zilencer.views.validate_incoming_table_data"), self.assertLogs( - level="WARNING" - ) as warn_log: - # We need to wrap a transaction here to avoid the - # IntegrityError that will be thrown in here from breaking - # the unittest transaction. - with transaction.atomic(): - result = self.uuid_post( - self.server_uuid, - "/api/v1/remotes/server/analytics", - { - "realm_counts": orjson.dumps(realm_count_data).decode(), - "installation_counts": orjson.dumps(installation_count_data).decode(), - "realmauditlog_rows": orjson.dumps(realmauditlog_data).decode(), - }, - subdomain="", - ) - self.assert_json_error(result, "Invalid data.") - self.assertEqual( - warn_log.output, - [ - "WARNING:root:Invalid data saving zilencer_remoteinstallationcount for server demo.example.com/6cde5f7a-1f7e-4978-9716-49f69ebfc9fe" - ], - ) + # Adjust the id of all existing rows so that they get re-sent. 
+        # This is equivalent to running `./manage.py clear_analytics_tables`
+        RealmCount.objects.all().update(id=F("id") + RealmCount.objects.latest("id").id)
+        InstallationCount.objects.all().update(
+            id=F("id") + InstallationCount.objects.latest("id").id
+        )
+        with self.assertLogs(level="WARNING") as warn_log:
+            send_analytics_to_push_bouncer()
+        self.assertEqual(
+            warn_log.output,
+            [
+                f"WARNING:root:Dropped 3 duplicated rows while saving 3 rows of zilencer_remoterealmcount for server demo.example.com/{self.server_uuid}",
+                f"WARNING:root:Dropped 2 duplicated rows while saving 2 rows of zilencer_remoteinstallationcount for server demo.example.com/{self.server_uuid}",
+            ],
+        )
+        # Only the request counts go up -- all of the other rows' duplicates are dropped
+        check_counts(10, 8, 3, 2, 5)
 
     @override_settings(PUSH_NOTIFICATION_BOUNCER_URL="https://push.zulip.org.example.com")
     @responses.activate
diff --git a/zilencer/views.py b/zilencer/views.py
index 0b7f5be0a9..8a6ddd1efd 100644
--- a/zilencer/views.py
+++ b/zilencer/views.py
@@ -504,19 +504,25 @@ def batch_create_table_data(
     model: Type[ModelT],
     row_objects: List[ModelT],
 ) -> None:
-    BATCH_SIZE = 1000
-    while len(row_objects) > 0:
-        try:
-            model._default_manager.bulk_create(row_objects[:BATCH_SIZE])
-        except IntegrityError:
-            logging.warning(
-                "Invalid data saving %s for server %s/%s",
-                model._meta.db_table,
-                server.hostname,
-                server.uuid,
-            )
-            raise JsonableError(_("Invalid data."))
-        row_objects = row_objects[BATCH_SIZE:]
+    # We ignore previously-existing data, in case it was truncated and
+    # re-created on the remote server.  `ignore_conflicts=True` cannot
+    # return the ids, or count thereof, of the new inserts (see
+    # https://code.djangoproject.com/ticket/30138), so we rely on
+    # having a lock to accurately count them before and after.  This
+    # count query is also well-indexed.
+    before_count = model._default_manager.filter(server=server).count()
+    model._default_manager.bulk_create(row_objects, batch_size=1000, ignore_conflicts=True)
+    after_count = model._default_manager.filter(server=server).count()
+    inserted_count = after_count - before_count
+    if inserted_count < len(row_objects):
+        logging.warning(
+            "Dropped %d duplicated rows while saving %d rows of %s for server %s/%s",
+            len(row_objects) - inserted_count,
+            len(row_objects),
+            model._meta.db_table,
+            server.hostname,
+            server.uuid,
+        )
 
 
 def update_remote_realm_data_for_server(
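
As a standalone illustration of the count-before-and-after pattern above, here is a minimal sketch, not the exact patch code: the function name `bulk_create_counting_duplicates` is illustrative, it assumes `model` has a `server` foreign key, and it assumes the caller already holds a lock on `server` (e.g. taken with `select_for_update()`), so no concurrent inserts can skew the counts.

import logging
from typing import List, Type, TypeVar

from django.db import models

ModelT = TypeVar("ModelT", bound=models.Model)


def bulk_create_counting_duplicates(
    server: models.Model, model: Type[ModelT], row_objects: List[ModelT]
) -> int:
    # Count rows for this server before inserting; the caller's lock on
    # `server` guarantees no one else is inserting concurrently.
    before_count = model._default_manager.filter(server=server).count()
    # ignore_conflicts=True silently skips rows that violate a unique
    # constraint; Django cannot report how many were skipped (see
    # https://code.djangoproject.com/ticket/30138).
    model._default_manager.bulk_create(row_objects, batch_size=1000, ignore_conflicts=True)
    after_count = model._default_manager.filter(server=server).count()
    inserted_count = after_count - before_count
    if inserted_count < len(row_objects):
        logging.warning(
            "Dropped %d duplicated rows while saving %d rows",
            len(row_objects) - inserted_count,
            len(row_objects),
        )
    return inserted_count

Without the lock, a row inserted concurrently by another request would inflate the after-count and could mask dropped duplicates; that is why the view holds the lock on the RemoteZulipServer row for the duration of the submission.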