populate_db: Remove replay_old_messages code.

Now that we have the data import/export tool, that's a better
mechanism for importing data than this old and likely buggy code was.
Tim Abbott 2016-11-15 21:04:43 -08:00
parent 6d4075cef9
commit 61816f6151
1 changed file with 7 additions and 476 deletions
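The replacement mechanism the commit message refers to is the realm data export/import tooling, which is driven through Django management commands instead of replaying event logs. A minimal sketch of that workflow follows; the command names ("export", "import") and their options are assumptions made for illustration, not anything shown in this diff.

# Hypothetical sketch: superseding restore_saved_messages() with the
# export/import tooling. Command names and options are assumed, not
# taken from this commit.
from django.core.management import call_command

# Dump one realm's data to a directory of JSON files (illustrative options).
call_command("export", realm="zulip.com", output="/tmp/zulip-export")

# Later, load that directory into a target server (again, illustrative).
call_command("import", "/tmp/zulip-export")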

@@ -5,29 +5,18 @@ from __future__ import print_function
from django.core.management.base import BaseCommand, CommandParser
from django.utils.timezone import now
from django.contrib.sites.models import Site
from zerver.models import Message, UserProfile, Stream, Recipient, Client, \
Subscription, Huddle, get_huddle, Realm, UserMessage, RealmAlias, \
get_huddle_hash, clear_database, get_client, get_user_profile_by_id, \
email_to_domain, email_to_username
from zerver.lib.actions import STREAM_ASSIGNMENT_COLORS, do_send_message, set_default_streams, \
do_activate_user, do_deactivate_user, do_change_password, do_change_is_admin,\
do_change_bot_type
from zerver.lib.parallel import run_parallel
from django.db.models import Count
from zerver.models import Message, UserProfile, Stream, Recipient, \
Subscription, get_huddle, Realm, UserMessage, RealmAlias, \
clear_database, get_client, get_user_profile_by_id, \
email_to_username
from zerver.lib.actions import STREAM_ASSIGNMENT_COLORS, do_send_message, \
do_change_is_admin
from django.conf import settings
from zerver.lib.bulk_create import bulk_create_realms, \
bulk_create_streams, bulk_create_users, bulk_create_huddles, \
bulk_create_clients
from zerver.lib.timestamp import timestamp_to_datetime
from zerver.models import MAX_MESSAGE_LENGTH
from zerver.lib.bulk_create import bulk_create_streams, bulk_create_users
from zerver.models import DefaultStream, get_stream, get_realm
from zilencer.models import Deployment
import ujson
import datetime
import random
import glob
import os
from optparse import make_option
from six import text_type
@@ -116,12 +105,6 @@ class Command(BaseCommand):
action="store_true",
help='Whether to delete all the existing messages.')
parser.add_argument('--replay-old-messages',
action="store_true",
default=False,
dest='replay_old_messages',
help='Whether to replay the log of old messages.')
def handle(self, **options):
# type: (**Any) -> None
if options["percent_huddles"] + options["percent_personals"] > 100:
@@ -293,8 +276,6 @@ class Command(BaseCommand):
UserMessage.objects.all().update(flags=UserMessage.flags.read)
self.stdout.write("Successfully populated test database.\n")
if options["replay_old_messages"]:
restore_saved_messages()
recipient_hash = {} # type: Dict[int, Recipient]
def get_recipient_by_id(rid):
@@ -303,456 +284,6 @@ def get_recipient_by_id(rid):
return recipient_hash[rid]
return Recipient.objects.get(id=rid)
def restore_saved_messages():
# type: () -> None
old_messages = [] # type: List[Dict[str, Any]]
duplicate_suppression_hash = {} # type: Dict[str, bool]
stream_dict = {} # type: Dict[Tuple[text_type, text_type], Tuple[text_type, text_type]]
user_set = set() # type: Set[Tuple[text_type, text_type, text_type, bool]]
email_set = set([u.email for u in UserProfile.objects.all()]) # type: Set[text_type]
realm_set = set() # type: Set[text_type]
# Initial client_set is nonempty temporarily because we don't have
# clients in logs at all right now -- later we can start with nothing.
client_set = set(["populate_db", "website", "zephyr_mirror"])
huddle_user_set = set() # type: Set[Tuple[text_type, ...]]
# First, determine all the objects our messages will need.
print(datetime.datetime.now(), "Creating realms/streams/etc...")
def process_line(line):
# type: (str) -> None
old_message_json = line.strip()
# Due to populate_db's shakespeare mode, we have a lot of
# duplicate messages in our log that only differ in their
# logged ID numbers (same timestamp, content, etc.). With
# sqlite, bulk creating those messages won't work properly: in
# particular, the first 100 messages will actually only result
# in 20 rows ending up in the target table, which screws up
# the accounting below, where, to handle subscription
# changes, we assume that the Nth row populate_db created
# goes with the Nth non-subscription row of the input.
# So suppress the duplicates when using sqlite.
if "sqlite" in settings.DATABASES["default"]["ENGINE"]:
tmp_message = ujson.loads(old_message_json)
tmp_message['id'] = '1'
duplicate_suppression_key = ujson.dumps(tmp_message)
if duplicate_suppression_key in duplicate_suppression_hash:
return
duplicate_suppression_hash[duplicate_suppression_key] = True
old_message = ujson.loads(old_message_json)
message_type = old_message["type"]
# Lower case emails and domains; it will screw up
# deduplication if we don't
def fix_email(email):
# type: (text_type) -> text_type
return email.strip().lower()
if message_type in ["stream", "huddle", "personal"]:
old_message["sender_email"] = fix_email(old_message["sender_email"])
# Fix the length on too-long messages before we start processing them
if len(old_message["content"]) > MAX_MESSAGE_LENGTH:
old_message["content"] = "[ This message was deleted because it was too long ]"
if message_type in ["subscription_added", "subscription_removed"]:
old_message["domain"] = old_message["domain"].lower()
old_message["user"] = fix_email(old_message["user"])
elif message_type == "subscription_property":
old_message["user"] = fix_email(old_message["user"])
elif message_type == "user_email_changed":
old_message["old_email"] = fix_email(old_message["old_email"])
old_message["new_email"] = fix_email(old_message["new_email"])
elif message_type.startswith("user_"):
old_message["user"] = fix_email(old_message["user"])
elif message_type.startswith("enable_"):
old_message["user"] = fix_email(old_message["user"])
if message_type == 'personal':
old_message["recipient"][0]["email"] = fix_email(old_message["recipient"][0]["email"])
elif message_type == "huddle":
for i in range(len(old_message["recipient"])):
old_message["recipient"][i]["email"] = fix_email(old_message["recipient"][i]["email"])
old_messages.append(old_message)
if message_type in ["subscription_added", "subscription_removed"]:
stream_name = old_message["name"].strip() # type: text_type
canon_stream_name = stream_name.lower()
if (old_message["domain"], canon_stream_name) not in stream_dict:
stream_dict[(old_message["domain"], canon_stream_name)] = \
(old_message["domain"], stream_name)
elif message_type == "user_created":
user_set.add((old_message["user"], old_message["full_name"], old_message["short_name"], False))
elif message_type == "realm_created":
realm_set.add(old_message["domain"])
if message_type not in ["stream", "huddle", "personal"]:
return
sender_email = old_message["sender_email"]
domain = email_to_domain(sender_email)
realm_set.add(domain)
if old_message["sender_email"] not in email_set:
user_set.add((old_message["sender_email"],
old_message["sender_full_name"],
old_message["sender_short_name"],
False))
if 'sending_client' in old_message:
client_set.add(old_message['sending_client'])
if message_type == 'stream':
stream_name = old_message["recipient"].strip()
canon_stream_name = stream_name.lower()
if (domain, canon_stream_name) not in stream_dict:
stream_dict[(domain, canon_stream_name)] = (domain, stream_name)
elif message_type == 'personal':
u = old_message["recipient"][0]
if u["email"] not in email_set:
user_set.add((u["email"], u["full_name"], u["short_name"], False))
email_set.add(u["email"])
elif message_type == 'huddle':
for u in old_message["recipient"]:
user_set.add((u["email"], u["full_name"], u["short_name"], False))
if u["email"] not in email_set:
user_set.add((u["email"], u["full_name"], u["short_name"], False))
email_set.add(u["email"])
huddle_user_set.add(tuple(sorted(set(u["email"] for u in old_message["recipient"]))))
else:
raise ValueError('Bad message type')
event_glob = os.path.join(settings.EVENT_LOG_DIR, 'events.*')
for filename in sorted(glob.glob(event_glob)):
with open(filename, "r") as message_log:
for line in message_log.readlines():
process_line(line)
stream_recipients = {} # type: Dict[Tuple[int, text_type], Recipient]
user_recipients = {} # type: Dict[text_type, Recipient]
huddle_recipients = {} # type: Dict[text_type, Recipient]
# Then, create the objects our messages need.
print(datetime.datetime.now(), "Creating realms...")
bulk_create_realms(realm_set)
realms = {} # type: Dict[text_type, Realm]
for realm in Realm.objects.all():
realms[realm.domain] = realm
print(datetime.datetime.now(), "Creating clients...")
bulk_create_clients(client_set)
clients = {} # type: Dict[text_type, Client]
for client in Client.objects.all():
clients[client.name] = client
print(datetime.datetime.now(), "Creating streams...")
bulk_create_streams(realms, list(stream_dict.values()))
streams = {} # type: Dict[int, Stream]
for stream in Stream.objects.all():
streams[stream.id] = stream
for recipient in Recipient.objects.filter(type=Recipient.STREAM):
stream_recipients[(streams[recipient.type_id].realm_id,
streams[recipient.type_id].name.lower())] = recipient
print(datetime.datetime.now(), "Creating users...")
bulk_create_users(realms, user_set, tos_version=settings.TOS_VERSION)
users = {} # type: Dict[text_type, UserProfile]
users_by_id = {} # type: Dict[int, UserProfile]
for user_profile in UserProfile.objects.select_related().all():
users[user_profile.email] = user_profile
users_by_id[user_profile.id] = user_profile
for recipient in Recipient.objects.filter(type=Recipient.PERSONAL):
user_recipients[users_by_id[recipient.type_id].email] = recipient
print(datetime.datetime.now(), "Creating huddles...")
bulk_create_huddles(users, huddle_user_set)
huddles_by_id = {} # type: Dict[int, Huddle]
for huddle in Huddle.objects.all():
huddles_by_id[huddle.id] = huddle
for recipient in Recipient.objects.filter(type=Recipient.HUDDLE):
huddle_recipients[huddles_by_id[recipient.type_id].huddle_hash] = recipient
# TODO: Add a special entry type in the log that is a subscription
# change and import those as we go to make subscription changes
# take effect!
print(datetime.datetime.now(), "Importing subscriptions...")
subscribers = {} # type: Dict[int, Set[int]]
for s in Subscription.objects.select_related().all():
if s.active:
subscribers.setdefault(s.recipient.id, set()).add(s.user_profile.id)
# Then create all the messages, without talking to the DB!
print(datetime.datetime.now(), "Importing messages, part 1...")
first_message_id = None
if Message.objects.exists():
first_message_id = Message.objects.all().order_by("-id")[0].id + 1
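# first_message_id is the id we expect the first bulk-created Message row
# below to receive; it anchors the positional mapping from log entries to
# Message rows used in the second pass.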
messages_to_create = [] # type: List[Message]
for idx, old_message in enumerate(old_messages):
message_type = old_message["type"]
if message_type not in ["stream", "huddle", "personal"]:
continue
message = Message()
sender_email = old_message["sender_email"]
domain = email_to_domain(sender_email)
realm = realms[domain]
message.sender = users[sender_email]
type_hash = {"stream": Recipient.STREAM,
"huddle": Recipient.HUDDLE,
"personal": Recipient.PERSONAL}
if 'sending_client' in old_message:
message.sending_client = clients[old_message['sending_client']]
elif sender_email in ["othello@zulip.com", "iago@zulip.com", "prospero@zulip.com",
"cordelia@zulip.com", "hamlet@zulip.com"]:
message.sending_client = clients['populate_db']
elif realm.domain == "zulip.com":
message.sending_client = clients["website"]
elif realm.domain == "mit.edu":
message.sending_client = clients['zephyr_mirror']
else:
message.sending_client = clients['populate_db']
message.type = type_hash[message_type]
message.content = old_message["content"]
message.subject = old_message["subject"]
message.pub_date = timestamp_to_datetime(old_message["timestamp"])
if message.type == Recipient.PERSONAL:
message.recipient = user_recipients[old_message["recipient"][0]["email"]]
elif message.type == Recipient.STREAM:
message.recipient = stream_recipients[(realm.id,
old_message["recipient"].lower())]
elif message.type == Recipient.HUDDLE:
huddle_hash = get_huddle_hash([users[u["email"]].id
for u in old_message["recipient"]])
message.recipient = huddle_recipients[huddle_hash]
else:
raise ValueError('Bad message type')
messages_to_create.append(message)
print(datetime.datetime.now(), "Importing messages, part 2...")
Message.objects.bulk_create(messages_to_create)
messages_to_create = []
# Finally, create all the UserMessage objects
print(datetime.datetime.now(), "Importing usermessages, part 1...")
personal_recipients = {} # type: Dict[int, bool]
for r in Recipient.objects.filter(type=Recipient.PERSONAL):
personal_recipients[r.id] = True
all_messages = Message.objects.all() # type: Sequence[Message]
user_messages_to_create = [] # type: List[UserMessage]
messages_by_id = {} # type: Dict[int, Message]
for message in all_messages:
messages_by_id[message.id] = message
if len(messages_by_id) == 0:
print(datetime.datetime.now(), "No old messages to replay")
return
if first_message_id is None:
first_message_id = min(messages_by_id.keys())
tot_user_messages = 0
pending_subs = {} # type: Dict[Tuple[int, int], bool]
current_message_id = first_message_id
pending_colors = {} # type: Dict[Tuple[text_type, text_type], text_type]
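# Second pass over the log: subscription/settings entries are applied
# directly, while each stream/huddle/personal entry is matched, in order,
# to the next bulk-created Message row via current_message_id.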
for old_message in old_messages:
message_type = old_message["type"]
if message_type == 'subscription_added':
stream_key = (realms[old_message["domain"]].id, old_message["name"].strip().lower())
subscribers.setdefault(stream_recipients[stream_key].id,
set()).add(users[old_message["user"]].id)
pending_subs[(stream_recipients[stream_key].id,
users[old_message["user"]].id)] = True
continue
elif message_type == "subscription_removed":
stream_key = (realms[old_message["domain"]].id, old_message["name"].strip().lower())
user_id = users[old_message["user"]].id
subscribers.setdefault(stream_recipients[stream_key].id, set())
try:
subscribers[stream_recipients[stream_key].id].remove(user_id)
except KeyError:
print("Error unsubscribing %s from %s: not subscribed" % (
old_message["user"], old_message["name"]))
pending_subs[(stream_recipients[stream_key].id,
users[old_message["user"]].id)] = False
continue
elif message_type == "user_activated" or message_type == "user_created":
# These are rare, so just handle them the slow way
user_profile = users[old_message["user"]]
join_date = timestamp_to_datetime(old_message['timestamp'])
do_activate_user(user_profile, log=False, join_date=join_date)
# Update the cache of users to show this user as activated
users_by_id[user_profile.id] = user_profile
users[old_message["user"]] = user_profile
continue
elif message_type == "user_deactivated":
user_profile = users[old_message["user"]]
do_deactivate_user(user_profile, log=False)
continue
elif message_type == "user_change_password":
# Just handle these the slow way
user_profile = users[old_message["user"]]
do_change_password(user_profile, old_message["pwhash"], log=False,
hashed_password=True)
continue
elif message_type == "user_change_full_name":
# Just handle these the slow way
user_profile = users[old_message["user"]]
user_profile.full_name = old_message["full_name"]
user_profile.save(update_fields=["full_name"])
continue
elif message_type == "enable_desktop_notifications_changed":
# Just handle these the slow way
user_profile = users[old_message["user"]]
user_profile.enable_desktop_notifications = (old_message["enable_desktop_notifications"] != "false")
user_profile.save(update_fields=["enable_desktop_notifications"])
continue
elif message_type == "enable_sounds_changed":
user_profile = users[old_message["user"]]
user_profile.enable_sounds = (old_message["enable_sounds"] != "false")
user_profile.save(update_fields=["enable_sounds"])
elif message_type == "enable_offline_email_notifications_changed":
user_profile = users[old_message["user"]]
user_profile.enable_offline_email_notifications = (
old_message["enable_offline_email_notifications"] != "false")
user_profile.save(update_fields=["enable_offline_email_notifications"])
continue
elif message_type == "enable_offline_push_notifications_changed":
user_profile = users[old_message["user"]]
user_profile.enable_offline_push_notifications = (
old_message["enable_offline_push_notifications"] != "false")
user_profile.save(update_fields=["enable_offline_push_notifications"])
continue
elif message_type == "enable_online_push_notifications_changed":
user_profile = users[old_message["user"]]
user_profile.enable_online_push_notifications = (
old_message["enable_online_push_notifications"] != "false")
user_profile.save(update_fields=["enable_online_push_notifications"])
continue
elif message_type == "default_streams":
set_default_streams(get_realm(old_message["domain"]),
old_message["streams"])
continue
elif message_type == "subscription_property":
property_name = old_message.get("property")
if property_name == "stream_color" or property_name == "color":
color = old_message.get("color", old_message.get("value"))
pending_colors[(old_message["user"],
old_message["stream_name"].lower())] = color
elif property_name in ["in_home_view", "notifications"]:
# TODO: Handle this
continue
else:
raise RuntimeError("Unknown property %s" % (property_name,))
continue
elif message_type == "realm_created":
# No action required
continue
elif message_type in ["user_email_changed", "update_onboarding", "update_message"]:
# TODO: Handle these
continue
if message_type not in ["stream", "huddle", "personal"]:
raise RuntimeError("Unexpected message type %s" % (message_type,))
message = messages_by_id[current_message_id]
current_message_id += 1
if message.recipient_id not in subscribers:
# Nobody received this message -- probably due to our
# subscriptions being out-of-date.
continue
recipient_user_ids = set() # type: Set[int]
for user_profile_id in subscribers[message.recipient_id]:
recipient_user_ids.add(user_profile_id)
if message.recipient_id in personal_recipients:
# For personal (1:1) messages, also include the sender
recipient_user_ids.add(message.sender_id)
for user_profile_id in recipient_user_ids:
if users_by_id[user_profile_id].is_active:
um = UserMessage(user_profile_id=user_profile_id,
message=message)
user_messages_to_create.append(um)
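# Flush the accumulated UserMessage rows in large batches so memory use
# stays bounded while replaying long logs.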
if len(user_messages_to_create) > 100000:
tot_user_messages += len(user_messages_to_create)
UserMessage.objects.bulk_create(user_messages_to_create)
user_messages_to_create = []
print(datetime.datetime.now(), "Importing usermessages, part 2...")
tot_user_messages += len(user_messages_to_create)
UserMessage.objects.bulk_create(user_messages_to_create)
print(datetime.datetime.now(), "Finalizing subscriptions...")
current_subs = {} # type: Dict[Tuple[int, int], bool]
current_subs_obj = {} # type: Dict[Tuple[int, int], Subscription]
for s in Subscription.objects.select_related().all():
current_subs[(s.recipient_id, s.user_profile_id)] = s.active
current_subs_obj[(s.recipient_id, s.user_profile_id)] = s
subscriptions_to_add = [] # type: List[Subscription]
subscriptions_to_change = [] # type: List[Tuple[Tuple[int, int], bool]]
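# Reconcile pending_subs against the existing Subscription rows: pairs with
# no row yet are bulk-created below, while rows whose active flag changed
# are updated in place.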
for pending_sub in pending_subs.keys():
(recipient_id, user_profile_id) = pending_sub
current_state = current_subs.get(pending_sub)
if pending_subs[pending_sub] == current_state:
# Already correct in the database
continue
elif current_state is not None:
subscriptions_to_change.append((pending_sub, pending_subs[pending_sub]))
continue
s = Subscription(recipient_id=recipient_id,
user_profile_id=user_profile_id,
active=pending_subs[pending_sub])
subscriptions_to_add.append(s)
Subscription.objects.bulk_create(subscriptions_to_add)
for (sub_tuple, active) in subscriptions_to_change:
current_subs_obj[sub_tuple].active = active
current_subs_obj[sub_tuple].save(update_fields=["active"])
subs = {} # type: Dict[Tuple[int, int], Subscription]
for sub in Subscription.objects.all():
subs[(sub.user_profile_id, sub.recipient_id)] = sub
# TODO: do restore of subscription colors -- we're currently not
# logging changes so there's little point in having the code :(
print(datetime.datetime.now(), "Finished importing %s messages (%s usermessages)" % \
(len(all_messages), tot_user_messages))
site = Site.objects.get_current()
site.domain = 'zulip.com'
site.save()
print(datetime.datetime.now(), "Filling in user pointers...")
# Set restored pointers to the very latest messages
for user_profile in UserProfile.objects.all():
try:
top = UserMessage.objects.filter(
user_profile_id=user_profile.id).order_by("-message")[0]
user_profile.pointer = top.message_id
except IndexError:
user_profile.pointer = -1
user_profile.save(update_fields=["pointer"])
print(datetime.datetime.now(), "Done replaying old messages")
# Create some test messages, including:
# - multiple streams
# - multiple subjects per stream