slack import: Eliminate need to load all messages into memory.

This works by yielding messages sorted based on timestamp.  Because
the Slack exports are broken into files by date, it's convenient to do
a 2-layer sorting process, where we open all the files for a given
day, and then sort their messages by timestamp before yielding them.

Fixes #10930.
This commit is contained in:
rht 2018-12-05 03:13:47 +00:00 committed by Tim Abbott
parent 0ff3a7da05
commit e59ff6e6db
2 changed files with 33 additions and 18 deletions

View File

@ -10,11 +10,13 @@ import logging
import random import random
import requests import requests
from collections import defaultdict
from django.conf import settings from django.conf import settings
from django.db import connection from django.db import connection
from django.utils.timezone import now as timezone_now from django.utils.timezone import now as timezone_now
from django.forms.models import model_to_dict from django.forms.models import model_to_dict
from typing import Any, Dict, List, Optional, Tuple, Set from typing import Any, Dict, List, Optional, Tuple, Set, Iterator
from zerver.forms import check_subdomain_available from zerver.forms import check_subdomain_available
from zerver.models import Reaction, RealmEmoji, Realm, UserProfile, Recipient, \ from zerver.models import Reaction, RealmEmoji, Realm, UserProfile, Recipient, \
CustomProfileField, CustomProfileFieldValue CustomProfileField, CustomProfileFieldValue
@ -448,11 +450,7 @@ def convert_slack_workspace_messages(slack_data_dir: str, users: List[ZerverFiel
2. uploads, which is a list of uploads to be mapped in uploads records.json 2. uploads, which is a list of uploads to be mapped in uploads records.json
3. attachment, which is a list of the attachments 3. attachment, which is a list of the attachments
""" """
all_messages = get_all_messages(slack_data_dir, added_channels) all_messages = get_messages_iterator(slack_data_dir, added_channels)
# we sort the messages according to the timestamp to show messages with
# the proper date order
all_messages = sorted(all_messages, key=lambda message: message['ts'])
logging.info('######### IMPORTING MESSAGES STARTED #########\n') logging.info('######### IMPORTING MESSAGES STARTED #########\n')
@ -461,8 +459,6 @@ def convert_slack_workspace_messages(slack_data_dir: str, users: List[ZerverFiel
total_uploads = [] # type: List[ZerverFieldsT] total_uploads = [] # type: List[ZerverFieldsT]
# The messages are stored in batches # The messages are stored in batches
low_index = 0
upper_index = low_index + chunk_size
dump_file_id = 1 dump_file_id = 1
subscriber_map = make_subscriber_map( subscriber_map = make_subscriber_map(
@ -470,9 +466,16 @@ def convert_slack_workspace_messages(slack_data_dir: str, users: List[ZerverFiel
) )
while True: while True:
message_data = all_messages[low_index:upper_index] message_data = []
_counter = 0
for msg in all_messages:
_counter += 1
message_data.append(msg)
if _counter == chunk_size:
break
if len(message_data) == 0: if len(message_data) == 0:
break break
zerver_message, zerver_usermessage, attachment, uploads, reactions = \ zerver_message, zerver_usermessage, attachment, uploads, reactions = \
channel_message_to_zerver_message( channel_message_to_zerver_message(
realm_id, users, added_users, added_recipient, message_data, realm_id, users, added_users, added_recipient, message_data,
@ -491,26 +494,38 @@ def convert_slack_workspace_messages(slack_data_dir: str, users: List[ZerverFiel
total_attachments += attachment total_attachments += attachment
total_uploads += uploads total_uploads += uploads
low_index = upper_index
upper_index = chunk_size + low_index
dump_file_id += 1 dump_file_id += 1
logging.info('######### IMPORTING MESSAGES FINISHED #########\n') logging.info('######### IMPORTING MESSAGES FINISHED #########\n')
return total_reactions, total_uploads, total_attachments return total_reactions, total_uploads, total_attachments
def get_all_messages(slack_data_dir: str, added_channels: AddedChannelsT) -> List[ZerverFieldsT]: def get_messages_iterator(slack_data_dir: str, added_channels: AddedChannelsT) -> Iterator[ZerverFieldsT]:
all_messages = [] # type: List[ZerverFieldsT] """This function is an iterator that returns all the messages across
all Slack channels, in order by timestamp. It's important to
not read all the messages into memory at once, because for
large imports that can OOM kill."""
all_json_names = defaultdict(list) # type: Dict[str, List[str]]
for channel_name in added_channels.keys(): for channel_name in added_channels.keys():
channel_dir = os.path.join(slack_data_dir, channel_name) channel_dir = os.path.join(slack_data_dir, channel_name)
json_names = os.listdir(channel_dir) json_names = os.listdir(channel_dir)
for json_name in json_names: for json_name in json_names:
all_json_names[json_name].append(channel_dir)
# Sort json_name by date
for json_name in sorted(all_json_names.keys()):
messages_for_one_day = [] # type: List[ZerverFieldsT]
for channel_dir in all_json_names[json_name]:
message_dir = os.path.join(channel_dir, json_name) message_dir = os.path.join(channel_dir, json_name)
messages = get_data_file(message_dir) messages = get_data_file(message_dir)
for message in messages: for message in messages:
# To give every message the channel information # To give every message the channel information
message['channel_name'] = channel_name message['channel_name'] = channel_name
all_messages += messages messages_for_one_day += messages
return all_messages
# we sort the messages according to the timestamp to show messages with
# the proper date order
for message in sorted(messages_for_one_day, key=lambda m: m['ts']):
yield message
def channel_message_to_zerver_message(realm_id: int, def channel_message_to_zerver_message(realm_id: int,
users: List[ZerverFieldsT], users: List[ZerverFieldsT],

View File

@ -486,8 +486,8 @@ class SlackImporter(ZulipTestCase):
self.assertEqual(zerver_message[3]['sender'], 24) self.assertEqual(zerver_message[3]['sender'], 24)
@mock.patch("zerver.data_import.slack.channel_message_to_zerver_message") @mock.patch("zerver.data_import.slack.channel_message_to_zerver_message")
@mock.patch("zerver.data_import.slack.get_all_messages") @mock.patch("zerver.data_import.slack.get_messages_iterator")
def test_convert_slack_workspace_messages(self, mock_get_all_messages: mock.Mock, def test_convert_slack_workspace_messages(self, mock_get_messages_iterator: mock.Mock,
mock_message: mock.Mock) -> None: mock_message: mock.Mock) -> None:
os.makedirs('var/test-slack-import', exist_ok=True) os.makedirs('var/test-slack-import', exist_ok=True)
added_channels = {'random': ('c5', 1), 'general': ('c6', 2)} # type: Dict[str, Tuple[str, int]] added_channels = {'random': ('c5', 1), 'general': ('c6', 2)} # type: Dict[str, Tuple[str, int]]
@ -501,7 +501,7 @@ class SlackImporter(ZulipTestCase):
zerver_usermessage = [{'id': 3}, {'id': 5}, {'id': 6}, {'id': 9}] zerver_usermessage = [{'id': 3}, {'id': 5}, {'id': 6}, {'id': 9}]
mock_get_all_messages.side_effect = [zerver_message] mock_get_messages_iterator.side_effect = [iter(zerver_message)]
mock_message.side_effect = [[zerver_message[:1], zerver_usermessage[:2], mock_message.side_effect = [[zerver_message[:1], zerver_usermessage[:2],
attachments, uploads, reactions[:1]], attachments, uploads, reactions[:1]],
[zerver_message[1:2], zerver_usermessage[2:5], [zerver_message[1:2], zerver_usermessage[2:5],