import_realm: Close the memcached connection before forking.

This prevents the memcached connection from being shared across
multiple processes, and hopefully addresses unexpected behavior from
cached functions like get_user_profile_by_id invoked inside the worker
processes.

Signed-off-by: Anders Kaseorg <anders@zulip.com>
This commit is contained in:
Anders Kaseorg 2020-09-30 15:20:02 -07:00 committed by Tim Abbott
parent cf5ededa35
commit 46babbe9e1
2 changed files with 6 additions and 0 deletions

View File

@ -10,6 +10,7 @@ import boto3
import orjson import orjson
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
from django.conf import settings from django.conf import settings
from django.core.cache import cache
from django.db import connection from django.db import connection
from django.db.models import Max from django.db.models import Max
from django.utils.timezone import now as timezone_now from django.utils.timezone import now as timezone_now
@ -759,6 +760,7 @@ def import_uploads(realm: Realm, import_dir: Path, processes: int, processing_av
process_avatars(record) process_avatars(record)
else: else:
connection.close() connection.close()
cache._cache.disconnect_all()
with multiprocessing.Pool(processes) as p: with multiprocessing.Pool(processes) as p:
for out in p.imap_unordered(process_avatars, records): for out in p.imap_unordered(process_avatars, records):
pass pass

View File

@ -4,6 +4,7 @@ import os
from mimetypes import guess_type from mimetypes import guess_type
from django.conf import settings from django.conf import settings
from django.core.cache import cache
from django.db import connection from django.db import connection
from zerver.lib.avatar_hash import user_avatar_path from zerver.lib.avatar_hash import user_avatar_path
@ -35,6 +36,7 @@ def transfer_avatars_to_s3(processes: int) -> None:
_transfer_avatar_to_s3(user) _transfer_avatar_to_s3(user)
else: # nocoverage else: # nocoverage
connection.close() connection.close()
cache._cache.disconnect_all()
with multiprocessing.Pool(processes) as p: with multiprocessing.Pool(processes) as p:
for out in p.imap_unordered(_transfer_avatar_to_s3, users): for out in p.imap_unordered(_transfer_avatar_to_s3, users):
pass pass
@ -56,6 +58,7 @@ def transfer_message_files_to_s3(processes: int) -> None:
_transfer_message_files_to_s3(attachment) _transfer_message_files_to_s3(attachment)
else: # nocoverage else: # nocoverage
connection.close() connection.close()
cache._cache.disconnect_all()
with multiprocessing.Pool(processes) as p: with multiprocessing.Pool(processes) as p:
for out in p.imap_unordered(_transfer_message_files_to_s3, attachments): for out in p.imap_unordered(_transfer_message_files_to_s3, attachments):
pass pass
@ -82,6 +85,7 @@ def transfer_emoji_to_s3(processes: int) -> None:
_transfer_emoji_to_s3(realm_emoji) _transfer_emoji_to_s3(realm_emoji)
else: # nocoverage else: # nocoverage
connection.close() connection.close()
cache._cache.disconnect_all()
with multiprocessing.Pool(processes) as p: with multiprocessing.Pool(processes) as p:
for out in p.imap_unordered(_transfer_emoji_to_s3, realm_emojis): for out in p.imap_unordered(_transfer_emoji_to_s3, realm_emojis):
pass pass