mirror of https://github.com/zulip/zulip.git
migrate: Add do_batch_update method for running batch updates.
* Refactor pgroonga_0002 migration to use new method.
This commit is contained in:
parent
eddcec5e86
commit
e95f972e24
|
@ -3,19 +3,13 @@ from django.db import models, migrations, connection
|
||||||
from django.contrib.postgres import operations
|
from django.contrib.postgres import operations
|
||||||
from django.db.backends.postgresql_psycopg2.schema import DatabaseSchemaEditor
|
from django.db.backends.postgresql_psycopg2.schema import DatabaseSchemaEditor
|
||||||
from django.db.migrations.state import StateApps
|
from django.db.migrations.state import StateApps
|
||||||
|
from zerver.lib.migrate import do_batch_update
|
||||||
|
|
||||||
def rebuild_pgroonga_index(apps: StateApps, schema_editor: DatabaseSchemaEditor) -> None:
|
def rebuild_pgroonga_index(apps: StateApps, schema_editor: DatabaseSchemaEditor) -> None:
|
||||||
BATCH_SIZE = 10000
|
|
||||||
|
|
||||||
Message = apps.get_model("zerver", "Message")
|
|
||||||
message_ids = Message.objects.values_list('id', flat=True)
|
|
||||||
with connection.cursor() as cursor:
|
with connection.cursor() as cursor:
|
||||||
for i in range(0, len(message_ids), BATCH_SIZE):
|
do_batch_update(cursor, 'zerver_message', ['search_pgroonga'],
|
||||||
batch_ids = ', '.join(str(id) for id in message_ids[i:i+BATCH_SIZE])
|
["escape_html(subject) || ' ' || rendered_content"],
|
||||||
cursor.execute("UPDATE zerver_message SET "
|
escape=False, batch_size=10000)
|
||||||
"search_pgroonga = "
|
|
||||||
"escape_html(subject) || ' ' || rendered_content "
|
|
||||||
"WHERE id IN (%s)" % (batch_ids,))
|
|
||||||
|
|
||||||
class Migration(migrations.Migration):
|
class Migration(migrations.Migration):
|
||||||
atomic = False
|
atomic = False
|
||||||
|
|
|
@ -449,6 +449,7 @@ def build_custom_checkers(by_lang):
|
||||||
'zerver/migrations/0041_create_attachments_for_old_messages.py',
|
'zerver/migrations/0041_create_attachments_for_old_messages.py',
|
||||||
'zerver/migrations/0060_move_avatars_to_be_uid_based.py',
|
'zerver/migrations/0060_move_avatars_to_be_uid_based.py',
|
||||||
'zerver/migrations/0104_fix_unreads.py',
|
'zerver/migrations/0104_fix_unreads.py',
|
||||||
|
'pgroonga/migrations/0002_html_escape_subject.py',
|
||||||
]),
|
]),
|
||||||
'description': "Don't import models or other code in migrations; see docs/subsystems/schema-migrations.md",
|
'description': "Don't import models or other code in migrations; see docs/subsystems/schema-migrations.md",
|
||||||
},
|
},
|
||||||
|
|
|
@ -1,8 +1,12 @@
|
||||||
from typing import Any, Callable, Dict, List, Tuple
|
|
||||||
from django.db.models.query import QuerySet
|
from django.db.models.query import QuerySet
|
||||||
|
from psycopg2.extensions import cursor
|
||||||
|
from typing import Any, Callable, Dict, List, Tuple, TypeVar
|
||||||
|
|
||||||
import re
|
import re
|
||||||
import time
|
import time
|
||||||
|
|
||||||
|
CursorObj = TypeVar('CursorObj', bound=cursor)
|
||||||
|
|
||||||
def create_index_if_not_exist(index_name: str, table_name: str, column_string: str,
|
def create_index_if_not_exist(index_name: str, table_name: str, column_string: str,
|
||||||
where_clause: str) -> str:
|
where_clause: str) -> str:
|
||||||
#
|
#
|
||||||
|
@ -25,3 +29,43 @@ def create_index_if_not_exist(index_name: str, table_name: str, column_string: s
|
||||||
END$$;
|
END$$;
|
||||||
''' % (index_name, index_name, table_name, column_string, where_clause)
|
''' % (index_name, index_name, table_name, column_string, where_clause)
|
||||||
return stmt
|
return stmt
|
||||||
|
|
||||||
|
|
||||||
|
def do_batch_update(cursor: 'CursorObj',
                    table: str,
                    cols: List[str],
                    vals: List[str],
                    batch_size: int=10000,
                    sleep: float=0.1,
                    escape: bool=True) -> None:  # nocoverage
    """Set `cols` to `vals` on every row of `table`, one id range at a time.

    The table is walked from MIN(id) upwards in half-open id ranges
    [lower, lower + batch_size), issuing one UPDATE per range and pausing
    for `sleep` seconds after each one.  When the walk passes the
    originally observed MAX(id), MAX(id) is queried again so that rows
    inserted while the update was running are covered as well.

    Progress is reported with print().

    Args:
        cursor: Database cursor providing execute() and fetchall().
        table: Name of the table to update.  Interpolated directly into
            the SQL text, so it must come from trusted code.
        cols: Names of the columns to assign.  Also interpolated directly.
        vals: One value per entry in `cols`.  With escape=True they are
            passed to the driver as bind parameters; with escape=False
            they are spliced into the statement verbatim, which allows SQL
            expressions (e.g. "escape_html(subject)") but must never be
            used with untrusted input.
        batch_size: Width of each id range; must be positive.
        sleep: Seconds to sleep after each batch.
        escape: Whether `vals` are bound as parameters (True) or
            interpolated as raw SQL (False).

    Raises:
        ValueError: If batch_size is not positive (the id range would
            never advance and the loop would not terminate).
    """
    if batch_size <= 0:
        raise ValueError("batch_size must be a positive integer")

    # The first formatting pass fills in the identifiers and leaves one
    # %s placeholder per column plus two (from %%s) for the id bounds.
    stmt = '''
        UPDATE %s
        SET (%s) = (%s)
        WHERE id >= %%s AND id < %%s
    ''' % (table, ', '.join(cols), ', '.join(['%s'] * len(cols)))

    cursor.execute("SELECT MIN(id), MAX(id) FROM %s" % (table,))
    (min_id, max_id) = cursor.fetchall()[0]
    if min_id is None:
        # Empty table: nothing to update.
        return

    print("\n Range of rows to update: [%s, %s]" % (min_id, max_id))
    while min_id <= max_id:
        lower = min_id
        upper = min_id + batch_size
        print(' Updating range [%s,%s)' % (lower, upper))
        params = list(vals) + [lower, upper]
        if escape:
            # Positional on purpose: the keyword for the bind values is
            # `params` on Django's cursor wrapper but `vars` on a raw
            # psycopg2 cursor, so a keyword argument only works with one.
            cursor.execute(stmt, params)
        else:
            cursor.execute(stmt % tuple(params))

        min_id = upper
        time.sleep(sleep)

        # Once we've finished, check if any new rows were inserted to the table
        if min_id > max_id:
            cursor.execute("SELECT MAX(id) FROM %s" % (table,))
            max_id = cursor.fetchall()[0][0]

    print(" Finishing...", end='')
|
||||||
|
|
Loading…
Reference in New Issue