zulip/zerver/data_import/mattermost.py

734 lines
27 KiB
Python
Raw Normal View History

"""
spec:
https://docs.mattermost.com/administration/bulk-export.html
"""
import os
import logging
import subprocess
import ujson
import re
import shutil
from typing import Any, Callable, Dict, List, Set
from django.conf import settings
from django.utils.timezone import now as timezone_now
from django.forms.models import model_to_dict
from zerver.models import Recipient, RealmEmoji, Reaction
from zerver.lib.utils import (
process_list_in_batches,
)
from zerver.lib.emoji import NAME_TO_CODEPOINT_PATH
from zerver.data_import.import_util import ZerverFieldsT, build_zerver_realm, \
build_stream, build_realm, build_message, create_converted_data_files, \
make_subscriber_map, build_recipients, build_user_profile, \
build_stream_subscriptions, build_personal_subscriptions, SubscriberHandler, \
build_realm_emoji, make_user_messages
from zerver.data_import.mattermost_user import UserHandler
from zerver.data_import.sequencer import NEXT_ID, IdMapper
def make_realm(realm_id: int, team: Dict[str, Any]) -> ZerverFieldsT:
    """Build the skeleton realm dict for a single Mattermost team.

    The team's "name" becomes the realm subdomain and
    settings.EXTERNAL_HOST is passed along as the domain name.  The
    returned dict has 'zerver_defaultstream' set to an empty list.
    """
    creation_timestamp = float(timezone_now().timestamp())
    external_host = settings.EXTERNAL_HOST
    subdomain = team["name"]

    realm = build_realm(
        build_zerver_realm(realm_id, subdomain, creation_timestamp, 'Mattermost'),
        realm_id,
        external_host,
    )

    # Start with no default streams; this may be overridden later.
    realm['zerver_defaultstream'] = []
    return realm
def process_user(user_dict: Dict[str, Any], realm_id: int, team_name: str,
                 user_id_mapper: IdMapper) -> ZerverFieldsT:
    """Convert one Mattermost user record into a user-profile dict.

    Args:
        user_dict: Raw user row from the export.  Must contain "username",
            "email", "teams" (a list or None) and "is_mirror_dummy".
            "first_name" and "last_name" may be missing or empty.
        realm_id: ID of the realm the user is imported into.
        team_name: Name of the team being converted; used to decide
            whether the user is a realm administrator.
        user_id_mapper: Maps the Mattermost username to the new user ID.

    Returns:
        The result of build_user_profile() for this user.
    """
    def is_team_admin(user_dict: Dict[str, Any]) -> bool:
        # "teams" is None for users who are not on any team.
        if user_dict["teams"] is None:
            return False
        return any(
            team["name"] == team_name and "team_admin" in team["roles"]
            for team in user_dict["teams"]
        )

    def get_full_name(user_dict: Dict[str, Any]) -> str:
        # Either name part may be absent, null or empty.  Strip the joined
        # result so that a lone first or last name does not end up with
        # leading/trailing whitespace, and fall back to the username when
        # nothing is left.
        first_name = user_dict.get("first_name") or ""
        last_name = user_dict.get("last_name") or ""
        full_name = "{} {}".format(first_name, last_name).strip()
        return full_name or user_dict['username']

    full_name = get_full_name(user_dict)
    user_id = user_id_mapper.get(user_dict['username'])
    is_realm_admin = is_team_admin(user_dict)

    # Mirror-dummy users are imported as inactive accounts.
    is_mirror_dummy = bool(user_dict["is_mirror_dummy"])
    is_active = not is_mirror_dummy

    return build_user_profile(
        avatar_source='G',
        date_joined=int(timezone_now().timestamp()),
        delivery_email=user_dict['email'],
        email=user_dict['email'],
        full_name=full_name,
        id=user_id,
        is_active=is_active,
        is_realm_admin=is_realm_admin,
        is_guest=False,
        is_mirror_dummy=is_mirror_dummy,
        realm_id=realm_id,
        short_name=user_dict['username'],
        timezone='UTC',
    )
def convert_user_data(user_handler: UserHandler,
                      user_id_mapper: IdMapper,
                      user_data_map: Dict[str, Dict[str, Any]],
                      realm_id: int,
                      team_name: str) -> None:
    """Register every user relevant to team_name with user_handler.

    A user is relevant when check_user_in_team() says they belong to the
    team, or when their "is_mirror_dummy" flag is truthy.
    """
    # First select the users, then convert them, so that selection is
    # complete before any user is added to the handler.
    selected_users = [
        raw_user
        for raw_user in user_data_map.values()
        if check_user_in_team(raw_user, team_name) or raw_user["is_mirror_dummy"]
    ]

    for raw_user in selected_users:
        user_handler.add_user(
            process_user(raw_user, realm_id, team_name, user_id_mapper)
        )
def convert_channel_data(channel_data: List[ZerverFieldsT],
                         user_data_map: Dict[str, Dict[str, Any]],
                         subscriber_handler: SubscriberHandler,
                         stream_id_mapper: IdMapper,
                         user_id_mapper: IdMapper,
                         realm_id: int,
                         team_name: str) -> List[ZerverFieldsT]:
    """Build one stream per channel of team_name and record its subscribers.

    Channel membership is read from each user's per-team "channels" list.
    For every channel that ends up with at least one subscriber, the set of
    user IDs is handed to subscriber_handler.set_info().

    Returns the list of stream dicts produced by build_stream().
    """
    team_channels = [
        channel
        for channel in channel_data
        if channel['team'] == team_name
    ]

    # Usernames keyed by channel name, split by the role found on the user.
    channel_members_map = {}  # type: Dict[str, List[str]]
    channel_admins_map = {}  # type: Dict[str, List[str]]

    def is_invite_only(channel_type: str) -> bool:
        # Mattermost has two channel types:
        #   "O" is a public channel,
        #   "P" is a private channel.
        if channel_type == 'P':
            return True
        if channel_type == 'O':
            return False
        raise Exception('unexpected value')  # nocoverage

    streams = []

    # Seed an empty list for every exported channel (regardless of team).
    for channel in channel_data:
        channel_members_map[channel["name"]] = []
        channel_admins_map[channel["name"]] = []

    for username, user_dict in user_data_map.items():
        memberships = user_dict["teams"]
        if memberships is None:
            continue
        for membership in memberships:
            if membership["name"] != team_name:
                continue
            for channel in membership["channels"]:
                roles = channel["roles"]
                name = channel["name"]
                # An admin is recorded only in the admin map.
                if "channel_admin" in roles:
                    channel_admins_map[name].append(username)
                elif "channel_user" in roles:
                    channel_members_map[name].append(username)

    for channel_dict in team_channels:
        created_at = int(timezone_now().timestamp())
        channel_name = channel_dict['name']
        stream_id = stream_id_mapper.get(channel_name)
        invite_only = is_invite_only(channel_dict['type'])

        stream = build_stream(
            date_created=created_at,
            realm_id=realm_id,
            name=channel_dict['display_name'],
            # The channel "purpose" plays the role of a stream description.
            # The "header" is the text shown next to the channel name; it
            # is used as the description when the purpose is empty.
            description=channel_dict["purpose"] or channel_dict['header'],
            stream_id=stream_id,
            # Mattermost exports do not include archived (~ deactivated)
            # channels.
            deactivated=False,
            invite_only=invite_only,
        )

        # Admins are looked up first, then ordinary members.
        subscriber_ids = {
            user_id_mapper.get(username)
            for username in channel_admins_map[channel_name] + channel_members_map[channel_name]
        }
        if subscriber_ids:
            subscriber_handler.set_info(
                stream_id=stream_id,
                users=subscriber_ids,
            )
        streams.append(stream)

    return streams
def get_name_to_codepoint_dict() -> Dict[str, str]:
    """Load and return the JSON mapping stored at NAME_TO_CODEPOINT_PATH."""
    with open(NAME_TO_CODEPOINT_PATH) as codepoint_file:
        name_to_codepoint = ujson.load(codepoint_file)
    return name_to_codepoint
def build_reactions(realm_id: int, total_reactions: List[ZerverFieldsT], reactions: List[ZerverFieldsT],
                    message_id: int, name_to_codepoint: ZerverFieldsT,
                    user_id_mapper: IdMapper, zerver_realmemoji: List[ZerverFieldsT]) -> None:
    """Append a reaction dict to total_reactions for each usable reaction.

    A Mattermost reaction is skipped when its emoji is neither a known
    unicode emoji nor one of the realm emoji, or when the reacting user is
    unknown to user_id_mapper.  Nothing is returned; total_reactions is
    extended in place.
    """
    realm_emoji_id_by_name = {
        realm_emoji['name']: realm_emoji['id']
        for realm_emoji in zerver_realmemoji
    }

    # The unicode lookup below mirrors 'emoji_name_to_emoji_code' in
    # 'zerver/lib/emoji'.
    for mattermost_reaction in reactions:
        emoji_name = mattermost_reaction['emoji_name']
        username = mattermost_reaction["user"]

        if emoji_name in name_to_codepoint:
            # Unicode emoji take precedence over realm emoji.
            emoji_code = name_to_codepoint[emoji_name]
            reaction_type = Reaction.UNICODE_EMOJI
        elif emoji_name in realm_emoji_id_by_name:
            emoji_code = realm_emoji_id_by_name[emoji_name]
            reaction_type = Reaction.REALM_EMOJI
        else:  # nocoverage
            continue

        if not user_id_mapper.has(username):
            continue

        # An ID is allocated only for reactions that are actually kept.
        reaction = Reaction(
            id=NEXT_ID('reaction'),
            emoji_code=emoji_code,
            emoji_name=emoji_name,
            reaction_type=reaction_type)

        reaction_dict = model_to_dict(reaction, exclude=['message', 'user_profile'])
        reaction_dict['message'] = message_id
        reaction_dict['user_profile'] = user_id_mapper.get(username)
        total_reactions.append(reaction_dict)
def get_mentioned_user_ids(raw_message: Dict[str, Any], user_id_mapper: IdMapper) -> Set[int]:
    """Return the IDs of known users @-mentioned in raw_message["content"].

    A mention is an "@" at the start of the text, or preceded by a
    character that cannot be part of a username, followed by a run of
    alphanumerics optionally separated or terminated by "_" or ".".
    Candidates that user_id_mapper does not know are ignored.
    """
    # Usernames can look like user.name, user_name, username., username_,
    # user.name_ and so on.
    mention_pattern = "(?<=^|(?<=[^a-zA-Z0-9-_.]))@(([A-Za-z0-9]+[_.]?)+)"

    candidate_usernames = [
        found.group(1)
        for found in re.finditer(mention_pattern, raw_message["content"])
    ]
    return {
        user_id_mapper.get(username)
        for username in candidate_usernames
        if user_id_mapper.has(username)
    }
def process_raw_message_batch(realm_id: int,
                              raw_messages: List[Dict[str, Any]],
                              subscriber_map: Dict[int, Set[int]],
                              user_id_mapper: IdMapper,
                              user_handler: UserHandler,
                              get_recipient_id_from_receiver_name: Callable[[str, int], int],
                              is_pm_data: bool,
                              output_dir: str,
                              zerver_realmemoji: List[Dict[str, Any]],
                              total_reactions: List[Dict[str, Any]],
                              ) -> None:
    """Convert one batch of raw messages and write them out as a dump file.

    Each raw message has its mentions rewritten, is passed through
    html2text and is turned into a message dict with build_message().
    Messages longer than 10000 characters after conversion, and messages
    whose recipient lookup raises KeyError, are skipped.  Reactions on kept
    messages are appended to total_reactions.  The batch is finally handed
    to create_converted_data_files() under a "/messages-NNNNNN.json" name.
    """
    def fix_mentions(content: str, mention_user_ids: Set[int]) -> str:
        mentioned_users = [
            user_handler.get_user(user_id=user_id)
            for user_id in mention_user_ids
        ]
        # Replace the longest usernames first.  Otherwise, with users "bob"
        # and "bobby" both mentioned, rewriting "@bob" first would also
        # rewrite the start of "@bobby" and leave a broken mention behind.
        mentioned_users.sort(key=lambda user: len(user['short_name']), reverse=True)
        for user in mentioned_users:
            mattermost_mention = '@{short_name}'.format(**user)
            zulip_mention = '@**{full_name}**'.format(**user)
            content = content.replace(mattermost_mention, zulip_mention)

        content = content.replace('@channel', '@**all**')
        content = content.replace('@all', '@**all**')
        # We don't have an equivalent for Mattermost's @here mention which
        # mentions all users online in the channel.
        content = content.replace('@here', '@**all**')
        return content

    mention_map = dict()  # type: Dict[int, Set[int]]
    zerver_message = []

    import html2text
    h = html2text.HTML2Text()

    name_to_codepoint = get_name_to_codepoint_dict()

    for raw_message in raw_messages:
        # The message ID is allocated before the skip checks below, so
        # skipped messages still consume an ID.
        message_id = NEXT_ID('message')
        mention_user_ids = get_mentioned_user_ids(raw_message, user_id_mapper)
        mention_map[message_id] = mention_user_ids

        content = fix_mentions(
            content=raw_message['content'],
            mention_user_ids=mention_user_ids,
        )
        content = h.handle(content)

        if len(content) > 10000:  # nocoverage
            logging.info('skipping too-long message of length %s', len(content))
            continue

        pub_date = raw_message['pub_date']
        sender_user_id = raw_message['sender_id']
        try:
            recipient_id = get_recipient_id_from_receiver_name(raw_message["receiver_id"], Recipient.STREAM)
        except KeyError:
            logging.debug("Could not find recipient_id for a message, skipping.")
            continue

        message = build_message(
            content=content,
            message_id=message_id,
            pub_date=pub_date,
            recipient_id=recipient_id,
            rendered_content=None,
            topic_name='imported from mattermost',
            user_id=sender_user_id,
            has_attachment=False,
        )
        zerver_message.append(message)
        build_reactions(realm_id, total_reactions, raw_message["reactions"], message_id,
                        name_to_codepoint, user_id_mapper, zerver_realmemoji)

    zerver_usermessage = make_user_messages(
        zerver_message=zerver_message,
        subscriber_map=subscriber_map,
        is_pm_data=is_pm_data,
        mention_map=mention_map,
    )

    message_json = dict(
        zerver_message=zerver_message,
        zerver_usermessage=zerver_usermessage,
    )

    # Dump files are numbered per realm.
    dump_file_id = NEXT_ID('dump_file_id' + str(realm_id))
    message_file = "/messages-%06d.json" % (dump_file_id,)
    create_converted_data_files(message_json, output_dir, message_file)
def process_posts(num_teams: int,
                  team_name: str,
                  realm_id: int,
                  post_data: List[Dict[str, Any]],
                  get_recipient_id_from_receiver_name: Callable[[str, int], int],
                  subscriber_map: Dict[int, Set[int]],
                  output_dir: str,
                  is_pm_data: bool,
                  masking_content: bool,
                  user_id_mapper: IdMapper,
                  user_handler: UserHandler,
                  username_to_user: Dict[str, Dict[str, Any]],
                  zerver_realmemoji: List[Dict[str, Any]],
                  total_reactions: List[Dict[str, Any]]) -> None:
    """Flatten the posts of team_name into raw messages and process them.

    Posts belonging to other teams are ignored.  Each post, followed
    immediately by its replies, is converted to a raw message dict; the
    resulting list is passed to process_raw_message_batch() in chunks of
    1000.  When masking_content is true, ASCII letters in message text are
    replaced by "x"/"X".

    num_teams and username_to_user are accepted but not used here.
    """
    # Mattermost doesn't specify a team for private messages in its export
    # format.  Treating a missing team as the target team requires that we
    # only be importing data from a single team (checked elsewhere).
    post_data_list = [
        post
        for post in post_data
        if post.get("team", team_name) == team_name
    ]

    def message_to_dict(post_dict: Dict[str, Any], channel: str) -> Dict[str, Any]:
        sender_id = user_id_mapper.get(post_dict["user"])
        content = post_dict['message']

        if masking_content:
            content = re.sub('[a-z]', 'x', content)
            content = re.sub('[A-Z]', 'X', content)

        return dict(
            sender_id=sender_id,
            receiver_id=channel,
            content=content,
            pub_date=int(post_dict['create_at'] / 1000),
            # "reactions" may be absent or null.
            reactions=post_dict.get("reactions") or [],
        )

    raw_messages = []
    for post_dict in post_data_list:
        channel = post_dict["channel"]
        raw_messages.append(message_to_dict(post_dict, channel))
        # Replies to a message in Mattermost are stored in the main message
        # object.  For now, we just append the replies immediately after the
        # original message.  "replies" may be absent or null, like
        # "reactions"; replies are sent to the parent's channel without
        # modifying the input dicts.
        for reply in post_dict.get("replies") or []:
            raw_messages.append(message_to_dict(reply, channel))

    def process_batch(lst: List[Dict[str, Any]]) -> None:
        process_raw_message_batch(
            realm_id=realm_id,
            raw_messages=lst,
            subscriber_map=subscriber_map,
            user_id_mapper=user_id_mapper,
            user_handler=user_handler,
            get_recipient_id_from_receiver_name=get_recipient_id_from_receiver_name,
            is_pm_data=is_pm_data,
            output_dir=output_dir,
            zerver_realmemoji=zerver_realmemoji,
            total_reactions=total_reactions,
        )

    chunk_size = 1000

    process_list_in_batches(
        lst=raw_messages,
        chunk_size=chunk_size,
        process_batch=process_batch,
    )
def write_message_data(num_teams: int,
                       team_name: str,
                       realm_id: int,
                       post_data: List[Dict[str, Any]],
                       zerver_recipient: List[ZerverFieldsT],
                       subscriber_map: Dict[int, Set[int]],
                       output_dir: str,
                       masking_content: bool,
                       stream_id_mapper: IdMapper,
                       user_id_mapper: IdMapper,
                       user_handler: UserHandler,
                       username_to_user: Dict[str, Dict[str, Any]],
                       zerver_realmemoji: List[Dict[str, Any]],
                       total_reactions: List[Dict[str, Any]]) -> None:
    """Process the stream messages of one team via process_posts().

    Builds a lookup from stream ID to recipient ID out of zerver_recipient
    and supplies process_posts() with a callback that resolves a channel
    name to its recipient ID.  Messages are processed with
    is_pm_data=False.
    """
    stream_id_to_recipient_id = {
        recipient['type_id']: recipient['id']
        for recipient in zerver_recipient
        if recipient['type'] == Recipient.STREAM
    }

    def get_recipient_id_from_receiver_name(receiver_name: str, recipient_type: int) -> int:
        # Only stream recipients can be resolved here.  Fail with a clear
        # message rather than falling through to an unbound local.
        if recipient_type != Recipient.STREAM:
            raise ValueError("Unsupported recipient type: %r" % (recipient_type,))
        receiver_id = stream_id_mapper.get(receiver_name)
        # Raises KeyError when the channel has no stream recipient.
        return stream_id_to_recipient_id[receiver_id]

    process_posts(
        num_teams=num_teams,
        team_name=team_name,
        realm_id=realm_id,
        post_data=post_data,
        get_recipient_id_from_receiver_name=get_recipient_id_from_receiver_name,
        subscriber_map=subscriber_map,
        output_dir=output_dir,
        is_pm_data=False,
        masking_content=masking_content,
        user_id_mapper=user_id_mapper,
        user_handler=user_handler,
        username_to_user=username_to_user,
        zerver_realmemoji=zerver_realmemoji,
        total_reactions=total_reactions,
    )
def write_emoticon_data(realm_id: int,
                        custom_emoji_data: List[Dict[str, Any]],
                        data_dir: str,
                        output_dir: str) -> List[ZerverFieldsT]:
    '''
    Copy the exported custom emoji images into output_dir and describe them.

    Each entry of custom_emoji_data comes from an "emoji" row of the
    exported JSON file, for example:

        {
            "type": "emoji",
            "emoji": {"name": "tick", "image": "exported_emoji/7u7x8ytgp78q8jir81o9ejwwnr/image"}
        }

    where "image" is a path relative to data_dir:

        exported_emoji/7u7x8ytgp78q8jir81o9ejwwnr/image
        exported_emoji/h15ni7kf1bnj7jeua4qhmctsdo/image

    Every image is copied to <output_dir>/emoji/<RealmEmoji.PATH_ID_TEMPLATE>,
    with the emoji name used as the file name.  A metadata list is passed
    to create_converted_data_files() as '/emoji/records.json', and the
    list of RealmEmoji dicts is returned to the caller.
    '''
    logging.info('Starting to process emoticons')

    # Extract (relative image path, emoji name) for every emoji before
    # touching the file system.
    emoji_sources = [
        (emoji['image'], emoji['name'])
        for emoji in custom_emoji_data
    ]

    emoji_folder = os.path.join(output_dir, 'emoji')
    os.makedirs(emoji_folder, exist_ok=True)

    emoji_records = []
    for source_sub_path, emoji_name in emoji_sources:
        target_sub_path = RealmEmoji.PATH_ID_TEMPLATE.format(
            realm_id=realm_id,
            emoji_file_name=emoji_name,
        )
        target_path = os.path.join(emoji_folder, target_sub_path)
        os.makedirs(os.path.dirname(target_path), exist_ok=True)

        # Both ends of the copy are absolute paths.
        target_path = os.path.abspath(target_path)
        shutil.copyfile(
            os.path.abspath(os.path.join(data_dir, source_sub_path)),
            target_path,
        )

        emoji_records.append(dict(
            path=target_path,
            s3_path=target_path,
            file_name=emoji_name,
            realm_id=realm_id,
            name=emoji_name,
        ))

    create_converted_data_files(emoji_records, output_dir, '/emoji/records.json')

    realmemoji = []
    for record in emoji_records:
        realmemoji.append(build_realm_emoji(
            realm_id=realm_id,
            name=record['name'],
            id=NEXT_ID('realmemoji'),
            file_name=record['file_name'],
        ))

    logging.info('Done processing emoticons')
    return realmemoji
def create_username_to_user_mapping(user_data_list: List[Dict[str, Any]]) -> Dict[str, Dict[str, Any]]:
    """Index the user records by their "username" field.

    The returned dict references the same user dicts that were passed in.
    When a username occurs more than once, the last record wins.
    """
    return {user["username"]: user for user in user_data_list}
def check_user_in_team(user: Dict[str, Any], team_name: str) -> bool:
    """Return True when one of the user's teams is named team_name."""
    teams = user["teams"]
    # "teams" is null for users who are not on any team.
    if teams is None:
        return False
    return any(team["name"] == team_name for team in teams)
def label_mirror_dummy_users(num_teams: int, team_name: str,
                             mattermost_data: Dict[str, List[Dict[str, Any]]],
                             username_to_user: Dict[str, Dict[str, Any]]) -> None:
    """Flag authors of this team's posts who are not members of the team.

    For every post attributed to team_name, the author's record in
    username_to_user gets "is_mirror_dummy" set to True when
    check_user_in_team() reports that the author is not in the team.

    Raises AssertionError when a post carries no team and the export does
    not contain exactly one team.
    """
    # This might look like a good place to label admin users too, but that
    # would not be fully correct: we only iterate through posts here, which
    # covers only the users who have sent at least one message.
    for post in mattermost_data["post"]:
        if "team" in post:
            post_team = post["team"]
        elif num_teams > 1:
            raise AssertionError("Cannot import private messages in export with multiple teams.")
        elif num_teams == 0:
            raise AssertionError("Export with 0 teams")
        else:
            # With a single team, a post without a team is assumed to
            # belong to it.
            post_team = team_name

        if post_team != team_name:
            continue

        author = username_to_user[post["user"]]
        if not check_user_in_team(author, team_name):
            author["is_mirror_dummy"] = True
def reset_mirror_dummy_users(username_to_user: Dict[str, Dict[str, Any]]) -> None:
    """Set "is_mirror_dummy" to False on every user record, in place."""
    for user in username_to_user.values():
        user["is_mirror_dummy"] = False
def mattermost_data_file_to_dict(mattermost_data_file: str) -> Dict[str, List[Dict[str, Any]]]:
    """Read a Mattermost export file and group its rows by row type.

    The file holds one JSON object per line, each shaped like
    {"type": T, T: payload}.  The result maps every row type to the list
    of its payloads, in file order.  The keys "version", "team",
    "channel", "user", "post" and "emoji" are always present, even when
    the file has no rows of that type.

    Blank lines are ignored.  Rows of any other type are collected under
    their own key and a warning is logged the first time such a type is
    seen; nothing else in this function processes them.
    """
    mattermost_data = {
        "version": [],
        "team": [],
        "channel": [],
        "user": [],
        "post": [],
        "emoji": [],
    }  # type: Dict[str, List[Dict[str, Any]]]

    # Decode explicitly as UTF-8 rather than relying on the locale's
    # default encoding.
    with open(mattermost_data_file, "r", encoding="utf-8") as fp:
        for line in fp:
            line = line.strip()
            if not line:
                continue
            row = ujson.loads(line)
            data_type = row["type"]
            if data_type not in mattermost_data:
                logging.warning(
                    "Row type %r is not handled by the Mattermost converter; "
                    "rows of this type will not be imported.", data_type)
                mattermost_data[data_type] = []
            mattermost_data[data_type].append(row[data_type])
    return mattermost_data
def do_convert_data(mattermost_data_dir: str, output_dir: str, masking_content: bool) -> None:
    """Convert a Mattermost export into one output directory per team.

    Reads "export.json" from mattermost_data_dir and, for every team in
    it, converts users, channels, subscriptions, custom emoji, messages
    and reactions, then writes the converted files below
    <output_dir>/<team name> and packs that directory into a .tar.gz.

    Raises Exception when output_dir is not empty.
    """
    os.makedirs(output_dir, exist_ok=True)
    if os.listdir(output_dir):  # nocoverage
        raise Exception("Output directory should be empty!")

    export_path = os.path.join(mattermost_data_dir, "export.json")
    mattermost_data = mattermost_data_file_to_dict(export_path)
    username_to_user = create_username_to_user_mapping(mattermost_data["user"])

    teams = mattermost_data["team"]
    num_teams = len(teams)

    for team in teams:
        realm_id = NEXT_ID("realm_id")
        team_name = team["name"]

        # Handlers and ID mappers are created afresh for each team.
        user_handler = UserHandler()
        subscriber_handler = SubscriberHandler()
        user_id_mapper = IdMapper()
        stream_id_mapper = IdMapper()

        print("Generating data for", team_name)
        realm = make_realm(realm_id, team)
        realm_output_dir = os.path.join(output_dir, team_name)

        # The mirror-dummy flags are per team: clear what the previous team
        # left behind before labelling for this one.
        reset_mirror_dummy_users(username_to_user)
        label_mirror_dummy_users(num_teams, team_name, mattermost_data, username_to_user)

        convert_user_data(
            user_handler=user_handler,
            user_id_mapper=user_id_mapper,
            user_data_map=username_to_user,
            realm_id=realm_id,
            team_name=team_name,
        )

        zerver_stream = convert_channel_data(
            channel_data=mattermost_data["channel"],
            user_data_map=username_to_user,
            subscriber_handler=subscriber_handler,
            stream_id_mapper=stream_id_mapper,
            user_id_mapper=user_id_mapper,
            realm_id=realm_id,
            team_name=team_name,
        )
        realm['zerver_stream'] = zerver_stream

        zerver_recipient = build_recipients(
            zerver_userprofile=user_handler.get_all_users(),
            zerver_stream=zerver_stream,
        )
        realm['zerver_recipient'] = zerver_recipient

        stream_subscriptions = build_stream_subscriptions(
            get_users=subscriber_handler.get_users,
            zerver_recipient=zerver_recipient,
            zerver_stream=zerver_stream,
        )
        personal_subscriptions = build_personal_subscriptions(
            zerver_recipient=zerver_recipient,
        )

        # Mattermost currently supports only exporting messages from
        # channels.  Personal messages and huddles are not exported.
        zerver_subscription = personal_subscriptions + stream_subscriptions
        realm['zerver_subscription'] = zerver_subscription

        zerver_realmemoji = write_emoticon_data(
            realm_id=realm_id,
            custom_emoji_data=mattermost_data["emoji"],
            data_dir=mattermost_data_dir,
            output_dir=realm_output_dir,
        )
        realm['zerver_realmemoji'] = zerver_realmemoji

        subscriber_map = make_subscriber_map(
            zerver_subscription=zerver_subscription,
        )

        # Filled in by write_message_data() as messages are processed.
        total_reactions = []  # type: List[Dict[str, Any]]
        write_message_data(
            num_teams=num_teams,
            team_name=team_name,
            realm_id=realm_id,
            post_data=mattermost_data["post"],
            zerver_recipient=zerver_recipient,
            subscriber_map=subscriber_map,
            output_dir=realm_output_dir,
            masking_content=masking_content,
            stream_id_mapper=stream_id_mapper,
            user_id_mapper=user_id_mapper,
            user_handler=user_handler,
            username_to_user=username_to_user,
            zerver_realmemoji=zerver_realmemoji,
            total_reactions=total_reactions,
        )
        realm['zerver_reaction'] = total_reactions
        realm['zerver_userprofile'] = user_handler.get_all_users()
        realm['sort_by_date'] = True

        create_converted_data_files(realm, realm_output_dir, '/realm.json')
        # Mattermost currently doesn't support exporting avatars
        create_converted_data_files([], realm_output_dir, '/avatars/records.json')
        # Mattermost currently doesn't support exporting uploads
        create_converted_data_files([], realm_output_dir, '/uploads/records.json')

        # Mattermost currently doesn't support exporting attachments
        attachment = {"zerver_attachment": []}  # type: Dict[str, List[Any]]
        create_converted_data_files(attachment, realm_output_dir, '/attachment.json')

        logging.info('Start making tarball')
        tarball_path = realm_output_dir + '.tar.gz'
        subprocess.check_call(["tar", "-czf", tarball_path, realm_output_dir, '-P'])
        logging.info('Done making tarball')