zilencer: Log, and drop, duplicated rows from remote servers.

This may happen if there are multiple servers with the same UUID
submitting data (e.g. if they were cloned after initial creation), or
if there is one server, but `./manage.py clear_analytics_tables` was
used to truncate the analytics tables.

In the case of `clear_analytics_tables`, the data submitted likely has
identical historical values with new remote `id` values; preserving
the originally-submitted contemporaneous data is the best option.  For
the case of submissions from multiple servers, there is no completely
sensible outcome, so the best we can do is detect the case and move
on.

Since we have a lock on the RemoteZulipServer, we know that no other
inserts are happening, so counting before and after will return the
true number of rows inserted (which `bulk_create` cannot report in the
face of `ignore_conflicts`[^1]).  We compare this to the expected
number of newly inserted rows to detect dropped duplicates.

[^1]: See https://code.djangoproject.com/ticket/30138.
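
To make the counting trick concrete, here is a minimal sketch of the
pattern described above -- not the committed code itself.  It assumes a
Django model with a `server` foreign key, and that the caller already
holds a row lock on the RemoteZulipServer:

import logging

from django.db import models

def bulk_insert_counting_duplicates(
    model: type[models.Model], server: models.Model, row_objects: list
) -> None:
    # Safe only because the caller holds a lock on `server`: nothing
    # else can insert rows for it between these two counts.
    before = model._default_manager.filter(server=server).count()
    # ignore_conflicts=True silently skips rows that violate a unique
    # constraint, and gives us no way to ask how many were skipped.
    model._default_manager.bulk_create(
        row_objects, batch_size=1000, ignore_conflicts=True
    )
    inserted = model._default_manager.filter(server=server).count() - before
    if inserted < len(row_objects):
        logging.warning("Dropped %d duplicated rows", len(row_objects) - inserted)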
commit dd954749be (parent c6ae3e7242)
Alex Vandiver, 2023-11-21 16:59:13 +00:00, committed by Tim Abbott
2 changed files with 36 additions and 37 deletions


@@ -1201,30 +1201,23 @@ class AnalyticsBouncerTest(BouncerTestCase):
         )
         self.assert_json_error(result, "Data is out of order.")
 
-        with mock.patch("zilencer.views.validate_incoming_table_data"), self.assertLogs(
-            level="WARNING"
-        ) as warn_log:
-            # We need to wrap a transaction here to avoid the
-            # IntegrityError that will be thrown in here from breaking
-            # the unittest transaction.
-            with transaction.atomic():
-                result = self.uuid_post(
-                    self.server_uuid,
-                    "/api/v1/remotes/server/analytics",
-                    {
-                        "realm_counts": orjson.dumps(realm_count_data).decode(),
-                        "installation_counts": orjson.dumps(installation_count_data).decode(),
-                        "realmauditlog_rows": orjson.dumps(realmauditlog_data).decode(),
-                    },
-                    subdomain="",
-                )
-        self.assert_json_error(result, "Invalid data.")
-        self.assertEqual(
-            warn_log.output,
-            [
-                "WARNING:root:Invalid data saving zilencer_remoteinstallationcount for server demo.example.com/6cde5f7a-1f7e-4978-9716-49f69ebfc9fe"
-            ],
-        )
+        # Adjust the id of all existing rows so that they get re-sent.
+        # This is equivalent to running `./manage.py clear_analytics_tables`
+        RealmCount.objects.all().update(id=F("id") + RealmCount.objects.latest("id").id)
+        InstallationCount.objects.all().update(
+            id=F("id") + InstallationCount.objects.latest("id").id
+        )
+        with self.assertLogs(level="WARNING") as warn_log:
+            send_analytics_to_push_bouncer()
+        self.assertEqual(
+            warn_log.output,
+            [
+                f"WARNING:root:Dropped 3 duplicated rows while saving 3 rows of zilencer_remoterealmcount for server demo.example.com/{self.server_uuid}",
+                f"WARNING:root:Dropped 2 duplicated rows while saving 2 rows of zilencer_remoteinstallationcount for server demo.example.com/{self.server_uuid}",
+            ],
+        )
+        # Only the request counts go up -- all of the other rows' duplicates are dropped
+        check_counts(10, 8, 3, 2, 5)
 
     @override_settings(PUSH_NOTIFICATION_BOUNCER_URL="https://push.zulip.org.example.com")
     @responses.activate
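
The test's setup trick generalizes: bumping every primary key past the
current maximum makes existing rows look like never-before-seen data to
a consumer that tracks the highest id already received.  A generic
version of what the two `update(...)` calls above do (the helper name
is hypothetical, not part of the commit):

from django.db.models import F, Max

def shift_all_ids_past_max(model) -> None:
    # New ids land strictly above the old range, so re-sent rows carry
    # fresh remote `id` values while their content duplicates what the
    # bouncer already stored -- exercising the duplicate-dropping path.
    max_id = model.objects.aggregate(Max("id"))["id__max"]
    if max_id is not None:
        model.objects.all().update(id=F("id") + max_id)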


@@ -504,19 +504,25 @@ def batch_create_table_data(
     model: Type[ModelT],
     row_objects: List[ModelT],
 ) -> None:
-    BATCH_SIZE = 1000
-    while len(row_objects) > 0:
-        try:
-            model._default_manager.bulk_create(row_objects[:BATCH_SIZE])
-        except IntegrityError:
-            logging.warning(
-                "Invalid data saving %s for server %s/%s",
-                model._meta.db_table,
-                server.hostname,
-                server.uuid,
-            )
-            raise JsonableError(_("Invalid data."))
-        row_objects = row_objects[BATCH_SIZE:]
+    # We ignore previously-existing data, in case it was truncated and
+    # re-created on the remote server. `ignore_conflicts=True` cannot
+    # return the ids, or count thereof, of the new inserts (see
+    # https://code.djangoproject.com/ticket/30138), so we rely on
+    # having a lock to accurately count them before and after. This
+    # query is also well-indexed.
+    before_count = model._default_manager.filter(server=server).count()
+    model._default_manager.bulk_create(row_objects, batch_size=1000, ignore_conflicts=True)
+    after_count = model._default_manager.filter(server=server).count()
+    inserted_count = after_count - before_count
+    if inserted_count < len(row_objects):
+        logging.warning(
+            "Dropped %d duplicated rows while saving %d rows of %s for server %s/%s",
+            len(row_objects) - inserted_count,
+            len(row_objects),
+            model._meta.db_table,
+            server.hostname,
+            server.uuid,
+        )
 
 
 def update_remote_realm_data_for_server(
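
For completeness, the lock on the RemoteZulipServer that the
before/after counts depend on is taken by the caller.  A sketch of that
standard Django pattern follows; the function name, argument order, and
full signature of `batch_create_table_data` are illustrative, not the
actual view code:

from django.db import transaction

from zilencer.models import RemoteZulipServer

def handle_analytics_submission(server_uuid, model, row_objects) -> None:
    with transaction.atomic():
        # select_for_update() takes a row-level lock; a concurrent
        # submission for the same server blocks here until we commit,
        # so the counts in batch_create_table_data stay accurate.
        server = RemoteZulipServer.objects.select_for_update().get(uuid=server_uuid)
        batch_create_table_data(server, model=model, row_objects=row_objects)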